Skip to content

Commit

Permalink
feat: add a new option allowing the processing of EMu dumps to halt a…
Browse files Browse the repository at this point in the history
…fter 1 date's worth has been processed
  • Loading branch information
jrdh committed Nov 13, 2023
1 parent af1d63a commit d912236
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
8 changes: 7 additions & 1 deletion dataimporter/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,13 @@ def queue_changes(self, records: Iterable[SourceRecord], db_name: str):
for view in views:
view.queue(batch)

def queue_emu_changes(self):
def queue_emu_changes(self, only_one: bool = False):
"""
Look for new EMu dumps, upsert the records into the appropriate DataDB and then
queue the changes into the derived views.
:param only_one: if True, only process the first set of dumps and then return,
otherwise, process them all (default: False)
"""
last_queued = self.emu_status.get()
dumps = find_emu_dumps(self.config.dumps_path, after=last_queued)
Expand Down Expand Up @@ -232,6 +235,9 @@ def queue_emu_changes(self):
# we've handled all the dumps from this date, update the last date stored on
# disk in case we fail later to avoid redoing work
self.emu_status.update(dump_date)
# stop if necessary
if only_one:
break

def queue_gbif_changes(self):
"""
Expand Down
45 changes: 45 additions & 0 deletions tests/test_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,51 @@ def test_queue_emu_changes(self, config: Config):
assert importer.views["image"].changes.size() == 4
assert importer.views["mss"].changes.size() == 4

def test_queue_emu_changes_only_one(self, config: Config):
importer = DataImporter(config)

first_dump_date = date(2023, 10, 3)
create_dump(
config.dumps_path,
"ecatalogue",
first_dump_date,
create_ecatalogue("1", EcatalogueType.specimen),
)
create_dump(
config.dumps_path, "emultimedia", first_dump_date, create_emultimedia("1")
)
create_dump(
config.dumps_path, "etaxonomy", first_dump_date, create_etaxonomy("1")
)

importer.queue_emu_changes(only_one=True)

assert importer.emu_status.get() == first_dump_date
assert importer.dbs["ecatalogue"].size() == 1
assert importer.dbs["emultimedia"].size() == 1
assert importer.dbs["etaxonomy"].size() == 1

second_dump_date = date(2023, 10, 4)
create_dump(
config.dumps_path,
"ecatalogue",
second_dump_date,
create_ecatalogue("2", EcatalogueType.specimen),
)
create_dump(
config.dumps_path, "emultimedia", second_dump_date, create_emultimedia("2")
)
create_dump(
config.dumps_path, "etaxonomy", second_dump_date, create_etaxonomy("2")
)

importer.queue_emu_changes(only_one=True)

assert importer.emu_status.get() == second_dump_date
assert importer.dbs["ecatalogue"].size() == 2
assert importer.dbs["emultimedia"].size() == 2
assert importer.dbs["etaxonomy"].size() == 2

def test_queue_gbif_changes(self, config: Config):
gbif_records = [
SourceRecord("1", {"x": "1"}, "gbif"),
Expand Down

0 comments on commit d912236

Please sign in to comment.