-
Notifications
You must be signed in to change notification settings - Fork 0
/
books_ingest.py
31 lines (23 loc) · 868 Bytes
/
books_ingest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import csv
from elasticsearch import Elasticsearch, helpers
# Source CSV file and the name of the index its rows are written to.
index_name = "books"
file_path = "books.csv"

# Client configured for the Elasticsearch endpoint at 127.0.0.1, port 9200.
es = Elasticsearch(hosts="http://127.0.0.1:9200")
def transform_row(row):
    """Map one positional CSV record onto a dict keyed by book column name.

    Args:
        row: Sequence of cell values, ordered like ``cols`` below. Any cells
            beyond the last named column are ignored.

    Returns:
        A dict with one entry per column name, in column order. Values are
        taken from ``row`` unchanged; no type conversion is performed.

    Raises:
        IndexError: If ``row`` has fewer cells than there are columns.
    """
    cols = (
        'bookID', 'title', 'authors', 'average_rating', 'isbn', 'isbn13',
        'language_code', 'num_pages', 'ratings_count', 'text_reviews_count',
        'publication_date', 'publisher',
    )
    # zip() stops at the shorter input, so a truncated record would otherwise
    # produce a silently incomplete document. Fail loudly instead, with the
    # same exception type that positional indexing would have raised.
    if len(row) < len(cols):
        raise IndexError(
            f"expected at least {len(cols)} fields, got {len(row)}"
        )
    return dict(zip(cols, row))
def document_stream(file_to_index, encoding="utf-8"):
    """Lazily yield one bulk-index action per record of a CSV file.

    Args:
        file_to_index: Path of the CSV file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            that decoding does not depend on the platform's locale settings.

    Yields:
        A dict with two keys: ``"_index"`` (the module-level ``index_name``)
        and ``"_source"`` (the result of ``transform_row`` for that record).
    """
    # newline="" hands line-ending handling to the csv module, so a quoted
    # field that contains a line break is parsed as part of a single record.
    with open(file_to_index, "r", encoding=encoding, newline="") as csvfile:
        # NOTE(review): every record is emitted, including the first one. If
        # the file begins with a header row, that row is indexed as a document
        # too -- confirm against the actual input file.
        for row in csv.reader(csvfile):
            yield {
                "_index": index_name,
                "_source": transform_row(row),
            }
# Build the action generator; it is lazy, so the CSV is read only as the bulk
# helper consumes it.
stream = document_stream(file_path)

# By default streaming_bulk raises BulkIndexError when a document fails, which
# would keep the `not ok` branch below from ever running. With
# raise_on_error=False, failed items are yielded as (False, info) so each one
# can be reported while the remaining documents continue to be indexed.
for ok, response in helpers.streaming_bulk(
    es, actions=stream, raise_on_error=False
):
    if not ok:
        # Failure inserting
        print(response)