Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate existing resources #25

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbt_invoke/internal/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.0'
__version__ = '0.3.0'
88 changes: 82 additions & 6 deletions dbt_invoke/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import ast
import json
import functools

from invoke import task

Expand Down Expand Up @@ -445,6 +447,32 @@ def _delete_all_property_files(ctx, transformed_ls_results):
else:
_LOGGER.info('There are no files to delete.')

@functools.lru_cache()
def _read_manifest(target_path):
    """
    Load and cache the manifest stored in a target directory

    The parsed result is memoized per ``target_path`` so repeated
    lookups do not re-read and re-parse the file. The cached dictionary
    is the same object for every caller, so treat it as read-only.

    :param target_path: Directory that contains ``manifest.json``
    :return: The parsed contents of ``manifest.json``
    """
    # Call the decorator factory explicitly: the bare ``@lru_cache`` form
    # is only accepted from Python 3.8 onwards.
    manifest_path = Path(target_path, 'manifest.json')
    with open(manifest_path, 'r', encoding='utf-8') as manifest_json:
        return json.load(manifest_json)

def _get_current_location(
ctx,
resource_dict
):
"""
Find if the resource is already documented somewhere

:param ctx: An Invoke context object
:param resource_dict: A dictionary representing the json output for
this resource from the "dbt ls" command
:return: Str or None
"""
node_unique_id = resource_dict['unique_id']
patch_path = _read_manifest(ctx.config['target_path'])['nodes'][node_unique_id]['patch_path']
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robastel this section of the manifest is the result of dbt having discovered the matching yaml for that node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From memory, it is created when dbt compile takes place.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, great, I forgot you already did that!

Logistically, how would you like to go about working on our hybrid approach?

I'm happy to take an attempt at combining our two approaches. Also happy to review if you feel like you want to take a shot at it. I'm going to sign off for today, but I'll keep in touch!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have finished work for the week now. Happy to take a stab at this Monday if you haven’t already done it. 🙂

if patch_path:
return Path(
ctx.config['project_path'],
patch_path.split('//')[1]
)

def _create_property_file(
ctx,
Expand All @@ -455,7 +483,10 @@ def _create_property_file(
**kwargs,
):
"""
Create a property file
Create a property file.
A property file will be created using the existing data from another file if it is present.
It will then be updated with any column changes etc.
Any existing definitions in other property files will be cleaned up.

:param ctx: An Invoke context object
:param resource_location: The location of the file representing the
Expand All @@ -475,10 +506,51 @@ def _create_property_file(
f' Resource {counter} of {total},'
f' {resource_location}'
)
columns = _get_columns(ctx, resource_location, resource_dict, **kwargs)
current_property_path = _get_current_location(ctx, resource_dict)
property_path = Path(
ctx.config['project_path'], resource_location
).with_suffix('.yml')
if current_property_path and Path(current_property_path) != property_path:
# Migrate the existing contents into the correct location
resource_type = resource_dict['resource_type']
resource_name = resource_dict['name']
_LOGGER.info(f"An existing definition was located for {resource_name} of type {resource_type}.")
_LOGGER.info(f"Found from Manifest path {current_property_path} but expected {property_path}. Migrating")
current_property_yaml = _utils.parse_yaml(current_property_path)
property_file_dict = _get_property_header(
resource_name, resource_type
)
# Gather the definition from within the existing yaml file (which could have many resources defined in it)
current_definition_index = next((index for (index, x) in enumerate(current_property_yaml[_SUPPORTED_RESOURCE_TYPES[resource_type]]) if x['name'] == resource_name), None)
# If we look in the file and we don't find the definition then we have a sync issue. Throw an exception.
if current_definition_index is None:
raise Exception('Manifest is out of sync with repository. Please re-run one of the documented pre requisite dbt functions ie dbt-run')
# Extract the current definition so we can move it
definition = current_property_yaml[_SUPPORTED_RESOURCE_TYPES[resource_type]][current_definition_index]
# Move the existing definition into the new file
property_file_dict[_SUPPORTED_RESOURCE_TYPES[resource_type]][0] = definition
# Write out the new file with the existing information
_utils.write_yaml(
property_path,
property_file_dict
)
# Remove the existing definition from where it was found
current_property_yaml[_SUPPORTED_RESOURCE_TYPES[resource_type]].pop(current_definition_index)
# Check if this was the last of that resource type in the file
if len(current_property_yaml[_SUPPORTED_RESOURCE_TYPES[resource_type]]) == 0:
# Remove the property type from the yaml if its now empty
current_property_yaml.pop(_SUPPORTED_RESOURCE_TYPES[resource_type])
# Check if the yaml file no longer contains resources
if not set(_SUPPORTED_RESOURCE_TYPES.values()).intersection(current_property_yaml.keys()):
Path(current_property_path).unlink()
else:
_utils.write_yaml(
current_property_path,
current_property_yaml
)

columns = _get_columns(ctx, resource_location, resource_dict, **kwargs)

property_file_dict = _structure_property_file_dict(
property_path,
resource_dict,
Expand Down Expand Up @@ -574,10 +646,14 @@ def _structure_property_file_dict(location, resource_dict, columns_list):
property_file_dict = _get_property_header(resource_name, resource_type)
# Get the sub-dictionaries of each existing column
resource_type_plural = _SUPPORTED_RESOURCE_TYPES[resource_type]
existing_columns_dict = {
item['name']: item
for item in property_file_dict[resource_type_plural][0]['columns']
}
print('testing if cols key exists')
if 'columns' in property_file_dict[resource_type_plural][0].keys():
existing_columns_dict = {
item['name']: item
for item in property_file_dict[resource_type_plural][0]['columns']
}
else:
existing_columns_dict = {}
# For each column we want in the property file,
# reuse the sub-dictionary if it exists
# or else create a new sub-dictionary
Expand Down