Skip to content

Commit

Permalink
renamed TaskRunner to Operator
Browse files Browse the repository at this point in the history
  • Loading branch information
frsyuki committed Feb 19, 2016
1 parent 495610a commit 9295ba9
Show file tree
Hide file tree
Showing 28 changed files with 203 additions and 204 deletions.
20 changes: 10 additions & 10 deletions digdag-cli/src/main/java/io/digdag/cli/Run.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.digdag.core.session.StoredTask;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.session.TaskStateCode;
import io.digdag.core.agent.TaskRunnerManager;
import io.digdag.core.agent.OperatorManager;
import io.digdag.core.agent.TaskCallbackApi;
import io.digdag.core.agent.SetThreadName;
import io.digdag.core.agent.ConfigEvalEngine;
Expand All @@ -50,7 +50,7 @@
import io.digdag.core.config.ConfigLoaderManager;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.spi.TaskRunnerFactory;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.ScheduleTime;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
Expand Down Expand Up @@ -143,8 +143,8 @@ public SystemExitException usage(String error)
System.err.println(" -s, --status DIR use this directory to read and write session status");
System.err.println(" -p, --param KEY=VALUE overwrite a parameter (use multiple times to set many parameters)");
System.err.println(" -P, --params-file PATH.yml read parameters from a YAML file");
System.err.println(" -d, --dry-run don't run tasks. show tasks only");
System.err.println(" -e, --show-params show task parameters");
System.err.println(" -d, --dry-run dry-run mode doesn't execute tasks");
System.err.println(" -e, --show-params show task parameters before running a task");
System.err.println(" -t, --session-time \"yyyy-MM-dd[ HH:mm:ss]\" set session_time to this time");
//System.err.println(" -g, --graph OUTPUT.png visualize a task and exit");
//System.err.println(" -d, --dry-run dry run mode");
Expand All @@ -161,10 +161,10 @@ public void run(String taskNamePattern) throws Exception
.addModules(binder -> {
binder.bind(ResumeStateManager.class).in(Scopes.SINGLETON);
binder.bind(YamlMapper.class).in(Scopes.SINGLETON); // used by ResumeStateManager
binder.bind(Run.class).toInstance(this); // used by TaskRunnerManagerWithSkip
binder.bind(Run.class).toInstance(this); // used by OperatorManagerWithSkip
})
.overrideModules((list) -> ImmutableList.of(Modules.override(list).with((binder) -> {
binder.bind(TaskRunnerManager.class).to(TaskRunnerManagerWithSkip.class).in(Scopes.SINGLETON);
binder.bind(OperatorManager.class).to(OperatorManagerWithSkip.class).in(Scopes.SINGLETON);
})))
.initialize()
.getInjector();
Expand Down Expand Up @@ -307,19 +307,19 @@ else if (sessionStatusPath == null) {

private Function<String, TaskResult> skipTaskReports = (fullName) -> null;

public static class TaskRunnerManagerWithSkip
extends TaskRunnerManager
public static class OperatorManagerWithSkip
extends OperatorManager
{
private final ConfigFactory cf;
private final Run cmd;
private final YamlMapper yamlMapper;

@Inject
public TaskRunnerManagerWithSkip(
public OperatorManagerWithSkip(
AgentConfig config, AgentId agentId,
TaskCallbackApi callback, ArchiveManager archiveManager,
ConfigLoaderManager configLoader, WorkflowCompiler compiler, ConfigFactory cf,
ConfigEvalEngine evalEngine, Set<TaskRunnerFactory> factories,
ConfigEvalEngine evalEngine, Set<OperatorFactory> factories,
Run cmd, YamlMapper yamlMapper)
{
super(config, agentId, callback, archiveManager, configLoader, compiler, cf, evalEngine, factories);
Expand Down
12 changes: 6 additions & 6 deletions digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.digdag.spi.TaskRunnerFactory;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.TemplateEngine;
import io.digdag.spi.TaskQueueFactory;
import io.digdag.core.agent.AgentId;
import io.digdag.core.agent.AgentIdProvider;
import io.digdag.core.agent.AgentConfig;
import io.digdag.core.agent.AgentConfigProvider;
import io.digdag.core.agent.LocalAgentManager;
import io.digdag.core.agent.TaskRunnerManager;
import io.digdag.core.agent.OperatorManager;
import io.digdag.core.agent.ConfigEvalEngine;
import io.digdag.core.agent.TaskCallbackApi;
import io.digdag.core.agent.InProcessTaskCallbackApi;
import io.digdag.core.agent.RequireTaskRunnerFactory;
import io.digdag.core.agent.RequireOperatorFactory;
import io.digdag.core.agent.ArchiveManager;
import io.digdag.core.agent.CurrentDirectoryArchiveManager;
import io.digdag.core.queue.TaskQueueManager;
Expand Down Expand Up @@ -133,7 +133,7 @@ private static List<Module> standardModules(ConfigElement systemConfig)
binder.bind(LocalSite.class).in(Scopes.SINGLETON);
binder.bind(ArchiveManager.class).to(CurrentDirectoryArchiveManager.class).in(Scopes.SINGLETON);
binder.bind(TaskCallbackApi.class).to(InProcessTaskCallbackApi.class).in(Scopes.SINGLETON);
binder.bind(TaskRunnerManager.class).in(Scopes.SINGLETON);
binder.bind(OperatorManager.class).in(Scopes.SINGLETON);
binder.bind(ConfigEvalEngine.class).in(Scopes.SINGLETON);
binder.bind(TaskQueueManager.class).in(Scopes.SINGLETON);
binder.bind(ScheduleExecutor.class).in(Scopes.SINGLETON);
Expand All @@ -146,8 +146,8 @@ private static List<Module> standardModules(ConfigElement systemConfig)
Multibinder<TaskQueueFactory> taskQueueBinder = Multibinder.newSetBinder(binder, TaskQueueFactory.class);
taskQueueBinder.addBinding().to(DatabaseTaskQueueFactory.class).in(Scopes.SINGLETON);

Multibinder<TaskRunnerFactory> taskExecutorBinder = Multibinder.newSetBinder(binder, TaskRunnerFactory.class);
taskExecutorBinder.addBinding().to(RequireTaskRunnerFactory.class).in(Scopes.SINGLETON);
Multibinder<OperatorFactory> taskExecutorBinder = Multibinder.newSetBinder(binder, OperatorFactory.class);
taskExecutorBinder.addBinding().to(RequireOperatorFactory.class).in(Scopes.SINGLETON);
},
new ExtensionServiceLoaderModule()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ public class LocalAgent
private final AgentConfig config;
private final AgentId agentId;
private final TaskQueueClient queue;
private final TaskRunnerManager runner;
private final OperatorManager runner;
private final ExecutorService executor;
private volatile boolean stop = false;

public LocalAgent(AgentConfig config, AgentId agentId,
TaskQueueClient queue, TaskRunnerManager runner)
TaskQueueClient queue, OperatorManager runner)
{
this.agentId = agentId;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ public class LocalAgentManager
private final AgentConfig config;
private final AgentId agentId;
private final TaskQueueManager queueManager;
private final TaskRunnerManager taskRunnerManager;
private final OperatorManager operatorManager;
private final ExecutorService executor;

@Inject
public LocalAgentManager(
AgentConfig config,
AgentId agentId,
TaskQueueManager queueManager,
TaskRunnerManager taskRunnerManager)
OperatorManager operatorManager)
{
this.config = config;
this.agentId = agentId;
this.queueManager = queueManager;
this.taskRunnerManager = taskRunnerManager;
this.operatorManager = operatorManager;
this.executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
Expand All @@ -43,7 +43,7 @@ public void startLocalAgent(int siteId, String queueName)
config,
agentId,
queueManager.getInProcessTaskQueueClient(siteId),
taskRunnerManager
operatorManager
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import io.digdag.core.repository.WorkflowDefinition;
import io.digdag.spi.*;

public class TaskRunnerManager
public class OperatorManager
{
private static Logger logger = LoggerFactory.getLogger(TaskRunnerManager.class);
private static Logger logger = LoggerFactory.getLogger(OperatorManager.class);

protected final AgentConfig config;
protected final AgentId agentId;
Expand All @@ -45,16 +45,16 @@ public class TaskRunnerManager
private final WorkflowCompiler compiler;
private final ConfigFactory cf;
private final ConfigEvalEngine evalEngine;
private final Map<String, TaskRunnerFactory> executorTypes;
private final Map<String, OperatorFactory> executorTypes;

private final ScheduledExecutorService heartbeatScheduler;
private final ConcurrentHashMap<Long, String> lockIdMap = new ConcurrentHashMap<>();

@Inject
public TaskRunnerManager(AgentConfig config, AgentId agentId,
public OperatorManager(AgentConfig config, AgentId agentId,
TaskCallbackApi callback, ArchiveManager archiveManager,
ConfigLoaderManager configLoader, WorkflowCompiler compiler, ConfigFactory cf,
ConfigEvalEngine evalEngine, Set<TaskRunnerFactory> factories)
ConfigEvalEngine evalEngine, Set<OperatorFactory> factories)
{
this.config = config;
this.agentId = agentId;
Expand All @@ -65,8 +65,8 @@ public TaskRunnerManager(AgentConfig config, AgentId agentId,
this.cf = cf;
this.evalEngine = evalEngine;

ImmutableMap.Builder<String, TaskRunnerFactory> builder = ImmutableMap.builder();
for (TaskRunnerFactory factory : factories) {
ImmutableMap.Builder<String, OperatorFactory> builder = ImmutableMap.builder();
for (OperatorFactory factory : factories) {
builder.put(factory.getType(), factory);
}
this.executorTypes = builder.build();
Expand Down Expand Up @@ -205,11 +205,11 @@ private void runWithArchive(Path archivePath, TaskRequest request, Config nextSt

protected TaskResult callExecutor(Path archivePath, String type, TaskRequest mergedRequest)
{
TaskRunnerFactory factory = executorTypes.get(type);
OperatorFactory factory = executorTypes.get(type);
if (factory == null) {
throw new ConfigException("Unknown task type: " + type);
}
TaskRunner executor = factory.newTaskExecutor(archivePath, mergedRequest);
Operator executor = factory.newTaskExecutor(archivePath, mergedRequest);

return executor.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.spi.TaskReport;
import io.digdag.spi.TaskRunner;
import io.digdag.spi.TaskRunnerFactory;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.TaskExecutionException;
import io.digdag.core.agent.RetryControl;
import io.digdag.core.session.SessionStateFlags;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigFactory;

public class RequireTaskRunnerFactory
implements TaskRunnerFactory
public class RequireOperatorFactory
implements OperatorFactory
{
private static Logger logger = LoggerFactory.getLogger(RequireTaskRunnerFactory.class);
private static Logger logger = LoggerFactory.getLogger(RequireOperatorFactory.class);

private final TaskCallbackApi callback;

@Inject
public RequireTaskRunnerFactory(TaskCallbackApi callback)
public RequireOperatorFactory(TaskCallbackApi callback)
{
this.callback = callback;
}
Expand All @@ -41,19 +41,19 @@ public String getType()
}

@Override
public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request)
public Operator newTaskExecutor(Path archivePath, TaskRequest request)
{
return new RequireTaskRunner(callback, request);
return new RequireOperator(callback, request);
}

private class RequireTaskRunner
implements TaskRunner
private class RequireOperator
implements Operator
{
private final TaskCallbackApi callback;
private final TaskRequest request;
private ConfigFactory cf;

public RequireTaskRunner(TaskCallbackApi callback, TaskRequest request)
public RequireOperator(TaskCallbackApi callback, TaskRequest request)
{
this.callback = callback;
this.request = request;
Expand All @@ -69,7 +69,7 @@ public TaskResult run()
isDone = runTask();
}
catch (RuntimeException ex) {
Config error = TaskRunnerManager.makeExceptionError(request.getConfig().getFactory(), ex);
Config error = OperatorManager.makeExceptionError(request.getConfig().getFactory(), ex);
boolean doRetry = retry.evaluate();
if (doRetry) {
throw new TaskExecutionException(ex, error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.inject.Inject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.digdag.core.agent.RetryControl;
import io.digdag.core.agent.TaskRunnerManager;
import io.digdag.core.agent.OperatorManager;
import io.digdag.core.agent.AgentId;
import io.digdag.core.session.*;
import io.digdag.spi.TaskRequest;
Expand Down Expand Up @@ -745,10 +745,10 @@ private void enqueueTask(final TaskQueueDispatcher dispatcher, final long taskId
params.setAll(attempt.getParams());
collectParams(params, task, attempt);

// create TaskRequest for TaskRunnerManager.
// TaskRunnerManager will ignore localConfig because it reloads config from dagfile_path with using the lates params.
// create TaskRequest for OperatorManager.
// OperatorManager will ignore localConfig because it reloads config from dagfile_path with using the lates params.
// TaskRequest.config usually stores params merged with local config. but here passes only params (local config is not merged)
// so that TaskRunnerManager can build it using the reloaded local config.
// so that OperatorManager can build it using the reloaded local config.
TaskRequest request = TaskRequest.builder()
.siteId(attempt.getSiteId())
.repositoryId(attempt.getSession().getRepositoryId())
Expand Down Expand Up @@ -796,7 +796,7 @@ private void enqueueTask(final TaskQueueDispatcher dispatcher, final long taskId
logger.error("Enqueue error, making this task failed: {}", task, ex);
// TODO retry here?
return taskFailed(lockedTask,
TaskRunnerManager.makeExceptionError(cf, ex));
OperatorManager.makeExceptionError(cf, ex));
}
}).or(false);
}
Expand Down
2 changes: 1 addition & 1 deletion digdag-docs/src/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ Next steps

* `Scheduling workflow <scheduling_workflow.html>`_
* `Detailed documents about workflow definition <workflow_definition.html>`_
* `More choices of task types <task_types.html>`_
* `More choices of operators <operators.html>`_

2 changes: 1 addition & 1 deletion digdag-docs/src/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Table of Contents
workflow_definition.rst
scheduling_workflow.rst
running_tasks_on_docker.rst
task_types.rst
operators.rst
mastering_sessions.rst
reciipes.rst
command_reference.rst
Expand Down
Loading

0 comments on commit 9295ba9

Please sign in to comment.