#!/usr/bin/python3
"""PoissonFlash
Generate sequence of random events with constant expectation value r
for the event rate, display as flashing yellow circle and show observed
event rate in time intervals corresponding to a given Poisson mean.
"""
import argparse
import multiprocessing as mp
import sys
import time
import numpy as np
from scipy.special import loggamma
from scipy.optimize import newton
import math
import random
import threading
import matplotlib as mpl
mpl.use('Qt5Agg')
import matplotlib.pyplot as plt
import matplotlib.patches as patches
plt.style.use('dark_background') # dark canvas background
# helper function (fromPhyPraKit.phyFit)
def Poisson_CI(lam, sigma=1.0):
"""
determine one-sigma Confidence Interval around the
mean lambda of a Poisson distribution, Poiss(x, lambda).
The method is based on delta-log-Likelihood (dlL)
of the Poission likelihood
Args:
- lam: mean of Poission distribution
- cl: desired confidence level
- sigma: alternatively specify an n-sigma interval
"""
# functions
def nlLPoisson(x, lam):
"""negative log-likelihood of Poissoin distrbution"""
return lam - x * np.log(lam) + loggamma(x + 1.0)
def f(x, lam, dlL):
"""Delta log-L - offset, input to Newton method"""
return nlLPoisson(x, lam) - nlLPoisson(lam, lam) - dlL
dlL = 0.5 * sigma * sigma
# for dlL=0.5, there is only one intersection with zero for lam<1.8
dl = 1.3 * np.sqrt(lam)
dlm = min(dl, lam)
cp = newton(f, x0=lam + dl, x1=lam, args=(lam, dlL))
try: # may not converge for small lambda, as there is no intersection < lam
cm = newton(f, x0=lam - dlm, x1=lam, args=(lam, dlL))
except:
cm = 0.0
if (cp - cm) < lam / 100.0: # found same intersection,
cm = 0.0 # set 1st one to 0.
return cm, cp
class DisplayPoissonEvent():
""" display a short flash for each random event
(following Poission statistics)
"""
def __init__(self, mpQ, rate=1., mean=4.):
self.mpQ = mpQ # queue for data
self.rate = rate # average event rate
self.tau = 1./rate # average time between events
self.mean = mean # desired mean value for time bin
self.tflash = max(self.tau/20, 0.050) # flash time for object indicating event occurance
self.flash_color = '#ffff80'
self.bg_color = 'black'
# initialize an (interactive) figure
self.fig = plt.figure("PoissonFlash ðœ=%.3g"%(self.tau), figsize=(6., 7.5))
self.fig.suptitle('Poisson Statistics ' + time.asctime(), size='large', color='y')
self.fig.subplots_adjust(left=0.15, bottom=0.1, right=0.95, top=0.95, wspace=None, hspace=0.15)
gs = self.fig.add_gridspec(nrows=4, ncols=1)
self.ax = self.fig.add_subplot(gs[:-1, :])
# figure for "Poission Event"
self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2)
#plt.axis("off")
self.ax.xaxis.set_tick_params(labelbottom=False)
self.ax.yaxis.set_tick_params(labelleft=False)
self.ax.set_xticks([])
self.ax.set_yticks([])
c = patches.Circle((0, 0.), 1, fc=self.bg_color, ec="orange")
c1 = patches.Circle((0, 0.), 1, ec="grey", lw=20)
self.flashobj= self.ax.add_patch(c1)
self.flashobj= self.ax.add_patch(c)
_ = self.ax.text(0.05, 0.96,
"Event Rate: %.2f Hz"%(self.rate),
transform=self.ax.transAxes, color='yellow')
self.txt = self.ax.text(0.05, 0.92, " ",
transform=self.ax.transAxes, color='azure')
# figure for rate history
self.axrate = self.fig.add_subplot(gs[-1, :])
self.axrate.set_ylabel("Counts")
self.axrate.set_xlabel("History (s)")
self.Npoints = 100
self.interval = self.mean * self.tau # bin width
self.xmin = -self.interval * self.Npoints
self.xmax = 0.0
self.axrate.set_xlim(self.xmin, self.xmax)
self.ymin = 0.0
self.ymax = self.mean + 5*np.sqrt(self.mean) # allow for 5 sigma spread
self.axrate.set_ylim(self.ymin, self.ymax)
self.axrate.grid(True, alpha=0.5)
self.axrate.set_xlabel("History [s]")
self.axrate.set_ylabel("Counts")
_ = self.axrate.text(0.02, 0.88,
"$\Delta$t=%2gs $\Rightarrow$ mean=%.2f"%(
self.mean/self.rate, self.mean),
transform=self.axrate.transAxes, color='orangered')
self.axrate.axhline(self.mean, color='red', linestyle='dashed')
if self.mean > 1.5:
cm1, cp1 = Poisson_CI(self.mean, 1.) # 68% confidence intervall
rect1 = patches.Rectangle((self.xmin, cm1),
self.xmax-self.xmin, cp1-cm1,
color='green', alpha=0.5)
self.axrate.add_patch(rect1)
if self.mean > 10:
cm2, cp2 = Poisson_CI(self.mean, 2.) # 96% confidence intervall
rect2 = patches.Rectangle((self.xmin, cm2),
self.xmax-self.xmin, cp2-cm2,
color='yellow', alpha=0.5)
self.axrate.add_patch(rect2)
self.xplt = np.linspace(-self.Npoints * self.interval, 0.0, self.Npoints)
self.counts = np.zeros(self.Npoints)-1.
(self.hline,) = self.axrate.plot(self.xplt, self.counts, '.--',
markersize=6, color = "ivory")
plt.ion()
plt.show()
def __call__(self):
"""Flash object"""
N = 0
Nlast = 0
lastbin = 0
T0 = time.time()
while True:
t = self.mpQ.get()
if t >= 0:
# show colored object
t_last = t
self.flashobj.set_facecolor(self.flash_color)
self.fig.canvas.start_event_loop(0.5*self.tflash)
# replaces plt.pause() without stealing focus
# collect statistics
N += 1
rate = N / t if N>2 else 0.
status_text = "active %.1f s"%t + 15*' ' + \
"counts %d"%N + 5*' '+ "average rate %.3f Hz"%rate
# display statistics and make flash object invisible
self.txt.set_text(status_text)
self.flashobj.set_facecolor(self.bg_color)
self.fig.canvas.start_event_loop(0.5*self.tflash)
# calculate entries for rate histogram
hbin = int(t/self.interval)%self.Npoints
if hbin != lastbin:
k = lastbin % self.Npoints
self.hline.set_ydata(np.concatenate((self.counts[k + 1 :],
self.counts[: k + 1])))
lastbin = hbin
self.counts[hbin] = 0 # initialize counter for next bin
self.counts[hbin] += 1
else:
print("\n*==* Poissson Flasher exiting ...")
print( 8*' '+ "active %.1f s"%t_last + 5*' ' + \
"counts %d"%N + 5*' '+"average rate %.3f Hz"%rate)
def showFlash(mpQ, rate, mean):
"""Background process to show poisson event as a flashing object
"""
flasher = DisplayPoissonEvent(mpQ, rate, mean)
flasher()
def poissonEventGenerator(mpQ, rate):
"""Generate sequence of signals corresponding to a Poisson process
"""
tau = 1./rate
dtcum = 0.
T0 = time.time()
while True:
# show flash
t = time.time() - T0
mpQ.put(t) # send event time
# exponentially distributed wait time
dt_wait = -tau * math.log(random.uniform(0., 1.))
# correct for time lags
dt_cor = max(0.9*dt_wait, dt_wait+dtcum-t)
dtcum += dt_wait
# wait ...
if dt_cor > 0.002:
time.sleep(dt_cor)
print(" lag: %.1f ms "%((dt_wait-dt_cor)*1000), end='\r')
# get command line parameters
if __name__ == "__main__": # ------------------------------------------------
if sys.platform.startswith('win'):
# On Windows calling this function is necessary.
mp.freeze_support()
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-r', '--rate', type=float, default=1.0,
help='average event rate in Hz')
parser.add_argument('-m', '--mean', type=float, default=4.0,
help='average Poisson mean for history ')
parser.add_argument('-t', '--time', type=int, default=1800,
help='run time in seconds')
parser.add_argument('-H', '--history', type=int, default=500,
help='max. number of data points to store')
parser.add_argument('-f', '--file', type=str, default='',
help='file name to store results')
args = parser.parse_args()
rate = args.rate
mean = args.mean
run_time = args.time
timestamp = time.strftime('%y%m%d-%H%M', time.localtime())
filename = args.file + '_' + timestamp + '.csv' if args.file != '' else ''
NHistory = args.history
times = np.zeros(NHistory)
tau = 1./args.rate
# set up background process for flash display
procs = []
generatorQ = mp.Queue() # Queue to spy on data transfer inside class Display
flasherQ = mp.Queue() # Queue to spy on data transfer inside class Display
procs.append(mp.Process(name="showFlash",
target=showFlash,
args=(flasherQ, rate,mean,)))
# generator process, must be last in list !
procs.append(mp.Process(name="poissonEventGenerator",
target=poissonEventGenerator,
args=(generatorQ, rate,)))
# start
print(10*' '+"flashing randomly with rate of %.3fs"%rate + " for %d s"%run_time)
print(20*' ' + " to end ", end = '\r')
for p in procs:
p.start()
icount = 0
start_time = time.time()
t=0
try:
while t < run_time:
t = generatorQ.get()
flasherQ.put(t)
times[icount % NHistory] = t
icount += 1
procs[-1].terminate() # terminate generator
a = input(15*' ' + "!!! time over, type to end ==> ")
except KeyboardInterrupt:
print("\n keyboard interrupt - ending ")
finally:
# end processes
flasherQ.put(-1)
for p in procs:
if p.is_alive(): p.terminate()
# sort and store data
event_times = times[: icount].tolist() if icount <= NHistory \
else np.concatenate((times[icount:], times[:icount])).tolist()
if filename != '':
with open(filename, 'w') as csvfile:
csvfile.write('event_times[s]\n')
for i in range(icount):
csvfile.write(str(times[i])+'\n')
print("*==* script " + sys.argv[0] + " ended, data saved to file " + filename)