[Zad 3] Add SOM
This commit is contained in:
parent
28d5d702ac
commit
14779630f3
152
zad3/som.py
Normal file
152
zad3/som.py
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import utils as u
|
||||||
|
from matplotlib.animation import FuncAnimation
|
||||||
|
from random import shuffle
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def find_bmu(som, x):
|
||||||
|
'''Return the (g,h) index of the BMU in the grid'''
|
||||||
|
#wrong_dist_sq = np.asarray([u.calc_length(x, s) for s in som])
|
||||||
|
dist_sq = (np.square(som - x)).sum(axis=2)
|
||||||
|
return np.unravel_index(np.argmin(dist_sq, axis=None), dist_sq.shape)
|
||||||
|
|
||||||
|
|
||||||
|
def dist_comp(som, x):
|
||||||
|
distsq = []
|
||||||
|
for i in range(som.shape[0]):
|
||||||
|
for j in range(som.shape[1]):
|
||||||
|
distsq.append([(i, j), u.calc_length(x, som[i][j])])
|
||||||
|
return sorted(distsq, key=lambda x: x[1])
|
||||||
|
|
||||||
|
|
||||||
|
# Update the weights of the SOM cells when given a single training example
|
||||||
|
# and the model parameters along with BMU coordinates as a tuple
|
||||||
|
def update_weights(som, train_ex, learn_rate, radius_sq,
|
||||||
|
bmu_coord, algorithm, step=3):
|
||||||
|
g, h = bmu_coord
|
||||||
|
# if radius is close to zero then only BMU is changed
|
||||||
|
if radius_sq < 1e-3:
|
||||||
|
som[g, h, :] += learn_rate * (train_ex - som[g, h, :])
|
||||||
|
return som
|
||||||
|
|
||||||
|
match algorithm:
|
||||||
|
case 'kohonen':
|
||||||
|
# Change all cells in a neighborhood of BMU
|
||||||
|
for i in range(max(0, g-step), min(som.shape[0], g+step)):
|
||||||
|
for j in range(max(0, h-step), min(som.shape[1], h+step)):
|
||||||
|
dist_sq = np.square(i - g) + np.square(j - h)
|
||||||
|
dist_func = np.exp(-dist_sq / 2 / radius_sq)
|
||||||
|
som[i, j, :] += learn_rate * \
|
||||||
|
dist_func * (train_ex - som[i, j, :])
|
||||||
|
case 'neuron gas':
|
||||||
|
dist_rank = dist_comp(som, train_ex)
|
||||||
|
for i in range(len(dist_rank)):
|
||||||
|
dist_func = np.exp(-i / 2 / np.sqrt(radius_sq))
|
||||||
|
som[dist_rank[i][0], dist_rank[i][1], :] += \
|
||||||
|
learn_rate * dist_func * \
|
||||||
|
(train_ex - som[dist_rank[i][0], dist_rank[i][1], :])
|
||||||
|
|
||||||
|
case _:
|
||||||
|
raise NotImplementedError(
|
||||||
|
f'algorithm {algorithm} is not implemented yet')
|
||||||
|
return som
|
||||||
|
|
||||||
|
|
||||||
|
def train_som(som, train_data, learn_rate=.1, radius_sq=1,
|
||||||
|
lr_decay=.1, radius_decay=.1, epochs=20, algorithm='kohonen'):
|
||||||
|
'''Main routine for training an SOM. It requires an initialized SOM grid
|
||||||
|
or a partially trained grid as parameter'''
|
||||||
|
learn_rate_0 = learn_rate
|
||||||
|
radius_0 = radius_sq
|
||||||
|
soms_with_error = [(som.copy(), calc_som_error(som, train_data))]
|
||||||
|
for epoch in np.arange(epochs):
|
||||||
|
shuffle(train_data)
|
||||||
|
for train_ex in train_data:
|
||||||
|
g, h = find_bmu(som, train_ex)
|
||||||
|
som = update_weights(som, train_ex,
|
||||||
|
learn_rate, radius_sq, (g, h), algorithm)
|
||||||
|
# Update learning rate and radius
|
||||||
|
learn_rate = learn_rate_0 * np.exp(-epoch * lr_decay)
|
||||||
|
radius_sq = radius_0 * np.exp(-epoch * radius_decay)
|
||||||
|
error = calc_som_error(som, train_data)
|
||||||
|
soms_with_error.append((som.copy(), error))
|
||||||
|
if error < 1e-3:
|
||||||
|
break
|
||||||
|
return soms_with_error
|
||||||
|
|
||||||
|
|
||||||
|
def calc_som_error(som, train_data):
|
||||||
|
errors = []
|
||||||
|
for train_ex in train_data:
|
||||||
|
g, h = find_bmu(som, train_ex)
|
||||||
|
errors.append(u.calc_length(train_ex, som[g][h]))
|
||||||
|
return np.mean(np.sqrt(np.asarray(errors)))
|
||||||
|
|
||||||
|
|
||||||
|
def plot_with_data(soms, data, name_suffix='_'):
|
||||||
|
fig, ax = plt.subplots()
|
||||||
|
ax.set_xlabel('X')
|
||||||
|
ax.set_ylabel('Y')
|
||||||
|
time_text = ax.text(0.05, 0.95, 'epoch=0', horizontalalignment='left',
|
||||||
|
verticalalignment='top', transform=ax.transAxes)
|
||||||
|
# data
|
||||||
|
lst_x, lst_y = zip(*data)
|
||||||
|
lst_x = list(lst_x)
|
||||||
|
lst_y = list(lst_y)
|
||||||
|
ax.scatter(lst_x, lst_y)
|
||||||
|
|
||||||
|
som_data = soms[0]
|
||||||
|
lst_x, lst_y = zip(*som_data[0])
|
||||||
|
lst_x = list(lst_x)
|
||||||
|
lst_y = list(lst_y)
|
||||||
|
som_plot, = ax.plot(lst_x, lst_y, color='black', marker='X')
|
||||||
|
plt.grid(True)
|
||||||
|
|
||||||
|
def update_plot_som(i):
|
||||||
|
som_data = soms[i]
|
||||||
|
time_text.set_text(f'epoch={i}')
|
||||||
|
lst_x, lst_y = zip(*som_data[0])
|
||||||
|
lst_x = list(lst_x)
|
||||||
|
lst_y = list(lst_y)
|
||||||
|
som_plot.set_data(lst_x, lst_y)
|
||||||
|
return [time_text, som_plot]
|
||||||
|
|
||||||
|
anim = FuncAnimation(fig, update_plot_som,
|
||||||
|
frames=len(soms), blit=True)
|
||||||
|
anim.save(f'animationSOMs{name_suffix}.gif')
|
||||||
|
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
|
||||||
|
def init_neurons(data, k, rand: np.random.RandomState = None, method='random'):
|
||||||
|
match method:
|
||||||
|
case 'zeros':
|
||||||
|
return np.zeros((1, k, 2))
|
||||||
|
case 'random':
|
||||||
|
lst_x, lst_y = zip(*data)
|
||||||
|
minimal = min(min(lst_x), min(lst_y))
|
||||||
|
maximal = max(max(lst_x), max(lst_y))
|
||||||
|
return (maximal - minimal) * rand.random_sample((1, k, 2)) + minimal
|
||||||
|
case _:
|
||||||
|
raise NotImplementedError(
|
||||||
|
f'method {method} is not implemented yet')
|
||||||
|
|
||||||
|
|
||||||
|
def print_som_stats(soms_with_errors, train_data):
|
||||||
|
print('=' * 20)
|
||||||
|
soms, errs = zip(*soms_with_errors)
|
||||||
|
m = np.mean(errs)
|
||||||
|
std = np.std(errs)
|
||||||
|
min_err = np.min(errs)
|
||||||
|
dead_neurons_count = []
|
||||||
|
for som in soms:
|
||||||
|
dead_neurons_count.append(
|
||||||
|
20-len(set([find_bmu(som, x) for x in train_data])))
|
||||||
|
print("Średni błąd: ", m)
|
||||||
|
print("Odchylenie standardowe: ", std)
|
||||||
|
print("Błąd minimalny: ", min_err)
|
||||||
|
print(
|
||||||
|
f'Średnia liczba nieaktywnych neuronów: {np.mean(dead_neurons_count)}')
|
||||||
|
print(
|
||||||
|
f'Odchylenie standardowe liczby nieaktywnych neuronów: {np.std(dead_neurons_count)}')
|
@ -1,5 +1,6 @@
|
|||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
from generate_points import get_random_point
|
from generate_points import get_random_point
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
def get_color(i):
|
def get_color(i):
|
||||||
@ -7,9 +8,9 @@ def get_color(i):
|
|||||||
|
|
||||||
|
|
||||||
def calc_length(a, b):
|
def calc_length(a, b):
|
||||||
# return ((b[0]-a[0])**2+(b[1]-a[1])**2)**0.5
|
'''Calculate Euclidian distance between points'''
|
||||||
# no need to calculate square root for comparison
|
assert len(a)==len(b)
|
||||||
return (b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2
|
return np.square(np.asarray(b)-np.asarray(a)).sum()
|
||||||
|
|
||||||
|
|
||||||
def plot_data(data):
|
def plot_data(data):
|
||||||
|
28
zad3/zad3.py
28
zad3/zad3.py
@ -1,8 +1,11 @@
|
|||||||
import kmeans as km
|
import kmeans as km
|
||||||
|
import som
|
||||||
|
import numpy as np
|
||||||
import utils
|
import utils
|
||||||
import json
|
import json
|
||||||
|
|
||||||
METHODS = ['forgy', 'random_partition']
|
METHODS = ['forgy', 'random_partition']
|
||||||
|
SOM_INIT_METHODS = ['random', 'zeros']
|
||||||
|
|
||||||
|
|
||||||
def get_datas_from_json():
|
def get_datas_from_json():
|
||||||
@ -23,6 +26,31 @@ def get_datas_random():
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
datas = get_datas_from_json()
|
datas = get_datas_from_json()
|
||||||
|
rand = np.random.RandomState(0)
|
||||||
|
index = 1
|
||||||
|
print("Self-organizing map")
|
||||||
|
for data in datas:
|
||||||
|
print(f'Data set: {index}')
|
||||||
|
utils.plot_data(data)
|
||||||
|
for method in SOM_INIT_METHODS:
|
||||||
|
print(f'Initialization method: {method}')
|
||||||
|
errors = []
|
||||||
|
for k in range(2, 21, 2):
|
||||||
|
som_data = som.init_neurons(data, k, rand, method)
|
||||||
|
soms_with_error = som.train_som(som_data, data, algorithm='kohonen')
|
||||||
|
error = soms_with_error[-1][1]
|
||||||
|
errors.append((k, error))
|
||||||
|
soms,_ = zip(*soms_with_error)
|
||||||
|
#som.plot_with_data(soms, data, f'_{method}_{k}_data{index}')
|
||||||
|
utils.plot_error_data(errors)
|
||||||
|
soms_with_errors = []
|
||||||
|
for _ in range(100):
|
||||||
|
som_data = som.init_neurons(data, k, rand, method)
|
||||||
|
soms_with_error = som.train_som(som_data, data, algorithm='kohonen')
|
||||||
|
soms_with_errors.append(soms_with_error[-1])
|
||||||
|
som.print_som_stats(soms_with_errors, data)
|
||||||
|
index += 1
|
||||||
|
|
||||||
index = 1
|
index = 1
|
||||||
for data in datas:
|
for data in datas:
|
||||||
utils.plot_data(data)
|
utils.plot_data(data)
|
||||||
|
Loading…
Reference in New Issue
Block a user