
Commit

Clean Airflow Lineage Backend and migrate status to millis (open-metadata#13666)

* Clean Airflow Lineage Backend and migrate status to millis

* Format

* chore(ui): update executions startTs and endTs to millis

* Remove lineage providers

---------

Co-authored-by: Sachin Chaurasiya <[email protected]>
pmbrull and Sachin-chaurasiya authored Oct 20, 2023
1 parent 2454da1 commit 8cf8720
Showing 8 changed files with 47 additions and 494 deletions.
13 changes: 13 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
@@ -261,3 +261,16 @@ ALTER TABLE entity_extension_time_series DROP COLUMN temp;

ALTER TABLE entity_extension_time_series MODIFY COLUMN entityFQNHash VARCHAR(768) COLLATE ascii_bin, MODIFY COLUMN jsonSchema VARCHAR(256) COLLATE ascii_bin, MODIFY COLUMN extension VARCHAR(256) COLLATE ascii_bin,
ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);

-- Airflow pipeline status set to millis
UPDATE entity_extension_time_series ts
JOIN pipeline_entity p
ON ts.entityFQNHash = p.fqnHash
SET ts.json = JSON_INSERT(
JSON_REMOVE(ts.json, '$.timestamp'),
'$.timestamp',
JSON_EXTRACT(ts.json, '$.timestamp') * 1000
)
WHERE ts.extension = 'pipeline.pipelineStatus'
AND JSON_EXTRACT(p.json, '$.serviceType') = 'Airflow'
;
13 changes: 13 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
@@ -276,3 +276,16 @@ ALTER TABLE entity_extension_time_series DROP COLUMN temp;

ALTER TABLE entity_extension_time_series ALTER COLUMN entityFQNHash TYPE VARCHAR(768), ALTER COLUMN jsonSchema TYPE VARCHAR(256) , ALTER COLUMN extension TYPE VARCHAR(256),
ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);

-- Airflow pipeline status set to millis
UPDATE entity_extension_time_series ts
SET json = jsonb_set(
ts.json,
'{timestamp}',
to_jsonb(cast(ts.json #> '{timestamp}' as int8) *1000)
)
FROM pipeline_entity p
WHERE ts.entityFQNHash = p.fqnHash
and ts.extension = 'pipeline.pipelineStatus'
AND p.json #>> '{serviceType}' = 'Airflow'
;
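Both migrations make the same change: Airflow pipeline status timestamps stored in entity_extension_time_series are converted from epoch seconds to epoch milliseconds. A minimal Python sketch of the per-row effect (the sample payload and values below are illustrative, not taken from the repository):

import json

# Illustrative pipelineStatus payload as stored before the migration (epoch seconds)
row_json = '{"executionStatus": "Successful", "timestamp": 1697760000}'

status = json.loads(row_json)
# Same effect as JSON_REMOVE + JSON_INSERT (MySQL) or jsonb_set (Postgres)
status["timestamp"] = status["timestamp"] * 1000  # seconds -> millis

print(json.dumps(status))
# {"executionStatus": "Successful", "timestamp": 1697760000000}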
19 changes: 6 additions & 13 deletions ingestion/src/airflow_provider_openmetadata/lineage/backend.py
@@ -35,20 +35,13 @@ class OpenMetadataLineageBackend(LineageBackend):
     """
     Sends lineage data from tasks to OpenMetadata.
-    Configurable via ``airflow_provider_openmetadata.cfg`` as follows: ::
+    Configurable via `airflow.cfg` as follows:
     [lineage]
-    backend = airflow_provider_openmetadata.lineage.OpenMetadataLineageBackend
-    airflow_service_name = airflow #make sure this service_name matches
-        the one configured in openMetadata
-    openmetadata_api_endpoint = http://localhost:8585
-    auth_provider_type = no-auth # use google here if you are
-        configuring google as SSO
-    secret_key = google-client-secret-key # it needs to be configured
-        only if you are using google as SSO the one configured in openMetadata
-    openmetadata_api_endpoint = http://localhost:8585
-    auth_provider_type = no-auth # use google here if you are configuring google as SSO
-    secret_key = google-client-secret-key # it needs to be configured
-        only if you are using google as SSO
+    backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
+    airflow_service_name = airflow
+    openmetadata_api_endpoint = http://localhost:8585/api
+    jwt_token = <token> # To auth to the OpenMetadata API
     """

     def send_lineage(
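With the provider registry removed, the lineage backend authenticates to OpenMetadata with a JWT token only. A minimal sketch of the connection object those [lineage] settings resolve to (placeholder values; the classes and fields are the ones used in the config loader changes below):

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    AuthProvider,
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

# Mirrors what parse_airflow_config() now builds: OpenMetadata JWT auth,
# no per-provider security config lookup.
metadata_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider=AuthProvider.openmetadata.value,
    securityConfig=OpenMetadataJWTClientConfig(jwtToken="<token>"),
    verifySSL="no-ssl",
)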
@@ -19,14 +19,13 @@
 from pydantic import BaseModel
 
 from airflow_provider_openmetadata.lineage.config.commons import LINEAGE
-from airflow_provider_openmetadata.lineage.config.providers import (
-    InvalidAirflowProviderException,
-    provider_config_registry,
-)
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     AuthProvider,
     OpenMetadataConnection,
 )
+from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
+    OpenMetadataJWTClientConfig,
+)
 
 
 class AirflowLineageConfig(BaseModel):
@@ -43,21 +42,6 @@ def parse_airflow_config(
     Get airflow config from airflow.cfg and parse it
     to the config model
     """
-    auth_provider_type = conf.get(
-        LINEAGE, "auth_provider_type", fallback=AuthProvider.no_auth.value
-    )
-
-    if auth_provider_type == AuthProvider.no_auth.value:
-        security_config = None
-    else:
-        load_security_config_fn = provider_config_registry.registry.get(
-            auth_provider_type
-        )
-        if not load_security_config_fn:
-            raise InvalidAirflowProviderException(
-                f"Cannot find {auth_provider_type} in airflow providers registry."
-            )
-        security_config = load_security_config_fn(conf)
 
     return AirflowLineageConfig(
         airflow_service_name=airflow_service_name,
@@ -73,8 +57,14 @@ def parse_airflow_config(
                 "openmetadata_api_endpoint",
                 fallback="http://localhost:8585/api",
             ),
-            authProvider=auth_provider_type,
-            securityConfig=security_config,
+            authProvider=AuthProvider.openmetadata.value,
+            securityConfig=OpenMetadataJWTClientConfig(
+                jwtToken=conf.get(
+                    LINEAGE,
+                    "jwt_token",
+                    fallback=None,
+                ),
+            ),
             verifySSL=conf.get(LINEAGE, "verify_ssl", fallback="no-ssl"),
         ),
     )
@@ -102,10 +92,5 @@ def get_lineage_config() -> AirflowLineageConfig:
             config = json.load(config_file)
             return AirflowLineageConfig.parse_obj(config)
 
-    # If nothing is configured, let's use a default for local
-    return AirflowLineageConfig(
-        airflow_service_name="airflow",
-        metadata_config=OpenMetadataConnection(
-            hostPort="http://localhost:8585/api",
-        ),
-    )
+    # If nothing is configured, raise
+    raise ValueError("Missing lineage backend configuration")
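Note the behaviour change at the end of the loader: with no [lineage] section and no JSON config file, get_lineage_config() now raises instead of silently falling back to a local default connection. A sketch of caller-side handling (the import path is an assumption for illustration; adjust it to wherever get_lineage_config lives):

# Assumed module path, shown for illustration only
from airflow_provider_openmetadata.lineage.config.loader import get_lineage_config

try:
    lineage_config = get_lineage_config()
except ValueError as err:
    # e.g. "Missing lineage backend configuration"
    print(f"Lineage backend not configured, skipping lineage: {err}")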

This file was deleted.


