Add logging to marc2post #2

Open: wants to merge 1 commit into main
184 changes: 117 additions & 67 deletions marc2post
@@ -1,26 +1,28 @@
#!/usr/bin/env python

import json
import re
import sys
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from hashlib import md5
from os import listdir, path
from os.path import isdir, isfile, join
from time import gmtime, strftime

import yaml
from modules.config_parser import args
from pymarc import MARCReader, XMLWriter
from pymarc.marcxml import record_to_xml, record_to_xml_node
from requests_futures import sessions

# from multiprocessing.dummy import Pool as ThreadPool

try:
    sys.path.insert(1, "/marklogic/applications/lds-id")
    from prep import processing_logger as pl
except Exception:
    print("Failed to load logger")
config = yaml.safe_load(open(args.config))
print()
@@ -34,8 +36,10 @@ print(jobconfig)
print()

today = datetime.now()
yesterday = today - timedelta(1)  # Yesterday's date.
startdate = (
    yesterday  # Set the startdate to yesterday, unless overwritten by parameter.
)

if args.since != "":
startdate = datetime.strptime(args.since, "%Y-%m-%d")
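    # args.since (YYYY-MM-DD) lets an operator backfill from an earlier start date.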
@@ -48,6 +52,7 @@ starttime = st.isoformat()[:19]
load_report["job_start"] = starttime
load_report["loaded_records_success"] = 0
load_report["loaded_records_fail"] = 0
load_report["inputrecords"] = 0

load_report["unsuppressed"] = 0
load_report["suppressed"] = 0
@@ -58,65 +63,77 @@ yesterday_formatted = datetime.strftime(yesterday, date_format)
startdate_formatted = datetime.strftime(startdate, date_format)

dates = [startdate_formatted]
if yesterday_formatted != startdate_formatted:
nextday = startdate
nextday_formatted_sourcedate = datetime.strftime(startdate, date_format)
while nextday_formatted_sourcedate != yesterday_formatted:
nextday = nextday + timedelta(days=1)
nextday_formatted_sourcedate = datetime.strftime(nextday, date_format)
dates.append(nextday_formatted_sourcedate)
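# dates now holds every day from startdate through yesterday, inclusive.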

print(dates)

days = {}
for d in dates:
files = []
for s in jobconfig["sources"]:
        file_date = datetime.strftime(
            datetime.strptime(d, "%Y-%m-%d"), s["%SOURCEDATE%"]
        )
        file = s["source_directory"] + s["file_pattern"].replace(
            "%SOURCEDATE%", file_date
        )
if path.exists(file):
files.append(file)
days[d] = {}
days[d]["files"] = files

# Got here so let's set up some MARC processing info.


def process_ingest_response(f):
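    # Runs once per completed future: count the input record, then inspect the
    # HTTP response to decide whether this ingest succeeded or failed.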
global load_report
load_report["inputrecords"] += 1
try:
        r = {"status_code": f.result().status_code, "output": f.result().content}
if "Location" not in f.result().headers:
# Probably have an error of some kind.
load_report["loaded_records_fail"] += 1
print("No 'location' header found in response. Possible error: {}".format(f.result().content))
print(
"No 'location' header found in response. Possible error: {}".format(
f.result().content
)
)
print("Source record: {}".format(f.result().request.body))
else:
load_report["loaded_records_success"] += 1
r["uri"] = f.result().headers["Location"]
print("Ingest status for {}: {}".format(r["uri"], r["status_code"]))
    except Exception:
        print("Error processing requests-future result.")
e_type = sys.exc_info()[0]
e_value = str(sys.exc_info()[1])
e_traceback = sys.exc_info()[2]
        print("Error processing requests-future result, e_type: {}".format(e_type))
        print("Error processing requests-future result, e_value: {}".format(e_value))
        print(
            "Error processing requests-future result, e_traceback: {}".format(
                e_traceback
            )
        )


find_fromoldcatalog = re.compile(re.escape("[from old catalog]"), re.IGNORECASE)
auth = (jobconfig["target"]["username"], jobconfig["target"]["password"])
headers = {"Content-type": "application/xml"}
session = sessions.FuturesSession(max_workers=jobconfig["threads"])
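# All POSTs below share this session and run concurrently on jobconfig["threads"] workers.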

for d in days:
print("Processing date: " + d)
for f in days[d]["files"]:
print("Processing file (" + d + "): " + f)
marcxml_records = []
        with open(f, "rb") as fh:
reader = MARCReader(fh)
for r in reader:
try:
@@ -125,61 +142,94 @@ for d in days:
# special characters using &#1234 convention.
# https://stackoverflow.com/questions/15304229/convert-python-elementtree-to-string
bytesxml = ET.tostring(bytesxml, encoding="unicode")
                    bytesxml = find_fromoldcatalog.sub("", bytesxml)
marcxml_records.append(bytesxml)
                except Exception:
                    print("Error generating MARC/XML.")
if r is not None:
                        print("Error record 001: " + str(r["001"]))
load_report["loaded_records_fail"] += 1
e_type = sys.exc_info()[0]
e_value = str(sys.exc_info()[1])
e_traceback = sys.exc_info()[2]
                    print("Error generating MARC/XML, e_type: {}".format(e_type))
                    print("Error generating MARC/XML, e_value: {}".format(e_value))
                    print(
                        "Error generating MARC/XML, e_traceback: {}".format(e_traceback)
                    )

        if "delete" in f:
load_report["deletes"] += len(marcxml_records)
suppress_status = "deleted" #Suppressing seems to make sense, though not quite right.
elif 'unsuppressed' not in f:
suppress_status = (
"deleted" # Suppressing seems to make sense, though not quite right.
)
elif "unsuppressed" not in f:
load_report["suppressed"] += len(marcxml_records)
suppress_status = "suppressed"
else:
load_report["unsuppressed"] += len(marcxml_records)
suppress_status = "unsuppressed"

futures = [
session.post(jobconfig["target"]["endpoint"] + "?suppress_status=" + suppress_status, auth=auth, headers=headers, data=r.encode('utf-8'))
session.post(
jobconfig["target"]["endpoint"] + "?suppress_status=" + suppress_status,
auth=auth,
headers=headers,
data=r.encode("utf-8"),
)
for r in marcxml_records
]
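        # process_ingest_response calls f.result(), which blocks until the POST
        # completes, so this comprehension drains the whole batch before moving on.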

results = [process_ingest_response(f) for f in futures]

et = datetime.now()
endtime = et.isoformat()[:19]
elapsed = et - st  # Avoid shadowing the imported timedelta.

# Finalize values in `load_report` to work with logger
identifier = md5(startdate_formatted.encode()).hexdigest()
load_report["job_end"] = endtime

load_report["status"] = (
"success" if load_report["loaded_records_fail"] == 0 else "failures_present"
)
load_report["load_type"] = "daily"
load_report["logmessage"] = (
f"Success: Loaded {load_report["loaded_records_success"]} of {load_report["loaded_records"]} records processed."
if load_report["status"] == "success"
else "Failures: Loaded {load_report["loaded_records_success"]} of {load_report["loaded_records"]} records processed. Check process output for error details."
)
# Map counters onto the key names the logger expects
date_format = "%Y-%m-%dT%H:%M:%S.%f"
load_report["startdt"] = st.strftime(date_format)
load_report["enddt"] = et.strftime(date_format)
load_report["lastmoddt"] = load_report["enddt"]
load_report["outputrecords"] = load_report["loaded_records_success"]
load_report["outputfailures"] = load_report["loaded_records_fail"]
load_report["identifier"] = identifier

print("************")
print("************")
print("Job started at: {}".format(starttime))
print("Job ended at: {}".format(endtime))
print("Elapsed time: {}".format(timedelta))
print("Start date of loaded records: {}".format(startdate_formatted))
print("Number of days loaded: {}".format(len(dates)))
print(
"Number of records loaded successfully: {}".format(
load_report["loaded_records_success"]
)
)
print("Number of records load failures: {}".format(load_report["loaded_records_fail"]))
print("Number of unsuppressed records: {}".format(load_report["unsuppressed"]))
print("Number of suppressed records: {}".format(load_report["suppressed"]))
print("Number of deleted records: {}".format(load_report["deletes"]))
print("************")
print("************")

print()
print()

try:
logxml = pl.build_log(pl.baselogxml, load_report)
pl.save_log(logxml, identifier)
except Exception as e:
f"Failed to create log: {e}"