Adding a New Java Client Library: Publisher

Max Dietz edited this page Dec 15, 2016 · 10 revisions

There are four main steps here:

  1. Add your client library to pom.xml.
  2. Write a Task that exercises the publish path.
  3. Add the Task to the main Framework.
  4. Write a startup script to run your Task on GCE.

We assume that you have already taken care of the first step; the three remaining steps are covered in detail below.

Writing a new Task

The main goal of your Task is to execute a single Publish operation. Below we describe where to place your client and walk through a preexisting Task so that you can see the design choices we made. The basic idea is to extend the Task class and implement the doRun method to time a single Publish call using your client library.

Each client library should get its own directory under java/com/google/pubsub/clients/. A Task for a Cloud Pub/Sub client library should be named CPSPublisherTask; a Task for a Kafka client library should be named KafkaPublisherTask.

Let us look through a simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:

class CPSPublisherTask extends Task {

The class must extend Task.

  private CPSPublisherTask(StartRequest request) {
    super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);

The constructor should take the start request and pass it to Task, setting publish ack latency as the metric to report. It should then use any other information in the request to initialize the client. Replace "gcloud" with the name of your client library.

    this.pubSub = PubSubOptions.builder()
        .projectId(request.getProject())
        .build().service();
    this.topic = Preconditions.checkNotNull(request.getTopic());
    this.payload = LoadTestRunner.createMessage(request.getMessageSize());
    this.batchSize = request.getPublishBatchSize();

We now initialize the client library and store some state for later use.

    this.id = (new Random()).nextInt();
  }

We must create a random id for use in deduplication and completeness checking. The only restriction on this id is that no two clients may share the same one. A random id is sufficient because we do not expect to run more than a few tens of clients at once.
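To get a feel for why a random id is considered safe at this scale, here is a minimal sketch that computes the birthday-problem probability of two clients drawing the same 32-bit value. The class and method names are illustrative only and are not part of the framework; it assumes ids are drawn uniformly from all 2^32 int values, as Random.nextInt() does.

```java
/** Hypothetical helper: chance that two load-test clients draw the same random int id. */
final class ClientIdCollision {
  // Number of distinct values Random.nextInt() can return: 2^32.
  private static final double ID_SPACE = 4294967296.0;

  /** Probability that at least two of the given number of clients share an id. */
  static double collisionProbability(int clients) {
    // Multiply the chances that each successive client avoids all earlier ids.
    double allDistinct = 1.0;
    for (int i = 1; i < clients; i++) {
      allDistinct *= 1.0 - i / ID_SPACE;
    }
    return 1.0 - allDistinct;
  }

  public static void main(String[] args) {
    for (int clients : new int[] {10, 50, 100}) {
      System.out.printf("%d clients: %.2e%n", clients, collisionProbability(clients));
    }
  }
}
```

Under these assumptions the chance of a collision stays around one in a million even with 100 clients, which is why no coordination between clients is needed to pick ids.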

  public static void main(String[] args) throws Exception {
    LoadTestRunner.Options options = new LoadTestRunner.Options();
    new JCommander(options, args);
    LoadTestRunner.run(options, CPSPublisherTask::new);
  }

Every new client library Task must have a main method like this one.

  @Override
  public ListenableFuture<RunResult> doRun() {

This is the core of any Task. doRun should publish batchSize messages and report how long it took to do so.

    try {
      List<Message> messages = new ArrayList<>(batchSize);

We construct the list of messages.

      String sendTime = String.valueOf(System.currentTimeMillis());

All messages published in one batch should have the same sendTime.

      for (int i = 0; i < batchSize; i++) {
        messages.add(
            Message.builder(payload)

Construct batchSize messages.

                .addAttribute("sendTime", sendTime)

We set the sendTime for proper end-to-end latency reporting.

                .addAttribute("clientId", id.toString())

We set the clientId to the random id created in the constructor.

                .addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))

The sequence number must increase atomically, and the first message must have sequence number 0.

                .build());
      }
      pubSub.publish(topic, messages);

Publish the messages.

      return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));

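Return a listenable future. If this client library returned futures, we could use Futures.transform to convert its future into one that yields a RunResult. For a simple library like this one, where publish returns only after the call has completed, we can return an immediate future.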

    } catch (PubSubException e) {
      return Futures.immediateFailedFuture(e);

Do not throw an exception if it can possibly be recovered from. Instead, report it to the Task by returning a failed future.

    }
  }
}
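For a client library whose publish call returns a future, the same steps can be chained instead of blocked on. The sketch below is an illustration under stated assumptions rather than framework code: AsyncClient is a hypothetical stand-in for your library, messages are reduced to attribute maps, and the standard library's CompletableFuture.thenApply stands in for Guava's Futures.transform so that the example has no dependencies. It also shows the three required attributes and the atomically increasing sequence number that starts at 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical async publish step; none of these types come from the framework. */
final class AsyncPublishSketch {
  /** Stand-in for a client library whose publish call returns a future. */
  interface AsyncClient {
    CompletableFuture<Void> publish(String topic, List<Map<String, String>> messages);
  }

  private final AsyncClient client;
  private final String topic;
  private final int batchSize;
  private final String clientId = Integer.toString(new Random().nextInt());
  // getAndIncrement() hands out 0, 1, 2, ... without gaps, even across threads.
  private final AtomicInteger sequenceNumber = new AtomicInteger(0);

  AsyncPublishSketch(AsyncClient client, String topic, int batchSize) {
    this.client = client;
    this.topic = topic;
    this.batchSize = batchSize;
  }

  /** Builds the attributes for one batch; every message shares the same sendTime. */
  List<Map<String, String>> buildBatch() {
    String sendTime = String.valueOf(System.currentTimeMillis());
    List<Map<String, String>> batch = new ArrayList<>(batchSize);
    for (int i = 0; i < batchSize; i++) {
      Map<String, String> attributes = new HashMap<>();
      attributes.put("sendTime", sendTime);
      attributes.put("clientId", clientId);
      attributes.put("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()));
      batch.add(attributes);
    }
    return batch;
  }

  /** Completes with the batch size on success; failures are reported through the future. */
  CompletableFuture<Integer> doRun() {
    try {
      return client.publish(topic, buildBatch()).thenApply(ignored -> batchSize);
    } catch (RuntimeException e) {
      // A synchronous failure is reported instead of thrown, as in the catch block above.
      CompletableFuture<Integer> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
  }
}
```

In a real Task the transformed future would yield a RunResult rather than a bare batch size, and the stand-in client would be replaced by your library's own publish call.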

Adding the Task to the Framework

The next step is to add your Task to the framework so that it can be run using command line flags like the other tasks. There are a few places in the load test framework that you will need to update. First, add a new command line flag to the Driver. The flag should be named --cps_<client_library_name>_java_publisher_count.

In the run method of Driver, add a clause near the others that puts your type into clientParamsMap. It should look something like the following, with Gcloud replaced by the name of your client library:

if (cpsGcloudJavaPublisherCount > 0) {
  clientParamsMap.put(
      new ClientParams(ClientType.CPS_GCLOUD_JAVA_PUBLISHER, null), cpsGcloudJavaPublisherCount);
}

Next, add your type to the ClientType enum in Client. Add cases for it in isPublisher and isCpsPublisher as appropriate, and add a case to getSubscriberType that returns the type of subscriber that should receive the messages your Task publishes.
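The shape of these enum changes can be sketched as follows. This is a simplified, hypothetical stand-in and not the actual contents of Client: the enum is renamed ClientTypeSketch, only a few existing values are shown, the value CPS_MYLIB_JAVA_PUBLISHER uses "MYLIB" as a placeholder for your client library name, and pairing it with the gcloud subscriber is just one possible choice.

```java
/** Simplified, hypothetical stand-in for the ClientType enum in Client. */
enum ClientTypeSketch {
  CPS_GCLOUD_JAVA_PUBLISHER,
  CPS_GCLOUD_JAVA_SUBSCRIBER,
  KAFKA_PUBLISHER,
  KAFKA_SUBSCRIBER,
  CPS_MYLIB_JAVA_PUBLISHER; // new entry; "MYLIB" is a placeholder name

  /** True for every Cloud Pub/Sub publisher, including the new one. */
  boolean isCpsPublisher() {
    switch (this) {
      case CPS_GCLOUD_JAVA_PUBLISHER:
      case CPS_MYLIB_JAVA_PUBLISHER:
        return true;
      default:
        return false;
    }
  }

  /** True for any publisher, whether Cloud Pub/Sub or Kafka. */
  boolean isPublisher() {
    return isCpsPublisher() || this == KAFKA_PUBLISHER;
  }

  /** The subscriber type that should receive what this publisher sends. */
  ClientTypeSketch getSubscriberType() {
    switch (this) {
      case CPS_GCLOUD_JAVA_PUBLISHER:
      case CPS_MYLIB_JAVA_PUBLISHER:
        return CPS_GCLOUD_JAVA_SUBSCRIBER;
      case KAFKA_PUBLISHER:
        return KAFKA_SUBSCRIBER;
      default:
        // Subscribers map to themselves in this sketch.
        return this;
    }
  }
}
```

Forgetting one of the three methods is the easiest mistake to make here, so it is worth checking each of them against your new value.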

Finally, add it to the spreadsheet output in SheetsService, incrementing cpsPublisherCount or kafkaPublisherCount accordingly.

Adding a Startup Script

You must also create a startup script. You can copy the existing 'experimental' startup script. The only changes needed are to rename the file, replacing 'experimental' with your client library name, and to change the last line from

java -Xmx5000M -cp ${TMP}/driver.jar com.google.pubsub.clients.experimental.CPSPublisherTask

to use your client library class.