Skip to content

Commit

Permalink
Merge pull request #1279 from MetPX/issue1271
Browse files Browse the repository at this point in the history
Issue1271 - Fix accomodation to sarra component
  • Loading branch information
petersilva authored Oct 29, 2024
2 parents 2f35022 + 5d6086c commit 51a3663
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 22 deletions.
3 changes: 2 additions & 1 deletion sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,10 +1019,11 @@ def getContent(msg,options=None):

# inlined/embedded case.
if 'content' in msg:
logger.info("Getting msg from inline'd content")
if msg['content']['encoding'] == 'base64':
return b64decode(msg['content']['value'])
else:
return msg['content']['value'].encode('utf-8')
return msg['content']['value'].encode('utf-8') if not hasattr(options,'inputCharset') else msg['content']['value'].encode(options.inputCharset)

path=''
if msg['baseUrl'].startswith('file:'):
Expand Down
8 changes: 5 additions & 3 deletions sarracenia/bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class Bulletin:
from sarracenia.bulletin import Bulletin
"""

def __init__(self):
def __init__(self,options):
super().__init__()
self.o = options
self.seq = 0
self.binary = 0

Expand Down Expand Up @@ -125,7 +127,7 @@ def getData(self, msg, path):
try:

self.binary = 0
if msg['content']:
if 'content' in msg:
data = msg['content']['value']

# Change from b64. We want to get the header from the raw binary data. Not retrievable in b64 format
Expand Down Expand Up @@ -339,4 +341,4 @@ def getTime(self, data):
ddHHMM = time.strftime('%d%H%M', timeStruct)
return ddHHMM
except Exception as e:
return None
return None
2 changes: 1 addition & 1 deletion sarracenia/flowcb/gather/am.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Am(FlowCB):
def __init__(self, options):

super().__init__(options,logger)
self.bulletinHandler = Bulletin()
self.bulletinHandler = Bulletin(self.o)

self.url = urllib.parse.urlparse(self.o.sendTo)

Expand Down
43 changes: 26 additions & 17 deletions sarracenia/flowcb/rename/raw2bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(self,options) :
super().__init__(options,logger)
self.seq = 0
self.binary = 0
self.bulletinHandler = Bulletin()
self.bulletinHandler = Bulletin(self.o)
# Need to redeclare these options to have their default values be initialized.
self.o.add_option('inputCharset', 'str', 'utf-8')
self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG'])
Expand All @@ -87,18 +87,23 @@ def after_accept(self,worklist):
new_worklist = []

for msg in worklist.incoming:
path = msg['new_dir'] + '/' + msg['new_file']

data = self.bulletinHandler.getData(msg, path)
# If called by a sarra, should always have post_baseDir, so should be OK in specifying it
path = self.o.post_baseDir + '/' + msg['relPath']

# AM bulletins that need their filename rewritten with data should only have two chars before the first underscore
# This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71
# These messages are still good, so we will add them to the good_msgs list
# if len(filenameFirstChars) != 2 and self.binary:
# good_msgs.append(msg)
# continue
data = msg.getContent(self.o)

if data == None:
# Determine if bulletin is binary or not
# From sundew source code
if data.splitlines()[1][:4] in self.o.binaryInitialCharacters:
# Decode data, only text. The raw binary data contains the header in which we're interested. Only get that header.
data = data.splitlines()[0].decode('ascii')
else:
# Data is not binary
data = data.decode(self.o.inputCharset)


if not data:
logger.error("No data was found. Skipping message")
worklist.rejected.append(msg)
continue
Expand Down Expand Up @@ -133,13 +138,16 @@ def after_accept(self,worklist):
# Generate a sequence (random ints)
seq = self.bulletinHandler.getSequence()


# Assign a default value for messages not coming from AM
if 'isProblem' not in msg:
msg['isProblem'] = False


# Rename file with data fetched
try:
# We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems
# We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins
if ddhhmm == None or msg["isProblem"]:
if ddhhmm == None or msg['isProblem']:
timehandler = datetime.datetime.now()

# Add current time as new timestamp to filename
Expand All @@ -162,13 +170,14 @@ def after_accept(self,worklist):
new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq

msg['new_file'] = new_file
# We need the rest of the fields to be also updated
del(msg['relPath'])

# No longer needed
del(msg['isProblem'])
msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])
if 'isProblem' in msg:
del(msg['isProblem'])

# msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])

logger.info(f"New filename (with path): {msg['relPath']}")
logger.info(f"New filename: {msg['new_file']}")
new_worklist.append(msg)

except Exception as e:
Expand Down
2 changes: 2 additions & 0 deletions tests/sarracenia/flowcb/gather/am__gather_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self):
self.fileAgeMax = 0
self.post_baseUrl = "http://localhost/"
self.post_format = "v02"
self.post_baseDir = "/this/path/is/fake"

def add_option(self, option, type, default = None):
if not hasattr(self, option):
Expand All @@ -49,6 +50,7 @@ def make_message():
m["to_clusters"] = "localhost"
m["baseUrl"] = "https://NotARealURL"
m["post_baseUrl"] = "https://NotARealURL"
m["post_baseDir"] = "/this/path/is/fake"
m["relPath"] = "ThisIsAPath/To/A/File.txt"
m["_deleteOnPost"] = set()
return m
Expand Down

0 comments on commit 51a3663

Please sign in to comment.