Skip to content

Commit

Permalink
feat: Added function and property canHandleNegativeCoresRequest
Browse files Browse the repository at this point in the history
doc: added some comments
doc: Added some documentation
doc: fix docstrings and parameters
doc: added debug message
doc: explain why we allow negative value
doc: update debug message for rqd
  • Loading branch information
KernAttila committed Aug 26, 2023
1 parent 0eea95f commit 264a756
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 90 deletions.
41 changes: 25 additions & 16 deletions cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,42 @@ public String getAllocationId() {
public String getFacilityId() {
return facilityId;
}
public boolean canHandleNegativeCoresRequirement(int minCores) {
if (minCores > 0) {
logger.debug(getName() + " can handle the job with " + minCores + " cores.");

public boolean canHandleNegativeCoresRequest(int requestedCores) {
// Request is positive, no need to test further.
if (requestedCores > 0) {
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
return true;
}
// All cores are available, validate the request.
if (cores == idleCores) {
logger.debug(getName() + " can handle the job with " + minCores + " cores.");
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
return true;
}
logger.debug(getName() + " cannot handle the job with " + minCores + " cores.");
// Some or all cores are busy, avoid booking again.
logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores.");
return false;
}

public int handleNegativeCoresRequirement(int minCores) {
// Do not process positive requests
logger.debug("requested minCores:" + minCores);
if (minCores > 0) {
return minCores;
public int handleNegativeCoresRequirement(int requestedCores) {
// If we request a <=0 amount of cores, return positive core count.

if (requestedCores > 0) {
// Do not process positive core requests.
logger.debug("Requested " + requestedCores + " cores.");
return requestedCores;
}
// If request is negative but cores are already used, return 0
if (minCores <=0 && idleCores < cores) {
if (requestedCores <=0 && idleCores < cores) {
// If request is negative but cores are already used, return 0.
// We don't want to overbook the host.
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
return 0;
}
int requestedCores = idleCores + minCores;
logger.debug("Requested core number is " + minCores + " <= 0, " +
"matching up to max number with difference " + idleCores + " > " + requestedCores);
return requestedCores;
// Book all cores minus the request
int totalCores = idleCores + requestedCores;
logger.debug("Requested " + requestedCores + " cores <= 0, " +
idleCores + " cores are free, booking " + totalCores + " cores");
return totalCores;
}

@Override
Expand Down
26 changes: 16 additions & 10 deletions cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,25 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu
this.maxGpuMemory = maxGpuMemory;
}

public int handleNegativeCoresRequirement(int minCores) {
// Do not process positive requests
if (minCores > 0) {
return minCores;
public int handleNegativeCoresRequirement(int requestedCores) {
// If we request a <=0 amount of cores, return positive core count.

if (requestedCores > 0) {
// Do not process positive core requests.
logger.debug("Requested " + requestedCores + " cores.");
return requestedCores;
}
// If request is negative but cores are already used, return 0
if (minCores <=0 && idleCoreUnits < threads) {
if (requestedCores <=0 && idleCoreUnits < threads) {
// If request is negative but cores are already used, return 0.
// We don't want to overbook the host.
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
return 0;
}
int requestedCores = idleCoreUnits + minCores;
logger.debug("Requested core number is " + minCores + " <= 0, " +
"matching up to max number with difference " + idleCoreUnits + " > " + requestedCores);
return requestedCores;
// Book all cores minus the request
int totalCores = idleCoreUnits + requestedCores;
logger.debug("Requested " + requestedCores + " cores <= 0, " +
idleCoreUnits + " cores are free, booking " + totalCores + " cores");
return totalCores;
}

@Override
Expand Down
19 changes: 3 additions & 16 deletions cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class VirtualProc extends FrameEntity implements ProcInterface {
public String os;
public byte[] childProcesses;

public boolean canLaunch;
public boolean canHandleNegativeCoresRequest;
public int coresReserved;
public long memoryReserved;
public long memoryUsed;
Expand Down Expand Up @@ -100,7 +100,6 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame) {
proc.unbooked = false;
proc.isLocalDispatch = host.isLocalDispatch;

// proc.canLaunch = host.canHandleNegativeCoresRequirement(frame.minCores);
proc.coresReserved = frame.minCores;
proc.memoryReserved = frame.minMemory;
proc.gpusReserved = frame.minGpus;
Expand All @@ -115,18 +114,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame) {
*/

if (host.strandedCores > 0) {
logger.debug("host.strandedCores > 0 : " + host.strandedCores);
proc.coresReserved = proc.coresReserved + host.strandedCores;
}

proc.canLaunch = host.canHandleNegativeCoresRequirement(proc.coresReserved);
proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved);

if (proc.coresReserved == 0) {
logger.debug("Reserving all cores");
proc.coresReserved = host.cores;
}
else if (proc.coresReserved < 0) {
logger.debug("Reserving all cores " + proc.coresReserved);
logger.debug("Reserving all cores minus " + proc.coresReserved);
proc.coresReserved = host.cores + proc.coresReserved;
}
else if (proc.coresReserved >= 100) {
Expand All @@ -148,15 +146,13 @@ else if (proc.coresReserved >= 100) {
// CueUtil.isDayTime()) {
if (host.threadMode == ThreadMode.ALL_VALUE) {
proc.coresReserved = wholeCores * 100;
logger.debug("host.threadMode == ThreadMode.ALL_VALUE : proc.coresReserved=" + proc.coresReserved);
} else {
if (frame.threadable) {
if (host.idleMemory - frame.minMemory
<= Dispatcher.MEM_STRANDED_THRESHHOLD) {
proc.coresReserved = wholeCores * 100;
} else {
proc.coresReserved = getCoreSpan(host, frame.minMemory);
logger.debug("proc.coresReserved = getCoreSpan(host, frame.minMemory):" + proc.coresReserved);
}

if (host.threadMode == ThreadMode.VARIABLE_VALUE
Expand All @@ -183,28 +179,24 @@ else if (proc.coresReserved >= 100) {
* original.
*/
if (proc.coresReserved < originalCores) {
logger.debug("proc.coresReserved < originalCores: " + proc.coresReserved + " < " + originalCores);
proc.coresReserved = originalCores;
}

/*
* Check to ensure we haven't exceeded max cores.
*/
if (frame.maxCores > 0 && proc.coresReserved >= frame.maxCores) {
logger.debug("frame.maxCores > 0 && proc.coresReserved >= frame.maxCores");
proc.coresReserved = frame.maxCores;
}

if (proc.coresReserved > host.idleCores) {
logger.debug("proc.coresReserved > host.idleCores");
if (host.threadMode == ThreadMode.VARIABLE_VALUE
&& frame.threadable && wholeCores == 1) {
throw new JobDispatchException(
"Do not allow threadable frame running one core on a ThreadMode.Variable host.");
}
proc.coresReserved = wholeCores * 100;
}
logger.debug("finally, proc.coresReserved = " + proc.coresReserved);
}

/*
Expand Down Expand Up @@ -265,19 +257,14 @@ public static final VirtualProc build(DispatchHost host,
*/
public static int getCoreSpan(DispatchHost host, long minMemory) {
int totalCores = (int) (Math.floor(host.cores / 100.0));
logger.debug("getCoreSpan() -> totalCores = " + totalCores);
int idleCores = (int) (Math.floor(host.idleCores / 100.0));
logger.debug("getCoreSpan() -> idleCores = " + idleCores);
if (idleCores < 1) {
return 100;
}

long memPerCore = host.idleMemory / totalCores;
logger.debug("getCoreSpan() -> memPerCore = " + memPerCore);
double procs = minMemory / (double) memPerCore;
logger.debug("getCoreSpan() -> procs = " + procs);
int reserveCores = (int) (Math.round(procs)) * 100;
logger.debug("getCoreSpan() -> reserveCores = " + reserveCores);

return reserveCores;
}
Expand Down
29 changes: 16 additions & 13 deletions cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public interface LayerDao {
public List<LayerDetail> getLayerDetails(JobInterface job);

/**
* Returns true if supplied layer is compelte.
* Returns true if supplied layer is complete.
*
* @param layer
* @return boolean
Expand All @@ -82,7 +82,7 @@ public interface LayerDao {
void insertLayerDetail(LayerDetail l);

/**
* gets a layer detail from an object that implments layer
* gets a layer detail from an object that implements layer
*
* @param layer
* @return LayerDetail
Expand Down Expand Up @@ -167,7 +167,7 @@ public interface LayerDao {
void updateLayerTags(LayerInterface layer, Set<String> tags);

/**
* Insert a key/valye pair into the layer environment
* Insert a key/value pair into the layer environment
*
* @param layer
* @param key
Expand Down Expand Up @@ -282,7 +282,7 @@ public interface LayerDao {

/**
* Update all layers of the set type in the specified job
* with the new max cores requirement.
* with the new min cores requirement.
*
* @param job
* @param cores
Expand All @@ -292,7 +292,7 @@ public interface LayerDao {

/**
* Update all layers of the set type in the specified job
* with the new min cores requirement.
* with the new min gpu requirement.
*
* @param job
* @param gpus
Expand All @@ -304,17 +304,16 @@ public interface LayerDao {
* Update a layer's max cores value, which limits how
* much threading can go on.
*
* @param job
* @param cores
* @param type
* @param layer
* @param threadable
*/
void updateThreadable(LayerInterface layer, boolean threadable);

/**
* Update a layer's timeout value, which limits how
* much the frame can run on a host.
*
* @param job
* @param layer
* @param timeout
*/
void updateTimeout(LayerInterface layer, int timeout);
Expand All @@ -323,8 +322,8 @@ public interface LayerDao {
* Update a layer's LLU timeout value, which limits how
* much the frame can run on a host without updates in the log file.
*
* @param job
* @param timeout
* @param layer
* @param timeout_llu
*/
void updateTimeoutLLU(LayerInterface layer, int timeout_llu);

Expand All @@ -341,7 +340,7 @@ public interface LayerDao {

/**
* Appends a tag to the current set of tags. If the tag
* already exists than nothing happens.
* already exists then nothing happens.
*
* @param layer
* @param val
Expand All @@ -363,8 +362,9 @@ public interface LayerDao {
* Update layer usage with processor time usage.
* This happens when the proc has completed or failed some work.
*
* @param proc
* @param layer
* @param newState
* @param exitStatus
*/
void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus);

Expand All @@ -387,6 +387,9 @@ public interface LayerDao {

/**
* Enable/disable memory optimizer.
*
* @param layer
* @param state
*/
void enableMemoryOptimizer(LayerInterface layer, boolean state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,23 +258,17 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {
host.getName() + " " + host.idleCores + "/" + host.idleMemory +
" on job " + job.getName());

logger.debug("Frames summary before dispatch:");
for (DispatchFrame frame: frames) {
logger.debug("frame.minCores: " + frame.minCores + ", frame.command: " + frame.command);
}
for (DispatchFrame frame: frames) {

VirtualProc proc = VirtualProc.build(host, frame);
if (frame.minCores <= 0 && !proc.canLaunch) {
if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) {
logger.debug("Cannot dispatch job, host is busy.");
break;
}
if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) ||
host.idleMemory < frame.minMemory ||
host.idleGpus < frame.minGpus ||
host.idleGpuMemory < frame.minGpuMemory) {
logger.debug("Cannot dispatch, host.idleCores < host.handleNegativeCoresRequirement(frame.minCores)");
logger.debug(host.idleCores + " < " + host.handleNegativeCoresRequirement(frame.minCores) + " : frame.minCores");
logger.debug("Cannot dispatch, insufficient resources.");
break;
}

Expand All @@ -290,7 +284,8 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {

boolean success = new DispatchFrameTemplate(proc, job, frame, false) {
public void wrapDispatchFrame() {
logger.debug("Dispatching frame with minCores: " + frame.minCores + " on proc with coresReserved= " + proc.coresReserved);
logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " +
proc.coresReserved + " coresReserved");
dispatch(frame, proc);
dispatchSummary(proc, frame, "Booking");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ public JobDetail createJob(BuildableJob buildableJob) {
}

if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
logger.debug("layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN");
logger.debug(layer.minimumCores + " < " +Dispatcher.CORE_POINTS_RESERVED_MIN);
// layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN;
layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN;
}

logger.info("creating layer " + layer.name + " range: " + layer.range);
Expand Down
Loading

0 comments on commit 264a756

Please sign in to comment.