diff --git a/apps/leo_storage/src/leo_storage_handler_object.erl b/apps/leo_storage/src/leo_storage_handler_object.erl index 0b26d52b..a9ba3679 100644 --- a/apps/leo_storage/src/leo_storage_handler_object.erl +++ b/apps/leo_storage/src/leo_storage_handler_object.erl @@ -36,7 +36,7 @@ -export([get/1, get/2, get/3, get/4, get/5, put/1, put/2, put/4, - delete/1, delete/2, delete/3, + delete/1, delete/2, delete/3, delete/4, delete_objects_under_dir/1, delete_objects_under_dir/2, delete_objects_under_dir/3, @@ -346,7 +346,7 @@ put(Object, ReqId) -> Ret = replicate_fun(?REP_LOCAL, ?CMD_PUT, Object#?OBJECT.addr_id, Object#?OBJECT{method = ?CMD_PUT, clock = leo_date:clock(), - req_id = ReqId}), + req_id = ReqId}, gateway), case Ret of {ok, _} -> ?access_log_put(Object#?OBJECT.key, Object#?OBJECT.dsize, ReqId, BeginTime, ok); @@ -526,7 +526,7 @@ delete(Object, ReqId, CheckUnderDir) -> dsize = 0, clock = leo_date:clock(), req_id = ReqId, - del = ?DEL_TRUE}) of + del = ?DEL_TRUE}, gateway) of {ok,_} -> ?access_log_delete(Key, Object#?OBJECT.dsize, ReqId, BeginTime, ok), delete_1(ok, Object, CheckUnderDir); @@ -540,6 +540,33 @@ delete(Object, ReqId, CheckUnderDir) -> {error, Cause} end. +%% @doc Remova an object (request from leo_mq) +-spec(delete(Object, ReqId, CheckUnderDir, From) -> + ok | {error, any()} when Object::#?OBJECT{}, + ReqId::integer()|reference(), + CheckUnderDir::boolean(), + From::atom()). +delete(Object, ReqId, CheckUnderDir, leo_mq) -> + Key = Object#?OBJECT.key, + ?debug("delete/3", [{from, leo_mq}, {method, del}, {key, Key}, {req_id, ReqId}]), + case replicate_fun(?REP_LOCAL, ?CMD_DELETE, + Object#?OBJECT.addr_id, + Object#?OBJECT{method = ?CMD_DELETE, + data = <<>>, + dsize = 0, + clock = leo_date:clock(), + req_id = ReqId, + del = ?DEL_TRUE}, leo_mq) of + {ok,_} -> + delete_1(ok, Object, CheckUnderDir); + {error, Cause = not_found} -> + delete_1({error, Cause}, Object, CheckUnderDir); + {error, Cause} -> + ?error("delete/4", [{from, leo_mq}, {method, del}, + {key, Object#?OBJECT.key}, {req_id, ReqId}, {cause, Cause}]), + {error, Cause} + end. + %% @private delete_1(Ret,_Object, false) -> Ret; @@ -1154,18 +1181,24 @@ read_and_repair_3(_,_,_) -> %% @doc Replicate an object from local-node to remote node %% @private --spec(replicate_fun(replication(), request_verb(), integer(), #?OBJECT{}) -> +-spec(replicate_fun(replication(), request_verb(), integer(), #?OBJECT{}, atom()) -> {ok, ETag} | {error, any()} when ETag::{etag, integer()}). -replicate_fun(?REP_LOCAL, Method, AddrId, Object) -> +replicate_fun(?REP_LOCAL, Method, AddrId, Object, From) -> %% Check state of the node case leo_watchdog_state:find_not_safe_items(?WD_EXCLUDE_ITEMS) of not_found -> case leo_redundant_manager_api:get_redundancies_by_addr_id(put, AddrId) of {ok, #redundancies{nodes = Redundancies, + n = NumOfReplicas, w = WriteQuorum, d = DeleteQuorum, ring_hash = RingHash}} -> - Quorum = ?quorum(Method, WriteQuorum, DeleteQuorum), + Quorum = case From of + leo_mq -> + ?quorum(Method, NumOfReplicas, NumOfReplicas); + _ -> + ?quorum(Method, WriteQuorum, DeleteQuorum) + end, leo_storage_replicator:replicate( Method, Quorum, Redundancies, Object#?OBJECT{ring_hash = RingHash},