Detecting schema changes #1762
Replies: 2 comments 9 replies
-
As the author of the related MySQL connector issue (conduitio-labs/conduit-connector-mysql#34), I was asked to comment here. I don't think I know enough about Conduit or its schema support to be all that useful. But my interest, as described in the related issue, is in doing full async MySQL replication via Conduit, which requires streaming the binlog's DDL statements in addition to the DML ones. Of course, I could use MySQL's built-in async replication mechanisms (and am planning to do so for now), but I can see many advantages to doing this via NATS (or Kafka, or any other relevant destination connector). Debezium appears to be able to do this, but Conduit is much more appropriate for me and many others. This was specifically mentioned above and precluded, mostly on the grounds of:
Fair enough! I think that 2 could be mitigated by making something like this optional on both the source and destination side. If a clear need can be demonstrated for its support, the functionality can be prioritized for development. As for 1, I don't have much to add, though the [Debezium MySQL docs](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-schema-history-topic) seem to cover this topic quite comprehensively. Perhaps some inspiration can be gleaned from them?
What might be an easier workaround for at least my use case, and maybe others, is a configurable option to include the SQL DDL statement as some sort of metadata in an OpenCDC record. The MySQL destination connector would then read that and apply it to the destination MySQL database. I see little downside to an approach like this. It would sort of satisfy all 3 presented options at the same time, but in a cruder, more easily implemented manner ("Do Nothing" by not making it interoperable, while satisfying 2 and 3 by making it connector-specific functionality).
I hope this helps! Please feel free to ask questions or push back on anything!
-
The current schema support in Conduit is primarily meant to communicate the shape of the OpenCDC record (types, precision, scale, etc.) and encode it in a platform-agnostic format (right now Avro). In MySQL and other formalized databases, a schema can evolve both ways, but a "migration" usually moves forward, not backward; going backward is mostly used as a reversal mechanism after unforeseen events. If schema evolution is defined this way, then each evolution has a point-in-time anchor (or version) that defines the process and allows for reversal. I think this contradicts the behaviour Conduit has adopted (similarly to Confluent) with v1 -> v2 -> v1. In other words, schema versioning is anchored to the record shape rather than to the schema evolution of the data.
@nickchomey thanks for the comments. I appreciate your thoughts on this. Debezium grew up in the Kafka Connect world, and the schema support there attempts to provide a sufficient amount of metadata to help the consumer make the right decision. This can be mirroring, extraction, or whatnot, but ultimately there isn't (though maybe I missed it) a destination connector that can consume this schema and evolve the destination effectively. I suspect a big reason for this is that the connector ecosystem tries to stay agnostic to whatever the source is. Even in this case, an SMT can modify the record in a way that makes it incompatible with the schema, and the schema will not reflect this.
Ultimately, detecting schema changes in Conduit is a side effect rather than a primary goal, and it may not provide the optimal tool for complete schema evolution. Most of the logic for building the schema evolution will fall to the connector, and Option 2 can be reduced to SDK methods that compare the previous and current schemas to identify the delta and take action from there.
Option 3 provides a dedicated facility to communicate schema changes. This again will be up to the connector to build, because of its knowledge of the type and shape of the data before it is turned into an OpenCDC record. Nearly all of these options then carry the job of type coercion: each source will need to coerce the types of its fields/data into Avro, and the destination will need to do the reverse. Thus a Ts =
The most naive and somewhat simple approach would be to provide both the before and after schema in the OpenCDC record (of type schema) to represent a clear transition. Additionally, it can carry extra metadata to provide more context of the operation (e.g.
Because Conduit is a streaming platform, mirroring and replication can be achieved loosely based on the data it generates. At most, it may be best to leave Conduit responsible for data serde and let the connectors figure out how to do this replication. A MySQL-to-MySQL example could be an extended schema record with the SQL binlog statement injected, which a destination MySQL connector can interpret and apply directly when provided. I'm a big fan of Do Nothing and of allowing ourselves to see how this plays out. ;-)
-
An OpenCDC record supports four operations: `snapshot`, `create`, `update`, and `delete`. These operations suffice for transferring data records from the source to the destination, ensuring the destination receives the necessary information to apply the data correctly. However, there is a category of changes that Conduit and its source connectors currently do not address: schema changes.
With the introduction of schema support in Conduit v0.11.0, we have enabled the attachment of schemas to records containing structured data. This allows destination connectors to detect schema changes by tracking the high watermark of the schema version per subject and applying any schema changes to the destination resource if the high watermark is exceeded.
There are a few potential issues with this approach:
Schema changes only get propagated once a data record with the new schema is produced. This means schema changes on the source won't be visible in the destination until some data on the source is created, updated, or deleted.
The schema registry does not bump the version if a schema change is reverted. Instead, it reuses the old version. For example, if a field is added to the record and then dropped in the next record, the schema version will go from 1 to 2 and back to 1. If the destination connector only compares the version against the high watermark, it won't detect the column drop. However, if the connector applies the changes every time the schema version doesn't match the last seen version, records arriving out of order may create unexpected results (e.g., a record with an additional field causes the destination connector to create the column, then a late-arriving record with the old schema makes the connector drop the column, deleting the previously inserted data).
The question is whether and how we want to address this. Here are a few options I can think of.
Option 1: Do Nothing
Doing nothing is a valid option. There is already a way to handle schema changes using the schema attached to a data record. The drawback is that schema changes get delayed until some actual data flows. However, the benefit is that no bespoke code is needed in source connectors to detect schema changes in destinations. This keeps connector development simple, which is a goal we are pursuing. In some use cases, this approach is already good enough; we need to ask ourselves which use cases require the schema change to be reflected ASAP, and how common it is to solve those use cases with Conduit.
Option 2: `OnSchemaChange` in destination
This slightly improves on the previous approach of doing nothing by adding tracking of the version high watermark to the connector SDK. It would still be up to the destination connector to actually apply the change, but it would reduce some boilerplate code. Note that this still doesn't solve the issue of detecting schema changes when no data records are being produced.
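A rough sketch of how an optional `OnSchemaChange` hook could sit next to high-watermark tracking is shown here. Everything in it is an assumption made for illustration: `Schema`, `Record`, `Destination`, `SchemaChangeHandler` and `deliverer` are simplified stand-ins, not the actual Conduit connector SDK types. The hook is discovered with a Go type assertion, so a connector that does not implement it keeps working.

```go
package main

import "fmt"

// Schema and Record are simplified stand-ins for the real OpenCDC types.
type Schema struct {
	Subject string
	Version int
}

type Record struct {
	Schema  Schema
	Payload string
}

// Destination is the minimum a connector implements in this sketch.
type Destination interface {
	Write(r Record) error
}

// SchemaChangeHandler is the optional hook (hypothetical signature).
type SchemaChangeHandler interface {
	OnSchemaChange(previous, current Schema) error
}

// deliverer plays the role of the SDK: it tracks the high watermark per
// subject and calls the hook before writing the record that exceeds it.
type deliverer struct {
	dest          Destination
	highWatermark map[string]Schema
}

func newDeliverer(dest Destination) *deliverer {
	return &deliverer{dest: dest, highWatermark: make(map[string]Schema)}
}

func (d *deliverer) deliver(r Record) error {
	prev, seen := d.highWatermark[r.Schema.Subject]
	if !seen || r.Schema.Version > prev.Version {
		d.highWatermark[r.Schema.Subject] = r.Schema
		// Call the hook only if there is a previous schema to compare with
		// and the connector opted in by implementing the interface.
		if h, ok := d.dest.(SchemaChangeHandler); ok && seen {
			if err := h.OnSchemaChange(prev, r.Schema); err != nil {
				return fmt.Errorf("schema change hook: %w", err)
			}
		}
	}
	return d.dest.Write(r)
}

// plainDest does not implement the hook.
type plainDest struct{ writes int }

func (p *plainDest) Write(Record) error { p.writes++; return nil }

// evolvingDest implements the hook in addition to Write.
type evolvingDest struct {
	plainDest
	changes []string
}

func (e *evolvingDest) OnSchemaChange(previous, current Schema) error {
	e.changes = append(e.changes,
		fmt.Sprintf("%s: v%d -> v%d", current.Subject, previous.Version, current.Version))
	return nil
}

func main() {
	dest := &evolvingDest{}
	d := newDeliverer(dest)
	for _, v := range []int{1, 2, 2, 1} {
		if err := d.deliver(Record{Schema: Schema{Subject: "orders", Version: v}}); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(dest.writes, dest.changes)
}
```

Because the sketch compares against the high watermark only, the final v1 record does not trigger the hook, which mirrors the limitation already described for this approach.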
One way to implement this would be to add a method like `OnSchemaChange` to the destination connector and execute it once the SDK detects a schema change. It needs to be an optional function, ensuring the pipeline continues to work even if the connector does not implement it.
Option 3: Operation `schema`
We could add another operation to the OpenCDC record, such as `schema`, triggered by the source connector upon detecting a schema change. The main question is how to represent this schema. We cannot use SQL DDL statements like Debezium, because we want all our sources and destinations to be compatible with each other, and some might not handle SQL. Currently, our schemas attached to records only support Avro, which could be a choice, though it limits us due to Avro's expressiveness (some types won't be representable). The other question is how we can nicely represent the difference between two schemas so that the destination connector has an easier job applying the changes.
Another challenge here is that source connectors need to produce this data. Connector developers would need to implement this functionality and inject schema records into the stream. This might not be trivial in some cases.
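As one possible answer to the question of representing the difference between two schemas, the sketch below computes a flat diff of added, removed and retyped fields. It is only an illustration under strong assumptions: `FieldTypes`, `TypeChange`, `SchemaDiff` and `diffSchemas` are hypothetical names, schemas are reduced to a flat field-name-to-type map, and nested types, defaults and Avro specifics are ignored.

```go
package main

import (
	"fmt"
	"sort"
)

// FieldTypes maps a field name to a type name (a flattened, simplified
// view of a schema; hypothetical, not an existing Conduit type).
type FieldTypes map[string]string

// TypeChange describes a field whose type differs between two schemas.
type TypeChange struct {
	Field string
	From  string
	To    string
}

// SchemaDiff lists what a destination would need to apply.
type SchemaDiff struct {
	Added   []string
	Removed []string
	Changed []TypeChange
}

// diffSchemas compares two flat schemas field by field.
func diffSchemas(before, after FieldTypes) SchemaDiff {
	var d SchemaDiff
	for name, newType := range after {
		oldType, ok := before[name]
		switch {
		case !ok:
			d.Added = append(d.Added, name)
		case oldType != newType:
			d.Changed = append(d.Changed, TypeChange{Field: name, From: oldType, To: newType})
		}
	}
	for name := range before {
		if _, ok := after[name]; !ok {
			d.Removed = append(d.Removed, name)
		}
	}
	// Map iteration order is random in Go, so sort for stable output.
	sort.Strings(d.Added)
	sort.Strings(d.Removed)
	sort.Slice(d.Changed, func(i, j int) bool { return d.Changed[i].Field < d.Changed[j].Field })
	return d
}

func main() {
	before := FieldTypes{"id": "long", "name": "string", "age": "int"}
	after := FieldTypes{"id": "long", "age": "long", "email": "string"}
	fmt.Printf("%+v\n", diffSchemas(before, after))
}
```

A diff like this cannot tell a renamed field from a removal followed by an addition, so a source that knows about the rename would need to carry that information separately.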
Note that if we add this operation to the record, we should make sure not to deliver the record using the `Write` method. Connectors that do not check the operation might consider it a normal data record or even fail if they do not recognize the operation. Instead, we should use something like the `OnSchemaChange` function described above.
Also note that if we decide to go with this option, we might still need to have automatic schema change detection in the connector SDK, in case the source connector is not implemented to produce these `schema` operations. Otherwise, destination connectors that rely on these operations could break.
EDIT: Another thing I forgot to mention is schema changes for dropped collections. These can only be transmitted using this approach, since the previous two options require a data record to be emitted, while a dropped collection will not produce any more records.
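The routing rule for a `schema` operation could look roughly like the sketch below. It is an assumption-laden illustration, not SDK code: `OperationSchema`, `SchemaChangeHandler` and `route` are hypothetical, and `Record` is a simplified stand-in. Schema records never reach `Write`; they go to the optional hook if the connector implements it and are skipped otherwise.

```go
package main

import (
	"errors"
	"fmt"
)

type Operation string

const (
	OperationCreate Operation = "create"
	// OperationSchema is hypothetical; it is not an existing OpenCDC operation.
	OperationSchema Operation = "schema"
)

// Record is a simplified stand-in for an OpenCDC record.
type Record struct {
	Operation Operation
	Payload   string
}

type Destination interface {
	Write(r Record) error
}

// SchemaChangeHandler is the optional hook (hypothetical signature).
type SchemaChangeHandler interface {
	OnSchemaChange(r Record) error
}

// route delivers data records through Write and schema records through the
// optional hook. It reports whether the record reached the connector.
func route(dest Destination, r Record) (delivered bool, err error) {
	if r.Operation != OperationSchema {
		return true, dest.Write(r)
	}
	h, ok := dest.(SchemaChangeHandler)
	if !ok {
		// The connector did not opt in, so the schema record is skipped
		// instead of being passed to Write.
		return false, nil
	}
	return true, h.OnSchemaChange(r)
}

// legacyDest fails on operations it does not know, like a connector
// written before the schema operation existed.
type legacyDest struct{ writes int }

func (l *legacyDest) Write(r Record) error {
	if r.Operation != OperationCreate {
		return errors.New("unknown operation: " + string(r.Operation))
	}
	l.writes++
	return nil
}

// awareDest additionally implements the hook.
type awareDest struct {
	legacyDest
	schemaChanges int
}

func (a *awareDest) OnSchemaChange(Record) error { a.schemaChanges++; return nil }

func main() {
	records := []Record{{Operation: OperationCreate}, {Operation: OperationSchema}}
	for _, dest := range []Destination{&legacyDest{}, &awareDest{}} {
		for _, r := range records {
			delivered, err := route(dest, r)
			fmt.Printf("%T %s delivered=%t err=%v\n", dest, r.Operation, delivered, err)
		}
	}
}
```

Silently skipping the record for connectors without the hook is only one possible policy; logging a warning or failing the pipeline would be equally easy to express at the same spot.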
Incompatible Changes
This problem exists regardless of the approach we take. Source connectors can currently produce any record, regardless of the schema. This means Conduit does not verify if the current record's schema matches or is compatible with the previous record's schema. Consequently, applying changes to the destination can result in unexpected behavior if done without checking the compatibility.
Even compatible changes may have unwanted effects. For instance, data can be lost if a field is removed from a schema and the connector decides to drop the field in the destination resource. Ultimately, it is up to the connector to decide how to apply the schema changes to the destination resource.
When using an external schema registry like the one from Confluent, you can configure the compatibility mode and reject incompatible changes. Maybe Conduit should also implement this in the built-in schema registry and only allow compatible schema changes. However, if we do that, we need to figure out the steps a user has to take to recover a pipeline that was stopped by a rejected change.
I think that this feature currently has too many open questions and is not defined clearly enough for us to tackle it, so until we figure out the details I'm leaning toward option 1.
Related to conduitio-labs/conduit-connector-mysql#34.