Skip to content

Commit

Permalink
Notify the CDC Client if streaming stopped due to exception (#28998)
Browse files Browse the repository at this point in the history
* Add tinyint decode

* Notify the client if streaming stopped due to exception
  • Loading branch information
azexcy authored Nov 10, 2023
1 parent 2cb461c commit 79ca3bc
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ private Object readColumnData(final String data, final String columnType) {
return decodeString(data.substring(1));
}
switch (columnType) {
case "tinyint":
case "smallint":
return Short.parseShort(data);
case "integer":
return Integer.parseInt(data);
case "bigint":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,18 @@ void assertDecodeWithTsquery() {
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj.toString(), is("'fff' | 'faa'"));
}

@Test
void assertDecodeWitTinyint() {
MppTableData tableData = new MppTableData();
tableData.setTableName("public.test");
tableData.setOpType("INSERT");
tableData.setColumnsName(new String[]{"data"});
tableData.setColumnsType(new String[]{"tinyint"});
tableData.setColumnsVal(new String[]{"255"});
ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj, is(255));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;

import io.netty.channel.Channel;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
Expand Down Expand Up @@ -55,6 +56,7 @@ public final class CDCSocketSink implements PipelineSink {

private final ShardingSphereDatabase database;

@Getter
private final Channel channel;

private final Map<String, String> tableNameSchemaMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
Expand Down Expand Up @@ -184,7 +186,7 @@ private final class CDCExecuteCallback implements ExecuteCallback {

private final String identifier;

private final PipelineJobItemContext jobItemContext;
private final CDCJobItemContext jobItemContext;

@Override
public void onSuccess() {
Expand All @@ -200,6 +202,10 @@ public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
}
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down

0 comments on commit 79ca3bc

Please sign in to comment.