
Commit

added the ability for get_alpha_technical_data to take in a list of functions instead of just one
stevegbrooks committed Dec 6, 2021
1 parent 03c1693 commit d54f90f
Showing 3 changed files with 82 additions and 55 deletions.
16 changes: 8 additions & 8 deletions DataLoader.py
@@ -17,7 +17,7 @@
# symbols = all_active_listings['symbol'].unique()

#for testing
symbols = ['IBM', 'MSFT', 'FB', 'AAPL', 'QQQ', 'AAP', 'ATHM', 'VIPS']
symbols = ['IBM', 'MSFT', 'FB']

#returns a generator, so the calls don't happen until 'write_alpha_results' is called
stock_data = au.get_alpha_stock_data(
@@ -28,26 +28,26 @@
max_threads = 7
)

sma_data = au.get_alpha_technical_data(
function = 'SMA',
symbols = symbols,
technical_data = au.get_alpha_technical_data(
functions = [
'SMA', 'EMA', 'MACD', 'STOCH', 'RSI', 'BBANDS'
],
symbols = symbols,
api_key = api_key,
interval = 'daily',
time_period = 60,
series_type = 'close',
api_key = api_key,
max_threads = 7
)


au.write_alpha_results(
results = stock_data,
symbols = symbols,
dest_path = "stock_data/"
)

au.write_alpha_results(
results = sma_data,
results = technical_data,
symbols = symbols,
dest_path = "technical_data/"
)
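For context, a hedged sketch (not part of this commit) of what the run above is expected to produce: one merged CSV per symbol under technical_data/, with the indicator columns joined on symbol and timestamp. The file name and column names here are assumptions based on the code in this diff, not verified output.

```python
# Hypothetical spot-check of the output layout (assumed, not from this commit).
import pandas as pd

ibm = pd.read_csv("technical_data/IBM.csv")
print(ibm.columns.tolist())  # e.g. ['symbol', 'timestamp', 'SMA', 'EMA', 'MACD', ...]
print(ibm.head())
```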

120 changes: 73 additions & 47 deletions alpha_utils.py
@@ -68,15 +68,33 @@ def get_alpha_listings(
df = alpha_csv_to_dataframe(response)
return df

def alpha_csv_to_dataframe(response):
decoded_content = response.content.decode('utf-8')
cr = csv.reader(decoded_content.splitlines(), delimiter=',')
df = pd.DataFrame(cr)
header, df = df.iloc[0], df[1:]
df.columns = header
df.reset_index(drop = True, inplace=True)
df.columns.name = None
return df
def alpha_csv_to_dataframe(responses):
output = []
if isinstance(responses, list) != True:
responses = [responses]
for response in responses:
decoded_content = response.content.decode('utf-8')
cr = csv.reader(decoded_content.splitlines(), delimiter=',')
df = pd.DataFrame(cr)
header, df = df.iloc[0], df[1:]
df.columns = header
df.reset_index(drop = True, inplace=True)
df.columns.name = None
output.append(df)
return output
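A quick sketch (not part of the diff) of the new list-or-single handling in alpha_csv_to_dataframe: it now accepts either one requests.Response or a list of them and always returns a list of DataFrames. The FakeResponse stand-in below is assumed purely for illustration.

```python
from alpha_utils import alpha_csv_to_dataframe

# Minimal stand-in with a .content attribute, mimicking requests.Response.
class FakeResponse:
    def __init__(self, text: str):
        self.content = text.encode("utf-8")

csv_text = "timestamp,SMA\n2021-12-03,118.2\n2021-12-02,117.9"

single = alpha_csv_to_dataframe(FakeResponse(csv_text))      # list with one DataFrame
many = alpha_csv_to_dataframe([FakeResponse(csv_text)] * 2)  # list with two DataFrames
print(len(single), len(many))      # 1 2
print(single[0].columns.tolist())  # ['timestamp', 'SMA']
```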

def request_alpha_data(urls) -> requests.Response:
session = requests.Session()
retry = Retry(
connect = CONNECT_RETRIES,
backoff_factor = BACKOFF_FACTOR
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
if isinstance(urls, list):
return [session.get(url) for url in urls]
else:
return session.get(urls)
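Likewise, a sketch of how request_alpha_data now behaves: a single URL returns one requests.Response, a list returns a list of responses, which is what lets get_alpha_technical_data fetch every indicator for a symbol in one call. The query strings and the 'demo' key are placeholders.

```python
from alpha_utils import request_alpha_data

base = "https://www.alphavantage.co/query?"
one = request_alpha_data(base + "function=SMA&symbol=IBM&interval=daily&apikey=demo&datatype=csv")
batch = request_alpha_data([
    base + "function=SMA&symbol=IBM&interval=daily&apikey=demo&datatype=csv",
    base + "function=EMA&symbol=IBM&interval=daily&apikey=demo&datatype=csv",
])
print(isinstance(batch, list), len(batch))  # True 2
```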

###############################################################################################

@@ -85,17 +103,18 @@ def get_alpha_stock_data(
base_url: str = ALPHA_BASE_URL,
output_size: str = 'compact', max_threads: int = 5
) -> Iterator:
"""Multi-threaded function for getting stock data from Alpha Vantage API
"""Multi-threaded function for getting stock price data from Alpha Vantage API.
This wrapper is for the functions in "Stock Time Series" (see AV API docs)
Parameters
-----------
base_url: str
the alpha vantage URL for the API
function: str
the 'function' or endpoint you want to call. See AV API docs for more info.
A function or endpoint you want to call. See AV API docs for more info.
symbols: Iterable
An iterable objects of stock ticker symbols (strings)
api_key: str
The Alpha Vantage API key
base_url: str
the alpha vantage URL for the API
output_size: str
'compact' or 'full'. See AV API docs for more info
max_threads: int
@@ -114,29 +133,29 @@
'&datatype=', data_type
)
urls.append(''.join(map(str, sequence)))
urls

executor = ThreadPoolExecutor(max_threads)
for result in executor.map(request_alpha_data, urls):
yield alpha_csv_to_dataframe(result)
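As the comment in DataLoader.py notes, get_alpha_stock_data returns a generator, so no requests are sent until it is iterated. A hedged usage sketch follows; the endpoint name and API key are placeholders, not taken from this diff.

```python
from alpha_utils import get_alpha_stock_data

stock_data = get_alpha_stock_data(
    function="TIME_SERIES_DAILY",  # assumed endpoint; see AV API docs
    symbols=["IBM", "MSFT"],
    api_key="YOUR_API_KEY",        # placeholder
    max_threads=2,
)

# Requests only happen here, one symbol at a time, as the generator is consumed.
for frames in stock_data:
    # after this commit, alpha_csv_to_dataframe always returns a list of DataFrames
    print(frames[0].shape)
```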

def get_alpha_technical_data(
function: str, symbols: Iterable, api_key: str,
functions: Iterable, symbols: Iterable, api_key: str,
base_url: str = ALPHA_BASE_URL,
interval: str = 'daily', time_period: int = 60, series_type: str = 'close',
max_threads: int = 5
) -> Iterator:
"""Multi-threaded function for getting technical data from Alpha Vantage API
This wrapper is for the functions in "Technical Indicators" (see AV API docs).
Parameters
-----------
base_url: str
the alpha vantage URL for the API
function: str
the 'function' or endpoint you want to call. See AV API docs for more info.
functions: Iterable
A list of functions or endpoints you want to call. See AV API docs for more info.
symbols: Iterable
An iterable objects of stock ticker symbols (strings)
api_key: str
The Alpha Vantage API key
base_url: str
the alpha vantage URL for the API
interval: str
1min, 5min, 15min, 30min, 60min, daily, weekly, monthly.
time_period: int
@@ -153,37 +172,34 @@ def get_alpha_technical_data(
data_type = 'csv' #most memory efficient
urls = []
for symbol in symbols:
sequence = (
base_url, 'function=', function, '&symbol=', symbol,
'&interval=', interval, '&time_period=', time_period,
'&series_type=', series_type, '&apikey=', api_key, '&datatype=', data_type
)
urls.append(''.join(map(str, sequence)))
urls
url_list = []
for function in functions:
if function == 'VWAP' and interval in ['daily', 'weekly', 'monthly']:
raise ValueError(
'VWAP is only available for intraday time series intervals'
)
sequence = (
base_url, 'function=', function, '&symbol=', symbol,
'&interval=', interval, '&time_period=', time_period,
'&series_type=', series_type, '&apikey=', api_key, '&datatype=', data_type
)
url_list.append(''.join(map(str, sequence)))
urls.append(url_list)

executor = ThreadPoolExecutor(max_threads)
for result in executor.map(request_alpha_data, urls):
yield alpha_csv_to_dataframe(result)


def request_alpha_data(url):
session = requests.Session()
retry = Retry(
connect = CONNECT_RETRIES,
backoff_factor = BACKOFF_FACTOR
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
return session.get(url)
for result_list in executor.map(request_alpha_data, (url_list for url_list in urls)):
yield alpha_csv_to_dataframe(result_list)
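This is the heart of the change: urls is now a list of per-symbol lists, so each request_alpha_data call fetches every requested indicator for one symbol, and each yielded element is a list of DataFrames in the same order as functions. A sketch of the resulting shape (symbols and key are placeholders):

```python
from alpha_utils import get_alpha_technical_data

technical_data = get_alpha_technical_data(
    functions=["SMA", "EMA", "RSI"],
    symbols=["IBM", "MSFT"],
    api_key="YOUR_API_KEY",  # placeholder
)

for symbol, frames in zip(["IBM", "MSFT"], technical_data):
    # frames[0] -> SMA, frames[1] -> EMA, frames[2] -> RSI for this symbol
    print(symbol, len(frames))  # e.g. IBM 3
```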

def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str) -> None:
"""Writes elements in an Iterator - with the stock ticker as an added column - to a folder as a csv
Parameters
-----------
results: Iterator
This should be the raw JSON output from the API as an Iterator object, one element for each symbol
This should be the raw output from the API as an Iterator object.
Each element should correspond to a stock symbol in 'symbols' and be a list of DataFrames,
where each DataFrame in that sub-list corresponds to a function in functions.
symbols: Iterable
An iterable objects of stock ticker symbols (strings)
An iterable object of stock ticker symbols (strings)
Returns
--------
pd.DataFrame
@@ -192,17 +208,26 @@ def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str) ->
"""
os.makedirs(dest_path, exist_ok = True)
for i, result in enumerate(results):
temp_df = None
if isinstance(result, pd.DataFrame):
temp_df = result
symbol_df = None
if isinstance(result, list) != True:
result = [result]
for j, df in enumerate(result):
if isinstance(df, pd.DataFrame) != True:
raise Exception("stock_data must be an Iterator of pandas DataFrames")
temp_df = df
temp_df['symbol'] = symbols[i]
temp_df = reorder_last_to_first(temp_df)
temp_df = clean_alpha_cols(temp_df)
temp_df.to_csv(os.path.join(dest_path, symbols[i] + '.csv'), index=False)
else:
raise Exception("stock_data must be an Iterator of pandas DataFrames")
if symbol_df is None:
symbol_df = temp_df
else:
symbol_df = pd.merge(symbol_df, temp_df, on = ['symbol', 'timestamp'])

symbol_df.to_csv(os.path.join(dest_path, symbols[i] + '.csv'), index=False)
return None
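Because each result is now a list of per-indicator DataFrames, write_alpha_results merges them on ['symbol', 'timestamp'] before writing a single CSV per symbol. A toy illustration of that merge step, with data fabricated for the example:

```python
import pandas as pd

# Two stand-in indicator frames for the same symbol, after 'symbol' has been
# added and the columns cleaned/reordered.
sma = pd.DataFrame({
    "symbol": ["IBM", "IBM"],
    "timestamp": ["2021-12-03", "2021-12-02"],
    "SMA": [118.2, 117.9],
})
rsi = pd.DataFrame({
    "symbol": ["IBM", "IBM"],
    "timestamp": ["2021-12-03", "2021-12-02"],
    "RSI": [55.1, 52.4],
})

merged = pd.merge(sma, rsi, on=["symbol", "timestamp"])
print(merged.columns.tolist())  # ['symbol', 'timestamp', 'SMA', 'RSI']
```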

##########################################################################################

def clean_alpha_cols(df: pd.DataFrame) -> pd.DataFrame:
"""Cleans up column names coming out of the Alpha Vantage API
Parameters
@@ -217,6 +242,7 @@ def clean_alpha_cols(df: pd.DataFrame) -> pd.DataFrame:
new_cols = [re.sub("[0-9]\\.\s", "", col) for col in new_cols]
new_cols = [re.sub("\s", "_", col) for col in new_cols]
df.columns = new_cols
df.columns = ['symbol', 'timestamp'] + df.columns.tolist()[2:]
return df
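For reference, the two substitutions above applied to typical Alpha Vantage style headers (example names assumed): numeric prefixes such as "1. " are stripped and remaining spaces become underscores.

```python
import re

cols = ["1. open", "2. high", "Real Upper Band", "timestamp"]
cols = [re.sub(r"[0-9]\.\s", "", c) for c in cols]  # "1. open" -> "open"
cols = [re.sub(r"\s", "_", c) for c in cols]        # "Real Upper Band" -> "Real_Upper_Band"
print(cols)  # ['open', 'high', 'Real_Upper_Band', 'timestamp']
```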

def reorder_last_to_first(df: pd.DataFrame) -> pd.DataFrame:
1 change: 1 addition & 0 deletions technical_data/.gitempty
@@ -0,0 +1 @@
#placeholder for stock data
