123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- #! /usr/bin/python
- import numpy
- import sys
- import os
- sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)),os.pardir))
- import helperFunctions
class ClassifierPrototype:
    """One-vs-all regression classifier on a linear kernel with incremental updates.

    For every distinct label a +1/-1 target column is built (``yBin``) and the
    weights are obtained as ``alpha = (K + sigmaN * I)^-1 * yBin`` with
    ``K = X * X.T``.  ``update`` adds a single sample (and, if necessary, a
    new class) without retraining the already known classes from scratch.

    The code relies on ``numpy.matrix`` semantics (row slices stay 2-D), so
    samples and labels are expected to be ``numpy.matrix`` instances.

    Attributes (all replaced by ``train``):
        K      -- kernel matrix of the training samples
        alpha  -- weights, one column per class
        X      -- training samples, one row per sample
        y      -- training labels, one row per sample
        yBin   -- +1/-1 targets, shape (number of samples, number of classes)
        yUni   -- sorted distinct labels, shape (1, number of classes)

    Subclasses are expected to override ``calcAlScores``.
    """

    def __init__(self, sigmaN=0.00178, configFile=None, probSampleRuns=1000):
        """Read the hyper-parameters and create an empty, untrained model.

        sigmaN          -- value added to the diagonal of K before solving
        configFile      -- handed to helperFunctions.getConfig together with
                           the section 'activeLearning'
        probSampleRuns  -- default number of sampling rounds in calcProbs
        """
        # NOTE(review): getConfig presumably lets configFile override the
        # given defaults -- confirm against helperFunctions.
        self.sigmaN = helperFunctions.getConfig(configFile, 'activeLearning', 'sigmaN', sigmaN, 'float', True)
        self.probSampleRuns = helperFunctions.getConfig(configFile, 'activeLearning', 'probSampleRuns', probSampleRuns, 'float', True)

        self.K = []
        self.alpha = []
        self.X = []
        self.y = []
        self.yBin = []
        self.yUni = []

    def _regularizedKernel(self):
        """Return ``K + sigmaN * I``, the system matrix shared by all solves."""
        # builtin float replaces the numpy.float alias removed in NumPy 1.24
        return numpy.add(self.K, numpy.identity(self.X.shape[0], dtype=float)*self.sigmaN)

    def checkModel(self):
        """Raise Exception if K, alpha, X or y contains a non-finite value."""
        for name in ('K', 'alpha', 'X', 'y'):
            if not numpy.all(numpy.isfinite(getattr(self, name))):
                raise Exception('not numpy.all(numpy.isfinite(self.%s))' % name)

    def train2(self, X, y, sigmaN=None):
        """Train on the first sample only, then add the remaining rows one by one.

        X.shape = (number of samples, feat dim), y.shape = (number of samples, 1)
        """
        self.train(X[0,:], y[0,:], sigmaN)
        for idx in range(1, X.shape[0]):
            self.update(X[idx,:], y[idx,:])

    # X.shape = (number of samples, feat dim), y.shape = (number of samples, 1)
    def train(self, X, y, sigmaN=None):
        """Fit the model from scratch, discarding any previous state.

        sigmaN -- if not None, replaces self.sigmaN before solving.
        Raises Exception (via checkModel) if the fitted model is not finite.
        """
        if sigmaN is not None:
            self.sigmaN = sigmaN

        self.X = X
        self.y = y
        # numpy.unique returns the labels sorted; update() keeps them sorted
        self.yUni = numpy.asmatrix(numpy.unique(numpy.asarray(self.y)))

        # one +1/-1 target column per class, built for all classes at once
        labels = numpy.asarray(self.y).reshape(-1, 1)
        self.yBin = numpy.asmatrix(numpy.where(labels == numpy.asarray(self.yUni), 1.0, -1.0))

        self.K = numpy.dot(X, X.T)
        self.alpha = numpy.linalg.solve(self._regularizedKernel(), self.yBin)

        self.checkModel()

    # x.shape = (1, feat dim), y.shape = (1, 1)
    def update(self, x, y):
        """Add one labelled sample to the trained model.

        The weights of the already known classes are corrected with the new
        sample; if y is an unseen label, a new class column is inserted at
        its sorted position and solved on the enlarged system.
        Raises Exception (via checkModel) if the updated model is not finite.
        """
        # +1/-1 targets of the new sample w.r.t. the classes known so far
        matches = numpy.asarray(self.yUni == y)
        knownClass = matches.any()
        binY = numpy.asmatrix(numpy.where(matches, 1.0, -1.0))

        # get new kernel columns
        k = numpy.dot(self.X, x.T)
        selfK = numpy.sum(numpy.multiply(x, x), axis=1)

        # get score of new sample
        infY = self.infer(x, k=k)

        # update alpha: [alpha; 0] + [(K + sigmaN*I)^-1 k; -1] * (infY - binY) / (sigmaN + sigmaF)
        term1 = 1.0 / (self.sigmaN + self.calcSigmaF(x, k, selfK).item(0))
        term2 = numpy.asmatrix(numpy.ones((self.X.shape[0] + 1, x.shape[0])), dtype=float)*(-1.0)
        term2[0:self.X.shape[0],:] = numpy.linalg.solve(self._regularizedKernel(), k)
        term3 = infY - binY
        self.alpha = numpy.add(numpy.append(self.alpha, numpy.zeros((1, self.alpha.shape[1])), axis=0), numpy.dot(numpy.dot(term1, term2), term3))

        # update K
        self.K = numpy.append(numpy.append(self.K, k, axis=1), numpy.append(k.T, selfK, axis=1), axis=0)

        # update samples
        self.X = numpy.append(self.X, x, axis=0)
        self.y = numpy.append(self.y, y, axis=0)

        if knownClass:
            # known class: the target row computed above is the new yBin row
            self.yBin = numpy.append(self.yBin, binY, axis=0)
        else:
            # must run after K, X and alpha were extended by the new sample
            self._addNewClass(y)

        self.checkModel()

    def _addNewClass(self, y):
        """Insert target and weight columns for the unseen label y at its sorted position."""
        # number of known labels smaller than y == column index of the new class
        pos = int(numpy.count_nonzero(numpy.asarray(self.yUni) < numpy.asarray(y)))

        # the new sample is a negative example for every previously known class
        oldBin = numpy.asarray(self.yBin)
        grownBin = numpy.vstack([oldBin, -numpy.ones((1, oldBin.shape[1]))])

        # all earlier samples are negatives for the new class, the new one is positive
        newCol = -numpy.ones((grownBin.shape[0], 1))
        newCol[-1, 0] = 1.0

        # classes with a higher label move one column to the right
        self.yBin = numpy.asmatrix(numpy.hstack([grownBin[:, :pos], newCol, grownBin[:, pos:]]))
        self.yUni = numpy.sort(numpy.append(self.yUni, y, axis=1))

        # train new alpha for new class on the enlarged system
        newAlpha = numpy.asarray(numpy.linalg.solve(self._regularizedKernel(), self.yBin[:, pos]))
        oldAlpha = numpy.asarray(self.alpha)
        self.alpha = numpy.asmatrix(numpy.hstack([oldAlpha[:, :pos], newAlpha, oldAlpha[:, pos:]]))

    # x.shape = (number of samples, feat dim), loNoise = {0,1}
    def infer(self, x, loNoise=False, k=None):
        """Return the class scores ``k.T * alpha`` for the rows of x.

        loNoise -- if true and the label -1 is known, the first weight column
                   is left out of the result
        k       -- kernel values between training samples and x; computed
                   from x when None
        Raises Exception if a score is not finite.
        """
        if k is None:
            k = numpy.dot(self.X, x.T)

        # NOTE(review): dropping column 0 assumes -1 is the smallest label -- confirm
        loNoise = loNoise and (self.yUni == -1).any()
        pred = numpy.asmatrix(numpy.dot(k.T, self.alpha[:,int(loNoise):]))

        if not numpy.all(numpy.isfinite(pred)):
            raise Exception('not numpy.all(numpy.isfinite(pred))')

        return pred

    # x.shape = (number of samples, feat dim)
    def test(self, x, loNoise=False):
        """Return, for every row of x, the label with the highest score."""
        loNoise = loNoise and (self.yUni == -1).any()
        # the offset compensates for the column that infer() drops when loNoise is set
        return self.yUni[0,numpy.argmax(self.infer(x, loNoise), axis=1) + int(loNoise)]

    # x.shape = (number of samples, feat dim)
    def calcSigmaF(self, x, k=None, selfK=None):
        """Return ``selfK - diag(k.T * (K + sigmaN*I)^-1 * k)`` as one row per sample.

        k      -- kernel values between training samples and x (computed if None)
        selfK  -- squared norm of every row of x (computed if None)
        Raises Exception if a result is not finite.
        """
        if k is None:
            k = numpy.dot(self.X, x.T)

        if selfK is None:
            selfK = numpy.sum(numpy.multiply(x, x), axis=1)

        solved = numpy.linalg.solve(self._regularizedKernel(), k)
        sigmaF = numpy.subtract(selfK, numpy.sum(numpy.multiply(solved, k).T, axis=1))

        if not numpy.all(numpy.isfinite(sigmaF)):
            raise Exception('not numpy.all(numpy.isfinite(sigmaF))')

        return sigmaF

    # x.shape = (number of samples, feat dim)
    def calcProbs(self, x, mu=None, sigmaF=None, nmb=None):
        """Estimate per-class probabilities by sampling noisy scores.

        In every round a standard-normal draw per score is scaled by sigmaF,
        shifted by mu, and the class with the largest value of each row is
        counted.  Returns the counts divided by the number of rounds.

        mu      -- scores, defaults to infer(x)
        sigmaF  -- per-sample scale, defaults to calcSigmaF(x)
        nmb     -- number of rounds, defaults to self.probSampleRuns
        """
        # set number of sampling iterations
        if nmb is None:
            nmb = self.probSampleRuns
        # probSampleRuns is read from the config as a float; range() needs an int
        nmb = int(nmb)

        # get mu and sigma
        if mu is None:
            mu = self.infer(x)
        if sigmaF is None:
            sigmaF = self.calcSigmaF(x)

        # prepare
        counts = numpy.zeros(mu.shape)
        rows = numpy.arange(mu.shape[0])

        for _ in range(nmb):
            draws = numpy.asmatrix(numpy.random.randn(mu.shape[0], mu.shape[1]))
            # NOTE(review): sigmaF is used directly as the scale of the draws;
            # if it is a variance a square root may be intended -- confirm
            draws = numpy.add(numpy.multiply(draws, numpy.repeat(sigmaF, draws.shape[1], axis=1)), mu)

            winners = numpy.asarray(numpy.argmax(draws, axis=1)).ravel()
            counts[rows, winners] += 1

        # convert absolute to relative amount
        return numpy.asmatrix(counts)/float(nmb)

    # x.shape = (number of samples, feat dim)
    def calcAlScores(self, x):
        """Hook for subclasses: return one active-learning score per row of x.

        The prototype has no scoring strategy and returns None.
        """
        return None

    # x.shape = (number of samples, feat dim)
    def getAlScores(self, x):
        """Return the validated result of calcAlScores(x).

        Raises NotImplementedError if calcAlScores returned None (not
        overridden), Exception if a score is not finite or if the result is
        not of shape (x.shape[0], 1).
        """
        alScores = self.calcAlScores(x)

        if alScores is None:
            raise NotImplementedError('calcAlScores returned None; it has to be overridden by a subclass')

        if not numpy.all(numpy.isfinite(alScores)):
            raise Exception('not numpy.all(numpy.isfinite(alScores))')

        if alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1:
            raise Exception('alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1')

        return alScores

    # x.shape = (number of samples, feat dim)
    def chooseSample(self, x):
        """Return the row index of x with the highest active-learning score."""
        return numpy.argmax(self.getAlScores(x), axis=0).item(0)
|