Source code for ml_investment.features

import copy
import numpy as np
import pandas as pd

from multiprocessing import Pool, cpu_count
from tqdm import tqdm
from typing import Union, List, Dict, Callable
from .utils import int_hash_of_str, get_quarter_idx

np.seterr(divide='ignore', invalid='ignore')


def calc_series_stats(series: Union[List[float], np.array],
                      stats: Dict[str, Callable]={'mean': np.mean,
                                                  'median': np.median,
                                                  'max': np.max,
                                                  'min': np.min,
                                                  'std': np.std},
                      name_prefix: str='',
                      norm: bool=False) -> Dict[str, float]:
    '''
    Calculate base statistics on series
            
    Parameters
    ----------
    series:
        series by which statistics are calculated
    name_prefix:
        string prefix of returned features
    norm:
        normilize resulted statistics to first element or not
        
    Returns
    -------
        Dict with calculated features 
    '''
    series = np.array(series).astype('float')
    series = series[~np.isnan(series)] 
    series = list(series)
    if len(series) == 0:
        series = np.array([np.nan])
        
    result = {'{}_{}'.format(name_prefix, key): stats[key](series) 
              for key in stats}
    
    if norm:
        result = {key: result[key] / np.abs(series[0]) for key in result}
    
    return result
    
              
                
[docs]class QuarterlyFeatures: ''' Feature calculator for qaurtrly-based statistics. Return features for company quarter slices. ''' def __init__(self, data_key: str, columns: List[str], quarter_counts: List[int]=[2, 4, 10], max_back_quarter: int=10, min_back_quarter: int=0, stats: Dict[str, Callable]={'mean': np.mean, 'median': np.median, 'max': np.max, 'min': np.min, 'std': np.std}, calc_stats_on_diffs: bool=True, data_preprocessing: Callable=None, n_jobs: int=cpu_count(), verbose: bool=False): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.features.QuarterlyFeatures.calculate` columns: column names for feature calculation(like revenue, debt etc) quarter_counts: list of number of quarters for statistics calculation. e.g. if ``quarter_counts = [2]`` than statistics will be calculated on current and previous quarter max_back_quarter: max bound of company slices in time. If ``max_back_quarter = 1`` than features will be calculated for only current company quarter. If max_back_quarter is larger than total number of quarters for company than features will be calculated for all quarters min_back_quarter: min bound of company slices in time. If ``min_back_quarter = 0`` (default) than features will be calculated for all quarters. If ``min_back_quarter = 2`` than current and previous quarter slices will not be used for feature calculation stats: aggregation functions for features calculation. Should be as ``Dict[str, Callable]``. Keys of this dict will be used as features names prefixes. Values of this dict should implement ``foo(x:List) -> float`` interface calc_stats_on_diffs: calculate statistics on series diffs( ``np.diff(series)`` ) or not data_preprocessing: function implemening ``foo(x) -> x_`` interface. It will be used before feature calculation. n_jobs: number of threads for calculation verbose: show progress or not ''' self.data_key = data_key self.columns = columns self.quarter_counts = quarter_counts self.max_back_quarter = max_back_quarter self.min_back_quarter = min_back_quarter self.stats = stats self.calc_stats_on_diffs = calc_stats_on_diffs self.data_preprocessing = data_preprocessing self.n_jobs = n_jobs self.verbose = verbose self._data_loader = None def _calc_series_feats(self, data: pd.DataFrame, str_prefix: str='') -> Dict[str, float]: result = {} for quarter_cnt in self.quarter_counts: for col in self.columns: series = data[col].values[:quarter_cnt][::-1].astype('float') name_prefix = 'quarter{}_{}'.format(quarter_cnt, col) feats = calc_series_stats(series=series, stats=self.stats, name_prefix=name_prefix) result.update(feats) if self.calc_stats_on_diffs: diff_feats = calc_series_stats(series=np.diff(series), stats=self.stats, name_prefix='{}_diff'\ .format(name_prefix)) result.update(diff_feats) return result def _single_ticker(self, ticker:str) -> List[Dict[str, float]]: result = [] quarterly_data = self._data_loader.load([ticker]) if quarterly_data is None: return result if self.data_preprocessing is not None: quarterly_data = self.data_preprocessing(quarterly_data) max_back_quarter = min(self.max_back_quarter, len(quarterly_data) - 1) min_back_quarter = min(self.min_back_quarter, len(quarterly_data) - 1) assert min_back_quarter <= max_back_quarter for back_quarter in range(min_back_quarter, max_back_quarter): curr_data = quarterly_data[back_quarter:] feats = { 'ticker': ticker, 'date': curr_data['date'].values[0], } series_feats = self._calc_series_feats(curr_data) feats.update(series_feats) result.append(feats) return result
[docs] def calculate(self, data: Dict, index: List[str]) -> pd.DataFrame: ''' Interface to calculate features for tickers based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.features.QuarterlyFeatures.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: list of tickers to calculate features for, i.e. ``['AAPL', 'TSLA']`` Returns ------- ``pd.DataFrame`` resulted features with index ``['ticker', 'date']``. Each row contains features for ``ticker`` company at ``date`` quarter ''' if self.verbose: print("Quarterly features calculation") self._data_loader = data[self.data_key] with Pool(self.n_jobs) as p: X = [] for ticker_feats_arr in tqdm(p.imap(self._single_ticker, index), disable=not self.verbose): X.extend(ticker_feats_arr) X = pd.DataFrame(X).set_index(['ticker', 'date']) return X
[docs]class QuarterlyDiffFeatures: ''' Feature calculator for qaurtr-to-another-quarter company indicators(revenue, debt etc) progress evaluation. Return features for company quarter slices. ''' def __init__(self, data_key:str, columns: List[str], compare_quarter_idxs: List[int]=[1, 4], max_back_quarter: int=10, min_back_quarter: int=0, norm: bool=True, data_preprocessing: Callable=None, n_jobs: int=cpu_count(), verbose: bool=False): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.features.QuarterlyDiffFeatures.calculate` columns: column names for feature calculation(like revenue, debt etc) compare_quarter_idxs: list of back quarter idxs for progress calculation. e.g. if ``compare_quarter_idxs = [1]`` than current quarter will be compared with previous quarter. If ``compare_quarter_idxs = [4]`` than current quarter will be compared with previous year quarter. max_back_quarter: max bound of company slices in time. If ``max_back_quarter = 1`` than features will be calculated for only current company quarter. If max_back_quarter is larger than total number of quarters for company than features will be calculated for all quarters min_back_quarter: min bound of company slices in time. If ``min_back_quarter = 0`` (default) than features will be calculated for all quarters. If ``min_back_quarter = 2`` than current and previous quarter slices will not be used for feature calculation norm: normalize to compare quarter or not data_preprocessing: function implemening ``foo(x) -> x_`` interface. It will be used before feature calculation. n_jobs: number of threads for calculation verbose: show progress or not ''' self.data_key = data_key self.columns = columns self.compare_quarter_idxs = compare_quarter_idxs self.max_back_quarter = max_back_quarter self.min_back_quarter = min_back_quarter self.norm = norm self.data_preprocessing=data_preprocessing self.n_jobs = n_jobs self.verbose = verbose self._data_loader = None def _calc_diff_feats(self, data: pd.DataFrame) -> Dict[str, float]: result = {} curr_quarter = np.array([data[col].values[0] for col in self.columns], dtype='float') for quarter_idx in self.compare_quarter_idxs: if len(data) >= quarter_idx + 1: compare_quarter = np.array([data[col].values[quarter_idx] for col in self.columns], dtype='float') else: compare_quarter = np.array([np.nan for col in self.columns], dtype='float') curr_feats = curr_quarter - compare_quarter if self.norm: curr_feats = curr_feats / compare_quarter curr_feats = {'compare{}_{}'.format(quarter_idx, col):val for col, val in zip(self.columns, curr_feats)} result.update(curr_feats) return result def _single_ticker(self, ticker: str) -> List[Dict[str, float]]: result = [] quarterly_data = self._data_loader.load([ticker]) if quarterly_data is None: return result if self.data_preprocessing is not None: quarterly_data = self.data_preprocessing(quarterly_data) max_back_quarter = min(self.max_back_quarter, len(quarterly_data) - 1) min_back_quarter = min(self.min_back_quarter, len(quarterly_data) - 1) assert min_back_quarter <= max_back_quarter for back_quarter in range(min_back_quarter, max_back_quarter): curr_data = quarterly_data[back_quarter:] feats = { 'ticker': ticker, 'date': curr_data['date'].values[0], } diff_feats = self._calc_diff_feats(curr_data) feats.update(diff_feats) result.append(feats) return result
[docs] def calculate(self, data: Dict, index: List[str]) -> pd.DataFrame: ''' Interface to calculate features for tickers based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.features.QuarterlyDiffFeatures.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: list of tickers to calculate features for, i.e. ``['AAPL', 'TSLA']`` Returns ------- ``pd.DataFrame`` resulted features with index ``['ticker', 'date']``. Each row contains features for ``ticker`` company at ``date`` quarter ''' if self.verbose: print("Quarterly diff features calculation") self._data_loader = data[self.data_key] with Pool(self.n_jobs) as p: X = [] for ticker_feats_arr in tqdm(p.imap(self._single_ticker, index), disable=not self.verbose): X.extend(ticker_feats_arr) X = pd.DataFrame(X).set_index(['ticker', 'date']) return X
class HashingEncoder: def transform(self, vals): result = [int_hash_of_str(str(x)) for x in vals] return result
[docs]class BaseCompanyFeatures: ''' Feature calculator for getting base company information(sector, industry etc). Encode categorical columns via hashing label encoding. Return features for current company state. ''' def __init__(self, data_key:str, cat_columns:List[str], verbose: bool=False): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.features.BaseCompanyFeatures.calculate` cat_columns: column names of categorical features for encoding verbose: show progress or not ''' self.data_key = data_key self.cat_columns = cat_columns self.verbose = verbose self.he = HashingEncoder()
[docs] def calculate(self, data: Dict, index: List[str]) -> pd.DataFrame: ''' Interface to calculate features for tickers based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.features.BaseCompanyFeatures.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: list of tickers to calculate features for, i.e. ``['AAPL', 'TSLA']`` Returns ------- ``pd.DataFrame`` resulted features with index ``['ticker']``. Each row contains features for ``ticker`` company ''' if self.verbose: print("Base features calculation") base_df = data[self.data_key].load(index) for col in self.cat_columns: base_df[col] = base_df[col].fillna('None') base_df[col] = self.he.transform(base_df[col]) result = pd.DataFrame() result['ticker'] = index result = pd.merge(result, base_df[['ticker'] + self.cat_columns], on='ticker', how='left') result = result.set_index(['ticker']) return result
[docs]class DailyAggQuarterFeatures: ''' Feature calculator for daily-based statistics for quarter slices. Return features for company quarter slices. ''' def __init__(self, daily_data_key: str, quarterly_data_key: str, columns: List[str], agg_day_counts: List[Union[int, np.timedelta64]] = [100, 200], max_back_quarter: int=10, min_back_quarter: int=0, daily_index=None, stats: Dict[str, Callable]={'mean': np.mean, 'median': np.median, 'max': np.max, 'min': np.min, 'std': np.std}, norm: bool=True, n_jobs: int=cpu_count(), verbose: bool=False): ''' Parameters ---------- daily_data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.features.DailyAggQuarterFeatures.calculate` for daily data loading quarterly_data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.features.DailyAggQuarterFeatures.calculate` for quarterly data loading columns: column names for feature calculation(like marketcap, pe) agg_day_counts: list of days counts to calculate statistics on. e.g. if ``agg_day_counts = [100, 200]`` statistics will be calculated based on last 100 and 200 days(separetly). max_back_quarter: max bound of company slices in time. If ``max_back_quarter = 1`` than features will be calculated for only current company quarter. If max_back_quarter is larger than total number of quarters for company than features will be calculated for all quarters min_back_quarter: min bound of company slices in time. If ``min_back_quarter = 0`` (default) than features will be calculated for all quarters. If ``min_back_quarter = 2`` than current and previous quarter slices will not be used for feature calculation daily_index: indexes for ``data[daily_data_key]`` dataloader. If ``None`` than index will be the same as for ``data[quarterly]``. I.e. if you want to use this class for calculating commodities features, ``daily_index`` may be list of interesting commodities codes. If you want want to use it i.e. for calculating daily price features, ``daily_index`` should be ``None`` stats: aggregation functions for features calculation. Should be as ``Dict[str, Callable]``. Keys of this dict will be used as features names prefixes. Values of this dict should implement ``foo(x:List) -> float`` interface norm: normalize daily stats or not n_jobs: number of threads for calculation verbose: show progress or not ''' self.daily_data_key = daily_data_key self.quarterly_data_key = quarterly_data_key self.columns = columns self.agg_day_counts = agg_day_counts self.max_back_quarter = max_back_quarter self.min_back_quarter = min_back_quarter self.daily_index = daily_index self.stats = stats self.norm = True self.n_jobs = n_jobs self.verbose = verbose self._daily_data_loader = None self._quarterly_data_loader = None def _calc_series_feats(self, data: pd.DataFrame, str_prefix: str='') -> Dict[str, float]: result = {} if len(data) == 0: return result for day_cnt in self.agg_day_counts: if type(day_cnt) == int: curr_data = data[:day_cnt] elif type(day_cnt) == np.timedelta64: daily_dates = data['date'].values curr_data = data[daily_dates > daily_dates[0] - day_cnt] for col in self.columns: series = curr_data[col].values[::-1].astype('float') name_prefix = '{}_days{}_{}'.format(str_prefix, str(day_cnt), col) feats = calc_series_stats(series=series, stats=self.stats, name_prefix=name_prefix, norm=self.norm) result.update(feats) return result def _single_ticker(self, ticker: str) -> List[Dict[str, float]]: result = [] quarterly_data = self._quarterly_data_loader.load([ticker]) if quarterly_data is None: return result daily_data = copy.deepcopy(self.daily_data) if self.daily_index is None: daily_data[''] = self._daily_data_loader.load([ticker]) max_back_quarter = min(self.max_back_quarter, len(quarterly_data) - 1) min_back_quarter = min(self.min_back_quarter, len(quarterly_data) - 1) assert min_back_quarter <= max_back_quarter for back_quarter in range(min_back_quarter, max_back_quarter): curr_data = quarterly_data[back_quarter:] curr_date = np.datetime64(curr_data['date'].values[0]) feats = {} feats['ticker'] = ticker feats['date'] = curr_date for idx in daily_data.keys(): if daily_data[idx] is not None: daily_dates = daily_data[idx]['date'].values else: continue curr_daily_data = daily_data[idx][daily_dates < curr_date] daily_feats = self._calc_series_feats(curr_daily_data, idx) feats.update(daily_feats) result.append(feats) return result
[docs] def calculate(self, data: Dict, index: List[str]) -> pd.DataFrame: ''' Interface to calculate features for tickers based on data Parameters ---------- data: dict having fields named as values in ``daily_data_key`` and ``quarterly_data_key`` params of :func:`~ml_investment.features.DailyAggQuarterFeatures.__init__` This fields should contain classes implementing ``load(index) -> pd.DataFrame`` interfaces index: list of tickers to calculate features for, i.e. ``['AAPL', 'TSLA']`` Returns ------- ``pd.DataFrame`` resulted features with index ``['ticker', 'date']``. Each row contains features for ``ticker`` company at ``date`` quarter ''' if self.verbose: print("Daily agg quarter features calculation") self._daily_data_loader = data[self.daily_data_key] self._quarterly_data_loader = data[self.quarterly_data_key] self.daily_data = {} if self.daily_index is not None: for idx in self.daily_index: self.daily_data[idx] = self._daily_data_loader.load([idx]) with Pool(self.n_jobs) as p: X = [] for ticker_feats_arr in tqdm(p.imap(self._single_ticker, index), disable=not self.verbose): X.extend(ticker_feats_arr) X = pd.DataFrame(X).set_index(['ticker', 'date']) return X
[docs]class RelativeGroupFeatures: ''' Feature calculator for features relative to some group median. I.e. calculate revenue growth relative to median in sector/industry. ''' def __init__(self, feature_calculator, group_data_key: str, group_col: str, relation_foo = lambda x, y: x - y, keep_group_feats=False, verbose: bool=False): ''' Parameters ---------- feature_calculator: key of dataloader in ``data`` argument during :func:`~ml_investment.features.DailyAggQuarterFeatures.calculate` for daily data loading group_data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.features.RelativeGroupFeatures.calculate` for loading data having ``group_col`` group_col: column name for groups in which median values will be calculated relation_foo: function implementing ``foo(x, y) -> z`` interface. E.g. if foo = lambda x: x - y, than resulted features will be calculated as difference between current company features and group median features. keep_group_feats: return group median features or not verbose: show progress or not ''' self.feature_calculator = feature_calculator self.group_data_key = group_data_key self.group_col = group_col self.relation_foo = relation_foo self.keep_group_feats = keep_group_feats self.verbose = verbose
[docs] def calculate(self, data, index): ''' Interface to calculate features for tickers based on data Parameters ---------- data: dict having fields named as values in ``group_data_key`` and necessary for ``feature_calculator`` keys. This fields should contain classes implementing ``load(index) -> pd.DataFrame`` interfaces index: index needed for ``feature_calculator.calculate()`` Returns ------- ``pd.DataFrame`` resulted features with index as in ''feature_calculator.calculate``. ''' if self.verbose: print("Relative group features calculation") X = self.feature_calculator.calculate(data, index) index_cols = list(X.index.names) cols = X.columns group_df = data[self.group_data_key].load(index)[['ticker', self.group_col]] X = pd.merge(X.reset_index(), group_df, on='ticker', how='left') X['q_idx'] = X['date'].apply(lambda x: get_quarter_idx(x)) mean_df = X.groupby([self.group_col, 'q_idx']).median() mean_df.columns = ['{}_median_{}'.format(self.group_col, x) for x in mean_df.columns] mean_df = mean_df.reset_index() X = pd.merge(X, mean_df, on=[self.group_col, 'q_idx'], how='left') for col in cols: new_col = 'rel_to_{}_{}'.format(self.group_col, col) mean_col = '{}_median_{}'.format(self.group_col, col) X[new_col] = self.relation_foo(X[col], X[mean_col]) keep_cols = [x for x in X.columns if 'rel_to_' in x] if self.keep_group_feats: keep_cols += [x for x in X.columns if '{}_median_'.format(self.group_col) in x] keep_cols += index_cols return X[keep_cols].set_index(index_cols)
[docs]class FeatureMerger: ''' Feature calculator that combined two other feature calculators. Merge is executed by left. ''' def __init__(self, fc1, fc2, on=Union[str, List[str]]): ''' Parameters ---------- fc1: first feature calculator implements ``calculate(data: Dict, index) -> pd.DataFrame`` interface fc2: second feature calculator implements ``calculate(data: Dict, index) -> pd.DataFrame`` interface on: columns on which merge the results of executed calculate methods ''' self.fc1 = fc1 self.fc2 = fc2 self.on = on
[docs] def calculate(self, data: Dict, index) -> pd.DataFrame: ''' Interface to calculate features for tickers based on data Parameters ---------- data: dict having field names needed for ``fc1`` and ``fc2`` This fields should contain classes implementing ``load(index) -> pd.DataFrame`` interface index: indexes dor feature calculators. I.e. if features about companies than index may be list of tickers, like ``['AAPL', 'TSLA']`` Returns ------- ``pd.DataFrame`` resulted merged features ''' X1 = self.fc1.calculate(data, index) X2 = self.fc2.calculate(data, index) X = pd.merge(X1, X2, on=self.on, how='left') X.index = X1.index return X