Skip to content

Commit

Permalink
feat(ingestion): Adds support for memory profiling (datahub-project#8856
Browse files Browse the repository at this point in the history
)

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
pedro93 and hsheth2 authored Oct 12, 2023
1 parent 8813ae2 commit f6e1312
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 68 deletions.
1 change: 1 addition & 0 deletions docs-website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ module.exports = {
"metadata-ingestion/docs/dev_guides/classification",
"metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",
"metadata-ingestion/docs/dev_guides/sql_profiles",
"metadata-ingestion/docs/dev_guides/profiling_ingestions",
],
},
],
Expand Down
55 changes: 55 additions & 0 deletions metadata-ingestion/docs/dev_guides/profiling_ingestions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import FeatureAvailability from '@site/src/components/FeatureAvailability';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

# Profiling ingestions

<FeatureAvailability/>

**🤝 Version compatibility**
> Open Source DataHub: **0.11.1** | Acryl: **0.2.12**

This page documents how to perform memory profiles of ingestion runs.
It is useful when trying to size the amount of resources necessary to ingest some source or when developing new features or sources.

## How to use
Install the `debug` plugin for DataHub's CLI wherever the ingestion runs:

```bash
pip install 'acryl-datahub[debug]'
```

This will install [memray](https://github.com/bloomberg/memray) in your python environment.

Add a flag to your ingestion recipe to generate a memray memory dump of your ingestion:
```yaml
source:
...

sink:
...

flags:
  generate_memory_profiles: "<path to folder where dumps will be written to>"
```
Once the ingestion run starts, a binary file will be created and appended to during the execution of the ingestion.
These files follow the pattern `file-<ingestion-run-urn>.bin` for unique identification.
Once the ingestion has finished, you can use `memray` to analyze the memory dump in a flamegraph view using:

```bash
memray flamegraph file-None-file-2023_09_18-21_38_43.bin
```

This will generate an interactive HTML file for analysis:

<p align="center">
<img width="70%" src="https://github.com/datahub-project/static-assets/blob/main/imgs/metadata-ingestion/memray-example.png?raw=true"/>
</p>


`memray` has an extensive set of features for memory investigation. Take a look at their [documentation](https://bloomberg.github.io/memray/overview.html) to see the full feature set.


## Questions

If you've got any questions on configuring profiling, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@
deepdiff_dep = "deepdiff"
test_api_requirements = {pytest_dep, deepdiff_dep, "PyYAML"}

debug_requirements = {
"memray"
}

base_dev_requirements = {
*base_requirements,
*framework_common,
Expand Down Expand Up @@ -723,5 +727,6 @@
"dev": list(dev_requirements),
"testing-utils": list(test_api_requirements), # To import `datahub.testing`
"integration-tests": list(full_test_dev_requirements),
"debug": list(debug_requirements),
},
)
148 changes: 80 additions & 68 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,77 +353,89 @@ def _time_to_print(self) -> bool:
return False

def run(self) -> None:
self.final_status = "unknown"
self._notify_reporters_on_ingestion_start()
callback = None
try:
callback = (
LoggingCallback()
if not self.config.failure_log.enabled
else DeadLetterQueueCallback(
self.ctx, self.config.failure_log.log_config
)
)
for wu in itertools.islice(
self.source.get_workunits(),
self.preview_workunits if self.preview_mode else None,
):
try:
if self._time_to_print():
self.pretty_print_summary(currently_running=True)
except Exception as e:
logger.warning(f"Failed to print summary {e}")

if not self.dry_run:
self.sink.handle_work_unit_start(wu)
try:
record_envelopes = self.extractor.get_records(wu)
for record_envelope in self.transform(record_envelopes):
if not self.dry_run:
self.sink.write_record_async(record_envelope, callback)

except RuntimeError:
raise
except SystemExit:
raise
except Exception as e:
logger.error(
"Failed to process some records. Continuing.", exc_info=e
with contextlib.ExitStack() as stack:
if self.config.flags.generate_memory_profiles:
import memray

stack.enter_context(
memray.Tracker(
f"{self.config.flags.generate_memory_profiles}/{self.config.run_id}.bin"
)
# TODO: Transformer errors should cause the pipeline to fail.

self.extractor.close()
if not self.dry_run:
self.sink.handle_work_unit_end(wu)
self.source.close()
# no more data is coming, we need to let the transformers produce any additional records if they are holding on to state
for record_envelope in self.transform(
[
RecordEnvelope(
record=EndOfStream(), metadata={"workunit_id": "end-of-stream"}
)

self.final_status = "unknown"
self._notify_reporters_on_ingestion_start()
callback = None
try:
callback = (
LoggingCallback()
if not self.config.failure_log.enabled
else DeadLetterQueueCallback(
self.ctx, self.config.failure_log.log_config
)
]
):
if not self.dry_run and not isinstance(
record_envelope.record, EndOfStream
)
for wu in itertools.islice(
self.source.get_workunits(),
self.preview_workunits if self.preview_mode else None,
):
try:
if self._time_to_print():
self.pretty_print_summary(currently_running=True)
except Exception as e:
logger.warning(f"Failed to print summary {e}")

if not self.dry_run:
self.sink.handle_work_unit_start(wu)
try:
record_envelopes = self.extractor.get_records(wu)
for record_envelope in self.transform(record_envelopes):
if not self.dry_run:
self.sink.write_record_async(record_envelope, callback)

except RuntimeError:
raise
except SystemExit:
raise
except Exception as e:
logger.error(
"Failed to process some records. Continuing.",
exc_info=e,
)
# TODO: Transformer errors should cause the pipeline to fail.

self.extractor.close()
if not self.dry_run:
self.sink.handle_work_unit_end(wu)
self.source.close()
# no more data is coming, we need to let the transformers produce any additional records if they are holding on to state
for record_envelope in self.transform(
[
RecordEnvelope(
record=EndOfStream(),
metadata={"workunit_id": "end-of-stream"},
)
]
):
# TODO: propagate EndOfStream and other control events to sinks, to allow them to flush etc.
self.sink.write_record_async(record_envelope, callback)

self.sink.close()
self.process_commits()
self.final_status = "completed"
except (SystemExit, RuntimeError, KeyboardInterrupt) as e:
self.final_status = "cancelled"
logger.error("Caught error", exc_info=e)
raise
finally:
clear_global_warnings()

if callback and hasattr(callback, "close"):
callback.close() # type: ignore

self._notify_reporters_on_ingestion_completion()
if not self.dry_run and not isinstance(
record_envelope.record, EndOfStream
):
# TODO: propagate EndOfStream and other control events to sinks, to allow them to flush etc.
self.sink.write_record_async(record_envelope, callback)

self.sink.close()
self.process_commits()
self.final_status = "completed"
except (SystemExit, RuntimeError, KeyboardInterrupt) as e:
self.final_status = "cancelled"
logger.error("Caught error", exc_info=e)
raise
finally:
clear_global_warnings()

if callback and hasattr(callback, "close"):
callback.close() # type: ignore

self._notify_reporters_on_ingestion_completion()

def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ class FlagsConfig(ConfigModel):
),
)

generate_memory_profiles: Optional[str] = Field(
default=None,
description=(
"Generate memray memory dumps for ingestion process by providing a path to write the dump file in."
),
)


class PipelineConfig(ConfigModel):
# Once support for discriminated unions gets merged into Pydantic, we can
Expand Down

0 comments on commit f6e1312

Please sign in to comment.