From f1aa8bd94217da2896e7c037aa00aa7dcd128457 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Fri, 15 Jan 2010 18:30:06 +0000 Subject: [PATCH] Initial commit. --- Makefile | 4 ++ ebin/rabbit_lvc_plugin.app | 12 ++++++ include/rabbit_lvc_plugin.hrl | 4 ++ src/rabbit_exchange_type_lvc.erl | 72 ++++++++++++++++++++++++++++++++ src/rabbit_lvc_plugin.erl | 61 +++++++++++++++++++++++++++ src/rabbit_lvc_plugin_sup.erl | 17 ++++++++ 6 files changed, 170 insertions(+) create mode 100644 Makefile create mode 100644 ebin/rabbit_lvc_plugin.app create mode 100644 include/rabbit_lvc_plugin.hrl create mode 100644 src/rabbit_exchange_type_lvc.erl create mode 100644 src/rabbit_lvc_plugin.erl create mode 100644 src/rabbit_lvc_plugin_sup.erl diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e4b5edd --- /dev/null +++ b/Makefile @@ -0,0 +1,4 @@ +PACKAGE=rabbit_lvc_plugin +DEPS=rabbitmq-server rabbitmq-erlang-client + +include ../include.mk diff --git a/ebin/rabbit_lvc_plugin.app b/ebin/rabbit_lvc_plugin.app new file mode 100644 index 0000000..8a7856a --- /dev/null +++ b/ebin/rabbit_lvc_plugin.app @@ -0,0 +1,12 @@ +{application, rabbit_lvc_plugin, + [{description, "RabbitMQ last-value cache exchange plugin"}, + {vsn, "0.01"}, + {modules, [ + rabbit_lvc_plugin, + rabbit_lvc_plugin_sup, + rabbit_exchange_type_lvc + ]}, + {registered, []}, + {mod, {rabbit_lvc_plugin, []}}, + {env, []}, + {applications, [kernel, stdlib, rabbit, mnesia]}]}. diff --git a/include/rabbit_lvc_plugin.hrl b/include/rabbit_lvc_plugin.hrl new file mode 100644 index 0000000..f415614 --- /dev/null +++ b/include/rabbit_lvc_plugin.hrl @@ -0,0 +1,4 @@ +-define(LVC_TABLE, lvc). + +-record(cachekey, {exchange, routing_key}). +-record(cached, {key, content}). diff --git a/src/rabbit_exchange_type_lvc.erl b/src/rabbit_exchange_type_lvc.erl new file mode 100644 index 0000000..8b7abea --- /dev/null +++ b/src/rabbit_exchange_type_lvc.erl @@ -0,0 +1,72 @@ +-module(rabbit_exchange_type_lvc). +-include_lib("../rabbitmq-server/include/rabbit.hrl"). +-include("rabbit_lvc_plugin.hrl"). + +-behaviour(rabbit_exchange_behaviour). + +-rabbit_boot_step({?MODULE, + [{mfa, {rabbit_exchange_type, register, [<<"x-lvc">>, ?MODULE]}}, + {post, rabbit_exchange_type}, + {pre, exchange_recovery}]}). + +-export([description/0, publish/2]). +-export([validate/1, recover/2, create/1, delete/2, add_binding/2, delete_binding/2]). + +description() -> + {{name, <<"lvc">>}, + {description, <<"Last-value cache exchange.">>}}. + +publish(Exchange = #exchange{name = Name}, + Delivery = #delivery{message = #basic_message{ + routing_key = RK, + content = Content + }}) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = mnesia:write(?LVC_TABLE, + #cached{key = #cachekey{exchange=Name, routing_key=RK}, + content = Content}, + write) + end), + rabbit_exchange_type_direct:publish(Exchange, Delivery). + +%% TODO in recover, init, delete: manage entries for a table + +validate(_X) -> ok. + +recover(X, _Bs) -> create(X). + +create(_X) -> ok. + +delete(_X, __Bs) -> ok. + +add_binding(#exchange{ name = XName }, + #binding{ key = RoutingKey, + queue_name = QueueName }) -> + io:format("LVC bind ~p to ~p", [XName, RoutingKey]), + case mnesia:dirty_read( + ?LVC_TABLE, + #cachekey{exchange=XName, routing_key=RoutingKey}) of + [] -> ok; + [#cached{content = Content}] -> + case rabbit_amqqueue:lookup(QueueName) of + %% if there's one, it's this one + {error, not_found} -> + rabbit_misc:protocol_error( + internal_error, + "could not find binding for routing key '~s'", + [RoutingKey]); + {ok, #amqqueue{ pid = Q }} -> + io:format("LVC deliver-on-bind '~s'", [RoutingKey]), + rabbit_amqqueue:deliver(Q, + #delivery{ + message = #basic_message{ + content = Content, + exchange_name = XName, + routing_key = RoutingKey + }}) + end + end, + ok. + +delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_lvc_plugin.erl b/src/rabbit_lvc_plugin.erl new file mode 100644 index 0000000..4bf8f42 --- /dev/null +++ b/src/rabbit_lvc_plugin.erl @@ -0,0 +1,61 @@ +-module(rabbit_lvc_plugin). + +-include("rabbit_lvc_plugin.hrl"). + +-define(APPNAME, ?MODULE). + +-behaviour(application). +-behaviour(gen_server). + +-export([start/2, stop/1]). +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% Application + +start(normal, []) -> + io:format("starting ~s...", [?APPNAME]), + ok = setup_schema(), + {ok, SupPid} = rabbit_lvc_plugin_sup:start_link(), + io:format(" done~n"), + {ok, SupPid}. + +stop(_State) -> + ok. + +%% For supervisor + +start_link() -> + gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). + +%% gen_server -- I don't need this (yet) + +init([]) -> + {ok, ok}. + +handle_call(Msg,_From,State) -> + {ok, State}. + +handle_cast(_,State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_,State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% private + +setup_schema() -> + case mnesia:create_table(?LVC_TABLE, + [{attributes, record_info(fields, cached)}, + {record_name, cached}, + {type, set}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, ?LVC_TABLE}} -> ok + end. diff --git a/src/rabbit_lvc_plugin_sup.erl b/src/rabbit_lvc_plugin_sup.erl new file mode 100644 index 0000000..f8e564b --- /dev/null +++ b/src/rabbit_lvc_plugin_sup.erl @@ -0,0 +1,17 @@ +-module(rabbit_lvc_plugin_sup). +-behaviour(supervisor). + +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []). + +init([]) -> + {ok, {{one_for_one, 3, 10}, + [{rabbit_lvc_plugin, + {rabbit_lvc_plugin, start_link, []}, + permanent, + 10000, + worker, + [rabbit_lvc_plugin]} + ]}}.