
Adding a New Java Client Library: Publisher


There are four main steps here:

  1. Add your client library to pom.xml
  2. Write a Task that exercises the publish path.
  3. Add the Task to the main Framework
  4. Write a startup script to run your Task on GCE

We will assume that you have taken care of the first step, and cover the three remaining steps in detail below.

Writing a new Task

The main goal of writing your Task is to execute a single Publish operation. We will next describe where to place your client, and walk you through a preexisting Task so that you can see all of the design choices we made, but the basic idea is to extend the Task class, and implement the doRun method to time a single Publish call using your client library.

Each client library should get its own directory under java/com/google/pubsub/clients/. A task for a new Cloud Pub/Sub client library should be named CPSPublisherTask, and a task for a Kafka client library should be named KafkaPublisherTask.
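For example, a hypothetical Cloud Pub/Sub client library called mylib (a placeholder name) would have its task at:

java/com/google/pubsub/clients/mylib/CPSPublisherTask.java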

Let us now look through a simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:

The class must extend Task.

class CPSPublisherTask extends Task {

The constructor should take the start request, pass it to the Task superclass along with publish ack latency as the metric to report, and use the remaining fields of the request to initialize its own state. Replace "gcloud" with the name of your client library.

  private CPSPublisherTask(StartRequest request) {
    super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);

We now initialize the client library and store some state for later use.

    this.pubSub = PubSubOptions.builder()
        .projectId(request.getProject())
        .build().service();
    this.topic = Preconditions.checkNotNull(request.getTopic());

The payload should have the length specified by the StartRequest message_size field. LoadTestRunner.createMessage is a helper function that creates a String of the specified length.

    this.payload = LoadTestRunner.createMessage(request.getMessageSize());
    this.batchSize = request.getPublishBatchSize();

We must create a random id for use in deduplication and completeness checking. The only restriction on this id is that no two clients may share the same one, so a random id is sufficient since we do not expect to have more than O(10s) of clients.

    this.id = (new Random()).nextInt();
  }

Every new client library task must have a main function like this:

  public static void main(String[] args) throws Exception {
    LoadTestRunner.Options options = new LoadTestRunner.Options();
    new JCommander(options, args);
    LoadTestRunner.run(options, CPSPublisherTask::new);
  }

This is the core of any task. doRun should publish batchSize messages, and report the latency it took to do so:

  @Override
  public ListenableFuture<RunResult> doRun() {

We construct the list of messages.

    try {
      List<Message> messages = new ArrayList<>(batchSize);

All messages published in this batch should have the same sendTime.

      String sendTime = String.valueOf(System.currentTimeMillis());

Construct batchSize messages.

      for (int i = 0; i < batchSize; i++) {
        messages.add(
            Message.builder(payload)

We set the sendTime attribute so end-to-end latency can be reported correctly.

                .addAttribute("sendTime", sendTime)

We set the clientId attribute.

                .addAttribute("clientId", id.toString())

The sequence number must be incremented atomically, starting at 0 for the first message.

                .addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))

Build each message, then publish the whole batch.

                .build());
      }
      pubSub.publish(topic, messages);

Return a ListenableFuture. If the underlying client library returned a Future, we would return that directly (or use Futures.transform to adapt its result), but for a synchronous library like this one we can return an immediate future.

      return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));

Do not throw an exception if it can possibly be recovered from; instead, report it to the framework by returning a failed future.

    } catch (PubSubException e) {
      return Futures.immediateFailedFuture(e);
    }
  }
}
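For comparison, here is a hedged sketch of what doRun could look like for a hypothetical asynchronous client library whose publish call returns a ListenableFuture of message ids. The asyncClient field and its publishAsync method are placeholders rather than part of the gcloud API; the point is only to show adapting the client's future with Futures.transform instead of returning an immediate future (this assumes Guava's three-argument transform overload and MoreExecutors.directExecutor are available):

  @Override
  public ListenableFuture<RunResult> doRun() {
    // messages would be constructed exactly as in the synchronous example above.
    // Hypothetical asynchronous publish; publishAsync is a placeholder name.
    ListenableFuture<List<String>> publishFuture = asyncClient.publishAsync(topic, messages);
    // Adapt the client library's future into the RunResult the framework expects.
    return Futures.transform(
        publishFuture,
        messageIds -> RunResult.fromBatchSize(batchSize),
        MoreExecutors.directExecutor());
  }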

Adding the Task to the Framework

The next step is to add your Task to the framework so that it can be run using command line flags like the other tasks. There are a couple of places in the load test framework you will now need to update. First, add a new command line flag to the Driver. The flag should be named --cps_<client_library_name>_java_publisher_count, or --kafka... for a Kafka-based Task.
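As a rough sketch, assuming the Driver declares its flags with JCommander @Parameter annotations and using mylib as a placeholder library name, the new flag might look like:

@Parameter(
    names = {"--cps_mylib_java_publisher_count"},
    description = "Number of CPS publisher tasks using the mylib client library to start.")
private int cpsMylibJavaPublisherCount = 0;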

In the run method of Driver, you will need to add a clause near the others to add your type to the Map, something like the following, but with Gcloud replaced by the name of your client library:

if (cpsGcloudJavaPublisherCount > 0) {
  clientParamsMap.put(
      new ClientParams(ClientType.CPS_GCLOUD_JAVA_PUBLISHER, null), cpsGcloudJavaPublisherCount);
}

Next, add your client to the ClientType enum in Client. You will also need to add cases for it in isPublisher and isCpsPublisher as appropriate, and add a case to getSubscriberType that returns the type of the subscriber that will receive the messages published by your task. A rough sketch of these additions follows.
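Here is a hedged sketch of the kind of additions involved; CPS_MYLIB_JAVA_PUBLISHER is a placeholder value, and the exact shape of the methods in Client may differ:

// In the ClientType enum:
CPS_MYLIB_JAVA_PUBLISHER,

// In isPublisher() and isCpsPublisher(), treat the new type as a CPS publisher:
case CPS_MYLIB_JAVA_PUBLISHER:
  return true;

// In getSubscriberType(), return the subscriber type that will receive the
// messages published by this task (the gcloud Java subscriber here is only
// an example choice):
case CPS_MYLIB_JAVA_PUBLISHER:
  return CPS_GCLOUD_JAVA_SUBSCRIBER;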

Last, you will need to add it to the spreadsheet output in SheetsService and increment cpsPublisherCount or kafkaPublisherCount accordingly.

Adding a Startup Script

You must also create a startup script. You can copy the one here; you only need to change 'experimental' in the file name to your client library name, and change the last line from

java -Xmx5000M -cp ${TMP}/driver.jar com.google.pubsub.clients.experimental.CPSPublisherTask

to use your client library class.
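For example, if your task lives in the package com.google.pubsub.clients.mylib (a placeholder name), the last line would become:

java -Xmx5000M -cp ${TMP}/driver.jar com.google.pubsub.clients.mylib.CPSPublisherTask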