-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Features/61 alembic seed lookup tables #65
Conversation
Merge branch 'main' into features/61_alembic_seed_lookup_tables
Coverage reportThe coverage rate went from None of the new lines are part of the tested code. Therefore, there is no coverage data about them. |
closes #61 |
I might not have the correct or all data for each lookup table. I can update them. |
tests/migrations/test_migrations.py
Outdated
def data_feed_migration_helper(table_name): | ||
file_dir = os.path.dirname(os.path.realpath(__file__)) | ||
data_file_path = f"{file_dir}/../../db_revisions/feed/%s.csv" % table_name | ||
with open(data_file_path) as f: | ||
reader = csv.reader(f, delimiter="|") | ||
next(reader) | ||
output_list = list(tuple(line) for line in reader) | ||
return output_list | ||
|
||
|
||
def test_address_state_data_feed(alembic_runner: MigrationContext, alembic_engine: Engine): | ||
# Migrate up to, but not including this new migration | ||
alembic_runner.migrate_up_before("7b6ff51002b5") | ||
|
||
# Test address_state feed | ||
address_state_tablename = AddressStateDao.__tablename__ | ||
alembic_runner.migrate_up_one() | ||
with alembic_engine.connect() as conn: | ||
address_state_rows = conn.execute(text("SELECT code, name from %s" % address_state_tablename)).fetchall() | ||
address_state_expected = data_feed_migration_helper(address_state_tablename) | ||
assert address_state_rows == address_state_expected | ||
|
||
|
||
def test_federal_regulator_data_feed(alembic_runner: MigrationContext, alembic_engine: Engine): | ||
# Migrate up to, but not including this new migration | ||
alembic_runner.migrate_up_before("26a742d97ad9") | ||
|
||
# Test federal_regulator feed | ||
federal_regulator_tablename = FederalRegulatorDao.__tablename__ | ||
alembic_runner.migrate_up_one() | ||
with alembic_engine.connect() as conn: | ||
federal_regulator_rows = conn.execute(text("SELECT id, name from %s" % federal_regulator_tablename)).fetchall() | ||
federal_regulator_expected = data_feed_migration_helper(federal_regulator_tablename) | ||
assert federal_regulator_rows == federal_regulator_expected | ||
|
||
|
||
def test_hmda_institution_type_data_feed(alembic_runner: MigrationContext, alembic_engine: Engine): | ||
# Migrate up to, but not including this new migration | ||
alembic_runner.migrate_up_before("f4ff7d1aa6df") | ||
|
||
# Test hmda_institution_type feed | ||
hmda_institution_type_tablename = HMDAInstitutionTypeDao.__tablename__ | ||
alembic_runner.migrate_up_one() | ||
with alembic_engine.connect() as conn: | ||
hmda_institution_type_rows = conn.execute( | ||
text("SELECT id, name from %s" % hmda_institution_type_tablename) | ||
).fetchall() | ||
hmda_institution_type_expected = data_feed_migration_helper(hmda_institution_type_tablename) | ||
assert hmda_institution_type_rows == hmda_institution_type_expected | ||
|
||
|
||
def test_sbl_institution_type_data_feed(alembic_runner: MigrationContext, alembic_engine: Engine): | ||
# Migrate up to, but not including this new migration | ||
alembic_runner.migrate_up_before("a41281b1e109") | ||
|
||
# Test sbl_institution_type feed | ||
sbl_institution_type_tablename = SBLInstitutionTypeDao.__tablename__ | ||
alembic_runner.migrate_up_one() | ||
with alembic_engine.connect() as conn: | ||
sbl_institution_type_rows = conn.execute( | ||
text("SELECT id, name from %s" % sbl_institution_type_tablename) | ||
).fetchall() | ||
sbl_institution_type_expected = data_feed_migration_helper(sbl_institution_type_tablename) | ||
assert sbl_institution_type_rows == sbl_institution_type_expected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, a couple of things, let's move the new tests into a separate file, and name it something that relates to seeding the tables; the other thing is I don't think you need to have actual sql statement in here to check the results; you should be able to leverage sqlalchemy's orm retrieval for all the statements
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Working on it ...
…test_lookup_tables_data_feed)
data = get_feed_data_from_file(FederalRegulatorDao.__tablename__) | ||
|
||
op.bulk_insert(FederalRegulatorDao.__table__, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mentioned in chat, but let's use the actual table names here and the other scripts, instead of relying on the dao property so that in a rare case should there actually be a change in the table name for the dao there is a history, and there wouldn't be conflicts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
… coupling them with dao
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good for now, we'll get this merged. I think ideally we'd want to keep the source data inside the script; or you'd need to create versioned csv files... e.g. what if we add additional institution types? I like the file reader to keep things cleaner, but let's rename them in a versioned way in another ticket.
Sounds better. I can create another ticket to make these changes. |
No description provided.