Message / Event Bus written in Erlang.
Authors: Carlos Andres Bolaños R.A. ([email protected]).
A new way to build soft real-time, highly scalable messaging-based applications: not centralized, but distributed!
See also: WEST.
ErlBus is a simple and lightweight tool for building messaging-based applications. It provides a simple, usable interface on top of well-known and proven libraries/tools like pg2, gproc and riak_core, enabling clearer and more powerful semantics for messaging patterns such as Publish/Subscribe, Point-To-Point, Event-Driven Consumers, Task Executors, etc.
ErlBus also provides a flexible and configurable distribution model to achieve the desired scalability levels. You can run ErlBus in two ways:
- Default: ErlBus running on top of pg2 and using Distributed Erlang.
- ErlBus with riak_core, and gproc running locally on each node. This option gives you more flexibility: you can configure parameters such as the replication factor and quorums.
Assuming you have a working Erlang installation (Erlang/OTP 17 or later is recommended), building ErlBus should be as simple as:
$ git clone https://github.com/cabol/erlbus.git
$ cd erlbus
$ make
ErlBus has 3 dependencies: gproc, poolboy and riak_core. The last one, riak_core, is fetched on demand, only when you want to run ErlBus in distributed mode, inheriting all the riak_core benefits.
Learn more about Riak Core.
In this scenario, ErlBus runs with gproc locally on each node, and riak_core on top of it, managing the cluster and task/command distribution.
- To enable fetching of riak_core, export the OS environment variable EBUS_DIST=true (this can be done, e.g., from a GNU Makefile).
Start an Erlang console:
$ erl -pa ebin deps/*/ebin
Once in the Erlang console:
% Start ebus
application:start(ebus).
ok
% Create anonymous function to act as handler
F = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
#Fun<erl_eval.12.90072148>
% Create anonymous handlers
MH1 = ebus_handler:new(F).
<0.50.0>
MH2 = ebus_handler:new(F, {my_ctx, <<"MH2">>}).
<0.52.0>
% Subscribe them to channel ch1
% Note that `ebus:sub/2,3` can receive either a single handler or a list of them
ebus:sub(ch1, [MH1, MH2]).
ok
% Let's publish a message to 'ch1'
ebus:pub(ch1, "Hello!").
[Pid: <0.50.0>][Channel: ch1][Msg: "Hello!"][Ctx: undefined]
[Pid: <0.52.0>][Channel: ch1][Msg: "Hello!"][Ctx: {my_ctx,<<"MH2">>}]
ok
% Another handler
MH3 = ebus_handler:new(F, {my_ctx, <<"MH3">>}).
<0.54.0>
% Subscribe the other handler 'MH3' to ch2
ebus:sub(ch2, MH3).
ok
% Publish to 'ch2'
ebus:pub(ch2, "Hello other!").
[Pid: <0.54.0>][Channel: ch2][Msg: "Hello other!"][Ctx: {my_ctx,<<"MH3">>}]
ok
% Unsubscribe 'MH2' from ch1
% Note that `ebus:unsub/2,3` can also receive a single handler (this case) or a list of them
ebus:unsub(ch1, MH2).
ok
% Publish again to 'ch1'
ebus:pub(ch1, "Hello again!").
[Pid: <0.50.0>][Channel: ch1][Msg: "Hello again!"][Ctx: undefined]
ok
Note:
- You may have noticed that no additional steps/calls are needed to create or delete a channel. ebus handles this automatically, so you don't have to worry about it!
Now let's make it more fun: start two Erlang consoles. The first one:
$ erl -name node1@127.0.0.1 -setcookie ebus -pa ebin deps/*/ebin
The second one:
$ erl -name node2@127.0.0.1 -setcookie ebus -pa ebin deps/*/ebin
Next, put these Erlang nodes in a cluster. From either of them, ping the other:
% From node1 ping node2
net_adm:ping('node2@127.0.0.1').
pong
Excellent, both nodes are now in a cluster, thanks to the beauty of Distributed Erlang.
So, let's repeat the above exercise, but now on two nodes. On both nodes, start ebus:
% Start ebus
application:start(ebus).
ok
Then, on node1, create a handler and subscribe it to a channel:
% Anonymous handler function
F = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
#Fun<erl_eval.12.90072148>
% Subscribe a handler
ebus:sub(ch1, ebus_handler:new(F)).
ok
Repeat the same steps on node2.
Once you have handlers subscribed to the same channel in both nodes, publish some messages from any node:
% Publish message
ebus:pub(ch1, "Hi!").
[Pid: <0.62.0>][Channel: ch1][Msg: "Hi!"][Ctx: undefined]
ok
On the other node, you will see that the message has arrived too:
[Pid: <0.59.0>][Channel: ch1][Msg: "Hi!"][Ctx: undefined]
So far, so good! Let's continue!
Previously we saw a simple example of how Pub/Sub works with ebus. Now let's dig into the core modules.
The ebus module provides all the pub/sub messaging functions. Please take a look at the ebus module for the full API.
The ebus_handler module provides all the functions needed to create and manage message handlers. There are two ways to create a message handler:
- Passing an anonymous function as callback (as we saw previously). The callback fun must comply with the spec: fun(({Channel :: any(), Payload :: any()}, Context :: any()) -> any()).
- Passing an existing module that implements the ebus_handler behaviour, which defines the callback: handle_msg({Channel :: any(), Payload :: any()}, Context :: any()) -> any().
You may have noticed that the specification is the same in both cases (anonymous fun and behaviour). Both receive the same arguments:
- Channel is the logical mechanism that allows two or more endpoints to communicate with each other through messages (either Pub/Sub or Point-to-Point).
- Payload is the message itself, i.e. the content of what you published or dispatched.
- Context is an optional parameter that you can pass when the handler is created, and get back each time handle_msg is invoked.
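To make the role of Context concrete, here is a minimal sketch of a module that returns a handler fun following the spec above. The module name ctx_example and the use of a map with a name key as context are assumptions for illustration only. As noted later in this README, the context can be any term (tuple, record, map, etc.).

```erlang
%% Hypothetical example module (not part of ebus).
-module(ctx_example).
-export([handler_fun/0]).

%% Returns a fun compliant with the handler spec:
%%   fun(({Channel, Payload}, Context) -> any())
%% The Context is assumed to be a map such as #{name => <<"MH1">>}.
handler_fun() ->
    fun({Channel, Msg}, #{name := Name}) ->
            %% Name comes from the context given at handler creation.
            io:format("[~s][Channel: ~p][Msg: ~p]~n", [Name, Channel, Msg]),
            %% Return a tuple so the fun can also be checked directly.
            {Name, Channel, Msg}
    end.
```

It could then be registered with the calls used earlier, e.g. MH = ebus_handler:new(ctx_example:handler_fun(), #{name => <<"MH1">>}) followed by ebus:sub(ch1, MH).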
Now we're going to do an example using both ways. First, we have to create an Erlang module that implements the ebus_handler behaviour.
-module(my_handler).
-behaviour(ebus_handler).
%% API
-export([handle_msg/2]).
handle_msg({Channel, Msg}, Context) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Context]).
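Because the callback spec only fixes the argument shape, a handle_msg/2 implementation can use ordinary pattern matching to react differently per channel or payload. Here is a minimal sketch, assuming a hypothetical orders channel and {new_order, Id} payloads (neither is part of ebus). The spec declares any() as the return type, so the return values below are arbitrary. They are there only to make the clauses easy to check.

```erlang
%% Hypothetical handler module (not part of ebus).
-module(order_handler).
-behaviour(ebus_handler).

-export([handle_msg/2]).

%% A structured event on the (hypothetical) orders channel.
handle_msg({orders, {new_order, Id}}, Ctx) ->
    io:format("[~p] new order ~p~n", [Ctx, Id]),
    {accepted, Id};
%% Any other payload on any channel is only logged.
handle_msg({Channel, Msg}, Ctx) ->
    io:format("[~p] ignored ~p on ~p~n", [Ctx, Msg, Channel]),
    ignored.
```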
Once you have compiled your module(s) and started an Erlang console:
% Start ebus
application:start(ebus).
ok
% Create a new handler, passing a context as argument
% In this case the context is a simple binary string with the name of the handler,
% but it can be anything that you want (tuple, record, map, etc.)
MH1 = ebus_handler:new(my_handler, <<"MH1">>).
<0.49.0>
% Anonymous function
F = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
#Fun<erl_eval.12.90072148>
% Handler with anonymous function
MH2 = ebus_handler:new(F, <<"MH2">>).
<0.50.0>
% From here, everything is the same as in the previous example
% Subscribe the handler to some channel
ebus:sub(my_channel, [MH1, MH2]).
ok
% Now the handler is ready to receive and process messages
% Publish a message/event
ebus:pub(my_channel, "Hello!").
ok
Again, you can start multiple Erlang nodes, put them in a cluster and start ebus on each one, and now you have ebus running in a distributed fashion. It's extremely easy: you don't have to do anything else at all.
The great thing here is that you don't need anything special to implement point-to-point behavior. It's as simple as this:
ebus:dispatch(ch1, "Hi!", MyHandler).
Instead of calling ebus:pub(Channel, Message), you call ebus:dispatch(Channel, Message, Handler). The only difference is that you have to provide the Handler that will receive the message.
The reason for this is that you're free to implement your own scheduling/dispatching strategy. You can also use ebus_util:get_best_pid(ListOfHandlers) to find an available handler. For example:
%% Start ebus
application:start(ebus).
ok
%% Create some handlers
MH1 = ebus_handler:new(my_handler, <<"MH1">>).
<0.47.0>
MH2 = ebus_handler:new(my_handler, <<"MH2">>).
<0.48.0>
MH3 = ebus_handler:new(my_handler, <<"MH3">>).
<0.49.0>
%% Subscribe created handlers
ebus:sub(my_channel, [MH1, MH2, MH3]).
ok
%% Get the subscribed handlers
Handlers = ebus:subscribers(my_channel).
[<0.47.0>, <0.48.0>, <0.49.0>]
%% Find an available handler
Handler = ebus_util:get_best_pid(Handlers).
<0.47.0>
ebus:dispatch(my_channel, "Hi!", Handler).
[Pid: <0.47.0>][Channel: my_channel][Msg: "Hi!"][Ctx: <<"MH1">>]
ok
Note:
- The example above assumes that you're working with the previously compiled handler my_handler.erl.
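Since the dispatching strategy is up to you, ebus_util:get_best_pid/1 is just one option. As an alternative, the hypothetical module below sketches a round-robin choice. It relies only on ebus:subscribers/1 and ebus:dispatch/3 as used above, and it assumes the caller keeps and increments the counter N between calls.

```erlang
%% Hypothetical module (not part of ebus): round-robin selection as an
%% alternative to ebus_util:get_best_pid/1 for point-to-point dispatch.
-module(rr_dispatch).
-export([pick/2, dispatch/3]).

%% Pick the element at position (N rem Len) + 1, so successive values
%% of N cycle through the list in order.
pick([], _N) ->
    erlang:error(no_handlers);
pick(Handlers, N) when is_list(Handlers), is_integer(N), N >= 0 ->
    lists:nth((N rem length(Handlers)) + 1, Handlers).

%% Send Msg to exactly one subscriber of Channel, chosen by round robin.
dispatch(Channel, Msg, N) ->
    Handler = pick(ebus:subscribers(Channel), N),
    ebus:dispatch(Channel, Msg, Handler).
```

For example, calling rr_dispatch:dispatch(my_channel, "Hi!", 0) and then rr_dispatch:dispatch(my_channel, "Hi!", 1) would target two different handlers. This assumes ebus:subscribers/1 returns the handlers in a stable order between the calls, which is not verified here.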
Suppose now that you have a handler that takes a while to process each message/event. It will be blocked until the task completes, which would be unacceptable in some scenarios. Therefore, the ebus_handler module gives you the option to create a pool of workers attached to your handler, in a way that is totally transparent to you.
% Start ebus
application:start(ebus).
ok
% Create a handler with a worker pool (3 workers)
Pool1 = ebus_handler:new_pool(my_pool_1, 3, my_handler).
<0.49.0>
% Anonymous function
F = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
#Fun<erl_eval.12.90072148>
% Pool with anonymous function
Pool2 = ebus_handler:new_pool(my_pool_2, 3, F).
<0.50.0>
% And that's it, now the load will be distributed among the workers
% From here everything is as previously
% Finally, let's subscribe these pooled handlers to some channel
ebus:sub(my_channel, [Pool1, Pool2]).
ok
Note:
- Another way to get point-to-point behavior is to use the native pub/sub functions together with task executors. The idea is to have just one handler, with a pool of workers, subscribed to a channel. Since it is the only subscriber, each message published to that channel is processed by only one of its workers.
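The single-pool variant described in the note above could be wrapped in a small helper. This is a sketch that assumes the my_handler module compiled earlier and the ebus_handler:new_pool/3 and ebus:sub/2 calls shown above. The module name p2p_channel is hypothetical.

```erlang
%% Hypothetical helper (not part of ebus): point-to-point delivery by
%% subscribing exactly one pooled handler to a channel.
-module(p2p_channel).
-export([start/3]).

%% PoolName: atom naming the pool (like my_pool_1 above).
%% Channel:  channel to subscribe to.
%% Size:     number of workers attached to the handler.
start(PoolName, Channel, Size) ->
    Pool = ebus_handler:new_pool(PoolName, Size, my_handler),
    %% Keep this the only subscriber of Channel: with a single handler,
    %% each published message goes to one of its workers.
    ok = ebus:sub(Channel, Pool),
    Pool.
```

With ebus started and no other subscribers on the channel, p2p_channel:start(p2p_pool, jobs, 3) followed by ebus:pub(jobs, "task") should hand the message to a single worker.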
ErlBus is distributed by default and inherits all the properties of Distributed Erlang and pg2. But pg2 has some limitations: its distribution model works with full replication, which can cause problems when there is a considerable number of subscribers and, at the same time, a very high volume of messages. For these scenarios, ErlBus provides another option: ebus_dist, which is built on top of riak_core and gproc.
To start ebus_dist, you must invoke erl with -ebus ebus_dist all and with a configuration file containing the riak_core config: -config your_config_file.config (you can use the files located in config/*.config).
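Under the standard erl command-line semantics, -ebus ebus_dist all sets the application environment parameter ebus_dist of the ebus application to the atom all. The same setting could therefore presumably also go in the config file as {ebus, [{ebus_dist, all}]}. The hypothetical helper below only reads that parameter back, for example to check at runtime which mode was requested. It does not reflect how ebus itself interprets the value.

```erlang
%% Hypothetical helper (not part of ebus): report which distribution
%% mode was requested through the ebus application environment.
-module(ebus_mode).
-export([requested/0]).

requested() ->
    %% "erl -ebus ebus_dist all" stores {ebus_dist, all} in the env of ebus.
    case application:get_env(ebus, ebus_dist) of
        {ok, all} -> ebus_dist;   % riak_core + gproc mode
        _ -> default              % pg2 + Distributed Erlang mode
    end.
```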
Let's start two Erlang nodes:
$ erl -name node1@127.0.0.1 -setcookie ebus -pa ebin deps/*/ebin -ebus ebus_dist all -config config/dev1.config
$ erl -name node2@127.0.0.1 -setcookie ebus -pa ebin deps/*/ebin -ebus ebus_dist all -config config/dev2.config
Start ebus on each one:
% Start ebus
application:start(ebus).
12:05:30.633 [info] Application lager started on node 'node1@127.0.0.1'
12:05:30.743 [info] Application sasl started on node 'node1@127.0.0.1'
12:05:30.768 [info] Application crypto started on node 'node1@127.0.0.1'
12:05:30.801 [info] Application riak_sysmon started on node 'node1@127.0.0.1'
12:05:30.884 [info] Application os_mon started on node 'node1@127.0.0.1'
12:05:30.892 [info] alarm_handler: {set,{system_memory_high_watermark,[]}}
12:05:30.918 [info] Application basho_stats started on node 'node1@127.0.0.1'
12:05:30.938 [info] Application eleveldb started on node 'node1@127.0.0.1'
12:05:30.960 [info] Application pbkdf2 started on node 'node1@127.0.0.1'
12:05:30.981 [info] Application poolboy started on node 'node1@127.0.0.1'
12:05:31.086 [info] Starting reporters with []
12:05:31.086 [info] Application exometer_core started on node 'node1@127.0.0.1'
12:05:31.154 [info] Application clique started on node 'node1@127.0.0.1'
12:05:31.374 [warning] No ring file available.
12:05:31.607 [info] monitor long_schedule <0.160.0> [{timeout,62},{in,{code_server,call,2}},{out,{code_server,'-handle_on_load/4-fun-0-',1}}]
12:05:32.306 [info] New capability: {riak_core,vnode_routing} = proxy
12:05:32.626 [info] New capability: {riak_core,staged_joins} = true
12:05:32.772 [info] New capability: {riak_core,resizable_ring} = true
12:05:32.895 [info] New capability: {riak_core,fold_req_version} = v2
12:05:33.039 [info] New capability: {riak_core,security} = true
12:05:33.196 [info] New capability: {riak_core,bucket_types} = true
12:05:33.330 [info] New capability: {riak_core,net_ticktime} = true
12:05:33.779 [info] Application riak_core started on node 'node1@127.0.0.1'
12:05:33.950 [info] Application ebus started on node 'node1@127.0.0.1'
ok
Add node2 to the cluster using node1:
% From node2:
ebus_dist_console:join(["node1@127.0.0.1"]).
Sent join request to node1@127.0.0.1
ok
Now node1 and node2 are in a cluster. From here on, everything is the same as in the previous examples.
See examples.
To run the tests:
$ make test
All notable changes to this project will be documented in the CHANGELOG.md.