%matplotlib inline
import itertools
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import matplotlib.pyplot as plt
import seaborn as sns
import tqdm # ←これいい感じですな(´・ω・`)
from IPython.core.display import display
%load_ext autoreload
%autoreload 2
print(np.__version__)
print(pd.__version__)
↓ 少しコード量が多めなのでモジュールとして読み込みます
import test20170421.tester as tester
import test20170421.util as u
df = u.read_hst('data/USDJPY60.hst')
print('data len', len(df))
df.head(5)
class TestStrategy(tester.Tester):
def __init__(self, df):
super(TestStrategy, self).__init__(df)
def set_signal(self, period_a=50, period_b=100):
ma_a = u.EMA(self.Close, period_a)
ma_b = u.EMA(self.Close, period_b)
self.ob = ma_a>ma_b
self.os = ma_a<ma_b
self.cb = self.os
self.cs = self.ob
bt = TestStrategy(df)
bt.set_signal()
res = bt.run(rettype_df=True)
res[['pl']].set_index(res['ct']).cumsum().plot(figsize=(15,3))
param_a = u.ma_params(5, 200, 30)
param_b = u.ma_params(10, 400, 30)
print('A', param_a)
print('B', param_b)
%%time
res_df = DataFrame(index=param_a, columns=param_b, dtype=float)
bt = TestStrategy(df)
bt.set_testbar('20080101')
best_param = None
best_score = 0
for p in itertools.product(param_a, param_b):
bt.set_signal(*p)
res = bt.run(rettype_df=False)
res -= 0.01 # 1トレード当たり1pipsをスプレッドとして引いとく
a, b = p
score = res.sum()
res_df.loc[a][b] = score
if score>best_score:
best_score = score
best_param = p
print('best', best_param, best_score)
bt.set_signal(*best_param)
res = bt.run(rettype_df=True)
(res[['pl']]-0.01).set_index(res['ct']).cumsum().plot(figsize=(15,3))
ma2本使って単に損益の大きいものを選択しただけなので
sns.heatmap(res_df[::-1], annot=True, fmt='.1f', figure=plt.figure(figsize=(15, 8)))
plt.ylabel('ma param a')
plt.xlabel('ma param b')
よし、次は一定期間を最適化期間に使って、最適化したパラメータで次期間をテストする
のを繰り返すのをやってみます(´・ω・`)
wft_plan = u.wft_plan('20090101', train_size=12, test_size=3)
display(wft_plan.head())
display(wft_plan.tail())
この例のMAストラテジーだとデータの開始から最後まで
一気にバックテストするのを総当りでやっても時間がかからないですが、
実際には、
ということになると思うので、期間を区切ったバックテストを繰り返すやり方にしました
def test_wft(strategy, df, plan, param_set):
opt_trade_list = []
ft_trade_list = []
wft_iter = plan.iterrows()
if 'tqdm' in globals():
wft_iter = tqdm.tqdm_notebook(list(wft_iter))
for i,(train_s, train_e, test_s, test_e, read_s) in wft_iter:
bt = strategy(df[read_s:train_e])
bt.set_testbar(start=train_s)
# 最適パラメータを探す(この例だと総当りして右肩上がりぽくなってるやつを選ぶだけ(^q^))
best_param = None
best_score = 0
for p in itertools.product(*param_set):
bt.set_signal(*p)
res = bt.run(rettype_df=False)
score = np.corrcoef(np.arange(len(res)), res.cumsum())[0,1] if len(res)>1 else 0
if score>best_score:
best_score = score
best_param = p
if best_param is None:
continue
# 最適なパラメータのバックテスト結果をリストに入れておきます
bt.set_signal(*best_param)
res = bt.run(rettype_df=True)
if opt_trade_list:
# 最適化期間が重複する部分は前回の最適化の履歴から除外する
opt_trade_list[-1] = opt_trade_list[-1][opt_trade_list[-1]['ot']<train_s]
opt_trade_list.append(res)
# テスト期間をテストしてフォワードテストの結果としてリストに入れておきます
ft = strategy(df[read_s:test_e])
ft.set_testbar(start=test_s)
ft.set_signal(*best_param)
ft_trade_list.append(ft.run(rettype_df=True))
# フィッティングした結果とフォワードテストした結果を返します
opt_trade_df = pd.concat(opt_trade_list).set_index('ct')
ft_trade_df = pd.concat(ft_trade_list).set_index('ct')
return (opt_trade_df[opt_trade_df['ot']>=plan['test_start'][0]],
ft_trade_df)
param_a = u.ma_params(5, 200, 30)
param_b = u.ma_params(10, 400, 30)
wft_plan = u.wft_plan('20080101', train_size=6, test_size=3, read_size=3)
opt_res, ft_res = test_wft(TestStrategy, df, wft_plan, [param_a, param_b])
opt_res['pl'].cumsum().plot(figsize=(15,5))
ft_res['pl'].cumsum().plot(figsize=(15,5))
wft_plan['test_start'].apply(lambda x: plt.axvline(x, lw=0.5, ls=':', c='k', alpha=1));
※ ma_a=10, ma_b=20 でだめなら ma_a=20, ma_b=10 にすればよい
といったシステムになっていてその範囲を探索させてるので(^^;)
param_a = u.ma_params(5, 200, 30)
param_b = u.ma_params(10, 400, 30)
wft_plan = u.wft_plan('20080101', train_size=1, test_size=1, read_size=3)
opt_res, ft_res = test_wft(TestStrategy, df, wft_plan, [param_a, param_b])
opt_res['pl'].cumsum().plot(figsize=(15,5))
ft_res['pl'].cumsum().plot(figsize=(15,5))
最初のほうに定義したEMA関数だけでできる簡単なものにします
EURUSD M15 を使ってみます
df2 = u.read_hst('data/EURUSD15.hst')
class TestStrategy2(tester.TesterExt):
def __init__(self, df):
super(TestStrategy2, self).__init__(df)
def set_signal(self, n1=1, n2=100, th=3, sl=0.002, tp=0.05, tr=0.005, trg=0.005):
range1 = u.EMA(self.High, n1) - u.EMA(self.Low, n1)
range2 = u.EMA(self.High, n2) - u.EMA(self.Low, n2)
sig = range1 > (range2*th)
self.ob = sig & (self.Close>self.Open)
self.os = sig & (self.Close<self.Open)
self.set_sl_pct(sl)
self.set_tp_pct(tp)
self.set_trail_pct(tr)
self.set_trailtrg_pct(trg)
bt = TestStrategy2(df2)
bt.set_signal()
bt.slrange
bt.set_testbar(start='20080101')
res = bt.run(rettype_df=True)
res[['pl']].set_index(res['ct']).cumsum().plot(figsize=(15,3))
param_set = [
[1],
u.ma_params(50, 200, 10),
np.arange(2, 4, 0.1, dtype=float),
np.arange(0.001, 0.005, 0.001),
[0],
np.arange(0.001, 0.01, 0.001),
np.arange(0.001, 0.01, 0.001)
]
print('組み合わせ', np.product([len(i) for i in param_set]))
これはなんやかんや値を設定するし、15分足だし、バックテスト中も指値やトレールのチェックをするので
さっきのMA2本のものよりテストに時間がかかるものになりました
なので遺伝的アルゴリズムを使ってみます(´・ω・`)
このテスターから呼べるようにするのに少しいじったのでバグを埋め込んでしまっていたらすいません(´・ω・`;)
%%time
def evaluation_func(x):
x -= 0.0001
return np.corrcoef(np.arange(len(x)), x.cumsum())[0,1] if len(x)>1 else 0
def filter_f(x):
# 約10年ぶん全体で500回もトレードがないものは低い評価値を返す細工をしときます
if len(x) < 500:
return -1
bt = TestStrategy2(df2)
bt.set_testbar(start='20080101')
opt_res = u.opt_gen(bt.run, # 実行する関数
{'rettype_df':False}, # 実行する関数に与える引数(リストか辞書)
bt.set_signal, # パラメータを設定する関数
param_set, # パラメータセット
evaluation_func=evaluation_func, # 評価関数
filter_f=filter_f, # フィルターとして使う関数(なくてもいい)
generation=30, # 世代数
M=20, # 個体数
maximize=True, # 最大化するならTrue
nb_progress_bar=True) # notebookでプログレスバーを出す
if opt_res.best_score<0:
print('なんか失敗した(´・ω・`)')
else:
print('bt cnt', opt_res.exec_cnt)
print('best', opt_res.best_param, opt_res.best_score)
plt.plot(opt_res.convergence, marker='.', figure=plt.figure(figsize=(15,3)))
bt.set_signal(*opt_res.best_param)
res = bt.run(rettype_df=True)
print('trade count', len(res))
(res[['pl']]-0.0001).set_index(res['ct']).cumsum().plot(figsize=(15,3))
def evaluation_func(x):
x -= 0.0001
return np.corrcoef(np.arange(len(x)), x.cumsum())[0,1] if len(x)>1 else 0
def test_wft2(strategy, df, plan, param_set):
opt_trade_list = []
ft_trade_list = []
wft_iter = plan.iterrows()
if 'tqdm' in globals():
wft_iter = tqdm.tqdm_notebook(list(wft_iter))
for i,(train_s, train_e, test_s, test_e, read_s) in wft_iter:
bt = strategy(df[read_s:train_e])
bt.set_testbar(start=train_s)
days = len(pd.date_range(train_s, train_e, freq='B')) # train期間の日数
def filter_f(x):
# 1日当たり0.5回以下の結果は低い値を返すようにしてみます
if len(x)/days < 0.5:
return -1
opt_res = u.opt_gen(bt.run,
{'rettype_df':False},
bt.set_signal,
param_set,
evaluation_func=evaluation_func,
filter_f=filter_f,
generation=30,
M=20,
maximize=True)
if opt_res.best_score<0:
# まともなパラメータが見つからなかったらスキップします
print('ft skip: {}~{}'.format(test_s.date(), test_e.date()))
continue
best_param = opt_res.best_param
bt.set_signal(*best_param)
res = bt.run(rettype_df=True)
if opt_trade_list:
opt_trade_list[-1] = opt_trade_list[-1][opt_trade_list[-1]['ot']<train_s]
opt_trade_list.append(res)
ft = strategy(df[read_s:test_e])
ft.set_testbar(start=test_s)
ft.set_signal(*best_param)
ft_trade_list.append(ft.run(rettype_df=True))
opt_trade_df = pd.concat(opt_trade_list).set_index('ct')
opt_trade_df = opt_trade_df[opt_trade_df['ot']>=plan['test_start'][0]]
ft_trade_df = pd.concat(ft_trade_list).set_index('ct')
return (opt_trade_df, ft_trade_df)
param_set = [
[1],
u.ma_params(50, 200, 10),
np.arange(2, 4, 0.1, dtype=float),
np.arange(0.001, 0.005, 0.001),
[0],
np.arange(0.001, 0.01, 0.001),
np.arange(0.001, 0.01, 0.001)
]
wft_plan = u.wft_plan('20100101', train_size=12*2, test_size=3)
opt_res, ft_res = test_wft2(TestStrategy2, df2, wft_plan, param_set)
opt_res['pl'].cumsum().plot(figsize=(15,5))
ft_res['pl'].cumsum().plot(figsize=(15,5))
wft_plan['test_start'].apply(lambda x: plt.axvline(x, lw=0.5, ls=':', c='k', alpha=1));
こういう感じで選択できるんですね
%time a = df.between_time('12:00', '15:00', include_end=False)
print(a.shape)
a.head(10)
between_timeの範囲のbool配列がほしくてなんかそれ用のがあるのかと思ったけど
よくわからなかったのでこうした(´・ω・`)
%time mask = np.in1d(np.arange(len(df)), df.index.indexer_between_time('12:00', '15:00', include_end=False))
print(mask) # 上の選択と同じbool配列ができてる
print(df[mask].shape)
↓ こんな感じでもいいみたいだけどdf.index.timeの取得がわりと時間かかるみたい
import datetime
%time t = df.index.time
%time mask = (t>=datetime.time(12,0)) & (t<datetime.time(15,0))
print(mask)
print(df[mask].shape)
u.wai()