Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Bridgen committed Jan 15, 2010
0 parents commit f1aa8bd
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
PACKAGE=rabbit_lvc_plugin
DEPS=rabbitmq-server rabbitmq-erlang-client

include ../include.mk
12 changes: 12 additions & 0 deletions ebin/rabbit_lvc_plugin.app
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{application, rabbit_lvc_plugin,
[{description, "RabbitMQ last-value cache exchange plugin"},
{vsn, "0.01"},
{modules, [
rabbit_lvc_plugin,
rabbit_lvc_plugin_sup,
rabbit_exchange_type_lvc
]},
{registered, []},
{mod, {rabbit_lvc_plugin, []}},
{env, []},
{applications, [kernel, stdlib, rabbit, mnesia]}]}.
4 changes: 4 additions & 0 deletions include/rabbit_lvc_plugin.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-define(LVC_TABLE, lvc).

-record(cachekey, {exchange, routing_key}).
-record(cached, {key, content}).
72 changes: 72 additions & 0 deletions src/rabbit_exchange_type_lvc.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-module(rabbit_exchange_type_lvc).
-include_lib("../rabbitmq-server/include/rabbit.hrl").
-include("rabbit_lvc_plugin.hrl").

-behaviour(rabbit_exchange_behaviour).

-rabbit_boot_step({?MODULE,
[{mfa, {rabbit_exchange_type, register, [<<"x-lvc">>, ?MODULE]}},
{post, rabbit_exchange_type},
{pre, exchange_recovery}]}).

-export([description/0, publish/2]).
-export([validate/1, recover/2, create/1, delete/2, add_binding/2, delete_binding/2]).

description() ->
{{name, <<"lvc">>},
{description, <<"Last-value cache exchange.">>}}.

publish(Exchange = #exchange{name = Name},
Delivery = #delivery{message = #basic_message{
routing_key = RK,
content = Content
}}) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
ok = mnesia:write(?LVC_TABLE,
#cached{key = #cachekey{exchange=Name, routing_key=RK},
content = Content},
write)
end),
rabbit_exchange_type_direct:publish(Exchange, Delivery).

%% TODO in recover, init, delete: manage entries for a table

validate(_X) -> ok.

recover(X, _Bs) -> create(X).

create(_X) -> ok.

delete(_X, __Bs) -> ok.

add_binding(#exchange{ name = XName },
#binding{ key = RoutingKey,
queue_name = QueueName }) ->
io:format("LVC bind ~p to ~p", [XName, RoutingKey]),
case mnesia:dirty_read(
?LVC_TABLE,
#cachekey{exchange=XName, routing_key=RoutingKey}) of
[] -> ok;
[#cached{content = Content}] ->
case rabbit_amqqueue:lookup(QueueName) of
%% if there's one, it's this one
{error, not_found} ->
rabbit_misc:protocol_error(
internal_error,
"could not find binding for routing key '~s'",
[RoutingKey]);
{ok, #amqqueue{ pid = Q }} ->
io:format("LVC deliver-on-bind '~s'", [RoutingKey]),
rabbit_amqqueue:deliver(Q,
#delivery{
message = #basic_message{
content = Content,
exchange_name = XName,
routing_key = RoutingKey
}})
end
end,
ok.

delete_binding(_X, _B) -> ok.
61 changes: 61 additions & 0 deletions src/rabbit_lvc_plugin.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-module(rabbit_lvc_plugin).

-include("rabbit_lvc_plugin.hrl").

-define(APPNAME, ?MODULE).

-behaviour(application).
-behaviour(gen_server).

-export([start/2, stop/1]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

%% Application

start(normal, []) ->
io:format("starting ~s...", [?APPNAME]),
ok = setup_schema(),
{ok, SupPid} = rabbit_lvc_plugin_sup:start_link(),
io:format(" done~n"),
{ok, SupPid}.

stop(_State) ->
ok.

%% For supervisor

start_link() ->
gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

%% gen_server -- I don't need this (yet)

init([]) ->
{ok, ok}.

handle_call(Msg,_From,State) ->
{ok, State}.

handle_cast(_,State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

terminate(_,State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%% private

setup_schema() ->
case mnesia:create_table(?LVC_TABLE,
[{attributes, record_info(fields, cached)},
{record_name, cached},
{type, set}]) of
{atomic, ok} -> ok;
{aborted, {already_exists, ?LVC_TABLE}} -> ok
end.
17 changes: 17 additions & 0 deletions src/rabbit_lvc_plugin_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-module(rabbit_lvc_plugin_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).

init([]) ->
{ok, {{one_for_one, 3, 10},
[{rabbit_lvc_plugin,
{rabbit_lvc_plugin, start_link, []},
permanent,
10000,
worker,
[rabbit_lvc_plugin]}
]}}.

0 comments on commit f1aa8bd

Please sign in to comment.