diff --git a/sarracenia/__init__.py b/sarracenia/__init__.py index a2797165c..f0c8b64da 100755 --- a/sarracenia/__init__.py +++ b/sarracenia/__init__.py @@ -1019,10 +1019,11 @@ def getContent(msg,options=None): # inlined/embedded case. if 'content' in msg: + logger.info("Getting msg from inline'd content") if msg['content']['encoding'] == 'base64': return b64decode(msg['content']['value']) else: - return msg['content']['value'].encode('utf-8') + return msg['content']['value'].encode('utf-8') if not hasattr(options,'inputCharset') else msg['content']['value'].encode(options.inputCharset) path='' if msg['baseUrl'].startswith('file:'): diff --git a/sarracenia/bulletin.py b/sarracenia/bulletin.py index 6e2b3b0a0..fc461f5a1 100644 --- a/sarracenia/bulletin.py +++ b/sarracenia/bulletin.py @@ -23,7 +23,9 @@ class Bulletin: from sarracenia.bulletin import Bulletin """ - def __init__(self): + def __init__(self,options): + super().__init__() + self.o = options self.seq = 0 self.binary = 0 @@ -125,7 +127,7 @@ def getData(self, msg, path): try: self.binary = 0 - if msg['content']: + if 'content' in msg: data = msg['content']['value'] # Change from b64. We want to get the header from the raw binary data. Not retrievable in b64 format @@ -339,4 +341,4 @@ def getTime(self, data): ddHHMM = time.strftime('%d%H%M', timeStruct) return ddHHMM except Exception as e: - return None \ No newline at end of file + return None diff --git a/sarracenia/flowcb/gather/am.py b/sarracenia/flowcb/gather/am.py index ae6f48808..e615fc38f 100644 --- a/sarracenia/flowcb/gather/am.py +++ b/sarracenia/flowcb/gather/am.py @@ -78,7 +78,7 @@ class Am(FlowCB): def __init__(self, options): super().__init__(options,logger) - self.bulletinHandler = Bulletin() + self.bulletinHandler = Bulletin(self.o) self.url = urllib.parse.urlparse(self.o.sendTo) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py index 6a8d73d18..6d8c8a115 100644 --- a/sarracenia/flowcb/rename/raw2bulletin.py +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -76,7 +76,7 @@ def __init__(self,options) : super().__init__(options,logger) self.seq = 0 self.binary = 0 - self.bulletinHandler = Bulletin() + self.bulletinHandler = Bulletin(self.o) # Need to redeclare these options to have their default values be initialized. self.o.add_option('inputCharset', 'str', 'utf-8') self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG']) @@ -87,18 +87,23 @@ def after_accept(self,worklist): new_worklist = [] for msg in worklist.incoming: - path = msg['new_dir'] + '/' + msg['new_file'] - data = self.bulletinHandler.getData(msg, path) + # If called by a sarra, should always have post_baseDir, so should be OK in specifying it + path = self.o.post_baseDir + '/' + msg['relPath'] - # AM bulletins that need their filename rewritten with data should only have two chars before the first underscore - # This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71 - # These messages are still good, so we will add them to the good_msgs list - # if len(filenameFirstChars) != 2 and self.binary: - # good_msgs.append(msg) - # continue + data = msg.getContent(self.o) - if data == None: + # Determine if bulletin is binary or not + # From sundew source code + if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: + # Decode data, only text. The raw binary data contains the header in which we're interested. Only get that header. + data = data.splitlines()[0].decode('ascii') + else: + # Data is not binary + data = data.decode(self.o.inputCharset) + + + if not data: logger.error("No data was found. Skipping message") worklist.rejected.append(msg) continue @@ -133,13 +138,16 @@ def after_accept(self,worklist): # Generate a sequence (random ints) seq = self.bulletinHandler.getSequence() - + # Assign a default value for messages not coming from AM + if 'isProblem' not in msg: + msg['isProblem'] = False + # Rename file with data fetched try: # We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems # We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins - if ddhhmm == None or msg["isProblem"]: + if ddhhmm == None or msg['isProblem']: timehandler = datetime.datetime.now() # Add current time as new timestamp to filename @@ -162,13 +170,14 @@ def after_accept(self,worklist): new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq msg['new_file'] = new_file - # We need the rest of the fields to be also updated - del(msg['relPath']) + # No longer needed - del(msg['isProblem']) - msg.updatePaths(self.o, msg['new_dir'], msg['new_file']) + if 'isProblem' in msg: + del(msg['isProblem']) + + # msg.updatePaths(self.o, msg['new_dir'], msg['new_file']) - logger.info(f"New filename (with path): {msg['relPath']}") + logger.info(f"New filename: {msg['new_file']}") new_worklist.append(msg) except Exception as e: diff --git a/tests/sarracenia/flowcb/gather/am__gather_test.py b/tests/sarracenia/flowcb/gather/am__gather_test.py index 17c51aeae..7f7981016 100755 --- a/tests/sarracenia/flowcb/gather/am__gather_test.py +++ b/tests/sarracenia/flowcb/gather/am__gather_test.py @@ -25,6 +25,7 @@ def __init__(self): self.fileAgeMax = 0 self.post_baseUrl = "http://localhost/" self.post_format = "v02" + self.post_baseDir = "/this/path/is/fake" def add_option(self, option, type, default = None): if not hasattr(self, option): @@ -49,6 +50,7 @@ def make_message(): m["to_clusters"] = "localhost" m["baseUrl"] = "https://NotARealURL" m["post_baseUrl"] = "https://NotARealURL" + m["post_baseDir"] = "/this/path/is/fake" m["relPath"] = "ThisIsAPath/To/A/File.txt" m["_deleteOnPost"] = set() return m