diff --git a/az-core/src/main/java/azkaban/utils/TimeUtils.java b/az-core/src/main/java/azkaban/utils/TimeUtils.java index 43522fa9d5..a991b783e9 100644 --- a/az-core/src/main/java/azkaban/utils/TimeUtils.java +++ b/az-core/src/main/java/azkaban/utils/TimeUtils.java @@ -37,6 +37,7 @@ public class TimeUtils { private static final String DATE_TIME_ZONE_PATTERN = "yyyy/MM/dd HH:mm:ss z"; private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + private static final int ONE_DAY = 86400; /** * Formats the given millisecond instant into a string using the pattern "yyyy/MM/dd HH:mm:ss z" @@ -236,4 +237,13 @@ public static String createPeriodString(final ReadablePeriod period) { public static boolean timeEscapedOver(long referenceTime, int second) { return ((System.currentTimeMillis() - referenceTime) / 1000F) > (second * 1.0); } + + /** + * Returns the number of whole days elapsed since the reference time, rounded down. + * @param referenceTime reference time in epoch milliseconds + * @return number of whole days between referenceTime and the current time + */ + public static int daysEscapedOver(long referenceTime) { + return (int) Math.floorDiv(System.currentTimeMillis() - referenceTime, ONE_DAY * 1000L); + } } diff --git a/az-core/src/test/java/azkaban/utils/TimeUtilsTest.java b/az-core/src/test/java/azkaban/utils/TimeUtilsTest.java index 597813cdaa..8c683cb041 100644 --- a/az-core/src/test/java/azkaban/utils/TimeUtilsTest.java +++ b/az-core/src/test/java/azkaban/utils/TimeUtilsTest.java @@ -29,4 +29,14 @@ public void testTimeEscapedOver() throws InterruptedException { Assert.assertTrue( TimeUtils.timeEscapedOver(baseTime, 1)); Assert.assertFalse(TimeUtils.timeEscapedOver(baseTime, 2)); } + + @Test + public void testDayEscapedOver() throws InterruptedException { + long baseTime = System.currentTimeMillis(); + long oneDayBefore = baseTime - 86399000; + TimeUnit.SECONDS.sleep(3); + Assert.assertEquals( TimeUtils.daysEscapedOver(baseTime), 0); + Assert.assertEquals(TimeUtils.daysEscapedOver(oneDayBefore), 1); + } + } diff --git 
a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java index 8ab02dcbb3..7c53b0036b 100644 --- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java +++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java @@ -170,17 +170,29 @@ protected String getMainArguments() { File srcFile = new File(getScriptAbsolutePath()); File dstFile = new File(getRunnableScriptAbsolutePath()); Path dstPath = Paths.get(getRunnableScriptDir()); + getLog().info(String.format("[Ramp Modification Start] [srcFile = %s, dstFile = %s, dstPath = %s]", + srcFile.toPath().toAbsolutePath().toString(), + dstFile.toPath().toAbsolutePath().toString(), + dstPath.toString())); if (!Files.exists(dstPath)) { Files.createDirectories(dstPath); + getLog().info(String.format("[Ramp Modification Destination Directory Created. %s]", + dstPath.toAbsolutePath().toString())); } dstFile.createNewFile(); + getLog().info(String.format("[Ramp Modification Destination File Created. 
%s]", + dstFile.toPath().toAbsolutePath().toString())); getLog().info(String.format("[Ramp Modify Script File] : old = %s, new = %s", srcFile.getAbsolutePath(), dstFile.getAbsolutePath())); copyAndModifyScript(srcFile, dstFile, rampRegisterItems); + getLog().info(String.format("[Ramp Modification End] [dstFile = %s]", + dstFile.toPath().toAbsolutePath().toString())); + list.add(getRunnableScript()); } catch (IOException e) { - e.printStackTrace(); + getLog().error("[Ramp Modification Failed]", e); + getLog().info("[Ramp cannot successfully modify the script, Failover to the baseline.]"); + list.add(getScript()); } - list.add(getRunnableScript()); } return StringUtils.join((Collection) list, " "); diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableRamp.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableRamp.java index ff29083555..4b07719199 100644 --- a/azkaban-common/src/main/java/azkaban/executor/ExecutableRamp.java +++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableRamp.java @@ -215,7 +215,7 @@ public State refresh(State source) { this.numOfFailure = source.numOfFailure; this.numOfIgnored = source.numOfFailure; - this.isPaused = source.isPaused ? source.isPaused : this.isPaused; + this.isPaused = source.isPaused; this.rampStage = (source.rampStage > this.rampStage) ? 
source.rampStage : this.rampStage; this.isActive = source.isActive; @@ -344,7 +344,12 @@ public ExecutableRamp setState(State state) { public boolean isActive() { long diff = this.getState().startTime - System.currentTimeMillis(); - return this.getState().isActive && (!this.getState().isPaused) && (diff < 0); + boolean isActive = this.getState().isActive && (!this.getState().isPaused) && (diff < 0); + if (!isActive) { + LOGGER.info("[Ramp Is Isolated] (isActive = {}, isPause = {}, timeDiff = {})", + this.getState().isActive, this.getState().isPaused, diff); + } + return isActive; } synchronized public void cacheResult(Action action) { @@ -363,13 +368,15 @@ synchronized public void cacheResult(Action action) { this.state.lastUpdatedTime = System.currentTimeMillis(); // verify the failure threshold + int trails = this.state.numOfTrail + this.state.cachedNumOfTrail; + int fails = this.state.numOfFailure + this.state.cachedNumOfFailure; int failure = this.metadata.isPercentageScaleForMaxFailure - ? - (int) (((this.state.numOfFailure + this.state.cachedNumOfFailure) * 100.0) - / ((this.state.numOfTrail + this.state.cachedNumOfTrail) * 1.0)) - : (this.state.numOfFailure + this.state.cachedNumOfFailure); + ? (trails == 0) + ? 100 + : (int) ((fails * 100.0) / (trails * 1.0)) + : fails; - LOGGER.info(String.format("Cache Ramp Result : [id = %s, action: %s, %s failure: %d, numOfTrail (%d, %d), numOfSuccess: (%d, %d), numOfFailure: (%d, %d), numOfIgnore: (%d, %d)]" + LOGGER.info("[Ramp Cached Result] (id = {}, action: {}, {} failure: {}, numOfTrail ({}, {}), numOfSuccess: ({}, {}), numOfFailure: ({}, {}), numOfIgnore: ({}, {}))" , this.id , action.name() , this.metadata.isPercentageScaleForMaxFailure ? 
"Percentage" : " " @@ -382,14 +389,26 @@ synchronized public void cacheResult(Action action) { , this.state.cachedNumOfFailure , this.state.numOfIgnored , this.state.cachedNumOfIgnored - )); - if (failure > this.metadata.maxFailureToRampDown) { - LOGGER.warn(String.format("Failure over the threshold to Ramp Down [id = %s, failure = %d, threshold = %d]", this.id, failure, this.metadata.maxFailureToRampDown)); - if (this.state.rampStage > 0) { - this.state.rampStage--; + ); + + if (this.metadata.maxFailureToRampDown != 0) { + if (failure > this.metadata.maxFailureToRampDown) { + if (this.state.rampStage > 0) { + int currentStage = this.state.rampStage; + this.state.rampStage--; + int futureStage = this.state.rampStage; + LOGGER.warn("[RAMP DOWN] (rampId = {}, failure = {}, threshold = {}, from stage {} to stage {}.)", + this.getId(), failure, this.metadata.maxFailureToRampDown, currentStage, futureStage); + } + } + } + + if (this.metadata.maxFailureToPause != 0) { + if (failure > this.metadata.maxFailureToPause) { + this.state.setPaused(true); + LOGGER.info("[RAMP STOP] (rampId = {}, failure = {}, threshold = {}, timestamp = {})", + this.getId(), failure, this.metadata.maxFailureToPause, System.currentTimeMillis()); } - } else if (failure > this.metadata.maxFailureToPause) { - LOGGER.warn(String.format("Failure over the threshold to Pause the Ramp [id = %s, failure = %d, threshold = %d]", this.id, failure, this.metadata.maxFailureToRampDown)); } this.getState().markChanged(); diff --git a/azkaban-common/src/main/java/azkaban/ramppolicy/SimpleAutoRampPolicy.java b/azkaban-common/src/main/java/azkaban/ramppolicy/SimpleAutoRampPolicy.java index f59d798517..13d98991a1 100644 --- a/azkaban-common/src/main/java/azkaban/ramppolicy/SimpleAutoRampPolicy.java +++ b/azkaban-common/src/main/java/azkaban/ramppolicy/SimpleAutoRampPolicy.java @@ -20,6 +20,8 @@ import azkaban.utils.Props; import azkaban.utils.TimeUtils; import com.google.common.collect.ImmutableList; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -31,11 +33,15 @@ * stage 5: 100% */ public class SimpleAutoRampPolicy extends SimpleRampPolicy { - private static int ONE_DAY = 86400; private static final int MAX_RAMP_STAGE = 5; private static final ImmutableList RAMP_STAGE_RESCALE_TABLE = ImmutableList.builder() .add(5, 25, 50, 75) .build(); + private static final ImmutableList AUTO_RAMP_INTERVAL_TABLE = ImmutableList.builder() + .add(1, 2, 3, 4) + .build(); + + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleAutoRampPolicy.class); public SimpleAutoRampPolicy(Props sysProps, Props privateProps) { @@ -60,15 +66,34 @@ protected int getRampStage(ExecutableFlow flow) { } @Override - protected void preprocess(ExecutableRamp executableRamp) { // TODO VERIFY AUTO RAMP MECHANISM - if (TimeUtils.timeEscapedOver(executableRamp.getState().getLastUpdatedTime(), ONE_DAY)) { - int rampStage = executableRamp.getState().getRampStage(); - if (rampStage <= getMaxRampStage()) { - executableRamp.getState().setRampStage(rampStage + 1); - executableRamp.getState().setLastUpdatedTime(System.currentTimeMillis()); - } else { - executableRamp.getState().setEndTime(System.currentTimeMillis()); + protected void preprocess(ExecutableRamp executableRamp) { + int escapedDays = TimeUtils.daysEscapedOver(executableRamp.getState().getStartTime()); + int rampStage = executableRamp.getState().getRampStage(); + int maxStage = getMaxRampStage(); + + if (rampStage == 0) { + // The ramp is still not started yet. Auto Ramp should not be triggered. 
+ return; + } + + try { + if (rampStage < maxStage) { + if (escapedDays >= AUTO_RAMP_INTERVAL_TABLE.get(rampStage - 1)) { + // Ramp up + int newStage = rampStage + 1; + long timeStamp = System.currentTimeMillis(); + executableRamp.getState().setRampStage(newStage); + executableRamp.getState().setLastUpdatedTime(timeStamp); + if (newStage == maxStage) { + executableRamp.getState().setEndTime(timeStamp); + } + LOGGER.info("[AUTO RAMP UP] (rampId = {}, current Stage = {}, new Stage = {}, timeStamp = {})", + executableRamp.getId(), rampStage, newStage, timeStamp); + } } + } catch (Exception e) { + LOGGER.error("[AUTO RAMP ERROR] (rampId = {}, rampStage = {}, message = {})", + executableRamp.getId(), rampStage, e.getMessage(), e); } } } diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRampManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRampManager.java index 14f839f7f8..d80be8dab8 100644 --- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRampManager.java +++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRampManager.java @@ -386,12 +386,20 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec for (ExecutableRamp executableRamp : executableRampMap.getActivatedAll()) { try { String rampId = executableRamp.getId(); + LOGGER.info("[Ramp Check] (rampId = {}, rampStage = {}, executionId = {}, flowName = {}, RampPercentageId = {})", + rampId, + executableRamp.getState().getRampStage(), + executableFlow.getExecutionId(), + flowName, + executableFlow.getRampPercentageId() + ); // get Base Props Props baseProps = new Props(); baseProps.putAll(executableRampDependencyMap.getDefaultValues(executableRampItemsMap.getDependencies(rampId))); ExecutableRampStatus status = executableRampExceptionalFlowItemsMap.check(rampId, flowName); + LOGGER.info("[Ramp Status] (Status = {}, flowName = {})", status.name(), flowName); switch (status) { case BLACKLISTED: // blacklist executableFlowRampMetadata.setRampProps( 
@@ -402,7 +410,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.BLACKLISTED.name() ) ); - LOGGER.info("Ramp Flow As BlackListed Item. [rampid = {}, flowName = {}]", rampId, flowName); + LOGGER.info("[Ramp BlackListed]. [rampId = {}, flowName = {}]", rampId, flowName); break; case WHITELISTED: // whitelist @@ -414,7 +422,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.WHITELISTED.name() ) ); - LOGGER.info("Ramp Flow As WhiteListed Item. [rampid = {}, flowName = {}]", rampId, flowName); + LOGGER.info("[Ramp WhiteListed]. [rampId = {}, flowName = {}]", rampId, flowName); break; case SELECTED: // selected @@ -426,7 +434,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.SELECTED.name() ) ); - LOGGER.info("Ramp Flow As Selected Item. [rampid = {}, flowName = {}]", rampId, flowName); + LOGGER.info("[Ramp Selected]. [rampId = {}, flowName = {}]", rampId, flowName); break; case UNSELECTED: // selected @@ -438,8 +446,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.UNSELECTED.name() ) ); - LOGGER.info("Ramp Flow As Unselected Item. [rampid = {}, flowName = {}]", - rampId, flowName); + LOGGER.info("[Ramp Unselected]. [rampId = {}, flowName = {}]", rampId, flowName); break; case EXCLUDED: @@ -451,12 +458,18 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.EXCLUDED.name() ) ); - LOGGER.info("Ramp Flow As Excluded Item. [rampid = {}, flowName = {}]", - rampId, flowName); + LOGGER.info("[Ramp Excluded]. [rampId = {}, flowName = {}]", rampId, flowName); break; default: RampPolicy rampPolicy = rampPolicyManager.buildRampPolicyExecutor(executableRamp.getPolicy(), globalProps); + LOGGER.info ("[Ramp Policy Selecting]. 
[policy = {}, rampId = {}, flowName = {}, executionId = {}, RampPercentageId = {}]", + rampPolicy.getClass().getName(), + rampId, + flowName, + executableFlow.getExecutionId(), + executableFlow.getRampPercentageId() + ); if (rampPolicy.check(executableFlow, executableRamp)) { // Ramp Enabled executableFlowRampMetadata.setRampProps( @@ -467,8 +480,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.SELECTED.name() ) ); - LOGGER.info("Undetermined Ramp Flow is selected for Ramping. [rampid = {}, flowName = {}]", - rampId, flowName); + LOGGER.info("[Ramp Policy Selected]. [rampId = {}, flowName = {}]", rampId, flowName); } else { executableFlowRampMetadata.setRampProps( rampId, @@ -478,8 +490,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec ExecutableRampStatus.UNSELECTED.name() ) ); - LOGGER.info("Undetermined Ramp Flow is not selected for Ramping. [rampid = {}, flowName = {}]", - rampId, flowName); + LOGGER.info("[Ramp Policy Unselected]. [rampId = {}, flowName = {}]", rampId, flowName); } break; } @@ -522,6 +533,10 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec private void moveFiles(File sourceDir, File destinationDir, String regExpression) { try { FileIOUtils.moveFiles(sourceDir, destinationDir, regExpression); + LOGGER.info("Success to move files from {} to {} with REGEXP {}", + sourceDir.getAbsolutePath(), + destinationDir.getAbsolutePath(), + regExpression); } catch (IOException e) { LOGGER.error( String.format("Fail to move files from %s to %s with REGEXP %s", @@ -575,8 +590,12 @@ synchronized public void logFlowAction(FlowRunner flowRunner, Action action) { if (!Action.SUCCEEDED.equals(action)) { String rampId = executableRamp.getId(); String flowName = flowRunner.getExecutableFlow().getFlowName(); - LOGGER.warn("Flow will be excluded from ramping. 
[rampId = {}, flow = {}, action = {}]", - rampId, rampId, action.name()); + LOGGER.warn("[Ramp Exclude Flow]. [executionId = {}, rampId = {}, flowName = {}, action = {}]", + flowRunner.getExecutableFlow().getExecutionId(), + rampId, + flowName, + action.name() + ); executableRampExceptionalFlowItemsMap.add(rampId, flowName, ExecutableRampStatus.EXCLUDED, System.currentTimeMillis(), true); }