ヾ(´・ω・`)ノ゙

In [1]:
%matplotlib inline
import itertools

import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import matplotlib.pyplot as plt
import seaborn as sns
import tqdm # ←これいい感じですな(´・ω・`)

from IPython.core.display import display
In [2]:
%load_ext autoreload
%autoreload 2
In [3]:
print(np.__version__)
print(pd.__version__)
1.10.4
0.18.1

↓ 少しコード量が多めなのでモジュールとして読み込みます

In [4]:
import test20170421.tester as tester
import test20170421.util as u
In [ ]:
 

このデータを使ってみます

In [5]:
df = u.read_hst('data/USDJPY60.hst')
print('data len', len(df))
df.head(5)
data len 63718
Out[5]:
Open High Low Close Volume
Time
2007-01-02 07:00:00 119.01 119.03 118.91 118.96 240
2007-01-02 08:00:00 118.96 118.98 118.87 118.89 240
2007-01-02 09:00:00 118.89 118.92 118.54 118.58 240
2007-01-02 10:00:00 118.58 118.73 118.55 118.72 240
2007-01-02 11:00:00 118.72 118.75 118.66 118.74 240

超簡単なテストストラテジー

In [6]:
class TestStrategy(tester.Tester):
    
    def __init__(self, df):
        super(TestStrategy, self).__init__(df)
    
    def set_signal(self, period_a=50, period_b=100):
        ma_a = u.EMA(self.Close, period_a)
        ma_b = u.EMA(self.Close, period_b)
        self.ob = ma_a>ma_b
        self.os = ma_a<ma_b
        self.cb = self.os
        self.cs = self.ob

bt = TestStrategy(df)
bt.set_signal()
res = bt.run(rettype_df=True)
res[['pl']].set_index(res['ct']).cumsum().plot(figsize=(15,3))
Out[6]:
<matplotlib.axes._subplots.AxesSubplot at 0xc05f400>
In [7]:
param_a = u.ma_params(5, 200, 30)
param_b = u.ma_params(10, 400, 30)
print('A', param_a)
print('B', param_b)
A [  5   6   6   7   8   9  11  12  14  16  18  20  23  26  30  34  38  43
  49  56  64  72  82  93 106 120 137 155 176 200]
B [ 10  11  13  15  17  19  21  24  28  31  36  41  46  52  59  67  77  87
  99 112 127 145 164 186 212 240 273 310 352 400]

とりあえず ↑ の組み合わせをテストしてみます

In [8]:
%%time
res_df = DataFrame(index=param_a, columns=param_b, dtype=float)
bt = TestStrategy(df)
bt.set_testbar('20080101')

best_param = None
best_score = 0

for p in itertools.product(param_a, param_b):
    bt.set_signal(*p)
    res = bt.run(rettype_df=False)
    res -= 0.01 # 1トレード当たり1pipsをスプレッドとして引いとく
    a, b = p
    score = res.sum()
    res_df.loc[a][b] = score
    if score>best_score:
        best_score = score
        best_param = p

print('best', best_param, best_score)
bt.set_signal(*best_param)
res = bt.run(rettype_df=True)
(res[['pl']]-0.01).set_index(res['ct']).cumsum().plot(figsize=(15,3))
best (11, 17) 64.229
Wall time: 908 ms

↑ まぁさすがにこれだけじゃ微妙な結果になりますな(´・ω・`)

ma2本使って単に損益の大きいものを選択しただけなので

In [9]:
sns.heatmap(res_df[::-1], annot=True, fmt='.1f', figure=plt.figure(figsize=(15, 8)))
plt.ylabel('ma param a')
plt.xlabel('ma param b')
Out[9]:
<matplotlib.text.Text at 0xbfb81d0>
In [ ]:
 

よし、次は一定期間を最適化期間に使って、最適化したパラメータで次期間をテストする
のを繰り返すのをやってみます(´・ω・`)

  • train_start, train_end: 最適化データの範囲
  • test_start, test_end: テストの範囲
  • read_start: インジ計算等のためのデータ読み込み開始点
In [10]:
wft_plan = u.wft_plan('20090101', train_size=12, test_size=3)
display(wft_plan.head())
display(wft_plan.tail())
train_start train_end test_start test_end read_start
0 2008-01-01 2008-12-31 2009-01-01 2009-03-31 2007-12-01
1 2008-04-01 2009-03-31 2009-04-01 2009-06-30 2008-03-01
2 2008-07-01 2009-06-30 2009-07-01 2009-09-30 2008-06-01
3 2008-10-01 2009-09-30 2009-10-01 2009-12-31 2008-09-01
4 2009-01-01 2009-12-31 2010-01-01 2010-03-31 2008-12-01
train_start train_end test_start test_end read_start
29 2015-04-01 2016-03-31 2016-04-01 2016-06-30 2015-03-01
30 2015-07-01 2016-06-30 2016-07-01 2016-09-30 2015-06-01
31 2015-10-01 2016-09-30 2016-10-01 2016-12-31 2015-09-01
32 2016-01-01 2016-12-31 2017-01-01 2017-03-31 2015-12-01
33 2016-04-01 2017-03-31 2017-04-01 2017-06-30 2016-03-01
In [ ]:
 

この例のMAストラテジーだとデータの開始から最後まで
一気にバックテストするのを総当りでやっても時間がかからないですが、
実際には、

  • パラメータの組み合わせはもっと多く総当りだと時間がかかる
  • なので総当りで結果が出るまで待ちたくない
  • なので総当りではない方法で最適化する

ということになると思うので、期間を区切ったバックテストを繰り返すやり方にしました

In [11]:
def test_wft(strategy, df, plan, param_set):
    opt_trade_list = []
    ft_trade_list = []
    
    wft_iter = plan.iterrows()
    if 'tqdm' in globals():
        wft_iter = tqdm.tqdm_notebook(list(wft_iter))
    
    for i,(train_s, train_e, test_s, test_e, read_s) in wft_iter:
        bt = strategy(df[read_s:train_e])
        bt.set_testbar(start=train_s)
        
        # 最適パラメータを探す(この例だと総当りして右肩上がりぽくなってるやつを選ぶだけ(^q^))
        best_param = None
        best_score = 0
        for p in itertools.product(*param_set):
            bt.set_signal(*p)
            res = bt.run(rettype_df=False)
            score = np.corrcoef(np.arange(len(res)), res.cumsum())[0,1] if len(res)>1 else 0
            if score>best_score:
                best_score = score
                best_param = p
        if best_param is None:
            continue
        
        # 最適なパラメータのバックテスト結果をリストに入れておきます
        bt.set_signal(*best_param)
        res = bt.run(rettype_df=True)
        if opt_trade_list:
            # 最適化期間が重複する部分は前回の最適化の履歴から除外する
            opt_trade_list[-1] = opt_trade_list[-1][opt_trade_list[-1]['ot']<train_s]
        opt_trade_list.append(res)
        
        # テスト期間をテストしてフォワードテストの結果としてリストに入れておきます
        ft = strategy(df[read_s:test_e])
        ft.set_testbar(start=test_s)
        ft.set_signal(*best_param)
        ft_trade_list.append(ft.run(rettype_df=True))
    
    # フィッティングした結果とフォワードテストした結果を返します
    opt_trade_df = pd.concat(opt_trade_list).set_index('ct')
    ft_trade_df = pd.concat(ft_trade_list).set_index('ct')
    return (opt_trade_df[opt_trade_df['ot']>=plan['test_start'][0]],
            ft_trade_df)


param_a = u.ma_params(5, 200, 30)
param_b = u.ma_params(10, 400, 30)
wft_plan = u.wft_plan('20080101', train_size=6, test_size=3, read_size=3)

opt_res, ft_res = test_wft(TestStrategy, df, wft_plan, [param_a, param_b])

opt_res['pl'].cumsum().plot(figsize=(15,5))
ft_res['pl'].cumsum().plot(figsize=(15,5))
wft_plan['test_start'].apply(lambda x: plt.axvline(x, lw=0.5, ls=':', c='k', alpha=1));

ちーん(´・ω・`)

このシステムは最適化期間を短くすればさらにオーバーフィットするはず(´・ω・`)

※ ma_a=10, ma_b=20 でだめなら ma_a=20, ma_b=10 にすればよい
といったシステムになっていてその範囲を探索させてるので(^^;)

In [12]:
param_a = u.ma_params(5, 200, 30)
param_b = u.ma_params(10, 400, 30)
wft_plan = u.wft_plan('20080101', train_size=1, test_size=1, read_size=3)
opt_res, ft_res = test_wft(TestStrategy, df, wft_plan, [param_a, param_b])
opt_res['pl'].cumsum().plot(figsize=(15,5))
ft_res['pl'].cumsum().plot(figsize=(15,5))

Out[12]:
<matplotlib.axes._subplots.AxesSubplot at 0xdf6d0b8>
In [ ]:
 

よしせっかくなのでパラメータがもう少しあるものをやってみます(´・ω・`)

最初のほうに定義したEMA関数だけでできる簡単なものにします
EURUSD M15 を使ってみます

In [13]:
df2 = u.read_hst('data/EURUSD15.hst')
In [14]:
class TestStrategy2(tester.TesterExt):
    
    def __init__(self, df):
        super(TestStrategy2, self).__init__(df)
    
    def set_signal(self, n1=1, n2=100, th=3, sl=0.002, tp=0.05, tr=0.005, trg=0.005):
        range1 = u.EMA(self.High, n1) - u.EMA(self.Low, n1) 
        range2 = u.EMA(self.High, n2) - u.EMA(self.Low, n2)
        sig = range1 > (range2*th)
        self.ob = sig & (self.Close>self.Open)
        self.os = sig & (self.Close<self.Open)
        self.set_sl_pct(sl)
        self.set_tp_pct(tp)
        self.set_trail_pct(tr)
        self.set_trailtrg_pct(trg)


bt = TestStrategy2(df2)
bt.set_signal()
bt.slrange
bt.set_testbar(start='20080101')
res = bt.run(rettype_df=True)
res[['pl']].set_index(res['ct']).cumsum().plot(figsize=(15,3))
Out[14]:
<matplotlib.axes._subplots.AxesSubplot at 0xe582400>
In [15]:
param_set = [
    [1],
    u.ma_params(50, 200, 10),
    np.arange(2, 4, 0.1, dtype=float),
    np.arange(0.001, 0.005, 0.001),
    [0],
    np.arange(0.001, 0.01, 0.001),
    np.arange(0.001, 0.01, 0.001)
]

print('組み合わせ', np.product([len(i) for i in param_set]))
組み合わせ 64800

これはなんやかんや値を設定するし、15分足だし、バックテスト中も指値やトレールのチェックをするので
さっきのMA2本のものよりテストに時間がかかるものになりました
なので遺伝的アルゴリズムを使ってみます(´・ω・`)

豊嶋先生のqiitaの記事のアルゴリズムを使わせてもらいました

このテスターから呼べるようにするのに少しいじったのでバグを埋め込んでしまっていたらすいません(´・ω・`;)

In [16]:
%%time

def evaluation_func(x):
    x -= 0.0001
    return np.corrcoef(np.arange(len(x)), x.cumsum())[0,1] if len(x)>1 else 0

def filter_f(x):
    # 約10年ぶん全体で500回もトレードがないものは低い評価値を返す細工をしときます
    if len(x) < 500:
        return -1

bt = TestStrategy2(df2)
bt.set_testbar(start='20080101')

opt_res = u.opt_gen(bt.run,                          # 実行する関数
                    {'rettype_df':False},            # 実行する関数に与える引数(リストか辞書)
                    bt.set_signal,                   # パラメータを設定する関数
                    param_set,                       # パラメータセット
                    evaluation_func=evaluation_func, # 評価関数
                    filter_f=filter_f,               # フィルターとして使う関数(なくてもいい)
                    generation=30,                   # 世代数
                    M=20,                            # 個体数
                    maximize=True,                   # 最大化するならTrue
                    nb_progress_bar=True)            # notebookでプログレスバーを出す

if opt_res.best_score<0:
    print('なんか失敗した(´・ω・`)')
else:
    print('bt cnt', opt_res.exec_cnt)
    print('best', opt_res.best_param, opt_res.best_score)
    plt.plot(opt_res.convergence, marker='.', figure=plt.figure(figsize=(15,3)))

    bt.set_signal(*opt_res.best_param)
    res = bt.run(rettype_df=True)
    print('trade count', len(res))
    (res[['pl']]-0.0001).set_index(res['ct']).cumsum().plot(figsize=(15,3))
bt cnt 600
best [1.0, 79.0, 3.300000000000001, 0.001, 0.0, 0.008, 0.003] 0.98593783481884
trade count 1257
Wall time: 12 s
In [17]:
def evaluation_func(x):
    x -= 0.0001
    return np.corrcoef(np.arange(len(x)), x.cumsum())[0,1] if len(x)>1 else 0


def test_wft2(strategy, df, plan, param_set):
    opt_trade_list = []
    ft_trade_list = []
    
    wft_iter = plan.iterrows()
    if 'tqdm' in globals():
        wft_iter = tqdm.tqdm_notebook(list(wft_iter))
    for i,(train_s, train_e, test_s, test_e, read_s) in wft_iter:
        bt = strategy(df[read_s:train_e])
        bt.set_testbar(start=train_s)
        
        days = len(pd.date_range(train_s, train_e, freq='B')) # train期間の日数
        def filter_f(x):
            # 1日当たり0.5回以下の結果は低い値を返すようにしてみます
            if len(x)/days < 0.5:
                return -1
        
        opt_res = u.opt_gen(bt.run,
                            {'rettype_df':False},
                            bt.set_signal,
                            param_set,
                            evaluation_func=evaluation_func,
                            filter_f=filter_f,
                            generation=30,
                            M=20,
                            maximize=True)
        
        if opt_res.best_score<0:
            # まともなパラメータが見つからなかったらスキップします
            print('ft skip: {}~{}'.format(test_s.date(), test_e.date()))
            continue
        
        best_param = opt_res.best_param
        bt.set_signal(*best_param)
        res = bt.run(rettype_df=True)
        if opt_trade_list:
            opt_trade_list[-1] = opt_trade_list[-1][opt_trade_list[-1]['ot']<train_s]
        opt_trade_list.append(res)
        
        ft = strategy(df[read_s:test_e])
        ft.set_testbar(start=test_s)
        ft.set_signal(*best_param)
        ft_trade_list.append(ft.run(rettype_df=True))
    
    opt_trade_df = pd.concat(opt_trade_list).set_index('ct')
    opt_trade_df = opt_trade_df[opt_trade_df['ot']>=plan['test_start'][0]]
    ft_trade_df = pd.concat(ft_trade_list).set_index('ct')
    return (opt_trade_df, ft_trade_df)


param_set = [
    [1],
    u.ma_params(50, 200, 10),
    np.arange(2, 4, 0.1, dtype=float),
    np.arange(0.001, 0.005, 0.001),
    [0],
    np.arange(0.001, 0.01, 0.001),
    np.arange(0.001, 0.01, 0.001)
]

wft_plan = u.wft_plan('20100101', train_size=12*2, test_size=3)
opt_res, ft_res = test_wft2(TestStrategy2, df2, wft_plan, param_set)

opt_res['pl'].cumsum().plot(figsize=(15,5))
ft_res['pl'].cumsum().plot(figsize=(15,5))
wft_plan['test_start'].apply(lambda x: plt.axvline(x, lw=0.5, ls=':', c='k', alpha=1));

ちーん(´・ω・`)

In [ ]:
 

ぜんぜん関係ないけど最近知ったdf.between_time()が結構便利でいい(´・ω・`)

こういう感じで選択できるんですね

In [18]:
%time a = df.between_time('12:00', '15:00', include_end=False)
print(a.shape)
a.head(10)
Wall time: 6 ms
(8025, 5)
Out[18]:
Open High Low Close Volume
Time
2007-01-02 12:00:00 118.74 118.78 118.72 118.73 240
2007-01-02 13:00:00 118.73 118.74 118.70 118.73 240
2007-01-02 14:00:00 118.73 118.75 118.65 118.66 240
2007-01-03 12:00:00 118.72 118.74 118.66 118.69 240
2007-01-03 13:00:00 118.69 118.82 118.69 118.73 240
2007-01-03 14:00:00 118.73 118.79 118.73 118.78 240
2007-01-04 12:00:00 119.36 119.39 119.34 119.36 240
2007-01-04 13:00:00 119.36 119.39 119.33 119.38 240
2007-01-04 14:00:00 119.38 119.41 119.34 119.35 240
2007-01-05 12:00:00 118.51 118.53 118.32 118.34 240

between_timeの範囲のbool配列がほしくてなんかそれ用のがあるのかと思ったけど
よくわからなかったのでこうした(´・ω・`)

In [19]:
%time mask = np.in1d(np.arange(len(df)), df.index.indexer_between_time('12:00', '15:00', include_end=False))
print(mask) # 上の選択と同じbool配列ができてる
print(df[mask].shape)
Wall time: 7 ms
[False False False ..., False False False]
(8025, 5)

↓ こんな感じでもいいみたいだけどdf.index.timeの取得がわりと時間かかるみたい

In [20]:
import datetime
%time t = df.index.time
%time mask = (t>=datetime.time(12,0)) & (t<datetime.time(15,0))
print(mask)
print(df[mask].shape)
Wall time: 335 ms
Wall time: 3 ms
[False False False ..., False False False]
(8025, 5)
In [ ]:
 

わーい!ヾ(´・ω・`)ノ゙

In [21]:
u.wai()