diff --git a/gs_quant/timeseries/statistics.py b/gs_quant/timeseries/statistics.py
index 0155fcc5..e25c7f29 100644
--- a/gs_quant/timeseries/statistics.py
+++ b/gs_quant/timeseries/statistics.py
@@ -15,16 +15,30 @@
 # Such functions should be fully documented: docstrings should describe parameters and the return value, and provide
 # a 1-line description. Type annotations should be provided for parameters.
+import math
+from collections import namedtuple
+
 import datetime
+import multiprocessing as mp
+from multiprocessing import Pool
 import numpy
+import pandas as pd
+from pandas.tseries.offsets import BDay
 import scipy.stats.mstats as stats
 from scipy.stats import percentileofscore
+from scipy.spatial.distance import euclidean
 from statsmodels.regression.rolling import RollingOLS
+from dtaidistance import dtw
 from .algebra import *
 import statsmodels.api as sm
 from ..models.epidemiology import SIR, SEIR, EpidemicModel
 from ..data import DataContext
+from typing import Optional, Union, Callable
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+import ipywidgets as widgets
+from IPython.display import display
 
 """
 Stats library is for basic arithmetic and statistical operations on timeseries.
@@ -1389,3 +1403,1531 @@ def predict_e(self) -> pd.Series:
         :return: exposed predict
         """
         return self._model.e_predict
+
+
+DateRange = namedtuple('DateRange', ['start', 'end'])
+
+
+def recurrence_parallel_score(args):
+    """
+    Function fed to the multiprocessing Pool to calculate the similarity
+    score between two time series.
+    """
+    curr_period_ts, recurrence_ts, factor, recurr_daterange, metric = args
+    score = metric(curr_period_ts, recurrence_ts)
+    return '{}_{}'.format(factor, recurr_daterange), score
+
+
+def cummulative_pct_change(ts) -> float:
+    """
+    Function that calculates the cumulative percentage change of a series
+
+    :param ts: pd.Series or list
+    """
+    ts = list(ts)
+
+    if ts[0] == 0:  # deal with the first value being 0
+        if ts[-1] > ts[0]:
+            pct_change = 1.0
+        elif ts[-1] < ts[0]:
+            pct_change = -1.0
+        else:
+            pct_change = 0.0
+    else:
+        pct_change = (ts[-1] - ts[0]) / ts[0]
+
+    return pct_change * 100
+
+
+def dtw_metric(a, b):
+    """
+    Function that calculates the DTW (dynamic time warping) distance between
+    two z-scored time series; a lower score means more similar
+
+    :param a: time series a
+    :param b: time series b
+    """
+
+    if np.unique(a).size == 1 or np.unique(b).size == 1:
+        return np.nan
+
+    a = np.array(a)
+    b = np.array(b)
+
+    a = (a - a.mean()) / a.std()
+    b = (b - b.mean()) / b.std()
+
+    score = dtw.distance_fast(a, b)
+
+    # The original `score != np.inf or score != -np.inf` was always True;
+    # treat any infinite distance as missing.
+    return np.nan if np.isinf(score) else score
+
+
+class GS_Recurrence():
+    """
+    Finds historically similar time periods given a handful of time series,
+    a similarity metric, a query start/end date (the period of the same
+    length that all other periods are compared against), and various other
+    parameters.
+
+    :param df: the original Pandas DataFrame to feed to the model. Requires
+               dates as the index and time series as the columns. Only
+               supports daily data.
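+
+               A minimal, hypothetical frame of the expected shape (values
+               are illustrative only):
+
+               >>> clean_df = pd.DataFrame(
+               ...     {'NASDAQ': [100.0, 101.5, 99.8],
+               ...      'VIXCLS': [20.1, 22.3, 21.7]},
+               ...     index=pd.date_range('2020-01-01', periods=3, freq='B'))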
+
+    **Examples**
+
+    >>> query_start = datetime.date(2020, 3, 20)
+    >>> query_end = datetime.date(2020, 9, 4)
+
+    >>> GS_Recurr = GS_Recurrence(clean_df)
+    >>> GS_Recurr.find_recurrences(factors=['NASDAQ', 'VIXCLS'],
+                                   start=query_start,
+                                   end=query_end)
+    """
+
+    def __init__(self, df):
+
+        self.df_original = df.loc[:, ~df.columns.duplicated()]
+        self.df_original.index = pd.to_datetime(self.df_original.index)
+        self.df_normalized = self.df_original.copy(deep=True)
+
+        self.recurrences = None
+        self.all_recurrences = None
+
+        self.factors = None
+        self.factor_weights = None
+
+        self.factor_unit = None
+
+        self.query_start = None
+        self.query_end = None
+
+        df_original_nans = self.df_original.isnull().sum().sum()
+        if df_original_nans != 0:
+            print('''Found {} NaNs... Imputing Using Cubic Spline & \
+Forward/Backward Filling...'''.format(df_original_nans))
+
+        self.df_original = self.df_original.apply(self._fillnan)
+        print('Done.')
+
+    def _fillnan(self, ts: pd.Series) -> pd.Series:
+        """
+        Function that fills in missing data by interpolating (cubic) and
+        then forward/backward filling
+
+        :param ts: Pandas Series
+        """
+        ts = ts.interpolate(method='cubic')
+        ts = ts.ffill().bfill()
+        ts = ts.round(4)
+        return ts
+
+    def _fill_inf_nan(self, series: pd.Series) -> pd.Series:
+        """
+        Function that replaces np.inf with 1.0 and np.nan with 0.0
+
+        :param series: Pandas Series
+        """
+        series = series.copy(deep=True)
+        series = series.replace(np.inf, 1.0).replace(np.nan, 0.0)
+        return series
+
+    def _str_to_daterange(self,
+                          s: str,
+                          days_before: int = 0,
+                          days_after: int = 0) -> DateRange:
+        """
+        Function that converts a str with the format 'Y/m/d - Y/m/d' to a
+        DateRange with field names 'start' and 'end'
+
+        :param s: date range string, e.g. '2020/03/20 - 2020/09/04'
+        :param days_before: business days to extend before the start
+        :param days_after: business days to extend after the end
+        """
+        start, end = s.split('-', 1)
+        start, end = start.strip(), end.strip()
+
+        start = datetime.datetime.strptime(start,
+                                           '%Y/%m/%d') - BDay(days_before)
+        end = datetime.datetime.strptime(end, '%Y/%m/%d') + BDay(days_after)
+
+        return DateRange(start=start, end=end)
+
+    def _overlap_daterange(self,
+                           dates: list,
+                           length: int,
+                           every_n: int = 1) -> list:
+        """
+        Function to generate all the overlapping date ranges of a given
+        length from a DatetimeIndex
+
+        :param dates: Pandas DatetimeIndex
+        :param length: length of the overlapping time periods
+        :param every_n: keep only every n-th overlapping date range
+        """
+
+        lst = ['{} - {}'.format(
+            date.strftime('%Y/%m/%d'),
+            dates[idx + length - 1].strftime('%Y/%m/%d'))
+            for idx, date in enumerate(dates[:len(dates) - length + 1])
+            if len(dates[idx: idx + length]) == length]
+
+        lst = lst[0::every_n]
+        return lst
+
+    def _calculate_overlap(self, r1: DateRange, r2: DateRange) -> int:
+        """
+        Function to calculate the number of overlapping days between two
+        DateRanges
+
+        :param r1: first DateRange
+        :param r2: second DateRange
+        """
+        latest_start = max(r1.start, r2.start)
+        earliest_end = min(r1.end, r2.end)
+        delta = (earliest_end - latest_start).days + 1
+        num_overlap_days = max(0, delta)
+        return num_overlap_days
+
+    def _remove_overlap(self,
+                        df: pd.DataFrame,
+                        sort_by: str,
+                        overlap_pct: float = 0.5,
+                        asc: bool = False) -> pd.DataFrame:
+        """
+        Function to remove overlapping date periods from the similarity
+        score df
+
+        :param df: similarity scores df
+        :param sort_by: column to sort by (descending) before removing
+                        overlaps
+        :param overlap_pct: % threshold used to decide if two periods
+                            overlap
+        :param asc: sort the final df ascending instead
+        """
+        query_str = self.query_start.strftime('%Y/%m/%d') + \
+            ' - ' + self.query_end.strftime('%Y/%m/%d')
+
+        df = df.sort_values(sort_by, ascending=asc)
+
+        non_overlap_list = []
+
+        fst_recurr = df.index[0]
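+        # Greedy filter: the top-ranked period is always kept; each later
+        # candidate is kept only if it overlaps no kept period by more than
+        # overlap_pct of the query length.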
+        non_overlap_list.append((fst_recurr,
+                                 self._str_to_daterange(fst_recurr)))
+
+        query_length = len(pd.date_range(self.query_start,
+                                         self.query_end,
+                                         freq='B'))
+        non_overlap_thresh = query_length * overlap_pct
+
+        for idx, date_str in enumerate(df.index):
+            query_date_range = self._str_to_daterange(date_str)
+            overlap_count = 0
+            for non_overlap in non_overlap_list:
+                non_overlap_date_range = self._str_to_daterange(non_overlap[0])
+                num_overlap = self._calculate_overlap(query_date_range,
+                                                      non_overlap_date_range)
+
+                if num_overlap > non_overlap_thresh:
+                    overlap_count += 1
+                    break
+
+            if overlap_count == 0:
+                non_overlap_list.append((date_str, query_date_range))
+
+        non_overlap_dates = [i[0] for i in non_overlap_list
+                             if i[0] != query_str]
+        print(' Before removing overlapped dates: {}'.format(df.shape[0]))
+        final = df.loc[non_overlap_dates]
+        print(' After removing overlapped dates: {}'.format(final.shape[0]))
+        return final
+
+    def _calculate_weights(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Function to weight the factors and their similarity scores.
+        Returns the recurrences using weighted factors.
+
+        :param df: Pandas DataFrame of scores
+        """
+
+        for col in df.columns:
+            min_ = self._fill_inf_nan(df[col].copy(deep=True)).dropna().min()
+            max_ = self._fill_inf_nan(df[col].copy(deep=True)).dropna().max()
+            df['{} 0-1'.format(col)] = ((df[col] - min_) / (max_ - min_))
+
+        norm_columns = [col for col in df.columns if '0-1' in col]
+
+        norm_df = df[norm_columns]
+        norm_df.columns = [col.replace(' Score 0-1', '')
+                           for col in norm_columns]
+
+        assert(((1.0 - norm_df) <= 1.0).sum().sum() == (norm_df.shape[0] *
+                                                        norm_df.shape[1]))
+        assert(((1.0 - norm_df) >= 0.0).sum().sum() == (norm_df.shape[0] *
+                                                        norm_df.shape[1]))
+
+        w_score = (1.0 - norm_df).mul([self.factor_weights[col]
+                                       for col in norm_df.columns]).sum(axis=1)
+        df['Final Score'] = w_score
+
+        df = df.sort_values('Final Score', ascending=False)
+
+        return df
+
+    def _parallel_recurr_dict_creation(self, args: list) -> dict:
+        """
+        Function to run in parallel that extracts overlapping time periods
+        and stores them into a dict of dicts.
+
+        :param args: tuple of (factor, time series, list of overlapping
+                     date ranges)
+        """
+        factor, df, overlap_date_ranges = args
+
+        temp_dict = {}
+        for start_end in overlap_date_ranges:
+            daterange = self._str_to_daterange(start_end)
+            daterange = pd.date_range(daterange.start, daterange.end, freq='B')
+
+            recurr_chunk = df.loc[daterange]
+            temp_dict[start_end] = np.array(recurr_chunk)
+
+        return {factor: temp_dict}
+
+    def find_recurrences(self,
+                         factors: list,
+                         start: datetime.date,
+                         end: datetime.date,
+                         factor_unit: Optional[dict] = None,
+                         factor_weights: Optional[dict] = None,
+                         overlap_pct: float = 0.2,
+                         sim_metric: Union[str, dict, Callable] = 'dtw',
+                         normalize: Optional[Union[str, Callable]] = 'z-score',
+                         asc: bool = False,
+                         parallel: bool = False,
+                         n_jobs: int = 1,
+                         every_n: int = 1,
+                         search_start: Optional[datetime.date] = None,
+                         search_end: Optional[datetime.date] = None) -> None:
+        """
+        Function to find recurrences.
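+
+        Every overlapping window with the same length as the query period is
+        scored against the query, the per-factor scores are weighted, and
+        the results are stored in self.all_recurrences (all windows) and
+        self.recurrences (overlap-filtered); returns None.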
+
+        :param factors: factors of interest
+        :param start: query period start date
+        :param end: query period end date
+        :param factor_unit: dict with factors as keys and respective units
+                            as values; 'r' (return) or 'd' (difference)
+        :param factor_weights: weights used to weigh the factors; should
+                               sum to 1; a higher value implies more
+                               important
+        :param overlap_pct: % threshold to determine if two periods overlap
+        :param sim_metric: default metric is 'dtw', but you can pass in your
+                           own metric that compares two time series
+        :param normalize: normalization function that is applied to the
+                          entire time series; default is z-score, but you can
+                          pass in your own function or choose None
+        :param asc: order final scores ascending; by default the most similar
+                    scores are 1.0 and the least similar 0.0, since each
+                    score is first normalized between 0-1 and then subtracted
+                    from one before applying weights
+        :param parallel: run in parallel
+        :param n_jobs: number of cores to use; -1 specifies all cores
+        :param every_n: every n-th overlapping date range is selected
+                        (instead of every overlapping period). Helps reduce
+                        computational time.
+        :param search_start: restrict the search to dates on or after this
+                             date; defaults to the first date in the data
+        :param search_end: restrict the search to dates on or before this
+                           date; defaults to the last date in the data
+
+        **Examples**
+
+        >>> GS_Recurr = GS_Recurrence(clean_df)
+        >>> GS_Recurr.find_recurrences(factors=['NASDAQ', 'VIXCLS'],
+                                       start=query_start,
+                                       end=query_end)
+
+        """
+        def z_score_normalize(ts) -> np.ndarray:
+            if np.unique(ts).size == 1:  # if all values are the same, return the array unchanged
+                return np.array(ts)
+            else:
+                return np.array(((ts - ts.mean()) / ts.std()).dropna())
+
+        df = self.df_original.copy(deep=True)
+
+        self.factors = list(set(factors))
+        self.query_start = start
+        self.query_end = end
+        assert(start < end), 'Start date is not before end date!'
+
+        if factor_unit and not set(self.factors).issubset(factor_unit.keys()):
+            raise ValueError('Specified factor units do not match factors.')
+        else:
+            if factor_unit:
+                assert(set(factor_unit.values()).issubset(['r', 'd'])), \
+                    ('Specified factor type is not r (return) or d (diff).')
+                self.factor_unit = factor_unit
+            else:
+                self.factor_unit = {factor: 'r'
+                                    for factor in df.columns}
+
+        if factor_weights:
+            assert(sum(factor_weights.values()) == 1.0), \
+                ('Specified weights do not sum to 1.')
+            self.factor_weights = factor_weights
+        else:
+            self.factor_weights = {factor: 1 for factor in self.factors}
+
+        if normalize == 'z-score':
+            self.df_normalized = self.df_normalized.apply(z_score_normalize)
+        elif normalize is None:
+            self.df_normalized = df.copy(deep=True)
+        else:
+            self.df_normalized = self.df_normalized.apply(normalize)
+
+        if isinstance(sim_metric, dict):
+            # Map the 'dtw' shorthand to dtw_metric and default any factor
+            # without an explicit entry to dtw_metric as well.
+            sim_metric = {factor: (dtw_metric if metric == 'dtw'
+                                   else metric)
+                          for factor, metric in sim_metric.items()}
+            sim_metric = {factor: sim_metric.get(factor, dtw_metric)
+                          for factor in factors}
+        else:
+            sim_metric = {factor: dtw_metric
+                          for factor in factors}
+
+        search_start = search_start if search_start else df.index[0]
+        search_start = pd.Timestamp(search_start)
+
+        search_end = search_end if search_end else df.index[-1]
+        search_end = pd.Timestamp(search_end)
+        assert(search_start < search_end), \
+            'Search start date is not before end date!'
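+
+        # Echo the effective configuration before the (potentially long)
+        # search so runs are easy to audit from the notebook output.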
+        print('========================================================')
+        print('''INPUT PARAMETERS:\n\
+        QUERY PERIOD = {0} - {1}\n\
+        SEARCHING BETWEEN: {2} - {3}\n\
+        FACTORS = {4}\n\
+        OVERLAP PCT = {5}\n\
+        FACTOR TYPES = {6}\n\
+        SIMILARITY METRICS = {7}\n\
+        EVERY N: {8}'''.format(start.strftime('%Y/%m/%d'),
+                               end.strftime('%Y/%m/%d'),
+                               search_start.strftime('%Y/%m/%d'),
+                               search_end.strftime('%Y/%m/%d'),
+                               self.factors,
+                               overlap_pct,
+                               {factor: self.factor_unit[factor]
+                                for factor in factors},
+                               {factor: sim_metric[factor].__name__
+                                for factor, _ in sim_metric.items()},
+                               every_n))
+        print('========================================================')
+
+        query_length = len(pd.date_range(start, end, freq='B'))
+
+        search_period = self.df_normalized.index
+        search_period = search_period[search_period >= search_start]
+        search_period = search_period[search_period <= search_end]
+
+        overlap_date_ranges = self._overlap_daterange(search_period,
+                                                      query_length,
+                                                      every_n=every_n)
+
+        query_str = self.query_start.strftime('%Y/%m/%d') + \
+            ' - ' + self.query_end.strftime('%Y/%m/%d')
+
+        if query_str not in overlap_date_ranges:
+            overlap_date_ranges.append(query_str)
+
+        # Create a dict that contains the normalized time series for the query period
+        print('Creating query dict...')
+        query_dict = {}
+
+        query_chunk = self.df_normalized[self.factors].loc[start:end]
+        query_str = '{} - {}'.format(start.strftime('%Y/%m/%d'),
+                                     end.strftime('%Y/%m/%d'))
+
+        for factor in self.factors:
+            query_dict[factor] = {}
+            query_dict[factor][query_str] = np.array(query_chunk[factor])
+        print('Done.')
+
+        # Compare each recurrence period with the query period.
+        # If the user specifies a custom function, use it; otherwise use DTW.
+        if parallel:
+            try:
+                pool = Pool(mp.cpu_count() if n_jobs == -1 else n_jobs)
+
+                # Create a dict that contains the time series for all recurrences
+                print('Creating recurrences dict...')
+
+                recurr_dict = {factor: {} for factor in self.factors}
+                inputs = [(factor,
+                           self.df_normalized[factor].copy(deep=True),
+                           overlap_date_ranges)
+                          for factor in self.factors]
+
+                recurr_list = pool.map(self._parallel_recurr_dict_creation,
+                                       inputs)
+                for temp_dict in recurr_list:
+                    factor = list(temp_dict.keys())[0]
+                    recurr_dict[factor] = temp_dict[factor]
+
+                print('Done.')
+
+                # Calculate scores
+                print('''Calculating scores b/w query period and \
+recurrence periods...''')
+                scores = {factor: {} for factor in self.factors}
+
+                inputs = [(np.array(query_dict[factor][query_str]),
+                           np.array(recurr_dict[factor][recurr_daterange]),
+                           factor,
+                           recurr_daterange,
+                           sim_metric[factor])
+                          for factor in factors
+                          for recurr_daterange in recurr_dict[factor].keys()]
+
+                scores_results = pool.map(recurrence_parallel_score, inputs)
+
+                for result in scores_results:
+                    factor, recurr_date_range = result[0].split('_')
+                    scores[factor][recurr_date_range] = result[1]
+                pool.close()
+            except Exception as e:
+                pool.close()
+                raise ValueError(e)
+        else:
+            # Create a dict that contains the time series for all recurrence periods
+            print('Creating recurrences dict...')
+            recurr_dict = {}
+            for factor in self.factors:
+                recurr_dict[factor] = {}
+                for start_end in overlap_date_ranges:
+                    daterange = self._str_to_daterange(start_end)
+                    daterange = pd.date_range(daterange.start,
+                                              daterange.end,
+                                              freq='B')
+                    recurr_chunk = self.df_normalized.loc[daterange][factor]
+                    recurr_dict[factor][start_end] = np.array(recurr_chunk)
+
+            print('Done.')
+
+            # Calculate scores
+            print('''Calculating scores b/w query period and recurrence \
+periods...''')
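+            # Score each candidate window against the query, factor by
+            # factor, with that factor's configured similarity metric.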
+            scores = {factor: {} for factor in self.factors}
+
+            for factor in self.factors:
+                scores[factor] = {}
+                for recurr_daterange in recurr_dict[factor].keys():
+                    recurr_series = recurr_dict[factor][recurr_daterange]
+                    query_series = query_dict[factor][query_str]
+
+                    score = sim_metric[factor](recurr_series, query_series)
+                    scores[factor][recurr_daterange] = score
+
+            print('Done.')
+
+        # Fill in missing scores with the least similar score. For distance
+        # metrics (the default), larger means less similar, so missing
+        # scores are filled with the largest observed score.
+        print('Filling NaNs with the least similar score.')
+
+        for factor in self.factors:
+            all_scores = [v for _, v in scores[factor].items()
+                          if not math.isnan(v)]
+            least_similar_score = min(all_scores) if asc else max(all_scores)
+
+            for recurr_daterange in scores[factor].keys():
+                if math.isnan(scores[factor][recurr_daterange]):
+                    scores[factor][recurr_daterange] = least_similar_score
+
+        print('Done.')
+
+        print('Calculating weights...')
+        scores_df = pd.DataFrame.from_dict(scores)
+        scores_df.columns = ['{} Score'.format(col)
+                             for col in scores_df.columns]
+
+        scores_df_weighted = self._calculate_weights(scores_df)
+        self.all_recurrences = scores_df_weighted
+
+        print('Done.')
+        print('Removing overlapped dates...')
+
+        # Remove overlaps
+        recurrences = self._remove_overlap(scores_df_weighted,
+                                           sort_by='Final Score',
+                                           overlap_pct=overlap_pct,
+                                           asc=asc)
+        self.recurrences = recurrences
+
+        print('Done.')
+
+        print('=============================')
+        print('DONE WITH FINDING RECURRENCES.')
+        print('=============================')
+
+    def _calc_significance(self,
+                           n: int,
+                           factor: str,
+                           days_after: int,
+                           recurrences_chg: list):
+        """
+        Function to calculate the significance of the moves of the n
+        recurrences. Significance is calculated using a one-sample t-test,
+        where the mean to compare the recurrences' moves against is obtained
+        by sampling periods of the same length as the recurrences and
+        averaging their returns.
+
+        Returns the sampled mean, t-statistic, and p-value.
+
+        :param n: number of recurrences
+        :param factor: the factor / time series in question
+        :param days_after: the number of days after the recurrences to
+                           calculate the significance for
+        :param recurrences_chg: list of cumulative returns / differences
+                                of the n recurrences (length n)
+        """
+
+        df = self.df_original.copy(deep=True)
+        series = df.copy(deep=True)[factor].sort_index()
+
+        query_length = days_after
+
+        overlap_date_ranges = self._overlap_daterange(series.index,
+                                                      query_length,
+                                                      every_n=5)
+
+        overlap_date_ranges = [self._str_to_daterange(daterange)
+                               for daterange in overlap_date_ranges]
+
+        all_changes = []
+        for date_range in overlap_date_ranges:
+            sample_series = series.loc[date_range.start:date_range.end]
+
+            if self.factor_unit[factor] == 'r':
+                diff = cummulative_pct_change(sample_series)
+            elif self.factor_unit[factor] == 'd':
+                diff = sample_series[-1] - sample_series[0]
+
+            all_changes.append(diff)
+
+        sim_diff_mean = np.nanmean(all_changes)
+        recurrences_chg = recurrences_chg[~np.isnan(recurrences_chg)]
+        t, p_value_sim = stats.ttest_1samp(np.array(recurrences_chg),
+                                           sim_diff_mean)
+
+        return sim_diff_mean, t, p_value_sim
+
+    def summary(self, factors: list, n: int = 10, days_before: int = 15,
+                days_after: int = 252) -> None:
+        """
+        Function to plot the recurrences found on a global view, a comparison
+        of all the recurrences on the same graph, and a statistics table.
+
+        Returns: None
+
+        :param factors: list of factors to calculate statistics for.
+                        Should be a subset of all the factors.
+        :param n: the number of most similar recurrences to consider
+        :param days_before: the number of days before the recurrences to
+                            graph
+        :param days_after: the number of days after the recurrences to graph
+                           and calculate the significance for
+
+        **Examples**
+
+        >>> GS_Recurr.summary(factors=['NASDAQ'])
+        """
+
+        df = self.df_original.copy(deep=True)
+
+        # Plot all recurrences on the same graph
+        print('Looking at {} recurrences.'.format(n))
+        print('''Displaying the period {0} days before and {1} days after \
+recurrences.'''.format(days_before, days_after))
+
+        fig = make_subplots(rows=1, cols=1,
+                            shared_xaxes=True,
+                            subplot_titles=factors)
+
+        for idx, factor in enumerate(self.factors[:1]):
+            temp = pd.Series(df[factor])
+            temp.index = pd.to_datetime(temp.index)
+
+            fig.add_trace(go.Scatter(x=temp.index, y=np.array(temp),
+                                     name=factor), row=idx+1, col=1)
+
+            for recurrence_date in sorted(self.recurrences.index[:n]):
+                start_str, end_str = recurrence_date.split(' - ', 1)
+                start = datetime.datetime.strptime(start_str.strip(),
+                                                   '%Y/%m/%d')
+                end = datetime.datetime.strptime(end_str.strip(), '%Y/%m/%d')
+
+                # Add a filled trace to shade the recurrence window
+                fig.add_trace(
+                    go.Scatter(
+                        x=[start, end, end, start, start],
+                        y=[0, 0, max(temp), max(temp), 0],
+                        fill='toself',
+                        mode='lines',
+                        opacity=0.5,
+                        name=recurrence_date,
+                        fillcolor='salmon',
+                        line_color='salmon',
+                    ),
+                    row=idx+1, col=1
+                )
+
+            # Add the query period
+            fig.add_trace(
+                go.Scatter(
+                    x=[self.query_start, self.query_end, self.query_end,
+                       self.query_start, self.query_start],
+                    y=[0, 0, max(temp), max(temp), 0],
+                    fill='toself',
+                    mode='lines',
+                    opacity=0.5,
+                    name='{} - {} (Query)'.format(self.query_start,
+                                                  self.query_end),
+                    fillcolor='forestgreen',
+                    line_color='forestgreen',
+                ),
+                row=idx+1, col=1
+            )
+        title = 'Recurrences Overview ({})'.format(factor)
+        fig.update_layout(title_text=title,
+                          title_x=0.5,
+                          legend_title='Recurrence Dates',
+                          showlegend=False,
+                          margin=dict(
+                              l=20,
+                              r=20,
+                              b=40,
+                              t=40,
+                              pad=4
+                          )
+                          )
+        fig.show()
+
+        # Plot the normalized recurrences on one plot
+        all_stats = {}
+        for factor in factors:
+            stats = {}
+
+            fig = go.Figure()
+
+            ext_date_range = pd.date_range(self.query_start -
+                                           BDay(days_before),
+                                           self.query_end +
+                                           BDay(days_after), freq='B')
+
+            t = np.arange(len(ext_date_range)) - days_before
+            query_factor_series = df.loc[ext_date_range][factor]
+            query_factor_series /= query_factor_series[days_before]
+
+            for idx, recurr_date_str in enumerate(self.recurrences.index[:n]):
+                stats[recurr_date_str] = {}
+                ext_daterange = self._str_to_daterange(recurr_date_str,
+                                                       days_before,
+                                                       days_after)
+                ext_daterange = pd.date_range(ext_daterange.start,
+                                              ext_daterange.end,
+                                              freq='B')
+
+                ext_series = df.loc[ext_daterange][factor]
+                ext_series /= ext_series[days_before]
+
+                daterange = self._str_to_daterange(recurr_date_str)
+
+                during_daterange = pd.date_range(daterange.start,
+                                                 daterange.end,
+                                                 freq='B')
+                recurr_series = df.loc[during_daterange][factor]
+
+                after_date_range = pd.date_range(daterange.end,
+                                                 daterange.end +
+                                                 BDay(days_after),
+                                                 freq='B')
+                after_series = df.loc[after_date_range][factor]
+
+                # Calculate stats
+                if self.factor_unit[factor] == 'd':
+                    change = recurr_series[-1] - recurr_series[0]
+                    after_chg = after_series[-1] - after_series[0]
+                elif self.factor_unit[factor] == 'r':
+                    change = cummulative_pct_change(recurr_series)
+                    after_chg = cummulative_pct_change(after_series)
+                else:
+                    raise ValueError('Not r or d.')
+
+                ended_up_during = recurr_series[-1] > recurr_series[0]
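+                # 'Ended up' means the window's last value exceeds its
+                # first; these booleans are tallied into the up/down counts
+                # after the loop.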
+                ended_up_after = after_series[-1] > after_series[0]
+
+                stats[recurr_date_str]['Chg'] = change
+                stats[recurr_date_str]['After Chg'] = after_chg
+                stats[recurr_date_str]['Ended Up During'] = ended_up_during
+                stats[recurr_date_str]['Ended Up After'] = ended_up_after
+
+                fig.add_trace(
+                    go.Scatter(
+                        x=t,
+                        y=ext_series,
+                        name='{} (#{})'.format(recurr_date_str, idx)
+                    )
+                )
+
+            all_stats[factor] = {}
+
+            ended_up_during = len([1 for k, v in stats.items()
+                                   if v['Ended Up During']])
+            ended_down_during = n - ended_up_during
+
+            ended_up_after = len([1 for k, v in stats.items()
+                                  if v['Ended Up After']])
+            ended_down_after = n - ended_up_after
+
+            avg_change = np.nanmean([v['Chg'] for k, v in stats.items()])
+            stdev_chg = np.nanstd([v['Chg'] for k, v in stats.items()])
+
+            avg_change_after = np.nanmean([v['After Chg']
+                                           for k, v in stats.items()])
+            stdev_chg_after = np.nanstd([v['After Chg']
+                                         for k, v in stats.items()])
+
+            avg_change = round(avg_change, 2)
+            stdev_chg = round(stdev_chg, 2)
+            avg_change_after = round(avg_change_after, 2)
+            stdev_change_after = round(stdev_chg_after, 2)
+
+            # Calculate p-value (during recurrence)
+            recurrence_chg = np.array([v['Chg'] for k, v in stats.items()])
+            query_length = np.busday_count(self.query_start,
+                                           self.query_end) + 1
+            results_during = self._calc_significance(n,
+                                                     factor,
+                                                     query_length,
+                                                     recurrence_chg)
+            sim_diff_mean_d, t_stat_d, p_value_sim_d = results_during
+            sim_diff_mean_d = round(sim_diff_mean_d, 2)
+            p_value_sim_d = round(p_value_sim_d, 2)
+
+            # Calculate p-value (after recurrence)
+            recurr_chg_after = np.array([v['After Chg']
+                                         for k, v in stats.items()])
+            results_after = self._calc_significance(n,
+                                                    factor,
+                                                    days_after,
+                                                    recurr_chg_after)
+            sim_diff_mean_a, t_stat_a, p_value_sim_a = results_after
+            sim_diff_mean_a = round(sim_diff_mean_a, 2)
+            p_value_sim_a = round(p_value_sim_a, 2)
+
+            # Overall stats
+            all_stats[factor]['Ended Up/Down (During)'] = (ended_up_during,
+                                                           ended_down_during)
+            all_stats[factor]['Average Change (During) (%)'] = avg_change
+            all_stats[factor]['Stdev Change (During) (%)'] = stdev_chg
+            all_stats[factor]['p-value (During)'] = p_value_sim_d
+            all_stats[factor]['Sampled Mean Change (During) (%)'] = \
+                sim_diff_mean_d
+
+            all_stats[factor]['Units'] = self.factor_unit[factor]
+            all_stats[factor]['Ended Up/Down (After)'] = (ended_up_after,
+                                                          ended_down_after)
+
+            all_stats[factor]['Average Change (After) (%)'] = avg_change_after
+            all_stats[factor]['Stdev Change (After) (%)'] = stdev_change_after
+            all_stats[factor]['p-value (After)'] = p_value_sim_a
+            all_stats[factor]['Sampled Mean Change (After) (%)'] = \
+                sim_diff_mean_a
+
+            stats_df = pd.DataFrame.from_dict(all_stats).T
+            cols = [
+                'Ended Up/Down (During)',
+                'Average Change (During) (%)',
+                'Stdev Change (During) (%)',
+                'p-value (During)',
+                'Sampled Mean Change (During) (%)',
+                'Units',
+                'Ended Up/Down (After)',
+                'Average Change (After) (%)',
+                'Stdev Change (After) (%)',
+                'p-value (After)',
+                'Sampled Mean Change (After) (%)'
+            ]
+            stats_df = stats_df[cols]
+            display(stats_df)
+
+            # Add the query time series
+            fig.add_trace(go.Scatter(
+                x=t,
+                y=query_factor_series,
+                name='{} - {} (Query)'.format(self.query_start,
+                                              self.query_end),
+                line=dict(color='black'))
+            )
+
+            # Add black lines to denote the start/end of recurrences
+            fig.update_layout(
+                shapes=[dict(
+                    type='line',
+                    yref='paper',
+                    y0=0,
+                    y1=1,
+                    xref='x',
+                    x0=len(pd.date_range(self.query_start,
+                                         self.query_end, freq='B')),
+                    x1=len(pd.date_range(self.query_start,
+                                         self.query_end, freq='B'))
+                ),
+
+                    dict(
+                        type='line',
+                        yref='paper', y0=0, y1=1,
+                        xref='x', x0=0, x1=0
+                    )
+                ],
+                margin=dict(
+                    l=20,
+                    r=20,
+                    b=40,
+                    t=40,
+                    pad=4
+                )
+            )
+
+            # Add title and axis labels
+            fig.update_layout(
+                title='Recurrence Comparison Graph for {}'.format(factor),
+                title_x=0.5,
+                xaxis_title='Time (days)',
+                yaxis_title='Normalized Time Series',
+                font=dict(
+                    family='Courier New, monospace',
+                    size=12,
+                    color='#7f7f7f'
+                ),
+                xaxis=dict(tickmode='linear',
+                           tick0=-days_before,
+                           dtick=10)
+            )
+
+            fig.show()
+            print('==========================================================')
+
+
+class GS_RecurrenceUI:
+    """
+    The GUI for GS_Recurrence. Uses Jupyter widgets and is only usable
+    in Jupyter Notebook / JupyterLab.
+
+    :param df: the original Pandas DataFrame to feed to the model. Requires
+               dates as the index and time series as the columns. Only
+               supports daily data.
+    :param factor_unit: dict with factor names as keys and respective units
+                        as values; 'r' (return) or 'd' (difference)
+    :param sim_metric: default metric is 'dtw', but you can pass in your own
+                       metric that compares two time series. Can also be a
+                       dict with factor names as keys and callables as
+                       values.
+    :param normalize: normalization function that is applied to the entire
+                      time series; default is z-score, but you can pass in
+                      your own function or choose None
+    :param factor_weights: weights used to weigh the factors; should
+                           sum to 1; a higher value implies more important
+    :param parallel: run in parallel
+    :param n_jobs: number of cores to use; -1 specifies all cores
+    :param every_n: every n-th overlapping date range is selected (instead
+                    of every overlapping period). Helps reduce
+                    computational time.
+
+    **Examples**
+
+    >>> def custom_metric_pct_chg(a, b):
+            pct_chg_a = (a[-1] - a[0])/a[0] * 100
+            pct_chg_b = (b[-1] - b[0])/b[0] * 100
+
+            feature_vector_a = np.array([pct_chg_a])
+            feature_vector_b = np.array([pct_chg_b])
+
+            score = np.linalg.norm(feature_vector_a - feature_vector_b)
+            if score == np.inf or score == -np.inf:
+                return np.nan
+            return score
+
+    >>> factor_units = {'NASDAQ': 'r', '10Y2Y': 'd'}
+    >>> similarity_dict = {'NASDAQ': custom_metric_pct_chg,
+                           '10Y2Y': 'dtw'}
+
+    >>> GS_RecurrenceUI(df=df,
+                        factor_unit=factor_units,
+                        sim_metric=similarity_dict)
+    """
+
+    def __init__(self,
+                 df: pd.DataFrame,
+                 factor_unit: dict,
+                 sim_metric: Union[str, dict, Callable] = 'dtw',
+                 normalize: Optional[Union[str, Callable]] = 'z-score',
+                 factor_weights: Optional[dict] = None,
+                 parallel: bool = False,
+                 n_jobs: int = 1,
+                 every_n: int = 1) -> None:
+
+        df.index = pd.to_datetime(df.index)
+        self.GSRecurrence = None
+        self.df = df.sort_index()
+        self.sim_metric = sim_metric
+        self.normalize = normalize
+        self.factor_weights = factor_weights
+        self.parallel = parallel
+        self.n_jobs = n_jobs
+        self.every_n = every_n
+
+        self.factor_unit = factor_unit
+
+        # INPUT PARAMETERS FOR GSRecurrence
+        self.out = widgets.Output(layout={'border': '1px solid black'})
+
+        self.start_date = \
+            widgets.DatePicker(value=datetime.date(2020, 1, 1),
+                               description='Query Start ',
+                               style=dict(description_width='initial'))
+        self.end_date = \
+            widgets.DatePicker(value=df.index[-1],
+                               description='Query End ',
+                               style=dict(description_width='initial'))
+
+        self.search_start = \
+            widgets.DatePicker(value=df.index[0],
+                               description='Search Start',
+                               style=dict(description_width='initial'))
+        self.search_end = \
+            widgets.DatePicker(value=df.index[-1],
+                               description='Search End',
+                               style=dict(description_width='initial'))
+
+        self.factors = widgets.SelectMultiple(
+            options=list(df.columns),
+            value=[df.columns[0]],
+            description='Factors',
+            disabled=False,
+            rows=3
+        )
+
+        self.overlap_pct = widgets.FloatSlider(
+            value=0.2,
+            min=0,
+            max=1,
+            description='Overlap %:',
+            continuous_update=False,
+            style=dict(description_width='initial')
+        )
+
+        self.button = widgets.Button(description='Calculate Recurrences!',
+                                     layout=widgets.Layout(width='auto'))
+        self.param_box = widgets.HBox([widgets.VBox([self.start_date,
+                                                     self.search_start]),
+                                       widgets.VBox([self.end_date,
+                                                     self.search_end]),
+                                       self.factors,
+                                       self.overlap_pct,
+                                       self.button])
+
+        self.button.on_click(self.calculate_recurrences)
+
+        # INPUT PARAMETERS FOR Visualization
+        self.days_before = widgets.IntSlider(
+            value=-15,
+            min=-100,
+            max=-15,
+            step=5,
+            description='Days Before:',
+            continuous_update=False
+        )
+
+        self.days_after = widgets.IntSlider(
+            value=30,
+            min=0,
+            max=500,
+            step=5,
+            description='Days After:',
+            continuous_update=False
+        )
+
+        self.normalize_slide = widgets.IntSlider(
+            value=0,
+            min=0,
+            max=100,
+            step=1,
+            description='Normalize: ',
+            continuous_update=False
+        )
+
+        self.normalize_check = widgets.Checkbox(
+            value=True,
+            description='',
+            disabled=False,
+            indent=False,
+            layout=widgets.Layout(width='12px')
+        )
+
+        self.num_recurrences = widgets.IntSlider(
+            value=10,
+            min=1,
+            max=100,
+            step=1,
+            description='# Recurrences:',
+            continuous_update=False
+        )
+
+        self.factor = widgets.Dropdown(
+            description='Factor: ',
+            value=self.df.columns[0],
+            options=self.df.columns
+        )
+
+        self.graphs = go.FigureWidget()
+        self.graphs.update_layout(title_text='Recurrences Comparison (__)',
+                                  title_x=0.5)
+
+        self.recurr_graph = go.FigureWidget()
+        self.recurr_graph.update_layout(
+            title_text='Recurrences Overview (__)', title_x=0.5)
+
+        self.update_stats = widgets.Button(description='Refresh Stats Table',
+                                           layout=widgets.Layout(width='auto'))
+
+        self.stats_table = go.FigureWidget(make_subplots(
+            rows=1, cols=1,
+            shared_xaxes=True,
+            vertical_spacing=0.03,
+            specs=[[{'type': 'table'}]]))
+        self.stats_table.update_layout(title_text='Stats Table', title_x=0.5)
+
+        self.panel = widgets.VBox([widgets.HBox([self.days_before,
+                                                 self.days_after,
+                                                 self.num_recurrences,
+                                                 self.factor,
+                                                 self.update_stats]),
+                                   widgets.HBox([self.normalize_check,
+                                                 self.normalize_slide]),
+                                   self.graphs,
+                                   self.recurr_graph,
+                                   self.stats_table,
+                                   self.out])
+
+        self.days_before.observe(self.update_graphs, names='value')
+        self.days_after.observe(self.update_graphs, names='value')
+        self.normalize_slide.observe(self.update_graphs, names='value')
+        self.normalize_check.observe(self.update_graphs, names='value')
+        self.factor.observe(self.update_graphs, names='value')
+        self.factor.observe(self.update_recurr_graph, names='value')
+
+        self.num_recurrences.observe(self.update_graphs, names='value')
+        self.num_recurrences.observe(self.update_recurr_graph, names='value')
+
+        self.update_stats.on_click(self.update_stats_table)
+
+    def update_recurr_graph(self, change):
+        """
+        Function that is called when a change is detected. Updates the graph
+        with all the recurrences plotted against each other.
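+        Recurrence windows are shaded salmon; the query period is shaded
+        forestgreen.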
+        """
+        with self.out:
+            GS_Recurr = self.GSRecurrence
+            recurrences = GS_Recurr.recurrences
+
+            n = self.num_recurrences.value
+            factor = self.factor.value
+
+            # Plot all recurrences on the same graph
+            new_traces = []
+
+            series = GS_Recurr.df_normalized[factor]
+            min_series, max_series = min(series), max(series)
+            new_traces.append(go.Scatter(x=series.index,
+                                         y=np.array(series),
+                                         name=factor
+                                         )
+                              )
+
+            for recurrence_date in sorted(GS_Recurr.recurrences.index[:n]):
+                start_str, end_str = recurrence_date.split(' - ', 1)
+                start_ = datetime.datetime.strptime(start_str.strip(),
+                                                    '%Y/%m/%d')
+                end_ = datetime.datetime.strptime(end_str.strip(), '%Y/%m/%d')
+
+                new_traces.append(
+                    go.Scatter(
+                        x=[start_, end_, end_, start_, start_],
+                        y=[min_series, min_series, max_series,
+                           max_series, min_series],
+                        fill='toself',
+                        mode='lines',
+                        opacity=0.5,
+                        name=recurrence_date,
+                        fillcolor='salmon',
+                        line_color='salmon',
+                    )
+                )
+
+            # Add the query period
+            query_start = pd.Timestamp(GS_Recurr.query_start)
+            query_end = pd.Timestamp(GS_Recurr.query_end)
+            new_traces.append(
+                go.Scatter(
+                    x=[query_start, query_end, query_end,
+                       query_start, query_start],
+                    y=[min_series, min_series, max_series,
+                       max_series, min_series],
+                    fill='toself',
+                    mode='lines',
+                    opacity=0.5,
+                    name='{} - {} (Query)'.format(query_start, query_end),
+                    fillcolor='forestgreen',
+                    line_color='forestgreen',
+                )
+            )
+
+            with self.recurr_graph.batch_update():
+                self.recurr_graph.data = []
+                self.recurr_graph.add_traces(new_traces)
+
+                self.recurr_graph.update_layout(
+                    title_text='Recurrences Overview ({})'.format(factor),
+                    title_x=0.5,
+                    legend_title='Recurrence Dates',
+                    showlegend=False,
+                    xaxis_title='Date (Days)',
+                    yaxis_title='{}'.format(factor),
+
+                    margin=dict(
+                        l=20,
+                        r=20,
+                        b=40,
+                        t=40,
+                        pad=4
+                    )
+                )
+
+    def calculate_recurrences(self, change):
+        """
+        Function that is called when the Calculate Recurrences button is
+        clicked. Calls GS_Recurrence and calculates a new set of recurrences.
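+        Reads the current widget values, re-runs find_recurrences, and then
+        refreshes the comparison graph, the overview graph, and the stats
+        table.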
+ """ + + with self.out: + factors = self.factors.value + n = self.num_recurrences.value + days_after = self.days_after.value + days_before = np.abs(self.days_before.value) + normalize_day = self.normalize_slide.value + start = self.start_date.value + end = self.end_date.value + pct_overlap = self.overlap_pct.value + factor_unit = self.factor_unit + parallel = self.parallel + n_jobs = self.n_jobs + + # Clear outputs and graphs + self.out.clear_output() + self.graphs.data = [] + self.recurr_graph.data = [] + self.stats_table.data = [] + + title = 'Recurrences Overview (__)' + self.recurr_graph.update_layout(title_text=title, + title_x=0.5) + self.graphs.update_layout(title_text='Recurrences Comparison (__)', + title_x=0.5) + + GS_Recurr = GS_Recurrence(self.df) + GS_Recurr.find_recurrences(factors=self.factors.value, + start=start, + end=end, + factor_unit=factor_unit, + factor_weights=self.factor_weights, + overlap_pct=pct_overlap, + parallel=parallel, + n_jobs=n_jobs, + sim_metric=self.sim_metric, + normalize=self.normalize, + every_n=self.every_n, + search_start=self.search_start.value, + search_end=self.search_end.value) + recurrences = GS_Recurr.recurrences + display(recurrences.head(n)) + + self.GSRecurrence = GS_Recurr + self.normalize_slide.max = len(pd.date_range(GS_Recurr.query_start, + GS_Recurr.query_end, + freq='B')) + self.num_recurrences.max = GS_Recurr.recurrences.shape[0] + + self.factor.value = factors[0] + + # Plot recurrences and update stats table + self.update_graphs(change) + self.update_recurr_graph(change) + self.update_stats_table(change) + + def update_graphs(self, change): + """ + Function that is called when a change is detected. Updates the graphs. + """ + + GS_Recurr = self.GSRecurrence + n = self.num_recurrences.value + factor = self.factor.value + days_after = self.days_after.value + days_before = np.abs(self.days_before.value) + normalize_day = self.normalize_slide.value + recurrences = GS_Recurr.recurrences + start = self.start_date.value + end = self.end_date.value + + with self.out: + temp_df = GS_Recurr.df_original[factor] + date_range = pd.date_range(start - BDay(days_before), + end + BDay(days_after), + freq='B') + query_series = temp_df.loc[date_range] + if self.normalize_check.value: + query_series /= query_series[normalize_day + + days_before] + + t = np.arange(len(query_series)) - days_before + + new_traces = [] + for idx, recurr_date_str in enumerate(recurrences.index[:n]): + daterange = GS_Recurr._str_to_daterange(recurr_date_str, + days_before, + days_after) + daterange = pd.date_range(daterange.start, + daterange.end, + freq='B') + recurr_series = temp_df.loc[daterange] + + assert(recurr_series[days_before] == + recurr_series[daterange[days_before]]) + + normalized_series = recurr_series + if self.normalize_check.value: + normalized_series = recurr_series / \ + recurr_series[normalize_day + + days_before] + + # Add recurrence to graph + new_traces.append(go.Scatter( + x=t, + y=normalized_series, + name='{} (#{})'.format(recurr_date_str, idx+1)) + ) + + new_traces.append( + go.Scatter( + x=t, + y=query_series, + name='{} - {} (Query)'.format(GS_Recurr.query_start, + GS_Recurr.query_end), + line=dict(color='black')) + ) + + with self.graphs.batch_update(): + self.graphs.data = [] + self.graphs.add_traces(new_traces) + + # Add black line to denote start/end of recurrences + self.graphs.update_layout( + title_text='Recurrences Comparison ({})'.format(factor), + title_x=0.5, + + xaxis_title='Date (Days)', + yaxis_title='Normalized Time Series', + 
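+                    # Shapes added below mark the recurrence end and the
+                    # origin with solid lines, and the chosen normalization
+                    # day with a dashed line.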
+                    legend_title='Recurrence Dates',
+                    showlegend=True,
+                    shapes=[dict(
+                        type='line',
+                        xref='x',
+                        x0=len(pd.date_range(GS_Recurr.query_start,
+                                             GS_Recurr.query_end,
+                                             freq='B'))-1,
+                        x1=len(pd.date_range(GS_Recurr.query_start,
+                                             GS_Recurr.query_end,
+                                             freq='B'))-1,
+                        yref='paper', y0=0, y1=1
+                    ),
+
+                        # Line that goes through the origin
+                        dict(
+                            type='line',
+                            yref='paper', y0=0, y1=1,
+                            xref='x', x0=0, x1=0
+                        ),
+
+                        # Dashed line to denote the normalization day
+                        dict(
+                            type='line',
+                            xref='x',
+                            x0=normalize_day,
+                            x1=normalize_day,
+                            yref='paper', y0=0, y1=1,
+                            line=dict(
+                                dash='dash'
+                            )
+                        )
+                    ],
+                    margin=dict(
+                        l=20,
+                        r=20,
+                        b=40,
+                        t=40,
+                        pad=4
+                    )
+                )
+
+    def update_stats_table(self, change):
+        """
+        Function that is called when the Refresh Stats button is clicked.
+        Refreshes the statistics table using the new parameters.
+        """
+        GS_Recurr = self.GSRecurrence
+        start = self.start_date.value
+        end = self.end_date.value
+        days_before = np.abs(self.days_before.value)
+        days_after = self.days_after.value
+        n = self.num_recurrences.value
+        recurrs = GS_Recurr.recurrences
+        factor_unit = GS_Recurr.factor_unit
+
+        query_start = pd.Timestamp(GS_Recurr.query_start)
+        query_end = pd.Timestamp(GS_Recurr.query_end)
+
+        with self.out:
+            print('''Updating stats table... N={0}, days_before={1}, \
+days_after={2}'''.format(n, days_before, days_after))
+
+            all_stats = {}
+            for factor in self.df.columns:
+                stats = {}
+
+                date_range = pd.date_range(start -
+                                           BDay(days_before),
+                                           end +
+                                           BDay(days_after), freq='B')
+
+                for idx, recurr_date_str in enumerate(recurrs.index[:n]):
+                    stats[recurr_date_str] = {}
+                    daterange = GS_Recurr._str_to_daterange(recurr_date_str)
+                    during_daterange = pd.date_range(daterange.start,
+                                                     daterange.end,
+                                                     freq='B')
+
+                    recurr_series = self.df.loc[during_daterange][factor]
+
+                    after_date_range = pd.date_range(daterange.end,
+                                                     daterange.end +
+                                                     BDay(days_after),
+                                                     freq='B')
+                    after_series = self.df.loc[after_date_range][factor]
+
+                    # Calculate stats
+                    if factor_unit[factor] == 'd':
+                        change = recurr_series[-1] - recurr_series[0]
+                        after_chg = after_series[-1] - after_series[0]
+                    elif factor_unit[factor] == 'r':
+                        change = cummulative_pct_change(recurr_series)
+                        after_chg = cummulative_pct_change(after_series)
+                    else:
+                        raise ValueError('Not r or d.')
+
+                    ended_up_during = recurr_series[-1] > recurr_series[0]
+                    ended_up_after = after_series[-1] > after_series[0]
+
+                    stats[recurr_date_str]['Chg'] = change
+                    stats[recurr_date_str]['After Chg'] = after_chg
+                    stats[recurr_date_str]['Ended Up During'] = ended_up_during
+                    stats[recurr_date_str]['Ended Up After'] = ended_up_after
+
+                all_stats[factor] = {}
+
+                ended_up_during = len([1 for k, v in stats.items()
+                                       if v['Ended Up During']])
+                ended_down_during = n - ended_up_during
+
+                ended_up_after = len([1 for k, v in stats.items()
+                                      if v['Ended Up After']])
+                ended_down_after = n - ended_up_after
+
+                avg_change = np.nanmean([v['Chg'] for k, v in stats.items()])
+                stdev_chg = np.nanstd([v['Chg'] for k, v in stats.items()])
+
+                avg_change_after = np.nanmean([v['After Chg']
+                                               for k, v in stats.items()])
+                stdev_chg_after = np.nanstd([v['After Chg']
+                                             for k, v in stats.items()])
+
+                avg_change = round(avg_change, 2)
+                stdev_chg = round(stdev_chg, 2)
+                avg_change_after = round(avg_change_after, 2)
+                stdev_change_after = round(stdev_chg_after, 2)
+
+                # Calculate p-value (during recurrence)
+                recurrence_chg = np.array([v['Chg']
+                                           for k, v in stats.items()])
+                query_length = np.busday_count(query_start.date(),
+                                               query_end.date()) + 1
+
results_during = GS_Recurr._calc_significance(n, + factor, + query_length, + recurrence_chg) + sim_diff_mean_d, t_stat_d, p_value_sim_d = results_during + sim_diff_mean_d = round(sim_diff_mean_d, 2) + p_value_sim_d = round(p_value_sim_d, 2) + + # Calculate p-value (after recurrence) + recurr_chg_after = np.array([v['After Chg'] + for k, v in stats.items()]) + results_after = GS_Recurr._calc_significance(n, + factor, + days_after, + recurr_chg_after) + sim_diff_mean_a, t_stat_a, p_value_sim_a = results_after + sim_diff_mean_a = round(sim_diff_mean_a, 2) + p_value_sim_a = round(p_value_sim_a, 2) + + all_stats[factor]['Ended Up/Down (During)'] = \ + (ended_up_during, ended_down_during) + all_stats[factor]['Average Change (During) (%)'] = avg_change + all_stats[factor]['Stdev Change (During) (%)'] = stdev_chg + all_stats[factor]['p-value (During)'] = p_value_sim_d + all_stats[factor]['Sampled Mean Change (During) (%)'] = \ + sim_diff_mean_d + + all_stats[factor]['Units'] = factor_unit[factor] + all_stats[factor]['Ended Up/Down (After)'] = (ended_up_after, + ended_down_after) + + all_stats[factor]['Average Change (After) (%)'] = \ + avg_change_after + all_stats[factor]['Stdev Change (After) (%)'] = \ + stdev_change_after + all_stats[factor]['p-value (After)'] = p_value_sim_a + all_stats[factor]['Sampled Mean Change (After) (%)'] = \ + sim_diff_mean_a + + stats_df = pd.DataFrame.from_dict(all_stats).T + display(stats_df) + + with self.stats_table.batch_update(): + self.stats_table.add_trace( + go.Table( + header=dict( + values=['Factor'] + list(stats_df.columns), + font=dict(size=10), + align='center' + ), + cells=dict( + values=[list(self.df.columns)] + + [stats_df[k].tolist() + for k in stats_df.columns], + align='center') + ), + row=1, col=1 + ) + title = 'Stats Table (N={}, {} Days After)'.format(n, + days_after) + self.stats_table.update_layout(title_text=title, + title_x=0.5) + print('Done.') + + def _ipython_display_(self): + display(self.param_box) + display(self.panel) diff --git a/setup.py b/setup.py index 6da25da6..3a10985a 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ "cachetools", "configparser", "dataclasses;python_version<'3.7'", + "dtaidistance", "funcsigs", "future", "inflection", @@ -65,6 +66,7 @@ "msgpack", "nest-asyncio", "pandas<1.1", + "plotly>=4.6.0", "python-dateutil>=2.7.0", "requests", "scipy>=1.2.0", @@ -78,7 +80,8 @@ ], extras_require={ "internal": ["gs_quant_internal>=0.6.6", "requests_kerberos"], - "notebook": ["jupyter", "matplotlib~=2.1.0", "pprint", "seaborn"], + "notebook": ["jupyter", "matplotlib~=2.1.0", "pprint", "seaborn", + "ipywidgets"], "test": ["pytest", "pytest-cov", "pytest-mock", "testfixtures", "nbconvert", "nbformat", "jupyter_client"], "develop": ["wheel", "sphinx", "sphinx_rtd_theme", "sphinx_autodoc_typehints", "pytest", "pytest-cov", "pytest-mock", "testfixtures"]