Skip to content

Commit

Permalink
[improve][client] Add unified newTableView method in PulsarClient (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuruguo authored Jan 17, 2023
1 parent 1bb7b31 commit b4d5857
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ public void testTableView() throws Exception {
}
}

@Test
public void testNewTableView() throws Exception {
String topic = "persistent://public/default/new-tableview-test";
admin.topics().createPartitionedTopic(topic, 2);
Set<String> keys = this.publishMessages(topic, 10, false);
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableView()
.topic(topic)
.autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
.create();
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
assertEquals(tv.size(), 10);
});
assertEquals(tv.keySet(), keys);
}

@Test(timeOut = 30 * 1000, dataProvider = "topicDomain")
public void testTableViewUpdatePartitions(String topicDomain) throws Exception {
String topic = topicDomain + "://public/default/tableview-test-update-partitions";
Expand Down Expand Up @@ -242,7 +260,7 @@ public void testPublishNullValue(String topicDomain) throws Exception {
tv.close();

@Cleanup
TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
TableView<String> tv1 = pulsarClient.newTableView(Schema.STRING)
.topic(topic)
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ static ClientBuilder builder() {
*
* <p>Example:
* <pre>{@code
* TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
* TableView<byte[]> tableView = client.newTableViewBuilder(Schema.BYTES)
* .topic("my-topic")
* .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
* .create();
Expand All @@ -240,9 +240,53 @@ static ClientBuilder builder() {
*
* @param schema provide a way to convert between serialized data and domain objects
* @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
* @deprecated Use {@link PulsarClient#newTableView(Schema)} to build and configure a {@link TableViewBuilder}
* instance
*/
@Deprecated
<T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema);

/**
* Create a table view builder for subscribing on a specific topic.
*
* <p>The TableView provides a key-value map view of a compacted topic. Messages without keys will
* be ignored.
*
* <p>Example:
* <pre>{@code
* TableView<byte[]> tableView = client.newTableView()
* .topic("my-topic")
* .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
* .create();
*
* tableView.forEach((k, v) -> System.out.println(k + ":" + v));
* }</pre>
*
* @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
*/
TableViewBuilder<byte[]> newTableView();

/**
* Create a table view builder with a specific schema for subscribing on a specific topic.
*
* <p>The TableView provides a key-value map view of a compacted topic. Messages without keys will
* be ignored.
*
* <p>Example:
* <pre>{@code
* TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
* .topic("my-topic")
* .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
* .create();
*
* tableView.forEach((k, v) -> System.out.println(k + ":" + v));
* }</pre>
*
* @param schema provide a way to convert between serialized data and domain objects
* @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
*/
<T> TableViewBuilder<T> newTableView(Schema<T> schema);

/**
* Update the service URL this client is using.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,25 @@ public <T> ReaderBuilder<T> newReader(Schema<T> schema) {
return new ReaderBuilderImpl<>(this, schema);
}

/**
* @deprecated use {@link #newTableView(Schema)} instead.
*/
@Override
@Deprecated
public <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema) {
return new TableViewBuilderImpl<>(this, schema);
}

@Override
public TableViewBuilder<byte[]> newTableView() {
return new TableViewBuilderImpl<>(this, Schema.BYTES);
}

@Override
public <T> TableViewBuilder<T> newTableView(Schema<T> schema) {
return new TableViewBuilderImpl<>(this, schema);
}

public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
return createProducerAsync(conf, Schema.BYTES, null);
}
Expand Down

0 comments on commit b4d5857

Please sign in to comment.