#!/usr/bin/python3 """PoissonFlash Generate sequence of random events with constant expectation value r for the event rate, display as flashing yellow circle and show observed event rate in time intervals corresponding to a given Poisson mean. """ import argparse import multiprocessing as mp import sys import time import numpy as np from scipy.special import loggamma from scipy.optimize import newton import math import random import threading import matplotlib as mpl mpl.use('Qt5Agg') import matplotlib.pyplot as plt import matplotlib.patches as patches plt.style.use('dark_background') # dark canvas background # helper function (fromPhyPraKit.phyFit) def Poisson_CI(lam, sigma=1.0): """ determine one-sigma Confidence Interval around the mean lambda of a Poisson distribution, Poiss(x, lambda). The method is based on delta-log-Likelihood (dlL) of the Poission likelihood Args: - lam: mean of Poission distribution - cl: desired confidence level - sigma: alternatively specify an n-sigma interval """ # functions def nlLPoisson(x, lam): """negative log-likelihood of Poissoin distrbution""" return lam - x * np.log(lam) + loggamma(x + 1.0) def f(x, lam, dlL): """Delta log-L - offset, input to Newton method""" return nlLPoisson(x, lam) - nlLPoisson(lam, lam) - dlL dlL = 0.5 * sigma * sigma # for dlL=0.5, there is only one intersection with zero for lam<1.8 dl = 1.3 * np.sqrt(lam) dlm = min(dl, lam) cp = newton(f, x0=lam + dl, x1=lam, args=(lam, dlL)) try: # may not converge for small lambda, as there is no intersection < lam cm = newton(f, x0=lam - dlm, x1=lam, args=(lam, dlL)) except: cm = 0.0 if (cp - cm) < lam / 100.0: # found same intersection, cm = 0.0 # set 1st one to 0. return cm, cp class DisplayPoissonEvent(): """ display a short flash for each random event (following Poission statistics) """ def __init__(self, mpQ, rate=1., mean=4.): self.mpQ = mpQ # queue for data self.rate = rate # average event rate self.tau = 1./rate # average time between events self.mean = mean # desired mean value for time bin self.tflash = max(self.tau/20, 0.050) # flash time for object indicating event occurance self.flash_color = '#ffff80' self.bg_color = 'black' # initialize an (interactive) figure self.fig = plt.figure("PoissonFlash 𝜏=%.3g"%(self.tau), figsize=(6., 7.5)) self.fig.suptitle('Poisson Statistics ' + time.asctime(), size='large', color='y') self.fig.subplots_adjust(left=0.15, bottom=0.1, right=0.95, top=0.95, wspace=None, hspace=0.15) gs = self.fig.add_gridspec(nrows=4, ncols=1) self.ax = self.fig.add_subplot(gs[:-1, :]) # figure for "Poission Event" self.ax.set_xlim(-2, 2) self.ax.set_ylim(-2, 2) #plt.axis("off") self.ax.xaxis.set_tick_params(labelbottom=False) self.ax.yaxis.set_tick_params(labelleft=False) self.ax.set_xticks([]) self.ax.set_yticks([]) c = patches.Circle((0, 0.), 1, fc=self.bg_color, ec="orange") c1 = patches.Circle((0, 0.), 1, ec="grey", lw=20) self.flashobj= self.ax.add_patch(c1) self.flashobj= self.ax.add_patch(c) _ = self.ax.text(0.05, 0.96, "Event Rate: %.2f Hz"%(self.rate), transform=self.ax.transAxes, color='yellow') self.txt = self.ax.text(0.05, 0.92, " ", transform=self.ax.transAxes, color='azure') # figure for rate history self.axrate = self.fig.add_subplot(gs[-1, :]) self.axrate.set_ylabel("Counts") self.axrate.set_xlabel("History (s)") self.Npoints = 100 self.interval = self.mean * self.tau # bin width self.xmin = -self.interval * self.Npoints self.xmax = 0.0 self.axrate.set_xlim(self.xmin, self.xmax) self.ymin = 0.0 self.ymax = self.mean + 5*np.sqrt(self.mean) # allow for 5 sigma spread self.axrate.set_ylim(self.ymin, self.ymax) self.axrate.grid(True, alpha=0.5) self.axrate.set_xlabel("History [s]") self.axrate.set_ylabel("Counts") _ = self.axrate.text(0.02, 0.88, "$\Delta$t=%2gs $\Rightarrow$ mean=%.2f"%( self.mean/self.rate, self.mean), transform=self.axrate.transAxes, color='orangered') self.axrate.axhline(self.mean, color='red', linestyle='dashed') if self.mean > 1.5: cm1, cp1 = Poisson_CI(self.mean, 1.) # 68% confidence intervall rect1 = patches.Rectangle((self.xmin, cm1), self.xmax-self.xmin, cp1-cm1, color='green', alpha=0.5) self.axrate.add_patch(rect1) if self.mean > 10: cm2, cp2 = Poisson_CI(self.mean, 2.) # 96% confidence intervall rect2 = patches.Rectangle((self.xmin, cm2), self.xmax-self.xmin, cp2-cm2, color='yellow', alpha=0.5) self.axrate.add_patch(rect2) self.xplt = np.linspace(-self.Npoints * self.interval, 0.0, self.Npoints) self.counts = np.zeros(self.Npoints)-1. (self.hline,) = self.axrate.plot(self.xplt, self.counts, '.--', markersize=6, color = "ivory") plt.ion() plt.show() def __call__(self): """Flash object""" N = 0 Nlast = 0 lastbin = 0 T0 = time.time() while True: t = self.mpQ.get() if t >= 0: # show colored object t_last = t self.flashobj.set_facecolor(self.flash_color) self.fig.canvas.start_event_loop(0.5*self.tflash) # replaces plt.pause() without stealing focus # collect statistics N += 1 rate = N / t if N>2 else 0. status_text = "active %.1f s"%t + 15*' ' + \ "counts %d"%N + 5*' '+ "average rate %.3f Hz"%rate # display statistics and make flash object invisible self.txt.set_text(status_text) self.flashobj.set_facecolor(self.bg_color) self.fig.canvas.start_event_loop(0.5*self.tflash) # calculate entries for rate histogram hbin = int(t/self.interval)%self.Npoints if hbin != lastbin: k = lastbin % self.Npoints self.hline.set_ydata(np.concatenate((self.counts[k + 1 :], self.counts[: k + 1]))) lastbin = hbin self.counts[hbin] = 0 # initialize counter for next bin self.counts[hbin] += 1 else: print("\n*==* Poissson Flasher exiting ...") print( 8*' '+ "active %.1f s"%t_last + 5*' ' + \ "counts %d"%N + 5*' '+"average rate %.3f Hz"%rate) def showFlash(mpQ, rate, mean): """Background process to show poisson event as a flashing object """ flasher = DisplayPoissonEvent(mpQ, rate, mean) flasher() def poissonEventGenerator(mpQ, rate): """Generate sequence of signals corresponding to a Poisson process """ tau = 1./rate dtcum = 0. T0 = time.time() while True: # show flash t = time.time() - T0 mpQ.put(t) # send event time # exponentially distributed wait time dt_wait = -tau * math.log(random.uniform(0., 1.)) # correct for time lags dt_cor = max(0.9*dt_wait, dt_wait+dtcum-t) dtcum += dt_wait # wait ... if dt_cor > 0.002: time.sleep(dt_cor) print(" lag: %.1f ms "%((dt_wait-dt_cor)*1000), end='\r') # get command line parameters if __name__ == "__main__": # ------------------------------------------------ if sys.platform.startswith('win'): # On Windows calling this function is necessary. mp.freeze_support() parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('-r', '--rate', type=float, default=1.0, help='average event rate in Hz') parser.add_argument('-m', '--mean', type=float, default=4.0, help='average Poisson mean for history ') parser.add_argument('-t', '--time', type=int, default=1800, help='run time in seconds') parser.add_argument('-H', '--history', type=int, default=500, help='max. number of data points to store') parser.add_argument('-f', '--file', type=str, default='', help='file name to store results') args = parser.parse_args() rate = args.rate mean = args.mean run_time = args.time timestamp = time.strftime('%y%m%d-%H%M', time.localtime()) filename = args.file + '_' + timestamp + '.csv' if args.file != '' else '' NHistory = args.history times = np.zeros(NHistory) tau = 1./args.rate # set up background process for flash display procs = [] generatorQ = mp.Queue() # Queue to spy on data transfer inside class Display flasherQ = mp.Queue() # Queue to spy on data transfer inside class Display procs.append(mp.Process(name="showFlash", target=showFlash, args=(flasherQ, rate,mean,))) # generator process, must be last in list ! procs.append(mp.Process(name="poissonEventGenerator", target=poissonEventGenerator, args=(generatorQ, rate,))) # start print(10*' '+"flashing randomly with rate of %.3fs"%rate + " for %d s"%run_time) print(20*' ' + " to end ", end = '\r') for p in procs: p.start() icount = 0 start_time = time.time() t=0 try: while t < run_time: t = generatorQ.get() flasherQ.put(t) times[icount % NHistory] = t icount += 1 procs[-1].terminate() # terminate generator a = input(15*' ' + "!!! time over, type to end ==> ") except KeyboardInterrupt: print("\n keyboard interrupt - ending ") finally: # end processes flasherQ.put(-1) for p in procs: if p.is_alive(): p.terminate() # sort and store data event_times = times[: icount].tolist() if icount <= NHistory \ else np.concatenate((times[icount:], times[:icount])).tolist() if filename != '': with open(filename, 'w') as csvfile: csvfile.write('event_times[s]\n') for i in range(icount): csvfile.write(str(times[i])+'\n') print("*==* script " + sys.argv[0] + " ended, data saved to file " + filename)