Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tdl 17961 poc on rule map implementation #142

Open
wants to merge 1 commit into
base: crest-work
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 79 additions & 10 deletions tap_shopify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from tap_shopify.exceptions import ShopifyError
from tap_shopify.streams.base import shopify_error_handling, get_request_timeout
import tap_shopify.streams # Load stream objects into Context
from tap_shopify.rule_map import RuleMap

REQUIRED_CONFIG_KEYS = ["shop", "api_key"]
LOGGER = singer.get_logger()
Expand Down Expand Up @@ -53,22 +54,60 @@ def load_schemas():
return schemas


def get_discovery_metadata(stream, schema):
def get_discovery_metadata(stream, schema, rule_map, stream_name):
mdata = metadata.new()
mdata = metadata.write(mdata, (), 'table-key-properties', stream.key_properties)
mdata = metadata.write(mdata, (), 'forced-replication-method', stream.replication_method)

if stream.replication_key:
mdata = metadata.write(mdata, (), 'valid-replication-keys', [stream.replication_key])

if 'stream_name' in rule_map:
# Write original-name of stream name in top level metadata
mdata = metadata.write(mdata, (), 'original-name', stream_name)

for field_name in schema['properties'].keys():
if field_name in stream.key_properties or field_name == stream.replication_key:
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic')
else:
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'available')

# Add metadata for nested(child) fields also if it's name is changed from original name.
add_child_into_metadata(schema['properties'][field_name], metadata, mdata,
rule_map, ('properties', field_name), )
if ('properties', field_name) in rule_map:
mdata.get(('properties', field_name)).update(
{'original-name': rule_map[('properties', field_name)]})

return metadata.to_list(mdata)

def add_child_into_metadata(schema, m_data, mdata, rule_map, parent=()):
"""
Add metadata for nested(child) fields also if it's name is changed from original name.
"""
if schema and isinstance(schema, dict) and schema.get('properties'):
for key in schema['properties'].keys():
# prepare key to find original-name of field in rule_map object
# Key is tuple of items found in breadcrumb.
breadcrumb = parent + ('properties', key)

# Iterate in recursive manner to go through each field of schema.
add_child_into_metadata(schema['properties'][key], m_data, mdata, rule_map, breadcrumb)

mdata = m_data.write(mdata, breadcrumb, 'inclusion', 'available')

if breadcrumb in rule_map:
# Add `original-name` field in metadata which contain actual name of field.
mdata.get(breadcrumb).update({'original-name': rule_map[breadcrumb]})

if schema.get('anyOf'):
for schema_fields in schema.get('anyOf'):
add_child_into_metadata(schema_fields, m_data, mdata, rule_map, parent)

if schema and isinstance(schema, dict) and schema.get('items'):
breadcrumb = parent + ('items',)
add_child_into_metadata(schema['items'], m_data, mdata, rule_map, breadcrumb)

def load_schema_references():
shared_schema_file = "definitions.json"
shared_schema_path = get_abs_path('schemas/')
Expand All @@ -84,7 +123,7 @@ def add_synthetic_key_to_schema(schema):
schema['properties']['_sdc_shop_' + k] = {'type': ["null", SDC_KEYS[k]]}
return schema

def discover():
def discover(rule_map):
initialize_shopify_client() # Checking token in discover mode

raw_schemas = load_schemas()
Expand All @@ -105,12 +144,32 @@ def discover():
catalog_schema = add_synthetic_key_to_schema(
singer.resolve_schema_references(schema, refs_copy))

# Define stream_name in GetStdFieldsFromApiFields
rule_map.GetStdFieldsFromApiFields[schema_name] = {}

# We face issue regarding ref. In some of the schema same ref is being used.
# When we change fields of one of the ref, changes reflect in all the places
# where the same ref is being used.
# Due to this, the `original-name` field name was missing in the metadata of the catalog.
# So, to prevent change in the actual schema, here we are creating a deep copy of schema
# and updating deep copy.
# We do not update the actual schema
schema_copy = copy.deepcopy(schema)

# Get updated schema by applying rule map
standard_catalog_schema = rule_map.apply_ruleset_on_schema(catalog_schema,
schema_copy, schema_name)

# Get standard name of schema
standard_schema_name = rule_map.apply_rule_set_on_stream_name(schema_name)

# create and add catalog entry
catalog_entry = {
'stream': schema_name,
'tap_stream_id': schema_name,
'schema': catalog_schema,
'metadata': get_discovery_metadata(stream, schema),
'stream': standard_schema_name,
'tap_stream_id': standard_schema_name,
'schema': standard_catalog_schema,
'metadata': get_discovery_metadata(stream, standard_catalog_schema,
rule_map.GetStdFieldsFromApiFields[schema_name], schema_name),
'key_properties': stream.key_properties,
'replication_key': stream.replication_key,
'replication_method': stream.replication_method
Expand All @@ -133,7 +192,7 @@ def shuffle_streams(stream_name):
Context.catalog["streams"] = top_half + bottom_half

# pylint: disable=too-many-locals
def sync():
def sync(rule_map):
shop_attributes = initialize_shopify_client()
sdc_fields = {"_sdc_shop_" + x: shop_attributes[x] for x in SDC_KEYS}

Expand All @@ -157,6 +216,10 @@ def sync():
stream_id = catalog_entry['tap_stream_id']
stream = Context.stream_objects[stream_id]()

# Fill rule_map object by original-name available in metadata
rule_map.fill_rule_map_object_by_catalog(stream_id,
metadata.to_map(catalog_entry['metadata']))

if not Context.is_selected(stream_id):
LOGGER.info('Skipping stream: %s', stream_id)
continue
Expand All @@ -172,6 +235,10 @@ def sync():
extraction_time = singer.utils.now()
record_schema = catalog_entry['schema']
record_metadata = metadata.to_map(catalog_entry['metadata'])

# Apply rule map on record
rec = rule_map.apply_ruleset_on_api_response(rec, stream_id)

rec = transformer.transform({**rec, **sdc_fields},
record_schema,
record_metadata)
Expand All @@ -197,19 +264,21 @@ def main():
Context.config = args.config
Context.state = args.state

rule_map = RuleMap()

# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover()
catalog = discover(rule_map)
print(json.dumps(catalog, indent=2))
# Otherwise run in sync mode
else:
Context.tap_start = utils.now()
if args.catalog:
Context.catalog = args.catalog.to_dict()
else:
Context.catalog = discover()
Context.catalog = discover(rule_map)

sync()
sync(rule_map)
except pyactiveresource.connection.ResourceNotFound as exc:
raise ShopifyError(exc, 'Ensure shop is entered correctly') from exc
except pyactiveresource.connection.UnauthorizedAccess as exc:
Expand Down
Loading