diff --git a/dozer-sinks/bigquery/src/lib.rs b/dozer-sinks/bigquery/src/lib.rs
index 501cf170f7..094b6e8fb4 100644
--- a/dozer-sinks/bigquery/src/lib.rs
+++ b/dozer-sinks/bigquery/src/lib.rs
@@ -1,5 +1,5 @@
 use std::{
-    ops::Mul,
+    ops::{Deref, Mul},
     sync::Arc,
     thread,
     time::{Duration, Instant},
@@ -83,6 +83,7 @@ pub struct BigQuerySink {
 
 impl Sink for BigQuerySink {
     fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> {
+        self.send_batch();
         Ok(())
     }
 
@@ -117,7 +118,7 @@ impl Sink for BigQuerySink {
 
 impl BigQuerySink {
     pub fn new(mut config: BigQueryConfig, schema: Schema) -> Self {
-        let (sender, receiver) = channel(20);
+        let (sender, receiver) = channel(1000000);
         let options = config.options.get_or_insert_with(Default::default);
         let max_batch_size = *options.batch_size.get_or_insert_with(default_batch_size);
         let _var = options
@@ -144,7 +145,7 @@ impl BigQuerySink {
     fn send_batch(&mut self) {
         let mut batch = Vec::new();
         std::mem::swap(&mut batch, &mut self.batch);
-        debug!("sending {} ops to bigquery client", batch.len());
+        // debug!("sending {} ops to bigquery client", batch.len());
         if let Err(err) = self.sender.blocking_send(batch) {
             panic!("bigquery client crashed: {err:?}");
         }
@@ -165,6 +166,26 @@ impl BigQuerySink {
     }
 }
 
+struct Metrics {
+    start_time: Instant,
+    total_processed_bytes: usize,
+    total_parquet_conversion_time: Duration,
+    total_parquet_upload_time: Duration,
+    total_bigquery_load_time: Duration,
+}
+
+impl Metrics {
+    fn new() -> Self {
+        Self {
+            start_time: Instant::now(),
+            total_processed_bytes: Default::default(),
+            total_parquet_conversion_time: Default::default(),
+            total_parquet_upload_time: Default::default(),
+            total_bigquery_load_time: Default::default(),
+        }
+    }
+}
+
 async fn bigquery_client(
     mut records_receiver: Receiver<Vec<Operation>>,
     config: BigQueryConfig,
@@ -172,14 +193,6 @@ async fn bigquery_client(
 ) -> Result<(), BoxedError> {
     info!("Starting BigQuery Sink");
 
-    let Destination {
-        stage_gcs_bucket_name: bucket_name,
-        project,
-        dataset,
-        table,
-        ..
-    } = &config.destination;
-
     let options = config.options.as_ref().unwrap();
     let max_batch_size = options.batch_size.unwrap();
     let max_stage_size_in_bytes = options
@@ -193,142 +206,289 @@ async fn bigquery_client(
         html_escape::decode_html_entities(&config.auth.service_account_key).to_string();
     let gcp_sa_key = yup_oauth2::parse_service_account_key(&service_account_key)?;
 
+    let metrics = Arc::new(std::sync::Mutex::new(Metrics::new()));
+
+    let (parquet_sender, parquet_receiver) = channel(1000);
+    tokio::spawn(parquet_loader(
+        parquet_receiver,
+        gcp_sa_key.clone(),
+        max_stage_size_in_bytes,
+        config.destination.clone(),
+        metrics.clone(),
+    ));
+
     let client = gcp_bigquery_client::Client::from_service_account_key(gcp_sa_key, false).await?;
     create_table(&config, &client, &schema).await?;
 
-    let stage_object_store = GoogleCloudStorageBuilder::new()
-        .with_bucket_name(bucket_name)
-        .with_service_account_key(service_account_key)
-        .with_retry(RetryConfig {
-            backoff: BackoffConfig::default(),
-            max_retries: usize::max_value(),
-            retry_timeout: std::time::Duration::from_secs(u64::MAX),
-        })
-        .build()?;
-
     let arrow_schema = Arc::new(map_to_arrow_schema(&schema)?);
 
-    let mut buffer: Vec<u8>;
-    let mut writer: ArrowWriter<&mut Vec<u8>>;
-    let mut records_processed: usize;
+    let mut records = Vec::new();
 
-    let mut total_staged_size_bytes = 0usize;
-    let mut staged_files_uris = Vec::new();
+    'main: loop {
+        loop {
+            let capacity = 100;
+            let mut results = Vec::with_capacity(capacity);
+            let num_results = records_receiver.recv_many(&mut results, capacity).await;
+            if num_results == 0 {
+                break 'main;
+            }
+            let continue_reading = num_results < capacity;
+            for ops in results {
+                // debug!("got {} ops", ops.len());
+
+                for op in ops {
+                    match op {
+                        Operation::Insert { new } => records.push(new),
+                        Operation::BatchInsert { new } => records.extend(new),
+                        Operation::Delete { .. } => todo!(),
+                        Operation::Update { .. } => todo!(),
+                    }
+                }
+            }
+            if !continue_reading || records.len() >= max_batch_size {
+                break;
+            }
+        }
 
-    let mut total_written_bytes = 0usize;
+        let num_records = records.len();
 
-    let batch_start_time = Instant::now();
+        if num_records >= max_batch_size {
+            debug!("writing parquet file with {num_records} records");
+            let mut parquet_records = Vec::new();
+            let schema = schema.clone();
+            let arrow_schema = arrow_schema.clone();
+            let destination = config.destination.clone();
+            let service_account_key = service_account_key.clone();
+            let parquet_sender = parquet_sender.clone();
+            let metrics = metrics.clone();
+
+            std::mem::swap(&mut parquet_records, &mut records);
+            tokio::task::spawn_blocking(move || {
+                if let Err(err) = (move || -> Result<(), BoxedError> {
+                    let conversion_start_time = Instant::now();
+
+                    let mut buffer = Vec::new();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None).unwrap();
+
+                    for record in parquet_records {
+                        writer.write(&map_record_to_arrow(record, &schema)?)?;
+                    }
+
+                    if let Err(err) = writer.close() {
+                        error!("parquet error: {err}");
+                    }
+
+                    let conversion_time = Instant::now() - conversion_start_time;
+                    metrics.lock().unwrap().total_parquet_conversion_time += conversion_time;
+
+                    let Destination {
+                        stage_gcs_bucket_name: bucket_name,
+                        project,
+                        dataset,
+                        table,
+                        ..
+                    } = &destination;
+
+                    let filename = format!(
+                        "{project}.{dataset}.{table}-{}.parquet",
+                        std::time::SystemTime::now()
+                            .duration_since(std::time::UNIX_EPOCH)
+                            .unwrap()
+                            .as_secs()
+                    );
+
+                    let num_bytes = buffer.len();
+                    let bytes = Bytes::from(buffer);
+
+                    tokio::spawn({
+                        let stage_object_store = GoogleCloudStorageBuilder::new()
+                            .with_bucket_name(bucket_name)
+                            .with_service_account_key(&service_account_key)
+                            .with_retry(RetryConfig {
+                                backoff: BackoffConfig::default(),
+                                max_retries: usize::max_value(),
+                                retry_timeout: std::time::Duration::from_secs(u64::MAX),
+                            })
+                            .build()?;
+
+                        let sender = parquet_sender;
+                        let uri = format!("gs://{bucket_name}/{filename}");
+                        async move {
+                            let upload_start_time = Instant::now();
+
+                            if let Err(err) = stage_object_store
+                                .put(&Path::from(filename.clone()), bytes)
+                                .await
+                            {
+                                error!("failed to upload parquet file {uri}; {err}");
+                                return;
+                            }
+
+                            let upload_time = Instant::now() - upload_start_time;
+                            metrics.lock().unwrap().total_parquet_upload_time += upload_time;
+
+                            let _r = sender
+                                .send(ParquetMetadata {
+                                    gcs_uri: uri,
+                                    size_in_bytes: num_bytes,
+                                })
+                                .await;
+                        }
+                    });
 
-    macro_rules! reset_segment {
-        () => {
-            buffer = Vec::new();
-            writer = ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None).unwrap();
-            records_processed = 0usize;
-        };
+                    Ok(())
+                })() {
+                    error!("error writing parquet: {err}");
+                    panic!();
+                }
+            });
+        }
     }
 
-    reset_segment!();
+    Ok(())
+}
 
-    loop {
-        let Some(ops) = records_receiver.recv().await else {
-            break;
-        };
+struct ParquetMetadata {
+    gcs_uri: String,
+    size_in_bytes: usize,
+}
 
-        let mut records = Vec::with_capacity(ops.len());
-        for op in ops {
-            match op {
-                Operation::Insert { new } => records.push(new),
-                Operation::BatchInsert { new } => records.extend(new),
-                Operation::Delete { .. } => todo!(),
-                Operation::Update { .. } => todo!(),
-            }
+fn report(metrics: &Metrics) {
+    let uptime = Instant::now() - metrics.start_time;
+
+    struct Format<'a>(&'a Duration);
+    impl std::fmt::Display for Format<'_> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            let time = self.0;
+            let hours = time.as_secs() / 3600;
+            let minutes = (time.as_secs() % 3600) / 60;
+            let seconds = time.as_secs() % 60;
+            let millis = time.subsec_millis();
+            write!(f, "{hours:02}:{minutes:02}:{seconds:02}")?;
+            if millis != 0 {
+                write!(f, ".{millis:03}")?;
+            }
+            Ok(())
         }
+    }
 
-        let num_recods = records.len();
-        for record in records {
-            writer.write(&map_record_to_arrow(record, &schema)?)?;
-        }
+    debug!(
+        concat!(
+            "\nMetrics Report:\n",
+            " rate: {} MB/sec\n",
+            " uptime: {}\n",
+            " in-memory records-to-parquet conversion time: {} (parallelized)\n",
+            " parquet upload to GCS time: {} (parallelized)\n",
+            " bigquery load parquet from GCS time: {} (parallelized)\n"
+        ),
+        metrics.total_processed_bytes as f64 / 1024f64 / 1024f64 / uptime.as_secs_f64(),
+        Format(&uptime),
+        Format(&metrics.total_parquet_conversion_time),
+        Format(&metrics.total_parquet_upload_time),
+        Format(&metrics.total_bigquery_load_time),
+    );
+}
 
-        records_processed += num_recods;
-
-        if records_processed >= max_batch_size {
-            debug!("writing parquet file with {records_processed} records");
-            writer.close()?;
-            let num_bytes = buffer.len();
-            let bytes = Bytes::from(buffer);
-            reset_segment!();
-            let filename = format!(
-                "{project}.{dataset}.{table}-{}.parquet",
-                std::time::SystemTime::now()
-                    .duration_since(std::time::UNIX_EPOCH)
-                    .unwrap()
-                    .as_secs()
-            );
-            stage_object_store
-                .put(&Path::from(filename.clone()), bytes)
-                .await?;
-
-            staged_files_uris.push(format!("gs://{bucket_name}/{filename}"));
-            total_staged_size_bytes += num_bytes;
-
-            if total_staged_size_bytes >= max_stage_size_in_bytes {
-                debug!(
-                    "loading {} parquet files into bigquery",
-                    staged_files_uris.len()
-                );
-                let job = Job {
-                    configuration: Some(JobConfiguration {
-                        job_timeout_ms: Some("300000".to_string()),
-                        load: Some(JobConfigurationLoad {
-                            create_disposition: Some("CREATE_NEVER".to_string()),
-                            destination_table: Some(TableReference::new(project, dataset, table)),
-                            source_format: Some("PARQUET".to_string()),
-                            source_uris: Some(staged_files_uris),
-                            ..Default::default()
-                        }),
-                        ..Default::default()
-                    }),
-                    ..Default::default()
-                };
-                staged_files_uris = Vec::new();
+async fn parquet_loader(
+    mut receiver: Receiver<ParquetMetadata>,
+    gcp_sa_key: yup_oauth2::ServiceAccountKey,
+    max_stage_size_in_bytes: usize,
+    destination: Destination,
+    metrics: Arc<std::sync::Mutex<Metrics>>,
+) -> Result<(), BoxedError> {
+    let mut total_staged_bytes = 0usize;
+    let mut staged_files_uris = Vec::new();
 
-                let job = client.job().insert(project, job).await?;
+    let Destination {
+        project,
+        dataset,
+        table,
+        ..
+    } = &destination;
 
-                let job_ref = job.job_reference.expect("job_reference not found");
-                debug!("job_ref: {job_ref:?}");
+    let client = gcp_bigquery_client::Client::from_service_account_key(gcp_sa_key, false).await?;
 
-                while client
-                    .job()
-                    .get_job(project, job_ref.job_id.as_ref().unwrap(), None)
-                    .await?
-                    .status
-                    .expect("job_status not found")
-                    .state
-                    != Some("DONE".to_string())
-                {
-                    tokio::time::sleep(Duration::from_secs_f64(0.5)).await;
-                }
+    loop {
+        let Some(ParquetMetadata {
+            gcs_uri,
+            size_in_bytes,
+        }) = receiver.recv().await
+        else {
+            break;
+        };
 
-                debug!(
-                    "Inserted {} MB of data into BigQuery",
-                    total_staged_size_bytes / 1024 / 1024
-                );
+        total_staged_bytes += size_in_bytes;
+        staged_files_uris.push(gcs_uri);
 
-                let batch_time = Instant::now() - batch_start_time;
-                debug!(
-                    "report: Total time: {} hours, {} minutes, {} seconds {} milliseconds\n rate: {} MB/sec",
-                    batch_time.as_secs() / 3600,
-                    (batch_time.as_secs() % 3600) / 60,
-                    batch_time.as_secs() % 60,
-                    batch_time.subsec_millis(),
-                    total_written_bytes as f64 / 1024f64 / 1024f64 / batch_time.as_secs_f64()
-                );
+        if total_staged_bytes >= max_stage_size_in_bytes {
+            debug!(
+                "loading {} parquet files into bigquery",
+                staged_files_uris.len()
+            );
 
-                total_written_bytes += total_staged_size_bytes;
+            let load_start_time = Instant::now();
 
-                total_staged_size_bytes = 0;
-            }
+            let job = Job {
+                configuration: Some(JobConfiguration {
+                    job_timeout_ms: Some("300000".to_string()),
+                    load: Some(JobConfigurationLoad {
+                        create_disposition: Some("CREATE_NEVER".to_string()),
+                        destination_table: Some(TableReference::new(project, dataset, table)),
+                        source_format: Some("PARQUET".to_string()),
+                        source_uris: Some(staged_files_uris),
+                        ..Default::default()
+                    }),
+                    ..Default::default()
+                }),
+                ..Default::default()
+            };
+            let job = client.job().insert(project, job).await?;
+
+            let job_ref = job.job_reference.expect("job_reference not found");
+            debug!("job_ref: {job_ref:?}");
+
+            metrics.lock().unwrap().total_processed_bytes += total_staged_bytes;
+            total_staged_bytes = 0;
+            staged_files_uris = Vec::new();
+
+            tokio::spawn({
+                let jobs = client.job().clone();
+                let project = project.clone();
+                let metrics = metrics.clone();
+                async move {
+                    loop {
+                        match jobs
+                            .get_job(&project, job_ref.job_id.as_ref().unwrap(), None)
+                            .await
+                        {
+                            Ok(job) => {
+                                if job.status.expect("job_status not found").state
+                                    == Some("DONE".to_string())
+                                {
+                                    break;
+                                } else {
+                                    tokio::time::sleep(Duration::from_secs_f64(0.5)).await;
+                                    continue;
+                                }
+                            }
+                            Err(err) => {
+                                error!(
+                                    "error getting job status for job: {:?}; {err}",
+                                    job_ref.job_id
+                                );
+                                return;
+                            }
+                        }
+                    }
+
+                    let load_time = Instant::now() - load_start_time;
+                    metrics.lock().unwrap().total_bigquery_load_time += load_time;
+
+                    report(metrics.lock().unwrap().deref());
+                }
+            });
+        }
+    }
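
For reference, a minimal standalone sketch of the HH:MM:SS[.mmm] duration formatting that the new `report` function implements through its local `Format` newtype. `format_duration` is a hypothetical helper introduced only for illustration and is not part of the diff.

use std::time::Duration;

// Mirrors the Display impl of `Format` above: zero-padded hours, minutes and
// seconds, with a ".millis" suffix only when the sub-second part is non-zero.
fn format_duration(time: &Duration) -> String {
    let hours = time.as_secs() / 3600;
    let minutes = (time.as_secs() % 3600) / 60;
    let seconds = time.as_secs() % 60;
    let millis = time.subsec_millis();
    let mut out = format!("{hours:02}:{minutes:02}:{seconds:02}");
    if millis != 0 {
        out.push_str(&format!(".{millis:03}"));
    }
    out
}

fn main() {
    // 1 hour, 1 minute, 1.5 seconds
    assert_eq!(format_duration(&Duration::from_millis(3_661_500)), "01:01:01.500");
    // whole seconds omit the millisecond suffix
    assert_eq!(format_duration(&Duration::from_secs(2)), "00:00:02");
}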