Skip to content

Commit

Permalink
storage: add leo_storage_handler_object:delete/4 for #702
Browse files Browse the repository at this point in the history
  • Loading branch information
mocchira committed Apr 20, 2017
1 parent 391742d commit a68ff10
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions apps/leo_storage/src/leo_storage_handler_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

-export([get/1, get/2, get/3, get/4, get/5,
put/1, put/2, put/4,
delete/1, delete/2, delete/3,
delete/1, delete/2, delete/3, delete/4,
delete_objects_under_dir/1,
delete_objects_under_dir/2,
delete_objects_under_dir/3,
Expand Down Expand Up @@ -346,7 +346,7 @@ put(Object, ReqId) ->
Ret = replicate_fun(?REP_LOCAL, ?CMD_PUT, Object#?OBJECT.addr_id,
Object#?OBJECT{method = ?CMD_PUT,
clock = leo_date:clock(),
req_id = ReqId}),
req_id = ReqId}, gateway),
case Ret of
{ok, _} ->
?access_log_put(Object#?OBJECT.key, Object#?OBJECT.dsize, ReqId, BeginTime, ok);
Expand Down Expand Up @@ -526,7 +526,7 @@ delete(Object, ReqId, CheckUnderDir) ->
dsize = 0,
clock = leo_date:clock(),
req_id = ReqId,
del = ?DEL_TRUE}) of
del = ?DEL_TRUE}, gateway) of
{ok,_} ->
?access_log_delete(Key, Object#?OBJECT.dsize, ReqId, BeginTime, ok),
delete_1(ok, Object, CheckUnderDir);
Expand All @@ -540,6 +540,33 @@ delete(Object, ReqId, CheckUnderDir) ->
{error, Cause}
end.

%% @doc Remova an object (request from leo_mq)
-spec(delete(Object, ReqId, CheckUnderDir, From) ->
ok | {error, any()} when Object::#?OBJECT{},
ReqId::integer()|reference(),
CheckUnderDir::boolean(),
From::atom()).
delete(Object, ReqId, CheckUnderDir, leo_mq) ->
Key = Object#?OBJECT.key,
?debug("delete/3", [{from, leo_mq}, {method, del}, {key, Key}, {req_id, ReqId}]),
case replicate_fun(?REP_LOCAL, ?CMD_DELETE,
Object#?OBJECT.addr_id,
Object#?OBJECT{method = ?CMD_DELETE,
data = <<>>,
dsize = 0,
clock = leo_date:clock(),
req_id = ReqId,
del = ?DEL_TRUE}, leo_mq) of
{ok,_} ->
delete_1(ok, Object, CheckUnderDir);
{error, Cause = not_found} ->
delete_1({error, Cause}, Object, CheckUnderDir);
{error, Cause} ->
?error("delete/4", [{from, leo_mq}, {method, del},
{key, Object#?OBJECT.key}, {req_id, ReqId}, {cause, Cause}]),
{error, Cause}
end.

%% @private
delete_1(Ret,_Object, false) ->
Ret;
Expand Down Expand Up @@ -1154,18 +1181,24 @@ read_and_repair_3(_,_,_) ->

%% @doc Replicate an object from local-node to remote node
%% @private
-spec(replicate_fun(replication(), request_verb(), integer(), #?OBJECT{}) ->
-spec(replicate_fun(replication(), request_verb(), integer(), #?OBJECT{}, atom()) ->
{ok, ETag} | {error, any()} when ETag::{etag, integer()}).
replicate_fun(?REP_LOCAL, Method, AddrId, Object) ->
replicate_fun(?REP_LOCAL, Method, AddrId, Object, From) ->
%% Check state of the node
case leo_watchdog_state:find_not_safe_items(?WD_EXCLUDE_ITEMS) of
not_found ->
case leo_redundant_manager_api:get_redundancies_by_addr_id(put, AddrId) of
{ok, #redundancies{nodes = Redundancies,
n = NumOfReplicas,
w = WriteQuorum,
d = DeleteQuorum,
ring_hash = RingHash}} ->
Quorum = ?quorum(Method, WriteQuorum, DeleteQuorum),
Quorum = case From of
leo_mq ->
?quorum(Method, NumOfReplicas, NumOfReplicas);
_ ->
?quorum(Method, WriteQuorum, DeleteQuorum)
end,
leo_storage_replicator:replicate(
Method, Quorum, Redundancies,
Object#?OBJECT{ring_hash = RingHash},
Expand Down

0 comments on commit a68ff10

Please sign in to comment.