|
# This program requires Python 2.3 or above
# robin meier 17-02-06 paris http://robin.meier.free.fr http://www.neuromuse.org
#
# provides the methods necessary to run a SOM on a distant server using xmlrpc
from DocXMLRPCServer import DocXMLRPCServer, DocXMLRPCRequestHandler
from SocketServer import ThreadingMixIn
import sys
from psyco import *
from numarray import *
from numarray.random_array import *
from math import *
from socket import *
from cPickle import *
def test(lolo):
lolo = lolo + 1
return lolo
class Som:
def __init__(self):
self.somsize = 100
self.weights = random((3,3))
self.inputvector = (0.1, 0.1, 0.1)
self.activation = random((3))
self.winner = 0
self.distances = random((5,5))
self.winx = 1
self.winy = 1
def initializeSom(self, somsize=100, vectorsize=3, verbose=0):
"""create a som
specify somsize and vectorsize"""
self.somsize = somsize
self.weights = random((somsize,vectorsize))
if verbose:
return "som created"
else:
return "ok"
def randomizeWeights(self, inputvector, verbose=0):
"""randomizes weights of the som
gives back pickled weights"""
self.weights = random((self.somsize,len(inputvector)))
if verbose:
return dumps(self.weights)
else:
return dumps("ok")
def computeActivation(self, inputvector, verbose=0):
"""euclidean distance without sqrt between input and each neuron
gives back pickled activation"""
self.inputvector = list(inputvector)
# return dumps(self.inputvector)
self.activation = sum(((self.weights-self.inputvector)*(self.weights-self.inputvector)),1)
if verbose:
return dumps(self.activation)
else:
return dumps("ok")
def computeWinner(self, verbose=0):
"""who's the winner?
gives back winner-no and winner-weights as list"""
self.winner = int((argsort(self.activation))[0])
if verbose:
return self.winner, list(self.weights[self.winner,:])
else:
return "ok"
def updateWeights(self, inputvector, learningrate=0.1, radius=3, verbose=0):
"""update weights
gives back pickled distances array"""
#conversion index->coord
seite = int(sqrt(self.somsize))
self.winy = int(self.winner/seite)
self.winx = int(self.winner%seite)
#calculate distances
dist = lambda x,y: (x-self.winx)**2+(y-self.winy)**2
self.distances = fromfunction(dist, (seite, seite))
#print self.distances
for i in range(self.somsize):
iy=int(i/seite)
ix = int(i%seite)
n = float(self.distances[iy,ix])
if n < radius:
delta = learningrate*(inputvector-(take(self.weights,i)))*(exp(-1*((n*n)/(2*radius*radius))))
put(self.weights,i,((take(self.weights,i)+delta)))
if verbose:
return dumps(self.distances)
else:
return dumps("ok")
class ThreadingServer(ThreadingMixIn, DocXMLRPCServer):
pass
serveraddr = ('', 8765)
srvr = ThreadingServer(serveraddr, DocXMLRPCRequestHandler)
srvr.set_server_title("self-organizing map as xmlrpc python-server")
srvr.set_server_name("self-organizing map server")
srvr.set_server_documentation("""welcome to this self-organizing map running as a python xmlrpc
server!""")
srvr.register_instance(Som())
srvr.register_introspection_functions()
srvr.register_function(test)
srvr.serve_forever() |
# This program requires Python 2.3 or above
# robin meier 17-02-06 paris http://robin.meier.free.fr http://www.neuromuse.org
#
# provides the methods necessary to run a SOM on a distant server using xmlrpc
from DocXMLRPCServer import DocXMLRPCServer, DocXMLRPCRequestHandler
from SocketServer import ThreadingMixIn
import sys
from psyco import *
from numarray import *
from numarray.random_array import *
from math import *
from socket import *
from cPickle import *
def test(lolo):
lolo = lolo + 1
return lolo
class Som:
def __init__(self):
self.somsize = 100
self.weights = random((3,3))
self.inputvector = (0.1, 0.1, 0.1)
self.activation = random((3))
self.winner = 0
self.distances = random((5,5))
self.winx = 1
self.winy = 1
def initializeSom(self, somsize=100, vectorsize=3, verbose=0):
"""create a som
specify somsize and vectorsize"""
self.somsize = somsize
self.weights = random((somsize,vectorsize))
if verbose:
return "som created"
else:
return "ok"
def randomizeWeights(self, inputvector, verbose=0):
"""randomizes weights of the som
gives back pickled weights"""
self.weights = random((self.somsize,len(inputvector)))
if verbose:
return dumps(self.weights)
else:
return dumps("ok")
def computeActivation(self, inputvector, verbose=0):
"""euclidean distance without sqrt between input and each neuron
gives back pickled activation"""
self.inputvector = list(inputvector)
# return dumps(self.inputvector)
self.activation = sum(((self.weights-self.inputvector)*(self.weights-self.inputvector)),1)
if verbose:
return dumps(self.activation)
else:
return dumps("ok")
def computeWinner(self, verbose=0):
"""who's the winner?
gives back winner-no and winner-weights as list"""
self.winner = int((argsort(self.activation))[0])
if verbose:
return self.winner, list(self.weights[self.winner,:])
else:
return "ok"
def updateWeights(self, inputvector, learningrate=0.1, radius=3, verbose=0):
"""update weights
gives back pickled distances array"""
#conversion index->coord
seite = int(sqrt(self.somsize))
self.winy = int(self.winner/seite)
self.winx = int(self.winner%seite)
#calculate distances
dist = lambda x,y: (x-self.winx)**2+(y-self.winy)**2
self.distances = fromfunction(dist, (seite, seite))
#print self.distances
for i in range(self.somsize):
iy=int(i/seite)
ix = int(i%seite)
n = float(self.distances[iy,ix])
if n < radius:
delta = learningrate*(inputvector-(take(self.weights,i)))*(exp(-1*((n*n)/(2*radius*radius))))
put(self.weights,i,((take(self.weights,i)+delta)))
if verbose:
return dumps(self.distances)
else:
return dumps("ok")
class ThreadingServer(ThreadingMixIn, DocXMLRPCServer):
pass
serveraddr = ('', 8765)
srvr = ThreadingServer(serveraddr, DocXMLRPCRequestHandler)
srvr.set_server_title("self-organizing map as xmlrpc python-server")
srvr.set_server_name("self-organizing map server")
srvr.set_server_documentation("""welcome to this self-organizing map running as a python xmlrpc
server!""")
srvr.register_instance(Som())
srvr.register_introspection_functions()
srvr.register_function(test)
srvr.serve_forever()
Related
|