forked from hsheth2/sample-dbt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process_generated.py
103 lines (68 loc) · 3.33 KB
/
process_generated.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import json
def process_catalog(catalog):
    """Pin the run-specific metadata fields of a parsed catalog.

    ``catalog["metadata"]["generated_at"]`` and
    ``catalog["metadata"]["invocation_id"]`` are overwritten with fixed
    values. The mapping is modified in place and the same object is
    returned.

    Raises:
        KeyError: if ``catalog`` has no ``"metadata"`` entry.
    """
    pinned_fields = (
        ("generated_at", "2021-06-19T21:38:36.384613Z"),
        ("invocation_id", "just-some-random-id-2"),
    )
    for field_name, fixed_value in pinned_fields:
        catalog["metadata"][field_name] = fixed_value
    return catalog
def process_manifest(manifest):
    """Pin the run- and machine-specific fields of a parsed manifest.

    For every entry under the ``"nodes"``, ``"docs"``, ``"macros"`` and
    ``"sources"`` sections, an existing ``"root_path"`` or ``"created_at"``
    key is overwritten with a fixed value; entries lacking those keys are
    left untouched. Afterwards ``metadata.generated_at`` and
    ``metadata.invocation_id`` are overwritten as well.

    The manifest is modified in place and the same object is returned.

    Raises:
        KeyError: if one of the four sections or ``"metadata"`` is missing.
    """
    fixed_root_path = "/some-path/sample-dbt"
    fixed_created_at = 1663278957.5715818

    for section_name in ("nodes", "docs", "macros", "sources"):
        # Entries are mutated in place, so there is no need to store them
        # back under their keys.
        for entry in manifest[section_name].values():
            if "root_path" in entry:
                entry["root_path"] = fixed_root_path
            if "created_at" in entry:
                entry["created_at"] = fixed_created_at

    metadata = manifest["metadata"]
    metadata["generated_at"] = "2021-06-18T21:38:36.384613Z"
    metadata["invocation_id"] = "just-some-random-id"
    return manifest
def process_sources(sources):
    """Pin the run-specific fields of a parsed ``sources.json`` document.

    Overwrites, with fixed values:

    * top-level ``elapsed_time``;
    * ``metadata.generated_at`` and ``metadata.invocation_id``;
    * on every result: ``max_loaded_at_time_ago_in_s``, ``snapshotted_at``
      and ``execution_time`` (added if absent);
    * on every entry of a result's ``timing`` list: ``completed_at`` and
      ``started_at``.

    The document and its results are modified in place; ``"results"`` is
    rebound to a new list holding the same result objects. The same
    ``sources`` object is returned.

    Raises:
        KeyError: if ``"metadata"`` or ``"results"`` is missing, or a result
            has no ``"timing"`` entry.
    """
    sources["elapsed_time"] = 3.1415

    metadata = sources["metadata"]
    metadata["generated_at"] = "2021-06-18T21:38:36.384613Z"
    metadata["invocation_id"] = "just-some-random-id"

    pinned_results = []
    for entry in sources["results"]:
        entry["max_loaded_at_time_ago_in_s"] = 42276862.910052
        entry["snapshotted_at"] = "2021-06-18T17:08:55.925443+00:00"
        entry["execution_time"] = 0.023441791534423828
        for step in entry["timing"]:
            step["completed_at"] = "2022-09-16T19:06:38.239639Z"
            step["started_at"] = "2022-09-16T19:06:38.239635Z"
        pinned_results.append(entry)

    sources["results"] = pinned_results
    return sources
def process_run_results(run_results):
    """Pin the run-specific fields of a parsed ``run_results.json`` document.

    Overwrites, with fixed values:

    * top-level ``elapsed_time``;
    * ``metadata.generated_at`` and ``metadata.invocation_id``;
    * ``execution_time`` on every result;
    * ``completed_at`` and ``started_at`` on every entry of a result's
      ``timing`` list.

    The document and its results are modified in place; ``"results"`` is
    rebound to a new list holding the same result objects. The same
    ``run_results`` object is returned.

    Raises:
        KeyError: if ``"metadata"`` or ``"results"`` is missing, or a result
            has no ``"timing"`` entry.
    """
    run_results["elapsed_time"] = 3.1415

    metadata = run_results["metadata"]
    metadata["generated_at"] = "2021-06-18T21:38:36.384613Z"
    metadata["invocation_id"] = "just-some-random-id"

    pinned_results = []
    for entry in run_results["results"]:
        entry["execution_time"] = 0.023441791534423828
        for step in entry["timing"]:
            step["completed_at"] = "2022-09-16T19:06:38.239639Z"
            step["started_at"] = "2022-09-16T19:06:38.239635Z"
        pinned_results.append(entry)

    run_results["results"] = pinned_results
    return run_results
def main():
    """Normalize the dbt artifacts in ``./target`` into ``./target_processed``.

    Reads ``catalog.json``, ``manifest.json``, ``sources.json`` and
    ``run_results.json`` from ``./target``, passes each through its matching
    ``process_*`` function, and writes the results as
    ``dbt_catalog.json``, ``dbt_manifest.json``, ``dbt_sources.json`` and
    ``dbt_run_results.json`` under ``./target_processed`` (2-space indent,
    sorted keys).

    All inputs are read and processed before anything is written, so a
    missing or malformed input leaves the output directory untouched.

    Raises:
        FileNotFoundError: if an input file does not exist.
        json.JSONDecodeError: if an input file is not valid JSON.
    """
    import os  # function-scope import: only needed to create the output dir

    output_dir = "./target_processed"
    # (input path, normalizer, output path) for each artifact, in the order
    # they are read and written.
    artifacts = (
        ("./target/catalog.json", process_catalog,
         "./target_processed/dbt_catalog.json"),
        ("./target/manifest.json", process_manifest,
         "./target_processed/dbt_manifest.json"),
        ("./target/sources.json", process_sources,
         "./target_processed/dbt_sources.json"),
        ("./target/run_results.json", process_run_results,
         "./target_processed/dbt_run_results.json"),
    )

    # Phase 1: load every input. The encoding is explicit so that parsing
    # does not depend on the platform's locale default.
    loaded = []
    for input_path, _, _ in artifacts:
        with open(input_path, "r", encoding="utf-8") as file:
            loaded.append(json.load(file))

    # Phase 2: normalize each document with its own processor.
    processed = [
        normalize(document)
        for (_, normalize, _), document in zip(artifacts, loaded)
    ]

    # Phase 3: write the results, creating the output directory if this is
    # the first run.
    os.makedirs(output_dir, exist_ok=True)
    for (_, _, output_path), document in zip(artifacts, processed):
        with open(output_path, "w", encoding="utf-8") as file:
            json.dump(document, file, indent=2, sort_keys=True)


# Run the normalization only when executed as a script, not on import.
if __name__ == "__main__":
    main()