-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdata_checker.py
108 lines (97 loc) · 3.68 KB
/
data_checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import boto3
import gzip
import json
import os
from math import ceil
from pymongo import MongoClient
def load_file(filepath):
    """Parse a tab-separated file into a dict.

    Each line is expected to be 'document_id<TAB>filename'. Returns
    {document_id: filename}. Lines without at least two tab-separated
    columns (e.g. trailing blank lines) are skipped instead of crashing.
    """
    documents = {}
    # 'with' guarantees the handle is closed; the original leaked it.
    with open(filepath, 'r') as infile:
        for line in infile.read().splitlines():
            columns = line.split('\t')
            if len(columns) < 2:
                # Blank or malformed line — nothing usable to record.
                continue
            documents[columns[0]] = columns[1]
    return documents
def get_document_dict(remote_bucket, remote_filename):
    """Download a gzipped TSV from the remote bucket, decompress it, and parse it.

    Uses the module-level ``gcp_client`` (set up in ``lambda_handler``) to fetch
    the object into /tmp, inflates it to a plain TSV, and returns the
    {document_id: filename} dict produced by ``load_file``.
    """
    archive_path = '/tmp/source.gz'
    tsv_path = '/tmp/source.tsv'
    with open(archive_path, 'wb') as archive:
        gcp_client.download_fileobj(remote_bucket, remote_filename, archive)
    # Inflate the archive straight into the TSV file.
    with gzip.open(archive_path, 'rb') as gz, open(tsv_path, 'wb') as tsv:
        tsv.write(gz.read())
    return load_file(tsv_path)
def check_existence(document_dict):
    """Report which documents in document_dict are absent from the DB.

    Prefixes every ID with 'PMID:', queries the module-level ``collection``
    in batches of 10,000, and returns a dict mapping source filename ->
    list of document IDs that were NOT found in the collection.
    """
    batch_size = 10000
    id_list = ['PMID:' + doc_id for doc_id in document_dict]
    print(id_list[:10])
    print(len(id_list))
    found_ids = []
    # Query in fixed-size slices to keep each $in clause manageable.
    for start in range(0, len(id_list), batch_size):
        batch = id_list[start:start + batch_size]
        hits = [doc['document_id'] for doc in collection.find({'document_id': {'$in': batch}})]
        found_ids.extend(hits)
        print(f'{len(hits)} | {len(found_ids)}')
    unfound_ids = set(id_list) - set(found_ids)
    print(len(unfound_ids))
    missing_dict = {}
    for unfound_id in unfound_ids:
        document_id = unfound_id.replace('PMID:', '')
        if document_id not in document_dict:
            print('not sure what to do with this ID: ' + document_id)
            continue
        missing_dict.setdefault(document_dict[document_id], []).append(document_id)
    return missing_dict
def check_nonexistence(document_dict):
    """Report which documents in document_dict are still present in the DB.

    Used for 'deleted' feeds: every listed ID should already have been
    removed, so any ID still found in the module-level ``collection`` is
    reported. Returns a dict mapping source filename -> list of document
    IDs that WERE found.
    """
    id_list = ['PMID:' + document_id for document_id in document_dict.keys()]
    print(id_list[:10])
    print(len(id_list))
    found_ids = []
    # Query in chunks of 10,000 to keep each $in clause a manageable size.
    subs = ceil(len(id_list) / 10000)
    for i in range(subs):
        start = i * 10000
        end = min(start + 10000, len(id_list))
        sublist = [doc['document_id'] for doc in collection.find({'document_id': {'$in': id_list[start:end]}})]
        found_ids.extend(sublist)
        print(f'{len(sublist)} | {len(found_ids)}')
    print(len(found_ids))
    found_dict = {}
    for found_id in found_ids:
        document_id = found_id.replace('PMID:', '')
        if document_id not in document_dict:
            # Message now matches check_existence (space after the colon).
            print('not sure what to do with this ID: ' + document_id)
            continue
        filename = document_dict[document_id]
        if filename not in found_dict:
            found_dict[filename] = []
        found_dict[filename].append(document_id)
    return found_dict
def lambda_handler(event, context):
    """AWS Lambda entry point for the document existence checker.

    Expects a payload with a 'source' dict containing 'hmac_key_id',
    'hmac_secret', 'bucket', and 'filepath'. Reads the MongoDB connection
    string from the environment. Returns the missing/found dict on
    success, or an (error message, HTTP status) tuple on bad input or
    missing configuration.
    """
    # API Gateway wraps the payload in a JSON 'body'; direct invocations pass it as-is.
    if 'body' in event:
        body = json.loads(event['body'])
    else:
        body = event
    # ('x in os.environ' alone suffices — an empty mapping contains no keys.)
    if 'connection_string' not in os.environ:
        return 'Could not get database connection information', 500
    client = MongoClient(os.environ['connection_string'])
    if 'source' not in body:
        return 'No source information provided', 400
    source_info = body['source']
    # Fail fast with a 400 instead of a KeyError if any credential/location field is absent.
    required = ('hmac_key_id', 'hmac_secret', 'bucket', 'filepath')
    missing = [field for field in required if field not in source_info]
    if missing:
        return 'Missing source fields: ' + ', '.join(missing), 400
    # The check_* helpers and get_document_dict read these as module globals.
    global gcp_client
    global collection
    # The bucket is accessed through an S3-compatible endpoint with HMAC credentials.
    gcp_client = boto3.client(
        's3',
        region_name='auto',
        endpoint_url='https://storage.googleapis.com',
        aws_access_key_id=source_info['hmac_key_id'],
        aws_secret_access_key=source_info['hmac_secret']
    )
    db = client['test']
    collection = db['documentMetadata']
    main_dict = get_document_dict(source_info['bucket'], source_info['filepath'])
    # 'deleted' feeds should no longer be in the DB; regular feeds should be.
    if 'deleted' in source_info['filepath']:
        return check_nonexistence(main_dict)
    return check_existence(main_dict)