123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- #! /usr/bin/python
- import numpy
- import pickle
- import sys
- import os
- sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)),os.pardir))
- import helperFunctions
- class ClassifierPrototype:
- def __init__(self, sigmaN = 0.00178, configFile=None):
- self.sigmaN = helperFunctions.getConfig(configFile, 'activeLearning', 'sigmaN', sigmaN, 'float', True)
- self.invCreg = []
- self.X = []
- self.yBin = [] # .shape = (number of samples, number of unique classes)
- self.yUni = []
- self.w = [] # .shape = (feat dim, number of unique classes)
- def checkModel(self):
- if not numpy.all(numpy.isfinite(self.invCreg)):
- raise Exception('not numpy.all(numpy.isfinite(self.invCreg))')
- if not numpy.all(numpy.isfinite(self.w)):
- raise Exception('not numpy.all(numpy.isfinite(self.w))')
- if not numpy.all(numpy.isfinite(self.X)):
- raise Exception('not numpy.all(numpy.isfinite(self.X))')
- if not numpy.all(numpy.isfinite(self.yUni)):
- raise Exception('not numpy.all(numpy.isfinite(self.yUni))')
- if len(numpy.unique(self.y)) > 2:
- raise Exception('len(numpy.unique(self.y)) > 2')
- def train2(self, X, y, sigmaN=None):
- self.train(X[0,:],y[0,:],sigmaN)
- for idx in range(1,X.shape[0]):
- self.update(X[idx,:],y[idx,:])
- # X.shape = (number of samples, feat dim), y.shape = (number of samples, 1)
- def train(self, X, y, sigmaN=None):
- if sigmaN is not None:
- self.sigmaN = sigmaN
- # get all known classes
- self.yUni = numpy.asmatrix(numpy.unique(numpy.asarray(y)))
- # save stuff
- self.X = X
- # calculate inverse of C_reg
- self.invCreg = numpy.linalg.inv((X.T*X) + numpy.identity(X.shape[1])*self.sigmaN)
- # get binary labels for each ovr - classifier
- self.yBin = numpy.asmatrix(numpy.empty((y.shape[0],self.yUni.shape[1])), dtype=numpy.int)
- for cls in range(self.yUni.shape[1]):
- self.yBin[:,cls] = 2*(y == self.yUni[0,cls]) - 1
- # calculate w
- self.w = self.invCreg*X.T*self.yBin
- self.checkModel()
- # x.shape = (1, feat dim), y.shape = (1, 1)
- def update(self, x, y):
- # update w and C_reg
- tmpVec = self.invCreg*x.T
- tmpScalar = 1.0 + x*tmpVec
- self.w = self.w + tmpVec*((numpy.asmatrix(2*(y==self.yUni) - 1) - x*self.w)/tmpScalar)
- self.invCreg = self.invCreg - ((tmpVec*tmpVec.T)/tmpScalar)
- # update samples
- self.X = numpy.append(self.X, x, axis=0)
- # update binary labels for known class
- self.yBin = numpy.append(self.yBin, 2*(y == self.yUni) - 1, axis=0)
- # create labels and w for new class
- if not (self.yUni == y).any():
- # get insertion idx
- idx = numpy.searchsorted(numpy.ravel(numpy.asarray(self.yUni)), y[0,0])
- # store new label and find new class index
- self.yUni = numpy.insert(self.yUni, [idx], y, axis=1)
- ## add new binary label vector for new class
- self.yBin = numpy.insert(self.yBin, [idx], numpy.zeros((self.yBin.shape[0],1), dtype=numpy.int), axis=1)
- self.yBin[:-1,idx] = -1
- self.yBin[-1,idx] = 1
- ## train new w for new class
- self.w = numpy.insert(self.w, [idx], self.invCreg*self.X.T*self.yBin[:,idx], axis=1)
- self.checkModel()
- # X.shape = (number of samples, feat dim), loNoise = {0,1}
- def infer(self, x, loNoise=False):
- loNoise = loNoise and (self.yUni == -1).any()
- # division by number of training samples is a hack to prevent huge scores
- return numpy.asmatrix(x*self.w[:,int(loNoise):]) #/ float(self.X.shape[0])
- # X.shape = (number of samples, feat dim)
- def test(self, x, loNoise=False):
- loNoise = loNoise and (self.yUni == -1).any()
- return self.yUni[0,numpy.argmax(self.infer(x, loNoise), axis=1) + int(loNoise)]
- # X.shape = (number of samples, feat dim)
- def calcSigmaF(self, x, tmpVec=None):
- if tmpVec is None:
- return numpy.sum(numpy.multiply(x,(self.sigmaN*self.invCreg*x.T).T),axis=1)
- else:
- return self.sigmaN*tmpVec.T
- # X.shape = (number of samples, feat dim)
- def calcProbs(self, x, mu=None, sigmaF=None, nmb=1000):
- # get mu and sigma
- if mu is None:
- mu = self.infer(x)
- if sigmaF is None:
- sigmaF = self.calcSigmaF(x)
- # prepare
- probs = numpy.asmatrix(numpy.zeros(mu.shape))
- for idx in range(nmb):
- draws = numpy.asmatrix(numpy.random.randn(mu.shape[0],mu.shape[1]))
- draws = numpy.multiply(draws, numpy.repeat(sigmaF, draws.shape[1], axis=1)) + mu
- maxIdx = numpy.argmax(draws, axis=1)
- idxs = (range(len(maxIdx)),numpy.squeeze(maxIdx))
- probs[idxs] = probs[idxs] + 1
- # convert absolute to relative amount
- return probs/float(nmb)
- # x.shape = (feat dim, number of samples)
- def calcAlScores(self, x):
- return None
- # x.shape = (feat dim, number of samples)
- def getAlScores(self, x):
- alScores = self.calcAlScores(x)
- if not numpy.all(numpy.isfinite(alScores)):
- raise Exception('not numpy.all(numpy.isfinite(alScores))')
- if alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1:
- raise Exception('alScores.shape[0] != x.shape[0] or alScores.shape[1] != 1')
- return alScores
- # x.shape = (feat dim, number of samples)
- def chooseSample(self, x):
- return numpy.argmax(self.getAlScores(x), axis=0).item(0)
|