
Commit

Merge branch 'main' into chore/switch-e2e-sql-backend
yezizp2012 authored Aug 28, 2024
2 parents d56ee69 + befb04f commit 1019c79
Showing 51 changed files with 895 additions and 330 deletions.
18 changes: 18 additions & 0 deletions dashboard/lib/api/streaming.ts
@@ -28,12 +28,30 @@ import {
} from "../../proto/gen/catalog"
import {
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies,
RelationIdInfos,
TableFragments,
} from "../../proto/gen/meta"
import { ColumnCatalog, Field } from "../../proto/gen/plan_common"
import { UserInfo } from "../../proto/gen/user"
import api from "./api"

// NOTE(kwannoel): This can be optimized further: instead of fetching the entire TableFragments struct,
// we can fetch only the fields we need from TableFragments, in a truncated struct.
export async function getFragmentsByJobId(
jobId: number
): Promise<TableFragments> {
let route = "/fragments/job_id/" + jobId.toString()
let tableFragments: TableFragments = TableFragments.fromJSON(
await api.get(route)
)
return tableFragments
}

export async function getRelationIdInfos(): Promise<RelationIdInfos> {
let fragmentIds: RelationIdInfos = await api.get("/relation_id_infos")
return fragmentIds
}

export async function getFragments(): Promise<TableFragments[]> {
let fragmentList: TableFragments[] = (await api.get("/fragments2")).map(
TableFragments.fromJSON
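A minimal usage sketch of the two new helpers (not part of the diff): `getRelationIdInfos` can resolve which streaming job owns a given fragment, and `getFragmentsByJobId` then fetches only that job's fragments. The `findJobIdForFragment` / `loadFragmentsForFragmentId` names and the import paths are hypothetical; only the exported functions above come from this change.

```typescript
import { RelationIdInfos, TableFragments } from "../../proto/gen/meta"
import { getFragmentsByJobId, getRelationIdInfos } from "./streaming"

// Hypothetical helper: walk relation_id -> fragment_id and return the id of the
// job (relation) that owns the given fragment, or undefined if it is not found.
function findJobIdForFragment(
  infos: RelationIdInfos,
  targetFragmentId: number
): number | undefined {
  for (const relationId in infos.map) {
    const fragmentIdToActorIds = infos.map[relationId].map
    for (const fragmentId in fragmentIdToActorIds) {
      if (parseInt(fragmentId) === targetFragmentId) {
        return parseInt(relationId)
      }
    }
  }
  return undefined
}

// Hypothetical usage: fetch the fragments of the job that owns `fragmentId`.
export async function loadFragmentsForFragmentId(
  fragmentId: number
): Promise<TableFragments | undefined> {
  const infos = await getRelationIdInfos()
  const jobId = findJobIdForFragment(infos, fragmentId)
  return jobId === undefined ? undefined : getFragmentsByJobId(jobId)
}
```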
71 changes: 40 additions & 31 deletions dashboard/pages/fragment_graph.tsx
@@ -45,7 +45,11 @@ import {
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import {
getFragmentsByJobId,
getRelationIdInfos,
getStreamingJobs,
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"
@@ -194,28 +198,33 @@ interface EmbeddedBackPressureInfo {

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: fragmentList } = useFetch(getFragments)
const { response: relationIdInfos } = useFetch(getRelationIdInfos)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
const [tableFragments, setTableFragments] = useState<TableFragments>()

const toast = useErrorToast()

useEffect(() => {
if (relationId) {
setTableFragments(undefined)
getFragmentsByJobId(relationId).then((tf) => {
setTableFragments(tf)
})
}
}, [relationId])

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
if (relationId) {
const fragments = fragmentList.find((x) => x.tableId === relationId)
if (fragments) {
const fragmentDep = buildFragmentDependencyAsEdges(fragments)
return {
fragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
if (tableFragments) {
const fragmentDep = buildFragmentDependencyAsEdges(tableFragments)
return {
fragments: tableFragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
}
}
}, [fragmentList, relationId])
}, [tableFragments])

useEffect(() => {
if (relationList) {
@@ -255,38 +264,38 @@ export default function Streaming() {

const handleSearchFragment = () => {
const searchFragIdInt = parseInt(searchFragId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
if (tf.fragments[fragmentId].fragmentId == searchFragIdInt) {
setRelationId(tf.tableId)
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
if (parseInt(fragmentId) == searchFragIdInt) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(searchFragIdInt)
return
}
}
}
}

toast(new Error(`Fragment ${searchFragIdInt} not found`))
}

const handleSearchActor = () => {
const searchActorIdInt = parseInt(searchActorId)
if (fragmentList) {
for (const tf of fragmentList) {
for (const fragmentId in tf.fragments) {
const fragment = tf.fragments[fragmentId]
for (const actor of fragment.actors) {
if (actor.actorId == searchActorIdInt) {
setRelationId(tf.tableId)
setSelectedFragmentId(fragment.fragmentId)
return
}
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
let actorIds = fragmentIdToRelationId[fragmentId].ids
if (actorIds.includes(searchActorIdInt)) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(parseInt(fragmentId))
return
}
}
}
}

toast(new Error(`Actor ${searchActorIdInt} not found`))
}

52 changes: 52 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
@@ -42,6 +42,7 @@ distribution key id NULL NULL
table description rw_customers NULL NULL


# add column
system ok
mysql -e "
USE mytest;
@@ -64,6 +65,57 @@ primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# renaming a column on the upstream will not be replicated, since we do not support renaming columns
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v1 TO v11;
ALTER TABLE customers CHANGE COLUMN v2 v22 decimal(5,2);
"

sleep 3s

# table schema is unchanged, since we reject renaming columns
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# revert column rename on upstream
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v11 TO v1;
ALTER TABLE customers CHANGE COLUMN v22 v2 double(5,2);
"

# drop columns
system ok
mysql -e "
USE mytest;
ALTER TABLE customers DROP COLUMN modified;
ALTER TABLE customers DROP COLUMN v1;
ALTER TABLE customers DROP COLUMN v2;
"

sleep 3s

# the dropped columns (modified, v1, v2) should no longer appear
query TTTT
describe rw_customers;
----
id bigint false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

statement ok
drop source mysql_source cascade;
2 changes: 1 addition & 1 deletion e2e_test/source_inline/kafka/protobuf/basic.slt
@@ -44,7 +44,7 @@ with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
FORMAT upsert ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);
17 changes: 17 additions & 0 deletions proto/meta.proto
@@ -772,3 +772,20 @@ service EventLogService {
rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse);
rpc AddEventLog(AddEventLogRequest) returns (AddEventLogResponse);
}

message ActorIds {
repeated uint32 ids = 1;
}

message FragmentIdToActorIdMap {
map<uint32, ActorIds> map = 1;
}

/// Provides all the ids: relation_id, fragment_id, actor_id
/// in a hierarchical format.
/// relation_id -> [fragment_id]
/// fragment_id -> [actor_id]
message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}
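To make the nesting concrete, here is a hypothetical payload of `GET /relation_id_infos` as the dashboard consumes it (all ids are made up for illustration): relation 1 owns fragments 10 and 11, fragment 10 runs actors 100 and 101, and fragment 11 runs actor 102. Proto3 JSON encodes the uint32 map keys as strings.

```typescript
// Hypothetical /relation_id_infos response (illustrative ids only):
//   relation 1 -> fragments 10, 11
//   fragment 10 -> actors 100, 101; fragment 11 -> actor 102
const exampleRelationIdInfos = {
  map: {
    "1": {
      map: {
        "10": { ids: [100, 101] },
        "11": { ids: [102] },
      },
    },
  },
}
```

The dashboard's search handlers in `fragment_graph.tsx` walk exactly this structure: for each relation_id, for each fragment_id, they check the `ids` list to locate an actor or fragment.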
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -93,6 +93,7 @@ opendal = { workspace = true, features = [
"services-gcs",
"services-memory",
"services-s3",
"services-webhdfs",
] }
openssl = "0.10"
parking_lot = { workspace = true }
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
@@ -39,6 +39,7 @@ macro_rules! for_all_classified_sources {
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
{ Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
}
6 changes: 4 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -31,8 +31,8 @@ use risingwave_pb::plan_common::{
use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
S3_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};

// Hidden additional columns connectors which do not support `include` syntax.
@@ -57,6 +57,8 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
(OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
(S3_CONNECTOR, HashSet::from(["file", "offset"])),
(GCS_CONNECTOR, HashSet::from(["file", "offset"])),
(AZBLOB_CONNECTOR, HashSet::from(["file", "offset"])),
(POSIX_FS_CONNECTOR, HashSet::from(["file", "offset"])),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/debezium.rs
@@ -165,7 +165,7 @@ pub fn parse_schema_change(
) -> AccessResult<SchemaChangeEnvelope> {
let mut schema_changes = vec![];

let upstream_ddl = accessor
let upstream_ddl: String = accessor
.access(&[UPSTREAM_DDL], &DataType::Varchar)?
.to_owned_datum()
.unwrap()
1 change: 1 addition & 0 deletions src/connector/src/sink/file_sink/mod.rs
@@ -17,3 +17,4 @@ pub mod fs;
pub mod gcs;
pub mod opendal_sink;
pub mod s3;
pub mod webhdfs;
1 change: 1 addition & 0 deletions src/connector/src/sink/file_sink/opendal_sink.rs
@@ -87,6 +87,7 @@ pub enum EngineType {
S3,
Fs,
Azblob,
Webhdfs,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {