From 9295ba906005c43955e0f4b3ca72cbc0d6ad4c96 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Fri, 19 Feb 2016 13:49:30 -0800 Subject: [PATCH] renamed TaskRunner to Operator --- .../src/main/java/io/digdag/cli/Run.java | 20 +++++------ .../main/java/io/digdag/core/DigdagEmbed.java | 12 +++---- .../java/io/digdag/core/agent/LocalAgent.java | 4 +-- .../digdag/core/agent/LocalAgentManager.java | 8 ++--- ...unnerManager.java => OperatorManager.java} | 18 +++++----- ...ctory.java => RequireOperatorFactory.java} | 24 ++++++------- .../core/workflow/WorkflowExecutor.java | 10 +++--- digdag-docs/src/getting_started.rst | 2 +- digdag-docs/src/index.rst | 2 +- .../src/{task_types.rst => operators.rst} | 20 +++++------ digdag-docs/src/workflow_definition.rst | 4 +-- .../spi/{TaskRunner.java => Operator.java} | 2 +- .../java/io/digdag/spi/OperatorFactory.java | 10 ++++++ .../java/io/digdag/spi/TaskRunnerFactory.java | 10 ------ .../digdag/standards/StandardsExtension.java | 36 +++++++++---------- .../command/DockerCommandExecutor.java | 1 - .../{task => operator}/ArchiveFiles.java | 2 +- .../BaseOperator.java} | 14 ++++---- .../EmbulkOperatorFactory.java} | 24 ++++++------- .../MailOperatorFactory.java} | 24 ++++++------- .../PyOperatorFactory.java} | 26 +++++++------- .../RbOperatorFactory.java} | 26 +++++++------- .../ShOperatorFactory.java} | 24 ++++++------- .../{task => operator}/td/TDOperation.java | 4 +-- .../{task => operator}/td/TDQuery.java | 2 +- .../td/TdDdlOperatorFactory.java} | 26 +++++++------- .../td/TdLoadOperatorFactory.java} | 26 +++++++------- .../td/TdOperatorFactory.java} | 26 +++++++------- 28 files changed, 203 insertions(+), 204 deletions(-) rename digdag-core/src/main/java/io/digdag/core/agent/{TaskRunnerManager.java => OperatorManager.java} (93%) rename digdag-core/src/main/java/io/digdag/core/agent/{RequireTaskRunnerFactory.java => RequireOperatorFactory.java} (82%) rename digdag-docs/src/{task_types.rst => operators.rst} (91%) rename digdag-spi/src/main/java/io/digdag/spi/{TaskRunner.java => Operator.java} (76%) create mode 100644 digdag-spi/src/main/java/io/digdag/spi/OperatorFactory.java delete mode 100644 digdag-spi/src/main/java/io/digdag/spi/TaskRunnerFactory.java rename digdag-standards/src/main/java/io/digdag/standards/{task => operator}/ArchiveFiles.java (98%) rename digdag-standards/src/main/java/io/digdag/standards/{task/BaseTaskRunner.java => operator/BaseOperator.java} (81%) rename digdag-standards/src/main/java/io/digdag/standards/{task/EmbulkTaskRunnerFactory.java => operator/EmbulkOperatorFactory.java} (86%) rename digdag-standards/src/main/java/io/digdag/standards/{task/MailTaskRunnerFactory.java => operator/MailOperatorFactory.java} (89%) rename digdag-standards/src/main/java/io/digdag/standards/{task/PyTaskRunnerFactory.java => operator/PyOperatorFactory.java} (86%) rename digdag-standards/src/main/java/io/digdag/standards/{task/RbTaskRunnerFactory.java => operator/RbOperatorFactory.java} (87%) rename digdag-standards/src/main/java/io/digdag/standards/{task/ShTaskRunnerFactory.java => operator/ShOperatorFactory.java} (85%) rename digdag-standards/src/main/java/io/digdag/standards/{task => operator}/td/TDOperation.java (95%) rename digdag-standards/src/main/java/io/digdag/standards/{task => operator}/td/TDQuery.java (98%) rename digdag-standards/src/main/java/io/digdag/standards/{task/td/TdDdlTaskRunnerFactory.java => operator/td/TdDdlOperatorFactory.java} (76%) rename digdag-standards/src/main/java/io/digdag/standards/{task/td/TdLoadTaskRunnerFactory.java => operator/td/TdLoadOperatorFactory.java} (84%) rename digdag-standards/src/main/java/io/digdag/standards/{task/td/TdTaskRunnerFactory.java => operator/td/TdOperatorFactory.java} (91%) diff --git a/digdag-cli/src/main/java/io/digdag/cli/Run.java b/digdag-cli/src/main/java/io/digdag/cli/Run.java index 7fd975d2f3..31ab978e3f 100644 --- a/digdag-cli/src/main/java/io/digdag/cli/Run.java +++ b/digdag-cli/src/main/java/io/digdag/cli/Run.java @@ -38,7 +38,7 @@ import io.digdag.core.session.StoredTask; import io.digdag.core.session.StoredSessionAttemptWithSession; import io.digdag.core.session.TaskStateCode; -import io.digdag.core.agent.TaskRunnerManager; +import io.digdag.core.agent.OperatorManager; import io.digdag.core.agent.TaskCallbackApi; import io.digdag.core.agent.SetThreadName; import io.digdag.core.agent.ConfigEvalEngine; @@ -50,7 +50,7 @@ import io.digdag.core.config.ConfigLoaderManager; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.OperatorFactory; import io.digdag.spi.ScheduleTime; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigException; @@ -143,8 +143,8 @@ public SystemExitException usage(String error) System.err.println(" -s, --status DIR use this directory to read and write session status"); System.err.println(" -p, --param KEY=VALUE overwrite a parameter (use multiple times to set many parameters)"); System.err.println(" -P, --params-file PATH.yml read parameters from a YAML file"); - System.err.println(" -d, --dry-run don't run tasks. show tasks only"); - System.err.println(" -e, --show-params show task parameters"); + System.err.println(" -d, --dry-run dry-run mode doesn't execute tasks"); + System.err.println(" -e, --show-params show task parameters before running a task"); System.err.println(" -t, --session-time \"yyyy-MM-dd[ HH:mm:ss]\" set session_time to this time"); //System.err.println(" -g, --graph OUTPUT.png visualize a task and exit"); //System.err.println(" -d, --dry-run dry run mode"); @@ -161,10 +161,10 @@ public void run(String taskNamePattern) throws Exception .addModules(binder -> { binder.bind(ResumeStateManager.class).in(Scopes.SINGLETON); binder.bind(YamlMapper.class).in(Scopes.SINGLETON); // used by ResumeStateManager - binder.bind(Run.class).toInstance(this); // used by TaskRunnerManagerWithSkip + binder.bind(Run.class).toInstance(this); // used by OperatorManagerWithSkip }) .overrideModules((list) -> ImmutableList.of(Modules.override(list).with((binder) -> { - binder.bind(TaskRunnerManager.class).to(TaskRunnerManagerWithSkip.class).in(Scopes.SINGLETON); + binder.bind(OperatorManager.class).to(OperatorManagerWithSkip.class).in(Scopes.SINGLETON); }))) .initialize() .getInjector(); @@ -307,19 +307,19 @@ else if (sessionStatusPath == null) { private Function skipTaskReports = (fullName) -> null; - public static class TaskRunnerManagerWithSkip - extends TaskRunnerManager + public static class OperatorManagerWithSkip + extends OperatorManager { private final ConfigFactory cf; private final Run cmd; private final YamlMapper yamlMapper; @Inject - public TaskRunnerManagerWithSkip( + public OperatorManagerWithSkip( AgentConfig config, AgentId agentId, TaskCallbackApi callback, ArchiveManager archiveManager, ConfigLoaderManager configLoader, WorkflowCompiler compiler, ConfigFactory cf, - ConfigEvalEngine evalEngine, Set factories, + ConfigEvalEngine evalEngine, Set factories, Run cmd, YamlMapper yamlMapper) { super(config, agentId, callback, archiveManager, configLoader, compiler, cf, evalEngine, factories); diff --git a/digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java b/digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java index d4cc5e9925..42b2d96be2 100644 --- a/digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java +++ b/digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java @@ -10,7 +10,7 @@ import com.google.inject.Module; import com.google.inject.Scopes; import com.google.inject.multibindings.Multibinder; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.OperatorFactory; import io.digdag.spi.TemplateEngine; import io.digdag.spi.TaskQueueFactory; import io.digdag.core.agent.AgentId; @@ -18,11 +18,11 @@ import io.digdag.core.agent.AgentConfig; import io.digdag.core.agent.AgentConfigProvider; import io.digdag.core.agent.LocalAgentManager; -import io.digdag.core.agent.TaskRunnerManager; +import io.digdag.core.agent.OperatorManager; import io.digdag.core.agent.ConfigEvalEngine; import io.digdag.core.agent.TaskCallbackApi; import io.digdag.core.agent.InProcessTaskCallbackApi; -import io.digdag.core.agent.RequireTaskRunnerFactory; +import io.digdag.core.agent.RequireOperatorFactory; import io.digdag.core.agent.ArchiveManager; import io.digdag.core.agent.CurrentDirectoryArchiveManager; import io.digdag.core.queue.TaskQueueManager; @@ -133,7 +133,7 @@ private static List standardModules(ConfigElement systemConfig) binder.bind(LocalSite.class).in(Scopes.SINGLETON); binder.bind(ArchiveManager.class).to(CurrentDirectoryArchiveManager.class).in(Scopes.SINGLETON); binder.bind(TaskCallbackApi.class).to(InProcessTaskCallbackApi.class).in(Scopes.SINGLETON); - binder.bind(TaskRunnerManager.class).in(Scopes.SINGLETON); + binder.bind(OperatorManager.class).in(Scopes.SINGLETON); binder.bind(ConfigEvalEngine.class).in(Scopes.SINGLETON); binder.bind(TaskQueueManager.class).in(Scopes.SINGLETON); binder.bind(ScheduleExecutor.class).in(Scopes.SINGLETON); @@ -146,8 +146,8 @@ private static List standardModules(ConfigElement systemConfig) Multibinder taskQueueBinder = Multibinder.newSetBinder(binder, TaskQueueFactory.class); taskQueueBinder.addBinding().to(DatabaseTaskQueueFactory.class).in(Scopes.SINGLETON); - Multibinder taskExecutorBinder = Multibinder.newSetBinder(binder, TaskRunnerFactory.class); - taskExecutorBinder.addBinding().to(RequireTaskRunnerFactory.class).in(Scopes.SINGLETON); + Multibinder taskExecutorBinder = Multibinder.newSetBinder(binder, OperatorFactory.class); + taskExecutorBinder.addBinding().to(RequireOperatorFactory.class).in(Scopes.SINGLETON); }, new ExtensionServiceLoaderModule() ); diff --git a/digdag-core/src/main/java/io/digdag/core/agent/LocalAgent.java b/digdag-core/src/main/java/io/digdag/core/agent/LocalAgent.java index 08e43bc1c6..6adbc3deee 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/LocalAgent.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/LocalAgent.java @@ -18,12 +18,12 @@ public class LocalAgent private final AgentConfig config; private final AgentId agentId; private final TaskQueueClient queue; - private final TaskRunnerManager runner; + private final OperatorManager runner; private final ExecutorService executor; private volatile boolean stop = false; public LocalAgent(AgentConfig config, AgentId agentId, - TaskQueueClient queue, TaskRunnerManager runner) + TaskQueueClient queue, OperatorManager runner) { this.agentId = agentId; this.config = config; diff --git a/digdag-core/src/main/java/io/digdag/core/agent/LocalAgentManager.java b/digdag-core/src/main/java/io/digdag/core/agent/LocalAgentManager.java index fd25a2c8aa..77224efb0c 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/LocalAgentManager.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/LocalAgentManager.java @@ -12,7 +12,7 @@ public class LocalAgentManager private final AgentConfig config; private final AgentId agentId; private final TaskQueueManager queueManager; - private final TaskRunnerManager taskRunnerManager; + private final OperatorManager operatorManager; private final ExecutorService executor; @Inject @@ -20,12 +20,12 @@ public LocalAgentManager( AgentConfig config, AgentId agentId, TaskQueueManager queueManager, - TaskRunnerManager taskRunnerManager) + OperatorManager operatorManager) { this.config = config; this.agentId = agentId; this.queueManager = queueManager; - this.taskRunnerManager = taskRunnerManager; + this.operatorManager = operatorManager; this.executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder() .setDaemon(true) @@ -43,7 +43,7 @@ public void startLocalAgent(int siteId, String queueName) config, agentId, queueManager.getInProcessTaskQueueClient(siteId), - taskRunnerManager + operatorManager ) ); } diff --git a/digdag-core/src/main/java/io/digdag/core/agent/TaskRunnerManager.java b/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java similarity index 93% rename from digdag-core/src/main/java/io/digdag/core/agent/TaskRunnerManager.java rename to digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java index 6eac1ef4c2..de671998eb 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/TaskRunnerManager.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java @@ -33,9 +33,9 @@ import io.digdag.core.repository.WorkflowDefinition; import io.digdag.spi.*; -public class TaskRunnerManager +public class OperatorManager { - private static Logger logger = LoggerFactory.getLogger(TaskRunnerManager.class); + private static Logger logger = LoggerFactory.getLogger(OperatorManager.class); protected final AgentConfig config; protected final AgentId agentId; @@ -45,16 +45,16 @@ public class TaskRunnerManager private final WorkflowCompiler compiler; private final ConfigFactory cf; private final ConfigEvalEngine evalEngine; - private final Map executorTypes; + private final Map executorTypes; private final ScheduledExecutorService heartbeatScheduler; private final ConcurrentHashMap lockIdMap = new ConcurrentHashMap<>(); @Inject - public TaskRunnerManager(AgentConfig config, AgentId agentId, + public OperatorManager(AgentConfig config, AgentId agentId, TaskCallbackApi callback, ArchiveManager archiveManager, ConfigLoaderManager configLoader, WorkflowCompiler compiler, ConfigFactory cf, - ConfigEvalEngine evalEngine, Set factories) + ConfigEvalEngine evalEngine, Set factories) { this.config = config; this.agentId = agentId; @@ -65,8 +65,8 @@ public TaskRunnerManager(AgentConfig config, AgentId agentId, this.cf = cf; this.evalEngine = evalEngine; - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (TaskRunnerFactory factory : factories) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (OperatorFactory factory : factories) { builder.put(factory.getType(), factory); } this.executorTypes = builder.build(); @@ -205,11 +205,11 @@ private void runWithArchive(Path archivePath, TaskRequest request, Config nextSt protected TaskResult callExecutor(Path archivePath, String type, TaskRequest mergedRequest) { - TaskRunnerFactory factory = executorTypes.get(type); + OperatorFactory factory = executorTypes.get(type); if (factory == null) { throw new ConfigException("Unknown task type: " + type); } - TaskRunner executor = factory.newTaskExecutor(archivePath, mergedRequest); + Operator executor = factory.newTaskExecutor(archivePath, mergedRequest); return executor.run(); } diff --git a/digdag-core/src/main/java/io/digdag/core/agent/RequireTaskRunnerFactory.java b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java similarity index 82% rename from digdag-core/src/main/java/io/digdag/core/agent/RequireTaskRunnerFactory.java rename to digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java index 75d79bee39..ee69e4999d 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/RequireTaskRunnerFactory.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java @@ -14,23 +14,23 @@ import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; import io.digdag.spi.TaskReport; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import io.digdag.spi.TaskExecutionException; import io.digdag.core.agent.RetryControl; import io.digdag.core.session.SessionStateFlags; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigFactory; -public class RequireTaskRunnerFactory - implements TaskRunnerFactory +public class RequireOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(RequireTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(RequireOperatorFactory.class); private final TaskCallbackApi callback; @Inject - public RequireTaskRunnerFactory(TaskCallbackApi callback) + public RequireOperatorFactory(TaskCallbackApi callback) { this.callback = callback; } @@ -41,19 +41,19 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new RequireTaskRunner(callback, request); + return new RequireOperator(callback, request); } - private class RequireTaskRunner - implements TaskRunner + private class RequireOperator + implements Operator { private final TaskCallbackApi callback; private final TaskRequest request; private ConfigFactory cf; - public RequireTaskRunner(TaskCallbackApi callback, TaskRequest request) + public RequireOperator(TaskCallbackApi callback, TaskRequest request) { this.callback = callback; this.request = request; @@ -69,7 +69,7 @@ public TaskResult run() isDone = runTask(); } catch (RuntimeException ex) { - Config error = TaskRunnerManager.makeExceptionError(request.getConfig().getFactory(), ex); + Config error = OperatorManager.makeExceptionError(request.getConfig().getFactory(), ex); boolean doRetry = retry.evaluate(); if (doRetry) { throw new TaskExecutionException(ex, error, diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java index 16e2a8e004..1476231f23 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java @@ -23,7 +23,7 @@ import com.google.inject.Inject; import com.fasterxml.jackson.databind.ObjectMapper; import io.digdag.core.agent.RetryControl; -import io.digdag.core.agent.TaskRunnerManager; +import io.digdag.core.agent.OperatorManager; import io.digdag.core.agent.AgentId; import io.digdag.core.session.*; import io.digdag.spi.TaskRequest; @@ -745,10 +745,10 @@ private void enqueueTask(final TaskQueueDispatcher dispatcher, final long taskId params.setAll(attempt.getParams()); collectParams(params, task, attempt); - // create TaskRequest for TaskRunnerManager. - // TaskRunnerManager will ignore localConfig because it reloads config from dagfile_path with using the lates params. + // create TaskRequest for OperatorManager. + // OperatorManager will ignore localConfig because it reloads config from dagfile_path with using the lates params. // TaskRequest.config usually stores params merged with local config. but here passes only params (local config is not merged) - // so that TaskRunnerManager can build it using the reloaded local config. + // so that OperatorManager can build it using the reloaded local config. TaskRequest request = TaskRequest.builder() .siteId(attempt.getSiteId()) .repositoryId(attempt.getSession().getRepositoryId()) @@ -796,7 +796,7 @@ private void enqueueTask(final TaskQueueDispatcher dispatcher, final long taskId logger.error("Enqueue error, making this task failed: {}", task, ex); // TODO retry here? return taskFailed(lockedTask, - TaskRunnerManager.makeExceptionError(cf, ex)); + OperatorManager.makeExceptionError(cf, ex)); } }).or(false); } diff --git a/digdag-docs/src/getting_started.rst b/digdag-docs/src/getting_started.rst index 3c15107c08..ff0fc0734c 100644 --- a/digdag-docs/src/getting_started.rst +++ b/digdag-docs/src/getting_started.rst @@ -52,5 +52,5 @@ Next steps * `Scheduling workflow `_ * `Detailed documents about workflow definition `_ -* `More choices of task types `_ +* `More choices of operators `_ diff --git a/digdag-docs/src/index.rst b/digdag-docs/src/index.rst index e76fe3e7bc..1881a2de22 100644 --- a/digdag-docs/src/index.rst +++ b/digdag-docs/src/index.rst @@ -43,7 +43,7 @@ Table of Contents workflow_definition.rst scheduling_workflow.rst running_tasks_on_docker.rst - task_types.rst + operators.rst mastering_sessions.rst reciipes.rst command_reference.rst diff --git a/digdag-docs/src/task_types.rst b/digdag-docs/src/operators.rst similarity index 91% rename from digdag-docs/src/task_types.rst rename to digdag-docs/src/operators.rst index dc4ae1b0aa..47cf6b8e6c 100644 --- a/digdag-docs/src/task_types.rst +++ b/digdag-docs/src/operators.rst @@ -1,4 +1,4 @@ -Task types +Operators ================================== .. contents:: @@ -8,7 +8,7 @@ Task types require>: Runs another workflow ---------------------------------- -**require>:** task runs another workflow. It's skipped if the workflow is already done successfully. +**require>:** operator runs another workflow. It's skipped if the workflow is already done successfully. .. code-block:: yaml @@ -26,7 +26,7 @@ require>: Runs another workflow py>: Python scripts ---------------------------------- -**py>:** task runs a Python script using ``python`` command. +**py>:** operator runs a Python script using ``python`` command. TODO: link to `Python API documents `_ for details including variable mappings to keyword arguments. .. code-block:: yaml @@ -45,7 +45,7 @@ TODO: link to `Python API documents `_ for details including vari rb>: Ruby scripts ---------------------------------- -**rb>:** task runs a Ruby script using ``ruby`` command. +**rb>:** operator runs a Ruby script using ``ruby`` command. TODO: add more description here TODO: link to `Ruby API documents `_ for details including best practices how to configure the workflow using ``export: require:``. @@ -75,7 +75,7 @@ TODO: link to `Ruby API documents `_ for details including best sh>: Shell scripts ---------------------------------- -**sh>:** task runs a shell script. +**sh>:** operator runs a shell script. TODO: add more description here @@ -95,7 +95,7 @@ TODO: add more description here td>: Treasure Data queries ---------------------------------- -**td>:** task runs a Hive or Presto query on Treasure Data. +**td>:** operator runs a Hive or Presto query on Treasure Data. TODO: add more description here @@ -155,7 +155,7 @@ TODO: add more description here td_load>: Treasure Data bulk loading ---------------------------------- -**td_load>:** task loads data from storages, databases, or services. +**td_load>:** operator loads data from storages, databases, or services. TODO: add more description here @@ -194,7 +194,7 @@ TODO: add more description here td_ddl>: Treasure Data operations ---------------------------------- -**type: td_ddl** task runs an operational task on Treasure Data. +**type: td_ddl** operator runs an operational task on Treasure Data. TODO: add more description here @@ -239,7 +239,7 @@ TODO: add more description here mail>: Sending email ---------------------------------- -**mail>:** task sends an email. +**mail>:** operator sends an email. To use Gmail SMTP server, you need to do either of: @@ -327,7 +327,7 @@ To use Gmail SMTP server, you need to do either of: embulk>: Embulk data transfer ---------------------------------- -**embulk>:** task runs `Embulk `_ to transfer data across storages including local files. +**embulk>:** operator runs `Embulk `_ to transfer data across storages including local files. .. code-block:: yaml diff --git a/digdag-docs/src/workflow_definition.rst b/digdag-docs/src/workflow_definition.rst index a04214fb45..e2c9e29bdb 100644 --- a/digdag-docs/src/workflow_definition.rst +++ b/digdag-docs/src/workflow_definition.rst @@ -41,10 +41,10 @@ Workflow is defined in a YAML file named "digdag.yml". An example is like this: Key names starting with ``+`` sign is a task. Tasks run from the top to bottom in order. A task can be nested as a child of another task. In above example, ``+step2`` runs after ``+step1`` as a child of ``+main`` task. -task types> +operators> ---------------------------------- -A task with ``type>: command`` parameter executes an action. You can choose various kinds of operators such as running `shell scripts `_, `Python methods `_, `sending email `_, etc. See `Task types `_ page for the list of built-in operators. +A task with ``type>: command`` parameter executes an action. You can choose various kinds of operators such as running `shell scripts `_, `Python methods `_, `sending email `_, etc. See `Operators `_ page for the list of built-in operators. .. note:: diff --git a/digdag-spi/src/main/java/io/digdag/spi/TaskRunner.java b/digdag-spi/src/main/java/io/digdag/spi/Operator.java similarity index 76% rename from digdag-spi/src/main/java/io/digdag/spi/TaskRunner.java rename to digdag-spi/src/main/java/io/digdag/spi/Operator.java index db6cf0df0d..38b55d40e4 100644 --- a/digdag-spi/src/main/java/io/digdag/spi/TaskRunner.java +++ b/digdag-spi/src/main/java/io/digdag/spi/Operator.java @@ -2,7 +2,7 @@ import io.digdag.client.config.Config; -public interface TaskRunner +public interface Operator { TaskResult run(); } diff --git a/digdag-spi/src/main/java/io/digdag/spi/OperatorFactory.java b/digdag-spi/src/main/java/io/digdag/spi/OperatorFactory.java new file mode 100644 index 0000000000..34087bfe5d --- /dev/null +++ b/digdag-spi/src/main/java/io/digdag/spi/OperatorFactory.java @@ -0,0 +1,10 @@ +package io.digdag.spi; + +import java.nio.file.Path; + +public interface OperatorFactory +{ + String getType(); + + Operator newTaskExecutor(Path archivePath, TaskRequest request); +} diff --git a/digdag-spi/src/main/java/io/digdag/spi/TaskRunnerFactory.java b/digdag-spi/src/main/java/io/digdag/spi/TaskRunnerFactory.java deleted file mode 100644 index 884c608cea..0000000000 --- a/digdag-spi/src/main/java/io/digdag/spi/TaskRunnerFactory.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.digdag.spi; - -import java.nio.file.Path; - -public interface TaskRunnerFactory -{ - String getType(); - - TaskRunner newTaskExecutor(Path archivePath, TaskRequest request); -} diff --git a/digdag-standards/src/main/java/io/digdag/standards/StandardsExtension.java b/digdag-standards/src/main/java/io/digdag/standards/StandardsExtension.java index 262b4e91fe..99fd687c92 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/StandardsExtension.java +++ b/digdag-standards/src/main/java/io/digdag/standards/StandardsExtension.java @@ -8,7 +8,7 @@ import com.google.common.collect.ImmutableList; import io.digdag.core.Extension; import io.digdag.spi.SchedulerFactory; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.OperatorFactory; import io.digdag.spi.TaskQueueFactory; import io.digdag.spi.CommandExecutor; import io.digdag.standards.scheduler.CronSchedulerFactory; @@ -16,14 +16,14 @@ import io.digdag.standards.scheduler.DailySchedulerFactory; import io.digdag.standards.scheduler.HourlySchedulerFactory; import io.digdag.standards.scheduler.MinutesIntervalSchedulerFactory; -import io.digdag.standards.task.PyTaskRunnerFactory; -import io.digdag.standards.task.RbTaskRunnerFactory; -import io.digdag.standards.task.ShTaskRunnerFactory; -import io.digdag.standards.task.MailTaskRunnerFactory; -import io.digdag.standards.task.EmbulkTaskRunnerFactory; -import io.digdag.standards.task.td.TdTaskRunnerFactory; -import io.digdag.standards.task.td.TdLoadTaskRunnerFactory; -import io.digdag.standards.task.td.TdDdlTaskRunnerFactory; +import io.digdag.standards.operator.PyOperatorFactory; +import io.digdag.standards.operator.RbOperatorFactory; +import io.digdag.standards.operator.ShOperatorFactory; +import io.digdag.standards.operator.MailOperatorFactory; +import io.digdag.standards.operator.EmbulkOperatorFactory; +import io.digdag.standards.operator.td.TdOperatorFactory; +import io.digdag.standards.operator.td.TdLoadOperatorFactory; +import io.digdag.standards.operator.td.TdDdlOperatorFactory; import io.digdag.standards.command.SimpleCommandExecutor; import io.digdag.standards.command.DockerCommandExecutor; @@ -44,15 +44,15 @@ public void configure(Binder binder) binder.bind(CommandExecutor.class).to(DockerCommandExecutor.class).in(Scopes.SINGLETON); binder.bind(SimpleCommandExecutor.class).in(Scopes.SINGLETON); - Multibinder taskExecutorBinder = Multibinder.newSetBinder(binder, TaskRunnerFactory.class); - taskExecutorBinder.addBinding().to(PyTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(RbTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(ShTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(MailTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(EmbulkTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(TdTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(TdLoadTaskRunnerFactory.class).in(Scopes.SINGLETON); - taskExecutorBinder.addBinding().to(TdDdlTaskRunnerFactory.class).in(Scopes.SINGLETON); + Multibinder taskExecutorBinder = Multibinder.newSetBinder(binder, OperatorFactory.class); + taskExecutorBinder.addBinding().to(PyOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(RbOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(ShOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(MailOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(EmbulkOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(TdOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(TdLoadOperatorFactory.class).in(Scopes.SINGLETON); + taskExecutorBinder.addBinding().to(TdDdlOperatorFactory.class).in(Scopes.SINGLETON); Multibinder schedulerBinder = Multibinder.newSetBinder(binder, SchedulerFactory.class); schedulerBinder.addBinding().to(CronSchedulerFactory.class).in(Scopes.SINGLETON); diff --git a/digdag-standards/src/main/java/io/digdag/standards/command/DockerCommandExecutor.java b/digdag-standards/src/main/java/io/digdag/standards/command/DockerCommandExecutor.java index 69fa349d6d..ba5d3db70a 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/command/DockerCommandExecutor.java +++ b/digdag-standards/src/main/java/io/digdag/standards/command/DockerCommandExecutor.java @@ -24,7 +24,6 @@ import io.digdag.spi.CommandExecutor; import io.digdag.spi.TaskRequest; import io.digdag.client.config.Config; -import io.digdag.standards.task.ArchiveFiles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.Locale.ENGLISH; diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/ArchiveFiles.java b/digdag-standards/src/main/java/io/digdag/standards/operator/ArchiveFiles.java similarity index 98% rename from digdag-standards/src/main/java/io/digdag/standards/task/ArchiveFiles.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/ArchiveFiles.java index 240bd9d6bf..20cb1cb895 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/ArchiveFiles.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/ArchiveFiles.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.List; import java.util.ArrayList; diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/BaseTaskRunner.java b/digdag-standards/src/main/java/io/digdag/standards/operator/BaseOperator.java similarity index 81% rename from digdag-standards/src/main/java/io/digdag/standards/task/BaseTaskRunner.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/BaseOperator.java index 94496d3ac4..859991eb5e 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/BaseTaskRunner.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/BaseOperator.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.List; import java.util.ArrayList; @@ -6,13 +6,13 @@ import com.google.common.collect.*; import com.google.common.base.*; import io.digdag.core.agent.RetryControl; -import io.digdag.core.agent.TaskRunnerManager; +import io.digdag.core.agent.OperatorManager; import io.digdag.spi.*; import io.digdag.client.config.Config; -import io.digdag.spi.TaskRunner; +import io.digdag.spi.Operator; -public abstract class BaseTaskRunner - implements TaskRunner +public abstract class BaseOperator + implements Operator { protected final Path archivePath; protected final ArchiveFiles archive; @@ -21,7 +21,7 @@ public abstract class BaseTaskRunner protected final List inputs; protected final List outputs; - public BaseTaskRunner(Path archivePath, TaskRequest request) + public BaseOperator(Path archivePath, TaskRequest request) { this.archivePath = archivePath; this.archive = new ArchiveFiles(archivePath); @@ -53,7 +53,7 @@ public TaskResult run() } } catch (RuntimeException ex) { - Config error = TaskRunnerManager.makeExceptionError(request.getConfig().getFactory(), ex); + Config error = OperatorManager.makeExceptionError(request.getConfig().getFactory(), ex); boolean doRetry = retry.evaluate(); if (doRetry) { throw new TaskExecutionException(ex, error, diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/EmbulkTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java similarity index 86% rename from digdag-standards/src/main/java/io/digdag/standards/task/EmbulkTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java index 0c03d2b723..740970fed8 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/EmbulkTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/EmbulkOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.List; import java.io.InputStream; @@ -21,17 +21,17 @@ import io.digdag.spi.TemplateException; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.digdag.client.config.Config; import static java.nio.charset.StandardCharsets.UTF_8; -public class EmbulkTaskRunnerFactory - implements TaskRunnerFactory +public class EmbulkOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(EmbulkTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(EmbulkOperatorFactory.class); private final CommandExecutor exec; private final TemplateEngine templateEngine; @@ -39,7 +39,7 @@ public class EmbulkTaskRunnerFactory private final YAMLFactory yaml; @Inject - public EmbulkTaskRunnerFactory(CommandExecutor exec, TemplateEngine templateEngine, ObjectMapper mapper) + public EmbulkOperatorFactory(CommandExecutor exec, TemplateEngine templateEngine, ObjectMapper mapper) { this.exec = exec; this.templateEngine = templateEngine; @@ -54,15 +54,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new EmbulkTaskRunner(archivePath, request); + return new EmbulkOperator(archivePath, request); } - private class EmbulkTaskRunner - extends BaseTaskRunner + private class EmbulkOperator + extends BaseOperator { - public EmbulkTaskRunner(Path archivePath, TaskRequest request) + public EmbulkOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/MailTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java similarity index 89% rename from digdag-standards/src/main/java/io/digdag/standards/task/MailTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java index d3fd83e2cb..d7291d0e00 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/MailTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.List; import java.util.Properties; @@ -18,22 +18,22 @@ import io.digdag.spi.CommandExecutor; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigException; -public class MailTaskRunnerFactory - implements TaskRunnerFactory +public class MailOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(MailTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(MailOperatorFactory.class); private final CommandExecutor exec; @Inject - public MailTaskRunnerFactory(CommandExecutor exec) + public MailOperatorFactory(CommandExecutor exec) { this.exec = exec; } @@ -44,15 +44,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new MailTaskRunner(archivePath, request); + return new MailOperator(archivePath, request); } - private class MailTaskRunner - extends BaseTaskRunner + private class MailOperator + extends BaseOperator { - public MailTaskRunner(Path archivePath, TaskRequest request) + public MailOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/PyTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java similarity index 86% rename from digdag-standards/src/main/java/io/digdag/standards/task/PyTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java index 6b5f5bb4b6..f13cccfa6d 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/PyTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/PyOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.List; import java.util.stream.Collectors; @@ -23,20 +23,20 @@ import io.digdag.spi.CommandExecutor; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import io.digdag.client.config.Config; -public class PyTaskRunnerFactory - implements TaskRunnerFactory +public class PyOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(PyTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(PyOperatorFactory.class); private final String runnerScript; { try (InputStreamReader reader = new InputStreamReader( - PyTaskRunnerFactory.class.getResourceAsStream("/digdag/standards/py/runner.py"), + PyOperatorFactory.class.getResourceAsStream("/digdag/standards/py/runner.py"), StandardCharsets.UTF_8)) { runnerScript = CharStreams.toString(reader); } @@ -49,7 +49,7 @@ public class PyTaskRunnerFactory private final ObjectMapper mapper; @Inject - public PyTaskRunnerFactory(CommandExecutor exec, ObjectMapper mapper) + public PyOperatorFactory(CommandExecutor exec, ObjectMapper mapper) { this.exec = exec; this.mapper = mapper; @@ -61,15 +61,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new PyTaskRunner(archivePath, request); + return new PyOperator(archivePath, request); } - private class PyTaskRunner - extends BaseTaskRunner + private class PyOperator + extends BaseOperator { - public PyTaskRunner(Path archivePath, TaskRequest request) + public PyOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/RbTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java similarity index 87% rename from digdag-standards/src/main/java/io/digdag/standards/task/RbTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java index 953ead08d3..c6a93e80a0 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/RbTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/RbOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.List; import java.util.stream.Collectors; @@ -23,20 +23,20 @@ import io.digdag.spi.CommandExecutor; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import io.digdag.client.config.Config; -public class RbTaskRunnerFactory - implements TaskRunnerFactory +public class RbOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(RbTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(RbOperatorFactory.class); private final String runnerScript; { try (InputStreamReader reader = new InputStreamReader( - RbTaskRunnerFactory.class.getResourceAsStream("/digdag/standards/rb/runner.rb"), + RbOperatorFactory.class.getResourceAsStream("/digdag/standards/rb/runner.rb"), StandardCharsets.UTF_8)) { runnerScript = CharStreams.toString(reader); } @@ -49,7 +49,7 @@ public class RbTaskRunnerFactory private final ObjectMapper mapper; @Inject - public RbTaskRunnerFactory(CommandExecutor exec, ObjectMapper mapper) + public RbOperatorFactory(CommandExecutor exec, ObjectMapper mapper) { this.exec = exec; this.mapper = mapper; @@ -61,15 +61,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new RbTaskRunner(archivePath, request); + return new RbOperator(archivePath, request); } - private class RbTaskRunner - extends BaseTaskRunner + private class RbOperator + extends BaseOperator { - public RbTaskRunner(Path archivePath, TaskRequest request) + public RbOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/ShTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java similarity index 85% rename from digdag-standards/src/main/java/io/digdag/standards/task/ShTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java index e7df3eff9c..c607cf6789 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/ShTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task; +package io.digdag.standards.operator; import java.util.Map; import java.util.regex.Pattern; @@ -15,23 +15,23 @@ import io.digdag.spi.CommandExecutor; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.digdag.client.config.Config; -public class ShTaskRunnerFactory - implements TaskRunnerFactory +public class ShOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(ShTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(ShOperatorFactory.class); private static Pattern VALID_ENV_KEY = Pattern.compile("[a-zA-Z_]+"); private final CommandExecutor exec; @Inject - public ShTaskRunnerFactory(CommandExecutor exec) + public ShOperatorFactory(CommandExecutor exec) { this.exec = exec; } @@ -42,15 +42,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new ShTaskRunner(archivePath, request); + return new ShOperator(archivePath, request); } - private class ShTaskRunner - extends BaseTaskRunner + private class ShOperator + extends BaseOperator { - public ShTaskRunner(Path archivePath, TaskRequest request) + public ShOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/td/TDOperation.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperation.java similarity index 95% rename from digdag-standards/src/main/java/io/digdag/standards/task/td/TDOperation.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperation.java index b860f5721a..4f98091daa 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/td/TDOperation.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperation.java @@ -1,8 +1,8 @@ -package io.digdag.standards.task.td; +package io.digdag.standards.operator.td; import java.io.Closeable; import io.digdag.client.config.Config; -import io.digdag.standards.task.BaseTaskRunner; +import io.digdag.standards.operator.BaseOperator; import com.treasuredata.client.TDClient; import com.treasuredata.client.TDClientException; import com.treasuredata.client.TDClientHttpConflictException; diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/td/TDQuery.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDQuery.java similarity index 98% rename from digdag-standards/src/main/java/io/digdag/standards/task/td/TDQuery.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/td/TDQuery.java index bddc6299c7..083a50c3e2 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/td/TDQuery.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDQuery.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task.td; +package io.digdag.standards.operator.td; import java.io.InputStream; import java.io.IOException; diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/td/TdDdlTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java similarity index 76% rename from digdag-standards/src/main/java/io/digdag/standards/task/td/TdDdlTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java index ef4abef072..f04bfd049b 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/td/TdDdlTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task.td; +package io.digdag.standards.operator.td; import java.util.List; import java.nio.file.Path; @@ -7,9 +7,9 @@ import com.google.common.collect.*; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; -import io.digdag.standards.task.BaseTaskRunner; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; +import io.digdag.standards.operator.BaseOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.digdag.client.config.Config; @@ -18,13 +18,13 @@ import com.treasuredata.client.model.TDJobRequest; import com.treasuredata.client.model.TDJobRequestBuilder; -public class TdDdlTaskRunnerFactory - implements TaskRunnerFactory +public class TdDdlOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(TdDdlTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(TdDdlOperatorFactory.class); @Inject - public TdDdlTaskRunnerFactory() + public TdDdlOperatorFactory() { } public String getType() @@ -33,15 +33,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new TdDdlTaskRunner(archivePath, request); + return new TdDdlOperator(archivePath, request); } - private class TdDdlTaskRunner - extends BaseTaskRunner + private class TdDdlOperator + extends BaseOperator { - public TdDdlTaskRunner(Path archivePath, TaskRequest request) + public TdDdlOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/td/TdLoadTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java similarity index 84% rename from digdag-standards/src/main/java/io/digdag/standards/task/td/TdLoadTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java index bf92192872..fddc630f76 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/td/TdLoadTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdLoadOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task.td; +package io.digdag.standards.operator.td; import java.nio.file.Path; import java.io.IOException; @@ -11,11 +11,11 @@ import org.slf4j.LoggerFactory; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import io.digdag.spi.TemplateEngine; import io.digdag.spi.TemplateException; -import io.digdag.standards.task.BaseTaskRunner; +import io.digdag.standards.operator.BaseOperator; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigException; import com.treasuredata.client.TDClient; @@ -25,15 +25,15 @@ import org.msgpack.value.Value; import static java.nio.charset.StandardCharsets.UTF_8; -public class TdLoadTaskRunnerFactory - implements TaskRunnerFactory +public class TdLoadOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(TdLoadTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(TdLoadOperatorFactory.class); private final TemplateEngine templateEngine; @Inject - public TdLoadTaskRunnerFactory(TemplateEngine templateEngine) + public TdLoadOperatorFactory(TemplateEngine templateEngine) { this.templateEngine = templateEngine; } @@ -44,15 +44,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new TdLoadTaskRunner(archivePath, request); + return new TdLoadOperator(archivePath, request); } - private class TdLoadTaskRunner - extends BaseTaskRunner + private class TdLoadOperator + extends BaseOperator { - public TdLoadTaskRunner(Path archivePath, TaskRequest request) + public TdLoadOperator(Path archivePath, TaskRequest request) { super(archivePath, request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/task/td/TdTaskRunnerFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java similarity index 91% rename from digdag-standards/src/main/java/io/digdag/standards/task/td/TdTaskRunnerFactory.java rename to digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java index 6ac2cbf0b9..f1e823b3e4 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/task/td/TdTaskRunnerFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java @@ -1,4 +1,4 @@ -package io.digdag.standards.task.td; +package io.digdag.standards.operator.td; import java.util.List; import java.nio.file.Path; @@ -10,11 +10,11 @@ import com.google.common.collect.*; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import io.digdag.spi.TaskRunner; -import io.digdag.spi.TaskRunnerFactory; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorFactory; import io.digdag.spi.TemplateEngine; import io.digdag.spi.TemplateException; -import io.digdag.standards.task.BaseTaskRunner; +import io.digdag.standards.operator.BaseOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.digdag.client.config.Config; @@ -26,15 +26,15 @@ import org.msgpack.value.Value; import static java.nio.charset.StandardCharsets.UTF_8; -public class TdTaskRunnerFactory - implements TaskRunnerFactory +public class TdOperatorFactory + implements OperatorFactory { - private static Logger logger = LoggerFactory.getLogger(TdTaskRunnerFactory.class); + private static Logger logger = LoggerFactory.getLogger(TdOperatorFactory.class); private final TemplateEngine templateEngine; @Inject - public TdTaskRunnerFactory(TemplateEngine templateEngine) + public TdOperatorFactory(TemplateEngine templateEngine) { this.templateEngine = templateEngine; } @@ -45,15 +45,15 @@ public String getType() } @Override - public TaskRunner newTaskExecutor(Path archivePath, TaskRequest request) + public Operator newTaskExecutor(Path archivePath, TaskRequest request) { - return new TdTaskRunner(archivePath, request); + return new TdOperator(archivePath, request); } - private class TdTaskRunner - extends BaseTaskRunner + private class TdOperator + extends BaseOperator { - public TdTaskRunner(Path archivePath, TaskRequest request) + public TdOperator(Path archivePath, TaskRequest request) { super(archivePath, request); }