In [1]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

この前、バックテストの結果のnumpy配列から
連敗数を求める機会があったのでそのときに書いたコードです(´・ω・`)

In [2]:
np.random.seed(0)
x = np.random.randint(-1000, 1001, size=10000000)

print('こんなトレード結果があったとして')
print('profit', x[:20])
plt.plot(x.cumsum()); plt.show()

print('勝ちを 1 負けを -1 であらわす配列にして')
wl = np.where(x>=0, 1, -1)
print('win lose', wl[:20])
こんなトレード結果があったとして
profit [-316 -441  653  216 -165 -237  731  383   33  747 -723  778  828 -401   94
  496 -400  420 -686 -295]
勝ちを 1 負けを -1 であらわす配列にして
win lose [-1 -1  1  1 -1 -1  1  1  1  1 -1  1  1 -1  1  1 -1  1 -1 -1]

↑この配列から、
[-1, -2, 1, 2, -1, -2, 1, 2, 3, 4, -1 ...]
といった、何連勝目か、何連敗目か、を知るための配列を作りたかったわけです(´・ω・`)

for文を使ってやる

In [3]:
def f1(x):
    l = [x[0]]
    prev = l[-1]
    for v in x[1:]:
        l.append(l[-1]+v if v==prev else v)
        prev = v
    return np.array(l)

%time res = f1(wl)
print(res[:20])
Wall time: 4.13 s
[-1 -2  1  2 -1 -2  1  2  3  4 -1  1  2 -1  1  2 -1  1 -1 -2]

for文を使えば簡単なことなんですが、
以前、pandasのSeriesからこれを求めたくて、しばらく悩んでいい方法が思いつかず、
結局、諦めてfor文で計算したということがありました(´・ω・`)

ここに、

https://stackoverflow.com/questions/27626542/counting-consecutive-positive-value-in-python-array

y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)

こんなコードがあって、こんなやり方を見たときは、

すっごいね!まほうみたい!!

と感心しました
このやり方はぼくじゃ何日考えても思いつかんわって(^ω ^ ;)

それもあって、numpy配列でもfor使わずにいけるんじゃないか?と考えたわけです(´・ω・`)

先に言っとくと、numbaとか使ってfor回せばそっちのほうが簡単で速いです
しかし、頭の体操とnumpyの練習も兼ねてやってみました
numpy縛りなのでgroupbyが使えないわけですね(^ω^;)

In [4]:
def f2(x):
    start, end = (lambda sep: (sep[:-1], sep[1:]))(np.diff(np.concatenate([[0], x, [0]]))!=0)
    counts = (lambda cs: cs[end]-cs[start]+x[start])(x.cumsum())
    start[0] = False
    ret = x.copy()
    ret[start] = -(counts + np.where(counts>0, 1, -1))[:-1]
    return ret.cumsum()

%time res = f2(wl)
print(res[:20])
Wall time: 464 ms
[-1 -2  1  2 -1 -2  1  2  3  4 -1  1  2 -1  1  2 -1  1 -1 -2]

※入力と同じサイズの配列を返すのでなく何連続したかわかればいいのなら以下でいける...はず

In [5]:
f = lambda x:(lambda cs,s,e:cs[e]-cs[s]+x[s])(x.cumsum(),*(lambda a:(a[:-1],a[1:]))(np.diff(np.concatenate([[0],x,[0]]))!=0))

%time res = f(wl)
print(res[:20])

print('最大連勝', res.max())
print('最大連敗', abs(res.min()))
Wall time: 307 ms
[-2  2 -2  4 -1  2 -1  2 -1  1 -2  1 -3  1 -1  1 -4  1 -3  3]
最大連勝 22
最大連敗 23

頭の体操も兼ねてと思って手をつけたわけですが

上の数行を考えるのに

恥ずかしくて言えないくらい時間かかった疲れた(´・ω・`)

In [ ]:
 

numba

In [6]:
import numba
@numba.njit#(cache=True)
def f3(x):
    ret = np.zeros_like(x, dtype=x.dtype)
    ret[0] = x[0]
    for i in range(1, len(x)):
        if x[i]==x[i-1]:
            ret[i] = ret[i-1]+x[i]
        else:
            ret[i] = x[i]
    return ret
In [7]:
%time res = f3(wl) # 一回目はコンパイルするのでちょっと遅い
%time res = f3(wl)
print(res[:20])
Wall time: 333 ms
Wall time: 58 ms
[-1 -2  1  2 -1 -2  1  2  3  4 -1  1  2 -1  1  2 -1  1 -1 -2]

↑ こっちのほうがだいぶはやいすね(´・ω・`;)

In [ ]:
 

ついでにどこが遅いのか見てみます

In [8]:
%time a = np.concatenate([[0], wl, [0]])
%time a = np.diff(a)!=0
%time start, end = a[:-1], a[1:]
%time cs = wl.cumsum()
print('↓これがちょっと時間かかる')
%time v1, v2, v3 = cs[end], cs[start], wl[start]

%time count = v1-v2+v3

start[0] = False
result = x.copy()
print('↓これもちょっと時間かかる')
%time result[start] = -(count + np.where(count>0, 1, -1))[:-1]
%time result = result.cumsum()
Wall time: 21 ms
Wall time: 40 ms
Wall time: 0 ns
Wall time: 33 ms
↓これがちょっと時間かかる
Wall time: 178 ms
Wall time: 21 ms
↓これもちょっと時間かかる
Wall time: 103 ms
Wall time: 38 ms

start, endのbool配列で参照してるとこがそこそこ時間かかってるかんじですね

そして今日閃いた  (´・ω・`)!
bool配列でアクセスしてるところを整数のインデックスでアクセスするようにすれば少し速くなるかも

In [9]:
def f4(x):
    start, end = ((lambda sep: (sep[:-1], sep[1:]-1))
                  ((np.diff(np.concatenate([[0], x, [0]]))!=0).nonzero()[0]))
    counts = (lambda cs: cs[end]-cs[start]+x[start])(x.cumsum())
    ret = x.copy()
    ret[start[1:]] = -(counts + np.where(counts>0, 1, -1))[:-1]
    return ret.cumsum()

print('元のほう')
%time res1 = f2(wl)
print('整数のインデックスで取得するようにしたほう')
%time res2 = f4(wl)

print((res1!=res2).sum())
元のほう
Wall time: 466 ms
整数のインデックスで取得するようにしたほう
Wall time: 363 ms
0

少し、速くはなった・・・・・・(´・ω・`)

In [ ]:
 

そういや、以下の計算でだいたい合うか?(´・ω・`)

In [10]:
import math

# なんか本で読んだ方法
def estimate_consecutive_lose(x, p=None):
    av = (x==1).sum() / len(x)
    if p is None: p = 1/len(x)
    return math.log(p) / math.log(1-av)

res = []
for i in range(100):
    x = np.random.randn(10000)
    wl = np.where(x>=0, 1, -1)
    cl = abs(f3(wl).min())
    ecl = estimate_consecutive_lose(wl)
    res.append([cl, ecl, (wl==1).sum()/len(wl)])

print(pd.DataFrame(res, columns=['cl', 'ecl', 'av']).mean())
cl     12.510000
ecl    13.283883
av      0.500146
dtype: float64

たぶん、勝ち負けの結果が独立してるならけっこう近い値になるはず?

In [ ]:
 
In [ ]: