Skip to content

Adding a New Java Client Library: Publisher

Max Dietz edited this page Dec 14, 2016 · 10 revisions

Writing a new Task

Each client library should get its own directory under java/com/google/pubsub/clients/. If adding a new client library for Cloud Pub/Sub, it should be named CPSPublisherTask, and KafkaPublisherTask for a Kafka client library.

Let us then look through an simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:

class CPSPublisherTask extends Task {

The class must extend from Task.

  private CPSPublisherTask(StartRequest request) {
    super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);

The constructor should take the start request, pass it to Task, and use any other information to initialize.

    this.pubSub = PubSubOptions.builder()
        .projectId(request.getProject())
        .build().service();
    this.topic = Preconditions.checkNotNull(request.getTopic());
    this.payload = LoadTestRunner.createMessage(request.getMessageSize());
    this.batchSize = request.getPublishBatchSize();

We now initialize the client library and store some state for later use.

    this.id = (new Random()).nextInt();
  }

We must create a random id for use in deduplication and completeness checking. There is no restriction on this id other than two clients must not share the same one, so using a random id is sufficient since we do not expect to have more than O(10s) of clients.

  public static void main(String[] args) throws Exception {
    LoadTestRunner.Options options = new LoadTestRunner.Options();
    new JCommander(options, args);
    LoadTestRunner.run(options, CPSPublisherTask::new);
  }

Every new client library task must have a main function like this.

  @Override
  public ListenableFuture<RunResult> doRun() {

This is the core of any task. doRun should publish batchSize messages, and report the latency it took to do so.

    try {
      List<Message> messages = new ArrayList<>(batchSize);

We construct the list of messages.

      String sendTime = String.valueOf(System.currentTimeMillis());

All published messages should have the same sendTime

      for (int i = 0; i < batchSize; i++) {
        messages.add(
            Message.builder(payload)

Contsruct batchSize messages.

                .addAttribute("sendTime", sendTime)

We set the sendTime for proper end to end latency reporting

                .addAttribute("clientId", id.toString())

We set the clientId

                .addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))

The sequence number must atomically increase, and the first message must start at 0.

                .build());
      }
      pubSub.publish(topic, messages);

Publish the messages.

      return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));

Return a listenable future. If this client library returned futures, we could use Futures.transform to do this, but for a simple library like this we can return an immediate future.

    } catch (PubSubException e) {
      return Futures.immediateFailedFuture(e);

Do not throw an exception if it can possibly be recovered from, instead report it to the Task.

    }
  }
}
Clone this wiki locally