Adding a New Java Client Library: Publisher
Each client library should get its own directory under java/com/google/pubsub/clients/. Name the task class CPSPublisherTask if the client library is for Cloud Pub/Sub, or KafkaPublisherTask if it is for Kafka.
Let us look through a simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:
class CPSPublisherTask extends Task {
The class must extend Task.
private CPSPublisherTask(StartRequest request) {
super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);
The constructor should take the start request and pass it to Task, setting publish ack latency as the metric to report, then use any other information in the request to initialize itself. Replace "gcloud" with the name of your client library.
this.pubSub = PubSubOptions.builder()
.projectId(request.getProject())
.build().service();
this.topic = Preconditions.checkNotNull(request.getTopic());
this.payload = LoadTestRunner.createMessage(request.getMessageSize());
this.batchSize = request.getPublishBatchSize();
We now initialize the client library and store some state for later use.
this.id = (new Random()).nextInt();
}
We must create a random id for use in deduplication and completeness checking. The only restriction on this id is that no two clients may share the same one. A random id is sufficient because we do not expect more than tens of clients, so a collision is very unlikely.
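To make the uniqueness argument concrete, here is a minimal sketch that estimates how likely two clients are to draw the same id. It assumes ids are drawn uniformly from all 2^32 int values, as Random.nextInt() does; the class and method names are hypothetical and not part of the load test framework.

```java
import java.util.Random;

// Hypothetical helper, not part of the framework: estimates the chance that
// two clients end up with the same random int id.
class ClientIdCollisionSketch {
    // Number of distinct values Random.nextInt() can return (2^32).
    static final double ID_SPACE = 4294967296.0;

    // Probability that at least two of `clients` uniformly random ids coincide.
    static double collisionProbability(int clients) {
        double allDistinct = 1.0;
        for (int i = 1; i < clients; i++) {
            // The i-th additional client must avoid the i ids already taken.
            allDistinct *= 1.0 - i / ID_SPACE;
        }
        return 1.0 - allDistinct;
    }

    public static void main(String[] args) {
        int id = new Random().nextInt(); // same call the task uses for its id
        System.out.println("example id: " + id);
        System.out.println("P(collision) for 50 clients: " + collisionProbability(50));
    }
}
```

Under that assumption, even 50 clients collide with a probability well under one in a million, which is why no coordination between clients is needed.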
public static void main(String[] args) throws Exception {
LoadTestRunner.Options options = new LoadTestRunner.Options();
new JCommander(options, args);
LoadTestRunner.run(options, CPSPublisherTask::new);
}
Every new client library task must have a main function like this.
@Override
public ListenableFuture<RunResult> doRun() {
This is the core of any task. doRun should publish batchSize messages and report how long it took to do so.
try {
List<Message> messages = new ArrayList<>(batchSize);
We construct the list of messages.
String sendTime = String.valueOf(System.currentTimeMillis());
All published messages should have the same sendTime.
for (int i = 0; i < batchSize; i++) {
messages.add(
Message.builder(payload)
Construct batchSize messages.
.addAttribute("sendTime", sendTime)
We set the sendTime for proper end-to-end latency reporting.
.addAttribute("clientId", id.toString())
We set the clientId to the random id created in the constructor.
.addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))
The sequence number must increase atomically, and the first message must have sequence number 0.
.build());
}
pubSub.publish(topic, messages);
Publish the messages.
return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));
Return a listenable future. If this client library returned futures, we could use Futures.transform to do this, but for a simple library like this we can return an immediate future.
} catch (PubSubException e) {
return Futures.immediateFailedFuture(e);
Do not throw an exception if it can possibly be recovered from; instead, report it to the Task by returning a failed future.
}
}
}
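The attribute rules from the walkthrough (one sendTime per batch, a fixed clientId, and a sequence number that starts at 0 and increases atomically) can be sketched independently of any client library. In this sketch a plain Map<String, String> stands in for the library's message type, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a publisher task: each message is represented
// only by its attribute map rather than by a real client library type.
class BatchAttributesSketch {
    private final int id;
    // Starts at 0 so the first message published carries sequence number 0.
    private final AtomicInteger sequenceNumber = new AtomicInteger(0);

    BatchAttributesSketch(int id) {
        this.id = id;
    }

    // Builds the attributes for one batch of batchSize messages.
    List<Map<String, String>> nextBatch(int batchSize) {
        // Computed once, so every message in the batch shares the same sendTime.
        String sendTime = String.valueOf(System.currentTimeMillis());
        List<Map<String, String>> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("sendTime", sendTime);
            attributes.put("clientId", Integer.toString(id));
            // getAndIncrement is atomic, so concurrent calls never reuse a number.
            attributes.put("sequenceNumber",
                Integer.toString(sequenceNumber.getAndIncrement()));
            batch.add(attributes);
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchAttributesSketch task = new BatchAttributesSketch(7);
        System.out.println(task.nextBatch(3));
        System.out.println(task.nextBatch(2));
    }
}
```

Because the counter lives in the task rather than in the batch, sequence numbers continue across calls: a second batch picks up where the first one ended.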
There are a couple of places you will now need to update in the load test framework. First, add a new command line flag to the Driver. The flag should be named --cps_<client_library_name>_java_publisher_count.
In the run method of Driver, you will need to add a clause near the others to add your type to the Map, something like the one below, except with Gcloud replaced with the name of your client library:
if (cpsGcloudJavaPublisherCount > 0) {
clientParamsMap.put(
new ClientParams(ClientType.CPS_GCLOUD_JAVA_PUBLISHER, null), cpsGcloudJavaPublisherCount);
}
You will now need to add it to the ClientType enum in Client. You will also need to add cases for it in isPublisher and isCpsPublisher as appropriate, and a case in getSubscriberType that returns the type of the subscriber you would like to receive the messages that your task publishes.
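As a rough illustration of the enum changes, here is a self-contained sketch. It is not the framework's actual Client class: only CPS_GCLOUD_JAVA_PUBLISHER appears in the text above, while the other constants (including the hypothetical new entry CPS_MYLIB_JAVA_PUBLISHER), the choice of subscriber, and the behavior of getSubscriberType for non-publishers are assumptions made for the example.

```java
// Hypothetical sketch of a ClientType-style enum with the three methods
// the text asks you to update; not the framework's real implementation.
class ClientTypeSketch {
    enum ClientType {
        CPS_GCLOUD_JAVA_PUBLISHER,
        CPS_GCLOUD_JAVA_SUBSCRIBER, // assumed name
        KAFKA_PUBLISHER,            // assumed name
        KAFKA_SUBSCRIBER,           // assumed name
        CPS_MYLIB_JAVA_PUBLISHER;   // hypothetical new client library

        boolean isPublisher() {
            switch (this) {
                case CPS_GCLOUD_JAVA_PUBLISHER:
                case KAFKA_PUBLISHER:
                case CPS_MYLIB_JAVA_PUBLISHER: // new case
                    return true;
                default:
                    return false;
            }
        }

        boolean isCpsPublisher() {
            switch (this) {
                case CPS_GCLOUD_JAVA_PUBLISHER:
                case CPS_MYLIB_JAVA_PUBLISHER: // new case
                    return true;
                default:
                    return false;
            }
        }

        // Which subscriber should receive what this publisher sends.
        ClientType getSubscriberType() {
            switch (this) {
                case CPS_GCLOUD_JAVA_PUBLISHER:
                case CPS_MYLIB_JAVA_PUBLISHER: // new case, assumed pairing
                    return CPS_GCLOUD_JAVA_SUBSCRIBER;
                case KAFKA_PUBLISHER:
                    return KAFKA_SUBSCRIBER;
                default:
                    return this; // assumption: subscribers map to themselves
            }
        }
    }

    public static void main(String[] args) {
        ClientType added = ClientType.CPS_MYLIB_JAVA_PUBLISHER;
        System.out.println(added + " publishes to " + added.getSubscriberType());
    }
}
```

The point of the sketch is that a new publisher type needs a case in all three methods; forgetting one would leave the type treated as a non-publisher somewhere in the framework.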
Last, you will need to add it to the spreadsheet output in SheetsService and increment cpsPublisherCount or kafkaPublisherCount accordingly.