%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
この前、バックテストの結果のnumpy配列から
連敗数を求める機会があったのでそのときに書いたコードです(´・ω・`)
np.random.seed(0)
x = np.random.randint(-1000, 1001, size=10000000)
print('こんなトレード結果があったとして')
print('profit', x[:20])
plt.plot(x.cumsum()); plt.show()
print('勝ちを 1 負けを -1 であらわす配列にして')
wl = np.where(x>=0, 1, -1)
print('win lose', wl[:20])
↑この配列から、
[-1, -2, 1, 2, -1, -2, 1, 2, 3, 4, -1 ...]
といった、何連勝目か、何連敗目か、を知るための配列を作りたかったわけです(´・ω・`)
def f1(x):
l = [x[0]]
prev = l[-1]
for v in x[1:]:
l.append(l[-1]+v if v==prev else v)
prev = v
return np.array(l)
%time res = f1(wl)
print(res[:20])
for文を使えば簡単なことなんですが、
以前、pandasのSeriesからこれを求めたくて、しばらく悩んでいい方法が思いつかず、
結局、諦めてfor文で計算したということがありました(´・ω・`)
ここに、
https://stackoverflow.com/questions/27626542/counting-consecutive-positive-value-in-python-array
y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
こんなコードがあって、こんなやり方を見たときは、
と感心しました
このやり方はぼくじゃ何日考えても思いつかんわって(^ω ^ ;)
それもあって、numpy配列でもfor使わずにいけるんじゃないか?と考えたわけです(´・ω・`)
先に言っとくと、numbaとか使ってfor回せばそっちのほうが簡単で速いです
しかし、頭の体操とnumpyの練習も兼ねてやってみました
numpy縛りなのでgroupbyが使えないわけですね(^ω^;)
def f2(x):
start, end = (lambda sep: (sep[:-1], sep[1:]))(np.diff(np.concatenate([[0], x, [0]]))!=0)
counts = (lambda cs: cs[end]-cs[start]+x[start])(x.cumsum())
start[0] = False
ret = x.copy()
ret[start] = -(counts + np.where(counts>0, 1, -1))[:-1]
return ret.cumsum()
%time res = f2(wl)
print(res[:20])
※入力と同じサイズの配列を返すのでなく何連続したかわかればいいのなら以下でいける...はず
f = lambda x:(lambda cs,s,e:cs[e]-cs[s]+x[s])(x.cumsum(),*(lambda a:(a[:-1],a[1:]))(np.diff(np.concatenate([[0],x,[0]]))!=0))
%time res = f(wl)
print(res[:20])
print('最大連勝', res.max())
print('最大連敗', abs(res.min()))
import numba
@numba.njit#(cache=True)
def f3(x):
ret = np.zeros_like(x, dtype=x.dtype)
ret[0] = x[0]
for i in range(1, len(x)):
if x[i]==x[i-1]:
ret[i] = ret[i-1]+x[i]
else:
ret[i] = x[i]
return ret
%time res = f3(wl) # 一回目はコンパイルするのでちょっと遅い
%time res = f3(wl)
print(res[:20])
%time a = np.concatenate([[0], wl, [0]])
%time a = np.diff(a)!=0
%time start, end = a[:-1], a[1:]
%time cs = wl.cumsum()
print('↓これがちょっと時間かかる')
%time v1, v2, v3 = cs[end], cs[start], wl[start]
%time count = v1-v2+v3
start[0] = False
result = x.copy()
print('↓これもちょっと時間かかる')
%time result[start] = -(count + np.where(count>0, 1, -1))[:-1]
%time result = result.cumsum()
start, endのbool配列で参照してるとこがそこそこ時間かかってるかんじですね
そして今日閃いた (´・ω・`)!
bool配列でアクセスしてるところを整数のインデックスでアクセスするようにすれば少し速くなるかも
と
def f4(x):
start, end = ((lambda sep: (sep[:-1], sep[1:]-1))
((np.diff(np.concatenate([[0], x, [0]]))!=0).nonzero()[0]))
counts = (lambda cs: cs[end]-cs[start]+x[start])(x.cumsum())
ret = x.copy()
ret[start[1:]] = -(counts + np.where(counts>0, 1, -1))[:-1]
return ret.cumsum()
print('元のほう')
%time res1 = f2(wl)
print('整数のインデックスで取得するようにしたほう')
%time res2 = f4(wl)
print((res1!=res2).sum())
少し、速くはなった・・・・・・(´・ω・`)
そういや、以下の計算でだいたい合うか?(´・ω・`)
import math
# なんか本で読んだ方法
def estimate_consecutive_lose(x, p=None):
av = (x==1).sum() / len(x)
if p is None: p = 1/len(x)
return math.log(p) / math.log(1-av)
res = []
for i in range(100):
x = np.random.randn(10000)
wl = np.where(x>=0, 1, -1)
cl = abs(f3(wl).min())
ecl = estimate_consecutive_lose(wl)
res.append([cl, ecl, (wl==1).sum()/len(wl)])
print(pd.DataFrame(res, columns=['cl', 'ecl', 'av']).mean())
たぶん、勝ち負けの結果が独立してるならけっこう近い値になるはず?