Adding a New Java Client Library: Publisher
Each client library should get its own directory under java/com/google/pubsub/clients/. Name the publisher task class after the system it targets: CPSPublisherTask for a Cloud Pub/Sub client library, KafkaPublisherTask for a Kafka client library.
Let us look through a simplified and annotated version of com.google.pubsub.clients.gcloud.CPSPublisherTask:
class CPSPublisherTask extends Task {
The class must extend Task.
private CPSPublisherTask(StartRequest request) {
super(request, "gcloud", MetricsHandler.MetricName.PUBLISH_ACK_LATENCY);
The constructor should take the start request, pass it to the Task constructor, and use the remaining information in the request to initialize the client.
this.pubSub = PubSubOptions.builder()
.projectId(request.getProject())
.build().service();
this.topic = Preconditions.checkNotNull(request.getTopic());
this.payload = LoadTestRunner.createMessage(request.getMessageSize());
this.batchSize = request.getPublishBatchSize();
We now initialize the client library and store some state for later use.
this.id = (new Random()).nextInt();
}
We must create a random id for use in deduplication and completeness checking. The only restriction on this id is that no two clients may share the same one, so a random id is sufficient: we do not expect more than a few tens of clients.
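To make the "random is sufficient" argument concrete, here is a minimal sketch that estimates the chance of two clients drawing the same id. It assumes ids are spread uniformly over all 2^32 int values; the class ClientIdCollision and its method are hypothetical names used only for illustration and are not part of the load test framework.

```java
import java.util.Random;

/** Hypothetical helper: estimates how likely two clients are to draw the same random id. */
final class ClientIdCollision {
  /** Number of distinct values an int id can take (2^32). Assumes a uniform draw. */
  static final double ID_SPACE = 4294967296.0;

  /** Birthday-bound probability that at least two of the given clients share an id. */
  static double collisionProbability(int clients) {
    double noCollision = 1.0;
    // The i-th additional client must avoid the i ids that are already taken.
    for (int i = 1; i < clients; i++) {
      noCollision *= 1.0 - i / ID_SPACE;
    }
    return 1.0 - noCollision;
  }

  public static void main(String[] args) {
    Integer id = new Random().nextInt(); // same kind of draw as in the constructor above
    System.out.println("example client id: " + id);
    System.out.printf("P(collision) with 50 clients: %.2e%n", collisionProbability(50));
  }
}
```

With 50 clients the estimate comes out at roughly 3 in 10 million, which is why no coordination between clients is needed at this scale.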
public static void main(String[] args) throws Exception {
LoadTestRunner.Options options = new LoadTestRunner.Options();
new JCommander(options, args);
LoadTestRunner.run(options, CPSPublisherTask::new);
}
Every new client library task must have a main function like this.
@Override
public ListenableFuture<RunResult> doRun() {
This is the core of any task. doRun should publish batchSize messages and report the latency of doing so.
try {
List<Message> messages = new ArrayList<>(batchSize);
We construct the list of messages.
String sendTime = String.valueOf(System.currentTimeMillis());
All messages published in one batch should have the same sendTime.
for (int i = 0; i < batchSize; i++) {
messages.add(
Message.builder(payload)
Construct batchSize messages.
.addAttribute("sendTime", sendTime)
We set the sendTime for proper end-to-end latency reporting.
.addAttribute("clientId", id.toString())
We set the clientId to the random id created in the constructor.
.addAttribute("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()))
The sequence number must increase atomically, and the first message must have sequence number 0.
.build());
}
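The three attribute requirements in this loop (one shared sendTime per batch, a fixed clientId, and an atomically increasing sequenceNumber starting at 0) can be sketched without any Pub/Sub dependency. The class BatchAttributes below is a hypothetical stand-in that builds plain attribute maps instead of Message objects; only the attribute names come from the task above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: builds the attributes for each message of a batch. */
final class BatchAttributes {
  private final int clientId;
  // Starts at 0 so that the first message ever published carries sequence number 0.
  private final AtomicInteger sequenceNumber = new AtomicInteger(0);

  BatchAttributes(int clientId) {
    this.clientId = clientId;
  }

  /** Returns one attribute map per message; every map in the batch shares one sendTime. */
  List<Map<String, String>> nextBatch(int batchSize) {
    String sendTime = String.valueOf(System.currentTimeMillis());
    List<Map<String, String>> batch = new ArrayList<>(batchSize);
    for (int i = 0; i < batchSize; i++) {
      Map<String, String> attributes = new LinkedHashMap<>();
      attributes.put("sendTime", sendTime);
      attributes.put("clientId", Integer.toString(clientId));
      // getAndIncrement is atomic, so concurrent calls never hand out the same number.
      attributes.put("sequenceNumber", Integer.toString(sequenceNumber.getAndIncrement()));
      batch.add(attributes);
    }
    return batch;
  }

  public static void main(String[] args) {
    BatchAttributes attributes = new BatchAttributes(42);
    attributes.nextBatch(3).forEach(System.out::println);
    attributes.nextBatch(2).forEach(System.out::println);
  }
}
```

Keeping the counter on the instance rather than inside the loop is what lets sequence numbers continue across batches instead of restarting at 0 on every call.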
pubSub.publish(topic, messages);
Publish the messages.
return Futures.immediateFuture(RunResult.fromBatchSize(batchSize));
Return a listenable future. If the client library itself returned futures, we could use Futures.transform to convert them, but for a simple library like this one an immediate future is enough.
} catch (PubSubException e) {
return Futures.immediateFailedFuture(e);
Do not throw an exception if it can possibly be recovered from; instead, report it to the Task by returning a failed future.
}
}
}
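For a client library whose publish call is asynchronous, doRun could map the library's future to a result instead of returning an immediate one. The sketch below illustrates that shape with java.util.concurrent.CompletableFuture from the standard library in place of Guava's ListenableFuture, so that it runs without dependencies; thenApply plays the role that Futures.transform would play. The AsyncPublisher interface and the Integer result type are hypothetical and stand in for a real client and for RunResult.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of a doRun for an asynchronous client, using CompletableFuture as a stand-in. */
final class AsyncRunSketch {
  /** Hypothetical client whose publish call returns a future that completes on acknowledgement. */
  interface AsyncPublisher {
    CompletableFuture<Integer> publish(int batchSize);
  }

  /** Maps the client's future to the published batch size; failures are reported through the future. */
  static CompletableFuture<Integer> doRun(AsyncPublisher publisher, int batchSize) {
    try {
      // Analogous to Futures.transform: convert the client's result once it completes.
      return publisher.publish(batchSize).thenApply(ignored -> batchSize);
    } catch (RuntimeException e) {
      // Report the failure to the caller instead of throwing it.
      CompletableFuture<Integer> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Integer> ok = doRun(n -> CompletableFuture.completedFuture(n), 10);
    System.out.println("published: " + ok.join());

    CompletableFuture<Integer> bad =
        doRun(n -> { throw new IllegalStateException("publish failed"); }, 10);
    System.out.println("failed: " + bad.isCompletedExceptionally());
  }
}
```

In both the success and the failure case the caller receives a future, which mirrors how the task above returns either Futures.immediateFuture or Futures.immediateFailedFuture.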