Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 4, 2024
1 parent b6c05a1 commit 3917b97
Showing 1 changed file with 23 additions and 58 deletions.
81 changes: 23 additions & 58 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,14 @@ mod test {
use risingwave_common::array::Op;
use risingwave_common::catalog::ColumnId;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
use risingwave_common::types::{DataType, Date};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
use url::Url;

use super::*;
use crate::connector_common::AwsAuthProps;
use crate::parser::plain_parser::PlainParser;
use crate::parser::unified::avro::unix_epoch_days;
use crate::parser::{AccessBuilderImpl, SourceStreamChunkBuilder, SpecificParserConfig};
use crate::source::{SourceColumnDesc, SourceContext};

Expand Down Expand Up @@ -316,62 +315,28 @@ mod test {
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row();
for (i, field) in record.fields.iter().enumerate() {
let value = field.clone().1;
match value {
Value::String(str) | Value::Union(_, box Value::String(str)) => {
assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
}
Value::Boolean(bool_val) => {
assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
}
Value::Int(int_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
}
Value::Long(i64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
}
Value::Float(f32_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
}
Value::Double(f64_val) => {
assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
}
Value::Date(days) => {
assert_eq!(
row[i],
Some(ScalarImpl::Date(
Date::with_days(days + unix_epoch_days()).unwrap(),
))
);
}
Value::TimestampMillis(millis) => {
assert_eq!(
row[i],
Some(Timestamptz::from_millis(millis).unwrap().into())
);
}
Value::TimestampMicros(micros) => {
assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
}
Value::Bytes(bytes) => {
assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
}
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
assert_eq!(
row[i],
Some(Interval::from_month_day_usec(months, days, usecs).into())
);
}
_ => {
unreachable!()
}
}
}

expect_test::expect![[r#"
("id", Int(32)) => Some(Int32(32))
("sequence_id", Long(64)) => Some(Int64(64))
("name", Union(1, String("str_value"))) => Some(Utf8("str_value"))
("score", Float(32.0)) => Some(Float32(OrderedFloat(32.0)))
("avg_score", Double(64.0)) => Some(Float64(OrderedFloat(64.0)))
("is_lasted", Boolean(true)) => Some(Bool(true))
("entrance_date", Date(0)) => Some(Date(Date(1970-01-01)))
("birthday", TimestampMillis(0)) => Some(Timestamptz(Timestamptz(0)))
("anniversary", TimestampMicros(0)) => Some(Timestamptz(Timestamptz(0)))
("passed", Duration(Duration { months: Months(1), days: Days(1), millis: Millis(1000) })) => Some(Interval(Interval { months: 1, days: 1, usecs: 1000000 }))
("bytes", Bytes([1, 2, 3, 4, 5])) => Some(Bytea([1, 2, 3, 4, 5]))"#]].assert_eq(&format!(
"{}",
record
.fields
.iter()
.zip_eq(row.iter())
.format_with("\n", |(avro, datum), f| {
f(&format_args!("{:?} => {:?}", avro, datum))
})
));
}

fn build_rw_columns() -> Vec<SourceColumnDesc> {
Expand Down

0 comments on commit 3917b97

Please sign in to comment.