""" This file contains utilities used in the TP1 exercises. """ from itertools import product, cycle import numpy as np import pandas as pd import vector as vec import matplotlib.pyplot as plt ### Class to access particle four vectors @pd.api.extensions.register_dataframe_accessor("v4") class FourVecAccessor(object): def __init__(self, pandas_obj): # to distinguish between multiple particles or single particle # we only need to save the column information, self._obj_columns = pandas_obj.columns # to keep data consistent when appending columns unsing this accessor save indices to use them in returns self._obj_indices = pandas_obj.index # get the correct index level, 0 for single particle, 1 for multiple _vars = self._obj_columns.get_level_values(self._obj_columns.nlevels - 1) if 'M' in _vars and "px" in _vars: kin_vars = ["M", "px", "py", "pz"] elif 'mass' in _vars and 'px' in _vars: kin_vars = ["mass", "px", "py", "pz"] elif 'mass' in _vars and 'pt' in _vars: kin_vars = ["mass", "pt", "phi", "eta"] elif 'E' in _vars and "px" in _vars: kin_vars = ["E", "px", "py", "pz"] elif 'E' in _vars and 'pt' in _vars: kin_vars = ["E", "pt", "phi", "eta"] else: raise KeyError("No matching structure implemented for interpreting the data as a four " "momentum!") # the following lines are where the magic happens # no multi-index, just on particle if self._obj_columns.nlevels == 1: # get the dtypes for the kinetic variables dtypes = pandas_obj.dtypes kin_view = list(map(lambda x: (x, dtypes[x]), kin_vars)) # get the kinetic variables from the dataframe and convert it to a numpy array. # require it to be C_CONTIGUOUS, vector uses C-Style # This array can then be viewed as a vector object. # Every property not given is calculated on the fly by the vector object. # E.g. the mass is not stored but calculated when the energy is given and vice versa. self._v4 = np.require(pandas_obj[kin_vars].to_numpy(), requirements='C').view( kin_view).view(vec.MomentumNumpy4D) # multi-index, e.g. getting the four momentum for multiple particles elif self._obj_columns.nlevels == 2: # get the dtypes for the kinetic variables # assume the same dtypes for the other particles dtypes = pandas_obj[self._obj_columns.get_level_values(0).unique()[0]].dtypes kin_view = list(map(lambda x: (x, dtypes[x]), kin_vars)) self._v4 = np.require(pandas_obj.loc[:, (self._obj_columns.get_level_values(0).unique(), kin_vars)].to_numpy(), requirements='C').view(kin_view).view(vec.MomentumNumpy4D) else: raise IndexError("Expected a dataframe with a maximum of two multi-index levels.") def __getattribute__(self, item): """ Attributes of this accessor are forwarded to the four vector. Returns either a pandas dataframe, if we have multiple particles or a pandas Series for a single particle. """ try: return object.__getattribute__(self, item) except AttributeError: try: return pd.DataFrame(self._v4.__getattribute__(item), columns=pd.MultiIndex.from_product( [self._obj_columns.unique(0), [item]]), index=self._obj_indices) except ValueError: try: return pd.Series(self._v4.__getattribute__(item).flatten(), name=item, index=self._obj_indices) except AttributeError as e: if "'function' object has no attribute 'flatten'" in str(e): raise AttributeError( "Functions of the four vectors can NOT be called directly via the " "accessor. Use the vector property instead! " "Usage: 'df['particle'].v4.vector.desired_function()'") raise e @property def vector(self): """The four vector object itself. It's required when using methods like boosting.""" if self._obj_columns.nlevels == 1: return self._v4[:, 0] else: return self._v4 ### Function to sort elements of an event def get_sorted_df_by_low_level_criteria(df, criteria, axis=1): """ Sorts in ascending order the top levels of a given pd.DataFrame with two levels columns by given criteria that has the form of a pandas DataFrame containing low level quantity, i.e. df.leptons.v4.pt @param df: pd.DataFrame containing the top level objects that will be sorted @param criteria: pd.DataFrame containing the sorting criteria values that are passed to np.argsort @param axis: 0 or 1; row wise sort: axis=1; column wise sort: axis=0. Make sure to pass the right sorted DataFrame (axis option) @return: pd.DataFrame """ N, n, v = *df.shape, df[df.columns.get_level_values(0)[0]].shape[1] idx = np.argsort(criteria, axis=axis) sorted_data = np.take_along_axis(df.to_numpy().reshape((N, int(n / v), v)), idx.to_numpy()[:, :, None], axis).reshape((N, n)) return pd.DataFrame(data=sorted_data, columns=df.columns, index = df.index) ### Proposal for a plotting function of physics object quantities in a grid plot given one or more MultiIndex pd.DataFrames ### without stack option def plot_quantities(df, column, quantity=None, bins=None, hist_range=None, density=False, weights=None, label=None, unit=None, yscale="log", suptitle=None): """ Plot the distributions of given quantities of given physics objects that are present in given DataFrame(s). For two or more DataFrames, the plotting is done object-quantity wise in the same subplot. :param df: pd.DataFrame or list of pd.DataFrames :param column: str or list of str of physics objects, i. e. "lepton_0" or ["jet_0", "jet_1"] :param quantity: str or list of str, i. e. "pt" or ["pt", "eta", "phi"] in case of a two level index, None if only one index level exists :param label: str or list of str with the same length as df if df is a list :param unit: str or list of str with the same length as quantity for x-axis label or the same length as columns in case quantity is None :param density: bool or list of bool with the same length as quantity :param hist_range: None or (float, float) or list of those with same length as quantity or column or the same length as all created subplots: a) column = ["jet_0", "jet_1", "jet_2"], quantity = ["pt"], hist_range = [(0, 100), (0, 50), (0, 25)] => creates subplots of (jet_0, pt), (jet_1, pt) and (jet_2, pt) in (0, 100), (0, 50) and (0, 25) intervals. b) column = ["jet_0", "jet_1", "jet_2"], quantity = ["pt", "eta"], hist_range = [(0, 100), (-4, 4)] => creates subplots where pt of all subplots ranges are set to (0, 100) and eta ranges to (-4, 4) c) column = ["jet_0", "jet_1", "jet_2"], quantity = ["pt", "eta"], hist_range = [(0, 100), (-4, 4), None, (-4, 4), None, (-4, 4)] => creates six subplots where all eta ranges are set to (-4, 4) and (jet_0, pt) range is set to (0, 100). All other ranges are set by default. :param bins: int or list of int, setting the number of used bins. In case of a list: the same rules applies for bins as for hist_range :param yscale: str like as in matplotlib yscale or list of str, setting the used yscale. In case of a list: the same rules applies for bins as for hist_range. Possible options are for example 'log', 'linear', 'logit', 'sumlog', ... :param weights: None, 1D np.ndarray or pd.Series or a list of those with the same length as df :param suptitle: None or str containing the figure title :return: None """ def to_list(it): # helper for argument conversion return it if isinstance(it, list) else [it] with_quantity = True if quantity else False # in case no quantity is provided df, label, unit, hist_range = to_list(df), to_list(label), to_list(unit), to_list(hist_range) column, quantity = to_list(column), to_list(quantity) bins, density, yscale, weights = to_list(bins), to_list(density), to_list(yscale), to_list(weights) # padding up to quantity dim if necessary unit = unit + [None] * (len(quantity) - len(unit)) density = density + [True if len(density) == 1 and density[0] else False] * (len(quantity) - len(density)) # padding up to df dim if necessary label = label + [None] * (len(df) - len(label)) weights = weights + [None] * (len(df) - len(weights)) def check_dim_of(it): # checking for bins, hist_range, yscale length in case to prevent strange plots return (len(it) == 1 or len(it) == len(quantity) or (len(it) == len(column) and len(quantity) == 1) or len(it) == len(list(product(column, quantity)))) assert check_dim_of(hist_range) and check_dim_of(bins) and check_dim_of(yscale), \ "(hist_range/bins/yscale) have either to be None or a (two element tuple/int/str) or be a " \ "list of those types and have the same length as column if single or None quantity is " \ "given or have the same length as quantity or the length of column * quantity" bins, hist_range, yscale = cycle(bins), cycle(hist_range), cycle(yscale) # number of created plots and subdivision in columns and rows n_plots = len(list(product(column, quantity))) n_columns = len(quantity) if len(quantity) > 1 else (3 if n_plots > 3 else n_plots) n_rows = int(np.ceil(n_plots / n_columns)) fig, ax = plt.subplots(n_rows, n_columns, figsize=(25, int(4 * n_rows))) ax = ax if isinstance(ax, np.ndarray) else np.array(ax) # in case only a single column amd a single quantity is given fig.suptitle(suptitle) def get_quantity(dataframe, _column_name, _quantity_name): # helper to get pd.Series... if _quantity_name is not None: # in case of a two level index try: # of a four vector quantity return getattr(dataframe[_column_name].v4, _quantity_name).to_numpy() except AttributeError: # of other quantity return dataframe.loc[:, (_column_name, _quantity_name)].to_numpy() else: # in case of a single level index return dataframe.loc[:, _column_name].to_numpy() def is_None(it): # helper: checking for None __and__ avoiding a collision with np.ndarray like objects return isinstance(it, type(None)) for i, (_ax, (_column, (_quantity, _unit, _density))) in enumerate(zip(ax.flatten(), product(column, zip(quantity, unit, density)))): empty_subplot_content = 0 # counter for setting subplots with no content to invisible edges = None # for auto setting of first histogram edges _bins, _hist_range, _yscale = next(bins), next(hist_range), next(yscale) # bins, hist_range, yscale for (_column, _quantity) for j, (_df, _label, _weights) in enumerate(zip(df, label, weights)): # in case multiple dataframes are given _plot_values = get_quantity(_df, _column, _quantity) if np.all(np.isnan(_plot_values)): empty_subplot_content += 1 continue _, edges, _ = _ax.hist(_plot_values, histtype="step", range=_hist_range, density=_density, bins=100 if not _bins and is_None(edges) else _bins if is_None(edges) else edges, # same edges label=_label if _label else f"Label_{j}", # default or custom weights=_weights) _ax.set(yscale=_yscale, ylabel="Events", title=f"{_column}{':' if _quantity else ''} {_quantity if _quantity else ''}", xlabel=f"{_quantity if _quantity else _column} " f"{'in ' if (unit[i % len(unit)]) or (_unit and with_quantity) else ''}" f"{_unit if (_unit and with_quantity) else (unit[i % len(unit)] if unit[i % len(unit)] else '')}") # _unit if with_quantity else unit[i] if _unit or unit[i] else '' _ax.legend() if empty_subplot_content == len(df): _ax.set_visible(False) # not showing MET eta or similar not defined or emtpy NaN like quantity lists plt.tight_layout() plt.show()