From f9e093541899e43a2c3d25cb46dcdac9213dbcea Mon Sep 17 00:00:00 2001 From: Masoud Saeida Ardekani Date: Wed, 11 Oct 2023 15:11:01 -0700 Subject: [PATCH] Emit zero for flow-control launching metric if there is no inflight launching pipeline when appfabric starts --- .../ProgramNotificationSubscriberService.java | 14 +++++++++++--- .../app/services/RunRecordMonitorService.java | 4 ++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java index 8abbda45ff48..c54fea1d1c64 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java @@ -89,6 +89,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import javax.annotation.Nullable; import org.apache.twill.internal.CompositeService; @@ -278,6 +279,7 @@ protected void doStartUp() throws Exception { RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX); long startTs = System.currentTimeMillis(); + AtomicBoolean launching = new AtomicBoolean(false); Retries.runWithRetries( () -> store.scanActiveRuns( @@ -288,8 +290,10 @@ protected void doStartUp() throws Exception { } try { if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) { + launching.set(true); runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); } else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) { + launching.set(true); runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); // It is unknown what is the state of program runs in STARTING state. // A STARTING message is published again to retry STARTING logic. @@ -314,6 +318,10 @@ protected void doStartUp() throws Exception { }), retryStrategy, e -> true); + if (!launching.get()) { + // there is no launching pipeline + runRecordMonitorService.emitLaunchingMetrics(0); + } } @Nullable @@ -867,7 +875,7 @@ private void processWorkflowOnStop( } } - /** write to heart beat table if the recordedRunRecord is not null */ + /** write to heart beat table if the recordedRunRecord is not null. */ private void writeToHeartBeatTable( @Nullable RunRecordDetail recordedRunRecord, long timestampInSeconds, @@ -1024,7 +1032,7 @@ private void publishRecordedStatus( /** * Helper method to extract the time from the given properties map, or return -1 if no value was - * found + * found. * * @param properties the properties map * @param option the key to lookup in the properties map @@ -1057,7 +1065,7 @@ private BasicThrowable decodeBasicThrowable(@Nullable String encoded) { /** * Emit the metrics context for the program, the tags are constructed with the program run id and - * the profile id + * the profile id. */ private void emitProfileMetrics( ProgramRunId programRunId, diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java index 6fd3a23bfc70..aed722ebe583 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java @@ -188,6 +188,10 @@ public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange) } } + public void emitLaunchingMetrics(long value) { + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value); + } + private void emitMetrics(String metricName, long value) { metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value); }