"""
Ingest cellxgene data and upload to s3 in parallel using ray
"""
import argparse

import pandas as pd
import ray
import tqdm
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--census-version", default="2023-12-15")
    parser.add_argument("--index", default="index.feather")
    parser.add_argument("-n", "--max-num-observations", type=int, default=None)
    parser.add_argument("-c", "--observations-per-file", type=int, default=100)
    parser.add_argument("-d", "--max-parallel-downloads", type=int, default=None)
    parser.add_argument("--bucket", type=str, default="braingeneers")
    parser.add_argument(
        "--gene-filter", default=None, help="ex. ENSG00000161798,ENSG00000139618"
    )
    parser.add_argument("dest", type=str, nargs="?", default="cellxgene")
    args = parser.parse_args()

    print(f"Starting Ray cluster with {args.max_parallel_downloads} cpus")
    ray.init(num_cpus=args.max_parallel_downloads, ignore_reinit_error=True)

    # Take the first max-num-observations soma_joinids from the index and
    # split them into chunks of observations_per_file, one per output file.
    df = pd.read_feather(args.index)
    soma_ids = df.soma_joinid[0 : args.max_num_observations].values
    chunks = [
        soma_ids[start : start + args.observations_per_file]
        for start in range(0, len(soma_ids), args.observations_per_file)
    ]
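    # For example, with observations_per_file=3 and soma_ids=[0, 1, ..., 7],
    # chunks would be [array([0, 1, 2]), array([3, 4, 5]), array([6, 7])].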
    print(
        f"Downloading {len(soma_ids):,} observations in {len(chunks):,} files "
        f"to s3://{args.bucket}/{args.dest}/"
    )
    if args.gene_filter:
        print(f"Filtering for genes: {args.gene_filter}")
    @ray.remote
    def ingest_chunk(chunk, args):
        """Download one chunk of observations from the Census and upload it to S3."""
        # Imports are local so they resolve inside the Ray worker processes.
        import tempfile

        import boto3
        import cellxgene_census

        # Optionally restrict the var (gene) axis to the comma-separated list
        # of Ensembl feature ids passed via --gene-filter.
        if args.gene_filter:
            genes = ",".join([f"'{g}'" for g in args.gene_filter.split(",")])
            var_value_filter = f"feature_id in [{genes}]"
        else:
            var_value_filter = None
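        # For example, --gene-filter ENSG00000161798,ENSG00000139618 produces
        # var_value_filter = "feature_id in ['ENSG00000161798','ENSG00000139618']".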
        with cellxgene_census.open_soma(census_version=args.census_version) as census:
            anndata = cellxgene_census.get_anndata(
                census=census,
                organism="Homo sapiens",
                var_value_filter=var_value_filter,
                obs_coords=chunk,
                column_names={
                    "obs": [
                        "soma_joinid",
                    ],
                    "var": [
                        "soma_joinid",
                        "feature_id",
                        "feature_name",
                        "feature_length",
                    ],
                },
            )

        # Name each file by the soma_joinid range it covers, write it to a
        # temporary file, and upload it to the requested bucket and prefix.
        name = f"{chunk[0]}-{chunk[-1]}.h5ad"
        with tempfile.NamedTemporaryFile() as f:
            anndata.write_h5ad(f.name)
            s3 = boto3.client("s3")
            s3.upload_file(f.name, args.bucket, f"{args.dest}/{name}")
        return anndata
    # Launch one remote task per chunk and drain results as tasks finish,
    # advancing the progress bar with each completed file.
    futures = [ingest_chunk.remote(chunk, args) for chunk in chunks]
    progress_bar = tqdm.trange(len(chunks))
    anndatas = []
    while futures:
        # ray.wait returns one finished task at a time by default.
        finished, futures = ray.wait(futures)
        anndatas.append(ray.get(finished[0]))
        progress_bar.update(len(finished))
    progress_bar.close()

    ray.shutdown()
    print("Done.")