Skip to content

Commit

Permalink
emitting :key_deleted when key is purged
Browse files Browse the repository at this point in the history
  • Loading branch information
lsxliron committed Apr 19, 2024
1 parent 8ee338c commit 89e6eab
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
29 changes: 28 additions & 1 deletion lib/gnat/jetstream/api/kv.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ defmodule Gnat.Jetstream.API.KV do
@stream_prefix "KV_"
@subject_prefix "$KV."
@two_minutes_in_nanoseconds 1_200_000_000
@operation_header "kv-operation"
@operation_del "DEL"
@operation_purge "PURGE"

@type bucket_options ::
{:history, non_neg_integer()}
Expand Down Expand Up @@ -248,10 +251,34 @@ defmodule Gnat.Jetstream.API.KV do
Gnat.Jetstream.API.KV.Watcher.stop(pid)
end

@doc ~S"""
Returns true if operation is `{"kv-operation", "DEL"}` or `{"kv-operation", "PURGE"}`
## Parameters
- `headers` - a list of headers to test
## Example
iex> is_delete_operation([{"kv-operation", "DEL"}])
true
iex> is_delete_operation([{"kv-operation", "PURGE"}])
true
iex> is_delete_operation([{"kv-operation", "ADD"}])
true
"""
@spec is_delete_operation?(headers :: Gnat.headers()) :: boolean()
def is_delete_operation?(headers) do
headers
|> Enum.filter(fn {k, v} ->
k == @operation_header and (v == @operation_del or v == @operation_purge)
end)
|> length() > 0
end

defp receive_keys(keys \\ %{}, bucket_name) do
receive do
{:msg, %{topic: key, body: body, headers: headers}} ->
if {"kv-operation", "DEL"} in headers do
if is_delete_operation?(headers) do
receive_keys(keys, bucket_name)
else
Map.put(keys, subject_to_key(key, bucket_name), body) |> receive_keys(bucket_name)
Expand Down
5 changes: 1 addition & 4 deletions lib/gnat/jetstream/api/kv/watcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ defmodule Gnat.Jetstream.API.KV.Watcher do

alias Gnat.Jetstream.API.{Consumer, KV, Util}

@operation_header "kv-operation"
@operation_del "DEL"

@type keywatch_handler ::
(action :: :key_deleted | :key_added, key :: String.t(), value :: any() -> nil)

Expand Down Expand Up @@ -54,7 +51,7 @@ defmodule Gnat.Jetstream.API.KV.Watcher do
def handle_info({:msg, %{topic: key, body: body, headers: headers}}, state) do
key = KV.subject_to_key(key, state.bucket_name)

if {@operation_header, @operation_del} in headers do
if KV.is_delete_operation?(headers) do
state.handler.(:key_deleted, key, body)
end

Expand Down
8 changes: 8 additions & 0 deletions test/jetstream/api/kv_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,18 @@ defmodule Gnat.Jetstream.API.KVTest do
KV.put_value(:gnat, bucket, "baz", "quz")
assert_receive({:key_added, "baz", "quz"})

KV.put_value(:gnat, bucket, "quz", "qux")
assert_receive({:key_added, "quz", "qux"})

KV.delete_key(:gnat, bucket, "baz")
# key deletions don't carry the data removed
assert_receive({:key_deleted, "baz", ""})

# ensure we get delete event on purge
KV.purge_key(:gnat, bucket, "quz")
assert_receive({:key_deleted, "quz", ""})


KV.unwatch(watcher_pid)

:ok = KV.delete_bucket(:gnat, bucket)
Expand Down

0 comments on commit 89e6eab

Please sign in to comment.