Skip to content

Commit

Permalink
fix(sink): handle Kinesis PutRecords partial success and throttle (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 13, 2024
1 parent cad31da commit cd9c31e
Showing 1 changed file with 185 additions and 36 deletions.
221 changes: 185 additions & 36 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use aws_sdk_kinesis::operation::put_records::builders::PutRecordsFluentBuilder;
use aws_sdk_kinesis::operation::put_records::PutRecordsOutput;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::PutRecordsRequestEntry;
use aws_sdk_kinesis::types::{PutRecordsRequestEntry, PutRecordsResultEntry};
use aws_sdk_kinesis::Client as KinesisClient;
use futures::{FutureExt, TryFuture};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use with_options::WithOptions;

use super::catalog::SinkFormatDesc;
Expand Down Expand Up @@ -155,9 +154,9 @@ pub struct KinesisSinkWriter {
}

struct KinesisSinkPayloadWriter {
// builder should always be `Some`. Making it an option so that we can call
// builder methods that take the builder ownership as input and return with a new builder.
builder: Option<PutRecordsFluentBuilder>,
client: KinesisClient,
entries: Vec<(PutRecordsRequestEntry, usize)>,
stream_name: String,
}

impl KinesisSinkWriter {
Expand Down Expand Up @@ -191,39 +190,173 @@ impl KinesisSinkWriter {
}

fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
let builder = self
.client
.put_records()
.stream_name(&self.config.common.stream_name);
KinesisSinkPayloadWriter {
builder: Some(builder),
client: self.client.clone(),
entries: vec![],
stream_name: self.config.common.stream_name.clone(),
}
}
}

mod opaque_type {
use std::cmp::min;
use std::time::Duration;

use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::warn;

use super::*;
pub type KinesisSinkPayloadWriterDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

impl KinesisSinkPayloadWriter {
pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
// For reference to the behavior of `put_records`
// https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

async move {
let builder = self.builder.expect("should not be None");
let context_fmt = format!(
"failed to put record to {}",
builder
.get_stream_name()
.as_ref()
.expect("should have set stream name")
);
Retry::spawn(
ExponentialBackoff::from_millis(100).map(jitter).take(3),
|| builder.clone().send(),
)
.await
.with_context(|| context_fmt.clone())
.map_err(SinkError::Kinesis)?;
// From the doc of `put_records`:
// Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MiB,
// up to a limit of 5 MiB for the entire request, including partition keys. Each shard can support writes up to
// 1,000 records per second, up to a maximum data write total of 1 MiB per second.

const MAX_RECORD_COUNT: usize = 500;
const MAX_SINGLE_RECORD_PAYLOAD_SIZE: usize = 1 << 20;
const MAX_TOTAL_RECORD_PAYLOAD_SIZE: usize = 5 * (1 << 20);
// Allow at most 3 times of retry when not making any progress to avoid endless retry
const MAX_NO_PROGRESS_RETRY_COUNT: usize = 3;

let mut remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
let total_count = self.entries.len();
let mut start_idx = 0;

let mut throttle_delay = None;

while start_idx < total_count {
// 1. Prepare the records to be sent

// The maximum possible number of records that can be sent in this iteration.
// Can be smaller than this number when the total payload size exceeds `MAX_TOTAL_RECORD_PAYLOAD_SIZE`
let max_record_count = min(MAX_RECORD_COUNT, total_count - start_idx);
let mut records = Vec::with_capacity(max_record_count);
let mut total_payload_size = 0;
for i in start_idx..(start_idx + max_record_count) {
let (record, size) = &self.entries[i];
if *size >= MAX_SINGLE_RECORD_PAYLOAD_SIZE {
warn!(
size,
partition = record.partition_key,
"encounter a large single record"
);
}
if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE {
total_payload_size += *size;
records.push(record.clone());
} else {
break;
}
}
if records.is_empty() {
// at least include one record even if its size exceed `MAX_TOTAL_RECORD_PAYLOAD_SIZE`
records.push(self.entries[start_idx].0.clone());
}

// 2. send the records and handle the result
let record_count = records.len();
match self
.client
.put_records()
.stream_name(&self.stream_name)
.set_records(Some(records))
.send()
.await
{
Ok(output) => {
if record_count != output.records.len() {
return Err(SinkError::Kinesis(anyhow!("request record count {} not match the response record count {}", record_count, output.records.len())));
}
// From the doc of `put_records`:
// A single record failure does not stop the processing of subsequent records. As a result,
// PutRecords doesn’t guarantee the ordering of records. If you need to read records in the same
// order they are written to the stream, use PutRecord instead of PutRecords, and write to the same shard.

// Therefore, to ensure at least once and eventual consistency, we figure out the first failed entry, and retry
// all the following entries even if the following entries may have been successfully processed.
if let Some((first_failed_idx, result_entry)) = Self::first_failed_entry(output) {
// first_failed_idx is also the number of successful entries
let partially_sent_count = first_failed_idx;
if partially_sent_count > 0 {
warn!(
partially_sent_count,
record_count,
"records are partially sent. code: [{}], message: [{}]",
result_entry.error_code.unwrap_or_default(),
result_entry.error_message.unwrap_or_default()
);
start_idx += partially_sent_count;
// reset retry count when having progress
remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
} else if let Some(err_code) = &result_entry.error_code && err_code == "ProvisionedThroughputExceededException" {
// From the doc of `put_records`:
// The ErrorCode parameter reflects the type of error and can be one of the following values:
// ProvisionedThroughputExceededException or InternalFailure. ErrorMessage provides more detailed
// information about the ProvisionedThroughputExceededException exception including the account ID,
// stream name, and shard ID of the record that was throttled.
let throttle_delay = throttle_delay.get_or_insert_with(|| ExponentialBackoff::from_millis(100).factor(2).max_delay(Duration::from_secs(2)).map(jitter)).next().expect("should not be none");
warn!(err_string = ?result_entry.error_message, ?throttle_delay, "throttle");
sleep(throttle_delay).await;
} else {
// no progress due to some internal error
assert_eq!(first_failed_idx, 0);
remaining_no_progress_retry_count -= 1;
if remaining_no_progress_retry_count == 0 {
return Err(SinkError::Kinesis(anyhow!(
"failed to send records. sent {} out of {}, last err: code: [{}], message: [{}]",
start_idx,
total_count,
result_entry.error_code.unwrap_or_default(),
result_entry.error_message.unwrap_or_default()
)));
} else {
warn!(
remaining_no_progress_retry_count,
sent = start_idx,
total_count,
"failed to send records. code: [{}], message: [{}]",
result_entry.error_code.unwrap_or_default(),
result_entry.error_message.unwrap_or_default()
)
}
}
} else {
start_idx += record_count;
// reset retry count when having progress
remaining_no_progress_retry_count = MAX_NO_PROGRESS_RETRY_COUNT;
// reset throttle delay when records can be fully sent.
throttle_delay = None;
}
}
Err(e) => {
remaining_no_progress_retry_count -= 1;
if remaining_no_progress_retry_count == 0 {
return Err(SinkError::Kinesis(anyhow!(e).context(format!(
"failed to send records. sent {} out of {}",
start_idx, total_count,
))));
} else {
warn!(
remaining_no_progress_retry_count,
sent = start_idx,
total_count,
"failed to send records. err: [{:?}]",
e.as_report(),
)
}
}
}
}
Ok(())
}
.boxed()
Expand All @@ -233,16 +366,32 @@ mod opaque_type {
pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture;

impl KinesisSinkPayloadWriter {
fn first_failed_entry(output: PutRecordsOutput) -> Option<(usize, PutRecordsResultEntry)> {
// From the doc of `put_records`:
// A successfully processed record includes ShardId and SequenceNumber values. The ShardId parameter
// identifies the shard in the stream where the record is stored. The SequenceNumber parameter is an
// identifier assigned to the put record, unique to all records in the stream.
//
// An unsuccessfully processed record includes ErrorCode and ErrorMessage values. ErrorCode reflects
// the type of error and can be one of the following values: ProvisionedThroughputExceededException or
// InternalFailure. ErrorMessage provides more detailed information about the ProvisionedThroughputExceededException
// exception including the account ID, stream name, and shard ID of the record that was throttled.
output
.records
.into_iter()
.find_position(|entry| entry.shard_id.is_none())
}

fn put_record(&mut self, key: String, payload: Vec<u8>) {
self.builder = Some(
self.builder.take().expect("should not be None").records(
PutRecordsRequestEntry::builder()
.partition_key(key)
.data(Blob::new(payload))
.build()
.expect("should not fail because we have set `data` and `partition_key`"),
),
);
let size = key.len() + payload.len();
self.entries.push((
PutRecordsRequestEntry::builder()
.partition_key(key)
.data(Blob::new(payload))
.build()
.expect("should not fail because we have set `data` and `partition_key`"),
size,
))
}
}

Expand Down

0 comments on commit cd9c31e

Please sign in to comment.