
Commit

more progress on avro serialization
mwylde committed Nov 22, 2023
1 parent 7cc876b commit e65c128
Showing 10 changed files with 400 additions and 213 deletions.
6 changes: 3 additions & 3 deletions arroyo-api/src/pipelines.rs
@@ -222,9 +222,9 @@ pub(crate) async fn create_pipeline<'a>(
if is_preview {
for node in program.graph.node_weights_mut() {
// if it is a connector sink, switch to a web sink
if let Operator::ConnectorSink { .. } = node.operator {
node.operator = Operator::ConnectorSink(ConnectorOp::web_sink());
}
// if let Operator::ConnectorSink { .. } = node.operator {
// node.operator = Operator::ConnectorSink(ConnectorOp::web_sink());
// }
}
}

12 changes: 8 additions & 4 deletions arroyo-rpc/src/formats.rs
@@ -165,7 +165,7 @@ pub struct AvroFormat
pub confluent_schema_registry: bool,

#[serde(default)]
pub embedded_schema: bool,
pub raw_datums: bool,

#[serde(default)]
pub into_unstructured_json: bool,
@@ -178,12 +178,12 @@ impl AvroFormat
impl AvroFormat {
pub fn new(
confluent_schema_registry: bool,
embedded_schema: bool,
raw_datums: bool,
into_unstructured_json: bool,
) -> Self {
Self {
confluent_schema_registry,
embedded_schema,
raw_datums,
into_unstructured_json,
reader_schema: None,
}
@@ -194,7 +194,7 @@ impl AvroFormat
opts.remove("avro.confluent_schema_registry")
.filter(|t| t == "true")
.is_some(),
opts.remove("avro.include_schema")
opts.remove("avro.raw_datums")
.filter(|t| t == "true")
.is_some(),
opts.remove("avro.into_unstructured_json")
@@ -206,6 +206,10 @@ impl AvroFormat
pub fn add_reader_schema(&mut self, schema: apache_avro::Schema) {
self.reader_schema = Some(SerializableAvroSchema(schema));
}

pub fn sanitize_field(s: &str) -> String {
s.replace(".", "__")
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
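Alongside the rename of embedded_schema to raw_datums (and of the avro.include_schema option key to avro.raw_datums), this file gains AvroFormat::sanitize_field, which rewrites dotted field names before they are used as Avro record field names, presumably because Avro names may not contain a literal dot. A minimal standalone sketch of the behavior; the field names are invented for illustration and only the replace call mirrors the commit:

// Standalone copy of the rule in AvroFormat::sanitize_field: dots in SQL
// field names become double underscores before the name reaches Avro.
fn sanitize_field(s: &str) -> String {
    s.replace(".", "__")
}

fn main() {
    assert_eq!(sanitize_field("payload.user.id"), "payload__user__id");
    assert_eq!(sanitize_field("plain_field"), "plain_field");
}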
136 changes: 32 additions & 104 deletions arroyo-sql/src/avro.rs
@@ -1,11 +1,13 @@
use std::ptr::null;
use crate::types::{StructDef, StructField, TypeDef};
use anyhow::{anyhow, bail};
use apache_avro::schema::{
FixedSchema, Name, RecordField, RecordFieldOrder, RecordSchema, UnionSchema,
};
use apache_avro::Schema;
use apache_avro::schema::{FixedSchema, Name, RecordField, RecordFieldOrder, RecordSchema, UnionSchema};
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use proc_macro2::{Ident, TokenStream};
use quote::quote;
use std::ptr::null;

pub const ROOT_NAME: &str = "ArroyoAvroRoot";

@@ -116,7 +118,8 @@ fn to_typedef(source_name: &str, schema: &Schema) -> (TypeDef, Option<String>) {

/// Generates code that serializes an arroyo data struct into avro
///
/// Note that this must align with the schemas described in arrow_to_avro below
/// Note that this must align with the schemas constructed in
/// `arroyo_worker::formats::avro::arrow_to_avro_schema`!
pub fn generate_serializer_item(record: &Ident, field: &Ident, td: &TypeDef) -> TokenStream {
use DataType::*;
let value = quote!(#record.#field);
@@ -127,31 +130,36 @@ pub fn generate_serializer_item(record: &Ident, field: &Ident, td: &TypeDef) ->
TypeDef::DataType(dt, nullable) => {
let inner = match dt {
Null => unreachable!("null fields are not supported"),
Boolean => quote!{ Boolean(v) },
Int8 | Int16 | Int32 | UInt8 | UInt16 => quote! { Int(v) },
Int64 | UInt32 => quote! { Long(v) },
UInt64 => quote! { Fixed(8, v.to_be_bytes().to_vec()) },
Float16 | Float32 => quote! { Float(v) },
Float64 => quote! { Double(v) },
Timestamp(t, tz) => {
match (t, tz) {
(TimeUnit::Microsecond | TimeUnit::Nanosecond, None) => quote!{ TimestampMicros(arroyo_types::to_micros(v) as i64) },
(TimeUnit::Microsecond | TimeUnit::Nanosecond, Some(_)) => quote! { LocalTimestampMicros(arroyo_types::to_micros(v) as i64) },
(TimeUnit::Millisecond | TimeUnit::Second, None) => quote!{ TimestampMillis(arroyo_types::to_millis(v) as i64) },
(TimeUnit::Millisecond | TimeUnit::Second, Some(_)) => quote!{ LocalTimestampMillis(arroyo_types::to_millis(v) as i64) },
Boolean => quote! { Boolean(*v) },
Int8 | Int16 | Int32 | UInt8 | UInt16 => quote! { Int(*v as i32) },
Int64 | UInt32 | UInt64 => quote! { Long(*v as i64) },
Float16 | Float32 => quote! { Float(*v as f32) },
Float64 => quote! { Double(*v as f64) },
Timestamp(t, tz) => match (t, tz) {
(TimeUnit::Microsecond | TimeUnit::Nanosecond, None) => {
quote! { TimestampMicros(arroyo_types::to_micros(*v) as i64) }
}
}
Date32 | Date64 => quote! { Date(arroyo_types::days_since_epoch(v)) },
(TimeUnit::Microsecond | TimeUnit::Nanosecond, Some(_)) => {
quote! { LocalTimestampMicros(arroyo_types::to_micros(*v) as i64) }
}
(TimeUnit::Millisecond | TimeUnit::Second, None) => {
quote! { TimestampMillis(arroyo_types::to_millis(*v) as i64) }
}
(TimeUnit::Millisecond | TimeUnit::Second, Some(_)) => {
quote! { LocalTimestampMillis(arroyo_types::to_millis(*v) as i64) }
}
},
Date32 | Date64 => quote! { Date(arroyo_types::days_since_epoch(*v)) },
Time32(_) => todo!("time32 is not supported"),
Time64(_) => todo!("time64 is not supported"),
Duration(_) => todo!("duration is not supported"),
Interval(_) => todo!("interval is not supported"),
Binary | FixedSizeBinary(_) | LargeBinary => quote!{ Bytes(v.clone()) },
Utf8 | LargeUtf8 => quote!{ String(v.clone()) },
List(t) | FixedSizeList(t, _) | LargeList(t) => {
Binary | FixedSizeBinary(_) | LargeBinary => quote! { Bytes(v.clone()) },
Utf8 | LargeUtf8 => quote! { String(v.clone()) },
List(_) | FixedSizeList(_, _) | LargeList(_) => {
todo!("lists are not supported")
}
Struct(fields) => unreachable!("typedefs should not contain structs"),
Struct(_) => unreachable!("typedefs should not contain structs"),
Union(_, _) => unimplemented!("unions are not supported"),
Dictionary(_, _) => unimplemented!("dictionaries are not supported"),
Decimal128(_, _) => unimplemented!("decimal128 is not supported"),
@@ -161,92 +169,12 @@ pub fn generate_serializer_item(record: &Ident, field: &Ident, td: &TypeDef) ->
};

if *nullable {
quote!{
Union(#value.is_some() as u32, Box::new(#value.map(|v| #inner).unwrap_or_else(Null)))
quote! {
Union(#value.is_some() as u32, Box::new(#value.as_ref().map(|v| #inner).unwrap_or(Null)))
}
} else {
quote!{let v = #value; #inner}
quote! {{let v = &#value; #inner}}
}
}
}
}
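
To make the doc comment above concrete, here is a hand-expanded sketch of roughly what the quoted serializer expression becomes for a single non-nullable Timestamp(Microsecond, None) field. The struct, the field name, and the local to_micros stand-in are assumptions for illustration; the real helper lives in arroyo_types.

use apache_avro::types::Value;
use std::time::{SystemTime, UNIX_EPOCH};

// Local stand-in for arroyo_types::to_micros, used only in this sketch.
fn to_micros(t: SystemTime) -> u128 {
    t.duration_since(UNIX_EPOCH).unwrap().as_micros()
}

// Hypothetical generated record type with one event-time column.
struct MyRecord {
    event_time: SystemTime,
}

// Roughly what generate_serializer_item expands to for this field: borrow the
// value, convert it to microseconds, and wrap it in the matching Value variant.
fn serialize_event_time(record: &MyRecord) -> Value {
    {
        let v = &record.event_time;
        Value::TimestampMicros(to_micros(*v) as i64)
    }
}

fn main() {
    let rec = MyRecord { event_time: SystemTime::now() };
    println!("{:?}", serialize_event_time(&rec));
}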

fn arrow_to_avro(name: &str, dt: &DataType) -> Schema {
match dt {
DataType::Null => unreachable!("null fields are not supported"),
DataType::Boolean => Schema::Boolean,
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => Schema::Int,
DataType::Int64 | DataType::UInt32 => Schema::Long,
DataType::UInt64 => Schema::Fixed(FixedSchema {
name: Name::from("dev.arroyo.types.uint64"),
aliases: None,
doc: None,
size: 8,
attributes: Default::default(),
}),
DataType::Float16 | DataType::Float32 => Schema::Float,
DataType::Float64 => Schema::Double,
DataType::Timestamp(t, tz) => {
match (t, tz) {
(TimeUnit::Microsecond | TimeUnit::Nanosecond, None) => Schema::TimestampMicros,
(TimeUnit::Microsecond | TimeUnit::Nanosecond, Some(_)) => Schema::LocalTimestampMicros,
(TimeUnit::Millisecond | TimeUnit::Second, None) => Schema::TimestampMillis,
(TimeUnit::Millisecond | TimeUnit::Second, Some(_)) => Schema::LocalTimestampMillis,
}
}
DataType::Date32 | DataType::Date64 => Schema::Date,
DataType::Time64(_) | DataType::Time32(_) => { todo!("time is not supported") },
DataType::Duration(_) => todo!("duration is not supported"),
DataType::Interval(_) => todo!("interval is not supported"),
DataType::Binary | DataType::FixedSizeBinary(_) | DataType::LargeBinary => Schema::Bytes,
DataType::Utf8 | DataType::LargeUtf8 => Schema::String,
DataType::List(t) | DataType::FixedSizeList(t, _) | DataType::LargeList(t) => {
Schema::Array(Box::new(arrow_to_avro(name, t.data_type())))
}
DataType::Struct(fields) => {
arrow_to_avro_schema(name, fields)
}
DataType::Union(_, _) => unimplemented!("unions are not supported"),
DataType::Dictionary(_, _) => unimplemented!("dictionaries are not supported"),
DataType::Decimal128(_, _) => unimplemented!("decimal128 is not supported"),
DataType::Decimal256(_, _) => unimplemented!("decimal256 is not supported"),
DataType::Map(_, _) => unimplemented!("maps are not supported"),
DataType::RunEndEncoded(_, _) => unimplemented!("run end encoded is not supported"),
}
}

fn field_to_avro(index: usize, name: &str, field: &Field) -> RecordField {
let mut schema = arrow_to_avro(&format!("{}_{}", name, &field.name()), field.data_type());

if field.is_nullable() {
schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap());
}

RecordField {
name: field.name().clone(),
doc: None,
aliases: None,
default: None,
schema,
order: RecordFieldOrder::Ascending,
position: index,
custom_attributes: Default::default(),
}
}

pub fn arrow_to_avro_schema(name: &str, fields: &Fields) -> Schema {
let fields = fields
.iter()
.enumerate()
.map(|(i, f)| field_to_avro(i, name, &**f))
.collect();

Schema::Record(RecordSchema {
name: Name::from(name),
aliases: None,
doc: None,
fields,
lookup: Default::default(),
attributes: Default::default(),
})
}
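
The schema-construction half of that contract (the arrow_to_avro and arrow_to_avro_schema functions removed above, which per the doc comment now live in arroyo_worker::formats::avro) maps nullable Arrow fields to a union of null and the value type, and maps u64 to an 8-byte fixed type named dev.arroyo.types.uint64. A small sketch of what such a record schema looks like written out as Avro JSON; the field names are illustrative, and only ROOT_NAME and the fixed-type name come from the source:

use apache_avro::Schema;

fn main() {
    // Record schema mirroring the removed arrow_to_avro mapping: a nullable
    // string becomes ["null", "string"], a u64 becomes an 8-byte fixed type.
    let schema = Schema::parse_str(
        r#"{
            "type": "record",
            "name": "ArroyoAvroRoot",
            "fields": [
                { "name": "note", "type": ["null", "string"] },
                { "name": "counter",
                  "type": { "type": "fixed", "name": "dev.arroyo.types.uint64", "size": 8 } }
            ]
        }"#,
    )
    .expect("valid Avro schema");
    println!("{}", schema.canonical_form());
}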
31 changes: 18 additions & 13 deletions arroyo-sql/src/types.rs
@@ -20,16 +20,18 @@ use arroyo_rpc::{
};
use datafusion::sql::sqlparser::ast::{DataType as SQLDataType, ExactNumberInfo, TimezoneInfo};

use crate::avro;
use arroyo_rpc::api_types::connections::{
FieldType, PrimitiveType, SourceField, SourceFieldType, StructType,
};
use arroyo_rpc::formats::AvroFormat;
use arroyo_rpc::formats::Format::Avro;
use datafusion_common::ScalarValue;
use proc_macro2::{Ident, TokenStream};
use quote::{format_ident, quote};
use regex::Regex;
use syn::PathArguments::AngleBracketed;
use syn::{parse_quote, parse_str, GenericArgument, Type};
use crate::avro;

#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd)]
pub struct StructDef {
@@ -315,26 +317,29 @@ impl StructDef {
}

fn generate_avro_writer(&self) -> TokenStream {
let record_ident = format_ident!("record");
let fields: Vec<_> = self.fields.iter()
let record_ident = format_ident!("self");
let fields: Vec<_> = self
.fields
.iter()
.map(|f| {
let name = f.name();
let name = AvroFormat::sanitize_field(&f.name());
let field_ident = f.field_ident();
let serializer = avro::generate_serializer_item(
&record_ident, &field_ident, &f.data_type);
quote!{
record.put(#name, #serializer);
let serializer =
avro::generate_serializer_item(&record_ident, &field_ident, &f.data_type);
quote! {
__avro_record.put(#name, #serializer);
}
})
.collect();

quote! {
fn write_avro<W: std::io::Write>(&self, writer: &mut apache_avro::Writer<W>, schema: &apache_avro::Schema) {
let mut record = apache_avro::types::Record::new(schema).unwrap();
parse_quote! {
fn to_avro(&self, schema: &arroyo_worker::apache_avro::Schema) -> arroyo_worker::apache_avro::types::Value {
let mut __avro_record = arroyo_worker::apache_avro::types::Record::new(schema).unwrap();
use arroyo_worker::apache_avro::types::Value::*;

#(#fields )*;
#(#fields )*

writer.append(record).unwrap();
__avro_record.into()
}
}
}
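For reference, here is a hand-written sketch of the kind of to_avro method this builder now emits, for a hypothetical struct with an id column and a nullable user.name column (sanitized to user__name). It uses apache_avro directly rather than the arroyo_worker::apache_avro re-export path that the generated code references, and the struct, schema, and values are assumptions for illustration:

use apache_avro::types::Value;
use apache_avro::Schema;

// Hypothetical generated struct; field names and types are illustrative.
struct Example {
    id: i64,
    user_name: Option<String>,
}

impl Example {
    // Shape of the method emitted by generate_avro_writer: build a Record for
    // the writer schema, put each sanitized field, and convert it into a Value.
    fn to_avro(&self, schema: &Schema) -> Value {
        use apache_avro::types::Value::*;
        let mut __avro_record = apache_avro::types::Record::new(schema).unwrap();
        __avro_record.put("id", { let v = &self.id; Long(*v as i64) });
        __avro_record.put(
            "user__name",
            Union(
                self.user_name.is_some() as u32,
                Box::new(self.user_name.as_ref().map(|v| String(v.clone())).unwrap_or(Null)),
            ),
        );
        __avro_record.into()
    }
}

fn main() {
    let schema = Schema::parse_str(
        r#"{"type": "record", "name": "Example", "fields": [
            {"name": "id", "type": "long"},
            {"name": "user__name", "type": ["null", "string"]}]}"#,
    )
    .unwrap();
    let value = Example { id: 7, user_name: Some("ada".to_string()) }.to_avro(&schema);
    println!("{:?}", value);
}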
1 change: 0 additions & 1 deletion arroyo-types/src/lib.rs
@@ -251,7 +251,6 @@ pub fn from_nanos(ts: u128) -> SystemTime {
+ Duration::from_nanos((ts % 1_000_000_000) as u64)
}


// used for avro serialization -- returns the number of days since the UNIX EPOCH
pub fn days_since_epoch(time: SystemTime) -> i32 {
time.duration_since(UNIX_EPOCH)
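The days_since_epoch helper referenced by the generated Date(...) serializer arm in arroyo-sql/src/avro.rs is described by the comment above. A minimal re-derivation under that description; the real implementation lives in arroyo-types and may differ in edge-case and panic behavior:

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Whole days elapsed since the UNIX epoch, as the doc comment describes.
// Assumed to match the real helper in spirit, not necessarily line for line.
fn days_since_epoch(time: SystemTime) -> i32 {
    (time.duration_since(UNIX_EPOCH).unwrap().as_secs() / 86_400) as i32
}

fn main() {
    // 1970-01-02T00:00:00Z is exactly one day after the epoch.
    let one_day_in = UNIX_EPOCH + Duration::from_secs(86_400);
    assert_eq!(days_since_epoch(one_day_in), 1);
}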
7 changes: 7 additions & 0 deletions arroyo-worker/src/connectors/kafka/sink/test.rs
@@ -1,4 +1,7 @@
#![allow(clippy::unnecessary_mut_passed)]

use apache_avro::{Schema, Writer};
use std::io::Write;
use std::time::{Duration, SystemTime};

use crate::engine::{Context, OutQueue};
@@ -142,6 +145,10 @@ impl SchemaData for TestOutStruct {
fn to_raw_string(&self) -> Option<Vec<u8>> {
unimplemented!()
}

fn to_avro(&self, schema: &apache_avro::Schema) -> apache_avro::types::Value {
todo!()
}
}

struct KafkaSinkWithWrites {
Expand Down
6 changes: 6 additions & 0 deletions arroyo-worker/src/connectors/kafka/source/test.rs
@@ -1,6 +1,8 @@
use apache_avro::Writer;
use arrow::datatypes::{DataType, Field, Schema};
use arroyo_state::{BackingStore, StateBackend};
use rand::Rng;
use std::io::Write;
use std::time::{Duration, SystemTime};

use crate::connectors::kafka::source;
@@ -35,6 +37,10 @@ impl SchemaData for TestData {
fn to_raw_string(&self) -> Option<Vec<u8>> {
None
}

fn to_avro(&self, schema: &apache_avro::Schema) -> apache_avro::types::Value {
todo!()
}
}

pub struct KafkaTopicTester {