#! /usr/bin/python

import numpy
import pickle
import sys
import os

# make the parent directory importable so that helperFunctions can be found
sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)),os.pardir))

import helperFunctions


class ClassifierPrototype:
    """One-vs-rest linear classifier base with hooks for active learning.

    State kept on the instance:
      sigmaN -- noise / regularisation value handed to helperFunctions.solveW
      X      -- training samples, .shape = (number of samples, feat dim)
      yBin   -- +1/-1 labels, .shape = (number of samples, number of unique classes)
      yUni   -- known class labels as a single-row matrix
      w      -- weights, .shape = (feat dim, number of unique classes)

    calcAlScores() is a placeholder returning None; getAlScores() and
    chooseSample() only work once it has been overridden.
    """

    def __init__(self, sigmaN = 0.00178, configFile=None):
        """Read sigmaN via helperFunctions.getConfig and reset the model state."""

        # NOTE(review): presumably configFile overrides the sigmaN default
        # (section 'activeLearning', key 'sigmaN') -- confirm in helperFunctions
        self.sigmaN = helperFunctions.getConfig(configFile, 'activeLearning', 'sigmaN', sigmaN, 'float', True)

        self.X = []
        self.yBin = [] # .shape = (number of samples, number of unique classes)
        self.yUni = []
        self.w = [] # .shape = (feat dim, number of unique classes)


    def checkModel(self):
        """Raise Exception if w, X or yUni contain non-finite values."""

        if not numpy.all(numpy.isfinite(self.w)):
            raise Exception('not numpy.all(numpy.isfinite(self.w))')

        if not numpy.all(numpy.isfinite(self.X)):
            raise Exception('not numpy.all(numpy.isfinite(self.X))')

        if not numpy.all(numpy.isfinite(self.yUni)):
            raise Exception('not numpy.all(numpy.isfinite(self.yUni))')


    def train2(self, X, y, sigmaN=None):
        """Train on the first row of X/y, then feed the remaining rows one by one to update()."""

        # NOTE(review): the row slices must stay 2-D for train()/update(),
        # which holds for numpy.matrix inputs -- confirm callers pass matrices
        self.train(X[0,:],y[0,:],sigmaN)

        for idx in range(1,X.shape[0]):
            self.update(X[idx,:],y[idx,:])


    # X.shape = (number of samples, feat dim), y.shape = (number of samples, 1)
    def train(self, X, y, sigmaN=None):
        """Fit all one-vs-rest weight vectors from scratch.

        X      -- samples, .shape = (number of samples, feat dim)
        y      -- labels, .shape = (number of samples, 1)
        sigmaN -- if not None, replaces self.sigmaN before fitting

        Raises Exception (via checkModel) if the fitted model is not finite.
        """

        if sigmaN is not None:
            self.sigmaN = sigmaN

        # get all known classes (sorted, stored as a single-row matrix)
        self.yUni = numpy.asmatrix(numpy.unique(numpy.asarray(y)))

        # save stuff
        self.X = X

        # get binary labels for each ovr - classifier
        # (builtin int replaces the numpy.int alias, which newer NumPy no longer provides)
        self.yBin = numpy.asmatrix(numpy.empty((y.shape[0],self.yUni.shape[1])), dtype=int)
        for cls in range(self.yUni.shape[1]):
            self.yBin[:,cls] = 2*(y == self.yUni[0,cls]) - 1

        # sample reweighting
        rewDiagMat = helperFunctions.getReweightDiagMat(y, self.yUni)
        rewX = rewDiagMat*self.X
        rewY = rewDiagMat*self.yBin

        # calculate w
        # NOTE(review): an earlier inline variant solved
        # (rewX.T*rewX + sigmaN*I) w = rewX.T*rewY directly; solveW presumably
        # computes the same solution -- confirm in helperFunctions
        self.w = helperFunctions.solveW(rewX, rewY, self.sigmaN)

        self.checkModel()


    # x.shape = (1, feat dim), y.shape = (1, 1)
    def update(self, x, y):
        """Add one labelled sample and refresh the weights.

        x -- new sample, .shape = (1, feat dim)
        y -- its label, .shape = (1, 1)

        If the label is not yet in yUni, a new class column is inserted into
        yUni, yBin and w before all weights are recomputed.
        Raises Exception (via checkModel) if the updated model is not finite.
        """

        # update samples
        self.X = numpy.append(self.X, x, axis=0)

        # update binary labels for known class
        self.yBin = numpy.append(self.yBin, 2*(y == self.yUni) - 1, axis=0)

        # create labels and w for new class
        if not (self.yUni == y).any():

            # get insertion idx (keeps yUni sorted)
            idx = numpy.searchsorted(numpy.ravel(numpy.asarray(self.yUni)), y[0,0])

            # store new label and find new class index
            self.yUni = numpy.insert(self.yUni, [idx], y, axis=1)

            # add new binary label vector for new class:
            # every earlier sample is negative, only the new sample is positive
            # (builtin int replaces the numpy.int alias, which newer NumPy no longer provides)
            self.yBin = numpy.insert(self.yBin, [idx], numpy.zeros((self.yBin.shape[0],1), dtype=int), axis=1)
            self.yBin[:-1,idx] = -1
            self.yBin[-1,idx] = 1

            # sample reweighting
            rewDiagMat = helperFunctions.getReweightDiagMat(helperFunctions.getYfromYbin(self.yBin, self.yUni), self.yUni)
            rewX = rewDiagMat*self.X
            rewY = rewDiagMat*self.yBin[:,idx]

            # train new w for new class and insert it at the matching column
            self.w = numpy.insert(self.w, [idx], helperFunctions.solveW(rewX, rewY, self.sigmaN), axis=1)

        # sample reweighting
        rewDiagMat = helperFunctions.getReweightDiagMat(helperFunctions.getYfromYbin(self.yBin, self.yUni), self.yUni)
        rewX = rewDiagMat*self.X
        rewY = rewDiagMat*self.yBin

        # update w
        # NOTE(review): the current w is passed as a fourth argument,
        # presumably as a starting point for the solver -- confirm in helperFunctions
        self.w = helperFunctions.solveW(rewX, rewY, self.sigmaN, self.w)

        self.checkModel()


    # X.shape = (number of samples, feat dim), loNoise = {0,1}
    def infer(self, x, loNoise=False):
        """Return the per-class scores x * w as a matrix.

        x       -- samples, .shape = (number of samples, feat dim)
        loNoise -- only honoured if the label -1 is among the known classes;
                   then the first column of w is left out
                   (presumably the column of the -1 class -- TODO confirm)

        Raises Exception if any score is not finite.
        """

        loNoise = loNoise and (self.yUni == -1).any()

        pred = numpy.asmatrix(numpy.dot(x,self.w[:,int(loNoise):]))

        if not numpy.all(numpy.isfinite(pred)):
            raise Exception('not numpy.all(numpy.isfinite(pred))')

        return pred


    # X.shape = (number of samples, feat dim)
    def test(self, x, loNoise=False):
        """Return, for each row of x, the known label with the highest score.

        x       -- samples, .shape = (number of samples, feat dim)
        loNoise -- same meaning as in infer()
        """

        loNoise = loNoise and (self.yUni == -1).any()

        # the offset maps the argmax back into yUni when the first column was left out
        return self.yUni[0,numpy.argmax(self.infer(x, loNoise), axis=1) + int(loNoise)]


    # X.shape = (number of samples, feat dim)
    def calcSigmaF(self, x, tmpVec=None, rewX=None):
        """Return one sigmaF value per row of x.

        x      -- samples, .shape = (number of samples, feat dim)
        tmpVec -- if given, the result is simply sigmaN * tmpVec.T
        rewX   -- reweighted training samples; computed from the stored
                  model when None (only used if tmpVec is None)

        Raises Exception if any value is not finite.
        """

        if tmpVec is None:

            if rewX is None:
                rewDiagMat = helperFunctions.getReweightDiagMat(helperFunctions.getYfromYbin(self.yBin, self.yUni), self.yUni)
                rewX = numpy.dot(rewDiagMat,self.X)

            # row-wise x_i * sigmaN * (rewX.T*rewX + sigmaN*I)^-1 * x_i.T
            sigmaF = numpy.sum(numpy.multiply(x,(self.sigmaN*numpy.linalg.solve(numpy.add(numpy.dot(rewX.T,rewX), numpy.identity(x.shape[1])*self.sigmaN), x.T)).T),axis=1)

        else:
            sigmaF = self.sigmaN*tmpVec.T

        if not numpy.all(numpy.isfinite(sigmaF)):
            raise Exception('not numpy.all(numpy.isfinite(sigmaF))')

        return sigmaF


    # X.shape = (number of samples, feat dim)
    def calcProbs(self, x, mu=None, sigmaF=None, nmb=1000):
        """Estimate by sampling how often each class attains the highest score.

        x      -- samples, .shape = (number of samples, feat dim)
        mu     -- per-class scores; taken from infer() when None
        sigmaF -- per-sample scale of the random draws; taken from
                  calcSigmaF() when None
        nmb    -- number of random draws

        Returns a matrix shaped like mu holding relative frequencies.
        """

        # get mu and sigma
        if mu is None:
            mu = self.infer(x,(self.yUni == -1).any())

        if sigmaF is None:
            sigmaF = self.calcSigmaF(x)

        # prepare
        probs = numpy.asmatrix(numpy.zeros(mu.shape))

        for idx in range(nmb):

            # draw standard normal noise, scale it per sample and shift it by mu
            draws = numpy.asmatrix(numpy.random.randn(mu.shape[0],mu.shape[1]))
            draws = numpy.add(numpy.multiply(draws, numpy.repeat(sigmaF, draws.shape[1], axis=1)), mu)

            # count the winning class of every sample
            maxIdx = numpy.argmax(draws, axis=1)
            idxs = (range(len(maxIdx)),numpy.squeeze(maxIdx))
            probs[idxs] = probs[idxs] + 1

        # convert absolute to relative amount
        return probs/float(nmb)


    # NOTE(review): the original comment here read
    # "x.shape = (feat dim, number of samples)", but getAlScores() checks for
    # one score per row of x -- confirm the intended layout
    def calcAlScores(self, x):
        """Placeholder for the active learning scores; returns None here."""

        return None


    # NOTE(review): the original comment here read
    # "x.shape = (feat dim, number of samples)", which contradicts the shape
    # check below (one score row per row of x) -- confirm the intended layout
    def getAlScores(self, x):
        """Return calcAlScores(x) after validating it.

        Raises Exception if a score is not finite or if the result is not of
        shape (x.shape[0], 1).
        """

        alScores = self.calcAlScores(x)

        if not numpy.all(numpy.isfinite(alScores)):
            raise Exception('not numpy.all(numpy.isfinite(alScores))')

        if alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1:
            raise Exception('alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1')

        return alScores


    # NOTE(review): the original comment here read
    # "x.shape = (feat dim, number of samples)"; getAlScores() requires one
    # score per row of x -- confirm the intended layout
    def chooseSample(self, x):
        """Return the index of the sample with the highest active learning score."""

        return numpy.argmax(self.getAlScores(x), axis=0).item(0)