Skip to content

Commit

Permalink
feat: improve dataplane transfer process logs (#4743)
Browse files Browse the repository at this point in the history
* Improve dataplane transfer process logs.

* Changes from PR.
  • Loading branch information
bmg13 authored Jan 20, 2025
1 parent a6e662a commit 446ff65
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public CompletableFuture<StreamResult<Object>> transfer(DataFlowStartMessage req

var source = sourceFactory.createSource(request);
sources.put(request.getProcessId(), source);
monitor.debug(() -> format("Transferring from %s to %s.", request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType()));
monitor.debug(() -> format("Transferring from %s to %s for flow id: %s.",
request.getProcessId(), request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType()));
return sink.transfer(source)
.thenApply(result -> {
terminate(request.getProcessId());
Expand Down Expand Up @@ -177,12 +178,14 @@ private DataSinkFactory getSinkFactory(DataFlowStartMessage request) {

@NotNull
private CompletableFuture<StreamResult<Object>> noSourceFactory(DataFlowStartMessage request) {
return completedFuture(StreamResult.error("Unknown data source type: " + request.getSourceDataAddress().getType()));
return completedFuture(StreamResult.error("Unknown data source type %s for flow id: %s.".formatted(
request.getProcessId(), request.getSourceDataAddress().getType())));
}

@NotNull
private CompletableFuture<StreamResult<Object>> noSinkFactory(DataFlowStartMessage request) {
return completedFuture(StreamResult.error("Unknown data sink type: " + request.getDestinationDataAddress().getType()));
return completedFuture(StreamResult.error("Unknown data sink type %s for flow id: %s.".formatted(
request.getProcessId(), request.getDestinationDataAddress().getType())));
}


Expand Down

0 comments on commit 446ff65

Please sign in to comment.