-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
3aca0e2
commit bea4318
Showing
6 changed files
with
207 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
import sys, os | ||
import time | ||
from threading import Thread | ||
import hl7 | ||
import logging | ||
from .hl7_message_parser import Hl7MessageParser | ||
logger = logging.getLogger(__name__) | ||
|
||
def run_in_separate_thread(monitor): | ||
monitor.monitor_folder() | ||
|
||
class Hl7FolderMonitor: | ||
|
||
def __init__(self, folder_path, handlers: dict, interval: int = 30): | ||
''' | ||
folder_path: the absolute path of the folder which will be monitored (for HL7 files to be found) | ||
handlers: dict: message type and callback. ex: {'ORM^O01': 'process_message'} | ||
interval: time (s) between to check of the folder | ||
''' | ||
self._folder_path = folder_path | ||
# An error message will probably never arrive from a file... | ||
# self._handlers = { | ||
# 'ERR': (handle_error_message,) | ||
# } | ||
self._handlers = {} | ||
self.add_handlers(handlers) | ||
self._interval = interval | ||
self.thread = None | ||
self._is_running = False | ||
self._parser = Hl7MessageParser() | ||
self._parser.set_field_definition('message_type', 'MSH.F9') | ||
|
||
def add_handlers(self, handlers: dict): | ||
self._handlers.update(handlers) | ||
|
||
def monitor_folder(self): | ||
self._is_running = True | ||
|
||
while self._is_running is True: | ||
# get files from folder | ||
for path in os.listdir(self._folder_path): | ||
full_path = os.path.join(self._folder_path, path) | ||
|
||
# quick parse and call handler if present | ||
with open(full_path, newline="\r") as f: | ||
file_content = f.read() | ||
message = self._parser.parse(file_content) | ||
message_type = message['message_type'] | ||
if message_type in self._handlers: | ||
self._handlers[message_type](file_content) | ||
else: | ||
logger.error(f"No handler found for {message_type} message. Keeping file for debug...") | ||
continue | ||
|
||
# delete file | ||
os.remove(full_path) | ||
|
||
# wait interval before next check | ||
time.sleep(self._interval) | ||
|
||
self._is_running = False | ||
|
||
def start(self): | ||
""" run the server in a separate thread | ||
call server.stop() from another thread to stop the server | ||
""" | ||
self.thread = Thread(target = run_in_separate_thread, args = (self,)) | ||
self.thread.start() | ||
|
||
def stop(self): | ||
"""stops a server that has been started with start()""" | ||
self._is_running = False | ||
if self.thread is not None: | ||
self.thread.join() | ||
self.thread = None | ||
|
||
def __enter__(self): | ||
self.start() | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
self.stop() | ||
|
||
def is_running(self): | ||
return self._is_running | ||
|
||
|
||
def default_message_handler(message): | ||
return NotImplementedError("Please implement a message handler.") | ||
|
||
# An error message will probably never arrive from a file... | ||
# def handle_error_message(self, message: str, error_description: str = None) -> hl7.Message: | ||
# | ||
# hl7_request = hl7.parse(message) # we need to re-parse it here only the build the response | ||
# | ||
# hl7_response = hl7.parse('MSH|^~\&|{sending_application}||{receiving_application}|{receiving_facility}|{date_time}||ACK^O01|{ack_message_id}|P|2.3||||||8859/1\rMSA|AR|{message_id}|{error}'.format( # TODO: handle encoding | ||
# sending_application = hl7_request['MSH.F5.R1.C1'], | ||
# receiving_application = hl7_request['MSH.F3.R1.C1'], | ||
# receiving_facility = hl7_request['MSH.F4.R1.C1'], | ||
# date_time = datetime.now().strftime("%Y%m%d%H%M%S"), | ||
# message_id = hl7_request['MSH.F10.R1.C1'], | ||
# ack_message_id = str(random.randrange(0, 10 ** 15)), | ||
# error = error_description | ||
# )) | ||
# return hl7_response | ||
|
||
# this is just a very quick usage example that does nothing usefull since it uses abstract handler | ||
if __name__ == "__main__": | ||
server = Hl7FolderMonitor('/home/messages', { | ||
'ORM^O01': (default_message_handler,) | ||
}) | ||
# terminate with Ctrl-C | ||
try: | ||
server.monitor_folder() | ||
except KeyboardInterrupt: | ||
sys.exit(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import os | ||
import time | ||
import unittest | ||
import hl7 # https://python-hl7.readthedocs.org/en/latest/ | ||
from orthanc_tools import hl7Lib | ||
import re | ||
from orthanc_tools import Hl7FolderMonitor | ||
import tempfile | ||
from orthanc_api_client import helpers | ||
from orthanc_tools import Hl7WorklistParserVetera, DicomWorklistBuilder, Hl7OrmWorklistMsgHandler | ||
|
||
def hl7_echo_message_handler(incoming_hl7_message: str) -> hl7.Message: | ||
""" | ||
This is a 'stupid' handler that just repeats the message it receives (useful for testing) | ||
""" | ||
pass | ||
return hl7.parse(incoming_hl7_message) | ||
|
||
|
||
class TestHl7FolderMonitor(unittest.TestCase): | ||
|
||
def test_start_and_stop(self): | ||
with tempfile.TemporaryDirectory() as temp_dir: | ||
monitor = Hl7FolderMonitor(temp_dir, { | ||
}, 5) | ||
|
||
# just make sure we can start/stop the server | ||
self.assertFalse(monitor.is_running()) | ||
|
||
monitor.start() | ||
self.assertTrue(monitor.is_running()) | ||
|
||
monitor.stop() | ||
self.assertFalse(monitor.is_running()) | ||
|
||
def test_callback_and_deletion(self): | ||
# start a monitor that will check the folder and delete the file after the callback | ||
with tempfile.TemporaryDirectory() as temp_dir: | ||
monitor = Hl7FolderMonitor(temp_dir, {'ORM^O01': hl7_echo_message_handler}, 3) | ||
|
||
# validate that ORM^O01 messages has been received | ||
hl7_str = "MSH|^~\&|TOTO|TUTU|SOFTNAME|CHABC|201602011049||ORM^O01|exp_ANE_5|P|2.3.1\rPID|1||8123456DK01||DUPONT^ALBERT ANTHONY|||||||||||||123456" | ||
|
||
file_path = temp_dir + "/test.hl7" | ||
f = open(file_path, "w") | ||
f.write(hl7_str) | ||
f.close() | ||
self.assertEqual(1, len(os.listdir(temp_dir))) | ||
|
||
monitor.start() | ||
|
||
# wait until the file has been deleted | ||
helpers.wait_until(lambda: len(os.listdir(temp_dir)) == 0, 4) | ||
self.assertEqual(0, len(os.listdir(temp_dir))) | ||
monitor.stop() | ||
|
||
def test_worklist_creation(self): | ||
# start a monitor that will check the folder and create the wl file | ||
with tempfile.TemporaryDirectory() as temp_dir_hl7: | ||
with tempfile.TemporaryDirectory() as temp_dir_wl: | ||
orm_parser = Hl7WorklistParserVetera() | ||
worklist_builder = DicomWorklistBuilder(folder=temp_dir_wl) | ||
orm_handler = Hl7OrmWorklistMsgHandler(parser=orm_parser, builder=worklist_builder) | ||
|
||
monitor = Hl7FolderMonitor(temp_dir_hl7, {'ORM^O01': orm_handler.handle_orm_message}, 3) | ||
|
||
hl7_str = "MSH|^~\&|VETERA|VETERA|conquest|conquest|20170731081517||ORM^O01|1000000001|P|2.5.0|||||\r"\ | ||
"PID|1|999888777||123456789012345|GP.Software^Vetera||20070501|F|||||||||||||||||||||||||||Katze|Balinese|ALTERED|ZH-123|\r"\ | ||
"ORC|NW||||||||20170731081517||||||||||\r"\ | ||
"OBR|||1000000001|HD||20170731081517|||||||||||||||DX|||ZUG||||||||Dr. P. Muster||||\r" | ||
|
||
file_path = temp_dir_hl7 + "/test.hl7" | ||
f = open(file_path, "w") | ||
f.write(hl7_str) | ||
f.close() | ||
|
||
self.assertEqual(1, len(os.listdir(temp_dir_hl7))) | ||
|
||
monitor.start() | ||
|
||
# wait until the hl7 file has been deleted, so that, the wl file should have been created | ||
helpers.wait_until(lambda: len(os.listdir(temp_dir_hl7)) == 0, 4) | ||
|
||
self.assertEqual(1, len(os.listdir(temp_dir_wl))) | ||
monitor.stop() | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters