import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets, linear_model
class Model:
    """Generates the 2-D sample points used by the regression demo.

    Each generator stores its result in ``self.data_points`` as a list of
    ``(x, y)`` tuples, where ``x`` is a one-element NumPy array (one row of
    the single feature column) and ``y`` is the matching response value.
    """

    def __init__(self):
        self.data_points = []

    def generate_data_man(self, n_samples=8, n_outliers=2, m=2, offset=2):
        """Build a hand-made linear data set followed by noisy outliers.

        The x values are the integers ``1 .. n_samples + n_outliers``.  The
        first ``n_samples`` responses are ``m * x + offset`` plus uniform
        noise in ``[0, 5)``; the remaining ``n_outliers`` responses use
        uniform noise in ``[0, 100)`` instead.  The global NumPy RNG is used
        and is not seeded here.

        Args:
            n_samples: Number of regular (low-noise) points.
            n_outliers: Number of high-noise points appended after them.
            m: Slope of the underlying line.
            offset: Intercept of the underlying line.

        Returns:
            A ``(data_points, m)`` tuple; ``data_points`` is also stored on
            ``self.data_points``.
        """
        x = np.arange(1, n_samples + n_outliers + 1)
        # Draw the inlier noise first, then the outlier noise, so the random
        # stream is consumed in the same order as one draw per point.
        noise = np.concatenate((
            5 * np.random.random(n_samples),
            100 * np.random.random(n_outliers),
        ))
        y = m * x + offset + noise
        # Reshape to a 2-D array with one column (one row per sample).
        x = x.reshape(-1, 1)
        print(x)
        print(y)
        self.data_points = list(zip(x, y))
        return self.data_points, m

    def generate_data(self, n_samples=10, n_outliers=2):
        """Build a data set with ``datasets.make_regression`` plus outliers.

        ``make_regression`` is called with one feature, ``noise=10`` and
        ``random_state=0``.  The first ``n_outliers`` rows are then
        overwritten with normally distributed values centred on
        ``x = 3`` / ``y = -3`` so that they act as outliers.

        Args:
            n_samples: Total number of points (default 10).
            n_outliers: How many leading points are replaced by outliers
                (default 2).

        Returns:
            A ``(data_points, coef)`` tuple, where ``coef`` is the
            coefficient returned by ``make_regression``; ``data_points`` is
            also stored on ``self.data_points``.

        Raises:
            ValueError: If ``n_outliers`` is negative or exceeds
                ``n_samples``.
        """
        if not 0 <= n_outliers <= n_samples:
            raise ValueError(
                f"n_outliers must be between 0 and n_samples ({n_samples}), "
                f"got {n_outliers}"
            )
        # make_regression receives its own random_state, so the global RNG
        # only needs seeding once, right before the outliers are drawn.
        x, y, coef = datasets.make_regression(
            n_samples=n_samples,
            n_features=1,
            n_informative=1,
            noise=10,
            coef=True,
            random_state=0,
        )
        # Fixed seed so the outliers are reproducible.
        np.random.seed(0)
        # Overwrite the first n_outliers rows with outlier values.
        x[:n_outliers] = 3 + 0.5 * np.random.normal(size=(n_outliers, 1))
        y[:n_outliers] = -3 + 10 * np.random.normal(size=n_outliers)
        self.data_points = list(zip(x, y))
        return self.data_points, coef
class View:
    """Draws the data points and both fitted regression lines."""

    def plot_data(self, data_points, inlier_mask, outlier_mask, line_X, line_y, line_y_ransac):
        """Scatter the inliers/outliers and plot the two regression lines.

        Args:
            data_points: Sequence of ``(x, y)`` pairs.
            inlier_mask: Per-point flags; truthy entries are drawn as inliers.
            outlier_mask: Per-point flags; truthy entries are drawn as outliers.
            line_X: x values shared by both regression lines.
            line_y: Predictions of the plain linear regressor at ``line_X``.
            line_y_ransac: Predictions of the RANSAC regressor at ``line_X``.
        """
        line_width = 1

        def pick(mask):
            # Collect the coordinates of every point whose mask entry is set.
            xs, ys = [], []
            for idx, pt in enumerate(data_points):
                if mask[idx]:
                    xs.append(pt[0])
                    ys.append(pt[1])
            return xs, ys

        # Inliers: yellow-green crosses.
        inlier_xs, inlier_ys = pick(inlier_mask)
        plt.scatter(inlier_xs, inlier_ys, color="yellowgreen", marker="x", label="Inliers")

        # Outliers: red circles.
        outlier_xs, outlier_ys = pick(outlier_mask)
        plt.scatter(outlier_xs, outlier_ys, color="red", marker="o", label="Outliers")

        # Add grid lines.
        plt.grid(True)

        # Plain linear regressor: navy line.
        plt.plot(line_X, line_y, color="navy", linewidth=line_width, label="Linear regressor")
        # RANSAC regressor: purple line.
        plt.plot(
            line_X,
            line_y_ransac,
            color="purple",
            linewidth=line_width,
            label="RANSAC regressor",
        )

        plt.legend(loc="lower right")
        plt.xlabel("Input")
        plt.ylabel("Response")
        plt.show()
class Controller:
    """Fits a plain and a RANSAC linear regression to the model data and plots both."""

    def __init__(self, model, view):
        self.model = model
        self.view = view

    def generate_data(self):
        """Generate data, fit both regressors, print diagnostics and plot.

        For every point one line is printed with its coordinates, both
        predictions, its distance to each fitted line, whether RANSAC flagged
        it as an outlier, and on which side of each line it lies.
        """
        # To use the scikit-learn generated data set instead, call
        # self.model.generate_data(10, 3) here.
        data_points, _coef = self.model.generate_data_man()
        # One feature column (n, 1) and one response vector (n,).
        x = np.asarray([point[0] for point in data_points]).reshape(-1, 1)
        y = np.asarray([point[1] for point in data_points])

        # Ordinary least-squares regression.
        lr = linear_model.LinearRegression()
        lr.fit(x, y)

        # RANSAC regression.
        ransac = linear_model.RANSACRegressor()
        ransac.fit(x, y)

        # Point-to-line distances for both fitted lines.
        distances_linear = self.calculate_distances(data_points, lr.coef_, lr.intercept_)
        distances_ransac = self.calculate_distances(
            data_points, ransac.estimator_.coef_, ransac.estimator_.intercept_
        )

        # Masks telling which points RANSAC treated as inliers / outliers.
        inlier_mask = ransac.inlier_mask_
        outlier_mask = ~inlier_mask

        # Evaluate both models at the two ends of the x range.  linspace
        # includes the maximum, so the plotted lines reach the last point.
        line_x = np.linspace(x.min(), x.max(), num=2).reshape(-1, 1)
        line_y = lr.predict(line_x)
        line_y_ransac = ransac.predict(line_x)

        # Side of each fitted line on which every original point lies.
        y_est = lr.predict(x)
        signs_linear = self.calculate_signs(y, y_est)
        y_est_ransac = ransac.predict(x)
        signs_ransac = self.calculate_signs(y, y_est_ransac)

        # Print per-point diagnostics so the outlier flags can be checked
        # against the RANSAC distance (dist_ransac).
        rows = zip(x, y, distances_linear, distances_ransac, outlier_mask, signs_linear, signs_ransac)
        for i, (xp, yp, d1, d2, outlier, sign_linear, sign_ransac) in enumerate(rows):
            y_est_p = y_est[i]
            y_est_ransac_p = y_est_ransac[i]
            print(f"{i+1} (xp,yp,yp_lr,yp_ransac):({xp[0].astype(int)}, {yp.astype(int)}, {y_est_p.astype(int)}, {y_est_ransac_p.astype(int)}) dist_lr:{d1[0]:.2f} dist_ransac: {d2[0]:.2f} outlier:{outlier} sign_lr:{sign_linear} sign_ransac: {sign_ransac}")

        # Draw the result.
        self.view.plot_data(data_points, inlier_mask, outlier_mask, line_x, line_y, line_y_ransac)

    def calculate_signs(self, y_real, y_est):
        """Tell whether each real value lies above or below its estimate.

        Args:
            y_real: Observed response values.
            y_est: Estimated response values, paired with ``y_real``.

        Returns:
            A list with ``1`` where the real value is greater than the
            estimate, ``-1`` where it is smaller and ``0`` otherwise.
        """
        return [
            1 if y_r > y_e else -1 if y_r < y_e else 0
            for y_r, y_e in zip(y_real, y_est)
        ]

    def calculate_distances(self, data_points, line_m, line_b):
        """Compute the perpendicular distance of each point to a line.

        The line ``y = line_m * x + line_b`` is rewritten in the general form
        ``A*x + B*y + C = 0`` with ``A = -line_m``, ``B = 1``,
        ``C = -line_b``, and the distance is
        ``|A*x + B*y + C| / sqrt(A**2 + B**2)``.

        Args:
            data_points: Iterable of ``(x, y)`` pairs.
            line_m: Slope of the line.
            line_b: Intercept of the line.

        Returns:
            A list with one distance per point, in input order.
        """
        a, b, c = -line_m, 1, -line_b
        # The normalisation factor is the same for every point.
        norm = np.sqrt(a**2 + b**2)
        return [abs(a * x + b * y + c) / norm for x, y in data_points]
if __name__ == "__main__":
    # Wire the model and view into a controller and run the demo once.
    app = Controller(model=Model(), view=View())
    app.generate_data()
# (Stray blog-page footer text, "comment list", was pasted here; it is not part of the script.)