Skip to content

Commit

Permalink
loading big data
Browse files Browse the repository at this point in the history
  • Loading branch information
stevegbrooks committed Dec 18, 2021
1 parent c3bd6bb commit b7cc435
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 70 deletions.
187 changes: 118 additions & 69 deletions DataLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import shutil
import random
import os
import time
import numpy

from importlib import reload
import alpha_utils as au
Expand All @@ -16,82 +18,129 @@
tech_output_path = "technical_data"
# API key is read from a local secrets file — never hard-coded.
api_key = au.get_alpha_key('secrets.yml')

############### GET TICKER SYMBOLS ##############################################

# get all active listings as of today
all_active_listings = au.get_alpha_listings(api_key)
# only need NYSE and NASDAQ...
all_active_listings = all_active_listings[all_active_listings.exchange.isin(['NYSE', 'NASDAQ'])]
symbols = all_active_listings['symbol'].unique()

#for testing
#symbols = ['IBM', 'MSFT', 'FB', 'AAPL', 'QQQ', 'AAP', 'GSPY', 'GUNR']
# NOTE(review): the next line is the pre-change (unseeded, k=100) sample kept
# from the diff; it is immediately superseded by the seeded k=2500 sample below.
rand_sample = random.sample(range(len(symbols)), k = 100)
# Seed first so the 2500-symbol sample is reproducible across runs.
random.seed(42)
rand_sample = random.sample(range(len(symbols)), k = 2500)
symbols = symbols[rand_sample]

############### GET STOCK DATA ###################
# NOTE(review): this is the pre-change (unbatched) implementation that the
# commit removes in favour of the batched loops below — all symbols were
# fetched, written, and zipped in one pass.

#returns a generator, so the calls don't happen until 'write_alpha_results' is called
stock_data = au.get_alpha_stock_data(
    function = 'TIME_SERIES_DAILY_ADJUSTED',
    symbols = symbols,
    api_key = api_key,
    output_size = 'full',
    max_threads = 7
)

############### WRITE STOCK DATA #################
# Also a generator; one request per (function, symbol) pair.
technical_data = au.get_alpha_technical_data(
    functions = [
        'SMA', 'EMA', 'MACD', 'STOCH', 'RSI', 'BBANDS'
    ],
    symbols = symbols,
    api_key = api_key,
    interval = 'daily',
    time_period = 60,
    series_type = 'close',
    max_threads = 7
)

# Consuming the generators here triggers the actual API calls.
au.write_alpha_results(
    results = stock_data,
    symbols = symbols,
    dest_path = stock_output_path
)

au.write_alpha_results(
    results = technical_data,
    symbols = symbols,
    dest_path = tech_output_path
)

# One zip per output folder, named after the folder itself.
shutil.make_archive(
    base_name = stock_output_path,
    format = 'zip',
    root_dir = stock_output_path
)

shutil.make_archive(
    base_name = tech_output_path,
    format = 'zip',
    root_dir = tech_output_path
)

############### PRINT RESULTS ###################

#num files (dot-files such as .DS_Store excluded)
files = [f for f in os.listdir(stock_output_path) if not f.startswith('.')]
print(stock_output_path + "/", "contains", len(files), "files.")

#size of .zip output
zip_size = os.path.getsize(stock_output_path + '.zip')
print("Zipped data size:", round(zip_size / (1024 * 1024), 2), "MB")

#num files
files = [f for f in os.listdir(tech_output_path) if not f.startswith('.')]
print(tech_output_path + "/", "contains", len(files), "files.")

#size of .zip output
zip_size = os.path.getsize(tech_output_path + '.zip')
print("Zipped data size:", round(zip_size / (1024 * 1024), 2), "MB")
# Persist the sampled ticker list so a later run can reuse the same symbols.
numpy.savetxt("symbols.csv", symbols, delimiter=",", fmt = "%s")

print("Total number of symbols:", len(symbols))

#for testing
# symbols = ['IBM', 'MSFT', 'FB', 'AAPL', 'QQQ', 'AAP', 'GSPY', 'GUNR']

############### GET + WRITE STOCK DATA (BATCHED) #############################
# Fetch price history in batches so one API hiccup only loses a single batch
# and each per-batch zip archive stays a manageable size.
BATCH_SIZE = 100

last = 0
# Ceiling division: the original `(len // B) + 1` ran one empty extra batch
# whenever len(symbols) was an exact multiple of BATCH_SIZE.
for i in range((len(symbols) + BATCH_SIZE - 1) // BATCH_SIZE):
    left = last
    right = left + BATCH_SIZE
    print("Getting stock data for symbols {} to {}".format(left, right - 1))
    symbols_subset = symbols[left:right]

    # Returns a generator: the API calls don't happen until
    # write_alpha_results consumes it.
    stock_data = au.get_alpha_stock_data(
        function = 'TIME_SERIES_DAILY_ADJUSTED',
        symbols = symbols_subset,
        api_key = api_key,
        output_size = 'full',
        max_threads = 7
    )

    try:
        au.write_alpha_results(
            results = stock_data,
            symbols = symbols_subset,
            dest_path = stock_output_path,
            columns = ['symbol', 'timestamp', 'adjusted_close', 'volume']
        )

        # One archive per batch, tagged with the symbol index range.
        zip_file_name = stock_output_path + '_' + str(left) + '_' + str(right - 1)

        shutil.make_archive(
            base_name = zip_file_name,
            format = 'zip',
            root_dir = stock_output_path
        )

        # num files written for this batch (dot-files excluded)
        files = [f for f in os.listdir(stock_output_path) if not f.startswith('.')]
        print(stock_output_path + "/", "contains", len(files), "files.")

        # size of this batch's .zip output
        zip_size = os.path.getsize(zip_file_name + '.zip')
        print("Zipped data size:", round(zip_size / (1024 * 1024), 2), "MB")

        # delete this batch's csvs so the next batch starts from an empty
        # folder (portable; replaces the shell-out `os.system('rm ...')`)
        for csv_name in os.listdir(stock_output_path):
            if csv_name.endswith('.csv'):
                os.remove(os.path.join(stock_output_path, csv_name))
    except Exception as err:
        # Narrow except: the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit; report the actual error too.
        print("Error writing stock data to file:", err)
    finally:
        # Always rate-limit and advance the cursor. The original `continue`
        # skipped `last = right` on error, so the failed batch was silently
        # re-fetched next iteration and the trailing batches were dropped.
        time.sleep(15)
        last = right


##############################################################################
############### GET + WRITE TECHNICAL DATA (BATCHED) #########################
# Smaller batches here: each symbol fans out into 5 indicator requests, so the
# per-batch API load is ~5x that of the price loop above.
BATCH_SIZE = 30

last = 0
# Ceiling division: avoids the empty trailing batch that `(len // B) + 1`
# produced when len(symbols) divided evenly.
for i in range((len(symbols) + BATCH_SIZE - 1) // BATCH_SIZE):
    left = last
    right = left + BATCH_SIZE
    print("Getting tech data for symbols {} to {}".format(left, right - 1))
    symbols_subset = symbols[left:right]
    # Generator: API calls are deferred until write_alpha_results consumes it.
    technical_data = au.get_alpha_technical_data(
        functions = [
            'EMA', 'MACD', 'STOCH', 'RSI', 'BBANDS'
        ],
        symbols = symbols_subset,
        api_key = api_key,
        interval = 'daily',
        time_period = 60,
        series_type = 'close',
        max_threads = 7
    )

    try:
        au.write_alpha_results(
            results = technical_data,
            symbols = symbols_subset,
            dest_path = tech_output_path
        )

        # One archive per batch, tagged with the symbol index range.
        zip_file_name = tech_output_path + '_' + str(left) + '_' + str(right - 1)

        shutil.make_archive(
            base_name = zip_file_name,
            format = 'zip',
            root_dir = tech_output_path
        )

        # num files written for this batch (dot-files excluded)
        files = [f for f in os.listdir(tech_output_path) if not f.startswith('.')]
        print(tech_output_path + "/", "contains", len(files), "files.")

        # size of this batch's .zip output
        zip_size = os.path.getsize(zip_file_name + '.zip')
        print("Zipped data size:", round(zip_size / (1024 * 1024), 2), "MB")

        # delete this batch's csvs (portable; replaces `os.system('rm ...')`)
        for csv_name in os.listdir(tech_output_path):
            if csv_name.endswith('.csv'):
                os.remove(os.path.join(tech_output_path, csv_name))
    except Exception as err:
        # Narrow except: a bare `except:` would also swallow KeyboardInterrupt.
        print("Error writing tech data to file:", err)
    finally:
        # Always rate-limit and advance; the original `continue` skipped
        # `last = right`, re-fetching the same batch and dropping final ones.
        time.sleep(60)
        last = right
9 changes: 8 additions & 1 deletion alpha_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def get_alpha_technical_data(
for result_list in executor.map(request_alpha_data, (url_list for url_list in urls)):
yield alpha_csv_to_dataframe(result_list)

def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str, max_threads: int = 5) -> None:
def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str, columns: Iterable = None, max_threads: int = 5) -> None:
"""Writes elements in an Iterator - with the stock ticker as an added column - to a folder as a csv
Parameters
-----------
Expand All @@ -199,6 +199,11 @@ def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str, ma
where each DataFrame in that sub-list corresponds to a function in functions.
symbols: Iterable
An iterable object of stock ticker symbols (strings)
dest_path: str
Where to write the results to
columns: Iterable
An iterable object of column names to restrict the written DataFrame to.
If left as None (the default), all columns will be written.
Returns
--------
pd.DataFrame
Expand All @@ -223,6 +228,8 @@ def write_alpha_results(results: Iterator, symbols: Iterable, dest_path: str, ma
else:
symbol_df = pd.merge(symbol_df, temp_df, on = ['symbol', 'timestamp'])
out_path = os.path.join(dest_path, symbols[i] + '.csv')
if columns is not None:
symbol_df = symbol_df[columns]
outputs[out_path] = symbol_df
executor = ThreadPoolExecutor(max_threads)
for results in executor.map(write_simple_wrapper, outputs.items()):
Expand Down

0 comments on commit b7cc435

Please sign in to comment.