From 3e1616925370d804deaccb33348641f801d49781 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 9 Nov 2023 18:12:53 +0100 Subject: [PATCH] Feat: better handling for task ids in handle_cast --- README.md | 34 ++++++++++++++++++++++++++++------ lib/surrealix/dispatch.ex | 8 ++++++-- lib/surrealix/handler_table.ex | 6 +++++- lib/surrealix/socket.ex | 28 +++++++++++++++------------- 4 files changed, 54 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index be54787..76e1b21 100644 --- a/README.md +++ b/README.md @@ -19,16 +19,38 @@ Surrealix.live(pid, "person", true) ## for more complex LIVE queries Surrealix.query(pid, "LIVE SELECT * FROM person;") Surrealix.query(pid, "LIVE SELECT DIFF FROM person;") +``` +```elixir +## Example with live query callbacks - -## start dispatch registry - -Surrealix.Dispatch.attach("first", [:live_query], fn (event, data, config)-> IO.inspect({:res, event, data}) end) -Surrealix.Dispatch.remove("first") -## try running following query in the SurrealDB shell: `create person:1 set name = "John"` +{:ok, pid} = Surrealix.start_link(namespace: "test", database: "test") +Surrealix.signin(pid, %{user: "root", pass: "root"}) +Surrealix.use(pid, "test", "test") +{:ok, + %{ + "result" => [ + %{ + "result" => lq_id, + } + ] + }} = Surrealix.query(pid, "LIVE SELECT * FROM person;") + +event = [:live_query, lq_id] +Surrealix.Dispatch.attach("logger", event, fn (event, data, config) -> + IO.puts ">>>>>> CALLBACK WORKS!" + IO.inspect(event, label: :EVENT) + IO.inspect(data, label: :DATA) +end) + +## now change data in the person table +Surrealix.query(pid, "update person:2 set name = 1") + +## to remove the callback +# Surrealix.Dispatch.remove_by_event(event) ``` + ## Installation If [available in Hex](https://hex.pm/docs/publish), the package can be installed diff --git a/lib/surrealix/dispatch.ex b/lib/surrealix/dispatch.ex index 23504fa..b11ed10 100644 --- a/lib/surrealix/dispatch.ex +++ b/lib/surrealix/dispatch.ex @@ -34,7 +34,11 @@ defmodule Surrealix.Dispatch do HandlerTable.insert(id, event, fun, config) end - def remove(id) do - HandlerTable.remove(id) + def remove_by_id(id) do + HandlerTable.remove_by_id(id) + end + + def remove_by_event(id) do + HandlerTable.remove_by_event(id) end end diff --git a/lib/surrealix/handler_table.ex b/lib/surrealix/handler_table.ex index ffb3d53..8ff414c 100644 --- a/lib/surrealix/handler_table.ex +++ b/lib/surrealix/handler_table.ex @@ -48,10 +48,14 @@ defmodule Surrealix.HandlerTable do end end - def remove(id) do + def remove_by_id(id) do :ets.match_delete(@table, {id, :_, :_, :_}) end + def remove_by_event(event) do + :ets.match_delete(@table, {:_, event, :_, :_}) + end + # Debug / dev functions ############# def delete_all() do :ets.delete_all_objects(@table) diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index b58ddc6..a439c38 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -46,29 +46,31 @@ defmodule Surrealix.Socket do exit(:normal) end - def handle_cast(caller, state) do + def handle_cast({method, args, id, task}, state) do Logger.debug("[surrealix] [handle_cast] #{inspect(state)}") - {method, args, id} = caller payload = build_cast_payload(method, args, id) - + state = SocketState.add_task(state, id, task) frame = {:text, payload} - {:reply, frame, args} + {:reply, frame, state} end def handle_frame({_type, msg}, state) do - task = Keyword.get(state, :__receiver__) json = Jason.decode!(msg) id = Map.get(json, "id") - - if not Process.alive?(task.pid) do - Surrealix.Dispatch.execute([:live_query], json) - end - - if Process.alive?(task.pid) do - Process.send(task.pid, {:ok, json, id}, []) + task = SocketState.get_task(state, id) + + if is_nil(task) do + # there is no registered task for this ID, it must be a live query update! + lq_id = get_in(json, ["result", "id"]) + Surrealix.Dispatch.execute([:live_query, lq_id], json) + else + if Process.alive?(task.pid) do + Process.send(task.pid, {:ok, json, id}, []) + end end + state = SocketState.delete_task(state, id) {:ok, state} end @@ -93,7 +95,7 @@ defmodule Surrealix.Socket do end) args = Keyword.merge([__receiver__: task], args) - WebSockex.cast(pid, {method, args, id}) + WebSockex.cast(pid, {method, args, id, task}) task_timeout = Keyword.get(opts, :timeout, :infinity) res = Task.await(task, task_timeout)