Source code for ml_investment.models

import pandas as pd
import numpy as np
from copy import deepcopy
from tqdm import tqdm
from typing import List
from sklearn.model_selection import GroupKFold



[docs]class LogExpModel: ''' Model wrapper to fit on log of target and exp produced prediction. May be usefull for some target distributions. ''' def __init__(self, base_model): ''' Parameters ---------- base_model: class implements ``fit(X, y)``, ``predict(X)``/``predict_proba(X)`` interfaces ''' self.base_model = base_model
[docs] def fit(self, X: pd.DataFrame, y): ''' Interface for model training Parameters ---------- X: ``pd.DataFrame`` containing features y: target data ''' # assert np.min(y) >= 0 mask = (y > 0).values self.base_model.fit(X[mask], np.log(y[mask]))
[docs] def predict(self, X): ''' Interface for prediction Parameters ---------- X: ``pd.DataFrame`` containing features ''' return np.exp(self.base_model.predict(X))
[docs]class EnsembleModel: ''' Class for training ansamble of base models. ''' def __init__(self, base_models: List, bagging_fraction: float=0.8, model_cnt: int=20): ''' Parameters ---------- base_models: list of classes implements ``fit(X, y)``, ``predict(X)``/``predict_proba(X)`` interfaces bagging_fraction: part of random data subsample for training models model_cnt: total number of models in resulted ansamble ''' self.base_models = base_models self.bagging_fraction = bagging_fraction self.model_cnt = model_cnt self.models = []
[docs] def fit(self, X: pd.DataFrame, y: pd.Series): ''' Interface for model training Parameters ---------- X: ``pd.DataFrame`` containing features y: target data ''' for _ in tqdm(range(self.model_cnt)): idxs = np.random.randint(0, len(X), int(len(X) * self.bagging_fraction)) curr_model = deepcopy(np.random.choice(self.base_models)) curr_model.fit(X.iloc[idxs], y.iloc[idxs]) self.models.append(curr_model)
[docs] def predict(self, X): ''' Interface for prediction Parameters ---------- X: pd.DataFrame containing features ''' preds = [] for k in range(self.model_cnt): try: model_pred = self.models[k].predict_proba(X)[:, 1] except: model_pred = self.models[k].predict(X) preds.append(model_pred) return np.mean(preds, axis=0)
[docs]class GroupedOOFModel: ''' Model wrapper incapsulate out of fold separation within data groups. Each sample in group can not be in training and validation fold at the same time. ''' def __init__(self, base_model, group_column: str, fold_cnt: int=5): ''' Parameters ---------- base_model: model implements ``fit(X, y)``, ``predict(X)``/``predict_proba(X)`` interfaces group_column: name of column for grouping training data. ``X`` in ``fit(X, y)`` and ``predict(X)`` should contain this column. Samples with one group value will be placed only in one training fold. fold_cnt: number of folds for training ''' self.fold_cnt = fold_cnt self.group_column = group_column self.base_models = [] for k in range(self.fold_cnt): self.base_models.append(deepcopy(base_model)) self.group_df = None self.columns = None
[docs] def fit(self, X: pd.DataFrame, y: pd.Series): ''' Interface for model training Parameters ---------- X: ``pd.DataFrame`` containing features and ``self.group_column`` y: target data ''' groups = X.reset_index()[self.group_column] df_arr = [] kfold = GroupKFold(self.fold_cnt) for k, (itr, ite) in enumerate(kfold.split(X, y, groups)): self.base_models[k].fit(X.iloc[itr], y.iloc[itr]) curr_group_df = pd.DataFrame() curr_group_df['group'] = np.unique(groups[ite]) curr_group_df['fold_id'] = k df_arr.append(curr_group_df) self.group_df = pd.concat(df_arr, axis=0) self.columns = X.columns
[docs] def predict(self, X: pd.DataFrame) -> np.array: ''' Interface for prediction Parameters ---------- X: ``pd.DataFrame`` containing features and ``self.group_column`` ''' groups = X.reset_index()[self.group_column] predict_groups = pd.DataFrame() predict_groups['group'] = groups predict_groups = pd.merge(predict_groups, self.group_df, on='group', how='left') predict_groups.index = X.index # If group was not in train data -> put to 0th fold predict_groups = predict_groups.fillna(0) pred_df = [] for fold_id in range(self.fold_cnt): X_curr = X[predict_groups['fold_id'] == fold_id] if len(X_curr) == 0: continue try: pred = self.base_models[fold_id].predict_proba(X_curr)[:, 1] except: pred = self.base_models[fold_id].predict(X_curr) curr_pred_df = pd.DataFrame() curr_pred_df['pred'] = pred curr_pred_df.index = X_curr.index pred_df.append(curr_pred_df) pred_df = pd.concat(pred_df, axis=0) pred_df = pred_df.loc[X.index] return pred_df['pred'].values
[docs]class TimeSeriesOOFModel: ''' Model wrapper incapsulate out of fold time-series separation. ''' def __init__(self, base_model, time_column: str, fold_cnt: int=5): ''' Parameters ---------- base_model: model implements ``fit(X, y)``, ``predict(X)``/``predict_proba(X)`` interfaces time_column: name of column for separating training data. ``X`` in ``fit(X, y)`` and ``predict(X)`` should contain this column. Samples from feature would not be used for training and prediction past. fold_cnt: number of folds for training ''' self.fold_cnt = fold_cnt self.time_column = time_column self.base_models = [] for k in range(self.fold_cnt): self.base_models.append(deepcopy(base_model)) self.time_bounds = None self.is_fitted_fold = np.zeros(self.fold_cnt) def _create_time_bounds(self, times: List[np.datetime64]): max_time = max(times) min_time = min(times) delta = (max_time - min_time) // self.fold_cnt self.time_bounds = [] for fold_id in range(1, self.fold_cnt): self.time_bounds.append(min_time + fold_id * delta) self.time_bounds.append(max_time) # Fictive boundary for fit() code simplification self.time_bounds.append(max_time + np.timedelta64(10000, 'D'))
[docs] def fit(self, X: pd.DataFrame, y): ''' Interface for model training Parameters ---------- X: ``pd.DataFrame`` containing features and ``self.time_column`` y: target data ''' times = X.reset_index()[self.time_column].astype(np.datetime64).values self._create_time_bounds(times) for fold_id in range(self.fold_cnt): curr_mask = times <= self.time_bounds[fold_id] # check if there are enough samples if curr_mask.sum() > 5: self.base_models[fold_id].fit(X[curr_mask], y[curr_mask]) self.is_fitted_fold[fold_id] = 1
[docs] def predict(self, X: pd.DataFrame) -> np.array: ''' Interface for prediction Parameters ---------- X: ``pd.DataFrame`` containing features and ``self.time_column`` ''' times = X.reset_index()[self.time_column].astype(np.datetime64).values pred_df = [] X_curr = X[times <= self.time_bounds[0]] curr_pred_df = pd.DataFrame() curr_pred_df['pred'] = [np.nan] * len(X_curr) curr_pred_df.index = X_curr.index pred_df.append(curr_pred_df) for fold_id in range(self.fold_cnt): curr_mask = (times > self.time_bounds[fold_id]) * \ (times <= self.time_bounds[fold_id + 1]) X_curr = X[curr_mask] if len(X_curr) == 0: continue if not self.is_fitted_fold[fold_id]: curr_pred_df = pd.DataFrame() curr_pred_df['pred'] = [np.nan] * len(X_curr) curr_pred_df.index = X_curr.index pred_df.append(curr_pred_df) continue try: pred = self.base_models[fold_id].predict_proba(X_curr)[:, 1] except: pred = self.base_models[fold_id].predict(X_curr) curr_pred_df = pd.DataFrame() curr_pred_df['pred'] = pred curr_pred_df.index = X_curr.index pred_df.append(curr_pred_df) pred_df = pd.concat(pred_df, axis=0) pred_df = pred_df.loc[X.index] return pred_df['pred'].values