#! /usr/bin/python import numpy import sys import os sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)),os.pardir)) import helperFunctions class ClassifierPrototype: def __init__(self, sigmaN = 0.00178, configFile=None, probSampleRuns=1000): self.sigmaN = helperFunctions.getConfig(configFile, 'activeLearning', 'sigmaN', sigmaN, 'float', True) self.probSampleRuns = helperFunctions.getConfig(configFile, 'activeLearning', 'probSampleRuns', probSampleRuns, 'float', True) self.K = [] self.alpha = [] self.X = [] self.y = [] self.yBin = [] self.yUni = [] def checkModel(self): if not numpy.all(numpy.isfinite(self.K)): raise Exception('not numpy.all(numpy.isfinite(self.K))') if not numpy.all(numpy.isfinite(self.alpha)): raise Exception('not numpy.all(numpy.isfinite(self.alpha))') if not numpy.all(numpy.isfinite(self.X)): raise Exception('not numpy.all(numpy.isfinite(self.X))') if not numpy.all(numpy.isfinite(self.y)): raise Exception('not numpy.all(numpy.isfinite(self.y))') def train2(self, X, y, sigmaN=None): self.train(X[0,:],y[0,:],sigmaN) for idx in range(1,X.shape[0]): self.update(X[idx,:],y[idx,:]) # X.shape = (number of samples, feat dim), y.shape = (number of samples, 1) def train(self, X, y, sigmaN=None): if sigmaN is not None: self.sigmaN = sigmaN self.X = X self.y = y self.yUni = numpy.asmatrix(numpy.unique(numpy.asarray(self.y))) tmpVec = numpy.asmatrix(numpy.empty([self.y.shape[0],1])) self.yBin = numpy.asmatrix(numpy.empty([self.y.shape[0],self.yUni.shape[1]])) for cls in range(self.yUni.shape[1]): mask = (self.y == self.yUni[0,cls]) tmpVec[mask == True] = 1 tmpVec[mask == False] = -1 self.yBin[:,cls] = tmpVec self.K = numpy.dot(X,X.T) self.alpha = numpy.linalg.solve(numpy.add(self.K, numpy.identity(self.X.shape[0], dtype=numpy.float)*self.sigmaN), self.yBin) self.checkModel() # x.shape = (1, feat dim), y.shape = (1, 1) def update(self, x, y): # update for known classes binY = numpy.asmatrix(numpy.ones((1,self.alpha.shape[1])))*(-1) if self.alpha.shape[1] > 1: binY[0,numpy.asarray(numpy.squeeze(numpy.asarray(y==self.yUni)))] = 1 elif y==self.yUni: binY[0,0] = 1 # get new kernel columns k = numpy.dot(self.X,x.T) selfK = numpy.sum(numpy.multiply(x,x), axis=1) # get score of new sample infY = self.infer(x, k=k) # update alpha term1 = 1.0 / (self.sigmaN + self.calcSigmaF(x, k, selfK).item(0)) term2 = numpy.asmatrix(numpy.ones((self.X.shape[0] + 1,x.shape[0])), dtype=numpy.float)*(-1.0) term2[0:self.X.shape[0],:] = numpy.linalg.solve(numpy.add(self.K, numpy.identity(self.X.shape[0], dtype=numpy.float)*self.sigmaN), k) term3 = infY - binY self.alpha = numpy.add(numpy.append(self.alpha, numpy.zeros((1,self.alpha.shape[1])), axis=0), numpy.dot(numpy.dot(term1,term2),term3)) # update K self.K = numpy.append(numpy.append(self.K, k, axis=1), numpy.append(k.T, selfK, axis=1), axis=0) # update samples self.X = numpy.append(self.X, x, axis=0) self.y = numpy.append(self.y, y, axis=0) # update binary labels for knwon class if (self.yUni == y).any(): mask = (y == self.yUni) tmpVec = numpy.empty([self.yBin.shape[1], 1]) tmpVec[mask.T == True] = 1 tmpVec[mask.T == False] = -1 self.yBin = numpy.append(self.yBin, tmpVec.T, axis=0) # create labels and alpha for new class else: # create bigger matrices tmpyBin = numpy.asmatrix(numpy.empty([self.yBin.shape[0] + 1, self.yBin.shape[1] + 1])) tmpAlpha = numpy.asmatrix(numpy.empty([self.alpha.shape[0], self.alpha.shape[1] + 1])) # index of new class tmpIdx = -1 # check all knwon classes for cls in range(self.yUni.shape[1]): # just copy all classes with lower label if y > self.yUni[0,cls]: tmpyBin[0:-1,cls] = self.yBin[:,cls] tmpyBin[tmpyBin.shape[0] - 1,cls] = -1 tmpAlpha[:,cls] = self.alpha[:,cls] tmpIdx = cls # copy classes with higher label to shiftet position else: # y < self.yUni[cls] tmpyBin[0:-1,cls + 1] = self.yBin[:,cls] tmpyBin[tmpyBin.shape[0] - 1,cls + 1] = -1 tmpAlpha[:,cls + 1] = self.alpha[:,cls] # add new binary label vector for new class tmpyBin[0:-1,tmpIdx + 1] = -1 tmpyBin[tmpyBin.shape[0] - 1,tmpIdx + 1] = 1 # set new binary matrix and append new label self.yBin = tmpyBin self.yUni = numpy.sort(numpy.append(self.yUni, y, axis=1)) # train new alpha for new class tmpAlpha[:,tmpIdx + 1] = numpy.linalg.solve(numpy.add(self.K, numpy.identity(self.X.shape[0], dtype=numpy.float)*self.sigmaN), self.yBin[:,tmpIdx + 1]) self.alpha = tmpAlpha self.checkModel() # X.shape = (number of samples, feat dim), loNoise = {0,1} def infer(self, x, loNoise=False, k=None): if k is None: k = numpy.dot(self.X,x.T) loNoise = loNoise and (self.yUni == -1).any() pred = numpy.asmatrix(numpy.dot(k.T,self.alpha[:,int(loNoise):])) if not numpy.all(numpy.isfinite(pred)): raise Exception('not numpy.all(numpy.isfinite(pred))') return pred # X.shape = (number of samples, feat dim) def test(self, x, loNoise=False): loNoise = loNoise and (self.yUni == -1).any() return self.yUni[0,numpy.argmax(self.infer(x, loNoise), axis=1) + int(loNoise)] # X.shape = (number of samples, feat dim) def calcSigmaF(self, x, k=None, selfK=None): if k is None: k = numpy.dot(self.X,x.T) if selfK is None: selfK = numpy.sum(numpy.multiply(x,x), axis=1) sigmaF = numpy.subtract(selfK, numpy.sum(numpy.multiply(numpy.linalg.solve(numpy.add(self.K, numpy.identity(self.X.shape[0], dtype=numpy.float)*self.sigmaN), k),k).T, axis=1)) if not numpy.all(numpy.isfinite(sigmaF)): raise Exception('not numpy.all(numpy.isfinite(sigmaF))') return sigmaF # X.shape = (number of samples, feat dim) def calcProbs(self, x, mu=None, sigmaF=None, nmb=None): # set number of sampling iterations if nmb is None: nmb = self.probSampleRuns # get mu and sigma if mu is None: mu = self.infer(x) if sigmaF is None: sigmaF = self.calcSigmaF(x) # prepare probs = numpy.asmatrix(numpy.zeros(mu.shape)) for idx in range(nmb): draws = numpy.asmatrix(numpy.random.randn(mu.shape[0],mu.shape[1])) draws = numpy.add(numpy.multiply(draws, numpy.repeat(sigmaF, draws.shape[1], axis=1)), mu) maxIdx = numpy.argmax(draws, axis=1) idxs = (range(len(maxIdx)),numpy.squeeze(maxIdx)) probs[idxs] = probs[idxs] + 1 # convert absolute to relative amount return probs/float(nmb) # x.shape = (feat dim, number of samples) def calcAlScores(self, x): return None # x.shape = (feat dim, number of samples) def getAlScores(self, x): alScores = self.calcAlScores(x) if not numpy.all(numpy.isfinite(alScores)): raise Exception('not numpy.all(numpy.isfinite(alScores))') if alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1: raise Exception('alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1') return alScores # x.shape = (feat dim, number of samples) def chooseSample(self, x): return numpy.argmax(self.getAlScores(x), axis=0).item(0)