Adding a New Java Client Library: Publisher
There are four main steps here:
- Add your client library to pom.xml.
- Write a Task that exercises the publish path.
- Add the Task to the main framework.
- Write a startup script to run your Task on GCE.
We will assume that you have taken care of the first step, and cover the three remaining steps in detail below.
The main goal of writing your Task is to execute a single Publish operation. Below we describe where to place your client and walk through a preexisting Task so that you can see the design choices we made. The basic idea is to extend the Task class and implement the doRun method to time a single Publish call using your client library.
Each client library should get its own directory under java/com/google/pubsub/clients/. A task for a new Cloud Pub/Sub client library should be named CPSPublisherTask; a task for a Kafka client library should be named KafkaPublisherTask.
Let us then look through a simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:
The class must extend from Task.
```java
class CPSPublisherTask extends Task {
```
The constructor should take the start request, pass it to Task (setting publish ack latency as the metric to report), and use any other request information to initialize the task. Replace "gcloud" with the name of your client library.
```java
private CPSPublisherTask(StartRequest request) {
  super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);
```
We now initialize the client library and store some state for later use.
```java
  this.pubSub = PubSubOptions.builder()
      .projectId(request.getProject())
      .build().service();
  this.topic = Preconditions.checkNotNull(request.getTopic());
```
The payload should be of the length specified by the StartRequest message_size field. LoadTestRunner.createMessage is a helper function that will create a String of the specified length.
```java
  this.payload = LoadTestRunner.createMessage(request.getMessageSize());
  this.batchSize = request.getPublishBatchSize();
```
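If you are curious what such a helper does, the following is an illustrative sketch only, not the actual LoadTestRunner.createMessage implementation; the fill character is an assumption:
```java
// Illustrative sketch of a createMessage-style helper: it returns a String
// containing messageSize copies of a filler character.
static String createMessage(int messageSize) {
  char[] payload = new char[messageSize];
  java.util.Arrays.fill(payload, 'A');
  return new String(payload);
}
```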
We must create a random id for use in deduplication and completeness checking. There is no restriction on this id other than that two clients must not share the same one, so a random id is sufficient since we do not expect to have more than O(10s) of clients.
```java
  this.id = (new Random()).nextInt();
}
```
Every new client library task must have a main function like this:
```java
public static void main(String[] args) throws Exception {
  LoadTestRunner.Options options = new LoadTestRunner.Options();
  new JCommander(options, args);
  LoadTestRunner.run(options, CPSPublisherTask::new);
}
```
This is the core of any task. doRun should publish batchSize messages and report the latency it took to do so:
```java
@Override
public ListenableFuture<RunResult> doRun() {
  try {
    // Construct the list of batchSize messages.
    List<Message> messages = new ArrayList<>(batchSize);
    // All published messages should have the same sendTime.
    String sendTime = String.valueOf(System.currentTimeMillis());
    for (int i = 0; i < batchSize; i++) {
      messages.add(
          Message.builder(payload)
              // Set the sendTime for proper end-to-end latency reporting.
              .addAttribute("sendTime", sendTime)
              // Set the clientId.
              .addAttribute("clientId", id.toString())
              // The sequence number must increase atomically, with the first message starting at 0.
              .addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))
              .build());
    }
    // Publish the messages.
    pubSub.publish(topic, messages);
    // Return a ListenableFuture. If the underlying client library returned a Future, we would
    // return that (or use Futures.transform to adapt its result), but for a synchronous library
    // like this one we can return an ImmediateFuture.
    return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));
  } catch (PubSubException e) {
    // Do not throw an exception if it can possibly be recovered from; instead report it to the
    // Task by returning a failed future.
    return Futures.immediateFailedFuture(e);
  }
}
}  // end of CPSPublisherTask
```
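For a client library with an asynchronous publish call, you could adapt its future rather than blocking. The sketch below is hypothetical: asyncClient, publishAsync, and buildMessages are placeholders, not part of the framework or of any particular library; it assumes the asynchronous call returns a Guava ListenableFuture of message ids.
```java
@Override
public ListenableFuture<RunResult> doRun() {
  // Hypothetical async API: publishAsync returns a ListenableFuture<List<String>> of message ids.
  ListenableFuture<List<String>> publishFuture = asyncClient.publishAsync(topic, buildMessages());
  // Adapt the client library's future into the RunResult the framework expects.
  return Futures.transform(
      publishFuture,
      messageIds -> RunResult.fromBatchSize(batchSize),
      MoreExecutors.directExecutor());
}
```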
The next step is to add your Task to the framework so that it can be run using command line flags like the other tasks. There are a couple of places in the load test framework you will now need to update. First, add a new command line flag to the Driver. The flag should be named --cps_<client_library_name>_java_publisher_count, or --kafka... for a Kafka-based Task.
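As a rough sketch, and assuming the Driver declares its flags with JCommander @Parameter annotations (as LoadTestRunner does above), the new flag for a hypothetical "mylib" client library might look like the following; the field name and description are placeholders:
```java
@Parameter(
    names = {"--cps_mylib_java_publisher_count"},
    description = "Number of CPS publishers using the mylib Java client library to start.")
private int cpsMylibJavaPublisherCount = 0;
```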
In the run method of Driver, you will need to add a clause near the others to add your type to the map, something like the following, with Gcloud replaced by the name of your client library:
```java
if (cpsGcloudJavaPublisherCount > 0) {
  clientParamsMap.put(
      new ClientParams(ClientType.CPS_GCLOUD_JAVA_PUBLISHER, null), cpsGcloudJavaPublisherCount);
}
```
You will now need to add your type to the ClientType enum in Client. You will also need to add cases for it in isPublisher and isCpsPublisher as appropriate, and add a case to getSubscriberType that returns the type of the subscriber that will receive messages published by your task.
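A simplified, hypothetical sketch of these Client changes follows; the existing enum values are abbreviated, the method bodies are illustrative, and CPS_MYLIB_JAVA_PUBLISHER is a placeholder, so match this to the real ClientType code rather than copying it verbatim:
```java
enum ClientType {
  CPS_GCLOUD_JAVA_PUBLISHER,
  CPS_GCLOUD_JAVA_SUBSCRIBER,
  CPS_MYLIB_JAVA_PUBLISHER;  // the new type for the hypothetical "mylib" client library

  boolean isCpsPublisher() {
    switch (this) {
      case CPS_GCLOUD_JAVA_PUBLISHER:
      case CPS_MYLIB_JAVA_PUBLISHER:  // new case for the new publisher type
        return true;
      default:
        return false;
    }
  }

  ClientType getSubscriberType() {
    switch (this) {
      case CPS_GCLOUD_JAVA_PUBLISHER:
      case CPS_MYLIB_JAVA_PUBLISHER:  // messages from the new publisher go to the gcloud Java subscriber
        return CPS_GCLOUD_JAVA_SUBSCRIBER;
      default:
        return this;
    }
  }
}
```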
Last, you will need to add it to the spreadsheet output in SheetsService and increment cpsPublisherCount or kafkaPublisherCount accordingly.
You must also create a startup script. You can copy the one here; you only need to change 'experimental' in the file name to your client library name, and change the last line from
```
java -Xmx5000M -cp ${TMP}/driver.jar com.google.pubsub.clients.experimental.CPSPublisherTask
```
to use your client library class.