Source code for ml_investment.targets

import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
from typing import List, Dict, Tuple, Callable



[docs]class QuarterlyTarget: ''' Calculator of target represented as column in quarter-based data. Work with quarterly slices of company. ''' def __init__(self, data_key: str, col: str, quarter_shift: int=0, n_jobs: int=cpu_count()): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.QuarterlyTarget.calculate` col: column name for target calculation(like marketcap, revenue) quarter_shift: number of quarters to shift. e.g. if ``quarter_shift = 0`` than value for current quarter will be returned. If ``quarter_shift = 1`` than value for next quarter will be returned. If ``quarter_shift = -1`` than value for previous quarter will be returned. ''' self.data_key = data_key self.col = col self.quarter_shift = quarter_shift self.n_jobs = n_jobs self._data_loader = None def _single_ticker_target(self, ticker_and_dates: Tuple[str, List]) -> pd.DataFrame: ticker, dates = ticker_and_dates quarterly_data = self._data_loader.load([ticker])[::-1] quarter_dates = quarterly_data['date'].astype(np.datetime64).values vals = [] for date in dates: assert np.datetime64(date) in quarter_dates curr_date_mask = quarter_dates == np.datetime64(date) curr_quarter_idx = np.where(curr_date_mask)[0][0] idx = curr_quarter_idx + self.quarter_shift if idx >= 0 and idx < len(quarterly_data): value = quarterly_data[self.col].values[idx] else: value = np.nan vals.append(value) result = pd.DataFrame() result['y'] = vals result['date'] = dates result['ticker'] = ticker return result
[docs] def calculate(self, data: Dict, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for dates and tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.QuarterlyTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers and dates to calculate targets for. Should have columns: ``["ticker", "date"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company at ``date`` quarter ''' self._data_loader = data[self.data_key] grouped = index.groupby('ticker')['date'].apply(lambda x: x.tolist()).reset_index() params = [(ticker, dates) for ticker, dates in grouped.values] with Pool(self.n_jobs) as p: result = [] for ticker_result in tqdm(p.imap(self._single_ticker_target, params)): result.append(ticker_result) result = pd.concat(result, axis=0) result = result.drop_duplicates(['ticker', 'date']) result = pd.merge(index, result, on=['ticker', 'date'], how='left') result = result.set_index(['ticker', 'date']) result = result.infer_objects() return result
[docs]class QuarterlyDiffTarget: ''' Calculator of target represented as difference between column values in current and previous quarter. Work with quarterly slices of company. ''' def __init__(self, data_key: str, col: str, norm: bool=True, n_jobs: int = cpu_count()): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.QuarterlyDiffTarget.calculate` col: column name for target calculation(like marketcap, revenue) norm: normalize difference to previous quarter or not n_jobs: number of threads for calculation ''' self.curr_target = QuarterlyTarget(data_key=data_key, col=col, quarter_shift=0, n_jobs=n_jobs) self.last_target = QuarterlyTarget(data_key=data_key, col=col, quarter_shift=-1, n_jobs=n_jobs) self.norm = norm
[docs] def calculate(self, data: Dict, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for dates and tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.QuarterlyTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers and dates to calculate targets for. Should have columns: ``["ticker", "date"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company at ``date`` quarter ''' curr_df = self.curr_target.calculate(data, index) last_df = self.last_target.calculate(data, index) curr_df['y'] = curr_df['y'] - last_df['y'] if self.norm: curr_df['y'] = curr_df['y'] / np.abs(last_df['y']) return curr_df
[docs]class QuarterlyBinDiffTarget: ''' Calculator of target represented as binary difference between column values in current and previous quarter. Work with quarterly slices of company. ''' def __init__(self, data_key: str, col: str, n_jobs: int=cpu_count()): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.QuarterlyBinDiffTarget.calculate` col: column name for target calculation(like marketcap, revenue) n_jobs: number of threads for calculation ''' self.target = QuarterlyDiffTarget(data_key=data_key, col=col, norm=False, n_jobs=n_jobs)
[docs] def calculate(self, data: Dict, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for dates and tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.QuarterlyTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers and dates to calculate targets for. Should have columns: ``["ticker", "date"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company at ``date`` quarter ''' target_df = self.target.calculate(data, index) target_df.loc[target_df['y'].isnull() == False, 'y'] = \ target_df.loc[target_df['y'].isnull() == False, 'y'] > 0 target_df['y'] = target_df['y'].astype(float) return target_df
[docs]class DailyAggTarget: ''' Calculator of target represented as aggregation function of daily values. Work with daily slices of company. ''' def __init__(self, data_key: str, col: str, horizon: int=100, foo: Callable=np.mean, n_jobs: int = cpu_count()): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.DailyAggTarget.calculate` col: column name for target calculation(like marketcap, pe) horizon: number of days for target calculation. If ``horizon > 0`` than values will be get from the future of current date. If ``horizon < 0`` than values will be get from the past of current date foo: function processing target aggregation n_jobs: number of threads for calculation ''' self.data_key = data_key self.col = col self.horizon = horizon self.foo = foo self.n_jobs = n_jobs self._data_loader = None def _single_ticker_target(self, ticker_and_dates: Tuple[str, List]) -> pd.DataFrame: ticker, dates = ticker_and_dates result = pd.DataFrame() result['date'] = dates result['ticker'] = ticker result['y'] = None daily_data = self._data_loader.load([ticker]) if daily_data is None: return result daily_data = daily_data[::-1] daily_dates = daily_data['date'].astype(np.datetime64).values vals = [] for date in dates: if self.horizon >= 0: series = daily_data[daily_dates >= np.datetime64(date)] series = series[self.col].values[:self.horizon] else: series = daily_data[daily_dates < np.datetime64(date)] series = series[self.col].values[self.horizon:] vals.append(self.foo(series.astype(float))) result['y'] = vals return result
[docs] def calculate(self, data: Dict, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for dates and tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.DailyAggTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers and dates to calculate targets for. Should have columns: ``["ticker", "date"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company at ``date`` day ''' self._data_loader = data[self.data_key] grouped = index.groupby('ticker')['date'].apply(lambda x: x.tolist()).reset_index() params = [(ticker, dates) for ticker, dates in grouped.values] with Pool(self.n_jobs) as p: result = [] for ticker_result in tqdm(p.imap(self._single_ticker_target, params)): result.append(ticker_result) result = pd.concat(result, axis=0) result = result.drop_duplicates(['ticker', 'date']) result = pd.merge(index, result, on=['ticker', 'date'], how='left') result = result.set_index(['ticker', 'date']) return result
[docs]class DailySmoothedQuarterlyDiffTarget: ''' Feature calculator getting difference between current and last quarter smoothed daily column values. Work with company quarter slices. ''' def __init__(self, daily_data_key: str, quarterly_data_key: str, col: str, smooth_horizon: int=30, norm: bool=True, n_jobs: int = cpu_count()): ''' Parameters ---------- daily_data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.DailySmoothedQuarterlyDiffTarget.calculate` for daily data loading quarterly_data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.DailySmoothedQuarterlyDiffTarget.calculate` for quarterly data loading col: column name for target calculation(like marketcap, pe) smooth_horizon: number of days for target calculation. If ``smooth_horizon > 0`` than values for smoothing wiil be get from future of quarter date. If ``smooth_horizon < 0`` than values for smoothing will be get from the past of quarter date norm: normalize result or not n_jobs: number of threads for calculation ''' self.norm = norm self.daily_target = DailyAggTarget(data_key=daily_data_key, col=col, horizon=smooth_horizon, foo=np.mean, n_jobs=n_jobs) self.prev_quarter_date_target = QuarterlyTarget( data_key=quarterly_data_key, col='date', quarter_shift=-1, n_jobs=n_jobs)
[docs] def calculate(self, data: Dict, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for dates and tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.DailySmoothedQuarterlyDiffTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers and dates to calculate targets for. Should have columns: ``["ticker", "date"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company at ``date`` quarter ''' last_date_df = self.prev_quarter_date_target.calculate(data, index) last_date_df = last_date_df.reset_index() last_date_df['date'] = last_date_df['y'] del last_date_df['y'] curr_df = self.daily_target.calculate(data, index) last_df = self.daily_target.calculate(data, last_date_df) result = curr_df.copy() result['y'] = (curr_df['y'].values - last_df['y'].values) if self.norm: result['y'] = result['y'].values / last_df['y'].values return result
[docs]class ReportGapTarget: ''' Calculator of target represented as smoothed gap at some date(i.e. report date). Work with daily slices of company. ''' def __init__(self, data_key: str, col: str, smooth_horizon: int=1, norm: bool=True, n_jobs: int = cpu_count()): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.ReportGapTarget.calculate` col: column name for target calculation(like marketcap, pe) smooth_horizon: number of days for column smoothing norm: normalize gap value or not n_jobs: number of threads for calculation ''' self.curr_target = DailyAggTarget(data_key=data_key, col=col, horizon=smooth_horizon, foo=np.mean, n_jobs=n_jobs) self.last_target = DailyAggTarget(data_key=data_key, col=col, horizon=-smooth_horizon, foo=np.mean, n_jobs=n_jobs) self.norm = norm
[docs] def calculate(self, data: Dict, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for dates and tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.ReportGapTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers and dates to calculate targets for. Should have columns: ``["ticker", "date"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company at ``date`` time ''' curr_df = self.curr_target.calculate(data, index) last_df = self.last_target.calculate(data, index) curr_df['y'] = curr_df['y'] - last_df['y'] if self.norm: curr_df['y'] = curr_df['y'] / np.abs(last_df['y']) return curr_df
[docs]class BaseInfoTarget: ''' Calculator of target represented by base company information ''' def __init__(self, data_key: str, col: str): ''' Parameters ---------- data_key: key of dataloader in ``data`` argument during :func:`~ml_investment.targets.BaseInfoTarget.calculate` col: column name for target calculation(like sector, industry) ''' self.data_key = data_key self.col = col
[docs] def calculate(self, data, index: pd.DataFrame) -> pd.DataFrame: ''' Interface to calculate targets for tickers in index parameter based on data Parameters ---------- data: dict having field named as value in ``data_key`` param of :func:`~ml_investment.targets.BaseInfoTarget.__init__` This field should contain class implementing ``load(index) -> pd.DataFrame`` interface index: ``pd.DataFrame`` containing information of tickers to calculate targets for. Should have columns: ``["ticker"]`` Returns ------- ``pd.DataFrame`` targets having 'y' column. Index of this dataframe has the same values as ``index`` param. Each row contains target for ``ticker`` company ''' base_df = data[self.data_key].load(index['ticker'].values)[['ticker', self.col]] result = pd.merge(index, base_df, on='ticker', how='left') result = result.rename({self.col: 'y'}, axis=1) result = result[['ticker', 'y']] result = result.set_index(['ticker']) return result