Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jecos committed Mar 8, 2024
1 parent ccf1ab8 commit 8da149d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
6 changes: 5 additions & 1 deletion dags/etl_normalize_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
'--batch', batch,
'--study-id', study_id,
'--study-code', '{{ params.study_code }}'
)
) \
.add_spark_conf({
'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.2,io.delta:delta-core_2.12:2.3.0,io.projectglow:glow-spark3_2.12:1.2.1',
'spark.kubernetes.container.image': 'apache/spark:3.3.3'
})
def normalize_variant_operator(name):
return normalized_etl.prepend_args(name).operator(
task_id=f'normalize-{name}',
Expand Down
11 changes: 8 additions & 3 deletions dags/lib/operators/base_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ class BaseConfig:
image: Optional[str] = None
arguments: List[str] = field(default_factory=list)

def args(self, *new_args):
def args(self, *new_args) -> Self:
c = copy.copy(self)
c.arguments = [*self.arguments, *new_args]
return c

def prepend_args(self, *new_args):
def prepend_args(self, *new_args) -> Self:
c = copy.copy(self)
c.arguments = [*new_args, *self.arguments]
return c
Expand All @@ -76,4 +76,9 @@ def build_operator(self, class_to_instantiate: Type[BaseKubernetesOperator], **k
namespace = self.kube_config.namespace,
service_account_name = self.kube_config.service_account_name,
**params
)
)

def with_image(self, new_image) -> Self:
c = copy.copy(self)
c.image = new_image
return c

0 comments on commit 8da149d

Please sign in to comment.