Source code for tools.tools

########################
# FFT AUTO PHASE METHODS
########################
def autops(data, fn, p0=0.0, p1=0.0):
    """
    Automated phase correction from NMRglue by https://github.com/jjhelmus
    These functions provide support for automatic phasing of NMR data.
    Automatic linear phase correction

    Parameters
    data : ndarray
        Array of NMR data.
    fn : str or function
        Algorithm to use for phase scoring. Built in functions can be
        specified by one of the following strings: "acme", "peak_minima"
    p0 : float
        Initial zero order phase in degrees.
    p1 : float
        Initial first order phase in degrees.

    Returns
    ndata : ndarray
        Phased NMR data.
    """
    import numpy as np
    import scipy.optimize
    from io import StringIO  # Python3 use: from io import StringIO
    from contextlib import redirect_stdout
    if not callable(fn):
        fn = {
            'peak_minima': _ps_peak_minima_score,
            'acme': _ps_acme_score,
        }[fn]
    opt = [p0, p1]
    with StringIO() as buf, redirect_stdout(buf):
        opt = scipy.optimize.fmin(fn, x0=opt, args=(data, ))
        mystdout = buf.getvalue()
    return ps(data, p0=opt[0], p1=opt[1]), opt[0], opt[1], mystdout
def _ps_acme_score(ph, data):
    """
    Phase correction using ACME algorithm by Chen Li et al.
    Journal of Magnetic Resonance 158 (2002) 164-168

    Parameters
    * ph : tuple, current p0 and p1 values
    * data : ndarray, array of NMR data.

    Returns
    * score : float, value of the objective function (phase score)
    """
    import numpy as np
    stepsize = 1
    phc0, phc1 = ph
    s0 = ps(data, p0=phc0, p1=phc1)
    data = np.real(s0)

    # Calculation of first derivatives
    ds1 = np.abs((data[1:] - data[:-1]) / (stepsize * 2))
    p1 = ds1 / np.sum(ds1)

    # Calculation of entropy
    p1[p1 == 0] = 1
    h1 = -p1 * np.log(p1)
    h1s = np.sum(h1)

    # Calculation of penalty
    pfun = 0.0
    as_ = data - np.abs(data)
    sumas = np.sum(as_)
    if sumas < 0:
        pfun = pfun + np.sum((as_ / 2) ** 2)
    p = 1000 * pfun
    return h1s + p
def _ps_peak_minima_score(ph, data):
    """
    Phase correction using simple minima-minimisation around highest peak

    This is a naive approach but is quick and often achieves reasonable
    results. The optimisation is performed by finding the highest peak in the
    spectra (e.g. TMSP) and then attempting to reduce minima surrounding it.

    Parameters
    * ph : tuple, current p0 and p1 values
    * data : ndarray, array of NMR data.

    Returns
    * score : float, value of the objective function (phase score)
    """
    import numpy as np  # needed for argmax/min below, missing in the original
    phc0, phc1 = ph
    s0 = ps(data, p0=phc0, p1=phc1)
    data = np.real(s0)

    i = np.argmax(data)
    mina = np.min(data[i - 100:i])
    minb = np.min(data[i:i + 100])
    return np.abs(mina - minb)
def ps(data, p0=0.0, p1=0.0, inv=False):
    """
    Linear phase correction

    Parameters
    data : ndarray
        Array of NMR data.
    p0 : float
        Zero order phase in degrees.
    p1 : float
        First order phase in degrees.
    inv : bool, optional
        True for inverse phase correction

    Returns
    ndata : ndarray
        Phased NMR data.
    """
    import numpy as np
    p0 = p0 * np.pi / 180.  # convert to radians
    p1 = p1 * np.pi / 180.
    size = data.shape[-1]
    apod = np.exp(1.0j * (p0 + (p1 * np.arange(size) / size))).astype(data.dtype)
    if inv:
        apod = 1 / apod
    return apod * data
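# Illustrative usage (added example, not part of the original module): applying
# and undoing a known zero-order phase error with ps(). The synthetic Lorentzian
# spectrum and the 30-degree offset are assumptions made for the demonstration.
#
# >>> import numpy as np
# >>> freq = np.linspace(-1., 1., 1024)
# >>> spectrum = (1. / (1. + (freq / 0.02) ** 2)).astype(complex)   # synthetic line
# >>> distorted = spectrum * np.exp(1.0j * np.deg2rad(30.))         # add a 30 degree zero-order phase
# >>> corrected = ps(distorted, p0=-30.0)                           # undo it
# >>> np.allclose(corrected, spectrum)
# True
#
# autops(distorted, 'acme') would instead search p0, p1 automatically with scipy.optimize.fmin.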
##############
# MU FIT AUX
##############
def TauMu_mus():
    '''
    muon mean lifetime in microsecond
    from Particle Data Group 2017 (not present in scipy.constants)
    '''
    return 2.1969811
def _errors_(component, available_components):
    '''
    inputs:
        one legal mucomponent name contained in the _available_components_(),
        which must be the second input
    output:
        a list of errors (steps), one for each parameter of this component
    '''
    # print(component,available_components)
    k = [item['name'] for item in available_components].index(component)
    return [pardict["error"] for pardict in available_components[k]['pardicts']]
def _limits_(component, available_components):
    '''
    inputs:
        one legal mucomponent name contained in the _available_components_(),
        which must be the second input
    output:
        a list of lists of limits (low, high), one for each parameter of this component
    '''
    k = [item['name'] for item in available_components].index(component)
    return [pardict["limits"] for pardict in available_components[k]['pardicts']]
def add_step_limits_to_model(dash_in):
    '''
    input: original dashboard dash_in, already checked
    output: dash_out is a deepcopy including 'error' and 'limits'
    '''
    from copy import deepcopy
    from mujpy.tools.tools import _available_components_, _errors_, _limits_
    available_components = _available_components_()
    dash_out = deepcopy(dash_in)
    # these lists contain all parameter values in the dashboard,
    # including their error steps and limits
    for component in dash_out['model_guess']:
        steps = _errors_(component['name'], available_components)
        limits = _limits_(component['name'], available_components)
        for j, pardict in enumerate(component['pardicts']):
            pardict['error'] = steps[j]
            pardict['limits'] = limits[j]
    return dash_out
def _available_components_():
    '''
    returns a list of template dictionaries (one per fit component):
    retrieved magically from the mucomponents mumodel class.
    Each dictionary contains 'name' and 'pardicts',
        'pardicts' = list of parameter dictionaries, keys: 'name', 'error', 'limits'
        errors are used by minuit as initial steps
        limits are [None,None] for unconstrained parameters A,B,φ,λ
                   [0,None] for positive parity parameters Δ,σ
                            and for positive defined parameters 'α','β','Λ','ν'
                   [0,0] for fake parameter BL
    ::

        ({'name':'bl','pardicts':[{'name':'A','error':0.01,'limits':[None,None]},
                                  {'name':'λ','error':0.01,'limits':[None,None]}]},
         ...)
    '''
    from mujpy.mucomponents.mucomponents import mumodel
    from iminuit import describe
    available_components = []  # generates the template of available components.
    for name in [module for module in dir(mumodel()) if module[0] != '_']:
        # magical extraction of component names
        pars = describe(mumodel.__dict__[name])[2:]  # [2:] because the first two arguments are self, x
        _pars = []
        # print('pars are {}'.format(pars))
        tip = eval('mumodel.' + name + '.__doc__')
        positive_defined = ['α', 'β', 'Λ', 'ν']
        positive_parity = ['Δ', 'σ']
        for parname in pars:
            # parname, error, limits
            # In this template only
            # {'name':'amplitude','error':0.01,'limits':[0, 0]}
            # parameter name will get a label later
            error, limits = 0.002, [None, None]  # defaults for 'A', 'λ', 'Γ'
            if parname == 'B' or parname == 'Bd':
                error = 0.05
            if parname == 'BL':
                error, limits = 0, [0, 0]
            if parname == 'φ':
                error = 1.0
            if parname in positive_defined + positive_parity:
                limits = [0., None]
            # add here special cases for errors and limits, e.g. positive defined parameters
            _pars.append({'name': parname, 'error': error, 'limits': limits})
        available_components.append({'name': name, 'pardicts': _pars, 'tip': tip})
    # [available_components[i]['name'] for i in range(len(available_components))]
    # is the list of just the mucomponents method names
    return available_components
#def _available_gradients_(component):
#    '''
#    returns True if the component has an analytic gradient
#    i.e. for component name xx in the mucomponents mumodel class,
#    a method _grad_xx_ in the same class.
#    '''
#    from mujpy.mucomponents.mucomponents import mumodel
#
#    methods_with_grad = [module[6:8] for module in dir(mumodel()) if module[0:6]=='_grad_']  # magical extraction of component names
#    return component in methods_with_grad
def validmodel(model):
    '''
    checks valid simple name "almlmg"
    '''
    from mujpy.tools.tools import _available_components_
    # print('validmodel: {}'.format(model))
    available_components = _available_components_()  # creates list automagically from mucomponents
    component_names = [available_components[i]['name'] for i in range(len(available_components))]
    components = [model[i:i + 2] for i in range(0, len(model), 2)]
    # print('valid model, available components: ',*component_names)
    if not components:  # empty model
        return False
    for component in components:
        if component in component_names:
            pass
        else:
            return False
    if 'al' in components:  # check that model has only one 'al' at the beginning
        if model.count('al') > 1 or model.index('al') > 0:
            return False
    return True
def get_fit_range(string):
    '''
    transform a valid string for fit_range into a list of integers
    '''
    fit_range = []
    for chan in string.split(','):
        fit_range.append(int(chan))
    return fit_range
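# Illustrative usage (added example, not part of the original module); the
# range string below is a made-up start, stop, pack triple:
#
# >>> get_fit_range('0,20000,4')
# [0, 20000, 4]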
def checkvalidmodel(name, component_names):
    '''
    checkvalidmodel(name) checks that name is either
    ::

        A1, B1: 2*component string of valid component names, e.g.
                'daml' or 'mgmgbl'
    ::

        or A2, B2: same, ending with 1 digit, number of groups (max 9 groups)
                'daml2' or 'mgmgml2' (2 groups)
    ::

        or C1: same, beginning with 1 digit, number of external minuit parameters (max 9)
                '3mgml' (3 external parameters e.g. A, f, phi)
    ::

        or C2: same, both previous options
                '3mgml2' (3 external parameters, 2 groups)
    '''
    from mujpy.tools.tools import modelstrip
    try:
        name, nexternals = modelstrip(name)
    except:
        # self.console('name error: '+name+' contains too many externals or groups (max 9 each)')
        error_msg = 'name error: ' + name + ' contains too many externals or groups (max 9 each)'
        return False, error_msg  # error code, message
    # decode model
    numberofda = 0
    components = [name[i:i + 2] for i in range(0, len(name), 2)]
    for component in components:
        if component == 'da':
            numberofda += 1
        if component == 'al':
            numberofda += 1
        if numberofda > 1:
            # self.console('name error: '+name+' contains too many da. Not added.')
            error_msg = 'name error: ' + name + ' contains too many da/al. Not added.'
            return False, error_msg  # error code, message
        if component not in component_names:
            # self.console()
            error_msg = 'name error: ' + component + ' is not a known component. Not added.'
            return False, error_msg  # error code, message
    return True, None
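# Illustrative usage (added example, not part of the original module); the
# component_names list below is a hypothetical subset of the mumodel components
# and mujpy is assumed to be importable (checkvalidmodel imports modelstrip):
#
# >>> component_names = ['al', 'bl', 'da', 'mg', 'ml']
# >>> checkvalidmodel('mgbl', component_names)
# (True, None)
# >>> checkvalidmodel('mgxx', component_names)[0]
# False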
######################
# GET_TOTALS
######################
def get_totals(suite):
    '''
    calculates the grand totals and group totals of a single run
    input is self.suite of class musuite
    returns strings totalcounts groupcounts nsbin maxbin
    '''
    import numpy as np
    # called only by self.suite after having loaded a run or a run suite
    ###################
    # grouping set
    # suite.grouping['forward'] and suite.grouping['backward'] are np.arrays of integers
    # initialize totals
    ###################
    for k, d in enumerate(suite.grouping):
        if not k:
            gr = np.concatenate((suite.grouping[k]['forward'], suite.grouping[k]['backward']))
        else:
            gr = np.concatenate((gr, np.concatenate((suite.grouping[k]['forward'], suite.grouping[k]['backward']))))
    ts, gs = [], []
    for k, runs in enumerate(suite._the_runs_):
        tsum, gsum = 0, 0
        for j, run in enumerate(runs):  # add values for runs to add
            n1 = suite.offset + suite.nt0[0]
            for counter in range(run.get_numberHisto_int()):
                if suite.datafile[-3:] == 'bin' or suite.datafile[-3:] == 'mdu':
                    n1 = suite.offset + suite.nt0[counter]
                histo = np.array(run.get_histo_vector(counter, 1)).sum()
                tsum += histo
                if counter in gr:
                    gsum += histo
        ts.append(tsum)
        gs.append(gsum)
        # print('In get totals inside loop,k {}, runs {}'.format(k,runs))
    #######################
    # strings containing
    # individual run totals
    #######################
    # self.tots_all.value = '\n'.join(map(str,np.array(ts)))
    # self.tots_group.value = ' '.join(map(str,np.array(gs)))
    # print('In get totals outside loop, ts {},gs {}'.format(ts,gs))
    #####################
    # display values for self._the_runs_[0][0]
    # self.totalcounts.value = str(ts[0])
    # self.groupcounts.value = str(gs[0])
    # self.console('Updated Group Total for group including counters {}'.format(gr))  # debug
    # self.nsbin.value = '{:.3}'.format(self._the_runs_[0][0].get_binWidth_ns())
    # self.maxbin.value = str(self.histoLength)
    return (str(int(ts[0])), str(int(gs[0])),
            '{:.3}'.format(suite._the_runs_[0][0].get_binWidth_ns()), str(suite.histoLength))
def _nparam(model):
    '''
    input: dashboard['model_guess']
    output: ntot, nmintot, nfree
    '''
    number_components = len(model)
    # print('_nparam tools debug: model {}'.format(model))
    ntot = sum([len(model[k]['pardicts']) for k in range(number_components)])  # total number of component parameters
    flag = [pardict['flag'] for component in model for pardict in component['pardicts']]
    nmintot = ntot - sum([1 for k in range(ntot) if flag[k] == '='])  # ntot minus number of functions
    nfree = nmintot - sum([1 for k in range(ntot) if flag[k] == '!'])  # nmintot minus number of fixed parameters
    return ntot, nmintot, nfree
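# Illustrative usage (added example, not part of the original module); the
# single-component dashboard fragment below is made up, with flags '~' free,
# '!' fixed and '=' function-determined:
#
# >>> model = [{'name': 'mg',
# ...           'pardicts': [{'name': 'A', 'flag': '~'},
# ...                        {'name': 'B', 'flag': '!'},
# ...                        {'name': 'φ', 'flag': '='},
# ...                        {'name': 'σ', 'flag': '~'}]}]
# >>> _nparam(model)
# (4, 3, 2)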
##################################################################
# int2min methods: generate guess values, errors and limits
# of minuit parameters
# int2min :
# int2min_multigroup : assumes all parameters are in userpardicts
# int2min_multirun : assumes both userpardicts and active parameters
#                    must generate daughters for local parameters
##################################################################
def int2min(model):
    '''
    input:
        model
            either dashboard["model_guess"] (after add_step_limits_to_model)
            or dashboard["model_result"]
            both lists of dicts
    output: a list of lists:
        values: minuit parameter values, either guess or result
        errors: their steps
        fixed: True/False for each
        limits: [low, high] limits for each or [None,None]
        names: name of parameter 'x_label' for each parameter
        pospar: parameters for which the component is positive parity, e.g. s in e^{-(s*t)^2/2}
    '''
    from mujpy.tools.tools import _nparam
    dum, ntot, dum = _nparam(model)
    #####################################################
    # the following variables contain the same as input #
    # parameters to iMinuit, removing '='s (functions)  #
    #####################################################
    positive_parity = ['Δ', 'σ']
    val, err, fix, lim = [], [], [], []
    names = []
    pospar = []  # contains index of positive parity parameters, to rerun with no limits
    for component in model:  # scan the model components
        label = component['label']
        for k, pardict in enumerate(component['pardicts']):  # list of dictionaries
            if pardict['flag'] != '=':  # skip functions, only new minuit parameters
                if pardict["name"] in positive_parity:
                    pospar.append(k)
                if pardict['flag'] == '~':
                    fix.append(False)
                elif pardict['flag'] == '!':
                    fix.append(True)
                val.append(float(pardict['value']))
                names.append(pardict['name'] + '_' + label)
                err.append(float(pardict['error']))
                lim.append(pardict['limits'])
                # print('tools int2min debug: pardict name {} limits {}'.format(names[-1],lim[-1]))
    # self.console('val = {}\nerr = {}\nfix = {}\nlim = {},\npar name = {} '.format(val,err,fix,lim, names))
    return val, err, fix, lim, names, pospar
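# Illustrative usage (added example, not part of the original module); the
# dashboard fragment is hypothetical and already carries 'error' and 'limits'
# (as after add_step_limits_to_model); mujpy must be importable:
#
# >>> model = [{'name': 'mg', 'label': 'fast', 'pardicts': [
# ...     {'name': 'A', 'flag': '~', 'value': 0.2, 'error': 0.002, 'limits': [None, None]},
# ...     {'name': 'B', 'flag': '!', 'value': 120., 'error': 0.05, 'limits': [None, None]},
# ...     {'name': 'φ', 'flag': '~', 'value': 0., 'error': 1.0, 'limits': [None, None]},
# ...     {'name': 'σ', 'flag': '~', 'value': 0.3, 'error': 0.002, 'limits': [0., None]}]}]
# >>> val, err, fix, lim, names, pospar = int2min(model)
# >>> val, fix, names
# ([0.2, 120.0, 0.0, 0.3], [False, True, False, False], ['A_fast', 'B_fast', 'φ_fast', 'σ_fast'])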
def int2min_multigroup(pardicts):
    '''
    input:
        pardicts
            either dashboard["userpardicts_guess"] if guess = True
            or dashboard["userpardicts_result"] if guess = False
    output: a list of lists:
        values: minuit parameter values, either guess or result
        errors: their steps
        fixed: True/False for each
        limits: [low, high] limits for each or [None,None]
        name: name of parameter 'x_label' for each parameter
        pospar: parameters for which the component is positive parity, e.g. s in e^{-(s*t)^2/2}
    this works for A2 single fit, multigroup with userpardicts parameters = Minuit parameters
    '''
    #####################################################
    # the following variables contain the same as input #
    # parameters to iMinuit, removing '='s (functions)  #
    #####################################################
    val, err, fix, lim = [], [], [], []
    name = []
    pospar = []  # contains index of positive parity parameters, to rerun with no limits
    for k, pardict in enumerate(pardicts):  # scan the model components
        if 'positive_parity' in pardict.keys():
            pospar.append(k)
        errstd = 'error' if 'error' in pardict.keys() else 'std'
        val.append(float(pardict['value']))
        name.append(pardict['name'])
        err.append(float(pardict[errstd]))
        if 'error' in pardict.keys():
            lim.append(pardict['limits'])
            # print('tools debug: par {} limits {}'.format(pardict['name'],pardict['limits']))
        if 'flag' in pardict.keys():
            if pardict['flag'] == '!':
                fix.append(True)
            elif pardict['flag'] == '~':
                fix.append(False)
            else:  # invalid flag
                return False, None, None, None, None, None
    # self.console('val = {}\nerr = {}\nfix = {}\nlim = {},\npar name = {} '.format(val,err,fix,lim,name))
    return val, err, fix, lim, name, pospar
def int2min_multirun(dashboard, runs):
    '''
    input:
        dashboard for single group multirun containing both
            dashboard["userpardicts_guess"] or dashboard["userpardicts_result"]
            dashboard["model_guess"] (after add_step_limits_to_model) or dashboard["model_result"]
        runs = list of run numbers
    output: a list of lists, each list containing minuit internal parameters for a run
        values: minuit parameter values, either guess or result
        errors: their steps
        fixed: True/False for each
        limits: [low, high] limits for each or [None,None]
        names: name of parameter 'x_label' for each parameter
        pospar: parameters for which the component is positive parity, e.g. s in e^{-(s*t)^2/2}
    Note: this method knows the model, hence it can generate only minuit parameters
    mucomponents, i.e. the cost function, needs to feed also non minuit parameters to the components
    e.g. mg with A global, B1 local, phi global, s1 global will have only two minuit parameters
    per run, the second and the fourth, and it must know which value to use for the first and third
    This is accomplished by _add_multirun_singlegroup_ in mucomponents, using self._components_
    '''
    #####################################################
    # the following variables contain the same as input #
    # parameters to iMinuit, removing '='s (functions)  #
    #####################################################
    positive_parity = ['Δ', 'σ']
    val, err, fix, lim = [], [], [], []
    name = []
    user_local = []
    pospar = []  # contains index of positive parity parameters, to rerun with no limits
    pospar_loc = []
    nlocals = 0
    # first scan the global and local user parameters
    pardicts = dashboard["userpardicts_result"] if "userpardicts_result" in dashboard.keys() else dashboard["userpardicts_guess"]
    # REMIND: insert plot guess option for _result dashboard
    model = dashboard["model_result"] if "model_result" in dashboard.keys() else dashboard["model_guess"]
    for k, pardict in enumerate(pardicts):  # scan the user parameters
        if 'positive_parity' in pardict.keys():
            if pardict["positive_parity"]:
                pospar_loc.append(k)
                pardict['limits'][0] = 0.
                # print('debug tools int2min_multirun: pospar {} lim({}) = {}'.format(pardict["name"],k, pardict['limits']))
        errstd = 'error' if 'error' in pardict.keys() else 'std'
        if pardict["local"] or type(pardict["value"]) == list:  # the local key is set to False by default
            nlocals += 1
            user_local.append(pardict)  # set aside this parameter for the loop over runs
            pardict['local'] = True
        else:  # the first n parameters are the global user parameters
            val.append(pardict['value'])
            name.append(pardict['name'])
            err.append(pardict[errstd])
            if 'error' in pardict.keys():
                lim.append(pardict['limits'])
                # print('tools debug: par {} limits {}'.format(pardict['name'],pardict['limits']))
            if 'flag' in pardict.keys():
                if pardict['flag'] == '!':
                    fix.append(True)
                elif pardict['flag'] == '~':
                    fix.append(False)
                else:  # invalid flag
                    return False, None, None, None, None, None
    kloc = k - nlocals
    # print('debug tools int2min_multirun: pospar_local {}\nuser_local = {}'.format(pospar_loc,user_local))
    # print('debug tools int2min_multirun: kloc = {}, len(fix) is {}'.format(kloc,len(fix)))
    # print('first the userpars\nval = {}\nerr = {}\nfix = {}\nlim = {}\ncomp name = {},\npar name = {} '.format(val,err,fix,lim,name))
    # now scan the runs and create as many replicas of the local parameters
    for krun, run in enumerate(runs):  # run[0] is a string with the run number
        # "value" may be a single guess value for all or a list of guess values, one per run, checked at start
        for kusr, usr in enumerate(user_local):  # first the local user parameter names
            kloc += 1
            fix.append(False)  # can only be not-fixed
            if kusr in pospar_loc:
                pospar.append(kloc)  # this parameter is the run version of a positive parity user local par
            if type(usr["value"]) == list:
                # print('list = {}, krun = {}'.format(usr["value"],krun))
                val.append(usr["value"][krun])
            else:
                val.append(usr["value"])
            name.append(usr["name"] + '_' + run[0])
            errstd = 'error' if 'error' in usr.keys() else 'std'
            if type(usr[errstd]) == list:
                err.append(usr[errstd][krun])
            else:
                err.append(usr[errstd])
            lim.append(usr['limits'])
        for component in model:  # then scan the model components and add only non "="-flag parameters
            label = component['label']
            for k, pardict in enumerate(component['pardicts']):  # list of dictionaries
                if pardict['flag'] != '=':  # minuit parameter
                    kloc += 1
                    if pardict["name"][0] in positive_parity:
                        pospar.append(kloc)
                        pardict['limits'][0] = 0.
                        # print('debug tools int2min_multirun: pospar {} lim({}) = {}'.format(pardict["name"],k, pardict['limits']))
                    if pardict['flag'] == '~':
                        fix.append(False)
                    elif pardict['flag'] == '!':
                        fix.append(True)
                    # else:
                    #     print('debug tools int2min_multirun: kloc = {}, pardict["flag"] is {}'.format(kloc,pardict['flag']))
                    if type(pardict["value"]) == list:
                        # print('val = {}, krun = {}'.format(pardict["value"],krun))
                        val.append(pardict["value"][krun])
                    else:
                        val.append(pardict["value"])
                    name.append(pardict['name'] + '_' + label + '_' + run[0])
                    errstd = 'error' if 'error' in pardict.keys() else 'std'
                    if type(pardict[errstd]) == list:
                        err.append(pardict[errstd][krun])
                    else:
                        err.append(pardict[errstd])
                    lim.append(pardict['limits'])
    pre = 0
    for k in pospar_loc:
        if k not in pospar:
            pospar.insert(pre, k)
            pre += 1
    # print('debug tools int2min_multirun: kloc = {}, len(fix) is {}'.format(kloc,len(fix)))
    return val, err, fix, lim, name, pospar  # all simple lists of sequential parameters, minuit order
def int2fft(model):
    '''
    input:
        model
            dashboard["model_guess"]
    output:
        fft_subtract: a list of boolean values, one per model component
                      fft flag True, component subtracted in residues
    '''
    from mujpy.tools.tools import _nparam
    fft_flag = []
    fft_name = []
    for componentdict in model:  # scan the model components
        if "fft" not in componentdict.keys():
            fft_flag.append(False)
        else:
            fft_flag.append(componentdict["fft"])
        fft_name.append(componentdict["name"])
    return fft_name, fft_flag
##################################################
# method and key methods: provide component methods
# and parameter key for eval(key) in _add_
# int2_method_key : single run single group
# int2_multigroup_method_key : single run multi group
# int2_multirun_user_method_key : multirun single group user
# int2_multirun_grad_method_key : same with grad
##################################################
def int2_method_key(dashboard, the_model):
    '''
    input:
        dashboard, the dashboard dict structure
        the_model, a fit model instance (not necessarily loaded)
    output:
        a list of lists, the inner lists contain each
            method, a mumodel component method, in the order of the model components
                    for the use of mumodel._add_.
            keys, a list of as many lambda functions as the parameters of the component,
                  hard coding the translated "function" string for fast evaluation.
    This function applies tools.translate to the parameter numbers in formulas:
    dashboard "function" is written in terms of the internal parameter index,
    while the Minuit parameter index skips shared or formula-determined ('=') parameters
    '''
    from mujpy.tools.tools import translate, set_key
    model_guess = dashboard['model_guess']  # guess surely exists
    ntot = sum([len(model_guess[k]['pardicts']) for k in range(len(model_guess))])
    lmin = []    # initialize the minuit parameter index of dashboard function indices
    nint = -1    # initialize the number of internal parameters
    nmin = -1    # initialize the number of minuit parameters
    method_key = []
    function = [pardict['function'] for component in model_guess for pardict in component['pardicts']]
    for k in range(len(model_guess)):  # scan the model
        name = model_guess[k]['name']
        # print('name = {}, model = {}'.format(name,self._the_model_))
        is_al_da = name == 'al' or name == 'da'
        bndmthd = [] if is_al_da else the_model.__getattribute__(name)
        # this is the method to calculate a component, to set alpha, dalpha apart
        keys = []
        # isminuit = [] not used
        flag = [item['flag'] for item in model_guess[k]['pardicts']]
        for j, pardict in enumerate(model_guess[k]['pardicts']):
            nint += 1  # internal parameter index incremented always
            if flag[j] == '=':  # function is written in terms of nint
                # nint must be translated into nmin
                string = translate(nint, lmin, pardict['function'])  # here is where lmin is used
                # translate substitutes lmin[n] where n is the index read in the function (e.g. p[3])
                key_as_lambda = set_key(string)  # NEW! calculates simple functions and speedup
                keys.append(key_as_lambda)  # the function key in keys will be evaluated, key(p), inside mucomponents
                # isminuit.append(False)
                lmin.append(0)
            else:  # flag[j] == '~' or flag[j] == '!'
                nmin += 1
                key_as_lambda = set_key('p[' + str(nmin) + ']')  # NEW! calculates simple functions and speedup
                keys.append(key_as_lambda)  # the function key in keys will be evaluated, key(p), inside mucomponents
                lmin.append(nmin)
                # isminuit.append(True)
        # print('int2_method tools debug: bndmthd = {}, keys = {}'.format(bndmthd,keys))
        method_key.append([bndmthd, keys])
    return method_key
def int2_multigroup_method_key(dashboard, the_model):
    '''
    input:
        dashboard, the dashboard dict structure
        fit._the_model_ is an instance of mumodel
        (the number of groups is obtained from dashboard)
    output: a list of methods and keys, in the order of the model components
        for the use of mumodel._add_multigroup_.
        method is a 2d vector function accepting time and a variable number of
            lists of (component) parameters, e.g. if one component is mumodel.bl(x,A,λ)
            the corresponding component for a two group fit accepts the following
            argument (t,[A1, A2],[λ1,λ2])
        keys is a list of lists of strings
            they are evaluated to produce the method parameters, there are
            ngroups strings per parameter (inner list)
            npar parameters per component (outer list)
    Invoked by the iMinuit initializing call
        self._the_model_._load_data_multigroup_
    just before submitting migrad.
    This function does not need userpardicts and tools.translate since the
    correspondence with Minuit parameters is given directly either by "function"
    or by "function_multi"
    '''
    from mujpy.tools.tools import multigroup_in_components, cstack, set_key
    model = dashboard['model_guess']  # guess surely exists
    # these are the only Minuit parameters [p[k] for k in range (nuser)]
    ntot = sum([len(model[k]['pardicts']) for k in range(len(model))])
    method_key = []
    pardicts = [pardict for component in model for pardict in component['pardicts']]
    mask_function_multi = multigroup_in_components(dashboard)
    # print('int2_multigroup_method_key tools debug: index function_multi {}\npardicts = {}'.format(mask_function_multi,pardicts))
    # if "userpardicts_guess" in dashboard.keys():
    #     updicts = dashboard["userpardicts_guess"]
    #     print('int2_multigroup_method_key tools debug: userpardicts ')
    #     for j,pd in enumerate(updicts):
    #         print('{} {} = {}({}), {}, {} '.format(j,pd["name"],pd["value"],
    #                                                pd["error"], pd["flag"],pd["limits"]))
    if sum(mask_function_multi):
        ngroups = len(pardicts[mask_function_multi.index(1)]["function_multi"])
    else:
        return []
    nint = -1  # initialize the index of the dashboard component parameters
    # p = [1.13,1.05,0.25,0.3,0.8,700,35,125,3.3,680,0.1]  # debug delete
    # print('tools int2_multigroup_method_key debug: fake values k, p {}'.format([[k,par] for k,par in enumerate(p)]))
    bndmthd = {}  # to avoid same name
    for j, component in enumerate(model):  # scan the model components
        name = component['name']
        keys = []
        bndmthd[name] = lambda x, *pars, name=name: cstack(the_model.__getattribute__(name), x, *pars)
        bndmthd[name].__doc__ = '"""' + name + '"""'
        # this is the method to calculate a component, to set alpha, dalpha apart
        # print('\ntools int2_multigroup_method_key debug: {}-th component name = {}'.format(j,bndmthd[name].__doc__))
        nint0 = nint
        for l in range(ngroups):
            key = []
            nint = nint0
            for pardict in component['pardicts']:
                nint += 1  # internal parameter index incremented always
                if mask_function_multi[nint] > 0:
                    # print('tools int2_multigroup_method_key debug: {}[{}] = {}'.format(pardict["name"],l,pardict["function_multi"][l]))
                    key_as_lambda = set_key(pardict["function_multi"][l])  # NEW! calculates simple functions and speedup
                else:
                    # print('tools int2_multigroup_method_key debug: {}[{}] = {}'.format(pardict["name"],l,pardict["function"]))
                    key_as_lambda = set_key(pardict["function"][l])  # NEW! calculates simple functions and speedup
                # print('tools int2_multigroup_method_key debug: key_as_lambda(p) = {} **delete also p!'.format(key_as_lambda(p)))
                key.append(key_as_lambda)  # the function key will be evaluated, key(p), inside mucomponents
            keys.append(key)
        # print('int2_method tools debug: appending {}-th bndmthd {} with {} groups x {} keys'.format(j,bndmthd[name].__doc__,len(keys),len(keys[0])))
        method_key.append([bndmthd[name], keys])  # vectorialized method, with keys
    # keys = [[strp0g0, strp1g0,...],[strp0g1, strp1g1, ..],[strp0g2, strp1g2,...]..]
    # pars = [[p0g0, p1g0, ...],[p0g1, p1g1, ..],[p0g2, p1g2,...]..]
    return method_key
def int2_multirun_user_method_key(dashboard, the_model, nruns):
    '''
    input:
        dashboard, the dashboard dict structure
        the_model is fit._the_model_ i.e. an instance of mumodel
        nruns is the number of runs in the suite
    output: a list of methods and a list of lists of keys, [[key,...,key],...,[key,..,key]]
        the internal list is same parameter, different runs
        the model components for the use of mumodel._add_multirun_.
        method is a component function accepting time and a list of parameters,
            e.g. mumodel.bl(x,A,λ)
        key is a string defining a lambda function that produces one method parameter
            for a specific run; the list is for the same parameter over different runs
        keys is a list of lists for all the parameters (any flag) of the component
        the list of [binding,keys] is over the components of the model
    This list of [binding, keys] allows mumodel _add_multirun_ to use the minuit p list
    (n_globals global user values, followed by nruns replicas of n_locals local user values
    and a model specific number of local (~,!) component par values) to produce
    component-driven vectorized values, as many values in the vector as the runs.
    In this way the minuit fcn is a vector, one fcn per run, likewise asymm, asyme are vectors
    (see suite for multirun) and the mumodel._chisquare_ cost function sums over individual runs
    for a unique global chisquare.
    Invoked by the iMinuit initializing call
        self._the_model_._load_data_multirun_user_
    just before submitting migrad
    '''
    from mujpy.tools.tools import multigroup_in_components, cstack, translate_multirun, set_key
    from mujpy.tools.tools import get_functions_in
    # self._components_ is a list [[method,[key,...,key]],...,[method,[key,...,key]]],
    # produced by int2_multirun_user_method_key() from mujpy.tools.tools
    # where method is an instantiation of a component, e.g. self.ml
    # and value = eval(key) produces the parameter value
    model = dashboard['model_guess']  # guess surely exists, it is a list of component dicts, e.g. for mgbl 2 dicts
    method_key = []
    bndmthd = {}  # to avoid same name
    n_locals = [pardict["local"] for pardict in dashboard["userpardicts_guess"]].count(True)
    n_globals = len(dashboard["userpardicts_guess"]) - n_locals
    kloc = n_globals + n_locals
    functions_in = get_functions_in(model, kloc - 1)
    functions_out = translate_multirun(functions_in, n_locals, kloc, nruns)
    # print('\n\ndebug tools int2_multirun_user_method_key functions_out = {}'.format(functions_out))
    for j, component in enumerate(model):  # scan the model components (as for the first run)
        name = component['name']
        keys = []
        # this method uses pars, a list of lists (runs) of parameters for this component,
        # obtained by key(p) from minuit p
        bndmthd[name] = lambda x, *pars, name=name: cstack(the_model.__getattribute__(name), x, *pars)
        bndmthd[name].__doc__ = '"""' + name + '"""'  # no alpha in global multirun!
        # its pars are generated as a list of lists of the key_as_lambda functions
        for funcs in functions_out[j]:  # funcs is a run, in the suite of runs
            key = []
            for func in funcs:  # this is a parameter for this run, in the component parameters
                # print('debug tools int2_multirun_user_method_key func = {}'.format(func))
                key_as_lambda = set_key(func)  # NEW! calculates simple functions and speedup
                # function key will be evaluated as key(p) inside mucomponents
                key.append(key_as_lambda)  # collect parameter key(s) of the component
            keys.append(key)  # create outer list adding component parameters for this run
        method_key.append([bndmthd[name], keys])  # vectorialized method, with its keys list of lists
        # appended to a list of [method, keys]
    # print('debug tools int2_multirun_user_method_key: locals =\n{}'.format(globals()))
    return method_key
def int2_multirun_grad_method_key(dashboard, the_model, nruns):
    '''
    input:
        dashboard, the dashboard dict structure
        the_model is fit._the_model_ i.e. an instance of global multirun mumodel
        nruns is the number of runs in the suite
    output:
        minuit_ordered_grad_list, i.e. a list of lists [[k,n,j,grad_bndmthd,dkey]],
        one for each minuit internal parameter p[m]
        k n j are indices of run, component and parameter for which
        dkndj_bndmthd is the <module> that computes the derivative of component n in run k
            with respect to parameter j (if par are the parameters for component n in run k,
            then dkndj_bndmthd(x,*par) calculates the derivative)
        djdm is a <module> that computes the derivative of the user funct (e.g. "p[0]*p[21]")
            with respect to p[m] (then djdm(p) is the value of the derivative)
        The product of these derivatives is sparse, i.e. non zero only for few values k,n,j.
        The present method identifies each and every set of indices (k,n,j) for which the product
            gg_nj = dkndj_bndmthd(x,*par)*djdm(p) != 0
    ---------------------------
    Usage
    To generate chisquare grad values for all minuit parameters, in order to optimize
    numpy minuit calculations, mucomponents _add_multirun_grad_ uses the 2D array
    gg = sum_n,j gg_nj and multiplies it by the 2D array dcdf = 2(f-y)/e^2
    The m-th component of the chisquare gradient is sum(dcdf*gg,axis=None)
    --------------------------
    General equation
    Assuming asymmetry data and errors y(k;i), ey(k;i), the expression for the chisquare gradient is
        sum_i,k {2(sum_n y_n(k;i,*par(k,n)) - y(k;i))/ey(k;i)^2} *
            sum_n,j {partial y_n(k;i,*par(k,n))/partial par[k,n,j]} * {partial par[k,n,j]/partial p[m]}
    hereafter
        dcdf * sum_n,j dkndj * djdm
    '''
    from mujpy.tools.tools import get_functions_in, diffunc, get_indices, get_number_minuit_internal
    from mujpy.tools.tools import multigroup_in_components, translate_multirun, set_key
    # first generate dmethod_keys
    # dmethod_keys contains [[m_d,keys]...[m_d,keys]], as many as the model components (e.g. 2 for mgbl)
    # m_d is [method] if no derivative is required (e.g. bl)
    #     or [method, derivative_method] if derivative is required (e.g. mg)
    # keys is [runkeys,...,runkeys] such that
    # par = [key(p) for key in runkeys] and method(x,*par) and derivative_method(self._x_,*par)
    # produce the additive component for that run
    model = dashboard['model_guess']
    names = [component['name'] for component in model]
    n_locals = [pardict["local"] for pardict in dashboard["userpardicts_guess"]].count(True)
    n_globals = len(dashboard["userpardicts_guess"]) - n_locals
    kloc = n_globals + n_locals
    functions_in = get_functions_in(model, kloc - 1)
    # functions_in are the user func strings of the single-run model
    functions_out = translate_multirun(functions_in, n_locals, kloc, nruns)
    # user func strings of the multirun model
    minuit_ordered_grad_list = [[] for x in range(get_number_minuit_internal(nruns, n_globals, n_locals, model))]
    # this is the empty output container
    # print('debug tools int2_multirun_grad_method_key, minuit_grad_list = {}'.format(minuit_ordered_grad_list))
    for n_component, (component, component_name) in enumerate(zip(functions_out, names)):
        # the order is model components, runs, component parameter
        for k_run, run_component in enumerate(component):
            for j_parameter, func in enumerate(run_component):
                dfuncs, indices = diffunc(func)
                # e.g. func = 'p[0]*p[7]', dfuncs = ['p[7]','p[0]'], indices = [0,7]
                for dfunc, m_minuit_parameter in zip(dfuncs, indices):
                    grad_bndmthd = lambda x, *par, gname='_grad_' + component_name + '_' + str(j_parameter) + '_': the_model.__getattribute__(gname)(x, *par)
                    grad_bndmthd.__doc__ = '"""' + '_grad_' + component_name + '_' + str(j_parameter) + '_"""'
                    # if e.g. component_name is 'bl' methods must exist called _grad_bl_0_, _grad_bl_1_ ...
                    # print('debug tools int2_multirun_grad_method_key, m_minuit_parameter = {}, k, n, j = {};{},{}'.format(m_minuit_parameter,k_run,n_component,j_parameter))
                    grad_list = minuit_ordered_grad_list[m_minuit_parameter]
                    grad_list.append([k_run, n_component, j_parameter, grad_bndmthd, set_key(dfunc)])
    return minuit_ordered_grad_list
def get_number_minuit_internal(nruns, n_globals, n_locals, model):
    k_mint = 0
    for j, component in enumerate(model):  # scan the model components (as for the first run)
        flags = [pardict["flag"] for pardict in component["pardicts"]]  # the flags in the present component
        for k, flag in enumerate(flags):  # as many flags as parameters in component
            if flag != "=":
                k_mint += 1
    return n_globals + nruns * (n_locals + k_mint)
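# Illustrative usage (added example, not part of the original module); the
# one-component model fragment and the run/parameter counts are made up:
#
# >>> model = [{'pardicts': [{'flag': '~'}, {'flag': '='}, {'flag': '!'}]}]
# >>> get_number_minuit_internal(3, 2, 1, model)   # 2 globals + 3 runs * (1 local + 2 free)
# 11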
def get_functions_in(model, kk):
    '''
    input
        model = single-run model dashboard dict
        kk = kloc - 1, is incremented at each free parameter of the model, so that it scans
             the internal minuit indices for these parameters (ignoring those determined
             by a user funct, e.g. "p[0]*p[4]")
    output
        functions_in = list of lists, one per component, of user functs, one per parameter,
             for the single-run model; all component parameters, including "~" and "!",
             are translated to an appropriate user funct
    '''
    functions_in = []
    for j, component in enumerate(model):  # scan the model components (as for the first run)
        flags = [pardict["flag"] for pardict in component["pardicts"]]  # the flags in the present component
        function_in = [pardict["function"] for pardict in component["pardicts"]]  # the original functions (some are empty)
        for k, flag in enumerate(flags):  # as many flags as parameters in component
            if flag != "=":  # this parameter is among the minuit parameters
                kk += 1  # this is the minuit index of the current first run parameter
                # suppose mg with n_globals = 5 (0,1,2,3,4), n_locals = 1 (5)
                # 6 A = f(p[1],p[5])   7 B = p[6]   8 φ = p[2]   9 σ = p[7]
                #  k      kk       n_equals
                #  0       -           1
                #  1  5+1+1-1=6         -
                #  2       -           2
                #  3  5+1+3-2=7         -
                function_in[k] = 'p[' + str(kk) + ']'  # write a fake "function" to eval this parameter as 'p[kk]'
        functions_in.append(function_in)
    return functions_in
def set_key(string):
    """
    input: the function string from the json or the mudash dashboard
           e.g. that written in the json file as 'function':'p[0]*(0.5+1/pi*arctan(p[2])'
           or typed into the mudash text widget as 'p[0]*(0.5+1/pi*arctan(p[2])'
    output: key, a python method, such that in mumodel mucomponents _add_
            the command key(p) evaluates the formula
    the evaluation knows simple numpy functions, see the import in string code, below
    """
    code = """
from numpy import cos, sin, tan, sinh, cosh, tanh, log, pi, exp, sqrt, real, abs, arctan
def foo():
"""
    string = '"lambda p: ' + string + '"'
    string = "    key = eval('" + string + "')"
    # print('string ={}'.format(string))
    code = code + string + """
    return key
"""
    # print('code = {}'.format(code))
    exec(code, globals(), globals())
    key = eval(foo())
    return key
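# Illustrative usage (added example, not part of the original module); p is a
# made-up minuit parameter list and the formula string is the one quoted in the
# docstring above, with its closing parenthesis added:
#
# >>> key = set_key('p[0]*(0.5+1/pi*arctan(p[2]))')
# >>> p = [0.5, 1.3, 0.0]
# >>> print(key(p))        # 0.5*(0.5 + arctan(0)/pi)
# 0.25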
#def fstack(npfunc,x,*pars):
#    '''
#    vectorialize npfunc
#    input:
#        npfunc numpy function with input (x,*argv)
#        x time
#        *pars is a list of lists of parameters,
#              list len is the output_function_array.shape[0]
#    output:
#        output_function_array
#    stacks vertically n replica of npfunc distributing parameters as in
#    (x, *argv[i]) for each i-th replica
#    '''
#    # fstack reproduces the parameter input of a component according to
#    # self._components_ = [[method,[key,...,key]],...,[method,[key,...,key]]], and eval(key) produces the parmeter value
#    # where the outer list a replica of the same component method
#    # either over several groups (multigroup) or over several runs (multirun)
#    # as of now this method does not work for the multirun multigroup userpar case (C2)
#    from numpy import vstack
#    for k,par in enumerate(pars):
#        if k:
#            # print('tools fstack debug: npfunc.__doc__: {}'.format(npfunc.__doc__))
#            f = vstack((f,npfunc(x,*par)))
#        else:
#            # print('tools fstack debug: k=0 npfunc.__doc__: {}'.format(npfunc.__doc__))
#            f = npfunc(x,*par)
#    return f
def cstack(npfunc, x, *pars):
    '''
    vectorialize npfunc
    input:
        npfunc numpy function with input (x,*argv)
        x time
        *pars is a list of lists of parameters,
              list len n is the output_function_array.shape[0]
    output:
        output_function_array
    stacks vertically n replicas of npfunc distributing parameters as in
    (x, *argv[i]) for each i-th replica
    '''
    # cstack reproduces the parameter input of a component according to
    # self._components_ = [[method,[key,...,key]],...,[method,[key,...,key]]], and eval(key) produces the parameter value
    # where the outer list is a replica of the same component method
    # either over several groups (multigroup) or over several runs (multirun)
    # as of now this method does not work for the multirun multigroup userpar case (C2)
    from numpy import concatenate
    return concatenate([npfunc(x, *par) for par in pars]).reshape(-1, x.shape[0])
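# Illustrative usage (added example, not part of the original module); the toy
# component below stands in for a mumodel method such as bl:
#
# >>> import numpy as np
# >>> def damped(x, A, lam):
# ...     return A * np.exp(-lam * x)
# >>> x = np.linspace(0., 10., 101)
# >>> pars = [[0.25, 0.1], [0.18, 0.3]]     # one parameter list per group (or per run)
# >>> cstack(damped, x, *pars).shape        # one row per replica
# (2, 101)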
def int2_calib_method_key(dashboard, the_model):
    '''
    NOT USED, remove
    input: the dashboard dict structure and the fit model 'alxx..' instance
        the actual model contains 'al' plus 'xx', ..
        the present method considers only the latter
        FOR PLOTTING ONLY (USE int2_method for the actual calib fit)
    output: a list of methods for calib fits, in the order of the 'xx..' model components
        (skipping al) for the use of mumodel._add_single_.
    Invoked by the iMinuit initializing call
        self._the_model_._load_data_,
    just before submitting migrad;
    self._the_model_ is an instance of mumodel.
    This function applies tools.translate to the parameter numbers in formulas
    since on the dash each parameter of each component gets an internal number,
    but alpha is popped and shared or formula-determined ('=') ones are not minuit parameters
    '''
    from mujpy.tools.tools import translate
    model_guess = dashboard['model_guess']  # guess surely exists
    ntot = sum([len(model_guess[k]['pardicts']) for k in range(len(model_guess))]) - 1  # minus alpha
    lmin = []    # initialize the minuit parameter index of dashboard function indices
    nint = -1    # initialize the number of internal parameters
    nmin = -1    # initialize the number of minuit parameters
    method_key = []
    function = [pardict['function'] for component in model_guess for pardict in component['pardicts']]
    for k in range(1, len(model_guess)):  # scan the model popping 'al' and its parameter 'alpha'
        name = model_guess[k]['name']
        # print('name = {}, model = {}'.format(name,self._the_model_))
        bndmthd = the_model.__getattribute__(name)
        keys = []
        # isminuit = [] not used
        flag = [item['flag'] for item in model_guess[k]['pardicts']]
        for j, pardict in enumerate(model_guess[k]['pardicts']):
            nint += 1  # internal parameter index incremented always
            if flag[j] == '=':  # function is written in terms of nint
                # nint must be translated into nmin
                string = translate(nint, lmin, pardict['function'])  # here is where lmin is used
                # translate substitutes lmin[n] where n is the index read in the function (e.g. p[3])
                keys.append(string)  # the function will be eval-uated, eval(key), inside mucomponents
                # isminuit.append(False)
                lmin.append(0)
            else:  # flag[j] == '~' or flag[j] == '!'
                nmin += 1
                keys.append('p[' + str(nmin) + ']')  # this also needs direct translation
                lmin.append(nmin)
                # isminuit.append(True)
        method_key.append([bndmthd, keys])
    return method_key
def int2_calib_multigroup_method_key(dashboard, the_model):
    '''
    NOT USED, remove
    input: the dashboard dict structure and the fit model 'alxx..' instance
        the actual model contains 'al' plus 'xx', ..
        the present method considers only the latter
        FOR PLOTTING ONLY (USE int2_method for the actual calib fit)
    output: a list of methods for calib fits, in the order of the 'xx..' model components
        (skipping al) for the use of mumodel._add_single_.
    Invoked by the iMinuit initializing call
        self._the_model_._load_data_,
    just before submitting migrad;
    self._the_model_ is an instance of mumodel.
    This function applies tools.translate to the parameter numbers in formulas
    since on the dash each parameter of each component gets an internal number,
    but alpha is popped and shared or formula-determined ('=') ones are not minuit parameters
    '''
    from mujpy.tools.tools import translate
    print('int2_calib_multigroup_method_key tools debug: copy of non multigroup, adapt!')
    model_guess = dashboard['model_guess']  # guess surely exists
    ntot = sum([len(model_guess[k]['pardicts']) for k in range(len(model_guess))]) - 1  # minus alpha
    lmin = []    # initialize the minuit parameter index of dashboard function indices
    nint = -1    # initialize the number of internal parameters
    nmin = -1    # initialize the number of minuit parameters
    method_key = []
    function = [pardict['function'] for component in model_guess for pardict in component['pardicts']]
    for k in range(1, len(model_guess)):  # scan the model popping 'al' and its parameter 'alpha'
        name = model_guess[k]['name']
        # print('name = {}, model = {}'.format(name,self._the_model_))
        bndmthd = the_model.__getattribute__(name)
        keys = []
        # isminuit = [] not used
        flag = [item['flag'] for item in model_guess[k]['pardicts']]
        for j, pardict in enumerate(model_guess[k]['pardicts']):
            nint += 1  # internal parameter index incremented always
            if flag[j] == '=':  # function is written in terms of nint
                # nint must be translated into nmin
                string = translate(nint, lmin, pardict['function'])  # here is where lmin is used
                # translate substitutes lmin[n] where n is the index read in the function (e.g. p[3])
                keys.append(string)  # the function will be eval-uated, eval(key), inside mucomponents
                # isminuit.append(False)
                lmin.append(0)
            else:  # flag[j] == '~' or flag[j] == '!'
                nmin += 1
                keys.append('p[' + str(nmin) + ']')  # this also needs direct translation
                lmin.append(nmin)
                # isminuit.append(True)
        method_key.append([bndmthd, keys])
    return method_key
def min2int(model_guess, values_in, errors_in):
    '''
    input:
        model_component from dashboard
        values_in Minuit.values
        errors_in Minuit.errors
    output:
        for all dashboard parameters
        names list of lists of parameter names
        values_out list of lists of their values
        errors_out list of lists of their errors
    reconstruct dashboard with Minuit best fit values and errors
    for print_components, compact fit summary
    '''
    #
    # initialize
    #
    from mujpy.tools.tools import translate
    names, values_out, p, errors_out, e = [], [], [], [], []
    nint = -1  # initialize
    nmin = -1
    lmin = []
    flag = [pardict['flag'] for component in model_guess for pardict in component['pardicts']]
    # flag = [pardict['flag'] for component in model_guess for pardict in component['pardicts']]
    for k, component in enumerate(model_guess):  # scan the model
        component_name = component['name']
        name, value, error = [], [], []
        label = model_guess[k]['label']
        for j, pardict in enumerate(model_guess[k]['pardicts']):  # list of dictionaries, par is a dictionary
            nint += 1  # internal parameter incremented always
            if j == 0:
                name.append('{}{}_{}'.format(component_name, pardict['name'], label))
            else:
                name.append('{}_{}'.format(pardict['name'], label))
            if flag[nint] != '=':  # skip functions, they are not new minuit parameters
                nmin += 1
                lmin.append(nmin)
                p.append(values_in[nmin])  # needed also by functions
                value.append(values_in[nmin])
                e.append(errors_in[nmin])
                error.append(errors_in[nmin])  # parvalue item is a string
            else:  # functions, calculate as such
                # nint must be translated into nmin
                string = translate(nint, lmin, pardict['function'])
                p.append(eval(string))
                value.append(eval(string))
                e.append(eval(string.replace('p', 'e')))
                error.append(eval(string.replace('p', 'e')))
                lmin.append(0)  # not needed
        names.append(name)
        values_out.append(value)
        errors_out.append(error)
    return names, values_out, errors_out  # list of parameter values
def min2int_multirun(dashboard, p, e, _the_runs_):
    '''
    input:
        dashboard; userpardicts_guess and model_guess are used only to retrieve
                   "function" or "function_multi" and "error_propagation_multi"
        p,e Minuit best fit parameter values and std
        _the_runs_ = list of run numbers in suite
    output:
        for all parameters
        names list of lists of parameter names
        pars list of lists of parameter values
        epars list of lists of parameter errors
    used only in summary_multirun_global that prints name value(error):
    one or more lines of global user parameters (the first list in the inner lists),
    one line per run of local user parameters and local component parameters (the others)
    '''
    #
    # initialize
    #
    from mujpy.tools.tools import multigroup_in_components
    names, pars, epars = [], [], []
    nameloc, npars, n_locals = [], -1, 0  # inner list, components
    name, par, epar = [], [], []
    for k, pardict in enumerate(dashboard['userpardicts_guess']):
        if not pardict['local']:  # name, par, epar are lists of globals
            name.append(pardict['name'])
            par.append(p[k])
            epar.append(e[k])
            npars += 1
        else:  # nameloc are bare names of locals
            n_locals += 1
            nameloc.append(pardict['name'])
    # store the globals in names[0], pars[0], epars[0]
    names.append(name)
    pars.append(par)
    epars.append(epar)
    model = dashboard['model_guess']
    for run in range(len(_the_runs_)):
        name, par, epar = [], [], []  # inner list
        for k in range(n_locals):
            npars += 1
            # for brevity name appends a progressive index, not the run number as in minuit
            name.append(nameloc[k] + str(run))  # if run in _the_runs_, use run.get_runNumber_int()
            par.append(p[npars])
            epar.append(e[npars])
        for component in model:  # scan the model components
            component_name = component['name']
            label = component['label']
            for j, pardict in enumerate(component['pardicts']):
                if not pardict['flag'] == '=':
                    npars += 1  # internal parameter index incremented always
                    name.append('{}.{}_{}'.format(pardict['name'], label, str(run)))
                    par.append(p[npars])
                    epar.append(e[npars])
        names.append(name)
        pars.append(par)
        epars.append(epar)
    return names, pars, epars  # list of lists of parameter names, values, errors
def min2int_multigroup(dashboard, p, e):
    '''
    input:
        userpardicts_guess from dashboard (each dict corresponds to a Minuit parameter)
        used only to retrieve "function" and "error_propagation_multi"
        p,e Minuit best fit parameter values and std
    output:
        for all parameters
        namesg list of lists of dashboard parameter names
        parsg list of lists of dashboard parameter values
        eparsg list of lists of dashboard parameter errors
    used only in summary_global
    '''
    #
    # initialize
    #
    from mujpy.tools.tools import multigroup_in_components
    # print('min2int_multigroup in tools debug: dash {}'.format(dashboard))
    mask_function_multi = multigroup_in_components(dashboard)
    userpardicts = dashboard['userpardicts_guess']
    e = [e[k] if pardict['flag'] == '~' else 0 for k, pardict in enumerate(userpardicts)]
    # names = [pardict['name'] for pardict in userpardicts]
    model = dashboard['model_guess']
    pardicts = [pardict for component in model for pardict in component['pardicts']]
    ngroups = len(pardicts[mask_function_multi.index(1)]["function_multi"])
    nint = -1  # initialize
    namesg, parsg, eparsg = [], [], []
    for l in range(ngroups):
        nint0 = nint
        names, pars, epars = [], [], []
        for component in model:  # scan the model components
            component_name = component['name']
            label = component['label']
            # nint = nint0
            name, par, epar = [], [], []  # inner list, components
            for j, pardict in enumerate(component['pardicts']):
                nint0 += 1  # internal parameter index incremented always
                if j == 0:
                    name.append('{}: {}_{}'.format(component_name, pardict['name'], label))
                else:
                    name.append('{}_{}'.format(pardict['name'], label))
                if mask_function_multi[nint0]:
                    par.append(eval(pardict["function_multi"][l]))
                    try:
                        epar.append(eval(pardict["error_propagate_multi"][l]))
                    except:
                        epar.append(eval(pardict["function_multi"][l].replace('p', 'e')))
                else:
                    par.append(eval(pardict["function"]))  # the function will be eval-uated inside mucomponents
                    try:
                        epar.append(eval(pardict["error_propagate"]))
                    except:
                        epar.append(eval(pardict["function"].replace('p', 'e')))
                # print('tools min2int_multigroup debug: nint = {} name = {} par = {}, epar = {}'.format(nint0,name[-1],par[-1],epar[-1]))
            pars.append(par)  # middle list, model
            names.append(name)
            epars.append(epar)
        namesg.append(names)
        parsg.append(pars)
        eparsg.append(epars)
    return namesg, parsg, eparsg  # list of parameter values
def minglobal2sequential(p_out, p_in, method_keys, dashboard):
    '''
    translate global best fit results (values) into nruns equivalent sequential fits
    for plotting purposes: mufitplot(plot_range,the_fit) will access
    self.fit.lastfits and self.fit.dashboard_single if self.fit.C1 is True
    input:
        p_out is global best fit minuit parameters self.lastfit
        p_in is global fit minuit guess parameters values_in
        method_keys is produced by int2_multirun_user_method_key
        dashboard is the fit global dashboard
        results toggles between plotting result or guess
    output:
        lastfits, list of lists of best fits,
            in the style of multirun sequential single group (B1)
        dashboard_single (a dashboard that produces a single run best fit function
            for animated plots)
    '''
    from copy import deepcopy
    # values are:
    #   first the user globals,
    #   then the run replicas: first the user locals, then the free model parameters
    # for each run must reconstruct a simple model_guess, model_result dashboard (no userpardicts)
    # with its '=' flag parameters translated in '!' flag with value calculated by key (from function)
    lastfits = []
    userpars_g, userpars_r = dashboard["userpardicts_guess"], dashboard["userpardicts_result"]
    n_locals = [pardict["local"] for pardict in userpars_g].count(True)
    n_globals = len(userpars_g) - n_locals
    # could also simply transfer p = self.fit.lastfit.values
    # for krun in range(len(values[1:])):  # i.e. in range(n_runs)
    par = []  # list of single-run equivalent best fit parameters
    # scan method_keys = [[mthd,[[key,...,key],...,[key,...,key]]],..,[mthd,[[key,...,key]...]]]
    # outer components, middle runs, inner component parameters
    dash = deepcopy(dashboard)
    dash.pop("userpardicts_guess")
    if "userpardicts_result" in dash.keys():
        dash.pop("userpardicts_result")
    model_in, model_out = [], []
    if "model_result" not in dash.keys():
        dash["model_result"] = dash["model_guess"]  # make sure model_result exists in dash
    for jcomp, method_key in enumerate(method_keys):  # components
        method, keys = method_key  # component method and run-by-run list of keys
        model_guess_component = dash["model_guess"][jcomp]
        model_result_component = dash["model_result"][jcomp]
        pars_in, pars_out = [], []
        for krun, runkeys in enumerate(keys):  # single run
            par_in, par_out = [], []  # each run has its set of parameters
            for kpar, key in enumerate(runkeys):
                par_in.append(key(p_in))
                par_out.append(key(p_out))
                # transform all keys in "~" or "!"
                if model_guess_component["pardicts"][kpar]["flag"] == '=':  # turn them in "!"
                    model_guess_component["pardicts"][kpar]["flag"] = '!'
                    model_result_component["pardicts"][kpar]["flag"] = '!'
                if type(model_guess_component["pardicts"][kpar]["value"]) is list:
                    model_guess_component["pardicts"][kpar]["value"] = model_guess_component["pardicts"][kpar]["value"][0]
                    model_result_component["pardicts"][kpar]["value"] = model_result_component["pardicts"][kpar]["value"][0]
                if type(model_guess_component["pardicts"][kpar]["error"]) is list:
                    model_guess_component["pardicts"][kpar]["error"] = model_guess_component["pardicts"][kpar]["error"][0]
                    model_result_component["pardicts"][kpar]["error"] = model_result_component["pardicts"][kpar]["error"][0]
            pars_in.append(par_in)  # list of run lists, each run has its set of parameters
            pars_out.append(par_out)
        dash["model_guess"][jcomp] = model_guess_component
        dash["model_result"][jcomp] = model_result_component
        model_in.append(pars_in)
        model_out.append(pars_out)
    # model is in [model [run [component]]] nesting order
    pars_in, pars_out, p_in, p_out = [], [], [], []
    n_runs = len(model_in[0])
    n_components = len(model_in)
    for component_in, component_out in zip(model_in, model_out):  # component contains all runs
        for krun in range(n_runs):
            pars_in.append(component_in[krun])
            pars_out.append(component_out[krun])
    for krun in range(n_runs):
        par_in, par_out = [], []
        for component in range(n_components):
            par_in += pars_in[krun + component]
            par_out += pars_out[krun + component]
        p_in.append(par_in)
        p_out.append(par_out)
    # print('debug tools minglobal2sequential: p_in = {}'.format(p_in))
    # now the list is in the [run [model [component]]] nesting order
    return p_in, p_out, dash
def len_print_components(names, values, errors):
    '''
    input: for a component
        parameter names
        parameter values
        parameter errors
    output:
        max length of string to print, e.g. "bl.A_fast 0.123(4) bl.λ_fast 12.3(4) bl.σ_fast 0(0)"
    '''
    from mujpy.tools.tools import value_error
    out = [' '.join([names[k], '=', value_error(values[k], errors[k])]) for k in range(len(names))]
    maxlen = len(max(out, key=len))
    return maxlen
def len_print_components_multirun(names, values, errors):
    '''
    input: for a component
        parameter names
        parameter values
        parameter errors
    output:
        max length of string to print, e.g. "bl.A_fast 0.123(4) bl.λ_fast 12.3(4) bl.σ_fast 0(0)"
    '''
    from mujpy.tools.tools import value_error
    outname = [' ' + names[k] for k in range(len(names))]
    outval = [' ' + value_error(values[k], errors[k]) for k in range(len(names))]
    maxlen = max(len(max(outname, key=len)), len(max(outval, key=len)))
    return maxlen
def mixer(t, y, f0):
    '''
    mixer of a time-signal with a reference
    input
        t time
        y the time-signal
        f0 frequency of the cosine reference
    output
        y_rrf = 2*y*cos(2*pi*f0*t)
    t is 1-d and y is 1-d, 2-d or 3-d but t.shape[0] == y.shape[-1]
    t is vstack-ed to be the same shape as y
    '''
    from mujpy.tools.tools import fft_filter
    from numpy import pi, cos, vstack, fft, delete
    ydim, tdim = len(y.shape), len(t.shape)
    # print('tools mixer debug 1: y t shape {}, {}'.format(y.shape,t.shape))
    if tdim == 1:  # must replicate t to the same dimensions as y
        if ydim == 2:
            for k in range(ydim):
                if k:
                    time = vstack((time, t))
                else:
                    time = t
            t = time
        elif ydim == 3:  # max is ydim = 3
            # the original read len.shape[...]; y.shape (with range) is assumed here, since len has no shape
            for j in range(y.shape[-1]):
                for k in range(y.shape[-2]):
                    if k:
                        time = vstack((time, t))
                    else:
                        time = t
                if j:
                    for l in range(y.shape[-1]):
                        tim = vstack((tim, time))
                else:
                    tim = time
            t = tim
    n = t.shape[-1]
    # apodize by zero padding to an even number
    yf = fft.irfft(fft_filter(t, fft.rfft(2 * y * cos(2 * pi * f0 * t), n=n + 1), f0), n=2 * n)
    # now delete padded zeros
    mindex = range(n, 2 * n)
    yf = delete(yf, mindex, -1)
    # print('tools mixer debug 3: yf shape {}'.format(yf.shape))
    return yf
[docs]def fft_filter(t,fy,f0): ''' filter above 0.2*fy peak freq works for 1-2 d ''' from numpy import arange, mgrid, where # determine max frequency fmax leny = len(fy.shape) if leny == 1: dt = t[1]-t[0] # array f of fourier component indices (real fft, 0 to fmax) m = fy.shape f = arange(m) elif leny == 2: dt = t[0,1]-t[0,0] # find peak in rfft below the rrf frequency f0 # array f of fourier component indices (real fft, 0 to fmax) n,m = fy.shape _,f = mgrid[0:n,0:m] else: dt = t[0,0,1]-t[0,0,0] l,n,m = fy.shape _,_,f = mgrid[0:l,0:n,0:m] fmax = 1/2/dt mask = (f<=f0/fmax*m).astype(int) # find where fy has a peak, below the rrf frequency f0 if leny == 1: npeak = where(abs(fy)==abs(mask*fy).max()).max() elif leny == 2: npeak = where(abs(fy)==abs(mask*fy).max())[1].max() else: npeak = where(fy==(mask*fy).max())[2].max() mask = (f<=2*npeak).astype(int) # print('tools fft_filter debug 2: fy {},mask {} shape'.format(fy.shape,mask.shape)) return fy*mask
[docs]def model_name(dashboard):
    '''
    input the dashboard dictionary structure
    output the model name (e.g. 'mgbgbl')
    '''
    return ''.join([item for component in dashboard["model_guess"] for item in component["name"]])
[docs]def userpars(dashboard):
    '''
    checks if there are userpardicts in the fit dashboard
    alias of global type fit, of any kind (gg, gr, G)
    used by fit and plot switchyard
    '''
    return "userpardicts_guess" in dashboard
[docs]def userlocals(dashboard):
    '''
    input: full dashboard
    output: True if "userpardicts_local" is in dashboard.keys()
    '''
    return "userpardicts_local" in dashboard
[docs]def multigroup_in_components(dashboard):
    '''
    input full dashboard
    output mask list over the parameters of the "model_guess" components:
        1 (True) for each parameter dict that contains a
        "function_multi":[string, string ..] key, 0 (False) otherwise
    '''
    #print('multigroup_in_components tools debug: model_guess len {}'.format(len(dashboard["model_guess"])))
    #print('multigroup_in_components tools debug: pardicts len {}'.format(len(dashboard["model_guess"][0]['pardicts'])))
    #print('multigroup_in_components tools debug: pardict.keys len {}'.format(len(dashboard["model_guess"][0]['pardicts'][0].keys())))
    return ['function_multi' in pardict.keys() for component in dashboard["model_guess"]
                                                   for pardict in component["pardicts"]]
    # contains 1 for all parameters that have "function_multi", 0 otherwise
    # return [k for k,component in enumerate(component_function) if component>0]
[docs]def stringify_groups(groups):
    '''
    returns a unique string for many groups
    to use in the json file name
    '''
    strgrp = []
    for group in groups:
        fgroup, bgroup = group['forward'],group['backward']
        strgrp.append(fgroup.replace(',','_')+'-'+bgroup.replace(',','_'))
    return '_'.join(strgrp)
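For illustration, a short usage sketch (the group dictionaries below are made-up examples, assuming mujpy is importable):

from mujpy.tools.tools import stringify_groups

groups = [{'forward': '1,2', 'backward': '3,4'},
          {'forward': '5',   'backward': '6'}]
stringify_groups(groups)   # -> '1_2-3_4_5-6'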
[docs]def modelstrip(name):
    '''
    strips the number of external parameters at the beginning of the model name
    '''
    import re
    nexternals, ngroups = 0, 0
    # strip the name and extract the number of external parameters
    try:
        nexternals = int('{}'.format(re.findall('^([0-9]+)',name)[0]))
        if nexternals>9:
            return []
        name = name[1:] # strip the leading digit
    except:
        pass
#    try:
#        ngroups = int('{}'.format(re.findall('([0-9]+)$',name)[0]))
#        if ngroups>9:
#            return []
#        name = name[1:]
#    except:
#        pass
    return name, nexternals
############## # MUGUI AUX ? ##############
[docs]def name_of_model(model_components,model):
    '''
    check if the model_components list of dictionaries corresponds to model
    '''
    content = []
    for component in model_components:
        content.append(component["name"])
    return True if ''.join(content) == model else False
[docs]def create_model(model): ''' create_model('daml') # adds e.g. the two component 'da' 'ml' model this method does not check syntax (prechecked by checkvalidmodel) ? separates nexternals number from model name (e.g. '3mgml' -> 'mgml', 3) ? starts switchyard for A1,A1,B1, B2, C1, C2 fits adds a model of components selected from the available_component tuple of directories with zeroed values, stepbounds from available_components, flags set to '~' and zeros functions ''' import string from mujpy.tools.tools import addcomponent, _available_components_ # print('create_model: {}'.format(model)) components = [model[i:i+2] for i in range(0, len(model), 2)] model_guess = [] # start from empty model for k,component_name in enumerate(components): component, emsg = addcomponent(component_name) # input a component name, output a component dictionary if component: model_guess.append(component) # list of dictionaries # self.console('create model added {}'.format(component+label)) else: return False, emsg return model_guess, '' # list of component dictionaries
[docs]def addcomponent(name): ''' addcomponent('ml') # adds e.g. a mu precessing, lorentzian decay, component this method adds a component selected from _available_components_(), tuple of directories with zeroed values, error and limits from available_components, flags set to '~' and zeros functions [plan also addgroupcomponents and addruncomponents (for A2, B2, C1, C2)] ''' from copy import deepcopy from mujpy.tools.tools import _available_components_ available_components =_available_components_() # creates list automagically from mucomponents component_names = [available_components[i]['name'] for i in range(len(available_components))] if name in component_names: k = component_names.index(name) npar = len(available_components[k]['pardicts']) # number of pars pars = deepcopy(available_components[k]['pardicts']) # list of dicts for # parameters, {'name':'asymmetry','error':0.01,'limits':[0, 0]} # now remove parameter name degeneracy for j, par in enumerate(pars): pars[j]['name'] = par['name'] if par['name']=='α': pars[j].update({'value':1.0}) # initilize elif par['name']=='A': pars[j].update({'value':0.1}) # initialize to not zero elif par['name']=='B': pars[j].update({'value':2.}) # initialize to TF20 else: pars[j].update({'value':0}) # does not need initialization pars[j].update({'flag':'~'}) pars[j].update({'function':''}) # adds these three keys to each pars dict pars[j]['error'] = par['error'] pars[j]['limits'] = par['limits'] # they serve to collect values in mugui # self.model_guess.append() return {'name':name,'label':'','pardicts':pars}, None # OK code, no message else: # self.console( error_msg = '\nWarning: '+name+' is not a known component. Not added.' return {}, error_msg # False error code, message
[docs]def chi2std(nu):
    '''
    computes 1 std for least square chi2
    '''
    import numpy as np
    from scipy.special import gammainc
    from scipy.stats import norm

    mm = round(nu/4)
    hb = np.linspace(-mm,mm,2*mm+1)
    # chi2 cdf with nu degrees of freedom at x = hb+nu;
    # scipy's gammainc(a,x) is the regularized P(a,x), hence a = nu/2, x = (hb+nu)/2
    # (mulab: muchi2cdf(x,nu) = gammainc(x/2,nu/2) follows the matlab argument order)
    cc = gammainc(nu/2,(hb+nu)/2)
    lc = 1+hb[min(list(np.where((cc<norm.cdf(1))&(cc>norm.cdf(-1))))[0])]/nu
    hc = 1+hb[max(list(np.where((cc<norm.cdf(1))&(cc>norm.cdf(-1))))[0])]/nu
    return lc, hc
[docs]def component(model,kin):
    '''
    returns the index of the component to which parameter kin belongs in
    model = self.model_guess, in mugui, a list of complex dictionaries::

        [{'name':'da', 'pardicts':[{'name':'α',...},...]},
         {'name':'mg', 'pardicts':[{ ... },...]}]

    kin is the index of a dashboard parameter (kint)
    '''
    from numpy import array, cumsum, argmax
    ncomp = len(model) # number of components in model
    npar = array([len(model[k]['pardicts']) for k in range(ncomp)]) # number of parameters of each component
    npars = cumsum(npar)
    return argmax(npars>kin)
############# # GENERAL AUX #############
[docs]def calib(dashboard):
    '''
    True if the first component is 'al'
    '''
    return dashboard['model_guess'][0]['name']=='al'
[docs]def derange(string,vmax,pack=1): ''' derange(string,vmax,pack=1) reads string assumes it contains 2, 3, 4 or 5 csv or space separated values uses isinstance(vmax,float) to distinguish floats (fft) from integers (fit and plot) 5: start, stop, packe, last, packl # for plot 4: start, stop, last, packl # for plot (packe is 1) 3: start, stop, pack 2: start, stop (pack is added, pack default is 1) returns 2, 3, 4 or 5 floats or int, or default values, 0,vmax,pack, if fails validity check (stop>start, bin <stop-start, last < vmax) errmsg = '' in ok, a string indicates errors ''' # print('In derange, string = {}'.format(string)) errmsg = '' x_range = string.split(',') # assume ',' is the separator if len(x_range)==1: # try ' ' as separator x_range = string.split(' ') if len(x_range)==1: # wrong syntax x_range = [vmax-vmax,vmax,pack] # default, int for int vmax, float for float vmax errmsg = 'no range' if not errmsg: try: # three items are they integers floats or misprints? if isinstance(vmax,float): # should be three floats x_range = [float(chan) for chan in x_range] # breaks if non digits in x_range else: # should be three integers x_range = [int(chan) for chan in x_range] # breaks if non digits in x_range if len(x_range)==2: # guarantees three items x_range.append(pack) if x_range[2]>(x_range[1]-x_range[0])//2: # True for fit_range[1]<fit_range[0] or too large pack raise Exception except: x_range = [vmax-vmax,vmax,pack] # default errmsg = 'Syntax error, reset range to default. ' # to re-compose a correct string use # string = ','.join([str(val) for val in x_range]) # print('tools derange: x_range = {}'.format(x_range)) return x_range, errmsg # a list of values (int or float as appropriate)
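A brief illustration of the two parsing modes (example values only; the returns shown in the comments follow from the code above):

from mujpy.tools.tools import derange

derange('0,1000,4', 7900)   # integer vmax (fit/plot) -> ([0, 1000, 4], '')
derange('0.,2.', 5.0)       # float vmax (fft), missing pack defaults to 1 -> ([0.0, 2.0, 1], '')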
[docs]def derun(string): ''' parses string, producing a list of runs; expects comma separated items looks for 'l','l:m','l+n+m','l:m:-1' where l, m, n are integers also more than one, comma separated rejects all other characters returns a list of lists of integer ''' import re s = [] try: # substitute multiple consecutive spaces with ',' string = re.sub("\s+", ",", string.strip()) # systematic str(int(b[])) to check that b[] ARE integers for b in string.split(','): # csv kminus = b.find(':-1') # '-1' means reverse order kcolon = b.find(':') # ':' and '+' are mutually exclusive kplus = b.find('+') #print(kminus,kcolon,kplus) if kminus<0 and kcolon<0 and kplus<0: # single run int(b) # produces an Error if b is not an integer s.append([b]) # append single run string else: if kminus>0 and kminus == kcolon: return [], 'l:-1 is illegal' elif kplus>0: # add files, append a list or run strings ss = [] k0 = 0 while kplus>0: # str(int(b[])) ss.append(int(b[k0:kplus])) k0 = kplus+1 kplus = b.find('+',k0) ss.append(int(b[k0:])) s.append([str(q) for q in ss]) else: # either kminus=-1 (just a range) or kcolon<kminus, (range in reverse order) # in both cases: if kminus<0: #print(int(b[:kcolon]),int(b[kcolon+1:])) if int(b[:kcolon])>int(b[kcolon+1:]): return [], 'l:m must have l<m' for j in range(int(b[:kcolon]),int(b[kcolon+1:])+1): s.append([str(j)]) # append single run strings else: ss = [] # # :-1 reverse order if int(b[:kcolon])>int(b[kcolon+1:kminus]): return ss, 'l:m:-1 must have l<m' for j in range(int(b[:kcolon]),int(b[kcolon+1:kminus])+1): ss.append([str(j)]) # append single run strings ss = ss[::-1] for sss in ss: s.append(sss) return s, None except: return [], 'error to be debugged'
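A short usage sketch of the run-list parser (run numbers are arbitrary examples):

from mujpy.tools.tools import derun

derun('650:652,675')   # -> ([['650'], ['651'], ['652'], ['675']], None)
derun('650+651,660')   # -> ([['650', '651'], ['660']], None), '+' means added runs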
[docs]def run_shorthand(runstrings): ''' write the runlist contained in runstrings (suite self.runs produced by derun) i.e. a list of lists, with separate run numbers in string format, the inner ones to be added in a compact string, with space separated notation e.g. '650:655,675,656:674' ''' # [[623],[624],[625],[626],[627,628,629], [631],[632],[633],[630]] -> 623:626 627+628+629 631:633 630 runlists = [[int(run) for run in runstringlist] for runstringlist in runstrings] string = [[] for i in range(len(runlists))] index_runadds = [i for i in range(len(runlists)) if len(runlists[i])>1] index_runs = [i for i in range(len(runlists)) if len(runlists[i])==1] for j in index_runadds: string[j]='+'.join([str(k) for k in runlists[j]]) k = index_runs[0] string[k].append(runlists[k][0]) up = None for j in range(1,len(string)): if j in index_runadds: k = j elif runlists[j - 1][0] + 1 == runlists[j][0]: if up != True: up = True k = j-1 string[k].append(runlists[j][0]) elif runlists[j - 1][0] - 1 == runlists[j][0]: if up != False: up = False k = j-1 string[k].append(runlists[j][0]) else: up = None k = j string[k].append(runlists[j][0]) ss = list(filter(([]).__ne__,string)) for k,l in enumerate(ss): if isinstance(l,list): if len(l)==1: ss[k] = str(l[0]) else: ss[k] = str(l[0])+':'+str(l[-1]) s = ','.join(ss) return s
[docs]def findall(p, s):
    '''
    Yields all the positions of
    the pattern p in the string s.
    Used by translate.
    '''
    i = s.find(p)
    while i != -1:
        yield i
        i = s.find(p, i+1)
[docs]def find_nth(haystack, needle, n):
    '''
    Finds the nth occurrence of needle in haystack,
    counting from the end of haystack (uses rfind).
    Returns its position, or -1 if there are fewer than n occurrences.
    Used by ?
    '''
    start = haystack.rfind(needle)
    while start >= 0 and n > 1:
        start = haystack.rfind(needle, 1, start-1)
        n -= 1
    return start
[docs]def get_datafilename(datafile,run):
    '''
    datafile = template, e.g. '/fullpath/deltat_gps_tdc_0935.bin'
    run = string of run digits, e.g. '1001'
    returns '/fullpath/deltat_gps_tdc_1001.bin'
    '''
    import re
    dot_suffix = datafile[-4:]
    padded = re.match('.*?([0-9]+)$', datafile[:-4]).group(1) # run string of digits
    oldrun = str(int(padded)) # strip padding zeros
    datafileprefix = datafile[:datafile.find(oldrun)] # prefix up to original zero padding
    if len(run)-len(oldrun)>0: # new run has more digits than the unpadded old one
        datafilename = datafileprefix[:len(oldrun)-len(run)]+run+dot_suffix
    elif len(run)-len(oldrun)==-1:
        datafilename = datafileprefix+'0'+run+dot_suffix
    elif len(run)-len(oldrun)==-2:
        datafilename = datafileprefix+'00'+run+dot_suffix
    elif len(run)-len(oldrun)==-3:
        datafilename = datafileprefix+'000'+run+dot_suffix
    else:
        datafilename = datafileprefix+run+dot_suffix
    return datafilename
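For illustration, two calls with made-up templates and run strings (the expected results in the comments follow from the padding logic above):

from mujpy.tools.tools import get_datafilename

get_datafilename('/fullpath/deltat_gps_tdc_0935.bin', '1001')
# -> '/fullpath/deltat_gps_tdc_1001.bin'
get_datafilename('/fullpath/deltat_gps_tdc_0935.bin', '36')
# -> '/fullpath/deltat_gps_tdc_0036.bin'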
[docs]def get_datafile_path_ext(datafile,run):
    '''
    datafile = template, e.g. '/fullpath/deltat_gps_tdc_0935.bin'
    run = string of run digits, e.g. '1001'
    returns path, file prefix and extension,
    e.g. '/fullpath/', 'deltat_gps_tdc_0935', 'bin'
    '''
    import os
    path = datafile[:datafile.rfind(os.path.sep)+1] # e.g. '/afs/psi.ch/project/bulkmusr/data/gps/d2022/tdc/', works in WIN with '\' as separator
    fileprefix = datafile[datafile.rfind(os.path.sep)+1:datafile.rfind('.')]
    ext = datafile[datafile.rfind('.')+1:] # e.g. 'bin' or 'nxs'
    return path, fileprefix, ext
[docs]def get_grouping(groupcsv):
    """
    input
       groupcsv is a shorthand csv string, e.g. '1:3,5' or '1,3,5' etc.
       contained in self.suite.group[k]["forward"] or self.suite.group[k]["backward"]
       (the k-th detector group of a multi group fit)
    output
       grouping is an np.array of indices, 0 based
    """
    import numpy as np
    # two shorthands: either a list, comma separated, such as 1,3,5,6
    # or a pair of integers, separated by a colon, such as 1:3 = 1,2,3
    # mixed forms like 1, 3, 5, 7:9 = 1, 3, 5, 7, 8, 9
    # or 1:3,5,7 = 1,2,3,5,7 are also valid
    # no more complex nesting (3:5,5,8:10 is not allowed)
    # get the shorthand from the gui Text
    groupcsv = groupcsv.replace('.',',') # can only be a mistake: '.' means ','
    try:
        if groupcsv.find(':')==-1: # no colon, it's a pure csv
            grouping = np.array([int(ss) for ss in groupcsv.split(',')]) # read it
        else: # colon found
            if groupcsv.find(',')==-1: # no commas, only colon, must be n:m
                nm = [int(w) for w in groupcsv.split(':')] # read n m
                grouping = np.array(list(range(nm[0],nm[1]+1))) # single counters
            else: # general case, mixed csv and colon
                p = groupcsv.split(':') # '1,2,3,4,6' '7,10,12,14' '16,20,23'
                ncolon = len(p)-1
                grouping = np.array([])
                for k in range(ncolon):
                    q = p[k].split(',') # ['1' '2' '3' '4' '6']
                    if k>0:
                        last = int(q[0])
                        grouping = np.concatenate((grouping,np.array(list(range(first,last+1)))))
                        first = int(q[-1])
                        grouping = np.concatenate((grouping,np.array(list(int(w) for w in q[1:-1]))))
                    elif k==0:
                        first = int(q[-1])
                        grouping = np.concatenate((grouping,np.array(list(int(w) for w in q[:-1]))))
                q = p[-1].split(',') # '22','25'
                last = int(q[0])
                grouping = np.concatenate((grouping,np.array(list(range(first,last+1)))))
                grouping = np.concatenate((grouping,np.array(list(int(w) for w in q[1:]))))
                grouping = grouping.astype(int) # concatenating with the empty float array yields floats, cast back to integer indices
        grouping -= 1 # this is a counter index, remove 1 for python 0-based indexing
    except:
        grouping = np.array([-1]) # error flag
    return grouping
[docs]def get_group(grouping):
    '''
    reverse of get_grouping

    input
       grouping is an np.array of indices of detectors, 0 based
    output
       groupcsv shorthand as in self.group[k]["forward"] or self.group[k]["backward"]
       e.g. '1:3,5' or '1,3,5' etc.
    '''
    import numpy as np
    # find sequences
    groups = []
    if grouping.size>1:
        grouping = np.sort(grouping)+1 # 1 based for csv
        gsequences = np.split(grouping, np.where(np.diff(grouping) != 1)[0]+1)
        for gsequence in gsequences:
            # use scalar items, str(gsequence) would print the ndarray brackets
            gstring = str(gsequence[0]) if gsequence.size==1 else str(gsequence[0])+':'+str(gsequence[-1])
            groups.append(gstring)
    else:
        groups.append(str(grouping[0]+1)) # 1 based for csv also for a single detector
    return ','.join(groups)
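A small sketch of the two shorthand converters (detector numbers are arbitrary examples):

import numpy as np
from mujpy.tools.tools import get_grouping, get_group

get_grouping('1,3,5')               # -> array([0, 2, 4]), 0-based detector indices
get_group(np.array([0, 1, 2, 4]))   # -> '1:3,5'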
[docs]def getname(fullname):
    '''
    extracts the parameter name from the full parameter name (i.e. name + label)
    for the time being just the first letter
    '''
    return fullname[0]
[docs]def initialize_csv(Bstr, filespec, the_run ): ''' writes beginning of csv row with nrun T [T eT T eT] B for ISIS [PSI] ''' nrun = the_run.get_runNumber_int() # print('tools initialize_csv debug: nrun {}'.format(nrun)) if filespec=='bin' or filespec=='mdu': TsTc, eTsTc = the_run.get_temperatures_vector(), the_run.get_devTemperatures_vector() n1,n2 = spec_prec(eTsTc[0]),spec_prec(eTsTc[1]) # calculates format specifier precision form = '{},{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n2) form += 'f},{:.' form += '{}'.format(n2) form += 'f},{}' #".format(value,most_significant)' return form.format(nrun, TsTc[0],eTsTc[0],TsTc[1],eTsTc[1], Bstr[:Bstr.find('G')]) elif filespec=='nxs': Ts = the_run.get_temperatures_vector() n1 = '1' form = '{},{:.' form += '{}'.format(n1) form += 'f},{}' #".format(value) return form.format(nrun, Ts[0], Bstr[:Bstr.find('G')])
[docs]def minparam2_csv(dashboard,values_in,errors_in,multirun=0): ''' transforms Minuit values Minuit errors in cvs format input: dashboard dashboard["model_guess"], for single group None, for multi group dashboard, for single group multirun user (C1) values_in, errors_in are Minuit values Minuit errors if multirun is nruns !=0 (True) uses min2int_multirun(dashboard,values_in,errors_in,multirun_nruns) else (multirun = 0 (False) uses min2int(dashboard,values_in,errors_in) output: cvs partial row with parameters and errors for A1, A20 and B1, or A21 list of partial rows (one per run) for C1 ''' from mujpy.tools.tools import min2int, min2int_multirun, spec_prec if multirun: _, values, errors = min2int_multirun(dashboard,values_in,errors_in,multirun) # must write rows with single run gvalues,gerrors = values[0],errors[0] rows = [] for parvalues,parerrors in zip(values[1:],errors[1:]): #locals row = '' for parvalue,parerror in zip(parvalues,parerrors): n1 = spec_prec(parerror) # calculates format specifier precision form = ',{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n1) form += 'f}' row += form.format(parvalue,parerror) for parvalue,parerror in zip(values[0],errors[0]): # globals replicated in every row n1 = spec_prec(parerror) # calculates format specifier precision form = ',{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n1) form += 'f}' row += form.format(parvalue,parerror) rows.append(row) # these are as many rows as runs else: (_, values, errors) = (min2int(dashboard,values_in,errors_in) if dashboard else (None, [values_in], [errors_in])) # from minuit parameters to component parameters # output is lists (components) of lists (parameters) # else dashboard false (multigroup user) Minuit and user parameters coincide rows = '' # this is a single row, really for parvalues, parerrors in zip(values,errors): for parvalue,parerror in zip(parvalues,parerrors): n1 = spec_prec(parerror) # calculates format specifier precision form = ',{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n1) form += 'f}' rows += form.format(parvalue,parerror) return rows
[docs]def nextrun(datapath): ''' assume datapath is path+fileprefix+runnumber+extension datafile is next run, runnumber incremented by one if datafile exists return next run, datafile else return runnumber and datapath ''' import os from mujpy.tools.tools import muzeropad path, ext = os.path.splitext(datapath) lastchar = len(path) for c in reversed(path): try: int(c) lastchar -= 1 except: break run = path[lastchar:] runnext = str(int(run)+1) datafile = path[:lastchar]+muzeropad(runnext)+ext run = runnext if os.path.exists(datafile) else run datafile = datafile if os.path.exists(datafile) else datapath return run, datafile
[docs]def thisrun(datapath): ''' assume datapath is path+fileprefix+runnumber+extension datafile is present run if datafile exists returns path to datafile ''' import os from mujpy.tools.tools import muzeropad path, ext = os.path.splitext(datapath) lastchar = len(path) for c in reversed(path): try: int(c) lastchar -= 1 except: break run = path[lastchar:] datafile = path[:lastchar]+muzeropad(run)+ext return datafile
[docs]def prevrun(datapath): ''' assume datapath is path+fileprefix+runnumber+extension datafile is prev run, runnumber decremented by one if datafile exists return prev run, datafile else return runnumber and datapath ''' import os from mujpy.tools.tools import muzeropad path, ext = os.path.splitext(datapath) lastchar = len(path) for c in reversed(path): try: int(c) lastchar -= 1 except: break run = path[lastchar:] runprev = str(int(run)-1) datafile = path[:lastchar]+muzeropad(runprev)+ext run = runprev if os.path.exists(datafile) else run datafile = datafile if os.path.exists(datafile) else datapath return run, datafile
[docs]def chi2_csv(chi2,lowchi2,hichi2,groups,offset): ''' input: chi2, chi2-sdt, chi2+sdt, groups, offset (bins) groups is suite.groups and its len, 1 or more, identifies multigroup output: cvs partial row with these values and timestring ''' from time import localtime, strftime echi = max(chi2-lowchi2,hichi2-chi2) n1 = spec_prec(echi) # calculates format specifier precision form = ',{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n1) form += 'f},{:.' form += '{}'.format(n1) form += 'f}' # ' {} {}' row = form.format(chi2,chi2-lowchi2,hichi2-chi2) for group in groups: row += ',{}'.format(group["alpha"]) row += ',{},{}'.format(offset,strftime("%d.%b.%H:%M:%S", localtime())) return row
[docs]def write_csv(header,row,the_run,file_csv,filespec,scan=None): ''' input : header, the model specific csv header to compare with that of the csv file row, the line to be added to the csv file the_run, run instance (first one for added runs) file_csv, full path/filename to csv file filespec, 'bin', 'mdu' or 'nsx' scan, T, B or None output: two strings to write on console writes onto csv finding the right line writes a new file if csv does not exist or is incompatible (writes ~ version) ''' from mujpy.tools.tools import get_title import os import re from datetime import datetime nrun = int(re.split(" |,|, ",row)[0]) now = datetime.now() dt_string = now.strftime("%d/%m/%Y %H:%M:%S") if scan==None: # order by nrun, first item in csv csv_index = 0 elif scan=='T': # order by T, 4th or 2nd item in csv csv_index = 3 if filespec == 'bin' or filespec == 'mdu' else 1 else: # order by B, 6th or 4th item in csv csv_index = 5 if filespec == 'bin' or filespec == 'mdu' else 3 rowvalue = float(re.split(" |,|, ",row)[csv_index]) # also nrun is transformed into float if os.path.isfile(file_csv): try: # the file exists lineout = [] # is equivalent to False with open(file_csv,'r') as f_in: notexistent = True # the line does not exist for nline,line in enumerate(f_in.readlines()): if nline==0: if header!=line: # different headers, substitute present one raise # exits this try else: lineout.append(header) elif float(re.split(" |,|, ",line)[csv_index]) < rowvalue: # append line first lineout.append(line) elif float(re.split(" |,|, ",line)[csv_index]) == rowvalue: # substitute an existing fit lineout.append(row) # insert before last existing fit notexistent = False else: if notexistent: lineout.append(row) # insert before last existing fit notexistent = False lineout.append(line) # insert all other existing fits if notexistent: lineout.append(row) # append at the end notexistent = False with open(file_csv,'w') as f_out: for line in lineout: f_out.write(line) file_csv = file_csv[file_csv.rfind('/')+1:] return 'Run {}: {} ***'.format(nrun, get_title(the_run)), '. Log added to {}'.format(file_csv) except: # incompatible headers, save backup and write a new file os.rename(file_csv,file_csv+'~') with open(file_csv,'w') as f: f.write(header) f.write(row) file_csv = file_csv[file_csv.rfind('/')+1:] return 'Run {}: {} ***'.format(nrun, get_title(the_run)),'. Log in NEW {} [backup in {}]'.format( file_csv, file_csv+'~') else: # csv does not exist with open(file_csv,'w') as f: f.write(header) f.write(row) file_csv = file_csv[file_csv.rfind('/')+1:] return 'Run {}: {} ***'.format(nrun, get_title(the_run)),'. Log in NEW {}'.format(file_csv)
[docs]def get_title(run,notemp=False,nofield=False): ''' form standard psi title ''' title = [(run.get_sample()).rstrip()] title.append((run.get_orient()).rstrip()) if not notemp: tstr = run.get_temp() try: temp = float(tstr[:tstr.index('K')]) except: temp = float(tstr) title.append('{:.1f}K'.format(temp)) if not nofield: field = run.get_field() try: title.append('{:.0f}mT'.format(float(field[:field.index('G')])/10)) except: title.append('{:.0f}mT'.format(float(field)/10)) return ' '.join(title)
[docs]def get_run_title(the_suite): ''' output list of run and title strings each run and group in the run replicates its run number + title used only in mufitplot (fit and fft ''' from mujpy.tools.tools import get_title run_title = [] for run in the_suite._the_runs_: for kgroup in range(len(the_suite.grouping)): run_title.append(str(run[0].get_runNumber_int())+'-'+get_title(run[0])) return run_title
[docs]def get_nruns(the_suite):
    '''
    get nrun strings
    '''
    nruns = []
    for k,run in enumerate(the_suite._the_runs_):
        nruns.append(str(run[0].get_runNumber_int()))
    return nruns
[docs]def get_run_number_from(path_filename,filespecs):
    '''
    strips the number after filespecs[0] and before filespecs[1]
    '''
    try:
        string = path_filename.split(filespecs[0],1)[1]
        run = string.split('.'+filespecs[1],1)[0]
    except:
        run = '-1'
    return str(int(run)) # to remove leading zeros
[docs]def muvalid(string): ''' parse function CHECK WITH MUCOMPONENT, THAT USES A DIFFERENT SCHEME accepted functions are RHS of agebraic expressions of parameters p[i], i=0...ntot ''' import re error_message = '' if string.strip() !='': # empty and blank strings are validated pattern = re.compile(r"p\[(\d+)\]") # find all patterns p[*] where * is digits test = pattern.sub(r"a",string) # substitute "a" to "p[*]" in s # strindices = pattern.findall(string) # indices = [int(strindices[k]) for k in range(len(strindices))] # in internal parameter list # mindices = ... # produce the equivalent minuit indices try: safetry(test) # should select only safe use (although such a thing does not exist!) except Exception as e: error_message = 'Function: {}. Tested: {}. Wrong or not allowed syntax: {}'.format(string,test,e) return error_message
[docs]def p2x(instring):
    '''
    replaces parameters e.g. p[2] with variable x2 in string
    returns the substituted string and the list of indices (ascii)
    '''
    import re
    patterna = re.compile(r"p\[(\d+)\]") # find all patterns p[*] where * is digits
    n = patterna.findall(instring) # all indices of parameters
    outstring = instring
    for k in n:
        strin = r"p\["+re.escape(k)+r"\]"
        patternb = re.compile(strin)
        stri = r"x"+re.escape(k) # variable
        outstring = patternb.sub(stri,outstring)
    return outstring, n
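A one-line usage sketch (the function string is an arbitrary example):

from mujpy.tools.tools import p2x

p2x('p[0]*(1-p[2])')   # -> ('x0*(1-x2)', ['0', '2'])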
[docs]def errorpropagate(string,p,e):
    '''
    parse function in string
    substitute p[n] with xn, with errors en
    calculate the partial derivative pdn = partial f/partial xn
    return the sqrt of the sum of (pdn*en)**2
    '''
    from jax import grad
    import numpy as np
    funct,n = p2x(string) # from parameters p[n] to variables xn
    args = ','.join(['x'+k for k in n])
    # f = lambda xn,xm,... : expression of xn, xm, ...  ('[1:]' removes the leading '=')
    f = eval('lambda '+args+': '+funct[1:])
    x = [float(p[int(k)]) for k in n] # values of the variables, in argument order
    variance = 0.
    for j,k in enumerate(n):
        d = grad(f,argnums=j) # derivative with respect to the j-th argument
        variance += (d(*x)*e[int(k)])**2
    return np.sqrt(variance)
[docs]def group_shorthand(grouping):
    '''
    grouping is the list of group dictionaries
    '''
    shorthand = []
    for group in grouping:
        fwd = '_'.join([str(s+1) for s in group['forward']])
        bkd = '_'.join([str(s+1) for s in group['backward']])
        shorthand.append(fwd+'-'+bkd)
    return '+'.join(shorthand)
[docs]def json_name(model,datafile,grouping,version,g=False):
    '''
    model is e.g. 'mlmg'
    datafile is e.g. '/afs/psi.ch/bulkmusr/data/gps/d2022/tdc/deltat_gps_tdc_1233.bin'
             must have a single '.'
    grouping is the list of dictionaries defining the groups
    g = True for global
    version is a label
    returns a unique name for the json dashboard file
    '''
    from re import findall
    from mujpy.tools.tools import group_shorthand
    run = findall('[0-9]+',datafile)[-1]
    return model+'.'+run+'.'+group_shorthand(grouping)+'.'+version+'.json'
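An illustrative call (path, model and group dictionary are made-up; the name in the comment follows from group_shorthand and the trailing run digits):

from mujpy.tools.tools import json_name

grouping = [{'forward': [2], 'backward': [3]}]   # 0-based detector indices
json_name('mlmg', '/data/deltat_gps_tdc_1233.bin', grouping, '1')
# -> 'mlmg.1233.3-4.1.json'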
[docs]def muvaluid(string):
    '''
    Run suite fits: muvaluid returns True/False
    * checks the syntax for the string function
    corresponding to flag='l'. Meant for pars
    displaying large changes across the run suite,
    requiring different migrad start guesses::

        # string syntax: e.g. "0.2*3,2.*4,20."
        # means that for the first 3 runs value = 0.2,
        # for the next 4 runs value = 2.0
        # from the 8th run on value = 20.0

    '''
    try:
        value_times_list = string.split(',')
        last = value_times_list.pop()
        for value_times in value_times_list:
            value,times = value_times.split('*')
            dum, dum = float(value),int(times)
        dum = float(last)
        return True
    except:
        return False
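Two illustrative checks (the strings are arbitrary examples):

from mujpy.tools.tools import muvaluid

muvaluid('0.2*3,2.*4,20.')   # -> True
muvaluid('0.2*3,2.*4')       # -> False, the last item must be a plain value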
[docs]def muvalue(lrun,string):
    '''
    Run suite fits: muvalue returns the value
    for the lrun-th run according to string (corresponding flag='l').
    Large parameter change across the run suite
    requires different migrad start guesses.
    '''
    # string syntax: e.g. "0.2*3,2.*4,20."
    # means that for the first 3 runs value = 0.2,
    #           for the next 4 runs value = 2.0
    #           from the 8th run on value = 20.0
    values = []
    for value_times in string.split(','):
        try: # if value_times contains a '*'
            value,times = value_times.split('*')
            values += [float(value)]*int(times)
        except: # if value_times is a single value, it holds from here onwards
            values.append(float(value_times))
    return values[lrun] if lrun < len(values) else values[-1]
[docs]def muzeropad(runs,nzeros=4):
    '''
    runs is a string containing the run number
    nzeros is the number of digit chars in the filename
        PSI bin: nzeros=4
        ISIS nxs: nzeros=8
    returns the runs string
    with left zero padding to nzeros digits
    '''
    zeros='0'*nzeros
    if len(runs)<len(zeros):
        return zeros[:len(zeros)-len(runs)]+runs
    else: # len(runs) >= nzeros, nothing to pad
        return runs
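Usage sketch (run numbers are arbitrary):

from mujpy.tools.tools import muzeropad

muzeropad('822')             # -> '0822' (PSI bin, default nzeros=4)
muzeropad('822', nzeros=8)   # -> '00000822' (ISIS nxs)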
[docs]def path_file_dialog(path,spec): import tkinter from tkinter import filedialog import os tkinter.Tk().withdraw() # Close the root window spc, spcdef = '.'+spec,'*.'+spec in_path = filedialog.askopenfilename(initialdir = path,filetypes=((spc,spcdef),('all','*.*'))) return in_path
[docs]def path_dialog(path,title): import tkinter from tkinter import filedialog import os tkinter.Tk().withdraw() # Close the root window in_path = filedialog.askdirectory(initialdir = path,title = title) return in_path
################ # PLOT METHODS # ################
[docs]def plot_parameters(nsub,labels,fig=None): ''' standard plot of fit parameters vs B,T (or X to be implemente) input nsub<6 is the number of subplots labels is a dict of labels, e.g. {title:self.title, xlabel:'T [K]', ylabels: ['asym',r'$\lambda$',r'$\sigma$,...]} fig is the standard fig e.g self.fig_pars output the ax array on which to plot one dimensional (from top to bottom and again, for two columns) example two asymmetry parameters are both plotfal=1 and are plotted in ax[0] a longitudinal lambda is plotflag=2 and is plotted in ax[1] ... a transverse sigma is plotflag=n and is plotted in ax[n-1] ''' import matplotlib.pyplot as P nsubplots = nsub if nsub!=5 else 6 # nsub = 5 is plotted as 2x3 # select layout, 1 , 2 (1,2) , 3 (1,3) , 4 (2,2) or 6 (3,2) nrc = { '1':(1,[]), '2':(2,1), '3':(3,1), '4':(2,2), '5':(3,2), '6':(3,2) } figsize = { '1':(5,4), '2':(5,6), '3':(5,8), '4':(8,6), '5':(8,8), '6':(8,8) } spaces = { '1':[], '2':{'hspace':0.05,'top':0.90,'bottom':0.09,'left':0.13,'right':0.97,'wspace':0.03}, '3':{'hspace':0.05,'top':0.90,'bottom':0.09,'left':0.08,'right':0.97,'wspace':0.03}, '4':{'hspace':0.,'top':0.90,'bottom':0.09,'left':0.08,'right':0.89,'wspace':0.02}, '5':{'hspace':0.,'top':0.90,'bottom':0.09,'left':0.08,'right':0.89,'wspace':0.02}, '6':{'hspace':0.,'top':0.90,'bottom':0.09,'left':0.08,'right':0.89,'wspace':0.02} } if fig: # has been set to a handle once fig.clf() if nrc[str(nsub)][1]: # not a single subplot fig,ax = P.subplots(nrc[str(nsub)][0],nrc[str(nsub)][1], figsize=figsize[str(nsub)],sharex = 'col', num=fig.number) # existed, keep the same number fig.subplots_adjust(**spaces[str(nsub)]) # fine tune in dictionaries else: # single subplot fig,ax = P.subplots(nrc[str(nsub)][0], figsize=figsize['1'], num=fig.number) # existed, keep the same number else: # handle does not exist, make one if nrc[str(nsub)][1]: # not a single subplot fig,ax = P.subplots(nrc[str(nsub)][0],nrc[str(nsub)][1], figsize=figsize[str(nsub)],sharex = 'col') # first creation fig.subplots_adjust(**spaces[str(nsub)]) # fine tune in dictionaries else: # single subplot fig,ax = P.subplots(nrc[str(nsub)][0], figsize=figsize['1']) # first creation fig.canvas.manager.set_window_title('Fit parameters') # the title on the window bar fig.suptitle(labels['title']) # the sample title axout=[] axright = [] if nsubplots>3: # two columns (nsubplots=6 for nsub=5) ax[-1,0].set_xlabel(labels['xlabel']) # set right xlabel ax[-1,1].set_xlabel(labels['xlabel']) # set left xlabel nrows = int(nsubplots/2) # (nsubplots=6 for nsub=5), 1, 2, 3 # for k in range(0,nrows-1): # ax[k,0].set_xticklabels([]) # no labels on all left xaxes but the last # ax[k,1].set_xticklabels([]) # no labels on all right xaxes but the last for k in range(nrows): axright.append(ax[k,1].twinx()) # creates replica with labels on right axright[k].set_ylabel(labels['ylabels'][nrows+k]) # right ylabels ax[k,0].set_ylabel(labels['ylabels'][k]) # left ylabels axright[k].tick_params(left=True,direction='in') # ticks in for right subplots ax[k,0].tick_params(top=True,right=True,direction='in') # ticks in for x axis, right subplots ax[k,1].tick_params(top=True,left=False,right=False,direction='in') # ticks in for x axis, right subplots ax[k,1].set_yticklabels([]) axout.append(ax[k,0]) # first column for k in range(nrows): axout.append(axright[k]) # second column axout is a one dimensional list of axis else: # one column ax[-1].set_xlabel(labels['xlabel']) # set xlabel for k in range(nsub-12): ax[k].set_xticklabels([]) # no labels on all xaxes but 
the last for k in range(nsub): ylab = labels['ylabels'][k] if isinstance(ylab,str): # ylab = 1 for empty subplots ax[k].set_ylabel(ylab) # ylabels ax[k].tick_params(top=True,right=True,direction='in') # ticks in for right subplots axout = ax # just one column return fig, axout
[docs]def set_bar(n,b): ''' service to animate histograms e.g. in the fit tab extracted from matplotlib animate histogram example ''' from numpy import array, zeros, ones import matplotlib.path as path # get the corners of the rectangles for the histogram left = array(b[:-1]) right = array(b[1:]) bottom = zeros(len(left)) top = bottom + n nrects = len(left) # here comes the tricky part -- we have to set up the vertex and path # codes arrays using moveto, lineto and closepoly # for each rect: 1 for the MOVETO, 3 for the LINETO, 1 for the # CLOSEPOLY; the vert for the closepoly is ignored but we still need # it to keep the codes aligned with the vertices nverts = nrects*(1 + 3 + 1) verts = zeros((nverts, 2)) codes = ones(nverts, int) * path.Path.LINETO codes[0::5] = path.Path.MOVETO codes[4::5] = path.Path.CLOSEPOLY verts[0::5, 0] = left verts[0::5, 1] = bottom verts[1::5, 0] = left verts[1::5, 1] = top verts[2::5, 0] = right verts[2::5, 1] = top verts[3::5, 0] = right verts[3::5, 1] = bottom xlim = [left[0], right[-1]] return verts, codes, bottom, xlim
[docs]def set_fig(num,nrow,ncol,title,**kwargs): # unused? perhaps delete? check first ''' num is figure number (static, to keep the same window) nrow, ncol number of subplots rows and columns kwargs is a dict of keys to pass to subplots as is initializes figures when they are first called or after accidental killing ''' import matplotlib.pyplot as P fig,ax = P.subplots(nrow, ncol, num = num, **kwargs) fig.canvas.manager.set_window_title(title) return fig, ax
############### # END OF PLOT # ###############
[docs]def rebin(x,y,strstp,pack,e=None): ''' input: x is 1D intensive (time) y [,e] are 1D, 2D or 3D intensive arrays to be rebinned pack > 1 is the rebinning factor, e.g it returns:: xr = array([x[k*pack:k*(pack+1)].sum()/pack for k in range(int(floor((stop-start)/pack)))]) strstp = [start,stop] is a list of slice indices rebinning of x, y [,e] is done on the slice truncated to the approrpiate pack multiple, stopm x[start:stopm], y[start:stopm], [e[start:stopm]] use either:: xr,yr = rebin(x,y,strstp,pack) or:: xr,yr,eyr = rebin(x,y,strstp,pack,ey) # the 5th is y error ''' from numpy import floor, sqrt, zeros start,stop = strstp # print('tools rebin debug: start, stop, pack = {}, [], []'.format(start, stop, pack)) m = int(floor((stop-start)/pack)) # length of rebinned xb mn = m*pack # length of x slice xx =x[start:start+mn] # slice of the first 1d array xx = xx.reshape(m,pack) # temporaty 2d array xr = xx.sum(1)/pack # rebinned first ndarray if len(y.shape)==1: yb = zeros(m) yy = y[start:start+mn] # slice row yy = yy.reshape(m,pack) # temporaty 2d yr = yy.sum(1)/pack # rebinned row if e is not None: ey = e[start:start+mn] # slice row ey = ey.reshape(m,pack) # temporaty 2d er = sqrt((ey**2).sum(1))/pack # rebinned row - only good for ISIS elif len(y.shape)==2: nruns = y.shape[0] # number of runs yr = zeros((nruns,m)) if e is not None: er = zeros((nruns,m)) for k in range(nruns): # each row is a run yy = y[k][start:start+mn] # slice row yy = yy.reshape(m,pack) # temporaty 2d yr[k] = yy.sum(1)/pack # rebinned row if e is not None: ey = e[k][start:start+mn] # slice row ey = ey.reshape(m,pack) # temporaty 2d er[k] = sqrt((ey**2).sum(1))/pack # rebinned row elif len(y.shape)==3: ngroups,nruns = y.shape[0:2] # number of groups, runs yr = zeros((ngroups,nruns,m)) if e is not None: er = zeros((ngroups,nruns,m)) for k in range(ngroups): for j in range(nruns): yy = y[k][j][start:start+mn] # slice row yy = yy.reshape(m,pack) # temporaty 2d yr[k][j] = yy.sum(1)/pack # rebinned row if e is not None: ey = e[k][j][start:start+mn] # slice row ey = ey.reshape(m,pack) # temporaty 2d er[k][j] = sqrt((ey**2).sum(1))/pack # rebinned row if e is not None: return xr,yr,er else: return xr,yr
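A minimal rebinning sketch on synthetic data (purely illustrative; array sizes and pack are arbitrary):

import numpy as np
from mujpy.tools.tools import rebin

t = 0.01*np.arange(1000)                      # 1D time axis (intensive)
a = np.cos(2*np.pi*t)                         # 1D signal
e = 0.01*np.ones_like(a)                      # errors
tr, ar, er = rebin(t, a, [0, 1000], 10, e)    # rebin by 10 over the full slice
tr.shape, ar.shape                            # -> ((100,), (100,))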
[docs]def rebin_decay(x,yf,yb,bf,bb,strstp,pack): ''' input: x is 1D intensive (time) yf, yb 1D, 2D, 3D extensive arrays to be rebinned bf, bb are scalars or arrays (see musuite.single_for_back_counts and musuite.single_multigroup_for_back_counts) pack > 1 is the rebinning factor, e.g it returns:: xr = array([x[k*pack:k*(pack+1)].sum()/pack for k in range(int(floor((stop-start)/pack)))]) yr = array([y[k*pack:k*(pack+1)].sum() for k in range(int(floor((stop-start)/pack)))]) strstp = [start,stop] is a list of slice indices rebinning of x,y is done on the slice truncated to the approrpiate pack multiple, stopm x[start:stopm], y[start:stopm] use:: xr,yfr, ybr, bfr, bbr, yfmr, ybmr = rebin(x,yf,yb,bf,bb,yfm,ybm,strstp,pack) ''' from numpy import floor, sqrt, exp, zeros, mean from mujpy.tools.tools import TauMu_mus start,stop = strstp m = int(floor((stop-start)/pack)) # length of rebinned xb mn = m*pack # length of x slice xx =x[start:start+mn] # slice of the first 2D array xx = xx.reshape(m,pack) # temporaty 2d array xr = xx.sum(1)/pack # rebinned first ndarray bfr, bbr = bf*pack, bb*pack if len(yf.shape)==1: yfr = zeros(m) ybr = zeros(m) yfr = yf[start:start+mn] # slice row ybr = yb[start:start+mn] # slice row yfr = yfr.reshape(m,pack) # temporaty 2d ybr = ybr.reshape(m,pack) # temporaty 2d yfr = yfr.sum(1) # rebinned row extensive ybr = ybr.sum(1) # rebinned row extensive yfmr, ybmr = mean((yfr-bfr)*exp(xr/TauMu_mus())), mean((ybr-bbr)*exp(xr/TauMu_mus())) elif len(yf.shape)==2: nruns = yf.shape[0] # number of runs yfr = zeros((nruns,m)) ybr = zeros((nruns,m)) for k in range(nruns): # each row is a run, or a group yyf = yf[k][start:start+mn] # slice row yyf = yyf.reshape(m,pack) # temporaty 2d yfr[k] = yyf.sum(1) # rebinned row extesive yyb = yb[k][start:start+mn] # slice row yyb = yyb.reshape(m,pack) # temporaty 2d ybr[k] = yyb.sum(1) # rebinned row extesive bfr, bbr = bf[k]*pack, bb[k]*pack # print('tools,rebin_decay,debug: bfr {}, bbr {}'.format(bfr, bbr)) yfmr, ybmr = mean((yfr[:][k]-bfr)*exp(xr/TauMu_mus())), mean((ybr[:][k]-bbr)*exp(xr/TauMu_mus())) elif len(yf.shape)==3: # probably never used unless calib mode becomes a C2 case ngroups,nruns = yf.shape[0:2] # number of runs yfr = zeros((ngroups,nruns,m)) ybr = zeros((nruns,m)) for k in range(ngroups): for j in range(nruns): yyf = yf[k][j][start:start+mn] # slice row yyf = yyf.reshape(m,pack) # temporaty 2d yfr[k][j] = yyf.sum(1) # rebinned row extesive yyb = yb[k][j][start:start+mn] # slice row yyb = yyb.reshape(m,pack) # temporaty 2d ybr[k][j] = yyb.sum(1) # rebinned row extesive bfr, bbr = bf[k][j]*pack, bb[k][j]*pack yfmr, ybmr = mean((yfr[:][k][j]-bfr)*exp(xr/TauMu_mus())), mean((ybr[:][k][j]-bbr)*exp(xr/TauMu_mus())) return xr,yfr,ybr,bfr,bbr,yfmr,ybmr
[docs]def safetry(string): ''' Used by muvalid ''' from math import acos,asin,atan,atan2,ceil,cos,cosh,degrees,e,exp,floor,log,log10,pi,pow,radians,sin,sinh,sqrt,tan,tanh safe_list = ['a','acos', 'asin', 'atan', 'atan2', 'ceil', 'cos', 'cosh', 'degrees', 'e', 'exp', 'floor', 'log', 'log10', 'pi', 'pow', 'radians', 'sin', 'sinh', 'sqrt', 'tan', 'tanh'] # use the list to filter the local namespace a = 0.3 safe_dict={} for k in safe_list: safe_dict[k]=locals().get(k) # print(safe_dict[k]) return eval(string,{"__builtins__":None},safe_dict)
[docs]def scanms(y,n): # produces guess for hifi t=0 bin, to be fed to a step fit function # check running average of (n bins,n skips,n bins) # with two means m1,m2 and two variances s21,s22, against step pattern # compares m2-m1 with sqrt(s21+s22) from numpy import sqrt istart = [] istop = [] for k in range(y.shape[0]-n): m1,m2 = y[k:k+n].sum()/n, y[k+2*n:k+3*n].sum()/n s = sqrt(((y[k:k+n]-m1)**2).sum()/(n-1)+ ((y[k+2*n:k+3*n]-m2)**2).sum()/(n-1)) if m2-m1>s: if not istart: istart = k+n elif not istop: istop = k+n elif istop == k+n-1: istop = k+n if istop and istart: if istop-istart == n: return istop return -1
[docs]def spec_prec(a):
    '''
    format specifier precision::

        0 for a > 1.0
        1 for 1.0 > a > 0.1
        2 for 0.1 > a > 0.01 etc.

    '''
    import numpy as np
    return int(abs(min(0.,np.floor(np.log10(a)))))
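Illustrative values (the results in the comments follow from the formula above):

from mujpy.tools.tools import spec_prec

spec_prec(3.2)     # -> 0
spec_prec(0.36)    # -> 1
spec_prec(0.036)   # -> 2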
[docs]def shorten(path,subpath):
    '''
    shortens path, e.g.
    path, subpath = '/home/myname/myfolder', '/home/myname'
    short = './myfolder'
    '''
    short = path.split(subpath)
    if len(short)==2:
        short = '.'+short[1]
    return short
[docs]def exit_safe():
    '''
    opens an "are you sure?" box
    '''
    from tkinter.messagebox import askyesno
    answer = askyesno(title='Exit mujpy', message='Really quit?')
    return answer
[docs]def step(x,a,n,dn,b):
    from scipy.stats import norm
    # error function as step function for t=0 in HIFI
    return a+b*norm.cdf(x,n,dn)
[docs]def tlog_exists(path,run,ndigits):
    '''
    check if tlog exists under various known filename types
    '''
    import os
    filename_psibulk = 'run_'+muzeropad(run,ndigits)+'.mon' # add definitions for e.g. filename_isis
    ok = os.path.exists(os.path.join(path,filename_psibulk)) # or os.path.exists(os.path.join(path,filename_isis))
    return ok
[docs]def translate_multirun(functions_in,n_locals,kloc,nruns): ''' functions_in = [list of function strings], for the model components of a single-run model (obtained from get_functions_in) where a "~","!" parameter dummy function has been redefined as 'p[k]' and k is their dash index n_locals = number of user_local parameters kloc = index of first component first parameter in the single run model (kloc-n_locals is the index of the first user_locals) nruns = number of runs in the suite functions_out = [list of lists of function strings, indices of component & parameter], with translated minuit indices outer list is components of the model, middle list is runs, inner list is component parameter functions used in int2_multirun_user_method_key and int2_multirun_grad_method_key ''' # print('debug tools translate_multirun functions_in = {}'.format(functions_in)) korig = kloc # minuit index index of first component first parameter in the single run model npar_run = n_locals # these will be the local parameters in each run, initialized to number of user_locals # extact this from functions: first scan a single run model to find how many parameters reference themeselves # the next loop is solely to determine npar_run = number of local parameters per run (korig is used but will be reset in the next loop) for funcs in functions_in: # list of functions for a component for func in funcs: # individual function for one parameter parloc = 'p['+str(korig)+']' # original parameter # print('debug tools translate_multirun korig = {}, parloc = {}, func = {}'.format(korig, parloc, func)) if func.find(parloc)>=0: # if present this parameter references itself i.e. it is a "~" local parameter i.e. a free minuit parameter # (non-local are determines by user parameters) # print('debug tools translate_multirun found! 
korig = {}, func = {}'.format(korig, func)) korig += 1 # increment the minuit index of the single-run-model parameter npar_run +=1 # increment the number of local parameters per run # print('debug tools translate_multirun local parameters per run npar_run = {} minuit parameters = {}'.format(npar_run, korig)) fso = [] # next loop produces fso, appending a list funcs_out for each run, of lists func_out for each component, containing translated indices # from single-run-model to actual multirun model knew = kloc-n_locals # this is the index of the first user_local parameter for krun in range(nruns): funcs_out = [] # list of component lists of translated functions for a run korig = kloc # index for first run free parameter index in model, starts after user local replicas knew += n_locals # minuit index includes run user local replicas, and is incremented at each run for funcs in functions_in: # funcs is list of functions for a component func_out = [] # list of translated functions for one component for k, func in enumerate(funcs): # individual function for parameter k parloc = 'p['+str(korig)+']' # original parameter parnew = 'p['+str(knew)+']' # parameter for this run # print('debug tools translate_multirun knew = {}, parloc = {} parnew {}'.format(knew,parloc,parnew)) if func.find(parloc)>=0: # if present # print('debug tools translate_multirun func = {} becomes {}'.format(func,func.replace(parloc,parnew))) func_out.append(func.replace(parloc,parnew)) # it is translated and appended knew += 1 # increment the minuit index for the multirun model korig +=1 # increment the single-run-model index else: # otherwise func_out.append(func) # it is appended untranslated # either way func_out[k] is appended # now check for every parameter if contains user_local parameters for j in range(n_locals): # assign local index to user_local parameters kuserorig = kloc-j-1 # index of user_local parameter for first run parloc = 'p['+str(kuserorig)+']' # this is a user_local parameter kusernew = kloc+krun*npar_run-j-1 # index of user_local parameter for present run parnew = 'p['+str(kusernew)+']' # this is the value for present run if func_out[k].find(parloc)>=0: # if present # print('debug tools translate_multirun func_out = {} becomes {}'.format(func_out[k],func_out[k].replace(parloc,parnew))) func_out[k] = func_out[k].replace(parloc,parnew) # it is translated # at the end of this loop func.out is a list of func for the parameters of this component funcs_out.append(func_out) # adds this component to funcs_out, list of components in this run fso.append(funcs_out) # adds list of components for this run to list of runs # the next loop reshuffles fso to produce functions_out in the correct order model components, runs, component parameter functions_out = [] ncomponents = len(fso[0]) # middle list is components for jcomp in range(ncomponents): frun = [] for krun,run in enumerate(fso): # run is list of lists for run krun frun.append(run[jcomp]) # run[jcomp] is the jcomp component functions for run krun # now frun is a list or runs for component jcomp functions_out.append(frun) # now functions_out is a list of components, each a list or runs, each a list of parameter func return functions_out
[docs]def get_indices(func):
    '''
    input
        func is a user string function, e.g. 'p[0]*p[2]'
    output
        list of (string) indices found in the string,
        in between 'p[' and ']', e.g. ['0','2']
    '''
    from mujpy.tools.tools import findall
    return [func[i:j] for (i,j) in zip([k+1 for k in findall('[',func)],[l for l in findall(']',func)])]
[docs]def diffunc(func):
    '''
    input
        user function of the form 'p[0]*(1-p[1])'
        up to functions of three parameters (this could be easily extended)
    output
        a list of its derivatives [with respect to 'p[0]' and 'p[1]']
        and the list of their indices, [0,1]
    '''
    from mujpy.tools.tools import get_indices
    from sympy import symbols,diff,sympify,simplify
    from sympy import sin,cos,exp,sqrt,atan,pi
    # identify variables
    # first identify indices
    func = func.replace('abs','Abs').replace('arctan','atan')
    # indices = [func[i:j] for (i,j) in zip([k+1 for k in findall('[',func)],[l for l in findall(']',func)])]
    indices = get_indices(func)
    ind = [int(k) for k in indices]
    if len(indices)==0: # no indices, func is the empty string
        return ['0'],ind
    elif len(indices)==1: # one index
        x = symbols('x')
        p0 = 'p['+indices[0]+']'
        fun = func.replace(p0,'x') # function of x
        f0 = str(diff(sympify(fun),x)).replace('x',p0).replace('Abs','abs').replace('atan','arctan')
        return [f0],ind
    elif len(indices)==2: # two indices
        x,y = symbols('x,y')
        p0,p1 = 'p['+indices[0]+']','p['+indices[1]+']'
        fun = func.replace(p0,'x').replace(p1,'y') # function of x,y
        f0 = str(sympify(diff(fun,x))).replace('x',p0).replace('y',p1).replace('Abs','abs').replace('atan','arctan')
        f1 = str(sympify(diff(fun,y))).replace('x',p0).replace('y',p1).replace('Abs','abs').replace('atan','arctan')
        return [f0,f1],ind
    elif len(indices)==3: # three indices
        x,y,z = symbols('x,y,z')
        p0,p1,p2 = 'p['+indices[0]+']','p['+indices[1]+']','p['+indices[2]+']'
        fun = func.replace(p0,'x').replace(p1,'y').replace(p2,'z') # function of x,y,z
        f0 = str(sympify(diff(fun,x))).replace('x',p0).replace('y',p1).replace('z',p2).replace('Abs','abs').replace('atan','arctan')
        f1 = str(sympify(diff(fun,y))).replace('x',p0).replace('y',p1).replace('z',p2).replace('Abs','abs').replace('atan','arctan')
        f2 = str(sympify(diff(fun,z))).replace('x',p0).replace('y',p1).replace('z',p2).replace('Abs','abs').replace('atan','arctan')
        # could be extended to four, five ...
        return [f0,f1,f2],ind
[docs]def translate(nint,lmin,function_in):
    '''
    input:
        nint: dashboard index,
        lmin: list of minuit index replacements, one for each dashboard index, -1 is blank
        function_in: single function string, of dashboard index nint, to be translated
    output:
        function_out: single translated function
    Used in int2_method_key and min2int to replace parameter indices contained in function[nint]
    e.g.::

        p[0]*2+p[3]

    by translating the internal parameter indices 0 and 3 (written according to the dashboard dict order)
    into the corresponding minuit parameter list indices, skipping shared parameters.
    e.g. if parameter 1 is shared with parameter 0, the minuit parameter index 3 will be translated to 2
    (skipping internal index 1)
    '''
    from copy import deepcopy
    from mujpy.tools.tools import findall
    # print(' nint = {}, lmin = {}\n{}'.format(nint,lmin,function_in))
    function_out = deepcopy(function_in)
    # search for integers between '[' and ']'
    start = [i+1 for i in findall('[',function_out)] # finds index of number after all occurrences of '['
    stop = [i for i in findall(']',function_out)] # same for ']'
    nints = [function_out[i:j] for (i,j) in zip(start,stop)] # a list of strings with the numbers to be replaced
    nmins = [lmin[int(function_out[i:j])] for (i,j) in zip(start,stop)] # replacement integers
    for lstr,m in zip(nints,nmins):
        function_out = function_out.replace(lstr,str(m))
    return function_out
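A short sketch of the index translation (the lmin list is a made-up example for the sharing case described in the docstring):

from mujpy.tools.tools import translate

# dashboard indices 0..3; index 1 is shared with 0, so the minuit indices are [0, 0, 1, 2]
translate(3, [0, 0, 1, 2], 'p[0]*2+p[3]')   # -> 'p[0]*2+p[2]'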
[docs]def translate_nint(nint,lmin,function): # NOT USED any more?!! ''' Used in int2_int and min2int to parse parameters contained in function[nint].value e.g. :: p[4]*2+p[7] and translate the internal parameter indices 4 and 7 (written according to the gui parameter list order) into the corresponding minuit parameter list indices, that skips shared and fixed parameters. e.g. if parameter 6 is shared with parameter 4 and parameter 2 is fixed, the minuit parameter indices will be 3 instead of 4 (skipping internal index 2) and 5 instead of 7 (skipping both 2 and 6) Returns lmin[nint] ''' from mujpy.tools.tools import findall string = function[nint].value # search for integers between '[' and ']' start = [i+1 for i in findall('[',string)] # finds index of number after all occurencies of '[' stop = [i for i in findall(']',string)] # same for ']' nints = [string[i:j] for (i,j) in zip(start,stop)] # this is a list of strings with the numbers nmins = [lmin[int(string[i:j])] for (i,j) in zip(start,stop)] return nmins
[docs]def value_error(value,error):
    '''
    value_error(v,e)
    returns a string of the format v(e)
    '''
    from numpy import floor, log10
    eps = 1e-10 # minimum error
    if error>eps: # normal error
        exponent = int(floor(log10(error)))
        most_significant = int(round(error/10**exponent))
        if most_significant>9:
            exponent += 1
            most_significant = 1
        exponent = -exponent if exponent<0 else 0
        form = '"{:.'
        form += '{}'.format(exponent)
        form += 'f}({})".format(value,most_significant)'
    else:
        if abs(value)<eps:
            form = '"0(0)"' # both too small
        else:
            form = '"{}(0)".format(value)' # too small error
    return eval(form)
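Illustrative calls (values are arbitrary):

from mujpy.tools.tools import value_error

value_error(0.1234, 0.0042)   # -> '0.123(4)'
value_error(12.3, 0.4)        # -> '12.3(4)'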
[docs]def results(): ''' generate a notebook with some results ''' import subprocess # write a python script script = '# Single Run Single Group Fit' script = script + '\n'#!/usr/bin/env python3' script = script + '\n# -*- coding: utf-8 -*-' script = script + '%matplotlib tk' script = script + '\n%cd /home/roberto.derenzi/git/mujpy/' script = script + '\nfrom mujpy.musuite import suite' script = script + '\nimport json, re' script = script + '\nfrom os.path import isfile' script = script + '\nfrom mujpy.mufit import mufit' script = script + '\nfrom mujpy.mufitplot import mufitplot' script = script + "\njsonsuffix = '.json'\n" # notice: the new cell is produced by the \n at the end of the previous line followed by \n# script = script + '\n# Define log and data paths, ' script = script + '\n# detector grouping and its calibration ' script = script + '\nlogpath = "/home/roberto.derenzi/git/mujpy/log/"' script = script + '\ndatafile = "/home/roberto.derenzi/musrfit/MBT/gps/run_05_21/data/deltat_tdc_gps_0822.bin"' script = script + '\nrunlist = "822" # first run first' script = script + '\nmodelname = "mgml"' script = script + '\nversion = "1"' script = script + "\ngrp_calib = [{'forward':'3', 'backward':'4', 'alpha':1.13}]" script = script + "\ngroupcalibfile = '3-4.calib'" script = script + "\ninputsuitefile = 'input.suite'" script = script + "\ndashboard = modelname+'.'+re.search(r'\d+', runlist).group()+'.'+groupcalibfile[:groupcalibfile.index('.')]+'.'+version+jsonsuffix" script = script + '\nif not isfile(logpath+dashboard):' script = script + "\n print('Model definition dashboard file {} does not exist. Make one.'.format(logpath+dashboard))\n" script = script + "\n# Can add 'scan': 'T' or 'B' for orderinng csv for increasing T, B, otherwise increasing nrun" script = script + "\ninput_suite = {'console':'print'," script = script + "\n 'datafile':datafile," script = script + "\n 'logpath':logpath," script = script + "\n 'runlist':runlist," script = script + "\n 'groups calibration':groupcalibfile," script = script + "\n 'offset':20" script = script + "\n } # 'console':logging, for output in Log Console, 'console':print, for output in notebook" script = script + '\nwith open(logpath+inputsuitefile,"w") as f:' script = script + "\n json.dump(input_suite,f)" script = script + '\nwith open(logpath+groupcalibfile,"w") as f:' script = script + "\n json.dump(grp_calib,f)" script = script + "\nthe_suite = suite(logpath+inputsuitefile,mplot=False) # the_suite implements the class suite according to input.suite\n" script = script + "\n# End of suite definition, this suiete is a single run" script = script + "\n# Now let's fit it according to the dashboard" script = script + "\nthe_fit = mufit(the_suite,logpath+dashboard)\n" script = script + "\n# Let's now plot the fit result" script = script + "\n# We plot the fit result, not the guess, over a single range" script = script + "\nfit_plot= mufitplot('0,20000,40',the_fit)#,guess=True) # '0,1000,4,24000,100' # two range plot" # save it in cache/notebook.py with open('/tmp/notebook.py',"w") as f: f.write(script) # compose notebook filename = 'xxyy.label.group.ipynb' bashCommand = 'p2j -o -t /home/roberto.derenzi/git/mujpy/getstarted/Delendo/xxyy.label.group.ipynb /tmp/notebook.py' process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE) output, error = process.communicate()
# issue os command 'p2j cache/notebook.py '+filename
[docs]def signif(x, p):
    '''
    write x with p significant digits
    '''
    from numpy import asarray,where,isfinite,abs,floor,log10,round
    x = asarray(x)
    x_positive = where(isfinite(x) & (x != 0), abs(x), 10**(p-1))
    mags = 10 ** (p - 1 - floor(log10(x_positive)))
    return round(x * mags) / mags
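Illustrative calls (values are arbitrary):

from mujpy.tools.tools import signif

signif(123.456, 3)     # -> 123.0
signif(0.0012345, 2)   # -> 0.0012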