diff --git a/DataLoader.py b/DataLoader.py index 9d9052b..8491358 100644 --- a/DataLoader.py +++ b/DataLoader.py @@ -17,7 +17,7 @@ # symbols = all_active_listings['symbol'].unique() #for testing -symbols = ['IBM', 'MSFT', 'FB', 'AAPL', 'QQQ', 'AAP', 'ATHM', 'VIPS'] +symbols = ['IBM', 'MSFT', 'FB'] #returns a generator, so the calls don't happen until 'write_alpha_results' is called stock_data = au.get_alpha_stock_data( @@ -28,17 +28,18 @@ max_threads = 7 ) -sma_data = au.get_alpha_technical_data( - function = 'SMA', - symbols = symbols, +technical_data = au.get_alpha_technical_data( + functions = [ + 'SMA', 'EMA', 'MACD', 'STOCH', 'RSI', 'BBANDS' + ], + symbols = symbols, + api_key = api_key, interval = 'daily', time_period = 60, series_type = 'close', - api_key = api_key, max_threads = 7 ) - au.write_alpha_results( results = stock_data, symbols = symbols, @@ -46,8 +47,7 @@ ) au.write_alpha_results( - results = sma_data, + results = technical_data, symbols = symbols, dest_path = "technical_data/" ) - diff --git a/alpha_utils.py b/alpha_utils.py index 0341db4..db8c2a4 100644 --- a/alpha_utils.py +++ b/alpha_utils.py @@ -68,15 +68,33 @@ def get_alpha_listings( df = alpha_csv_to_dataframe(response) return df -def alpha_csv_to_dataframe(response): - decoded_content = response.content.decode('utf-8') - cr = csv.reader(decoded_content.splitlines(), delimiter=',') - df = pd.DataFrame(cr) - header, df = df.iloc[0], df[1:] - df.columns = header - df.reset_index(drop = True, inplace=True) - df.columns.name = None - return df +def alpha_csv_to_dataframe(responses): + output = [] + if isinstance(responses, list) != True: + responses = [responses] + for response in responses: + decoded_content = response.content.decode('utf-8') + cr = csv.reader(decoded_content.splitlines(), delimiter=',') + df = pd.DataFrame(cr) + header, df = df.iloc[0], df[1:] + df.columns = header + df.reset_index(drop = True, inplace=True) + df.columns.name = None + output.append(df) + return output + +def request_alpha_data(urls) -> requests.Response: + session = requests.Session() + retry = Retry( + connect = CONNECT_RETRIES, + backoff_factor = BACKOFF_FACTOR + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount('https://', adapter) + if isinstance(urls, list): + return [session.get(url) for url in urls] + else: + return session.get(urls) ############################################################################################### @@ -85,17 +103,18 @@ def get_alpha_stock_data( base_url: str = ALPHA_BASE_URL, output_size: str = 'compact', max_threads: int = 5 ) -> Iterator: - """Multi-threaded function for getting stock data from Alpha Vantage API + """Multi-threaded function for getting stock price data from Alpha Vantage API. + This wrapper is for the functions in "Stock Time Series" (see AV API docs) Parameters ----------- - base_url: str - the alpha vantage URL for the API function: str - the 'function' or endpoint you want to call. See AV API docs for more info. + A function or endpoint you want to call. See AV API docs for more info. symbols: Iterable An iterable objects of stock ticker symbols (strings) api_key: str The Alpha Vantage API key + base_url: str + the alpha vantage URL for the API output_size: str 'compact' or 'full'. See AV API docs for more info max_threads: int @@ -114,29 +133,29 @@ def get_alpha_stock_data( '&datatype=', data_type ) urls.append(''.join(map(str, sequence))) - urls executor = ThreadPoolExecutor(max_threads) for result in executor.map(request_alpha_data, urls): yield alpha_csv_to_dataframe(result) def get_alpha_technical_data( - function: str, symbols: Iterable, api_key: str, + functions: Iterable, symbols: Iterable, api_key: str, base_url: str = ALPHA_BASE_URL, interval: str = 'daily', time_period: int = 60, series_type: str = 'close', max_threads: int = 5 ) -> Iterator: """Multi-threaded function for getting technical data from Alpha Vantage API + This wrapper is for the functions in "Technical Indicators" (see AV API docs). Parameters ----------- - base_url: str - the alpha vantage URL for the API - function: str - the 'function' or endpoint you want to call. See AV API docs for more info. + functions: Iterable + A list of functions or endpoints you want to call. See AV API docs for more info. symbols: Iterable An iterable objects of stock ticker symbols (strings) api_key: str The Alpha Vantage API key + base_url: str + the alpha vantage URL for the API interval: str 1min, 5min, 15min, 30min, 60min, daily, weekly, monthly. time_period: int @@ -153,37 +172,34 @@ def get_alpha_technical_data( data_type = 'csv' #most memory efficient urls = [] for symbol in symbols: - sequence = ( - base_url, 'function=', function, '&symbol=', symbol, - '&interval=', interval, '&time_period=', time_period, - '&series_type=', series_type, '&apikey=', api_key, '&datatype=', data_type - ) - urls.append(''.join(map(str, sequence))) - urls + url_list = [] + for function in functions: + if function == 'VWAP' and interval not in ['daily', 'weekly', 'monthly']: + raise ValueError( + 'VWAP is only available for intraday time series intervals' + ) + sequence = ( + base_url, 'function=', function, '&symbol=', symbol, + '&interval=', interval, '&time_period=', time_period, + '&series_type=', series_type, '&apikey=', api_key, '&datatype=', data_type + ) + url_list.append(''.join(map(str, sequence))) + urls.append(url_list) executor = ThreadPoolExecutor(max_threads) - for result in executor.map(request_alpha_data, urls): - yield alpha_csv_to_dataframe(result) - - -def request_alpha_data(url): - session = requests.Session() - retry = Retry( - connect = CONNECT_RETRIES, - backoff_factor = BACKOFF_FACTOR - ) - adapter = HTTPAdapter(max_retries=retry) - session.mount('https://', adapter) - return session.get(url) + for result_list in executor.map(request_alpha_data, (url_list for url_list in urls)): + yield alpha_csv_to_dataframe(result_list) def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str) -> None: """Writes elements in an Iterator - with the stock ticker as an added column - to a folder as a csv Parameters ----------- results: Iterator - This should be the raw JSON output from the API as an Iterator object, one element for each symbol + This should be the raw output from the API as an Iterator object. + Each element should correspond to a stock symbol in 'symbols' and be a list of DataFrames, + where each DataFrame in that sub-list corresponds to a function in functions. symbols: Iterable - An iterable objects of stock ticker symbols (strings) + An iterable object of stock ticker symbols (strings) Returns -------- pd.DataFrame @@ -192,17 +208,26 @@ def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str) -> """ os.makedirs(dest_path, exist_ok = True) for i, result in enumerate(results): - temp_df = None - if isinstance(result, pd.DataFrame): - temp_df = result + symbol_df = None + if isinstance(result, list) != True: + result = [result] + for j, df in enumerate(result): + if isinstance(df, pd.DataFrame) != True: + raise Exception("stock_data must be an Iterator of pandas DataFrames") + temp_df = df temp_df['symbol'] = symbols[i] temp_df = reorder_last_to_first(temp_df) temp_df = clean_alpha_cols(temp_df) - temp_df.to_csv(os.path.join(dest_path, symbols[i] + '.csv'), index=False) - else: - raise Exception("stock_data must be an Iterator of pandas DataFrames") + if symbol_df is None: + symbol_df = temp_df + else: + symbol_df = pd.merge(symbol_df, temp_df, on = ['symbol', 'timestamp']) + + symbol_df.to_csv(os.path.join(dest_path, symbols[i] + '.csv'), index=False) return None +########################################################################################## + def clean_alpha_cols(df: pd.DataFrame) -> pd.DataFrame: """Cleans up column names coming out of the Alpha Vantage API Parameters @@ -217,6 +242,7 @@ def clean_alpha_cols(df: pd.DataFrame) -> pd.DataFrame: new_cols = [re.sub("[0-9]\\.\s", "", col) for col in new_cols] new_cols = [re.sub("\s", "_", col) for col in new_cols] df.columns = new_cols + df.columns = ['symbol', 'timestamp'] + df.columns.tolist()[2:] return df def reorder_last_to_first(df: pd.DataFrame) -> pd.DataFrame: diff --git a/technical_data/.gitempty b/technical_data/.gitempty new file mode 100644 index 0000000..f4e505f --- /dev/null +++ b/technical_data/.gitempty @@ -0,0 +1 @@ +#placeholder for stock data \ No newline at end of file