Skip to content

Commit

Permalink
Merge pull request #26 from quantile-development/refactor
Browse files Browse the repository at this point in the history
Multiple refactors
  • Loading branch information
JulesHuisman authored Feb 18, 2023
2 parents 3d44eaf + c3f75b3 commit 0517c3c
Show file tree
Hide file tree
Showing 20 changed files with 743 additions and 773 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ ops:
TAP_SMOKE_TEST_STREAMS: '[{"stream_name": "new-stream", "input_filename": "demo.json"}]'
```
An example of running an arbitrary Meltano command.
```python
from dagster import repository, job
from dagster_meltano import meltano_resource, meltano_command_op

@job(resource_defs={"meltano": meltano_resource})
def meltano_command_job():
meltano_command_op("install loader tap-smoke-test")()

@repository()
def repository():
return [meltano_command_job]
```
## Development using VSCode
1. Open this repository in Visual Studio Code.
Expand Down
2 changes: 1 addition & 1 deletion dagster_meltano/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
load_jobs_from_meltano_project,
)
from dagster_meltano.meltano_resource import MeltanoResource, meltano_resource
from dagster_meltano.ops import meltano_install_op, meltano_run_op
from dagster_meltano.ops import meltano_command_op, meltano_install_op, meltano_run_op
40 changes: 0 additions & 40 deletions dagster_meltano/assets.py

This file was deleted.

2 changes: 2 additions & 0 deletions dagster_meltano/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class MeltanoCommandError(Exception):
pass
44 changes: 2 additions & 42 deletions dagster_meltano/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,14 @@
op,
)

from dagster_meltano.meltano_invoker import MeltanoInvoker
from dagster_meltano.ops import meltano_run_op as meltano_run_op_factory
from dagster_meltano.utils import generate_dagster_name


class Job:
def __init__(
self,
meltano_job: dict,
meltano_invoker: MeltanoInvoker,
) -> None:
def __init__(self, meltano_job: dict) -> None:
self.name = meltano_job["job_name"]
self.tasks = meltano_job["tasks"]
self.meltano_invoker = meltano_invoker

@property
def dagster_name(self) -> str:
Expand All @@ -54,44 +48,10 @@ def dagster_job():
meltano_run_op = meltano_run_op_factory(task)
current_task_contains_tap = self.task_contains_tap(task)

# # If the task does not contain a tap, we generate a new layer
# if not self.task_contains_tap(task):
# op_layers.append([])

if not current_task_contains_tap or not previous_task_contains_tap:
op_layers.append([])

# if not current_task_contains_tap and previous_task_contains_tap:

# op_layers.append([])

# # When we are in the first layer
# if len(op_layers) == 1:
# op_layers[-1].append(meltano_run_op())
# continue

op_layers[-1].append(meltano_run_op(op_layers[-2]))
op_layers[-1].append(meltano_run_op(after=op_layers[-2]))
previous_task_contains_tap = current_task_contains_tap

# logging.warning(op_layers)

# for op_layer_index, op_layer in enumerate(op_layers)[1:]:
# previous_layer = op_layers[op_layer_index - 1]

# for
# [meltano_run_op(previous_layer) for meltano_run_op in op_layer]

# meltano_run_op()

# meltano_run = meltano_run_op(command=task)

# if previous_done:
# previous_done = meltano_run(previous_done)
# else:
# previous_done = meltano_run()

# if self.task_contains_tap(task):
# previous_done.append(previous_done)
# continue

return dagster_job
10 changes: 0 additions & 10 deletions dagster_meltano/log_processing.py

This file was deleted.

50 changes: 0 additions & 50 deletions dagster_meltano/log_processing/__init__.py

This file was deleted.

22 changes: 0 additions & 22 deletions dagster_meltano/log_processing/json_processor.py

This file was deleted.

80 changes: 0 additions & 80 deletions dagster_meltano/log_processing/metadata_processor.py

This file was deleted.

15 changes: 0 additions & 15 deletions dagster_meltano/log_processing/passthrough_processor.py

This file was deleted.

2 changes: 1 addition & 1 deletion dagster_meltano/logging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: json
formatter: default
stream: "ext://sys.stderr"

root:
Expand Down
Loading

0 comments on commit 0517c3c

Please sign in to comment.