Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Lunary's global event queue in callback handlers #28

Merged
merged 12 commits into from
May 21, 2024
19 changes: 6 additions & 13 deletions motleycrew/tracking/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
ChatMessage = None

from lunary import track_event
from lunary.event_queue import EventQueue
from lunary.consumer import Consumer
from lunary import event_queue_ctx

from motleycrew.common.enums import LunaryRunType, LunaryEventName
from motleycrew.common.utils import ensure_module_is_installed
Expand Down Expand Up @@ -81,26 +80,20 @@ class LlamaIndexLunaryCallbackHandler(BaseCallbackHandler):
def __init__(
self,
app_id: str,
event_starts_to_ignore: List[CBEventType] = [],
event_ends_to_ignore: List[CBEventType] = [],
queue: EventQueue = None,
event_starts_to_ignore: List[CBEventType] = None,
event_ends_to_ignore: List[CBEventType] = None,
):
ensure_module_is_installed("llama_index")
super(LlamaIndexLunaryCallbackHandler, self).__init__(
event_starts_to_ignore=event_starts_to_ignore,
event_ends_to_ignore=event_ends_to_ignore,
event_starts_to_ignore=event_starts_to_ignore or [],
event_ends_to_ignore=event_ends_to_ignore or [],
)

self.__app_id = app_id
self._track_event = track_event
self._event_run_type_ids = []

if queue is not None:
self.queue = EventQueue()
self.consumer = Consumer(self.queue, self.__app_id)
self.consumer.start()
else:
self.queue = queue
self.queue = event_queue_ctx.get()

def _get_initial_track_event_params(
self, run_type: LunaryRunType, event_name: LunaryEventName, run_id: str = None
Expand Down
Loading