123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import numpy
- import math
- import ConfigParser
- import os
- import sys
- import subprocess
- import scipy.sparse.linalg
- import time
def confusionMatrix(expected, predicted):
    """Build the confusion matrix of a set of predictions.

    The class set is the sorted set of unique values found in ``expected``.
    Entry ``[i, j]`` counts the samples whose expected label is class ``i``
    and whose predicted label is class ``j``.

    Args:
        expected: ground-truth labels, iterated sample by sample.
        predicted: predicted labels, iterated in lockstep with ``expected``;
            must expose ``.shape``.

    Returns:
        A float ``numpy.matrix`` of shape (numClasses, numClasses).

    Raises:
        Exception: if the number of counted samples differs from
            ``max(predicted.shape)``, e.g. because a predicted label does not
            occur among the expected labels.
    """
    yUni = numpy.asmatrix(numpy.unique(numpy.asarray(expected)))
    numCls = yUni.shape[1]
    counts = numpy.zeros((numCls, numCls))

    for expc, pred in zip(expected, predicted):
        # yUni is 1 x numCls, so the second index array returned by where()
        # is the class column. It is empty for an unknown label, in which
        # case nothing is counted and the sanity check below fires.
        rowIdx = numpy.where(yUni == expc)[1]
        colIdx = numpy.where(yUni == pred)[1]
        counts[rowIdx, colIdx] += 1

    confMat = numpy.asmatrix(counts)
    numCounted = numpy.sum(confMat)
    numPredicted = max(predicted.shape)

    if numCounted != numPredicted:
        print('{} != {} ( {} )'.format(numCounted, numPredicted, predicted.shape))
        print('cls expected:  {}'.format(numpy.unique(numpy.asarray(expected))))
        print('cls predicted: {}'.format(numpy.unique(numpy.asarray(predicted))))
        raise Exception('# predicted cls > # expected cls')

    return confMat
def getAvgAcc(confMat):
    """Return the mean of the per-class recognition rates.

    Each diagonal entry of ``confMat`` is divided by the sum of its row and
    the resulting ratios are averaged.

    Args:
        confMat: square confusion matrix (rows are summed along axis 1).

    Returns:
        The average of ``diagonal / row sum`` over all rows.
    """
    hitsPerCls = numpy.diagonal(confMat)
    samplesPerCls = numpy.sum(confMat, axis=1).T
    return numpy.mean(hitsPerCls / samplesPerCls)
def _parseConfigBool(entry):
    """Convert one textual entry to bool, accepting 1/yes/true/on and 0/no/false/off."""
    states = {'1': True, 'yes': True, 'true': True, 'on': True,
              '0': False, 'no': False, 'false': False, 'off': False}
    key = entry.strip().lower()
    if key not in states:
        raise ValueError('Not a boolean: {}'.format(entry))
    return states[key]


def getConfig(pathtoConfig, section, option, default=None, dtype='str', verbose=False):
    """Read one option from an INI-style config file, falling back to a default.

    Args:
        pathtoConfig: path of the config file; ``None`` or a path that is not
            an existing file makes the function return ``default``.
        section: name of the section to look in.
        option: name of the option inside ``section``.
        default: value returned when the file, section or option is missing.
        dtype: requested type, one of ``'str'``, ``'int'``, ``'float'``,
            ``'bool'`` or their comma-separated list forms ``'strList'``,
            ``'intList'``, ``'floatList'``, ``'boolList'``.
        verbose: if true, print which value was chosen and its type.

    Returns:
        The converted option value, or ``default`` if it was not found.

    Raises:
        ValueError: if ``dtype`` is unknown (only checked when the option
            exists) or an entry cannot be converted to the requested type.
    """
    # The module was renamed from ConfigParser to configparser in Python 3.
    try:
        import configparser
    except ImportError:
        import ConfigParser as configparser

    value = default
    defaultUsed = True

    if pathtoConfig is not None and os.path.isfile(pathtoConfig):
        config = configparser.ConfigParser()
        with open(pathtoConfig) as configFile:
            # read_file() is the Python 3 name of Python 2's readfp().
            readFile = getattr(config, 'read_file', None) or config.readfp
            readFile(configFile)

        if config.has_section(section) and config.has_option(section, option):
            scalarGetters = {
                'str': config.get,
                'int': config.getint,
                'float': config.getfloat,
                'bool': config.getboolean,
            }
            # None means "keep the raw strings".
            listConverters = {
                'strList': None,
                'intList': int,
                'floatList': float,
                # bool('False') is True, so the entries have to be parsed.
                'boolList': _parseConfigBool,
            }
            if dtype in scalarGetters:
                value = scalarGetters[dtype](section, option)
            elif dtype in listConverters:
                entries = config.get(section, option).split(',')
                convert = listConverters[dtype]
                if convert is None:
                    value = entries
                else:
                    value = [convert(entry) for entry in entries]
            else:
                raise ValueError('Unknown dtype!')
            defaultUsed = False

    if verbose:
        aux = ''
        # The default may be None even for list dtypes.
        if 'List' in dtype and value is not None and len(value) > 0:
            aux = '| entryDtype:' + str(type(value[0]))
        fields = ['default:', defaultUsed, '| section:', section,
                  '| option:', option, '| value:', value,
                  '| dtype:', type(value), aux]
        print(' '.join(str(field) for field in fields))

    return value
def getGitHash(gitPath=os.path.dirname(os.path.abspath(__file__))):
    """Return the abbreviated commit hash of HEAD for the repository at ``gitPath``.

    git is run with ``gitPath`` as its working directory; the working
    directory of the calling process is never changed, so it cannot be left
    pointing at ``gitPath`` when git fails.

    Args:
        gitPath: directory inside the git work tree; defaults to the
            directory containing this file.

    Returns:
        The output of ``git rev-parse --short HEAD`` with surrounding
        whitespace stripped.

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
        OSError: if git or ``gitPath`` cannot be found.
    """
    output = subprocess.check_output(
        ['git', 'rev-parse', '--short', 'HEAD'],
        cwd=gitPath,
        stderr=subprocess.STDOUT,
        # Text mode so the result is a str on Python 3 as well.
        universal_newlines=True)
    return output.strip()
def getYfromYbin(yBin, yUni):
    """Convert a binary class-indicator matrix back into class labels.

    For every row of ``yBin`` the column whose entry equals 1 selects the
    label from the first row of ``yUni``.

    Args:
        yBin: 2-D indicator matrix with one row per sample and one column per
            class; exactly one entry per row must equal 1.
        yUni: 2-D matrix whose first row holds the class labels in column
            order.

    Returns:
        An integer ``numpy.matrix`` of shape (numSamples, 1) with the labels.

    Raises:
        ValueError: if a row does not contain exactly one entry equal to 1.
    """
    classes = numpy.asarray(yUni)[0]
    numSamples = yBin.shape[0]
    # The numpy.int alias was removed in NumPy 1.24; the builtin is equivalent.
    y = numpy.asmatrix(numpy.zeros((numSamples, 1), dtype=int))

    for idx in range(numSamples):
        isActive = numpy.ravel(numpy.asarray(yBin[idx, :] == 1))
        matches = classes[isActive]
        if matches.size != 1:
            raise ValueError(
                'row {} of yBin has {} entries equal to 1, expected exactly one'
                .format(idx, matches.size))
        y[idx, 0] = int(matches[0])

    return y
def solveW(x, y, sigmaN, initW=None, maxIter=None):
    """Solve ``(x^T x + sigmaN * I) w = x^T y`` column by column with conjugate gradients.

    The system matrix is never formed explicitly; it is applied through a
    linear operator built from ``x``.

    Args:
        x: data matrix with one row per sample (anything accepted by
            ``scipy.sparse.linalg.aslinearoperator`` that has ``shape`` and
            ``dtype``).
        y: 2-D target matrix with one column per class.
        sigmaN: scalar added to the diagonal of ``x^T x``.
        initW: optional initial solution with one column per class, used as
            the starting point of the solver.
        maxIter: optional cap on the solver iterations per class.

    Returns:
        A ``numpy.matrix`` of shape (x.shape[1], y.shape[1]) holding one
        solution column per class.
    """
    linOpX = scipy.sparse.linalg.aslinearoperator(x)
    numDim = x.shape[1]
    numCls = y.shape[1]
    w = numpy.asmatrix(numpy.empty((numDim, numCls)))

    def matvecFunc(curW):
        return linOpX.rmatvec(linOpX.matvec(curW)) + sigmaN*curW

    # The operator does not depend on the class, so build it once.
    linOpW = scipy.sparse.linalg.LinearOperator(
        (numDim, numDim), matvec=matvecFunc, dtype=x.dtype)

    for clsIdx in range(numCls):
        if initW is not None:
            initWbin = numpy.ravel(numpy.asarray(initW[:, clsIdx]))
        else:
            initWbin = None

        # Flatten to a plain 1-D array so the right-hand side has the same
        # layout whether y is an ndarray or a numpy.matrix.
        yCol = numpy.ravel(numpy.asarray(y[:, clsIdx]))
        rhs = linOpX.rmatvec(yCol)

        solvedWbin, info = scipy.sparse.linalg.cg(
            linOpW, rhs, x0=initWbin, maxiter=maxIter)

        # With an explicit iteration cap, stopping early is expected and is
        # not reported.
        if info != 0 and maxIter is None:
            print('\nWARNING: cg not converged!\n')

        w[:, clsIdx] = numpy.asmatrix(solvedWbin).T

    return w
def getClsWeights(y, yUni):
    """Compute one balancing weight per class.

    The weight of a class is ``numSamples / (numClasses * numSamplesOfClass)``,
    so classes with fewer samples receive larger weights.

    Args:
        y: label container with one sample per row (``y.shape[0]`` samples).
        yUni: 2-D container whose first row lists the classes.

    Returns:
        A 1-D float array with one weight per column of ``yUni``.
    """
    numSamples = y.shape[0]
    numCls = yUni.shape[1]

    # Number of entries of y equal to each class label.
    perClsCounts = [
        numpy.argwhere(y == yUni[0, col]).shape[0] for col in range(numCls)
    ]

    return numpy.array(
        [numSamples / (float(numCls) * cnt) for cnt in perClsCounts],
        dtype=float)
def writeNotification(notificationPath, statusStr):
    """Create (or touch) an empty marker file named ``statusStr``.

    This is best effort: a failure is reported on stdout and never raised to
    the caller.

    Args:
        notificationPath: directory in which the marker file is created.
        statusStr: file name of the marker.
    """
    try:
        # Append mode creates the file if needed without truncating it.
        with open(os.path.join(notificationPath, statusStr), 'a'):
            pass
    except Exception:
        # Not a bare except, so KeyboardInterrupt and SystemExit still
        # propagate.
        print('\nERROR: writing notification to {} failed!'.format(notificationPath))
        sys.stdout.flush()
def getReweightDiagMat(y, yUni=None, clsWeights=None):
    """Build a diagonal matrix holding the square roots of the per-sample weights.

    Every sample receives the weight of its class; the square roots of these
    weights are placed on the diagonal of the returned matrix.

    Args:
        y: labels, one sample per row.
        yUni: optional 2-D list of the classes; derived from ``y`` with
            ``numpy.unique`` when omitted. Labels are located with
            ``numpy.searchsorted``, so the classes are expected in ascending
            order.
        clsWeights: optional 1-D array of per-class weights in the order of
            ``yUni``; computed with ``getClsWeights`` when omitted.

    Returns:
        A ``numpy.matrix`` with ``sqrt(weight)`` of each sample on the
        diagonal.
    """
    if yUni is None:
        yUni = numpy.asmatrix(numpy.unique(numpy.asarray(y)))
    if clsWeights is None:
        clsWeights = getClsWeights(y, yUni)

    flatCls = numpy.ravel(numpy.asarray(yUni))
    flatLabels = numpy.ravel(numpy.asarray(y))

    # Position of every sample's label inside the class list.
    clsPos = numpy.searchsorted(flatCls, flatLabels)

    # Product over a list with a single factor: the weights pass through
    # unchanged.
    perSample = numpy.prod([clsWeights[clsPos]], axis=0)

    scaled = perSample * numpy.ones(y.shape[0])
    return numpy.asmatrix(numpy.diag(numpy.sqrt(scaled)))
def showProgressBarTerminal(current, total, pre):
    """Write a one-line percentage indicator to stdout.

    The text starts with a carriage return and carries no newline, and stdout
    is flushed right after writing.

    Args:
        current: amount of work done so far.
        total: total amount of work.
        pre: text shown in front of the percentage.
    """
    percentDone = (float(current) / float(total)) * 100.0
    progressLine = '\r%s %0.2f %%' % (pre, percentDone)
    sys.stdout.write(progressLine)
    sys.stdout.flush()
|