Added more tests for comparing entries from db and export files
Added tests that assert that the first and last few entries from the db and the export files match.
Comparing object IDs only for now.
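
As a rough illustration of the comparison the new tests perform, here is a minimal sketch (the helper name assert_ids_match is hypothetical; the entries_from_db / exported_data shapes mirror the test code in the diff below):

from bson.objectid import ObjectId

def assert_ids_match(entries_from_db, exported_data, n=5):
    # Hypothetical helper: entries read from the timeseries db carry ObjectId
    # instances in "_id", while the exported JSON stores ids in extended-JSON
    # form ({"_id": {"$oid": "..."}}), so normalize before comparing.
    db_ids = [e["_id"] for e in entries_from_db[:n] + entries_from_db[-n:]]
    file_ids = [ObjectId(e["_id"]["$oid"]) for e in exported_data[:n] + exported_data[-n:]]
    assert db_ids == file_ids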

Also added a temp directory for the tests so that the local directory emission/archived isn't filled with export files.
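
A minimal sketch of that setup, assuming the pipeline resolves its archive directory from the DATA_DIR environment variable (as the test code in the diff does via os.environ.get('DATA_DIR', 'emission/archived')):

import os
import tempfile

with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
    # Route export files into a throwaway directory instead of emission/archived;
    # the directory and its contents are removed when the block exits.
    os.environ['DATA_DIR'] = tmpdirname
    # ... run the purge/restore pipeline and its assertions here ...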
Mahadik, Mukul Chandrakant authored and committed Sep 1, 2024
1 parent 6dc72b6 commit 23734e5
Showing 2 changed files with 155 additions and 127 deletions.
29 changes: 0 additions & 29 deletions emission/purge_restore/purge_data.py
@@ -123,35 +123,6 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
print("Exported file names: %s" % file_names)
return file_names

# new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

# if new_entries:
# self._last_processed_ts = new_entries[-1]['data']['ts']
# print(f"Updated last_processed_ts {self._last_processed_ts}")

# if current_end_ts >= initEndTs:
# file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
# print(f"Exporting entries from {current_start_ts} to {current_end_ts} to file: {file_name}")
# epret.export(user_id, ts, current_start_ts, current_end_ts, file_name)
# file_names.append(file_name)
# self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

# current_start_ts = current_end_ts

# else:
# remaining_entries = self.get_export_timeseries_entries(user_id, ts, initStartTs, initEndTs)
# if not remaining_entries:
# print("No new data to export, breaking out of while loop")
# break
# else:
# print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
# print("Incrementing time range")
# current_start_ts = current_end_ts

# print(f"Exported data to {len(file_names)} files")
# print(f"Exported file names: {file_names}")
# return file_names

# def export_pipeline_states(self, user_id, file_name):
# pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
# logging.info("Found %d pipeline states %s" %
253 changes: 155 additions & 98 deletions emission/tests/exportTests/TestPurgeRestoreModule.py
@@ -5,20 +5,22 @@
import unittest
import json
import pathlib as pl
import emission.storage.timeseries.abstract_timeseries as esta
import gzip
import logging
import tempfile
import time
from bson.objectid import ObjectId

import emission.core.get_database as edb
import emission.tests.common as etc
import emission.storage.pipeline_queries as espq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.pipeline.purge_stage as epp
import emission.pipeline.restore_stage as epr
import emission.purge_restore.export_timeseries as epret
import emission.purge_restore.import_timeseries as eprit
import emission.purge_restore.purge_data as eprpd
import bin.debug.load_multi_timeline_for_range as lmtfr
import logging
import gzip
import emission.storage.json_wrappers as esj
import emission.storage.timeseries.timequery as estt

class TestPurgeRestoreModule(unittest.TestCase):
def setUp(self):
@@ -30,9 +30,9 @@ def setUp(self):

def tearDown(self):
print("Clearing entries for test UUID from database...")
# etc.dropAllCollections(edb._get_current_db())
self.clearRelatedDb()
self.clearAllDb()
etc.dropAllCollections(edb._get_current_db())
# self.clearRelatedDb()
# self.clearAllDb()

def clearRelatedDb(self):
edb.get_timeseries_db().delete_many({"user_id": self.testUUID})
@@ -110,87 +112,120 @@ def clearAllDb(self):
# self.assertEqual(tsdb_count, 0)

def testPurgeRestorePipelineFull(self):
'''
Test 1 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there was before
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"About to purge {res} entries")
print(f"About to purge {res} entries")
self.assertEqual(res, 1906)

# Run the purge pipeline
file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full")
print("Exported file names: %s" % file_names)

'''
Test 2 - Assert the file exists after the export process and checking contents
'''
self.assertTrue(pl.Path(file_names[0] + ".gz").is_file())
with gzip.open(file_names[0] + ".gz", 'r') as ef:
exported_data = json.loads(ef.read().decode('utf-8'))
self.assertEqual(len(exported_data), 1906)

first_few_objectIds = ['564e73d388f663199aabf0d2', '55afb7c67d65cb39ee976598', '55afb7c67d65cb39ee976599', '55b08d327d65cb39ee9769e1', '55afb7c67d65cb39ee97659a']
for entry in exported_data[0:5]:
self.assertIn(entry.get('_id').get('$oid'), first_few_objectIds)

'''
Test 3 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there is after
entries = edb.get_timeseries_db().find({"user_id" : self.testUUID})
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Purging complete: {res} entries remaining")
print(f"Purging complete: {res} entries remaining")
with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
self.assertTrue(os.path.isdir(tmpdirname))

# Set the environment variable
os.environ['DATA_DIR'] = tmpdirname
self.assertEqual(os.environ['DATA_DIR'], tmpdirname)

# Fetch entries from timeseries db before purging to use in tests
ts = esta.TimeSeries.get_time_series(self.testUUID)
tq = estt.TimeQuery("data.ts", None, time.time() - 5)
sort_key = ts._get_sort_key(tq)
(ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key)
entries_to_export = list(ts_db_result)

# A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline
stat_pipeline_key = entries[0].get('metadata').get('key')
print(f"stat_pipeline_key = {stat_pipeline_key}")
self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
self.assertEqual(res, 1)

print("pipelineState_entries after full export purge =")
pipelineState_entries = edb.get_pipeline_state_db().find({"user_id": self.testUUID})
for entry in pipelineState_entries:
print(entry)

# Run the restore pipeline
logging.info(f"About to restore entries")
print(f"About to restore entries")
epr.run_restore_pipeline_for_user(self.testUUID, file_names)

'''
Test 4 - Verify that restoring timeseries data works with sample real data
'''
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res-1} entries restored")
print(f"Restoring complete: {res-1} entries restored")

# Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline
self.assertEqual(res_stats_count, 2)
self.assertEqual(res, 1908)

# Test 5 - Verify that restoring timeseries data fails if data already exists
# Duplicate key error is ignored hence no entries should be inserted
logging.info("Attempting to load duplicate data...")
print("Attempting to load duplicate data...")
epr.run_restore_pipeline_for_user(self.testUUID, file_names)
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Restoring complete: {res-1} entries restored")
print(f"Restoring complete: {res-1} entries restored")

print("pipelineState_entries after running restore again =")
pipelineState_entries = edb.get_pipeline_state_db().find({"user_id": self.testUUID})
for entry in pipelineState_entries:
print(entry)
# self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
# self.assertEqual(res, 1908)
'''
Test 1 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there was before
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"About to purge {res} entries")
print(f"About to purge {res} entries")
self.assertEqual(res, 1906)

# Run the purge pipeline
file_names = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'), "full")
print("Exported file names: %s" % file_names)

'''
Test 2 - Assert the file exists after the export process
'''
self.assertTrue(pl.Path(file_names[0] + ".gz").is_file())
with gzip.open(file_names[0] + ".gz", 'r') as ef:
exported_data = json.loads(ef.read().decode('utf-8'))
self.assertEqual(len(exported_data), 1906)

'''
Test 3 - Compare the first and last few entries in the exported file with the entries in the timeseries db
'''
entries_from_db = entries_to_export
print("Entries from db size: %s" % len(entries_from_db))
entries_from_db = entries_from_db[:5] + entries_from_db[-5:]
entries_from_file = exported_data[:5] + exported_data[-5:]
objectIds_from_db = [entry["_id"] for entry in entries_from_db]
objectIds_from_file = [ObjectId(entry["_id"]["$oid"]) for entry in entries_from_file]
print("Object ids from db: %s" % objectIds_from_db)
print("Object ids from file: %s" % objectIds_from_file)
self.assertEqual(objectIds_from_db, objectIds_from_file)

'''
Test 4 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there is after
entries = edb.get_timeseries_db().find({"user_id" : self.testUUID})
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Purging complete: {res} entries remaining")
print(f"Purging complete: {res} entries remaining")

# A single entry with key 'stats/pipeline_time' should be present as this test involves running the pipeline
stat_pipeline_key = entries[0].get('metadata').get('key')
print(f"stat_pipeline_key = {stat_pipeline_key}")
self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
self.assertEqual(res, 1)

# Run the restore pipeline
logging.info(f"About to restore entries")
print(f"About to restore entries")
epr.run_restore_pipeline_for_user(self.testUUID, file_names)

'''
Test 5 - Verify that restoring timeseries data works with sample real data
'''
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res-2} entries restored")
print(f"Restoring complete: {res-2} entries restored")

# Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline
self.assertEqual(res_stats_count, 2)
self.assertEqual(res, 1908)

'''
Test 6 - Verify that restoring timeseries data is skipped if data already exists
Duplicate key error is ignored in import_timeseries.py
Hence no entries should be inserted
'''
logging.info("Attempting to load duplicate data...")
print("Attempting to load duplicate data...")
epr.run_restore_pipeline_for_user(self.testUUID, file_names)
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res-2} entries restored")
print(f"Restoring complete: {res-2} entries restored")

# A third entry with key 'stats/pipeline_time' should be present after running the restore pipeline again
self.assertEqual(res_stats_count, 3)
self.assertEqual(res, 1909)

def testPurgeRestorePipelineIncremental(self):
with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
self.assertTrue(os.path.isdir(tmpdirname))

# Set the environment variable
os.environ['DATA_DIR'] = tmpdirname
self.assertEqual(os.environ['DATA_DIR'], tmpdirname)

# Fetch entries from timeseries db before purging to use in tests
ts = esta.TimeSeries.get_time_series(self.testUUID)
tq = estt.TimeQuery("data.ts", None, time.time() - 5)
sort_key = ts._get_sort_key(tq)
(ts_db_count, ts_db_result) = ts._get_entries_for_timeseries(ts.timeseries_db, None, tq, geo_query=None, extra_query_list=None, sort_key = sort_key)
entries_to_export = list(ts_db_result)

'''
Test 1 - Verify that purging timeseries data works with sample real data
'''
@@ -214,12 +249,21 @@ def testPurgeRestorePipelineIncremental(self):
exported_data.extend(json.loads(ef.read().decode('utf-8')))
self.assertEqual(len(exported_data), 1906)

last_few_objectIds = ['55b08d3e7d65cb39ee976def', '55b08d3e7d65cb39ee976df0', '55b08d3e7d65cb39ee976df1', '55b08e907d65cb39ee976e06', '55b08e907d65cb39ee976e07']
for entry in exported_data[-5:]:
self.assertIn(entry.get('_id').get('$oid'), last_few_objectIds)
'''
Test 3 - Compare the first and last few entries in the exported file with the entries in the timeseries db
'''
entries_from_db = entries_to_export
print("Entries from db size: %s" % len(entries_from_db))
entries_from_db = entries_from_db[:5] + entries_from_db[-5:]
entries_from_file = exported_data[:5] + exported_data[-5:]
objectIds_from_db = [entry["_id"] for entry in entries_from_db]
objectIds_from_file = [ObjectId(entry["_id"]["$oid"]) for entry in entries_from_file]
print("Object ids from db: %s" % objectIds_from_db)
print("Object ids from file: %s" % objectIds_from_file)
self.assertEqual(objectIds_from_db, objectIds_from_file)

'''
Test 3 - Verify that purging timeseries data works with sample real data
Test 4 - Verify that purging timeseries data works with sample real data
'''
# Check how much data there is after
entries = edb.get_timeseries_db().find({"user_id" : self.testUUID})
@@ -233,30 +277,43 @@
self.assertEqual(stat_pipeline_key,'stats/pipeline_time')
self.assertEqual(res, 1)

print("pipelineState_entries after incremental export purge =")
pipelineState_entries = edb.get_pipeline_state_db().find({"user_id": self.testUUID})
for entry in pipelineState_entries:
print(entry)

# Run the restore pipeline
logging.info(f"About to restore entries")
print(f"About to restore entries")
print("File names: %s" % file_names)
epr.run_restore_pipeline_for_user(self.testUUID, file_names)

'''
Test 4 - Verify that restoring timeseries data works with sample real data
Test 5 - Verify that restoring timeseries data works with sample real data
'''
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res-1} entries restored")
print(f"Restoring complete: {res-1} entries restored")
logging.info(f"Restoring complete: {res-2} entries restored")
print(f"Restoring complete: {res-2} entries restored")

# Two additional entries with key 'stats/pipeline_time' should be present - one from the purge pipeline, other from the restore pipeline
self.assertEqual(res_stats_count, 2)
self.assertEqual(res, 1908)

'''
Test 6 - Verify that restoring timeseries data is skipped if data already exists
Duplicate key error is ignored in import_timeseries.py
Hence no entries should be inserted
'''
logging.info("Attempting to load duplicate data...")
print("Attempting to load duplicate data...")
epr.run_restore_pipeline_for_user(self.testUUID, file_names)
# Check how much data there is after
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res_stats_count = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID, "metadata.key" : 'stats/pipeline_time'})
logging.info(f"Restoring complete: {res-2} entries restored")
print(f"Restoring complete: {res-2} entries restored")

# A third entry with key 'stats/pipeline_time' should be present after running the restore pipeline again
self.assertEqual(res_stats_count, 3)
self.assertEqual(res, 1909)

if __name__ == '__main__':
etc.configLogging()
unittest.main()
