From 6829bd6b94ea34348436910d15b37463e5c496e5 Mon Sep 17 00:00:00 2001 From: Jesse Date: Thu, 4 Apr 2024 11:51:29 +0200 Subject: [PATCH] Allow batch write retries for aerospike (#2476) --- Cargo.lock | 24 ++++++++++++------------ dozer-sink-aerospike/src/aerospike.rs | 19 ++++++++++++++++--- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af436c64f5..90ea1fa764 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,7 +72,7 @@ dependencies = [ "encoding_rs", "flate2", "futures-core", - "h2 0.3.25", + "h2 0.3.26", "http 0.2.12", "httparse", "httpdate", @@ -2629,7 +2629,7 @@ dependencies = [ "deno_net", "deno_tls", "fastwebsockets", - "h2 0.4.3", + "h2 0.4.4", "http 1.1.0", "http-body-util", "hyper 1.1.0", @@ -3998,9 +3998,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -4017,9 +4017,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51ee2dd2e4f378392eeff5d51618cd9a63166a2513846bbc55f21cfacd9199d4" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" dependencies = [ "bytes", "fnv", @@ -4332,14 +4332,14 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.25", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -4355,7 +4355,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.3", + "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -7162,7 +7162,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.25", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", @@ -9155,7 +9155,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2 0.3.25", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", @@ -9182,7 +9182,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2 0.3.25", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs index 1a98592817..5deb836801 100644 --- a/dozer-sink-aerospike/src/aerospike.rs +++ b/dozer-sink-aerospike/src/aerospike.rs @@ -255,19 +255,23 @@ impl Client { }) } + pub(crate) fn config(&self) -> &as_config { + unsafe { &(*self.inner.as_ptr()).config } + } + pub(crate) unsafe fn write_batch( &self, batch: *mut as_batch_records, + policy: Option<*const as_policy_batch>, ) -> Result<(), AerospikeError> { debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size); let started = Instant::now(); - let policy = self.inner.as_ref().config.policies.batch; as_try(|err| { aerospike_batch_write( self.inner.as_ptr(), err, - &policy as *const as_policy_batch, + policy.unwrap_or(std::ptr::null()), batch, ) })?; @@ -1210,7 +1214,16 @@ impl<'a> WriteBatch<'a> { } pub(crate) fn execute(mut self) -> Result<(), AerospikeError> { - unsafe { self.client.write_batch(self.inner.take().unwrap().as_ptr()) } + let config = self.client.config(); + let mut policy = config.policies.batch; + policy.base.max_retries = 2; + policy.base.sleep_between_retries = 1000; + unsafe { + self.client.write_batch( + self.inner.take().unwrap().as_ptr(), + Some((&policy) as *const as_policy_batch), + ) + } } }