#!/usr/bin/env python3 """General exapmple for Markov-Chain Monte-Carlo""" # GQ Quast, Nov. 20203, adapted from code by Jonathan Fraine, see # https://exowanderer.medium.com/metropolis-hastings-mcmc-from-scratch-in-python-c21e53c485b7 import numpy as np import matplotlib.pyplot as plt def mcmc_updater(curr_state, curr_likeli, target_pdf, proposal_distribution): """ Propose a new state and compare the likelihoods Given the current state (initially random), current likelihood, the target pdf, and the transition (proposal) distribution, `mcmc_updater` generates a new proposal, evaluate its likelihood, compares that to the current likelihood with a uniformly samples threshold, then it returns new or current state in the MCMC chain. Args: - curr_state (float): the current parameter/state value - curr_likeli (float): the current likelihood estimate - target_pdf (function): a function handle to compute the likelihood - proposal_distribution (function): a function handle to compute the next proposal state Returns: - (tuple): either the current state or the new state and its corresponding likelihood """ global Nupd # count number of state updates # Generate a proposal state using the proposal distribution # Proposal state == new guess state to be compared to current proposal_state = proposal_distribution(curr_state) # Calculate the acceptance criterion prop_likeli = target_pdf(proposal_state) accept_crit = prop_likeli / curr_likeli # Generate a random number between 0 and 1 accept_threshold = np.random.uniform(0, 1) # If the acceptance criterion is greater than the random number, # accept the proposal state as the current state if accept_crit > accept_threshold: Nupd += 1 return proposal_state, prop_likeli return curr_state, curr_likeli def metropolis_hastings( target_pdf, proposal_distribution, initial_state, num_samples, stepsize=0.5, burnin_fraction=0.2): """ Compute the Markov Chain Monte Carlo Args: - target_pdf (function): a function handle to compute the target likelihood - proposal_distribution (function): a function handle to compute the next proposal state - initial_state (list): The initial conditions to start the chain - num_samples (integer): The number of samples to compte, or length of the chain - burnin_fraction (float): a float value from 0 to 1. The percentage of chain considered to be the burnin length Returns: - samples (list): The Markov Chain, samples from the target distribution """ samples = [] # The number of samples in the burn in phase num_burnin = int(burnin_fraction * num_samples) # Set the current state to the initial state curr_state = initial_state curr_likeli = target_pdf(curr_state) for i in range(num_samples+num_burnin): # proposal distribution sampling and comparison occur in mcmc_updater() curr_state, curr_likeli = mcmc_updater( curr_state=curr_state, curr_likeli=curr_likeli, target_pdf=target_pdf, proposal_distribution=proposal_distribution ) # Append the current state to the list of samples if i >= num_burnin: # Only append after the burn-in samples.append(curr_state) return samples def target_pdf(x): # define here the distribution to be sampled, here the sum of two Gaussians sig = 1. m = 10. g1 = np.exp(-(x-m)**2 / (2*sig**2))/sig/np.sqrt(2 * np.pi) sig = 0.1 m = 12. g2 = np.exp(-(x-m)**2 / (2*sig**2))/sig/np.sqrt(2 * np.pi) return (g1+g2)/2. def proposal_distribution(x, stepsize=0.5): # Select the proposed state (new guess) from a Gaussian distribution # centered at the current state, within a Guassian of width `stepsize` # ! this is a very common and the simplest-possible version of a "transition kernel" return np.random.normal(x, stepsize) def MCMCkernel(x, x0=0., stepsize=0.5): # for graphical visialisation of MCMC transition kernel return np.exp(-(x-x0)**2 / (2*stepsize**2))/stepsize/np.sqrt(2 * np.pi) if __name__ == "__main__": # ------------------------------------------ # np.random.seed(42) # set fixed seed only for testing Nupd=0 initial_state = 0. # random starting point num_samples = int(100000) burnin_fraction = 0.2 samples = metropolis_hastings( target_pdf, proposal_distribution, initial_state, num_samples, burnin_fraction=burnin_fraction ) # collcet result: print(f"*==* MCMC: {num_samples:d} accepted,", f" {int(num_samples*burnin_fraction):d} burn-in", f" {Nupd:d} state updates" ) bconts, bedges, _ = plt.hist(samples, bins=int(min(250,num_samples/100)), density=True, label="Sample") xplt = np.linspace(bedges[0], bedges[-1], 250) plt.plot(xplt,target_pdf(xplt), label = "Target PDF") # plot some MCMC Kernels: # plt.plot(xplt, MCMCkernel(xplt, 6.), "g--", label ="MCMC Kernel", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 6.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 7.), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 7.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 8.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 9.), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 9.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 10.), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 10.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 11.), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 11.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 12.), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 12.5), "g--", alpha=0.5) # plt.plot(xplt, MCMCkernel(xplt, 13.), "g--", alpha=0.5) plt.xlabel("x") plt.ylabel("Density") plt.legend() plt.show()