Skip to content

Commit

Permalink
[FLINK-35197][table] Fix incomplete serialization and deserialization of materialized tables
Browse files Browse the repository at this point in the history
  • Loading branch information
hackergin committed May 11, 2024
1 parent 9fe8d7b commit e80c286
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.catalog;

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableColumn;
Expand All @@ -26,6 +27,8 @@
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.refresh.ContinuousRefreshHandler;
import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.CatalogManagerMocks;
Expand All @@ -45,6 +48,7 @@
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.HamcrestCondition.matching;
Expand Down Expand Up @@ -142,6 +146,13 @@ class CatalogBaseTableResolutionTest {
UniqueConstraint.primaryKey(
"primary_constraint", Collections.singletonList("id")));

// Refresh handler for the continuous-mode materialized table under test; the second
// argument is a freshly generated JobID rendered as hex (new value each test run).
private static final ContinuousRefreshHandler CONTINUOUS_REFRESH_HANDLER =
new ContinuousRefreshHandler("remote", JobID.generate().toHexString());

// Definition query of the materialized table, fully qualified with the mocked
// default catalog and database names.
private static final String DEFINITION_QUERY =
String.format(
"SELECT id, region, county FROM %s.%s.T", DEFAULT_CATALOG, DEFAULT_DATABASE);

private static final ResolvedSchema RESOLVED_VIEW_SCHEMA =
new ResolvedSchema(
Arrays.asList(
Expand Down Expand Up @@ -200,7 +211,7 @@ void testCatalogViewResolution() {
}

@Test
void testPropertyDeSerialization() {
void testPropertyDeSerialization() throws Exception {
final CatalogTable table = CatalogTable.fromProperties(catalogTableAsProperties());

final ResolvedCatalogTable resolvedTable =
Expand All @@ -209,6 +220,35 @@ void testPropertyDeSerialization() {
assertThat(resolvedTable.toProperties()).isEqualTo(catalogTableAsProperties());

assertThat(resolvedTable.getResolvedSchema()).isEqualTo(RESOLVED_TABLE_SCHEMA);

// test materialized table de/serialization
final CatalogMaterializedTable catalogMaterializedTable =
CatalogPropertiesUtil.deserializeCatalogMaterializedTable(
catalogMaterializedTableAsProperties());
final ResolvedCatalogMaterializedTable resolvedCatalogMaterializedTable =
resolveCatalogBaseTable(
ResolvedCatalogMaterializedTable.class, catalogMaterializedTable);
assertThat(
CatalogPropertiesUtil.serializeCatalogMaterializedTable(
resolvedCatalogMaterializedTable))
.isEqualTo(catalogMaterializedTableAsProperties());

assertThat(resolvedCatalogMaterializedTable.getResolvedSchema())
.isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);
assertThat(resolvedCatalogMaterializedTable.getFreshness())
.isEqualTo(Duration.ofSeconds(30));
assertThat(resolvedCatalogMaterializedTable.getDefinitionQuery())
.isEqualTo(DEFINITION_QUERY);
assertThat(resolvedCatalogMaterializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
assertThat(resolvedCatalogMaterializedTable.getRefreshMode())
.isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
assertThat(resolvedCatalogMaterializedTable.getRefreshStatus())
.isEqualTo(CatalogMaterializedTable.RefreshStatus.INITIALIZING);
byte[] expectedBytes =
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(CONTINUOUS_REFRESH_HANDLER);
assertThat(resolvedCatalogMaterializedTable.getSerializedRefreshHandler())
.isEqualTo(expectedBytes);
}

@Test
Expand Down Expand Up @@ -365,6 +405,37 @@ private static Map<String, String> catalogTableAsProperties() {
properties.put("connector", "custom");
properties.put("comment", "This is an example table.");
properties.put("snapshot", "1688918400000");

return properties;
}

/**
 * Builds the expected serialized property map for the materialized table used in the
 * de/serialization round-trip test.
 *
 * @return mutable map of catalog properties describing the materialized table
 * @throws Exception if serializing the refresh handler fails
 */
private static Map<String, String> catalogMaterializedTableAsProperties() throws Exception {
    final Map<String, String> properties = new HashMap<>();

    // schema columns
    properties.put("schema.0.name", "id");
    properties.put("schema.0.data-type", "INT NOT NULL");
    properties.put("schema.1.name", "region");
    properties.put("schema.1.data-type", "VARCHAR(200)");
    properties.put("schema.1.comment", "This is a region column.");
    properties.put("schema.2.name", "county");
    properties.put("schema.2.data-type", "VARCHAR(200)");
    properties.put("schema.3.name", "topic");
    properties.put("schema.3.data-type", "VARCHAR(200)");
    properties.put("schema.3.comment", "");

    // primary key constraint
    properties.put("schema.primary-key.name", "primary_constraint");
    properties.put("schema.primary-key.columns", "id");

    // materialized-table-specific attributes
    properties.put("freshness", "PT30S");
    properties.put("logical-refresh-mode", "CONTINUOUS");
    properties.put("refresh-mode", "CONTINUOUS");
    properties.put("refresh-status", "INITIALIZING");
    properties.put("definition-query", DEFINITION_QUERY);

    // the refresh handler is stored Base64-encoded, mirroring
    // CatalogPropertiesUtil#serializeCatalogMaterializedTable
    final byte[] serializedHandler =
            ContinuousRefreshHandlerSerializer.INSTANCE.serialize(CONTINUOUS_REFRESH_HANDLER);
    properties.put("refresh-handler-bytes", encodeBytesToBase64(serializedHandler));

    return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import javax.annotation.Nullable;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -49,6 +48,8 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Utilities for de/serializing {@link Catalog} objects into a map of string properties. */
Expand Down Expand Up @@ -159,9 +160,8 @@ public static Map<String, String> serializeCatalogMaterializedTable(
if (resolvedMaterializedTable.getSerializedRefreshHandler() != null) {
properties.put(
REFRESH_HANDLER_BYTES,
new String(
resolvedMaterializedTable.getSerializedRefreshHandler(),
StandardCharsets.UTF_8));
encodeBytesToBase64(
resolvedMaterializedTable.getSerializedRefreshHandler()));
}

return properties;
Expand Down Expand Up @@ -248,7 +248,7 @@ public static CatalogMaterializedTable deserializeCatalogMaterializedTable(
final @Nullable byte[] refreshHandlerBytes =
StringUtils.isNullOrWhitespaceOnly(refreshHandlerStringBytes)
? null
: refreshHandlerStringBytes.getBytes(StandardCharsets.UTF_8);
: decodeBase64ToBytes(refreshHandlerStringBytes);

CatalogMaterializedTable.Builder builder = CatalogMaterializedTable.newBuilder();
builder.schema(schema)
Expand Down

0 comments on commit e80c286

Please sign in to comment.