A Pulsar client implemented purely in Elixir.
Add `pulsar_ex` to your deps in `mix.exs`:

```elixir
def deps do
  [
    {:pulsar_ex, "~> 0.14"}
  ]
end
```
- Topic discovery on partition changes
- Graceful shutdown for both producers and consumers
- Compressed message payloads on the producer (lz4, zlib, zstd, snappy)
- Batched producer
- Consumer with batch index support
- Producer retry strategy
- Consumer workers with rich features
- Job interceptors
- Job middlewares
- Verbose logging and telemetry
- Multiple clusters
- Regex-based consumers
PulsarEx.produce("persistent://public/default/test.json", "payload")
This creates a producer on the fly with default options; see `PulsarEx.Producer` for the supported options. Message options can be passed as the third argument:
PulsarEx.produce("persistent://public/default/test.json", "payload", [properties: %{prop1: "1", prop2: "2", partition_key: "123", ordering_key: "321", event_time: Timex.now()}])
To delay delivery of a message by 5 seconds:

```elixir
PulsarEx.produce("persistent://public/default/test.json", "payload", [delay: 5_000])
```
Or deliver at a specific time:

```elixir
deliver_at = Timex.add(Timex.now(), Timex.Duration.from_milliseconds(10_000))
PulsarEx.produce("persistent://public/default/test.json", "payload", [deliver_at_time: deliver_at])
```
All producers are pooled. By default the pool size is 1; to create a pool of 5 producers:
```elixir
PulsarEx.produce("persistent://public/default/test.json", "payload", [], [num_producers: 5])
```
Note: the pool is per partition, so if you have a partitioned topic with 10 partitions, you will get a total of 10 pools, each with 5 producers. Messages are routed to partitions based on `partition_key`, or in a round-robin fashion if no partition key is specified. If the topic already exists on Pulsar, PulsarEx will look up the topic and find out how many partitions it has. PulsarEx also periodically detects when more partitions are added to the topic, so no application restart is needed.
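For example (the `"user-42"` key is illustrative), two messages published with the same `partition_key` land on the same partition and keep their relative order:

```elixir
# both messages carry the same partition_key, so they are routed to the
# same partition and preserve their relative order
PulsarEx.produce("persistent://public/default/test.json", "first", [partition_key: "user-42"])
PulsarEx.produce("persistent://public/default/test.json", "second", [partition_key: "user-42"])
```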
By default, messages are not batched when publishing to Pulsar. To enable batching and change the batch size:
```elixir
PulsarEx.produce("persistent://public/default/test.json", "payload", [], [batch_enabled: true, batch_size: 100])
```
You can also change the flush interval for batched producing:
```elixir
PulsarEx.produce("persistent://public/default/test.json", "payload", [], [batch_enabled: true, batch_size: 100, flush_interval: 1_000])
```
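Batching pays off when you publish many small messages in quick succession. A minimal sketch, reusing the illustrative topic from above:

```elixir
# rapid produces are grouped into batches of up to 100 messages,
# flushed at least once per second, instead of one request per message
for i <- 1..1_000 do
  PulsarEx.produce(
    "persistent://public/default/test.json",
    "payload #{i}",
    [],
    batch_enabled: true, batch_size: 100, flush_interval: 1_000
  )
end
```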
To enable lz4 compression, you will first need to add `nimble_lz4` to your deps:

```elixir
defp deps do
  [
    {:nimble_lz4, "~> 0.1.2"}
  ]
end
```
And then:

```elixir
PulsarEx.produce("persistent://public/default/test.json", "payload", [], [compression: :lz4])
```
To consume messages, define a consumer module:

```elixir
defmodule MyConsumer do
  use PulsarEx.Consumer

  require Logger

  @impl PulsarEx.ConsumerCallback
  def handle_messages(messages, _state) do
    # log each message and ack it
    Enum.map(messages, fn message ->
      Logger.info(inspect(message))
      :ok
    end)
  end
end
```
And then:

```elixir
MyConsumer.start()
```
To see the supported options, check `PulsarEx.Consumer`.
Note: raw consumers always handle messages in batches, so the `handle_messages` callback expects a list of messages. However, `batch_size` can be specified when starting the consumer:

```elixir
MyConsumer.start(batch_size: 1)
```
And then you can simply do:

```elixir
@impl PulsarEx.ConsumerCallback
def handle_messages([message], _state) do
  IO.inspect(message)
  [:ok]
end
```
Important note: to instruct the consumer to send an ACK or NACK, `PulsarEx.Consumer` relies on the return value of your `handle_messages` callback. The callback must return a list with one entry per message, each being one of three values: `:ok | {:ok, any()} | {:error, any()}`; see `PulsarEx.ConsumerCallback`.
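For example, here is a sketch of a consumer that acks the messages it can process and nacks the rest; `process_message/1` is a hypothetical function standing in for your own application logic:

```elixir
defmodule MySelectiveConsumer do
  use PulsarEx.Consumer

  # process_message/1 is a hypothetical application function
  defp process_message(_message), do: {:ok, :done}

  @impl PulsarEx.ConsumerCallback
  def handle_messages(messages, _state) do
    Enum.map(messages, fn message ->
      case process_message(message) do
        {:ok, _result} -> :ok                  # message will be ACKed
        {:error, reason} -> {:error, reason}   # message will be NACKed and redelivered
      end
    end)
  end
end
```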
You can also specify the consumer options in the definition of your consumer:
```elixir
defmodule MyConsumer do
  use PulsarEx.Consumer,
    batch_size: 1,
    subscription: "test",
    subscription_type: :failover,
    receiving_queue_size: 1000,
    redelivery_policy: :exp,
    max_redelivery_attempts: 5,
    dead_letter_topic: "persistent://public/default/dlq",
    dead_letter_producer_opts: [num_producers: 3]

  require Logger

  @impl PulsarEx.ConsumerCallback
  def handle_messages(messages, _state) do
    Enum.map(messages, fn message ->
      Logger.info(inspect(message))
      :ok
    end)
  end
end
```
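With the options baked into the module definition, you can then start it without passing any options:

```elixir
MyConsumer.start()
```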
If you are looking to use PulsarEx as a job queue, you should use `PulsarEx.Worker` instead of `PulsarEx.Consumer`. The worker module provides a rich set of tools and functions, as well as logging and stats, for working with Pulsar topics as your job queue. It also supports all the options supported by the `PulsarEx.Consumer` module, except for `batch_size`, since the worker module processes only 1 message at a time. See `PulsarEx.WorkerCallback` for the callbacks expected to implement your worker.
```elixir
defmodule TestWorker do
  use PulsarEx.Worker,
    otp_app: :my_app,
    topic: "persistent://public/default/test.json",
    subscription: "test",
    subscription_type: :failover,
    receiving_queue_size: 1000,
    redelivery_policy: :exp,
    max_redelivery_attempts: 5,
    dead_letter_topic: "persistent://public/default/dlq",
    dead_letter_producer_opts: [num_producers: 3]

  alias PulsarEx.JobState

  @impl PulsarEx.WorkerCallback
  def handle_job(%JobState{} = job_state) do
    IO.inspect(job_state)
    :ok
  end
end
```
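Then, as a quick sketch (the params map is illustrative; `enqueue` is covered in detail below), start the worker and push a job to it:

```elixir
TestWorker.start()
TestWorker.enqueue(%{"user_id" => 123})
```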
Note: while in `PulsarEx.Consumer` you handle a list of `PulsarEx.ConsumerMessage` structs, in `PulsarEx.Worker` you handle a `PulsarEx.JobState`, a struct that wraps the `PulsarEx.ConsumerMessage`. `PulsarEx.JobState` is useful for writing worker middlewares; two middlewares are loaded by default for each worker: `PulsarEx.Middlewares.Logging` and `PulsarEx.Middlewares.Telemetry`.
To write your own middleware and load it into your worker:
```elixir
defmodule MyWorker.Middlewares.Test do
  @behaviour PulsarEx.Middleware

  import PulsarEx.JobState

  @impl true
  def call(handler) do
    fn %PulsarEx.JobState{} = job_state ->
      job_state
      |> assign(:property1, "My test middleware")
      |> handler.()
    end
  end
end
```
And then:

```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    otp_app: :my_app,
    middlewares: [
      MyWorker.Middlewares.Test
    ]

  ...
end
```
Middlewares are especially useful when you have multiple workers that share the same execution logic, such as logging, stats, or exception handling.
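As a sketch, here is a middleware that logs any exception raised downstream before re-raising it; the module name is illustrative, and it assumes only the `PulsarEx.Middleware` contract shown above:

```elixir
defmodule MyApp.Middlewares.RescueLogger do
  @behaviour PulsarEx.Middleware

  require Logger

  @impl true
  def call(handler) do
    fn %PulsarEx.JobState{} = job_state ->
      try do
        handler.(job_state)
      rescue
        error ->
          # log the failure, then let the usual redelivery handling kick in
          Logger.error("job failed: #{Exception.message(error)}")
          reraise error, __STACKTRACE__
      end
    end
  end
end
```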
At the most basic level, you can do:
```elixir
MyWorker.enqueue(%{"my_job_context" => 123})
```
Params are encoded into JSON and published to Pulsar as the payload. You can also specify message options:
```elixir
MyWorker.enqueue(%{"my_job_context" => 123}, properties: %{prop1: "1"}, partition_key: 123)
```
If you want to include your partitioning logic in your worker, you can instead implement the `partition_key` or `ordering_key` callbacks:
```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    otp_app: :my_app,
    middlewares: [
      MyWorker.Middlewares.Test
    ]

  @impl PulsarEx.WorkerCallback
  def partition_key(%{id: id} = _params, _message_opts) do
    "#{id}"
  end

  @impl PulsarEx.WorkerCallback
  def ordering_key(%{ip_address: ip_address} = _params, _message_opts) do
    "#{ip_address}"
  end

  ...
end
```
And then:

```elixir
MyWorker.enqueue(%{"my_job_context" => 123}, properties: %{prop1: "1"})
```

The `partition_key` and `ordering_key` will be calculated from your job payload.
You can also declare multiple job types on a single worker and pattern-match on them:

```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    otp_app: :my_app,
    middlewares: [
      MyWorker.Middlewares.Test
    ],
    jobs: [:create, :update]

  @impl PulsarEx.WorkerCallback
  def handle_job(:create, %{properties: _properties} = state) do
    IO.inspect(state)
    :ok
  end

  @impl PulsarEx.WorkerCallback
  def handle_job(:update, %{properties: _properties} = state) do
    IO.inspect(state)
    :ok
  end
end
```
So when you enqueue a job for a given job type:

```elixir
MyWorker.enqueue_job(:create, %{prop1: "test"})
```
The matching `handle_job/2` clause (here `:create`) handles it. This is useful when you want to take advantage of Elixir's pattern matching to handle a bundle of similar message-processing jobs.
You can also consume multiple topics with a single worker. There are several ways to make this possible.

Consume topics in the same namespace. The following consumes any topic with a `.json` suffix under `my-tenant/my-namespace`:
```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    otp_app: :my_app,
    tenant: "my-tenant",
    namespace: "my-namespace",
    topic: ~r/.*\.json/

  ...

  @impl PulsarEx.WorkerCallback
  def topic(%{id: 1}, _message_opts) do
    "persistent://my-tenant/my-namespace/1.json"
  end

  def topic(%{id: 2}, _message_opts) do
    "persistent://my-tenant/my-namespace/2.json"
  end

  ...
end
```
```elixir
MyWorker.start()
```
Alternatively, define a worker without a topic/subscription:
```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    otp_app: :my_app,
    ...

  @impl PulsarEx.WorkerCallback
  def topic(%{id: 1}, _message_opts) do
    "persistent://my-tenant/my-namespace/1.json"
  end

  def topic(%{id: 2}, _message_opts) do
    "persistent://my-tenant/my-namespace/2.json"
  end

  ...
end
```
Then start the consumer explicitly, using one of the following:

```elixir
PulsarEx.start_consumer(topic, subscription, MyWorker, opts)
PulsarEx.start_consumer(tenant_or_regex, namespace_or_regex, topic_or_regex, subscription, MyWorker, opts)
```
PulsarEx discovers topics based on your tenant, namespace, and topic, whether they are strings or regex patterns. The consumer also periodically discovers newly added topics.
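For example, a minimal sketch of starting a regex-based consumer over the namespace above (the subscription name and empty opts are illustrative):

```elixir
PulsarEx.start_consumer("my-tenant", "my-namespace", ~r/.*\.json/, "test", MyWorker, [])
```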
Sometimes you want logic that lets you skip publishing a job (for example, for deduping), or simply collect more stats/logs. For that, you can write an interceptor:
```elixir
defmodule MyInterceptor do
  @behaviour PulsarEx.Interceptor

  alias PulsarEx.JobInfo

  require Logger

  @impl true
  def call(handler) do
    fn %JobInfo{worker: worker, topic: topic, job: job} = job_info ->
      Logger.info("publishing job #{job} for worker #{worker} to topic #{topic}")
      handler.(job_info)
    end
  end
end
```
And you can load it up in your worker:
```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    interceptors: [MyInterceptor]
end
```
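In the same spirit, here is a sketch of an interceptor that measures how long publishing takes; the module name is illustrative, and it assumes only the `PulsarEx.Interceptor` contract shown above:

```elixir
defmodule MyApp.Interceptors.Timing do
  @behaviour PulsarEx.Interceptor

  require Logger

  @impl true
  def call(handler) do
    fn job_info ->
      started_at = System.monotonic_time(:millisecond)
      # invoke the rest of the interceptor chain, then report elapsed time
      result = handler.(job_info)
      elapsed = System.monotonic_time(:millisecond) - started_at
      Logger.info("publish took #{elapsed}ms")
      result
    end
  end
end
```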
How do you write tests that involve enqueue/consume logic? In your `config/test.exs`, you can do:

```elixir
config :my_app, MyWorker,
  workers: 5, # starts a pool of 5 consumers per partition
  max_redelivery_attempts: 3,
  inline: true
```
All the options you have specified in the definition of your worker can also be specified here.
The `inline` option in your test environment runs your `handle_job` logic inline with the `enqueue`/`enqueue_job` call, bypassing the actual message routing through Pulsar and making your tests independent of a Pulsar server. For example, in your test:
```elixir
defmodule MyWorker do
  use PulsarEx.Worker,
    jobs: [:create]

  def handle_job(:create, %{payload: %{test: true}}) do
    raise "This is a test"
  end
end
```
```elixir
defmodule MyWorkerTest do
  use ExUnit.Case

  describe "create" do
    test "raise exception for test" do
      assert_raise RuntimeError, "This is a test", fn ->
        MyWorker.enqueue_job(:create, %{test: true})
      end
    end
  end
end
```