Source code for hybridLFPy.helpers

#!/usr/bin/env python
"""
Documentation:

This is a script containing general helper functions which can be applied
to specialized cases.
"""

import numpy as np
import os
import stat
import shutil
import glob
import copy
import scipy.signal as ss
import h5py
from mpi4py import MPI


###################################
# Initialization of MPI stuff     #
###################################
COMM = MPI.COMM_WORLD
SIZE = COMM.Get_size()
RANK = COMM.Get_rank()



#######################################
### DATA I/O                        ###
#######################################


[docs]def read_gdf(fname): """ Fast line-by-line gdf-file reader. Parameters ---------- fname : str Path to gdf-file. Returns ------- numpy.ndarray ([gid, val0, val1, **]), dtype=object) mixed datatype array """ gdf_file = open(fname, 'r') gdf = [] for l in gdf_file: data = l.split() gdf += [data] gdf = np.array(gdf, dtype=object) if gdf.size > 0: gdf[:, 0] = gdf[:, 0].astype(int) gdf[:, 1:] = gdf[:, 1:].astype(float) return np.array(gdf)
[docs]def write_gdf(gdf, fname): """ Fast line-by-line gdf-file write function Parameters ---------- gdf : numpy.ndarray Column 0 is gids, columns 1: are values. fname : str Path to gdf-file. Returns ------- None """ gdf_file = open(fname, 'w') for line in gdf: for i in np.arange(len(line)): gdf_file.write(str(line[i]) + '\t') gdf_file.write('\n') return None
[docs]def load_h5_data(path='', data_type='LFP', y=None, electrode=None, warmup=0., scaling=1.): """ Function loading results from hdf5 file Parameters ---------- path : str Path to hdf5-file data_type : str Signal types in ['CSD' , 'LFP', 'CSDsum', 'LFPsum']. y : None or str Name of population. electrode : None or int TODO: update, electrode is NOT USED warmup : float Lower cutoff of time series to remove possible transients scaling : float, Scaling factor for population size that determines the amount of loaded single-cell signals Returns ---------- numpy.ndarray [electrode id, compound signal] if `y` is None numpy.ndarray [cell id, electrode, single-cell signal] otherwise """ assert y is not None or electrode is not None if y is not None: f = h5py.File(os.path.join(path, '%s_%ss.h5' %(y,data_type))) data = f['data'].value[:,:, warmup:] if scaling != 1.: np.random.shuffle(data) num_cells = int(len(data)*scaling) data = data[:num_cells,:, warmup:] else: f = h5py.File(os.path.join(path, '%ssum.h5' %data_type)) data = f['data'].value[:, warmup:] return data
[docs]def dump_dict_of_nested_lists_to_h5(fname, data): """ Take nested list structure and dump it in hdf5 file. Parameters ---------- fname : str Filename data : dict(list(numpy.ndarray)) Dict of nested lists with variable len arrays. Returns ------- None """ # Open file print('writing to file: %s' % fname) f = h5py.File(fname) # Iterate over values for i, ivalue in list(data.items()): igrp = f.create_group(str(i)) for j, jvalue in enumerate(ivalue): jgrp = igrp.create_group(str(j)) for k, kvalue in enumerate(jvalue): if kvalue.size > 0: dset = jgrp.create_dataset(str(k), data=kvalue, compression='gzip') else: dset = jgrp.create_dataset(str(k), data=kvalue, maxshape=(None, ), compression='gzip') # Close file f.close()
[docs]def load_dict_of_nested_lists_from_h5(fname, toplevelkeys=None): """ Load nested list structure from hdf5 file Parameters ---------- fname : str Filename toplevelkeys : None or iterable, Load a two(default) or three-layered structure. Returns ------- dict(list(numpy.ndarray)) dictionary of nested lists with variable length array data. """ # Container: data = {} # Open file object f = h5py.File(fname, 'r') # Iterate over partial dataset if toplevelkeys is not None: for i in toplevelkeys: ivalue = f[str(i)] data[i] = [] for j, jvalue in enumerate(ivalue.values()): data[int(i)].append([]) for k, kvalue in enumerate(jvalue.values()): data[i][j].append(kvalue.value) else: for i, ivalue in list(f.items()): i = int(i) data[i] = [] for j, jvalue in enumerate(ivalue.values()): data[i].append([]) for k, kvalue in enumerate(jvalue.values()): data[i][j].append(kvalue.value) # Close dataset f.close() return data
[docs]def setup_file_dest(params, clearDestination=True): """ Function to set up the file catalog structure for simulation output Parameters ---------- params : object e.g., `cellsim16popsParams.multicompartment_params()` clear_dest : bool Savefolder will be cleared if already existing. Returns ------- None """ if RANK == 0: if not os.path.isdir(params.savefolder): os.mkdir(params.savefolder) assert(os.path.isdir(params.savefolder)) else: if clearDestination: print('removing folder tree %s' % params.savefolder) while os.path.isdir(params.savefolder): try: os.system('find %s -delete' % params.savefolder) except: shutil.rmtree(params.savefolder) os.mkdir(params.savefolder) assert(os.path.isdir(params.savefolder)) if not os.path.isdir(params.sim_scripts_path): print('creating %s' % params.sim_scripts_path) os.mkdir(params.sim_scripts_path) if not os.path.isdir(params.cells_path): print('creating %s' % params.cells_path) os.mkdir(params.cells_path) if not os.path.isdir(params.figures_path): print('creating %s' % params.figures_path) os.mkdir(params.figures_path) if not os.path.isdir(params.populations_path): print('creating %s' % params.populations_path) os.mkdir(params.populations_path) try: if not os.path.isdir(params.raw_nest_output_path): print('creating %s' % params.raw_nest_output_path) os.mkdir(params.raw_nest_output_path) except: pass if not os.path.isdir(params.spike_output_path): print('creating %s' % params.spike_output_path) os.mkdir(params.spike_output_path) for f in ['cellsim16popsParams.py', 'cellsim16pops.py', 'example_brunel.py', 'brunel_alpha_nest.py', 'mesocircuit.sli', 'mesocircuit_LFP_model.py', 'binzegger_connectivity_table.json', 'nest_simulation.py', 'microcircuit.sli']: if os.path.isfile(f): if not os.path.exists(os.path.join(params.sim_scripts_path, f)): shutil.copy(f, os.path.join(params.sim_scripts_path, f)) os.chmod(os.path.join(params.sim_scripts_path, f), stat.S_IREAD) COMM.Barrier()
####################################### ### GENERAL ### #######################################
[docs]def calculate_fft(data, tbin): """ Function to calculate the Fourier transform of data. Parameters ---------- data : numpy.ndarray 1D or 2D array containing time series. tbin : float Bin size of time series (in ms). Returns ------- freqs : numpy.ndarray Frequency axis of signal in Fourier space. fft : numpy.ndarray Signal in Fourier space. """ if len(np.shape(data)) > 1: n = len(data[0]) return np.fft.fftfreq(n, tbin * 1e-3), np.fft.fft(data, axis=1) else: n = len(data) return np.fft.fftfreq(n, tbin * 1e-3), np.fft.fft(data)
####################################### ### DATA MANIPULATION ### #######################################
[docs]def centralize(data, time=False, units=False): """ Function to subtract the mean across time and/or across units from data Parameters ---------- data : numpy.ndarray 1D or 2D array containing time series, 1st index: unit, 2nd index: time time : bool True: subtract mean across time. units : bool True: subtract mean across units. Returns ------- numpy.ndarray 1D or 0D array of centralized signal. """ assert(time is not False or units is not False) res = copy.copy(data) if time is True: res = np.array([x - np.mean(x) for x in res]) if units is True: res = np.array(res - np.mean(res, axis=0)) return res
[docs]def normalize(data): """ Function to normalize data to have mean 0 and unity standard deviation (also called z-transform) Parameters ---------- data : numpy.ndarray Returns ------- numpy.ndarray z-transform of input array """ data = data.astype(float) data -= data.mean() return data / data.std()
####################################### ### FILTER ### #######################################
[docs]def movav(y, Dx, dx): """ Moving average rectangular window filter: calculate average of signal y by using sliding rectangular window of size Dx using binsize dx Parameters ---------- y : numpy.ndarray Signal Dx : float Window length of filter. dx : float Bin size of signal sampling. Returns ------- numpy.ndarray Filtered signal. """ if Dx <= dx: return y else: ly = len(y) r = np.zeros(ly) n = np.int(np.round((Dx / dx))) r[0:np.int(n / 2.)] = 1.0 / n r[-np.int(n / 2.)::] = 1.0 / n R = np.fft.fft(r) Y = np.fft.fft(y) yf = np.fft.ifft(Y * R) return yf
[docs]def decimate(x, q=10, n=4, k=0.8, filterfun=ss.cheby1): """ scipy.signal.decimate like downsampling using filtfilt instead of lfilter, and filter coeffs from butterworth or chebyshev type 1. Parameters ---------- x : numpy.ndarray Array to be downsampled along last axis. q : int Downsampling factor. n : int Filter order. k : float Aliasing filter critical frequency Wn will be set as Wn=k/q. filterfun : function `scipy.signal.filter_design.cheby1` or `scipy.signal.filter_design.butter` function Returns ------- numpy.ndarray Array of downsampled signal. """ if not isinstance(q, int): raise TypeError("q must be an integer") if n is None: n = 1 if filterfun == ss.butter: b, a = filterfun(n, k / q) elif filterfun == ss.cheby1: b, a = filterfun(n, 0.05, k / q) else: raise Exception('only ss.butter or ss.cheby1 supported') try: y = ss.filtfilt(b, a, x) except: # Multidim array can only be processed at once for scipy >= 0.9.0 y = [] for data in x: y.append(ss.filtfilt(b, a, data)) y = np.array(y) try: return y[:, ::q] except: return y[::q]
####################################### ### CORRELATION ANALYSIS ### #######################################
[docs]def mean(data, units=False, time=False): """ Function to compute mean of data Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time units : bool Average over units time : bool Average over time Returns ------- if units=False and time=False: error if units=True: 1 dim numpy.ndarray; time series if time=True: 1 dim numpy.ndarray; series of unit means across time if units=True and time=True: float; unit and time mean Examples -------- >>> mean(np.array([[1, 2, 3], [4, 5, 6]]), units=True) array([ 2.5, 3.5, 4.5]) >>> mean(np.array([[1, 2, 3], [4, 5, 6]]), time=True) array([ 2., 5.]) >>> mean(np.array([[1, 2, 3], [4, 5, 6]]), units=True,time=True) 3.5 """ assert(units is not False or time is not False) if units is True and time is False: return np.mean(data, axis=0) elif units is False and time is True: return np.mean(data, axis=1) elif units is True and time is True: return np.mean(data)
[docs]def compound_mean(data): """ Compute the mean of the compound/sum signal. Data is first summed across units and averaged across time. Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time Returns ------- float time-averaged compound/sum signal Examples -------- >>> compound_mean(np.array([[1, 2, 3], [4, 5, 6]])) 7.0 """ return np.mean(np.sum(data, axis=0))
[docs]def variance(data, units=False, time=False): """ Compute the variance of data across time, units or both. Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time. units : bool Variance across units time : bool Average over time Returns ---------- if units=False and time=False: Exception if units=True: 1 dim numpy.ndarray; time series if time=True: 1 dim numpy.ndarray; series of single unit variances across time if units=True and time=True: float; mean of single unit variances across time Examples ---------- >>> variance(np.array([[1, 2, 3],[4, 5, 6]]), units=True) array([ 2.25, 2.25, 2.25]) >>> variance(np.array([[1, 2, 3], [4, 5, 6]]), time=True) array([ 0.66666667, 0.66666667]) >>> variance(np.array([[1, 2, 3], [4, 5, 6]]), units=True, time=True) 0.66666666666666663 """ assert(units is not False or time is not False) if units is True and time is False: return np.var(data, axis=0) elif units is False and time is True: return np.var(data, axis=1) elif units is True and time is True: return np.mean(np.var(data, axis=1))
[docs]def compound_variance(data): """ Compute the variance of the compound/sum signal. Data is first summed across units, then the variance across time is calculated. Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time Returns ------- float variance across time of compound/sum signal Examples -------- >>> compound_variance(np.array([[1, 2, 3], [4, 5, 6]])) 2.6666666666666665 """ return np.var(np.sum(data, axis=0))
[docs]def powerspec(data, tbin, Df=None, units=False, pointProcess=False): """ Calculate (smoothed) power spectra of all timeseries in data. If units=True, power spectra are averaged across units. Note that averaging is done on power spectra rather than data. If pointProcess is True, power spectra are normalized by the length T of the time series. Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time. tbin : float Binsize in ms. Df : float/None, Window width of sliding rectangular filter (smoothing), None is no smoothing. units : bool Average power spectrum. pointProcess : bool If set to True, powerspectrum is normalized to signal length T. Returns ------- freq : tuple numpy.ndarray of frequencies. POW : tuple if units=False: 2 dim numpy.ndarray; 1st axis unit, 2nd axis frequency if units=True: 1 dim numpy.ndarray; frequency series Examples -------- >>> powerspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df) Out[1]: (freq,POW) >>> POW.shape Out[2]: (2,len(analog_sig1)) >>> powerspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df, units=True) Out[1]: (freq,POW) >>> POW.shape Out[2]: (len(analog_sig1),) """ freq, DATA = calculate_fft(data, tbin) df = freq[1] - freq[0] T = tbin * len(freq) POW = np.abs(DATA) ** 2 if Df is not None: POW = [movav(x, Df, df) for x in POW] cut = int(Df / df) freq = freq[cut:] POW = np.array([x[cut:] for x in POW]) POW = np.abs(POW) assert(len(freq) == len(POW[0])) if units is True: POW = mean(POW, units=units) assert(len(freq) == len(POW)) if pointProcess: POW *= 1. / T * 1e3 # Normalization, power independent of T return freq, POW
[docs]def compound_powerspec(data, tbin, Df=None, pointProcess=False): """ Calculate the power spectrum of the compound/sum signal. data is first summed across units, then the power spectrum is calculated. If pointProcess=True, power spectra are normalized by the length T of the time series. Parameters ---------- data : numpy.ndarray, 1st axis unit, 2nd axis time tbin : float, binsize in ms Df : float/None, window width of sliding rectangular filter (smoothing), None -> no smoothing pointProcess : bool, if set to True, powerspectrum is normalized to signal length T Returns ------- freq : tuple numpy.ndarray of frequencies POW : tuple 1 dim numpy.ndarray, frequency series Examples -------- >>> compound_powerspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df) Out[1]: (freq,POW) >>> POW.shape Out[2]: (len(analog_sig1),) """ return powerspec([np.sum(data, axis=0)], tbin, Df=Df, units=True, pointProcess=pointProcess)
[docs]def crossspec(data, tbin, Df=None, units=False, pointProcess=False): """ Calculate (smoothed) cross spectra of data. If `units`=True, cross spectra are averaged across units. Note that averaging is done on cross spectra rather than data. Cross spectra are normalized by the length T of the time series -> no scaling with T. If pointProcess=True, power spectra are normalized by the length T of the time series. Parameters ---------- data : numpy.ndarray, 1st axis unit, 2nd axis time tbin : float, binsize in ms Df : float/None, window width of sliding rectangular filter (smoothing), None -> no smoothing units : bool, average cross spectrum pointProcess : bool, if set to True, cross spectrum is normalized to signal length T Returns ------- freq : tuple numpy.ndarray of frequencies CRO : tuple if `units`=True: 1 dim numpy.ndarray; frequency series if `units`=False:3 dim numpy.ndarray; 1st axis first unit, 2nd axis second unit, 3rd axis frequency Examples -------- >>> crossspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df) Out[1]: (freq,CRO) >>> CRO.shape Out[2]: (2,2,len(analog_sig1)) >>> crossspec(np.array([analog_sig1, analog_sig2]), tbin, Df=Df, units=True) Out[1]: (freq,CRO) >>> CRO.shape Out[2]: (len(analog_sig1),) """ N = len(data) if units is True: # smoothing and normalization take place in powerspec # and compound_powerspec freq, POW = powerspec(data, tbin, Df=Df, units=True) freq_com, CPOW = compound_powerspec(data, tbin, Df=Df) assert(len(freq) == len(freq_com)) assert(np.min(freq) == np.min(freq_com)) assert(np.max(freq) == np.max(freq_com)) CRO = 1. / (1. * N * (N - 1.)) * (CPOW - 1. * N * POW) assert(len(freq) == len(CRO)) else: freq, DATA = calculate_fft(data, tbin) T = tbin * len(freq) df = freq[1] - freq[0] if Df is not None: cut = int(Df / df) freq = freq[cut:] CRO = np.zeros((N, N, len(freq)), dtype=complex) for i in range(N): for j in range(i + 1): tempij = DATA[i] * DATA[j].conj() if Df is not None: tempij = movav(tempij, Df, df)[cut:] CRO[i, j] = tempij CRO[j, i] = CRO[i, j].conj() assert(len(freq) == len(CRO[0, 0])) if pointProcess: CRO *= 1. / T * 1e3 # normalization return freq, CRO
[docs]def compound_crossspec(a_data, tbin, Df=None, pointProcess=False): """ Calculate cross spectra of compound signals. a_data is a list of datasets (a_data = [data1,data2,...]). For each dataset in a_data, the compound signal is calculated and the crossspectra between these compound signals is computed. If pointProcess=True, power spectra are normalized by the length T of the time series. Parameters ---------- a_data : list of numpy.ndarrays Array: 1st axis unit, 2nd axis time. tbin : float Binsize in ms. Df : float/None, Window width of sliding rectangular filter (smoothing), None -> no smoothing. pointProcess : bool If set to True, crossspectrum is normalized to signal length `T` Returns ------- freq : tuple numpy.ndarray of frequencies. CRO : tuple 3 dim numpy.ndarray; 1st axis first compound signal, 2nd axis second compound signal, 3rd axis frequency. Examples -------- >>> compound_crossspec([np.array([analog_sig1, analog_sig2]), np.array([analog_sig3,analog_sig4])], tbin, Df=Df) Out[1]: (freq,CRO) >>> CRO.shape Out[2]: (2,2,len(analog_sig1)) """ a_mdata = [] for data in a_data: a_mdata.append(np.sum(data, axis=0)) # calculate compound signals return crossspec(np.array(a_mdata), tbin, Df, units=False, pointProcess=pointProcess)
[docs]def autocorrfunc(freq, power): """ Calculate autocorrelation function(s) for given power spectrum/spectra. Parameters ---------- freq : numpy.ndarray 1 dimensional array of frequencies. power : numpy.ndarray 2 dimensional power spectra, 1st axis units, 2nd axis frequencies. Returns ------- time : tuple 1 dim numpy.ndarray of times. autof : tuple 2 dim numpy.ndarray; autocorrelation functions, 1st axis units, 2nd axis times. """ tbin = 1. / (2. * np.max(freq)) * 1e3 # tbin in ms time = np.arange(-len(freq) / 2. + 1, len(freq) / 2. + 1) * tbin # T = max(time) multidata = False if len(np.shape(power)) > 1: multidata = True if multidata: N = len(power) autof = np.zeros((N, len(freq))) for i in range(N): raw_autof = np.real(np.fft.ifft(power[i])) mid = int(len(raw_autof) / 2.) autof[i] = np.hstack([raw_autof[mid + 1:], raw_autof[:mid + 1]]) assert(len(time) == len(autof[0])) else: raw_autof = np.real(np.fft.ifft(power)) mid = int(len(raw_autof) / 2.) autof = np.hstack([raw_autof[mid + 1:], raw_autof[:mid + 1]]) assert(len(time) == len(autof)) # autof *= T*1e-3 # normalization is done in powerspec() return time, autof
[docs]def crosscorrfunc(freq, cross): """ Calculate crosscorrelation function(s) for given cross spectra. Parameters ---------- freq : numpy.ndarray 1 dimensional array of frequencies. cross : numpy.ndarray 2 dimensional array of cross spectra, 1st axis units, 2nd axis units, 3rd axis frequencies. Returns ------- time : tuple 1 dim numpy.ndarray of times. crossf : tuple 3 dim numpy.ndarray, crosscorrelation functions, 1st axis first unit, 2nd axis second unit, 3rd axis times. """ tbin = 1. / (2. * np.max(freq)) * 1e3 # tbin in ms time = np.arange(-len(freq) / 2. + 1, len(freq) / 2. + 1) * tbin # T = max(time) multidata = False # check whether cross contains many cross spectra if len(np.shape(cross)) > 1: multidata = True if multidata: N = len(cross) crossf = np.zeros((N, N, len(freq))) for i in range(N): for j in range(N): raw_crossf = np.real(np.fft.ifft(cross[i, j])) mid = int(len(raw_crossf) / 2.) crossf[i, j] = np.hstack( [raw_crossf[mid + 1:], raw_crossf[:mid + 1]]) assert(len(time) == len(crossf[0, 0])) else: raw_crossf = np.real(np.fft.ifft(cross)) mid = int(len(raw_crossf) / 2.) crossf = np.hstack([raw_crossf[mid + 1:], raw_crossf[:mid + 1]]) assert(len(time) == len(crossf)) # crossf *= T*1e-3 # normalization happens in cross spectrum return time, crossf
[docs]def corrcoef(time, crossf, integration_window=0.): """ Calculate the correlation coefficient for given auto- and crosscorrelation functions. Standard settings yield the zero lag correlation coefficient. Setting integration_window > 0 yields the correlation coefficient of integrated auto- and crosscorrelation functions. The correlation coefficient between a zero signal with any other signal is defined as 0. Parameters ---------- time : numpy.ndarray 1 dim array of times corresponding to signal. crossf : numpy.ndarray Crosscorrelation functions, 1st axis first unit, 2nd axis second unit, 3rd axis times. integration_window: float Size of the integration window. Returns ------- cc : numpy.ndarray 2 dim array of correlation coefficient between two units. """ N = len(crossf) cc = np.zeros(np.shape(crossf)[:-1]) tbin = abs(time[1] - time[0]) lim = int(integration_window / tbin) if len(time)%2 == 0: mid = len(time)/2-1 else: mid = np.floor(len(time)/2.) for i in range(N): ai = np.sum(crossf[i, i][mid - lim:mid + lim + 1]) offset_autoi = np.mean(crossf[i,i][:mid-1]) for j in range(N): cij = np.sum(crossf[i, j][mid - lim:mid + lim + 1]) offset_cross = np.mean(crossf[i,j][:mid-1]) aj = np.sum(crossf[j, j][mid - lim:mid + lim + 1]) offset_autoj = np.mean(crossf[j,j][:mid-1]) if ai > 0. and aj > 0.: cc[i, j] = (cij-offset_cross) / np.sqrt((ai-offset_autoi) * \ (aj-offset_autoj)) else: cc[i, j] = 0. return cc
[docs]def coherence(freq, power, cross): """ Calculate frequency resolved coherence for given power- and crossspectra. Parameters ---------- freq : numpy.ndarray Frequencies, 1 dim array. power : numpy.ndarray Power spectra, 1st axis units, 2nd axis frequencies. cross : numpy.ndarray, Cross spectra, 1st axis units, 2nd axis units, 3rd axis frequencies. Returns ------- freq: tuple 1 dim numpy.ndarray of frequencies. coh: tuple ndim 3 numpy.ndarray of coherences, 1st axis units, 2nd axis units, 3rd axis frequencies. """ N = len(power) coh = np.zeros(np.shape(cross)) for i in range(N): for j in range(N): coh[i, j] = cross[i, j] / np.sqrt(power[i] * power[j]) assert(len(freq) == len(coh[0, 0])) return freq, coh
[docs]def cv(data, units=False): """ Calculate coefficient of variation (cv) of data. Mean and standard deviation are computed across time. Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time. units : bool Average `cv`. Returns ------- numpy.ndarray If units=False, series of unit `cv`s. float If units=True, mean `cv` across units. Examples -------- >>> cv(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]])) array([ 0.48795004, 0.63887656]) >>> cv(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]]), units=True) 0.56341330073710316 """ mu = mean(data, time=True) var = variance(data, time=True) cv = np.sqrt(var) / mu if units is True: return np.mean(cv) else: return cv
[docs]def fano(data, units=False): """ Calculate fano factor (FF) of data. Mean and variance are computed across time. Parameters ---------- data : numpy.ndarray 1st axis unit, 2nd axis time. units : bool Average `FF`. Returns ------- numpy.ndarray If units=False, series of unit FFs. float If units=True, mean FF across units. Examples -------- >>> fano(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]])) array([ 0.83333333, 1.9047619 ]) >>> fano(np.array([[1, 2, 3, 4, 5, 6], [11, 2, 3, 3, 4, 5]]), units=True) 1.3690476190476191 """ mu = mean(data, time=True) var = variance(data, time=True) ff = var / mu if units is True: return np.mean(ff) else: return ff
if __name__ == "__main__": import doctest doctest.testmod()