Source code for ml_investment.backtest.strategy

import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import List, Dict


class Order:
    BUY = 1
    SELL = -1
       
    MARKET = 0
    LIMIT = 1
    CLOSE = 2

    INIT = 0
    PARTIAL = 1
    COMPLETED = 2
    EXPIRED = 3
    REJECTED = 4



[docs]class Strategy: ''' Base class for strategy backtesting. It contains overrideble method ``step`` for defining user strategy. This class incapsulate backtesting and metrics calculation process and also contains information about orders. ''' def __init__(self): self.data_loader = None self._data = {} self._cash = 0 self._active_orders = [] self._data_step_idxs = {} self.portfolio = {} self.step_dates:List[np.datetime64] = None self.step_date = None self.step_idx = None self.orders = [] self.cash: List = [] self.returns: List[float] = [] self.equity: List[float] = [] self.metrics:Dict = {} def _cast_data(self, df): date_df = pd.DataFrame() date_df['date'] = self.step_dates date_df['base_date'] = date_df['date'] df = pd.merge_asof(df, date_df, on='date', direction='forward') df = df.groupby('base_date').agg({'price': 'last', 'return': np.prod}) df = df.reset_index().rename({'base_date': 'date'}, axis=1) df = pd.merge(date_df[['date']], df, on='date', how='left') return df def _check_create_ticker_data(self, ticker): if ticker not in self._data: df = self.data_loader.load([ticker]) df[self.date_col] = df[self.date_col].astype(np.datetime64) df = df.sort_values(self.date_col) df.index = range(len(df)) result = pd.DataFrame() result['date'] = df[self.date_col] result['price'] = df[self.price_col] if self.return_col is None: result['return'] = df[self.price_col] /\ df[self.price_col].shift(1) if self.return_format == 'price': result['return'] = (df[self.return_col] /\ df[self.return_col].shift(1)).fillna(1) if self.return_format == 'change': result['return'] = df[self.return_col] + 1 if self.return_format == 'ratio': result['return'] = df[self.return_col] result = self._cast_data(result) # If len of data larger than len of step dates, shuoldn't use # older values as cumulative returns result.loc[0, 'return'] = 1 result['missed'] = result['price'].isnull() # If last date of data less then some step_date result['closed'] = result['missed'][::-1].cumprod()[::-1] result['price'] = result['price'].ffill() result['price_return'] = (result['price'] /\ result['price'].shift(1)).fillna(1) result['prev_price'] = result['price'].shift(1) result['dividend'] = result['return'] - result['price_return'] result['dividend'] *= result['dividend'].abs() > 1e-5 #result['missed'] = result['missed'].ffill() self._data[ticker] = result def _aposteriori_next_step_max_size(self, order): idx = self.step_idx + 1 if len(self._data[order['ticker']]) <= idx: return missed = self._data[order['ticker']].loc[idx, 'missed'] if missed: return price = self._data[order['ticker']].loc[idx, 'price'] size = self.portfolio[order['ticker']] if order['direction'] == Order.BUY: result = self._cash // (price * (1 + self.comission)) if order['direction'] == Order.SELL: result = size return result def _execute_market_order(self, order): idx = self.step_idx + 1 if idx >= len(self.step_dates): order['status'] = Order.EXPIRED order['price'] = np.nan order['execution_date'] = np.nan self.orders.append(order) return execution_date = np.datetime64( self._data[order['ticker']].loc[idx, 'date']) if order['submit_date'] >= execution_date: self._active_orders.append(order) return if order['submit_date'] + order['lifetime'] < execution_date: order['status'] = Order.EXPIRED order['price'] = np.nan order['execution_date'] = np.nan self.orders.append(order) return if self._data[order['ticker']].loc[idx, 'missed']: self._active_orders.append(order) return price = self._data[order['ticker']].loc[idx, 'price'] possible_size = self._aposteriori_next_step_max_size(order) if possible_size == 0 or possible_size is None: self._active_orders.append(order) return if not order['allow_partial'] and order['size'] > possible_size: self._active_orders.append(order) return execution_size = min(order['size'], possible_size) self.portfolio[order['ticker']] += order['direction'] * execution_size self._cash -= order['direction'] * price *\ execution_size * (1 + self.comission) order['execution_date'] = execution_date order['price'] = price if execution_size == order['size']: order['status'] = Order.COMPLETED else: order['size'] = possible_size order['status'] = Order.PARTIAL self.orders.append(order) def _execute_close_order(self, order): idx = self.step_idx + 1 execution_date = np.datetime64( self._data[order['ticker']].loc[idx, 'date']) price = self._data[order['ticker']].loc[idx, 'price'] self.portfolio[order['ticker']] = 0 self._cash -= order['direction'] * price *\ order['size'] * (1 + self.comission) order['execution_date'] = execution_date order['price'] = price order['status'] = Order.COMPLETED self.orders.append(order) def _execute_orders(self): curr_orders = self._active_orders.copy() curr_orders = sorted(curr_orders, key=lambda x: x['direction']) self._active_orders = [] for order in curr_orders: self._check_create_ticker_data(order['ticker']) if order['ticker'] not in self.portfolio: self.portfolio[order['ticker']] = 0 if order['order_type'] == Order.MARKET: self._execute_market_order(order) if order['order_type'] == Order.CLOSE: self._execute_close_order(order) if order['order_type'] == Order.LIMIT: raise NotImplementedError def _post_close_orders(self): if self.step_idx >= len(self.step_dates) - 2: return for ticker in self.portfolio.keys(): size = self.portfolio[ticker] if size == 0: continue closed = self._data[ticker].loc[self.step_idx + 2, 'closed'] if closed: if self.verbose: print('Close ticker {}'.format(ticker)) direction = Order.SELL if size > 0 else Order.BUY size = abs(size) self.post_order(ticker=ticker, direction=direction, size=size, order_type=Order.CLOSE, lifetime=np.timedelta64(300, 'D'), allow_partial=False) def _receive_dividends(self): for ticker in self.portfolio.keys(): size = self.portfolio[ticker] if size == 0: continue dividend = self._data[ticker].loc[self.step_idx, 'dividend'] if (not np.isnan(dividend)) and dividend != 0: prev_price = self._data[ticker].loc[self.step_idx, 'prev_price'] self._cash += size * prev_price * dividend def _calc_equity(self): equity = 0 for ticker in self.portfolio.keys(): size = self.portfolio[ticker] if size == 0: continue price = self._data[ticker].loc[self.step_idx, 'price'] equity += size * price equity += self._cash return equity
[docs] def post_order(self, ticker: str, direction: int, size: float, order_type: int=Order.MARKET, lifetime: np.timedelta64=np.timedelta64(300, 'D'), allow_partial: bool=True): ''' Post new order to backtest. It may be used inside your strategy overriden ``step`` method. Parameters ---------- ticker: ticker of company to post order for direction: one of ``Order.BUY`` (1), ``Order.SELL`` (-1) size: size of order in pieces order_type: one of ``Order.MARKET`` (0), ``Order.LIMIT`` (1) lifetime: amount of time before order closing if it can not be executed (e.g. if unsatisfactory price lasts a long time) allow_partial: may order be executed with not full size or not ''' if size == 0: return submit_date = self.step_date + self.latency if order_type == Order.CLOSE: submit_date = self.step_date self._active_orders.append({'ticker': ticker, 'direction': direction, 'size': size, 'order_type': order_type, 'lifetime': lifetime, 'allow_partial': allow_partial, 'creation_date': self.step_date, 'submit_date': submit_date})
[docs] def post_order_value(self, ticker: str, direction: int, value: float, order_type: int=Order.MARKET, lifetime: np.timedelta64=np.timedelta64(300, 'D'), allow_partial: bool=True): ''' Post new order by value (instead of size) to backtest. It may be used inside your strategy overriden ``step`` method. Parameters ---------- ticker: ticker of company to post order for direction: one of ``Order.BUY`` (1), ``Order.SELL`` (-1) value: value of order in money order_type: one of ``Order.MARKET`` (0), ``Order.LIMIT`` (1) lifetime: amount of time before order closing if it can not be executed (e.g. if unsatisfactory price lasts a long time) allow_partial: may order be executed with not full size or not ''' self._check_create_ticker_data(ticker) price = self._data[ticker].loc[self.step_idx, 'price'] size = round(value / price) self.post_order(ticker=ticker, direction=direction, size=size, order_type=order_type, lifetime=lifetime, allow_partial=allow_partial)
[docs] def post_portfolio_size(self, ticker: str, size: int, lifetime: np.timedelta64=np.timedelta64(300, 'D'), allow_partial: bool=True): ''' Post order to backtest to have desired size in portfolio. It will calculate difference between current and desired size to create appropriate order. It may be used inside your strategy overriden ``step`` method. Parameters ---------- ticker: ticker of company to post order for size: desired size in portfolio (in pieces) lifetime: amount of time before order closing if it can not be executed (e.g. if unsatisfactory price lasts a long time) allow_partial: may order be executed with not full size or not ''' if ticker not in self.portfolio: self.portfolio[ticker] = 0 diff_size = size - self.portfolio[ticker] if diff_size == 0: return direction = Order.BUY if diff_size > 0 else Order.SELL diff_size = abs(diff_size) self.post_order(ticker=ticker, direction=direction, size=diff_size, order_type=Order.MARKET, lifetime=lifetime, allow_partial=allow_partial)
[docs] def post_portfolio_value(self, ticker: str, value: float, lifetime: np.timedelta64=np.timedelta64(300, 'D'), allow_partial: bool=True): ''' Post order to backtest to have desired value in portfolio. It will calculate difference between current and desired value to create appropriate order. It may be used inside your strategy overriden ``step`` method. Parameters ---------- ticker: ticker of company to post order for value: desired value in portfolio (in money) lifetime: amount of time before order closing if it can not be executed (e.g. if unsatisfactory price lasts a long time) allow_partial: may order be executed with not full size or not ''' self._check_create_ticker_data(ticker) price = self._data[ticker].loc[self.step_idx, 'price'] if np.isnan(price): if self.verbose: print("There are no price for {} yet".format(ticker)) return needed_size = round(value / price) self.post_portfolio_size(ticker=ticker, size=needed_size, lifetime=lifetime, allow_partial=allow_partial)
[docs] def post_portfolio_part(self, ticker: str, part: float, lifetime: np.timedelta64=np.timedelta64(300, 'D'), allow_partial: bool=True): ''' Post order to backtest to have desired part in portfolio. It will calculate difference between current and desired part to create appropriate order. It may be used inside your strategy overriden ``step`` method. Parameters ---------- ticker: ticker of company to post order for part: desired part in all equity including other stocks and cash in portfolio (value between 0 and 1) lifetime: amount of time before order closing if it can not be executed (e.g. if unsatisfactory price lasts a long time) allow_partial: may order be executed with not full size or not ''' needed_value = self.equity[self.step_idx] * part self.post_portfolio_value(ticker=ticker, value=needed_value, lifetime=lifetime, allow_partial=allow_partial)
[docs] def calc_metrics(self, metrics:Dict): for key in metrics.keys(): self.metrics[key] = metrics[key](self.step_dates, self.returns)
[docs] def step(self): None
[docs] def backtest(self, data_loader, date_col: str, price_col: str, return_col: str, return_format: str, step_dates: List[np.timedelta64]=None, cash: float=100_000, comission: float=0.00025, latency: np.timedelta64=np.timedelta64(0, 'h'), allow_short: bool=False, metrics=None, preload: bool=False, verbose: bool=True): ''' Backtest strategy on provided data and other parameters. It will create and execute orders and calculate resulted equity and metrics. Parameters ---------- data_loader: class implementing ``load(index) -> pd.DataFrame`` interface. index in this case is list of tickers to load market data for. date_col: name of column containing date (time) information in market data provided by ``data_loader``. price_col: name of column containing price information in market data provided by ``data_loader``. return_col: name of column containing total return information in data provided by ``data_loader``. It may be differ from price due to dividends, stock splits and etc. return_format: format of data provided by ``return_col`` column. If ``return_format = 'ratio'`` than column should contain ratio between previous and current adjusted price. E.g. 1.2 means growth by 20% from the previous step. If ``return_format = 'price'`` than column should contain adjusted price (price, including dividends and etc.) If ``return_format = 'change'`` than column should contain relative change between current and previous step. E.g. 0.2 means growth by 20% from the previous step. step_dates: dates in which all actions can be taken. Include new market prices receiving, order creation and executing. ``step`` method will iterate over all those dates. If None than all possible dates, provided by ``date_col`` column in ``data_loader`` will be used. Possible only if ``preload = True`` and ``data_loader`` have ``existing_index(index) -> List`` interface. cash: initial amount of cash comission: commission charged for each trade (in percent of order value) latency: time between current step date and actual order posting. It emulates delays during ``step`` logic and in the Internet connection with the exchange. allow_short: allow short positions or not preload: load all data provided from ``data_loader`` to ram or not verbose: show progress or not ''' if allow_short: raise NotImplementedError if preload: raise NotImplementedError self.data_loader = data_loader self.date_col = date_col self.price_col = price_col self.return_col = return_col self.return_format = return_format self.step_dates = step_dates self._cash = cash self.comission = comission self.latency = latency self.allow_short = allow_short self.verbose = verbose for self.step_idx, self.step_date in tqdm(enumerate(self.step_dates), disable=not self.verbose): self._receive_dividends() self.equity.append(self._calc_equity()) self.cash.append(self._cash) self.step() self._post_close_orders() self._execute_orders() self.equity = np.array(self.equity) self.returns = self.equity[1:] / self.equity[:-1] - 1 self.returns = np.insert(self.returns, 0, 0., axis=0) if metrics is not None: self.calc_metrics(metrics)