Skip to content

Commit

Permalink
Merge pull request #2 from smart-on-fhir/mikix/contained-schema
Browse files Browse the repository at this point in the history
fix: don't ignore schemas of contained resources
  • Loading branch information
mikix authored Apr 29, 2024
2 parents 584d930 + a027410 commit 51fd987
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cumulus_fhir_support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""FHIR support code for the Cumulus project"""

__version__ = "1.0.0"
__version__ = "1.1.0"

from .schemas import pyarrow_schema_from_rows
59 changes: 55 additions & 4 deletions cumulus_fhir_support/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ def pyarrow_schema_from_rows(resource_type: str, rows: Iterable[dict] = None) ->
:param rows: optionally a set of JSON FHIR resources to ensure are covered by the schema
:returns: a PyArrow schema that covers the unified shape of all provided rows
"""
rows = list(rows or [])

# Examine batch to see the full shape of it, in order to detect any deeply nested fields
# that we want to make sure to include in the final schema (normally, we go wide but only as
# deep as we need to)
batch_shape = _get_shape_of_dicts(None, rows and list(rows))
batch_shape = _get_shape_of_dicts(None, rows)

return _create_pyarrow_schema_for_resource(resource_type, batch_shape)
schema = _create_pyarrow_schema_for_resource(resource_type, batch_shape)
schema = _include_contained_schemas(schema, rows, batch_shape)
return schema


def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict:
Expand Down Expand Up @@ -79,7 +83,53 @@ def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict:
return total_shape


def _create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) -> pyarrow.Schema:
def _include_contained_schemas(
schema: pyarrow.Schema, rows: list[dict], batch_shape: dict
) -> pyarrow.Schema:
"""
This will include all contained resource schemas into one big contained schema.
Specifically, any field found in the shape of the "contained" field will be included,
as long as any resource in the contained list (detected via "resourceType") has the field.
Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion
of whether it is wise to just comingle the schemas like this.
"""
# Grab all contained resource types that we have in the source data,
# which will inform the expected schema inside there.
contained_types = sorted(
filter(
None,
{
contained_obj.get("resourceType")
for row in rows
for contained_obj in row.get("contained", [])
},
)
)
if not contained_types:
return schema # no need to do anything
contained_shape = batch_shape.get("contained")

# Allow any found fields in any of the contained types
fields = {}
for contained_type in contained_types:
subschema = _create_pyarrow_schema_for_resource(contained_type, contained_shape, wide=False)
for name in subschema.names:
fields[name] = subschema.field(name) # will overwrite previous field of same name
fields = [fields[name] for name in sorted(fields)] # sort for a consistent order

contained_index = schema.get_field_index("contained")
schema = schema.remove(contained_index)
return schema.insert(
contained_index,
pyarrow.field("contained", pyarrow.list_(pyarrow.struct(fields))),
)


def _create_pyarrow_schema_for_resource(
resource_type: str, batch_shape: dict, wide: bool = True
) -> pyarrow.Schema:
"""
Creates a PyArrow schema based off the named resource (like 'Observation').
Expand All @@ -93,8 +143,9 @@ def _create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) -
# fhirclient doesn't include `resourceType` in the list of properties. So do that manually.
type_field = pyarrow.field("resourceType", pyarrow.string())

level = 0 if wide else 2
return pyarrow.schema(
[type_field, *_fhir_obj_to_pyarrow_fields(instance, batch_shape, level=0)]
[type_field, *_fhir_obj_to_pyarrow_fields(instance, batch_shape, level=level)]
)


Expand Down
38 changes: 38 additions & 0 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,41 @@ def test_non_spec_field_are_ignored(self):
schema = support.pyarrow_schema_from_rows("Observation", rows)

self.assertNotIn("invalid_field", schema.names)

def test_contained_resources(self):
"""Verify that we include contained schemas"""
# Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion
# of whether it is wise to just comingle the schema like this.
rows = [
{
"contained": [
{"resourceType": "Medication", "code": {"text": "aspirin"}},
{"resourceType": "Patient", "gender": "unknown", "extraField": False},
{"unknownField": True, "gender": 3},
]
}
]
schema = support.pyarrow_schema_from_rows("MedicationRequest", rows)
contained_type = schema.field("contained").type.value_type

# Not-mentioned fields aren't present, but mentioned ones are

# Medication
self.assertEqual(pyarrow.string(), contained_type.field("code").type.field("text").type)
self.assertEqual(-1, contained_type.get_field_index("status"))

# Patient
self.assertEqual(pyarrow.string(), contained_type.field("gender").type)
self.assertEqual(-1, contained_type.get_field_index("birthDate"))

# Extra fields
self.assertEqual(-1, contained_type.get_field_index("extraField"))
self.assertEqual(-1, contained_type.get_field_index("unknownField"))

def test_contained_resources_empty(self):
"""Verify that we leave basic schema in there if no contained resources"""
schema = support.pyarrow_schema_from_rows("AllergyIntolerance")
contained_type = schema.field("contained").type.value_type
self.assertEqual(pyarrow.string(), contained_type.field("id").type)
self.assertEqual(pyarrow.string(), contained_type.field("implicitRules").type)
self.assertEqual(pyarrow.string(), contained_type.field("language").type)

0 comments on commit 51fd987

Please sign in to comment.