Code Review #1 - Build Stock Dataset#
이제 코드로 논문을 구현해보도록 하겠습니다. 코드는 [Jansen, 2020]에서 많이 참고하였습니다. 원본 코드를 보고 싶다면 해당 Github Repo를 참고하세요.
1. Import Libraries#
import warnings
warnings.filterwarnings('ignore')
from time import time
from tqdm import tqdm
from pathlib import Path
import pandas as pd
import FinanceDataReader as fdr
import yfinance as yf
2. Define Functions#
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
def format_time(t):
"""Return a formatted time string 'HH:MM:SS
based on a numeric time() value"""
m, s = divmod(t, 60)
h, m = divmod(m, 60)
return f'{h:0>2.0f}:{m:0>2.0f}:{s:0>2.0f}'
results_path = Path('KR2_results', 'asset_pricing') # 경로 설정
if not results_path.exists():
results_path.mkdir(parents=True)
chunks
함수는 리스트를n
개씩 나누어주는 함수입니다.format_time
함수는 시간을 시, 분, 초로 나누어주는 함수입니다.데이터를 다운로드 받을 경로를 설정해줍니다. 이 경로는 나중에도 계속 사용되므로 변경되지 않습니다.
3. KRX 종목코드 가져오기#
krx = fdr.StockListing('KRX')
krx = krx[krx["Market"] != "KONEX"]
krx = krx[krx["Market"] != "KOSDAQ"]
krx['Code'] = krx['Code'] + '.' + krx['Market'].apply(lambda x: 'KS')
krx = krx["Code"].to_list()
n = len(krx)
yf_codes = yf.Tickers(krx)
# 출력 예시
print(list(yf_codes.tickers)[:5])
print("KRX 주식 종목 수 :", len(list(yf_codes.tickers)))
['005930.KS', '373220.KS', '000660.KS', '207940.KS', '005490.KS']
KRX 주식 종목 수 : 1000
yfinance 라이브러리에서 KRX Ticker를 추출하는 과정보다 FinanceDataReader 라이브러리에서 KRX Ticker를 추출하고 yfinance로 옮기는 과정이 더 쉬웠습니다. 따라서 FinanceDataReader를 사용해 KRX Ticker를 추출하고 yfinance로 옮기는 과정을 진행하였습니다.
KRX 데이터 중에서는 코스닥과 코넥스도 포함되어 있으므로, 간단한 전처리 절차를 통해 해당 데이터를 거르고 코스피 데이터만 추출하였습니다. 초기에는 코스닥, 코넥스도 같이 포함하였으나 오히려 모델 성능이 하락하는 경향을 보이는 것을 확인하여 제외하였습니다. 이는 한국 시장의 경우 마켓에 대해 더욱 세분화된 데이터를 사용하는 것이 더 좋은 성능을 보인다고 말할 수 있습니다.
원본 논문의 경우 CRSP 데이터베이스를 통해 NYSE, AMEX, NASDAQ의 모든 종목에 대한 1957년 3월부터 2016년 12월까지, 즉 60년 동안의 데이터를 활용했습니다. 역사가 길고 데이터의 양이 많은 미국 시장과 달리 한국 시장은 1990년대 중반부터 데이터가 축적되기 시작했습니다. 만약 코스닥 및 코넥스의 역사가 미국처럼 길었다면, 모델 성능 향상에 기여했을 수도 있을 것입니다.
4. 주식 메타 데이터 수집#
일반적인 방법#
meta_data = []
start = time()
for code in tqdm(krx):
try:
yf_object = yf.Ticker(code)
s = pd.Series(yf_object.get_info())
meta_data.append(s.to_frame(code))
except Exception as e:
print(code, e)
df = pd.concat(meta_data, axis=1).dropna(how='all').T
df = df.apply(pd.to_numeric, errors='ignore')
# 다운로드 받을 주식 데이터의 메타 정보 저장
df.to_hdf(results_path / 'data.h5', 'stocks/info')
# 다운로드 받은 주식 데이터의 메타 정보 불러오기
print("다운로드한 메타데이터의 수 : ", len(df))
df.head(5)
100%|██████████| 1000/1000 [09:24<00:00, 1.77it/s]
병렬로 다운로드 해보기#
from concurrent.futures import ThreadPoolExecutor
def fetch_data(code):
try:
yf_object = yf.Ticker(code)
s = pd.Series(yf_object.get_info())
return s.to_frame(code)
except Exception as e:
print(code, e)
return None
meta_data = []
start = time()
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(tqdm(executor.map(fetch_data, krx), total=len(krx)))
# filter out None results
meta_data = [res for res in results if res is not None]
df = pd.concat(meta_data, axis=1).dropna(how='all').T
df = df.apply(pd.to_numeric, errors='ignore')
# Save the stock data
df.to_hdf(results_path / 'data.h5', 'stocks/info')
print("병렬로 다운로드한 메타데이터의 수 : ", len(df))
병렬로 다운로드한 메타데이터의 수 : 959
5. 주식 OHLCV 데이터 수집#
prices_adj = []
start = time()
for i, chunk in enumerate(chunks(krx, 100), 1):
prices_adj.append(yf.download(chunk, period='max', auto_adjust=True).stack(-1))
per_ticker = (time()-start) / (i * 100)
to_do = n - (i * 100)
to_go = to_do * per_ticker
print(f'Success: {len(prices_adj):5,.0f}/{i:5,.0f} | To go: {format_time(to_go)} ({to_do:5,.0f})')
prices_adj = (pd.concat(prices_adj)
.dropna(how='all', axis=1)
.rename(columns=str.lower)
.swaplevel())
prices_adj.index.names = ['ticker', 'date']
[*********************100%***********************] 100 of 100 completed
6 Failed downloads:
['091990.KS', '066970.KS', '035900.KS', '263750.KS', '022100.KS', '247540.KS']: Exception('%ticker%: No timezone found, symbol may be delisted')
Success: 1/ 1 | To go: 00:00:33 ( 900)
[*********************100%***********************] 100 of 100 completed
14 Failed downloads:
['137400.KS', '145020.KS', '039030.KS', '293490.KS', '214450.KS', '036930.KS', '278280.KS', '237690.KS', '196170.KS', '214150.KS', '035760.KS', '067310.KS', '058470.KS', '240810.KS']: Exception('%ticker%: No timezone found, symbol may be delisted')
Success: 2/ 2 | To go: 00:00:28 ( 800)
[*********************100%***********************] 100 of 100 completed
15 Failed downloads:
['141080.KS', '166090.KS', '056190.KS', '195940.KS', '086450.KS', '098460.KS', '348210.KS', '272290.KS', '036830.KS', '003380.KS', '213420.KS', '074600.KS', '084370.KS', '215200.KS', '046890.KS']: Exception('%ticker%: No timezone found, symbol may be delisted')
Success: 3/ 3 | To go: 00:00:24 ( 700)
[*********************100%***********************] 100 of 100 completed
9 Failed downloads:
['084850.KS', '131290.KS', '215000.KS', '243070.KS', '319660.KS', '095610.KS', '091700.KS', '183300.KS', '230360.KS']: Exception('%ticker%: No timezone found, symbol may be delisted')
Success: 4/ 4 | To go: 00:00:21 ( 600)
[*********************100%***********************] 100 of 100 completed
2 Failed downloads:
['267980.KS', '060250.KS']: Exception('%ticker%: No timezone found, symbol may be delisted')
Success: 5/ 5 | To go: 00:00:18 ( 500)
[*********************100%***********************] 100 of 100 completed
Success: 6/ 6 | To go: 00:00:14 ( 400)
[*********************100%***********************] 100 of 100 completed
Success: 7/ 7 | To go: 00:00:11 ( 300)
[*********************100%***********************] 100 of 100 completed
Success: 8/ 8 | To go: 00:00:07 ( 200)
[*********************100%***********************] 100 of 100 completed
Success: 9/ 9 | To go: 00:00:04 ( 100)
[*********************100%***********************] 100 of 100 completed
Success: 10/ 10 | To go: 00:00:00 ( 0)
각 시세 청크에 대해 주식 데이터는 야후 파이낸스의 yf.download
메서드를 사용하여 다운로드됩니다. period='max'
인수는 사용 가능한 최대 기록 데이터를 다운로드하는 것을 나타냅니다. auto_adjust=True
인수는 조정된 가격(분할, 배당금 등에 따라 조정된 가격)을 가져오는 데 사용됩니다. 그런 다음 결과 데이터프레임이 마지막 레벨에 “스택”됩니다(즉, 열이 행으로 변환됨).
모든 데이터가 다운로드되어 prices_adj
목록에 저장되면 다음 작업이 수행됩니다:
가격 조정` 목록의 모든 데이터 청크가 단일 데이터 프레임으로 연결됩니다.
모든
NaN
값을 가진 열(ticker)이 제거됩니다.열 이름은 소문자로 변환됩니다.
다중 인덱스의 레벨이 바뀝니다.
인덱스 이름은
ticker
및date
로 설정됩니다.
결과 prices_adj
데이터 프레임은 주식 시세를 첫 번째 인덱스 수준으로, 날짜를 두 번째 인덱스 수준으로, 다양한 주식 속성(예: ‘시가’, ‘종가’, ‘고가’, ‘저가’ 등)을 나타내는 열을 모두 소문자로 갖게 됩니다.
6. 이상치 제거#
# 이상치 제거
df = prices_adj.close.unstack('ticker')
pmax = df.pct_change().max()
pmin = df.pct_change().min()
to_drop = pmax[pmax > 1].index.union(pmin[pmin<-1].index)
print("Outliers :", len(to_drop))
prices_adj = prices_adj.drop(to_drop, level='ticker')
print("Final using Stock Data :", len(prices_adj.index.unique('ticker')))
# 최종 데이터셋 저장
idx = pd.IndexSlice
prices_adj.sort_index().loc[idx[:, '2000': '2023'], :].to_hdf(results_path / 'data.h5',
'stocks/prices/adjusted')
Outliers : 155
Final using Stock Data : 799
prices_adj
데이터프레임에서 close
열만 선택하고, 이를 ticker
를 기준으로 unstack
하여 df
에 저장합니다. 결과적으로 df
는 날짜별로 각 티커의 종가를 열로 가지게 됩니다.
pct_change()
함수로 df
의 행간의 백분율 변화를 계산합니다. 각 티커의 최대 백분율 상승률(pmax
)과 최대 백분율 하락률(pmin
)을 계산합니다. 1보다 크거나 -1보다 작은 티커들을 이상치로 판단하여 제거할 목록(to_drop
)에 추가하고 이상치로 판단된 티커의 수를 출력한 후, to_drop
목록에 있는 티커들을 prices_adj
데이터프레임에서 제거합니다.
최종적으로 사용되는 티커의 수를 출력하고, prices_adj
데이터프레임을 인덱스를 기준으로 정렬, 2000년부터 2023년까지의 데이터만 선택하여 HDF5 형식으로 저장합니다.
HDFSHierarchical Data Format 형식이 궁금하다면 Pandas의 공식 문서를 참고하세요.