import matplotlib.pyplot as plt import utils as u from matplotlib.animation import FuncAnimation from random import shuffle import numpy as np def find_bmu(som, exhausted, x): '''Return the (g,h) index of the BMU in the grid''' #wrong_dist_sq = np.asarray([u.calc_length(x, s) for s in som]) dist_sq = exhausted * (np.square(som - x)).sum(axis=2) return np.unravel_index(np.argmin(dist_sq, axis=None), dist_sq.shape) def dist_comp(som, exhausted, x): distsq = [] for i in range(som.shape[0]): for j in range(som.shape[1]): distsq.append([(i, j), exhausted[i][j] * u.calc_length(x, som[i][j])]) return sorted(distsq, key=lambda x: x[1]) def update_weights(som, exhausted, train_ex, learn_rate, radius_sq, bmu_coord, algorithm): '''Update the weights of the SOM cells when given a single training example and the model parameters along with BMU coordinates as a tuple''' g, h = bmu_coord # if radius is close to zero then only BMU is changed if radius_sq < 1e-3: som[g, h, :] += learn_rate * (train_ex - som[g, h, :]) return som match algorithm: case 'kohonen': # Change all cells in a neighborhood of BMU for i in range(som.shape[0]): for j in range(som.shape[1]): dist_sq = np.square(i - g) + np.square(j - h) dist_func = np.exp(-dist_sq / 2 / radius_sq) som[i, j, :] += learn_rate * \ dist_func * (train_ex - som[i, j, :]) case 'neuron gas': dist_rank = dist_comp(som, exhausted, train_ex) for i in range(len(dist_rank)): dist_func = np.exp(-i / 2 / np.sqrt(radius_sq)) som[dist_rank[i][0][0], dist_rank[i][0][1], :] += \ learn_rate * dist_func * \ (train_ex - som[dist_rank[i][0][0], dist_rank[i][0][1], :]) case _: raise NotImplementedError( f'algorithm {algorithm} is not implemented yet') return som def train_som(som, train_data, learn_rate=.1, radius_sq=1, lr_decay=.1, radius_decay=.1, epochs=20, algorithm='kohonen'): '''Main routine for training an SOM. It requires an initialized SOM grid or a partially trained grid as parameter''' exhausted = np.ones((som.shape[0], som.shape[1])) learn_rate_0 = learn_rate radius_0 = radius_sq soms_with_error = [ (som.copy(), calc_som_error(som, exhausted, train_data))] for epoch in np.arange(epochs): shuffle(train_data) for train_ex in train_data: g, h = find_bmu(som, exhausted, train_ex) som = update_weights(som, exhausted, train_ex, learn_rate, radius_sq, (g, h), algorithm) exhausted[g][h] += 1 # Update learning rate and radius learn_rate = learn_rate_0 * np.exp(-epoch * lr_decay) radius_sq = radius_0 * np.exp(-epoch * radius_decay) exhausted = np.ones((som.shape[0], som.shape[1])) error = calc_som_error(som, exhausted, train_data) soms_with_error.append((som.copy(), error)) if error < 1e-3: break return soms_with_error def calc_som_error(som, exhausted, train_data): errors = [] for train_ex in train_data: g, h = find_bmu(som, exhausted, train_ex) errors.append(u.calc_length(train_ex, som[g][h])) return np.mean(np.sqrt(np.asarray(errors))) def plot_with_data(soms, data, name_suffix='_'): fig, ax = plt.subplots() ax.set_xlabel('X') ax.set_ylabel('Y') time_text = ax.text(0.05, 0.95, 'epoch=0', horizontalalignment='left', verticalalignment='top', transform=ax.transAxes) # data lst_x, lst_y = zip(*data) lst_x = list(lst_x) lst_y = list(lst_y) ax.scatter(lst_x, lst_y) som_data = soms[0] lst_x, lst_y = zip(*som_data[0]) lst_x = list(lst_x) lst_y = list(lst_y) som_plot, = ax.plot(lst_x, lst_y, color='black', marker='X') plt.grid(True) def update_plot_som(i): som_data = soms[i] time_text.set_text(f'epoch={i}') lst_x, lst_y = zip(*som_data[0]) lst_x = list(lst_x) lst_y = list(lst_y) som_plot.set_data(lst_x, lst_y) return [time_text, som_plot] anim = FuncAnimation(fig, update_plot_som, frames=len(soms), blit=True) anim.save(f'animationSOMs{name_suffix}.gif') plt.show() def init_neurons(data, k, rand: np.random.RandomState = None, method='random'): match method: case 'zeros': return np.zeros((1, k, 2)) case 'random': lst_x, lst_y = zip(*data) minimal = min(min(lst_x), min(lst_y)) maximal = max(max(lst_x), max(lst_y)) return (maximal - minimal) * rand.random_sample((1, k, 2)) + minimal case _: raise NotImplementedError( f'method {method} is not implemented yet') def print_som_stats(soms_with_errors, train_data): print('=' * 20) exhausted = np.ones( (soms_with_errors[0][0].shape[0], soms_with_errors[0][0].shape[1])) soms, errs = zip(*soms_with_errors) m = np.mean(errs) std = np.std(errs) min_err = np.min(errs) dead_neurons_count = [] for som in soms: dead_neurons_count.append( 20-len(set([find_bmu(som, exhausted, x) for x in train_data]))) print("Średni błąd: ", m) print("Odchylenie standardowe: ", std) print("Błąd minimalny: ", min_err) print( f'Średnia liczba nieaktywnych neuronów: {np.mean(dead_neurons_count)}') print( f'Odchylenie standardowe liczby nieaktywnych neuronów: {np.std(dead_neurons_count)}') print('=' * 20)