import os
import sys
import time
import datetime
import struct
import calendar
from dateutil.parser import parse
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
通常、ヒストリフォルダは Roaming フォルダの中にあると思いますが、僕は /portable オプションをつけて MT4 を起動しているので、Roaming ではない場所にヒストリフォルダがあり、以下のようなパスになっています。
# Directory that holds the MT4 .hst history files (machine-specific path;
# adjust it to your own installation).
HST_DIR = r'D:\MT4\FXCM MetaTrader 4\history\FXCM-JPYDemo01'
MT4hstReader は、__init__() にヒストリファイルのパスを渡して読み込む、という使い方にしました。
class MT4hstReader(object):
    """Reader for MetaTrader 4 ``.hst`` history files.

    The file is parsed as a 148-byte header followed by fixed 60-byte bar
    records, which are assumed to be stored oldest first.  Bar positions in
    the public API follow the MT4 time-series convention: index 0 is the
    newest bar and larger indexes are older bars.

    arg: hst file path (optional; it can also be supplied via set_path)
    """

    # Explicit little-endian, standard-size layouts.  Native formats such as
    # 'L' are 8 bytes wide on LP64 platforms and would not match the 148/60
    # byte on-disk sizes.
    # Bar record: time, open, high, low, close, volume, then a 4-byte and an
    # 8-byte integer (named 's' and 'r' in the NumPy layout) that are dropped.
    _BAR_FORMAT = '<QddddQiQ'
    _HEADER_FORMATS = ('<L', '<64s', '<12s', '<L', '<L', '<L', '<L', '<13L')
    # Give up a date search after this many seconds (very slow storage or a
    # logic error).
    _SEARCH_LIMIT_SEC = 10

    def __init__(self, path=None):
        self.HEADER_SIZE = 148
        self.BAR_BYTE = 60
        self.headerdict_keys = [
            'var', 'copyright', 'symbol',
            'period', 'digits', 'timesign',
            'last_sync', 'unsused'
        ]
        self.filepath = path
        if self.filepath:
            self._init()

    def set_path(self, path):
        """set file path & read info"""
        self.filepath = path
        self._init()

    def print_fileinfo(self):
        """print header info"""
        def as_utc(timestamp):
            # firstbartime/lastbartime are None when the file holds no bars.
            if timestamp is None:
                return None
            return datetime.datetime.utcfromtimestamp(timestamp)

        print('\n'.join(['{}: {}'.format(k, self.header_dict[k])
                         for k in self.headerdict_keys]))
        print((
            'filesize: {}byte\nbars: {}\n'
            'firstbartime: {} [{}]\nlastbartime: {} [{}]'
        ).format(
            self.filesize, self.bars,
            self.firstbartime, as_utc(self.firstbartime),
            self.lastbartime, as_utc(self.lastbartime)
        ))

    def hst_symbol(self):
        """str hst symbol (NUL padding of the fixed-width field removed)"""
        symbol = self.header_dict['symbol'].split(b'\x00', 1)[0]
        if not isinstance(symbol, str):
            # Python 3: struct yields bytes for 's' fields.
            symbol = symbol.decode('ascii', 'replace')
        return symbol

    def hst_period(self):
        """int hst period"""
        return int(self.header_dict['period'])

    def hst_digits(self):
        """int hst digits"""
        return int(self.header_dict['digits'])

    def get_data(self, n=0, idx=0, mt4index=False, method='py'):
        """get hst data
        arg:
            n: number of bars to read; 0 (or more than available) reads
               every bar from idx back to the oldest one
            idx: MT4 time-series index of the newest bar to read
            mt4index: order of the result; True = MT4 time-series order
                      (newest first), False = file order (oldest first)
            method: 'py': pure python  'np': numpy  'df': pd.DataFrame
                    (any value other than 'py' and 'df' behaves like 'np')
        ret:
            MqlRates[idx+n-1] ~ MqlRates[idx]
            py: [(int, float, float, float, float, int)]
            np: [('t', '<i8'), ('o', '<f8'), ('h', '<f8'),
                 ('l', '<f8'), ('c', '<f8'), ('v', '<i8')]
            df: DataFrame built from the numpy result
            None (after printing a message) when idx is out of range
        """
        if idx > self.bars - 1 or idx < 0:
            print('idx out of range')
            return None
        available = self.bars - idx
        if n <= 0 or n > available:
            n = available
        return self._read_bars(n, idx, mt4index, method)

    def get_data_dt(self, startdate=None, enddate=None, mt4index=False, method='py'):
        """get hst data for a date range
        arg: startdate, enddate: str or datetime.datetime or unixepoch
             (None means the first / last bar of the file)
        ret: same shapes as get_data(); an empty result when no bar falls
             inside the range; None when the range could not be resolved
        """
        idx_pair = self._get_idx_tuple(startdate, enddate)
        if idx_pair is None or None in idx_pair:
            return None
        st, ed = idx_pair
        # st is the oldest bar (largest index), ed the newest.  When the
        # whole range sits inside a gap between two bars, st < ed and the
        # count is <= 0; that must yield nothing, not "all bars" as n == 0
        # means in get_data().
        count = max(0, st - ed + 1)
        return self._read_bars(count, ed, mt4index, method)

    def _read_bars(self, n, idx, mt4index, method):
        """Read exactly n bars (n may be 0) whose newest bar is index idx."""
        start = self.HEADER_SIZE + (self.bars - n - idx) * self.BAR_BYTE
        with open(self.filepath, 'rb') as f:
            f.seek(start, os.SEEK_SET)
            data = f.read(self.BAR_BYTE * n)

        if method == 'py':
            unpack = self._unpack_bar_data
            rates = [unpack(data[pos:pos + self.BAR_BYTE])
                     for pos in range(0, len(data), self.BAR_BYTE)]
            if mt4index:
                rates.reverse()
            return rates

        # 'np' or 'df'
        dtype_tohlcv = np.dtype([('t', 'i8'), ('o', 'f8'), ('h', 'f8'),
                                 ('l', 'f8'), ('c', 'f8'), ('v', 'i8')])
        rates = np.empty(n, dtype=dtype_tohlcv)
        if n:
            record_dtype = np.dtype([('t', '<i8'), ('o', '<f8'), ('h', '<f8'),
                                     ('l', '<f8'), ('c', '<f8'), ('v', '<i8'),
                                     ('s', '<i4'), ('r', '<i8')])
            raw = np.frombuffer(data, dtype=record_dtype, count=n)
            # Copy field by field instead of a structured astype(): casting
            # between structured dtypes with a different number of fields
            # is not portable across NumPy versions.
            for name in dtype_tohlcv.names:
                rates[name] = raw[name]
        if mt4index:
            rates = rates[::-1]
        if method == 'df':
            rates = DataFrame(rates)
        return rates

    def _get_idx_tuple(self, startdate=None, enddate=None):
        """Return the MT4 time-series indexes that bound a date range.

        The search first jumps to an interpolated position and then steps
        bar by bar from there.
        argtypes:
            str or datetime.datetime or integer unixepoch (None = file edge)
        ret: (startdate_idx, enddate_idx), or None when an argument has an
             unsupported type or the file holds no bars; an element is None
             if the search timed out
        example:
            call: ('2015-01-01', '2015-12-31')
            ret: (350, 100)
        memo:
            Indexes are computed as MT4 time-series indexes (0 = newest).
            startdate resolves to the first bar at or after it, enddate to
            the last bar at or before it.
        """
        import numbers  # local import: not part of the file-level imports

        if self.bars < 1:
            return None

        def to_epoch(date):
            if isinstance(date, str):
                return calendar.timegm(parse(date).timetuple())
            if isinstance(date, datetime.datetime):
                return calendar.timegm(date.timetuple())
            if isinstance(date, numbers.Integral):
                return int(date)
            return None

        st_epoch = self.firstbartime if startdate is None else to_epoch(startdate)
        ed_epoch = self.lastbartime if enddate is None else to_epoch(enddate)
        if st_epoch is None or ed_epoch is None:
            if st_epoch is None:
                print('_get_idx_tuple() arg err!!! check startdate')
            if ed_epoch is None:
                print('_get_idx_tuple() arg err!!! check enddate')
            return None
        if st_epoch > ed_epoch:
            st_epoch, ed_epoch = ed_epoch, st_epoch
        st_epoch = max(self.firstbartime, min(self.lastbartime, st_epoch))
        ed_epoch = max(self.firstbartime, min(self.lastbartime, ed_epoch))

        min_idx = 0
        max_idx = self.bars - 1
        timerange = float(self.lastbartime - self.firstbartime)
        if timerange <= 0:
            # Single bar (or no usable time span): nothing to interpolate.
            return max_idx, min_idx
        st_about_idx = int((self.lastbartime - st_epoch) / timerange * max_idx)
        ed_about_idx = int((self.lastbartime - ed_epoch) / timerange * max_idx)

        def bar_time(f, i):
            # Open time of the bar at MT4 time-series index i.  Addressed
            # from the header, like get_data(), so trailing partial bytes
            # cannot shift the records.
            f.seek(self.HEADER_SIZE + (max_idx - i) * self.BAR_BYTE, os.SEEK_SET)
            return struct.unpack('<Q', f.read(8))[0]

        def search(f, idx, search_t, method):
            # method picks the bar to return when no bar opens exactly at
            # search_t: 'l' = the older (left) neighbour, 'r' = the newer.
            idx = max(min_idx, min(max_idx, idx))
            t = bar_time(f, idx)
            deadline = time.time() + self._SEARCH_LIMIT_SEC
            while True:
                if t == search_t:
                    return idx
                # A larger index is an older bar.
                nxt = idx + (1 if t > search_t else -1)
                if nxt < min_idx:
                    return min_idx
                if nxt > max_idx:
                    return max_idx
                pre_t = t
                idx = nxt
                t = bar_time(f, idx)
                if min(t, pre_t) < search_t < max(t, pre_t):
                    if t > pre_t:
                        # Stepped towards newer bars: idx+1 is the older one.
                        return idx + 1 if method == 'l' else idx
                    # Stepped towards older bars: idx-1 is the newer one.
                    return idx if method == 'l' else idx - 1
                if time.time() > deadline:
                    print('time over!!! search() bug???')
                    return None

        with open(self.filepath, 'rb') as f:
            st_idx = search(f, st_about_idx, st_epoch, 'r')
            ed_idx = search(f, ed_about_idx, ed_epoch, 'l')
        return st_idx, ed_idx

    def _unpack_bar_data(self, data):
        """ret: tuple(datetime, o, h, l, c, v)"""
        return struct.unpack(self._BAR_FORMAT, data)[:-2]

    def _unpack_header_data(self, data):
        """Return the header fields as a dict keyed by headerdict_keys.

        Only the first unpacked value of each field is kept, so the last
        entry holds just the first of its 13 integers.
        """
        dic = {}
        read = 0
        for key, fmt in zip(self.headerdict_keys, self._HEADER_FORMATS):
            size = struct.calcsize(fmt)
            dic[key] = struct.unpack(fmt, data[read:read + size])[0]
            read += size
        return dic

    def _init(self):
        """read hstfile info
        set: filesize, header, firstbartime, lastbartime, bars
        Prints a message and sets nothing when the file does not exist.
        """
        if not os.path.exists(self.filepath):
            print('file not found')
            return None
        self.filesize = os.path.getsize(self.filepath)
        # Floor division keeps the bar count an int on Python 2 and 3.
        self.bars = max(0, (self.filesize - self.HEADER_SIZE) // self.BAR_BYTE)
        with open(self.filepath, 'rb') as f:
            self.header_dict = self._unpack_header_data(f.read(self.HEADER_SIZE))
            if self.bars < 1:
                # Header only: there is no bar to take the times from.
                self.firstbartime = None
                self.lastbartime = None
                return None
            self.firstbartime = self._unpack_bar_data(f.read(self.BAR_BYTE))[0]
            f.seek(self.HEADER_SIZE + (self.bars - 1) * self.BAR_BYTE, os.SEEK_SET)
            self.lastbartime = self._unpack_bar_data(f.read(self.BAR_BYTE))[0]
# Open the hourly USDJPY history file.
filename = os.path.join(HST_DIR, 'USDJPY60.hst')
hstreader = MT4hstReader(filename)

# Take a look at the hst header information.
hstreader.print_fileinfo()

# Read with the default method.
type_pylist = hstreader.get_data(n=10)
print(type(type_pylist))
for i in type_pylist:
    print(i)

# Read with method='np'; this seems to be faster when there are many bars.
type_np = hstreader.get_data(n=10, method='np')
print(type(type_np))
for i in type_np:
    print(i)

# Read with method='df'.
type_df = hstreader.get_data(n=10, method='df')
type_df
文字列を渡せば、日付文字列としてパースして処理します。
dateutil.parser でパースできる文字列なら OK のはずです。
datetime.datetime オブジェクトを渡しても構いません。
整数が渡された場合は、MT4 の datetime(UNIX エポック秒)と同様に扱います。
# Read using a date string.
for i in hstreader.get_data_dt(startdate='2016-01-01')[:5]:
    print(i)
print('-' * 60)

# Read using a datetime.datetime.
for i in hstreader.get_data_dt(datetime.datetime(2016, 1, 1))[:5]:
    print(i)
print('-' * 60)

# Read using epoch seconds.
t = calendar.timegm(datetime.datetime(2016, 1, 1).timetuple())
print('t={}'.format(t))
for i in hstreader.get_data_dt(t)[:5]:
    print(i)
# Jan 1 is requested, but there is no data for that day, so the first row
# should be from Jan 2 or thereabouts.
MT4hstDF は、__init__() にヒストリフォルダのパスを渡して使うようにしました。
データの読み込み自体は、上で定義した MT4hstReader を使います。
class MT4hstDF(object):
    """mt4 hst to pd.DataFrame

    Loads one ``<symbol><timeframe>.hst`` file per symbol from a history
    directory through MT4hstReader and joins them into a single frame with
    (symbol, column) MultiIndex columns.
    """

    def __init__(self, hstdirpath=None):
        self.hstdirpath = hstdirpath
        self.hstreader = MT4hstReader()

    def set_hstdir(self, path):
        """set hst dir path"""
        self.hstdirpath = path

    def get_df(self, symbollist, tf, offsethour=0,
               h1_to_d1=False, summertime=False, n=0, idx=0,
               startdate=None, enddate=None, collist=None):
        """
        get multicolumns dataframe
        args:
            symbollist: list of symbol names
            tf: int timeframe in minutes (used in the file name)
            offsethour: int, hours added to the datetime index
            h1_to_d1: bool, with tf == 1440 build the daily bars from the
                      H1 file instead of reading the D1 file
            summertime: bool, use a one hour smaller shift inside the
                        summer-time window when building daily bars
            n: int, number of bars (used when no date is given)
            idx: int, MT4 time-series index of the newest bar to read
            startdate, enddate: datetime or str or int_unixepoch
            collist: columns to keep, default ['o', 'h', 'l', 'c', 'v']
        ret:
            pd.DataFrame
                index: pandas.tseries.index.DatetimeIndex
                collist: pandas.core.index.MultiIndex
        raises:
            ValueError when no data could be loaded for any symbol
        ret example:
            -----------------------------------------...
            |           |EURUSD             |GBPUSD ...
            |----------------------------------------...
            |           |o  |h  |l  |c  |v  |o  |h  |...
            |----------------------------------------...
            |t          |   |   |   |   |   |   |   |...
            |----------------------------------------...
            |2016-02-25 |val|val|val|val|val|val|val|....
            |----------------------------------------...
            |2016-02-26 |val|val|val|val|val|val|val|...
            |----------------------------------------...
            |2016-02-27 |val|val|val|val|val|val|val|...
            |----------------------------------------...
        example:
            MT4hstDF_object.get_df(
                ['EURUSD', 'GBPUSD', 'USDJPY'], 60,
                startdate='2015-01-01', enddate='2015-12-31')
        """
        if collist is None:
            collist = 'o h l c v'.split()
        h2d = bool(tf == 1440 and h1_to_d1)
        file_tf = 60 if h2d else tf
        datalist = []
        for symbol in symbollist:
            filepath = os.path.join(self.hstdirpath,
                                    symbol + str(file_tf) + '.hst')
            if not os.path.exists(filepath):
                # Symbols without a history file are skipped.
                continue
            self.hstreader.set_path(filepath)
            if startdate is not None or enddate is not None:
                df = self.hstreader.get_data_dt(method='df',
                                                startdate=startdate,
                                                enddate=enddate)
            else:
                df = self.hstreader.get_data(method='df', n=n, idx=idx)
            if df is None:
                # The reader reports failures by returning None.
                continue
            df = df.set_index(pd.to_datetime(df['t'], unit='s')
                              ).drop('t', axis=1)[collist]
            if offsethour:
                df.index += pd.tseries.offsets.Hour(offsethour)  # adj index
            if h2d:
                df = self.__h1df_to_d1(df, summertime, collist)  # h1tod1
            # The symbol must be wrapped in a list: a bare string is not a
            # valid level for from_product().
            df.columns = pd.MultiIndex.from_product([[symbol], df.columns])
            datalist.append(df)
        if not datalist:
            raise ValueError('no hst data found for the requested symbols')
        return pd.concat(datalist, axis=1)

    def __h1df_to_d1(self, df, summertime, collist):
        """get_df sub routine: aggregate hourly bars into daily bars.

        The index is shifted back by 7 hours (6 inside the summer-time
        window when summertime is True), resampled per calendar day and
        shifted forward again.
        """
        w, s = 7, 7
        if summertime:
            s -= 1
        df = df.copy()  # do not touch the caller's index
        df.index = self.__hour_shift(df.index, -w, -s)
        ohlc_dict = {'o': 'first', 'h': 'max', 'l': 'min', 'c': 'last', 'v': 'sum'}
        ohlc_dict = {k: v for k, v in ohlc_dict.items() if k in collist}
        # resample(..., how=...) no longer exists in pandas; use .agg().
        df = df.resample('D').agg(ohlc_dict).dropna()[collist]
        df.index = self.__hour_shift(df.index, w, s)
        return df

    def __hour_shift(self, index, w, s):
        """get_df sub routine: shift each timestamp by s hours inside the
        summer-time window of its year and by w hours outside of it.
        Returns a list of shifted timestamps.
        """
        if len(index) == 0:
            return list(index)
        st_date_dic = {y: self.__get_summertime_tuple(y, index.tz)
                       for y in range(index[0].year, index[-1].year + 1)}

        def shift(date):
            st, ed = st_date_dic[date.year]
            hours = s if ed >= date >= st else w
            return date + datetime.timedelta(hours=hours)

        return [shift(date) for date in index]

    def __get_summertime_tuple(self, y, tz):
        """get_df sub routine: return (start, end) of the summer-time window
        of year y: the second Sunday of March and the day before the first
        Sunday of November.
        """
        def nth_sunday(days, nth):
            seen = 0
            for day in days:
                if day.weekday() == 6:
                    seen += 1
                    if seen == nth:
                        return day
            return None

        mn3 = pd.date_range('{}-03-01'.format(y),
                            '{}-04-01'.format(y), tz=tz)[:-1]
        mn11 = pd.date_range('{}-11-01'.format(y),
                             '{}-12-01'.format(y), tz=tz)[:-1]
        start = nth_sunday(mn3, 2)
        end = nth_sunday(mn11, 1) - datetime.timedelta(days=1)
        return start, end
# DataFrame loader that reads the .hst files found in HST_DIR.
dfreader = MT4hstDF(HST_DIR)
"""
def get_df(self, symbollist, tf, offsethour=0,
h1_to_d1=False, summertime=False, n=0, idx=0,
startdate=None, enddate=None, collist=None)
symbollistは通貨ペア名のリスト
tfはタイムフレーム。1,5,15,30,60,240,1440 のような整数を渡します
offsethourはDatetimeIndexの調整。何時間ずらすかを入れる
h1_to_d1はTrueでH1のデータから日足データを作る
日足が週6本になるサーバーなどでは微妙なので、H1から日足を作るときに使う
summertimeは夏時間の部分をずらすときのために作ったような気がするがよく覚えてない(´・ω・`)
n=0, idx=0, startdate=None, enddate=None
は本数で読み込むか、日付文字列やdatetime.datetimeで読み込むかの引数で、好きなほうを使う
collistは読み込む列。デフォルトは o h l c v を読み込む(t はインデックスになる)
"""
# Currency pairs to load (one H1 history file each).
symbols = ('EURUSD GBPUSD USDJPY '
           'EURJPY GBPJPY EURGBP '
           'AUDUSD USDCHF USDCAD').split()
df = dfreader.get_df(symbols, 60, startdate='2010-01-01')
print(df.columns)
print(df.shape)
df.tail()
# Plot the bar range per weekday and per hour.
# The range is (high - low) as a percentage of the open.
hl = DataFrame()
for symbol in df.columns.levels[0]:
    hl = pd.concat(
        [hl,
         DataFrame((df[symbol]['h'] - df[symbol]['l']) / df[symbol]['o'] * 100,
                   columns=[symbol])],
        axis=1
    )

wdays = np.array(['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'])
hl['hour'] = hl.index.hour
hl['weekday'] = wdays[hl.index.weekday]

gbw = hl.groupby('weekday')
gbh = hl.groupby('hour')
# Keep Monday..Friday only, in calendar order.
gbwm = gbw.mean().loc[wdays[:5]]
gbhm = gbh.mean()

# The averaged 'hour' column is meaningless in the weekday plot.
gbwm[[i for i in gbwm.columns if i != 'hour']].plot(figsize=(15, 4), style='-o')
gbhm.plot(figsize=(15, 4), style='-o')
# Daily closes since 2013 for every symbol.
d1 = dfreader.get_df(symbols, 1440, startdate='2013-01-01', collist=['c'])
d1.tail()
# Cumulative growth of each close, starting from 1.
(1+d1.pct_change()).fillna(1).cumprod().plot(figsize=(15,5))
# Historical volatility: 20-bar standard deviation of the log returns,
# scaled by sqrt(250).
# Detect the rolling() API itself instead of comparing pd.__version__ as a
# string, which orders versions lexicographically ('0.9' > '0.18').
if hasattr(d1, 'rolling'):
    (np.log1p((d1.pct_change()).fillna(0)).rolling(20).std()*np.sqrt(250)).plot(figsize=(15,5))
else:
    (pd.rolling_std(np.log1p((d1.pct_change()).fillna(0)), 20)*np.sqrt(250)).plot(figsize=(15,5))
def get_straight(symbol, df):
    """Return the close series of a USD pair.

    The 'c' column of symbol is returned when df has that symbol.
    Otherwise the pair is derived from the matching JPY cross and USDJPY.
    Prints 'err' and returns None when neither is possible.
    """
    available = df.columns.levels[0]
    if symbol in available:
        return df[symbol]['c']

    jpy_pair = symbol.replace('USD', '') + 'JPY'
    if jpy_pair in available:
        usd_pos = symbol.index('USD')
        jpy_close = df[jpy_pair]['c']
        if usd_pos == 3:
            # XXXUSD = XXXJPY / USDJPY
            return jpy_close / df['USDJPY']['c']
        if usd_pos == 0:
            # USDXXX = USDJPY / XXXJPY
            return df['USDJPY']['c'] / jpy_close
    print('err')
def get_cross(symbol, straight_dic):
    """Derive a cross rate from two USD pairs.

    args:
        symbol: six-letter cross such as 'EURJPY' (base + quote currency)
        straight_dic: {usd_pair_name: price}, e.g. {'EURUSD': ..., 'USDJPY': ...}
    ret:
        the price of symbol, or None (after printing 'err') when a leg is
        missing from straight_dic or 'USD' sits at an unexpected position
    """
    def find_leg(currency):
        # First USD pair that contains the currency: (position of 'USD', price).
        for pair, price in straight_dic.items():
            if currency in pair:
                return pair.index('USD'), price
        return None, None

    b_usd_idx, base = find_leg(symbol[:3])
    q_usd_idx, quote = find_leg(symbol[3:])

    if b_usd_idx == 3 and q_usd_idx == 3:
        # XXXUSD / YYYUSD
        return base / quote
    if b_usd_idx == 3 and q_usd_idx == 0:
        # XXXUSD * USDYYY
        return base * quote
    if b_usd_idx == 0 and q_usd_idx == 0:
        # USDYYY / USDXXX
        return quote / base
    if b_usd_idx == 0 and q_usd_idx == 3:
        # 1 / (USDXXX * YYYUSD)
        return 1.0 / (base * quote)
    print('err')
    return None
def get_KuChart(df, currencys=None, multicols=False):
    """Build a per-currency strength frame from close prices.

    For every pair among the selected currencies the log return of the close
    is computed. A currency's value is the mean over all pairs containing
    it, counted positive where it is the base and negative where it is the
    quote.

    args:
        df: frame with (symbol, 'c') columns as produced by MT4hstDF.get_df
        currencys: currencies to include (must contain 'USD'); default is
                   EUR GBP AUD NZD USD CAD CHF JPY
        multicols: True to put the result under a top-level 'ku' column
    ret:
        DataFrame with one column per currency
    """
    default = 'EUR GBP AUD NZD USD CAD CHF JPY'.split()
    currencys = [c for c in default if c in currencys] if currencys else default
    usd_idx = currencys.index('USD')

    # Currencies listed before USD are quoted XXXUSD, the others USDXXX.
    straight_symbols = [c + 'USD' if pos < usd_idx else 'USD' + c
                        for pos, c in enumerate(currencys) if c != 'USD']
    cross_symbols = [base + quote
                     for i, base in enumerate(currencys)
                     for quote in currencys[i + 1:]
                     if 'USD' not in base + quote]

    straight_close_dic = {s: get_straight(s, df) for s in straight_symbols}
    cross_close_dic = {s: get_cross(s, straight_close_dic) for s in cross_symbols}
    price_dic = dict(straight_close_dic, **cross_close_dic)
    log_returns = {k: np.log1p(v.pct_change().fillna(0))
                   for k, v in price_dic.items()}

    def strength(currency):
        pairs = [k for k in log_returns if currency in k]
        signed = (log_returns[k] if k.index(currency) == 0 else -log_returns[k]
                  for k in pairs)
        return sum(signed) / len(pairs)

    ku = DataFrame({c: strength(c) for c in currencys})
    if multicols:
        # 'ku' must be wrapped in a list: a bare string is not a valid level.
        ku.columns = pd.MultiIndex.from_product([['ku'], ku.columns])
    return ku
# Currency strength for seven currencies (NZD left out).
ku = get_KuChart(d1, 'EUR GBP AUD USD CAD CHF JPY'.split())
ku.tail()
ku_clr_dic = {'EUR':'Red', 'GBP':'Lime', 'AUD':'RoyalBlue', 'NZD':'Violet',
              'USD':'Orange', 'CAD':'BlueViolet', 'CHF':'Gray', 'JPY':'Turquoise'}
params = {'figsize':(15,5), 'color':[ku_clr_dic[c] for c in ku.columns]}
ku.plot(**params)
ku.cumsum().plot(**params)
# Cumulative sum restarted every calendar year.  Rows are selected with
# .loc: plain ku['2016'] row selection by partial date string is not
# supported by current pandas.
ku_y = pd.concat([ku.loc[str(year)].cumsum() for year in sorted(set(ku.index.year))])
ku_y.plot(lw=1, **params)
# NOTE(review): 0.995 on the upper side but 0.05 on the lower side looks
# like a typo for 0.005 -- confirm before changing the plot limits.
p = max(ku.quantile(0.995).max(), -ku.quantile(0.05).min())
xlim = (-p, p)
ku.plot(kind='kde', lw=1, xlim=xlim, **params)
p = ku.abs().quantile(0.99).max()
ku.abs().plot(kind='kde', lw=1, xlim=(0,p), **params)
sns.pairplot(ku)
# Synthetic random-walk series sampled every second, ending today at 00:00.
n = 30000
data = DataFrame(
    (np.random.randn(n)/1000).cumsum()+100,
    index=pd.date_range(end=datetime.datetime.now().date(),periods=n, freq='s')
)
data.plot(figsize=(15,5))
def candle(df, size=(15,5)):
    """Draw the o/h/l/c columns of df as a candlestick chart.

    Slow for a large number of bars.  The x axis uses the bar position and
    is labelled with the first 10 characters of the index values.
    """
    from matplotlib.finance import candlestick_ohlc

    frame = df.copy()
    plt.figure(figsize=size)
    frame['idx'] = range(frame.index.size)
    ax = plt.subplot()

    quotes = frame[['idx', 'o', 'h', 'l', 'c']].values
    candlestick_ohlc(ax, quotes, width=0.8, colorup='forestgreen', colordown='brown')

    # Roughly 15 tick labels regardless of the number of bars.
    labelstep = max(1, int(frame.index.size/15))
    tick_labels = [str(stamp)[:10] for stamp in frame.index[::labelstep]]
    ax.set_xticks(frame['idx'][::labelstep])
    ax.set_xticklabels(tick_labels, rotation=45, fontsize=8)
    ax.set_xlim(left=0)
    frame.drop('idx', axis=1, inplace=True)
# Aggregate the per-second series into 5-minute OHLC bars.
ohlc_data = data.resample('5min').agg('ohlc')
# Rename the columns to the short names that candle() selects.
ohlc_data.columns = 'o h l c'.split()
candle(ohlc_data)