From 8d0a8758379497a3802aeb5decc3de9238136fbd Mon Sep 17 00:00:00 2001
From: Franz Osorio
Date: Mon, 26 Feb 2024 11:17:50 -0500
Subject: [PATCH] Save log for marc2post

Logging is handled by `processing_logger.py` in `lds-id/prep`
---
 marc2post | 184 ++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 117 insertions(+), 67 deletions(-)

diff --git a/marc2post b/marc2post
index ca4f382..99ffd6f 100755
--- a/marc2post
+++ b/marc2post
@@ -1,26 +1,28 @@
 #!/usr/bin/env python
-import sys
 import json
-from os import path
 import re
-
+import sys
+import xml.etree.ElementTree as ET
+from datetime import datetime, timedelta
+from os import listdir, path
+from os.path import isdir, isfile, join
+from time import gmtime, strftime
+from hashlib import md5
 import yaml
 from modules.config_parser import args
+from pymarc import MARCReader, XMLWriter
+from pymarc.marcxml import record_to_xml, record_to_xml_node
+from requests_futures import sessions
 
-from datetime import datetime, timedelta
-from time import gmtime, strftime
+# from multiprocessing.dummy import Pool as ThreadPool
 
-from os import listdir
-from os.path import isdir, isfile, join
-from requests_futures import sessions
-# from multiprocessing.dummy import Pool as ThreadPool
-
-import xml.etree.ElementTree as ET
-from pymarc import MARCReader
-from pymarc import XMLWriter
-from pymarc.marcxml import record_to_xml, record_to_xml_node
+try:
+    sys.path.insert(1, "/marklogic/applications/lds-id")
+    from prep import processing_logger as pl
+except Exception:
+    print("Failed to load logger")
 
 config = yaml.safe_load(open(args.config))
 print()
@@ -34,8 +36,10 @@ print(jobconfig)
 print()
 
 today = datetime.now()
-yesterday = today - timedelta(1) #Yesterday's date.
-startdate = yesterday # Set the startdate to yesterday, unless overwritten by parameter.
+yesterday = today - timedelta(1)  # Yesterday's date.
+startdate = (
+    yesterday  # Set the startdate to yesterday, unless overwritten by parameter.
+)
 
 if args.since != "":
     startdate = datetime.strptime(args.since, "%Y-%m-%d")
@@ -48,6 +52,7 @@ starttime = st.isoformat()[:19]
 load_report["job_start"] = starttime
 load_report["loaded_records_success"] = 0
 load_report["loaded_records_fail"] = 0
+load_report["inputrecords"] = 0
 load_report["unsuppressed"] = 0
 load_report["suppressed"] = 0
 
@@ -58,22 +63,26 @@ yesterday_formatted = datetime.strftime(yesterday, date_format)
 startdate_formatted = datetime.strftime(startdate, date_format)
 
 dates = [startdate_formatted]
-if (yesterday_formatted != startdate_formatted):
+if yesterday_formatted != startdate_formatted:
     nextday = startdate
     nextday_formatted_sourcedate = datetime.strftime(startdate, date_format)
     while nextday_formatted_sourcedate != yesterday_formatted:
         nextday = nextday + timedelta(days=1)
         nextday_formatted_sourcedate = datetime.strftime(nextday, date_format)
         dates.append(nextday_formatted_sourcedate)
-
+
 print(dates)
 
 days = {}
 for d in dates:
     files = []
     for s in jobconfig["sources"]:
-        file_date = datetime.strftime(datetime.strptime(d, "%Y-%m-%d"), s["%SOURCEDATE%"])
-        file = s["source_directory"] + s["file_pattern"].replace('%SOURCEDATE%', file_date)
+        file_date = datetime.strftime(
+            datetime.strptime(d, "%Y-%m-%d"), s["%SOURCEDATE%"]
+        )
+        file = s["source_directory"] + s["file_pattern"].replace(
+            "%SOURCEDATE%", file_date
+        )
         if path.exists(file):
             files.append(file)
     days[d] = {}
@@ -81,34 +90,42 @@ for d in dates:
 # Got here so let's set up some MARC processing info.
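+# process_ingest_response() resolves each requests-futures future: a response
+# carrying a "Location" header is a successful load (the header holds the new
+# record's URI); anything else is counted as a failure in the global
+# load_report dict.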
+
 def process_ingest_response(f):
     global load_report
+    load_report["inputrecords"] += 1
     try:
-        r = {
-            "status_code": f.result().status_code,
-            "output": f.result().content
-        }
+        r = {"status_code": f.result().status_code, "output": f.result().content}
         if "Location" not in f.result().headers:
             # Probably have an error of some kind.
             load_report["loaded_records_fail"] += 1
-            print("No 'location' header found in response. Possible error: {}".format(f.result().content))
+            print(
+                "No 'location' header found in response. Possible error: {}".format(
+                    f.result().content
+                )
+            )
             print("Source record: {}".format(f.result().request.body))
         else:
             load_report["loaded_records_success"] += 1
             r["uri"] = f.result().headers["Location"]
             print("Ingest status for {}: {}".format(r["uri"], r["status_code"]))
     except:
-        print('Error processing requests-future result.')
+        print("Error processing requests-future result.")
         e_type = sys.exc_info()[0]
         e_value = str(sys.exc_info()[1])
         e_traceback = sys.exc_info()[2]
-        print('Error processing requests-future result, e_type: {}'.format(e_type))
-        print('Error processing requests-future result, e_value: {}'.format(e_value))
-        print('Error processing requests-future result, e_traceback: {}'.format(e_traceback))
-
-find_fromoldcatalog = re.compile(re.escape('[from old catalog]'), re.IGNORECASE)
-auth=(jobconfig["target"]["username"], jobconfig["target"]["password"])
-headers={"Content-type": "application/xml"}
+        print("Error processing requests-future result, e_type: {}".format(e_type))
+        print("Error processing requests-future result, e_value: {}".format(e_value))
+        print(
+            "Error processing requests-future result, e_traceback: {}".format(
+                e_traceback
+            )
+        )
+
+
+find_fromoldcatalog = re.compile(re.escape("[from old catalog]"), re.IGNORECASE)
+auth = (jobconfig["target"]["username"], jobconfig["target"]["password"])
+headers = {"Content-type": "application/xml"}
 
 session = sessions.FuturesSession(max_workers=jobconfig["threads"])
 for d in days:
@@ -116,7 +133,7 @@ for d in days:
     for f in days[d]["files"]:
         print("Processing file (" + d + "): " + f)
         marcxml_records = []
-        with open(f, 'rb') as fh:
+        with open(f, "rb") as fh:
             reader = MARCReader(fh)
             for r in reader:
                 try:
@@ -125,61 +142,94 @@ for d in days:
                     # special characters using &#1234; convention.
                     # https://stackoverflow.com/questions/15304229/convert-python-elementtree-to-string
                     bytesxml = ET.tostring(bytesxml, encoding="unicode")
-                    bytesxml = find_fromoldcatalog.sub('', bytesxml)
+                    bytesxml = find_fromoldcatalog.sub("", bytesxml)
                     marcxml_records.append(bytesxml)
                 except:
-                    print('Error generating MARC/XML.')
+                    print("Error generating MARC/XML.")
                     if r is not None:
-                        print('Error record 001: ' + str(r['001']))
+                        print("Error record 001: " + str(r["001"]))
                     load_report["loaded_records_fail"] += 1
                     e_type = sys.exc_info()[0]
                     e_value = str(sys.exc_info()[1])
                     e_traceback = sys.exc_info()[2]
-                    print('Error generating MARC/XML, e_type: {}'.format(e_type))
-                    print('Error generating MARC/XML, e_value: {}'.format(e_value))
-                    print('Error generating MARC/XML, e_traceback: {}'.format(e_traceback))
-
-        if 'delete' in f:
+                    print("Error generating MARC/XML, e_type: {}".format(e_type))
+                    print("Error generating MARC/XML, e_value: {}".format(e_value))
+                    print(
+                        "Error generating MARC/XML, e_traceback: {}".format(e_traceback)
+                    )
+
+        if "delete" in f:
             load_report["deletes"] += len(marcxml_records)
-            suppress_status = "deleted" #Suppressing seems to make sense, though not quite right.
-        elif 'unsuppressed' not in f:
+            suppress_status = (
+                "deleted"  # Suppressing seems to make sense, though not quite right.
+            )
+        elif "unsuppressed" not in f:
             load_report["suppressed"] += len(marcxml_records)
             suppress_status = "suppressed"
         else:
             load_report["unsuppressed"] += len(marcxml_records)
             suppress_status = "unsuppressed"
-
+
         futures = [
-            session.post(jobconfig["target"]["endpoint"] + "?suppress_status=" + suppress_status, auth=auth, headers=headers, data=r.encode('utf-8'))
+            session.post(
+                jobconfig["target"]["endpoint"] + "?suppress_status=" + suppress_status,
+                auth=auth,
+                headers=headers,
+                data=r.encode("utf-8"),
+            )
             for r in marcxml_records
         ]
-        results = [
-            process_ingest_response(f)
-            for f in futures
-        ]
-
+        results = [process_ingest_response(f) for f in futures]
+
 et = datetime.now()
 endtime = et.isoformat()[:19]
 timedelta = et - st
 
+# Finalize values in `load_report` to work with the logger
+identifier = md5(startdate_formatted.encode("utf-8")).hexdigest()
 load_report["job_end"] = endtime
-load_report["status"] = "success"
-
-print('************')
-print('************')
-print('Job started at: {}'.format(starttime))
-print('Job ended at: {}'.format(endtime))
-print('Elapsed time: {}'.format(timedelta))
-print('Start date of loaded records: {}'.format(startdate_formatted))
-print('Number of days loaded: {}'.format(len(dates)))
-print('Number of records loaded successfully: {}'.format(load_report["loaded_records_success"]))
-print('Number of records load failures: {}'.format(load_report["loaded_records_fail"]))
-print('Number of unsuppressed records: {}'.format(load_report["unsuppressed"]))
-print('Number of suppressed records: {}'.format(load_report["suppressed"]))
-print('Number of deleted records: {}'.format(load_report["deletes"]))
-print('************')
-print('************')
+load_report["status"] = (
+    "success" if load_report["loaded_records_fail"] == 0 else "failures_present"
+)
+load_report["load_type"] = "daily"
+load_report["logmessage"] = (
+    f'Success: Loaded {load_report["loaded_records_success"]} of {load_report["inputrecords"]} records processed.'
+    if load_report["status"] == "success"
+    else f'Failures: Loaded {load_report["loaded_records_success"]} of {load_report["inputrecords"]} records processed. Check process output for error details.'
+)
+# Map onto the key names the logger expects
+date_format = "%Y-%m-%dT%H:%M:%S.%f"
+load_report["startdt"] = datetime.strftime(st, date_format)
+load_report["enddt"] = datetime.strftime(et, date_format)
+load_report["lastmoddt"] = load_report["enddt"]
+load_report["outputrecords"] = load_report["loaded_records_success"]
+load_report["outputfailures"] = load_report["loaded_records_fail"]
+load_report["identifier"] = identifier
+
+print("************")
+print("************")
+print("Job started at: {}".format(starttime))
+print("Job ended at: {}".format(endtime))
+print("Elapsed time: {}".format(timedelta))
+print("Start date of loaded records: {}".format(startdate_formatted))
+print("Number of days loaded: {}".format(len(dates)))
+print(
+    "Number of records loaded successfully: {}".format(
+        load_report["loaded_records_success"]
+    )
+)
+print("Number of records load failures: {}".format(load_report["loaded_records_fail"]))
+print("Number of unsuppressed records: {}".format(load_report["unsuppressed"]))
+print("Number of suppressed records: {}".format(load_report["suppressed"]))
+print("Number of deleted records: {}".format(load_report["deletes"]))
+print("************")
+print("************")
 print()
 print()
+try:
+    logxml = pl.build_log(pl.baselogxml, load_report)
+    pl.save_log(logxml, identifier)
+except Exception as e:
+    print(f"Failed to create log: {e}")
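
Note for reviewers: the new try/except at the end calls `pl.build_log(pl.baselogxml, load_report)` and `pl.save_log(logxml, identifier)`, but `processing_logger` itself lives outside this patch in `lds-id/prep`. For local testing without that tree mounted, a minimal stand-in consistent with how the script uses the module might look like the sketch below. All names and the XML shape here are assumptions inferred from the call sites, not the real implementation.

    # Hypothetical stand-in for prep/processing_logger.py -- testing only;
    # the real module's log schema and storage location may differ.
    import xml.etree.ElementTree as ET

    baselogxml = "<log/>"  # assumed base template that build_log fills in


    def build_log(base_xml, report):
        # Copy each load_report key/value into a child element of the log.
        root = ET.fromstring(base_xml)
        for key, value in report.items():
            child = ET.SubElement(root, key)
            child.text = str(value)
        return ET.tostring(root, encoding="unicode")


    def save_log(log_xml, identifier):
        # Persist the log, keyed by the job's md5 identifier.
        with open("load_report_{}.xml".format(identifier), "w") as fh:
            fh.write(log_xml)

With that interface, the script's closing block reduces to `save_log(build_log(baselogxml, load_report), identifier)`, which is exactly the call sequence inside the new try/except.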