diff --git a/zad3/som.py b/zad3/som.py new file mode 100644 index 0000000..3a34bc7 --- /dev/null +++ b/zad3/som.py @@ -0,0 +1,152 @@ +import matplotlib.pyplot as plt +import utils as u +from matplotlib.animation import FuncAnimation +from random import shuffle +import numpy as np + + +def find_bmu(som, x): + '''Return the (g,h) index of the BMU in the grid''' + #wrong_dist_sq = np.asarray([u.calc_length(x, s) for s in som]) + dist_sq = (np.square(som - x)).sum(axis=2) + return np.unravel_index(np.argmin(dist_sq, axis=None), dist_sq.shape) + + +def dist_comp(som, x): + distsq = [] + for i in range(som.shape[0]): + for j in range(som.shape[1]): + distsq.append([(i, j), u.calc_length(x, som[i][j])]) + return sorted(distsq, key=lambda x: x[1]) + + +# Update the weights of the SOM cells when given a single training example +# and the model parameters along with BMU coordinates as a tuple +def update_weights(som, train_ex, learn_rate, radius_sq, + bmu_coord, algorithm, step=3): + g, h = bmu_coord + # if radius is close to zero then only BMU is changed + if radius_sq < 1e-3: + som[g, h, :] += learn_rate * (train_ex - som[g, h, :]) + return som + + match algorithm: + case 'kohonen': + # Change all cells in a neighborhood of BMU + for i in range(max(0, g-step), min(som.shape[0], g+step)): + for j in range(max(0, h-step), min(som.shape[1], h+step)): + dist_sq = np.square(i - g) + np.square(j - h) + dist_func = np.exp(-dist_sq / 2 / radius_sq) + som[i, j, :] += learn_rate * \ + dist_func * (train_ex - som[i, j, :]) + case 'neuron gas': + dist_rank = dist_comp(som, train_ex) + for i in range(len(dist_rank)): + dist_func = np.exp(-i / 2 / np.sqrt(radius_sq)) + som[dist_rank[i][0], dist_rank[i][1], :] += \ + learn_rate * dist_func * \ + (train_ex - som[dist_rank[i][0], dist_rank[i][1], :]) + + case _: + raise NotImplementedError( + f'algorithm {algorithm} is not implemented yet') + return som + + +def train_som(som, train_data, learn_rate=.1, radius_sq=1, + lr_decay=.1, radius_decay=.1, epochs=20, algorithm='kohonen'): + '''Main routine for training an SOM. It requires an initialized SOM grid + or a partially trained grid as parameter''' + learn_rate_0 = learn_rate + radius_0 = radius_sq + soms_with_error = [(som.copy(), calc_som_error(som, train_data))] + for epoch in np.arange(epochs): + shuffle(train_data) + for train_ex in train_data: + g, h = find_bmu(som, train_ex) + som = update_weights(som, train_ex, + learn_rate, radius_sq, (g, h), algorithm) + # Update learning rate and radius + learn_rate = learn_rate_0 * np.exp(-epoch * lr_decay) + radius_sq = radius_0 * np.exp(-epoch * radius_decay) + error = calc_som_error(som, train_data) + soms_with_error.append((som.copy(), error)) + if error < 1e-3: + break + return soms_with_error + + +def calc_som_error(som, train_data): + errors = [] + for train_ex in train_data: + g, h = find_bmu(som, train_ex) + errors.append(u.calc_length(train_ex, som[g][h])) + return np.mean(np.sqrt(np.asarray(errors))) + + +def plot_with_data(soms, data, name_suffix='_'): + fig, ax = plt.subplots() + ax.set_xlabel('X') + ax.set_ylabel('Y') + time_text = ax.text(0.05, 0.95, 'epoch=0', horizontalalignment='left', + verticalalignment='top', transform=ax.transAxes) + # data + lst_x, lst_y = zip(*data) + lst_x = list(lst_x) + lst_y = list(lst_y) + ax.scatter(lst_x, lst_y) + + som_data = soms[0] + lst_x, lst_y = zip(*som_data[0]) + lst_x = list(lst_x) + lst_y = list(lst_y) + som_plot, = ax.plot(lst_x, lst_y, color='black', marker='X') + plt.grid(True) + + def update_plot_som(i): + som_data = soms[i] + time_text.set_text(f'epoch={i}') + lst_x, lst_y = zip(*som_data[0]) + lst_x = list(lst_x) + lst_y = list(lst_y) + som_plot.set_data(lst_x, lst_y) + return [time_text, som_plot] + + anim = FuncAnimation(fig, update_plot_som, + frames=len(soms), blit=True) + anim.save(f'animationSOMs{name_suffix}.gif') + + plt.show() + + +def init_neurons(data, k, rand: np.random.RandomState = None, method='random'): + match method: + case 'zeros': + return np.zeros((1, k, 2)) + case 'random': + lst_x, lst_y = zip(*data) + minimal = min(min(lst_x), min(lst_y)) + maximal = max(max(lst_x), max(lst_y)) + return (maximal - minimal) * rand.random_sample((1, k, 2)) + minimal + case _: + raise NotImplementedError( + f'method {method} is not implemented yet') + + +def print_som_stats(soms_with_errors, train_data): + print('=' * 20) + soms, errs = zip(*soms_with_errors) + m = np.mean(errs) + std = np.std(errs) + min_err = np.min(errs) + dead_neurons_count = [] + for som in soms: + dead_neurons_count.append( + 20-len(set([find_bmu(som, x) for x in train_data]))) + print("Średni błąd: ", m) + print("Odchylenie standardowe: ", std) + print("Błąd minimalny: ", min_err) + print( + f'Średnia liczba nieaktywnych neuronów: {np.mean(dead_neurons_count)}') + print( + f'Odchylenie standardowe liczby nieaktywnych neuronów: {np.std(dead_neurons_count)}') diff --git a/zad3/utils.py b/zad3/utils.py index efb266c..c678fd1 100644 --- a/zad3/utils.py +++ b/zad3/utils.py @@ -1,5 +1,6 @@ import matplotlib.pyplot as plt from generate_points import get_random_point +import numpy as np def get_color(i): @@ -7,9 +8,9 @@ def get_color(i): def calc_length(a, b): - # return ((b[0]-a[0])**2+(b[1]-a[1])**2)**0.5 - # no need to calculate square root for comparison - return (b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2 + '''Calculate Euclidian distance between points''' + assert len(a)==len(b) + return np.square(np.asarray(b)-np.asarray(a)).sum() def plot_data(data): diff --git a/zad3/zad3.py b/zad3/zad3.py index 32a8bf4..91168bf 100644 --- a/zad3/zad3.py +++ b/zad3/zad3.py @@ -1,8 +1,11 @@ import kmeans as km +import som +import numpy as np import utils import json METHODS = ['forgy', 'random_partition'] +SOM_INIT_METHODS = ['random', 'zeros'] def get_datas_from_json(): @@ -23,6 +26,31 @@ def get_datas_random(): def main(): datas = get_datas_from_json() + rand = np.random.RandomState(0) + index = 1 + print("Self-organizing map") + for data in datas: + print(f'Data set: {index}') + utils.plot_data(data) + for method in SOM_INIT_METHODS: + print(f'Initialization method: {method}') + errors = [] + for k in range(2, 21, 2): + som_data = som.init_neurons(data, k, rand, method) + soms_with_error = som.train_som(som_data, data, algorithm='kohonen') + error = soms_with_error[-1][1] + errors.append((k, error)) + soms,_ = zip(*soms_with_error) + #som.plot_with_data(soms, data, f'_{method}_{k}_data{index}') + utils.plot_error_data(errors) + soms_with_errors = [] + for _ in range(100): + som_data = som.init_neurons(data, k, rand, method) + soms_with_error = som.train_som(som_data, data, algorithm='kohonen') + soms_with_errors.append(soms_with_error[-1]) + som.print_som_stats(soms_with_errors, data) + index += 1 + index = 1 for data in datas: utils.plot_data(data)