diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 6852fe166f..5903720688 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -39,6 +39,7 @@ jobs: - { integration_test: "meltano-config", needs_postgres: false} - { integration_test: "meltano-annotations", needs_postgres: false} - { integration_test: "meltano-manifest", needs_postgres: false} + - { integration_test: "meltano-run-merge-states", needs_postgres: false} - { integration_test: "meltano-expand-envvars-in-array", needs_postgres: false} fail-fast: false diff --git a/docs/docs/reference/command-line-interface.md b/docs/docs/reference/command-line-interface.md index 95ce09d6db..1b25fbe53e 100644 --- a/docs/docs/reference/command-line-interface.md +++ b/docs/docs/reference/command-line-interface.md @@ -124,16 +124,16 @@ meltano add extractor this-will-be-ignored --from-ref tap-spotify--matatika.yml # The above also applies to the plugin variant, if provided meltano add extractor this-will-be-ignored --variant this-will-also-be-ignored --from-ref tap-spotify--matatika.yml -# Once added, the custom plugin defintion can be updated by removing the plugin +# Once added, the custom plugin definition can be updated by removing the plugin # and re-adding it with the same `meltano add --from-ref` command meltano remove extractor tap-spotify meltano add extractor tap-spotify --from-ref tap-spotify--matatika.yml ``` -Using `--from-ref` allows you to add a plugin before it is avilable on [Meltano Hub](https://hub.meltano.com/), such as during development or testing of a plugin. It can also be used to try out plugins that have their [definition](/concepts/project#custom-plugin-definitions) published an accessible at a public URL, external to the Hub. +Using `--from-ref` allows you to add a plugin before it is available on [Meltano Hub](https://hub.meltano.com/), such as during development or testing of a plugin. 
It can also be used to try out plugins that have their [definition](/concepts/project#custom-plugin-definitions) published and accessible at a public URL, external to the Hub. :::note - Meltano will throw an error if the referenced plugin definiton is invalid or missing any required properties - see the [Meltano Hub plugin definition syntax](/reference/plugin-definition-syntax) for more information. + Meltano will throw an error if the referenced plugin definition is invalid or missing any required properties - see the [Meltano Hub plugin definition syntax](/reference/plugin-definition-syntax) for more information. ::: By default, `meltano add` will attempt to install the plugin after adding it. Use `--no-install` to skip this behavior: @@ -454,6 +454,10 @@ meltano config set --from-file ./file.txt uuidgen | meltano config set --from-file - ``` +:::info +

When setting a config value for an object or array setting, the file contents must be valid JSON.

+::: + ## `docs` Open the Meltano documentation site in the default browser. @@ -495,6 +499,8 @@ meltano el [--state-id TEXT] - One or more `--select ` options can be passed to only extract records for matching [selected entities](#select). Similarly, `--exclude ` can be used to extract records for all selected entities _except_ for those specified. +- A `--merge-state` flag can be passed to merge state with that of previous runs. + Notes: - The entities that are currently selected for extraction can be discovered using [`meltano select --list `](#select). @@ -915,7 +921,7 @@ Run a set of command blocks in series. Command blocks are specified as a list of plugin names, e.g. `meltano run some_tap some_mapping some_target some_plugin:some_cmd` and are run in the order they are specified from left to right. A failure in any block will cause the entire run to abort. -Multiple commmand blocks can be chained together or repeated, and extractor/loader pairs will automatically be linked to perform EL work. +Multiple command blocks can be chained together or repeated, and extractor/loader pairs will automatically be linked to perform EL work. If you have an active environment defined, a State ID is autogenerated for each extractor/loader pair and used to store and look up the [incremental replication state](/guide/integration#incremental-replication-state) in the [system database](/guide/production#storing-metadata). This allows subsequent runs with the same extractor and loader combinations to start where the previous run ended. The format of the generated id's is `:-to-(: tap-gitlab target-postgres - `--full-refresh` will force a full refresh and ignore the prior state. The new state after completion will still be updated with the execution results, unless `--no-state-update` is also specified. - `--force` will force a job run even if a conflicting job with the same generated ID is in progress. 
- `--state-id-suffix` define a custom suffix to generate a state ID with for each EL pair. +- `--merge-state` will merge state with that of previous runs. See the [example in the Meltano repository](https://github.com/meltano/meltano/blob/main/integration/example-library/meltano-run-merge-states/index.md). Examples: @@ -967,6 +974,9 @@ meltano --environment=dev run --force tap-gitlab target-postgres tap-salesforce # run a pipeline with a custom state ID suffix # the autogenerated ID for the EL pair will be 'dev:tap-gitlab-to-target-postgres:pipeline-alias' meltano --environment=dev --state-id-suffix pipeline-alias run tap-gitlab hide-secrets target-postgres + +# run a pipeline, merging state with that of previous runs. +meltano --environment=dev run --merge-state tap-gitlab target-postgres ``` ### Using `run` with Environments @@ -1437,7 +1447,7 @@ meltano state copy #### Examples ```bash -# Use prod state to update dev environemnt +# Use prod state to update dev environment meltano state copy prod:tap-gitlab-to-target-jsonl dev:tap-gitlab-to-target-jsonl ``` diff --git a/integration/example-library/meltano-run-merge-states/.gitignore b/integration/example-library/meltano-run-merge-states/.gitignore new file mode 100644 index 0000000000..15e24c3f86 --- /dev/null +++ b/integration/example-library/meltano-run-merge-states/.gitignore @@ -0,0 +1,3 @@ +/venv +/.meltano +.env diff --git a/integration/example-library/meltano-run-merge-states/ending-meltano.yml b/integration/example-library/meltano-run-merge-states/ending-meltano.yml new file mode 100644 index 0000000000..5962a857f9 --- /dev/null +++ b/integration/example-library/meltano-run-merge-states/ending-meltano.yml @@ -0,0 +1,27 @@ +version: 1 +default_environment: dev +project_id: a3e6d53c-8ccc-4cac-a89c-08b70120f243 +environments: +- name: dev +- name: staging +- name: prod +send_anonymous_usage_stats: false +plugins: + extractors: + - name: tap-with-state + namespace: tap_with_state + variant: custom + 
executable: ./tap.py + capabilities: + - discover + - catalog + - state + settings: + - name: ts + kind: date_iso8601 + description: Dummy timestamp + select: ["*.*"] + loaders: + - name: target-jsonl + variant: andyh1203 + pip_url: target-jsonl diff --git a/integration/example-library/meltano-run-merge-states/index.md b/integration/example-library/meltano-run-merge-states/index.md new file mode 100644 index 0000000000..2049547cc7 --- /dev/null +++ b/integration/example-library/meltano-run-merge-states/index.md @@ -0,0 +1,89 @@ +# Get setup + +This example shows how state from sequential invocations of `meltano run` can be merged together to create a state object that combines bookmarks from streams synced in different runs. + +This is useful for when you want to backfill/refresh a single stream, without losing the state of other streams. + +```shell +meltano install +``` + +## Without state merging (default) + +### Extract all streams + +```shell +TAP_WITH_STATE_TS='2023-01-01T00:00:00Z' \ +meltano run tap-with-state target-jsonl --state-id-suffix=no-merge +``` + +### Extract a single stream + +Run a 'full refresh' pipeline of a single stream. + +```shell +TAP_WITH_STATE_TS='2023-01-01T01:00:00Z' \ +TAP_WITH_STATE__SELECT_FILTER='["stream_1"]' \ +meltano run tap-with-state target-jsonl --full-refresh --state-id-suffix=no-merge +``` + +Note that the state will only contain the bookmark for `stream_1`. + +```shell +meltano --environment=dev state get dev:tap-with-state-to-target-jsonl:no-merge +``` + +```json +{ + "singer_state": { + "bookmarks": { + "stream_1": { + "created_at": "2023-01-01T01:00:00Z" + } + } + } +} +``` + +## With state merging + +### Extract all streams + +```shell +TAP_WITH_STATE_TS='2023-01-01T00:00:00Z' \ +meltano run tap-with-state target-jsonl --state-id-suffix=merge +``` + +### Filter a single stream, merging states + +Run a 'full refresh' pipeline of a single stream, but merge the current pipeline's state with the latest stored state. 
+ +```shell +TAP_WITH_STATE_TS='2023-01-01T01:00:00Z' \ +TAP_WITH_STATE__SELECT_FILTER='["stream_1"]' \ +meltano run tap-with-state target-jsonl --full-refresh --state-id-suffix=merge --merge-state +``` + +Note that the state will now contain both the new bookmark for `stream_1` and the old bookmarks for the other streams. + +```shell +meltano --environment=dev state get dev:tap-with-state-to-target-jsonl:merge +``` + +```json +{ + "singer_state": { + "bookmarks": { + "stream_1": { + "created_at": "2023-01-01T01:00:00Z" + }, + "stream_2": { + "created_at": "2023-01-01T00:00:00Z" + }, + "stream_3": { + "created_at": "2023-01-01T00:00:00Z" + } + } + } +} +``` diff --git a/integration/example-library/meltano-run-merge-states/meltano.yml b/integration/example-library/meltano-run-merge-states/meltano.yml new file mode 100644 index 0000000000..5962a857f9 --- /dev/null +++ b/integration/example-library/meltano-run-merge-states/meltano.yml @@ -0,0 +1,27 @@ +version: 1 +default_environment: dev +project_id: a3e6d53c-8ccc-4cac-a89c-08b70120f243 +environments: +- name: dev +- name: staging +- name: prod +send_anonymous_usage_stats: false +plugins: + extractors: + - name: tap-with-state + namespace: tap_with_state + variant: custom + executable: ./tap.py + capabilities: + - discover + - catalog + - state + settings: + - name: ts + kind: date_iso8601 + description: Dummy timestamp + select: ["*.*"] + loaders: + - name: target-jsonl + variant: andyh1203 + pip_url: target-jsonl diff --git a/integration/example-library/meltano-run-merge-states/plugins/loaders/target-jsonl--andyh1203.lock b/integration/example-library/meltano-run-merge-states/plugins/loaders/target-jsonl--andyh1203.lock new file mode 100644 index 0000000000..5825fc4aac --- /dev/null +++ b/integration/example-library/meltano-run-merge-states/plugins/loaders/target-jsonl--andyh1203.lock @@ -0,0 +1,34 @@ +{ + "plugin_type": "loaders", + "name": "target-jsonl", + "namespace": "target_jsonl", + "variant": "andyh1203", + 
"label": "JSON Lines (JSONL)", + "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203", + "repo": "https://github.com/andyh1203/target-jsonl", + "pip_url": "target-jsonl", + "description": "JSONL loader", + "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png", + "settings": [ + { + "name": "destination_path", + "kind": "string", + "value": "output", + "label": "Destination Path", + "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n" + }, + { + "name": "do_timestamp_file", + "kind": "boolean", + "value": false, + "label": "Include Timestamp in File Names", + "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. 
`exchange_rate-{timestamp}.jsonl`).\n" + }, + { + "name": "custom_name", + "kind": "string", + "label": "Custom File Name Override", + "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n" + } + ] +} \ No newline at end of file diff --git a/integration/example-library/meltano-run-merge-states/tap.py b/integration/example-library/meltano-run-merge-states/tap.py new file mode 100755 index 0000000000..62825aea42 --- /dev/null +++ b/integration/example-library/meltano-run-merge-states/tap.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python + +"""Simple Singer tap.""" + +# flake8: noqa + +from __future__ import annotations + +import argparse +import datetime +import json +import sys + +CATALOG = { + "streams": [ + { + "tap_stream_id": f"stream_{i}", + "schema": { + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + }, + }, + "metadata": [ + { + "breadcrumb": [], + "metadata": { + "selected": True, + }, + }, + ], + } + for i in (1, 2, 3) + ], +} + + +def sync_stream(stream_id: str, timestamp: str): + """Sync a stream.""" + sys.stdout.write( + json.dumps( + { + "type": "SCHEMA", + "stream": stream_id, + "schema": { + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + }, + }, + "key_properties": ["id"], + }, + ) + + "\n", + ) + sys.stdout.write( + json.dumps( + { + "type": "RECORD", + "stream": stream_id, + "record": { + "id": 1, + "created_at": timestamp, + }, + }, + ) + + "\n", + ) + + +def is_selected(stream_id: str, catalog: dict): + """Check if a stream is selected.""" + for stream in catalog["streams"]: + if stream["tap_stream_id"] != stream_id: + continue + metadata = stream.get("metadata", []) + stream_metadata = next( 
+ filter(lambda m: m.get("breadcrumb") == [], metadata), + {}, + ) + print(f"stream_metadata: {stream_metadata}", file=sys.stderr) + return stream_metadata.get("metadata", {}).get("selected", False) + + return False + + +def sync(config: dict, catalog: dict, state: dict): + """Sync data.""" + print(f"catalog: {catalog}", file=sys.stderr) + timestamp = config.get( + "ts", + datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), + ) + state = {"bookmarks": {}} + + for i in (1, 2, 3): + if not is_selected(f"stream_{i}", catalog): + continue + sync_stream(f"stream_{i}", timestamp) + state["bookmarks"][f"stream_{i}"] = {"created_at": timestamp} + + sys.stdout.write(json.dumps({"type": "STATE", "value": state})) + + +def discover(): + """Discover catalog.""" + sys.stdout.write(json.dumps(CATALOG)) + + +def main(): + """Main entrypoint.""" + parser = argparse.ArgumentParser() + parser.add_argument("--config", type=argparse.FileType("r")) + parser.add_argument("--catalog", type=argparse.FileType("r")) + parser.add_argument("--state", type=argparse.FileType("r")) + parser.add_argument("--discover", action="store_true") + args = parser.parse_args() + + if args.discover: + discover() + return + + config = json.load(args.config) if args.config else {} + catalog = json.load(args.catalog) if args.catalog else {} + state = json.load(args.state) if args.state else {} + + sync(config, catalog, state) + + +if __name__ == "__main__": + main() diff --git a/integration/meltano-run-merge-states/validate.sh b/integration/meltano-run-merge-states/validate.sh new file mode 100644 index 0000000000..9edcd532dd --- /dev/null +++ b/integration/meltano-run-merge-states/validate.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +# shellcheck disable=SC1091 + +set -euo pipefail + +source "$(git rev-parse --show-toplevel)/integration/commons.sh" +cd "${TEST_DOCS_DIR}" + +meltano state get dev:tap-with-state-to-target-jsonl:no-merge | grep '{"singer_state": {"bookmarks": {"stream_1": {"created_at": 
"2023-01-01T01:00:00Z"}}}}' +meltano state get dev:tap-with-state-to-target-jsonl:merge | grep '{"singer_state": {"bookmarks": {"stream_1": {"created_at": "2023-01-01T01:00:00Z"}, "stream_2": {"created_at": "2023-01-01T00:00:00Z"}, "stream_3": {"created_at": "2023-01-01T00:00:00Z"}}}}' diff --git a/src/meltano/cli/cli.py b/src/meltano/cli/cli.py index 43602b0401..cad39094fe 100644 --- a/src/meltano/cli/cli.py +++ b/src/meltano/cli/cli.py @@ -98,7 +98,6 @@ def cli( # noqa: C901,WPS231 ProjectSettingsService.config_override["cli.log_config"] = log_config ctx.obj["explicit_no_environment"] = no_environment - no_color = get_no_color_flag() if no_color: ctx.color = False diff --git a/src/meltano/cli/elt.py b/src/meltano/cli/elt.py index 457c95bea1..2353d511c3 100644 --- a/src/meltano/cli/elt.py +++ b/src/meltano/cli/elt.py @@ -92,6 +92,11 @@ class ELOptions: ), is_flag=True, ) + merge_state = click.option( + "--merge-state", + is_flag=True, + help="Merges state with that of previous runs.", + ) @click.command( @@ -110,6 +115,7 @@ class ELOptions: @ELOptions.dump @ELOptions.state_id @ELOptions.force +@ELOptions.merge_state @click.pass_context @pass_project(migrate=True) @run_async @@ -127,6 +133,7 @@ async def el( # WPS408 dump: str, state_id: str, force: bool, + merge_state: bool, ): """ Run an EL pipeline to Extract and Load data. @@ -153,6 +160,7 @@ async def el( # WPS408 dump, state_id, force, + merge_state, ) @@ -173,6 +181,7 @@ async def el( # WPS408 @ELOptions.dump @ELOptions.state_id @ELOptions.force +@ELOptions.merge_state @click.pass_context @pass_project(migrate=True) @run_async @@ -191,6 +200,7 @@ async def elt( # WPS408 dump: str, state_id: str, force: bool, + merge_state: bool, ): """ Run an ELT pipeline to Extract, Load, and Transform data. 
@@ -218,6 +228,7 @@ async def elt( # WPS408 dump, state_id, force, + merge_state, ) @@ -236,6 +247,7 @@ async def _run_el_command( dump: str, state_id: str, force: bool, + merge_state: bool, ): if platform.system() == "Windows": raise CliError( @@ -276,6 +288,7 @@ async def _run_el_command( select_filter=select_filter, catalog=catalog, state=state, + merge_state=merge_state, ) if dump: @@ -303,6 +316,7 @@ def _elt_context_builder( select_filter=None, catalog=None, state=None, + merge_state=False, ): select_filter = select_filter or [] transform_name = None @@ -322,6 +336,7 @@ def _elt_context_builder( .with_select_filter(select_filter) .with_catalog(catalog) .with_state(state) + .with_merge_state(merge_state) ) diff --git a/src/meltano/cli/run.py b/src/meltano/cli/run.py index 99aff08389..38744ed76e 100644 --- a/src/meltano/cli/run.py +++ b/src/meltano/cli/run.py @@ -65,6 +65,11 @@ "--state-id-suffix", help="Define a custom suffix to autogenerate state IDs with.", ) +@click.option( + "--merge-state", + is_flag=True, + help="Merges state with that of previous runs.", +) @click.argument( "blocks", nargs=-1, @@ -80,6 +85,7 @@ async def run( no_state_update: bool, force: bool, state_id_suffix: str, + merge_state: bool, blocks: list[str], ): """ @@ -116,10 +122,11 @@ async def run( logger, project, blocks, - full_refresh, - no_state_update, - force, - state_id_suffix, + full_refresh=full_refresh, + no_state_update=no_state_update, + force=force, + state_id_suffix=state_id_suffix, + merge_state=merge_state, ) parsed_blocks = list(parser.find_blocks(0)) if not parsed_blocks: diff --git a/src/meltano/core/block/extract_load.py b/src/meltano/core/block/extract_load.py index 24984b610c..972d4c6c88 100644 --- a/src/meltano/core/block/extract_load.py +++ b/src/meltano/core/block/extract_load.py @@ -50,6 +50,7 @@ def __init__( update_state: bool | None = True, state_id_suffix: str | None = None, base_output_logger: OutputLogger | None = None, + merge_state: bool | None = False, 
): """Use an ELBContext to pass information on to ExtractLoadBlocks. @@ -62,6 +63,7 @@ def __init__( update_state: Whether to update the state of the job. state_id_suffix: The state ID suffix to use. base_output_logger: The base logger to use. + merge_state: Whether to merge state at the end of run. """ self.project = project self.session = session @@ -70,6 +72,7 @@ def __init__( self.force = force self.update_state = update_state self.state_id_suffix = state_id_suffix + self.merge_state = merge_state # not yet used but required to satisfy the interface self.dry_run = False @@ -113,6 +116,7 @@ def __init__(self, project: Project): self._state_id_suffix = None self._env = {} self._blocks = [] + self._merge_state = False self._base_output_logger = None @@ -128,6 +132,19 @@ def with_job(self, job: Job): self._job = job return self + def with_merge_state(self, merge_state: bool): + """Set whether the state is to be merged or overwritten. + + Args: + merge_state : merge the state for the context + + Returns: + self + + """ + self._merge_state = merge_state + return self + def with_full_refresh(self, full_refresh: bool): """Set whether this is a full refresh. @@ -274,6 +291,7 @@ def context(self) -> ELBContext: update_state=self._state_update, state_id_suffix=self._state_id_suffix, base_output_logger=self._base_output_logger, + merge_state=self._merge_state, ) diff --git a/src/meltano/core/block/parser.py b/src/meltano/core/block/parser.py index 491f4af413..b1d3361650 100644 --- a/src/meltano/core/block/parser.py +++ b/src/meltano/core/block/parser.py @@ -70,6 +70,7 @@ def __init__( no_state_update: bool | None = False, force: bool | None = False, state_id_suffix: str | None = None, + merge_state: bool | None = False, ): """ Parse a meltano run command invocation into a list of blocks. @@ -84,6 +85,7 @@ def __init__( force: Whether to force a run if a job is already running (applies to all found sets). state_id_suffix: State ID suffix to use. 
+ merge_state: Whether to merge state at end of run. Raises: ClickException: If a block name is not found. @@ -98,6 +100,7 @@ def __init__( self._plugins: list[ProjectPlugin] = [] self._commands: dict[int, str] = {} self._mappings_ref: dict[int, str] = {} + self._merge_state = merge_state task_sets_service: TaskSetsService = TaskSetsService(project) @@ -237,6 +240,7 @@ def _find_next_elb_set( # noqa: WPS231, WPS213 .with_full_refresh(self._full_refresh) .with_no_state_update(self._no_state_update) .with_state_id_suffix(self._state_id_suffix) + .with_merge_state(self._merge_state) ) if self._plugins[offset].type != PluginType.EXTRACTORS: diff --git a/src/meltano/core/elt_context.py b/src/meltano/core/elt_context.py index d1afa2dc00..d515294494 100644 --- a/src/meltano/core/elt_context.py +++ b/src/meltano/core/elt_context.py @@ -95,6 +95,7 @@ def __init__( catalog: str | None = None, state: str | None = None, base_output_logger: OutputLogger | None = None, + merge_state: bool | None = False, ): """Initialise ELT Context instance. @@ -113,6 +114,7 @@ def __init__( catalog: Catalog to pass to extractor. state: State to pass to extractor. base_output_logger: OutputLogger to use. + merge_state: Flag. Merges State at the end of run """ self.project = project self.job = job @@ -131,6 +133,7 @@ def __init__( self.state = state self.base_output_logger = base_output_logger + self.merge_state = merge_state @property def elt_run_dir(self): @@ -217,6 +220,7 @@ def __init__(self, project: Project): self._catalog = None self._state = None self._base_output_logger = None + self._merge_state = False def with_session(self, session: Session) -> ELTContextBuilder: """Include session when building context. @@ -332,6 +336,18 @@ def with_full_refresh(self, full_refresh: bool) -> ELTContextBuilder: self._full_refresh = full_refresh return self + def with_merge_state(self, merge_state: bool): + """Set whether the state is to be merged or overwritten. 
+ + Args: + merge_state: Merges the state at the end of run. + + Returns: + Updated ELTContextBuilder instance. + """ + self._merge_state = merge_state + return self + def with_select_filter(self, select_filter: list[str]) -> ELTContextBuilder: """Include select filters when building context. @@ -486,4 +502,5 @@ def context(self) -> ELTContext: catalog=self._catalog, state=self._state, base_output_logger=self._base_output_logger, + merge_state=self._merge_state, ) diff --git a/src/meltano/core/logging/utils.py b/src/meltano/core/logging/utils.py index ab970ae4d4..987228af0d 100644 --- a/src/meltano/core/logging/utils.py +++ b/src/meltano/core/logging/utils.py @@ -183,15 +183,15 @@ def change_console_log_level(log_level: int = logging.DEBUG) -> None: class SubprocessOutputWriter(t.Protocol): """A basic interface suitable for use with `capture_subprocess_output`.""" - def writelines(self, lines: str): - """Write the provided lines to an output. + def writeline(self, line: str): + """Write the provided line to an output. Args: - lines: String to write + line: String to write """ -async def _write_line_writer(writer, line): +async def _write_line_writer(writer: SubprocessOutputWriter, line: bytes): # StreamWriters like a subprocess's stdin need special consideration if isinstance(writer, asyncio.StreamWriter): try: @@ -224,7 +224,7 @@ async def capture_subprocess_output( subprocess. line_writers: A `StreamWriter`, or object has a compatible writelines method. """ - while not reader.at_eof(): + while reader and not reader.at_eof(): line = await reader.readline() if not line: continue diff --git a/src/meltano/core/plugin/singer/target.py b/src/meltano/core/plugin/singer/target.py index cf6115a1ff..eacd8b4a58 100644 --- a/src/meltano/core/plugin/singer/target.py +++ b/src/meltano/core/plugin/singer/target.py @@ -3,6 +3,7 @@ import json import logging +import typing as t from datetime import datetime from meltano.core.behavior.hookable import hook @@ -13,6 +14,9 @@ from . 
import PluginType, SingerPlugin +if t.TYPE_CHECKING: + from sqlalchemy.orm import Session + logger = logging.getLogger(__name__) @@ -25,7 +29,7 @@ class BookmarkWriter: def __init__( self, job: Job, - session: object, + session: Session, payload_flag: int = Payload.STATE, state_service: StateService | None = None, ): @@ -161,7 +165,9 @@ def setup_bookmark_writer(self, plugin_invoker: PluginInvoker): if not elt_context or not elt_context.job or not elt_context.session: return - incomplete_state = elt_context.full_refresh and elt_context.select_filter + incomplete_state = ( + elt_context.full_refresh and elt_context.select_filter + ) or elt_context.merge_state payload_flag = Payload.INCOMPLETE_STATE if incomplete_state else Payload.STATE plugin_invoker.add_output_handler( diff --git a/tests/meltano/core/runner/test_runner.py b/tests/meltano/core/runner/test_runner.py index 67f39d71c2..458c4337bf 100644 --- a/tests/meltano/core/runner/test_runner.py +++ b/tests/meltano/core/runner/test_runner.py @@ -157,12 +157,16 @@ async def test_invoke( @pytest.mark.asyncio() @pytest.mark.parametrize( - ("full_refresh", "select_filter", "payload_flag"), + ("full_refresh", "merge_state", "select_filter", "payload_flag"), ( - (False, [], Payload.STATE), - (True, [], Payload.STATE), - (False, ["entity"], Payload.STATE), - (True, ["entity"], Payload.INCOMPLETE_STATE), + (False, False, [], Payload.STATE), + (True, False, [], Payload.STATE), + (False, False, ["entity"], Payload.STATE), + (True, False, ["entity"], Payload.INCOMPLETE_STATE), + (False, True, [], Payload.INCOMPLETE_STATE), + (True, True, [], Payload.INCOMPLETE_STATE), + (False, True, ["entity"], Payload.INCOMPLETE_STATE), + (True, True, ["entity"], Payload.INCOMPLETE_STATE), ), ) async def test_bookmark( @@ -177,6 +181,7 @@ async def test_bookmark( select_filter, payload_flag, elt_context, + merge_state, ): lines = (b'{"line": 1}\n', b'{"line": 2}\n', b'{"line": 3}\n') @@ -188,6 +193,7 @@ async def test_bookmark( 
subject.context.full_refresh = full_refresh subject.context.select_filter = select_filter + subject.context.merge_state = merge_state target_invoker = plugin_invoker_factory( target,