diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 9c1779839a294..b6569d6a21dc1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -173,6 +173,24 @@ public void testTableView() throws Exception { } } + @Test + public void testNewTableView() throws Exception { + String topic = "persistent://public/default/new-tableview-test"; + admin.topics().createPartitionedTopic(topic, 2); + Set keys = this.publishMessages(topic, 10, false); + @Cleanup + TableView tv = pulsarClient.newTableView() + .topic(topic) + .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) + .create(); + tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v))); + Awaitility.await().untilAsserted(() -> { + log.info("Current tv size: {}", tv.size()); + assertEquals(tv.size(), 10); + }); + assertEquals(tv.keySet(), keys); + } + @Test(timeOut = 30 * 1000, dataProvider = "topicDomain") public void testTableViewUpdatePartitions(String topicDomain) throws Exception { String topic = topicDomain + "://public/default/tableview-test-update-partitions"; @@ -242,7 +260,7 @@ public void testPublishNullValue(String topicDomain) throws Exception { tv.close(); @Cleanup - TableView tv1 = pulsarClient.newTableViewBuilder(Schema.STRING) + TableView tv1 = pulsarClient.newTableView(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 05a21fa1585bf..90095300cca72 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -230,7 +230,7 @@ static ClientBuilder builder() { * *

Example: *

{@code
-     *  TableView tableView = client.newTableView(Schema.BYTES)
+     *  TableView tableView = client.newTableViewBuilder(Schema.BYTES)
      *            .topic("my-topic")
      *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
      *            .create();
@@ -240,9 +240,53 @@ static ClientBuilder builder() {
      *
      * @param schema provide a way to convert between serialized data and domain objects
      * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
+     * @deprecated Use {@link PulsarClient#newTableView(Schema)} to build and configure a {@link TableViewBuilder}
+     * instance
      */
+    @Deprecated
      TableViewBuilder newTableViewBuilder(Schema schema);
 
+    /**
+     * Create a table view builder for subscribing on a specific topic.
+     *
+     * 

The TableView provides a key-value map view of a compacted topic. Messages without keys will + * be ignored. + * + *

Example: + *

{@code
+     *  TableView tableView = client.newTableView()
+     *            .topic("my-topic")
+     *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+     *            .create();
+     *
+     *  tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+     * }
+ * + * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance + */ + TableViewBuilder newTableView(); + + /** + * Create a table view builder with a specific schema for subscribing on a specific topic. + * + *

The TableView provides a key-value map view of a compacted topic. Messages without keys will + * be ignored. + * + *

Example: + *

{@code
+     *  TableView tableView = client.newTableView(Schema.BYTES)
+     *            .topic("my-topic")
+     *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+     *            .create();
+     *
+     *  tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+     * }
+ * + * @param schema provide a way to convert between serialized data and domain objects + * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance + */ + TableViewBuilder newTableView(Schema schema); + /** * Update the service URL this client is using. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index d964328d59cbd..eba7ff91f65e3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -305,11 +305,25 @@ public ReaderBuilder newReader(Schema schema) { return new ReaderBuilderImpl<>(this, schema); } + /** + * @deprecated use {@link #newTableView(Schema)} instead. + */ @Override + @Deprecated public TableViewBuilder newTableViewBuilder(Schema schema) { return new TableViewBuilderImpl<>(this, schema); } + @Override + public TableViewBuilder newTableView() { + return new TableViewBuilderImpl<>(this, Schema.BYTES); + } + + @Override + public TableViewBuilder newTableView(Schema schema) { + return new TableViewBuilderImpl<>(this, schema); + } + public CompletableFuture> createProducerAsync(ProducerConfigurationData conf) { return createProducerAsync(conf, Schema.BYTES, null); }