Skip to content

Commit

Permalink
[TASK] Add More Logs to diagnose Dali Pig Ramp by HadoopPigJob (azkab…
Browse files Browse the repository at this point in the history
…an#2437)

* [TASK] Add More Logs to diagnose Dali Pig Ramp by HadoopPigJob

* [TASK] Refine logs

* [BUGFIX] Fix Pause Status Setting Problem in Ramp Feature

* [TASK] Fix the AutoRampPolicy Bug
  • Loading branch information
kxu1026 authored Feb 6, 2020
1 parent 236a2af commit 56d0fd2
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 38 deletions.
10 changes: 10 additions & 0 deletions az-core/src/main/java/azkaban/utils/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class TimeUtils {

private static final String DATE_TIME_ZONE_PATTERN = "yyyy/MM/dd HH:mm:ss z";
private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static int ONE_DAY = 86400;

/**
* Formats the given millisecond instant into a string using the pattern "yyyy/MM/dd HH:mm:ss z"
Expand Down Expand Up @@ -236,4 +237,13 @@ public static String createPeriodString(final ReadablePeriod period) {
public static boolean timeEscapedOver(long referenceTime, int second) {
return ((System.currentTimeMillis() - referenceTime) / 1000F) > (second * 1.0);
}

/**
* Check how many days escaped over
* @param referenceTime reference time
* @return number of days
*/
public static int daysEscapedOver(long referenceTime) {
return Math.round(((System.currentTimeMillis() - referenceTime) / 1000f) / (ONE_DAY * 1.0f) - 0.5f);
}
}
10 changes: 10 additions & 0 deletions az-core/src/test/java/azkaban/utils/TimeUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,14 @@ public void testTimeEscapedOver() throws InterruptedException {
Assert.assertTrue( TimeUtils.timeEscapedOver(baseTime, 1));
Assert.assertFalse(TimeUtils.timeEscapedOver(baseTime, 2));
}

@Test
public void testDayEscapedOver() throws InterruptedException {
long baseTime = System.currentTimeMillis();
long oneDayBefore = baseTime - 86399000;
TimeUnit.SECONDS.sleep(3);
Assert.assertEquals( TimeUtils.daysEscapedOver(baseTime), 0);
Assert.assertEquals(TimeUtils.daysEscapedOver(oneDayBefore), 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,29 @@ protected String getMainArguments() {
File srcFile = new File(getScriptAbsolutePath());
File dstFile = new File(getRunnableScriptAbsolutePath());
Path dstPath = Paths.get(getRunnableScriptDir());
getLog().info(String.format("[Ramp Modification Start] [srcFile = %s, dstFile = %s, dstPath = %s]",
srcFile.toPath().toAbsolutePath().toString(),
dstFile.toPath().toAbsolutePath().toString(),
dstPath.toString()));
if (!Files.exists(dstPath)) {
Files.createDirectories(dstPath);
getLog().info(String.format("[Ramp Modification Destination Directory Created. %s]",
dstPath.toAbsolutePath().toString()));
}
dstFile.createNewFile();
getLog().info(String.format("[Ramp Modification Destination File Created. %s]",
dstFile.toPath().toAbsolutePath().toString()));
getLog().info(String.format("[Ramp Modify Script File] : old = %s, new = %s",
srcFile.getAbsolutePath(), dstFile.getAbsolutePath()));
copyAndModifyScript(srcFile, dstFile, rampRegisterItems);
getLog().info(String.format("[Ramp Modification End] [dstFile = %s]",
dstFile.toPath().toAbsolutePath().toString()));
list.add(getRunnableScript());
} catch (IOException e) {
e.printStackTrace();
getLog().error(e);
getLog().info("[Ramp cannot successfully modify the script, Failover to the baseline.]");
list.add(getScript());
}
list.add(getRunnableScript());
}

return StringUtils.join((Collection<String>) list, " ");
Expand Down
47 changes: 33 additions & 14 deletions azkaban-common/src/main/java/azkaban/executor/ExecutableRamp.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public State refresh(State source) {
this.numOfFailure = source.numOfFailure;
this.numOfIgnored = source.numOfFailure;

this.isPaused = source.isPaused ? source.isPaused : this.isPaused;
this.isPaused = source.isPaused;
this.rampStage = (source.rampStage > this.rampStage) ? source.rampStage : this.rampStage;
this.isActive = source.isActive;

Expand Down Expand Up @@ -344,7 +344,12 @@ public ExecutableRamp setState(State state) {

public boolean isActive() {
long diff = this.getState().startTime - System.currentTimeMillis();
return this.getState().isActive && (!this.getState().isPaused) && (diff < 0);
boolean isActive = this.getState().isActive && (!this.getState().isPaused) && (diff < 0);
if (!isActive) {
LOGGER.info("[Ramp Is Isolated] (isActive = {}, isPause = {}, timeDiff = {}",
this.getState().isActive, this.getState().isPaused, diff);
}
return isActive;
}

synchronized public void cacheResult(Action action) {
Expand All @@ -363,13 +368,15 @@ synchronized public void cacheResult(Action action) {
this.state.lastUpdatedTime = System.currentTimeMillis();

// verify the failure threshold
int trails = this.state.numOfTrail + this.state.cachedNumOfTrail;
int fails = this.state.numOfFailure + this.state.cachedNumOfFailure;
int failure = this.metadata.isPercentageScaleForMaxFailure
?
(int) (((this.state.numOfFailure + this.state.cachedNumOfFailure) * 100.0)
/ ((this.state.numOfTrail + this.state.cachedNumOfTrail) * 1.0))
: (this.state.numOfFailure + this.state.cachedNumOfFailure);
? (trails == 0)
? 100
: (int) ((fails * 100.0) / (trails * 1.0))
: fails;

LOGGER.info(String.format("Cache Ramp Result : [id = %s, action: %s, %s failure: %d, numOfTrail (%d, %d), numOfSuccess: (%d, %d), numOfFailure: (%d, %d), numOfIgnore: (%d, %d)]"
LOGGER.info("[Ramp Cached Result] (id = {}, action: {}, {} failure: {}, numOfTrail ({}, {}), numOfSuccess: ({}, {}), numOfFailure: ({}, {}), numOfIgnore: ({}, {}))"
, this.id
, action.name()
, this.metadata.isPercentageScaleForMaxFailure ? "Percentage" : " "
Expand All @@ -382,14 +389,26 @@ synchronized public void cacheResult(Action action) {
, this.state.cachedNumOfFailure
, this.state.numOfIgnored
, this.state.cachedNumOfIgnored
));
if (failure > this.metadata.maxFailureToRampDown) {
LOGGER.warn(String.format("Failure over the threshold to Ramp Down [id = %s, failure = %d, threshold = %d]", this.id, failure, this.metadata.maxFailureToRampDown));
if (this.state.rampStage > 0) {
this.state.rampStage--;
);

if (this.metadata.maxFailureToRampDown != 0) {
if (failure > this.metadata.maxFailureToRampDown) {
if (this.state.rampStage > 0) {
int currentStage = this.state.rampStage;
this.state.rampStage--;
int futureStage = this.state.rampStage;
LOGGER.warn("[RAMP DOWN] (rampId = {}, failure = {}, threshold = {}, from stage {} to stage {}.)",
this.getId(), failure, this.metadata.maxFailureToRampDown, currentStage, futureStage);
}
}
}

if (this.metadata.maxFailureToPause != 0) {
if (failure > this.metadata.maxFailureToPause) {
this.state.setPaused(true);
LOGGER.info("[RAMP STOP] (rampId = {}, failure = {}, threshold = {}, timestamp = {})",
this.getId(), failure, this.metadata.maxFailureToPause, System.currentTimeMillis());
}
} else if (failure > this.metadata.maxFailureToPause) {
LOGGER.warn(String.format("Failure over the threshold to Pause the Ramp [id = %s, failure = %d, threshold = %d]", this.id, failure, this.metadata.maxFailureToRampDown));
}

this.getState().markChanged();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import azkaban.utils.Props;
import azkaban.utils.TimeUtils;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -31,11 +33,15 @@
* stage 5: 100%
*/
public class SimpleAutoRampPolicy extends SimpleRampPolicy {
private static int ONE_DAY = 86400;
private static final int MAX_RAMP_STAGE = 5;
private static final ImmutableList<Integer> RAMP_STAGE_RESCALE_TABLE = ImmutableList.<Integer>builder()
.add(5, 25, 50, 75)
.build();
private static final ImmutableList<Integer> AUTO_RAMP_INTERVAL_TABLE = ImmutableList.<Integer>builder()
.add(1, 2, 3, 4)
.build();

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleAutoRampPolicy.class);


public SimpleAutoRampPolicy(Props sysProps, Props privateProps) {
Expand All @@ -60,15 +66,34 @@ protected int getRampStage(ExecutableFlow flow) {
}

@Override
protected void preprocess(ExecutableRamp executableRamp) { // TODO VERIFY AUTO RAMP MECHANISM
if (TimeUtils.timeEscapedOver(executableRamp.getState().getLastUpdatedTime(), ONE_DAY)) {
int rampStage = executableRamp.getState().getRampStage();
if (rampStage <= getMaxRampStage()) {
executableRamp.getState().setRampStage(rampStage + 1);
executableRamp.getState().setLastUpdatedTime(System.currentTimeMillis());
} else {
executableRamp.getState().setEndTime(System.currentTimeMillis());
protected void preprocess(ExecutableRamp executableRamp) {
int escapedDays = TimeUtils.daysEscapedOver(executableRamp.getState().getStartTime());
int rampStage = executableRamp.getState().getRampStage();
int maxStage = getMaxRampStage();

if (rampStage == 0) {
// The ramp is still not stated yet. Auto Ramp should not be triggered.
return;
}

try {
if (escapedDays >= AUTO_RAMP_INTERVAL_TABLE.get(rampStage - 1)) {
if (rampStage < maxStage) {
// Ramp up
int newStage = rampStage + 1;
long timeStamp = System.currentTimeMillis();
executableRamp.getState().setRampStage(newStage);
executableRamp.getState().setLastUpdatedTime(timeStamp);
if (newStage == maxStage) {
executableRamp.getState().setEndTime(timeStamp);
}
LOGGER.info("[AUTO RAMP UP] (rampId = {}, current Stage = {}, new Stage = {}, timeStamp = {}",
executableRamp.getId(), rampStage, newStage, timeStamp);
}
}
} catch (Exception e) {
LOGGER.error("[AUTO RAMP ERROR] (rampId = {}, ramStage = {}, message = {}",
executableRamp.getId(), rampStage, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,20 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
for (ExecutableRamp executableRamp : executableRampMap.getActivatedAll()) {
try {
String rampId = executableRamp.getId();
LOGGER.info("[Ramp Check] (rampId = {}, rampStage = {}, executionId = {}, flowName = {}, RampPercentageId = {})",
rampId,
executableRamp.getState().getRampStage(),
executableFlow.getExecutionId(),
flowName,
executableFlow.getRampPercentageId()
);

// get Base Props
Props baseProps = new Props();
baseProps.putAll(executableRampDependencyMap.getDefaultValues(executableRampItemsMap.getDependencies(rampId)));

ExecutableRampStatus status = executableRampExceptionalFlowItemsMap.check(rampId, flowName);
LOGGER.info("[Ramp Status] (Status = {}, flowName = {})", status.name(), flowName);
switch (status) {
case BLACKLISTED: // blacklist
executableFlowRampMetadata.setRampProps(
Expand All @@ -402,7 +410,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.BLACKLISTED.name()
)
);
LOGGER.info("Ramp Flow As BlackListed Item. [rampid = {}, flowName = {}]", rampId, flowName);
LOGGER.info("[Ramp BlackListed]. [rampId = {}, flowName = {}]", rampId, flowName);
break;

case WHITELISTED: // whitelist
Expand All @@ -414,7 +422,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.WHITELISTED.name()
)
);
LOGGER.info("Ramp Flow As WhiteListed Item. [rampid = {}, flowName = {}]", rampId, flowName);
LOGGER.info("[Ramp WhiteListed]. [rampId = {}, flowName = {}]", rampId, flowName);
break;

case SELECTED: // selected
Expand All @@ -426,7 +434,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.SELECTED.name()
)
);
LOGGER.info("Ramp Flow As Selected Item. [rampid = {}, flowName = {}]", rampId, flowName);
LOGGER.info("[Ramp Selected]. [rampId = {}, flowName = {}]", rampId, flowName);
break;

case UNSELECTED: // selected
Expand All @@ -438,8 +446,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.UNSELECTED.name()
)
);
LOGGER.info("Ramp Flow As Unselected Item. [rampid = {}, flowName = {}]",
rampId, flowName);
LOGGER.info("[Ramp Unselected]. [rampId = {}, flowName = {}]", rampId, flowName);
break;

case EXCLUDED:
Expand All @@ -451,12 +458,18 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.EXCLUDED.name()
)
);
LOGGER.info("Ramp Flow As Excluded Item. [rampid = {}, flowName = {}]",
rampId, flowName);
LOGGER.info("[Ramp Excluded]. [rampId = {}, flowName = {}]", rampId, flowName);
break;

default:
RampPolicy rampPolicy = rampPolicyManager.buildRampPolicyExecutor(executableRamp.getPolicy(), globalProps);
LOGGER.info ("[Ramp Policy Selecting]. [policy = {}, rampId = {}, flowName = {}, executionId = {}, RampPercentageId = {}]",
rampPolicy.getClass().getName(),
rampId,
flowName,
executableFlow.getExecutionId(),
executableFlow.getRampPercentageId()
);
if (rampPolicy.check(executableFlow, executableRamp)) {
// Ramp Enabled
executableFlowRampMetadata.setRampProps(
Expand All @@ -467,8 +480,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.SELECTED.name()
)
);
LOGGER.info("Undetermined Ramp Flow is selected for Ramping. [rampid = {}, flowName = {}]",
rampId, flowName);
LOGGER.info("[Ramp Policy Selected]. [rampId = {}, flowName = {}]", rampId, flowName);
} else {
executableFlowRampMetadata.setRampProps(
rampId,
Expand All @@ -478,8 +490,7 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
ExecutableRampStatus.UNSELECTED.name()
)
);
LOGGER.info("Undetermined Ramp Flow is not selected for Ramping. [rampid = {}, flowName = {}]",
rampId, flowName);
LOGGER.info("[Ramp Policy Unselected]. [rampId = {}, flowName = {}]", rampId, flowName);
}
break;
}
Expand Down Expand Up @@ -522,6 +533,10 @@ synchronized public void configure(ExecutableFlow executableFlow, File flowDirec
private void moveFiles(File sourceDir, File destinationDir, String regExpression) {
try {
FileIOUtils.moveFiles(sourceDir, destinationDir, regExpression);
LOGGER.info("Success to move files from {} to {} with REGEXP {}",
sourceDir.getAbsolutePath(),
destinationDir.getAbsolutePath(),
regExpression);
} catch (IOException e) {
LOGGER.error(
String.format("Fail to move files from %s to %s with REGEXP %s",
Expand Down Expand Up @@ -575,8 +590,12 @@ synchronized public void logFlowAction(FlowRunner flowRunner, Action action) {
if (!Action.SUCCEEDED.equals(action)) {
String rampId = executableRamp.getId();
String flowName = flowRunner.getExecutableFlow().getFlowName();
LOGGER.warn("Flow will be excluded from ramping. [rampId = {}, flow = {}, action = {}]",
rampId, rampId, action.name());
LOGGER.warn("[Ramp Exclude Flow]. [executionId = {}, rampId = {}, flowName = {}, action = {}]",
flowRunner.getExecutableFlow().getExecutionId(),
rampId,
flowName,
action.name()
);
executableRampExceptionalFlowItemsMap.add(rampId, flowName, ExecutableRampStatus.EXCLUDED,
System.currentTimeMillis(), true);
}
Expand Down

0 comments on commit 56d0fd2

Please sign in to comment.