Skip to content

Commit

Permalink
Merge pull request #5 from temporalio/ex4
Browse files Browse the repository at this point in the history
Exercise 4 - Async Activity Completion
  • Loading branch information
MasonEgger authored Apr 29, 2024
2 parents ce027ba + 9e11879 commit 1535e81
Show file tree
Hide file tree
Showing 37 changed files with 1,749 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .bash_aliases
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ alias ex3s="cd ${GITPOD_REPO_ROOT}/exercises/async-activity-completion/solution"
alias ex3w="mvn exec:java -Dexec.mainClass='asyncactivitycompletion.AsyncActivityCompletionWorker'"
alias ex3st="mvn exec:java -Dexec.mainClass='asyncactivitycompletion.Starter'"
alias ex3sg="mvn exec:java -Dexec.mainClass='asyncactivitycompletion.SignalClient'"
ex3c() {
mvn exec:java -Dexec.mainClass="asyncactivitycompletion.CompletionClient" -Dexec.args="${1}"
}
echo "Your workspace is located at: ${GITPOD_REPO_ROOT}"
echo "Type the command workspace to return to the workspace directory at any time."
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Compiled class file
*.class
**target*

# Log file
*.log
Expand Down
90 changes: 90 additions & 0 deletions exercises/async-activity-completion/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Exercise 4: Asynchronous Activity Completion

During this exercise, you will:

- Retrieve a task token from your Activity execution
- Set the `doNotCompleteOnReturn()` context to indicate that the Activity is waiting for an external completion.
- Use another Temporal Client to communicate the result of the asynchronous Activity back to the Workflow

Make your changes to the code in the `practice` subdirectory (look for `TODO` comments that will guide you to where you should make changes to the code). If you need a hint or want to verify your changes, look at the complete version in the `solution` subdirectory.

### GitPod Environment Shortcuts

If you are executing the exercises in the provided GitPod environment, you
can take advantage of certain aliases to aid in navigation and execution of
the code.

| Command | Action |
| :---------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `ex4` | Change to Exercise 3 Practice Directory |
| `ex4s` | Change to Exercise 3 Solution Directory |
| `ex4w` | Execute the Exercise 3 Worker. Must be within the appropriate directory for this to succeed. (either `practice` or `solution`) |
| `ex4st` | Execute the Exercise 3 Starter. Must be within the appropriate directory for this to succeed. (either `practice` or `solution`) |
| `ex4c TASK_TOKEN TRANSLATION` | Complete the Exercise 3 Activity, passing in the Task Token and verified translation. Must be within the appropriate directory for this to succeed. (either `practice` or `solution`) |

## Part A: Retrieving the Task Token

1. Open the `TranslationActivitiesActivitiesImpl.java` file in the `src/main/java/asyncactivitycompletion` subdirectory.
1. In the `translateTerm()` method, add the line `ActivityExecutionContext context = Activity.getExecutionContext();` to get the current Execution Context of the Activity.
1. Add a call to `getTaskToken()` from the `context` object above and store it in a `byte []` named `taskToken`
1. Uncomment the line below to convert the `taskToken` byte array to Base64.
1. Log the Task Token at `info` level using the `logger` object for later use. You will need to convert this to a new String.
1. Save the file.

## Part B: Set Your Activity to `doNotCompleteOnReturn()`

1. Continue editing the same Activity definition in the `TranslationActivitiesImpl.java` file.
1. Add a call to the `doNotCompleteOnReturn();` method at the end of the `translateTerm()` method using the `context` object from Part A. This notifies Temporal that the Activity should not be completed on return and will be completed asynchronously.
1. Save the file.
1. Open the `TranslationWorkflowImpl.java` file in the `src/main/java/asyncactivitycompletion` subdirectory.
1. Observe that the Workflow's `StartToCloseTimeout` has been lengthened to `300` seconds for this exercise. Activities can still time out if they are running in the background.
1. If you don't do this and your Activity retries due to a timeout, the Task Token will be reset.
1. Close this file without making any changes.

**Note:** In practice, it is recommended to use Heartbeats for longer running
Activities. While this exercise doesn't include them, it is a good idea to
include them in Activities that will complete Asynchronously.

## Part C: Configure a Client to send CompleteActivity

1. Open the `VerifyAndCompleteTranslation.java` file in the `src/main/java/asyncactivitycompletion` subdirectory.
1. The first thing you'll need to do is add some way of supplying the `taskToken` and translated text specific to the Activity you are trying to complete at runtime. In a production system, you might store and retrieve the token from a database, but for now, you can configure this Client to accept it as a command line argument. Both the `taskToken` and `translation` can be found in the logs of the Worker.
1. Read in the token from the command line `args[0]` and decode the base 64, storing it in a `byte[]`. Hint, invert the call in `TranslationActivitiesImpl.java`
1. Read in the Translation of the phrase that was outputted in the Worker logs as `args[1]`
1. Add a call to the `complete();` method using the `activityCompletionClient`. This call should provide the task token and result of the Activity. This notifies Temporal that the Activity should not be completed on return and will be completed asynchronously.
1. The result has already been instantiated into a `TranslationActivityOutput` object for you.
1. Save the file.

## Part D: Running the Workflow and Completing it Asynchronously

At this point, you can run your Workflow. As with the Signal Exercise, the Workflow will not return on its own -- in this case, because your Activity is set to complete asynchronously, and will wait to receive `complete()`.

1. In one terminal, navigate to the `practice` subdirectory
1. If you're in the GitPod environment you can instead run `ex4`
1. Compile the code using `mvn clean compile`
1. Run the worker using `mvn exec:java -Dexec.mainClas='asyncactivitycompletion.TranslationWorker'`.
1. If you're in the GitPod environment you can instead run `ex4w`
1. In another terminal, navigate to the `practice` subdirectory
1. If you're in the GitPod environment you can instead run `ex4`
1. Invoke the Workflow using `mvn exec:java -Dexec.mainClass='asyncactivitycompletion.Starter' -Dexec.args="Mason de"`
If you're in the GitPod environment you can instead run `ex4st Mason de`, replacing the name with yours
1. Navigate back to the Worker terminal. Your work will produce some logging, eventually including your `taskToken`:

```
10:28:40.579 INFO - sayHelloGoodbye Workflow Invoked with input name: Mason language code: de
10:28:40.614 INFO - translateTerm Activity received input: asyncactivitycompletion.model.TranslationActivityInput@394250e6
10:28:40.614 INFO - TASK TOKEN: CiQ1NzVkNTNlYi1lM2UyLTRmNmEtODFjMy04ZmY0NmJiYjJjOWYSFHRyYW5zbGF0aW9uLXdvcmtmbG93GiQ0OWQ5NjgyOC1iYmJkLTQ5MjMtOTE4Mi00MWY2YmFlNjI4YzEgBSgBMiRlNGJmZmJhMC1jNGJhLTM1MDgtYThkYS01MjgwYjNjMzVkZmJCDVRyYW5zbGF0ZVRlcm1KCAgBEJuAQBgB
10:28:40.614 INFO - [ACTIVITY INVOKED] translateTerm invoked with input term: hello language code: de
10:28:40.642 INFO - Translation Service returned: Hallo
```

1. You can now use this token to send a `complete()` call from another client. In another terminal, navigate to the `practice` subdirectory
1. If you're in the GitPod environment you can instead run `ex4`
1. Run the command `mvn exec:java -Dexec.mainClass='asyncactivitycompletion.VerifyAndCompleteTranslatin' -Dexec.args="TASK_TOKEN TRANSLATION"` with your Task Token replacing `TASK_TOKEN` with your Task Token to complete the Activity and replacing `TRANSLATION` with the results from the translation service
1. If you're in the GitPod environment you can instead run `ex4c TASK_TOKEN TRANSLATION` replacing `TASK_TOKEN` with your Task Token to complete the Activity and replacing `TRANSLATION` with the results from the translation service
1. This will cause your Activity to return and your Workflow to successfully complete. The terminal running your Worker process should now show
```
12:07:43.689 INFO - Workflow Completed
```

### This is the end of the exercise.
109 changes: 109 additions & 0 deletions exercises/async-activity-completion/practice/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.temporal.learn</groupId>
<artifactId>async-activity-completion</artifactId>
<version>1.0.0-SNAPSHOT</version>

<name>async activity completion</name>
<url>https://learn.temporal.io/</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.20.1</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.8</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>

<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-testing</artifactId>
<version>1.20.1</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package asyncactivitycompletion;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import asyncactivitycompletion.model.TranslationWorkflowInput;
import asyncactivitycompletion.model.TranslationWorkflowOutput;

public class Starter {
public static void main(String[] args) throws Exception {

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();

WorkflowClient client = WorkflowClient.newInstance(service);

WorkflowOptions options = WorkflowOptions.newBuilder()
.setWorkflowId("translation-workflow")
.setTaskQueue("translation-tasks")
.build();

TranslationWorkflow workflow = client.newWorkflowStub(TranslationWorkflow.class, options);

String name = args[0];
String languageCode = args[1];

TranslationWorkflowInput input = new TranslationWorkflowInput(name, languageCode);

TranslationWorkflowOutput greeting = workflow.sayHelloGoodbye(input);

System.out.printf("Workflow result: %s\n", greeting);
System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package asyncactivitycompletion;

import io.temporal.activity.ActivityInterface;
import asyncactivitycompletion.model.TranslationActivityInput;
import asyncactivitycompletion.model.TranslationActivityOutput;

@ActivityInterface
public interface TranslationActivities {

TranslationActivityOutput translateTerm(TranslationActivityInput input);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package asyncactivitycompletion;

import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Base64;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import io.temporal.activity.Activity;
import io.temporal.failure.ApplicationFailure;
import java.net.HttpURLConnection;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionClient;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Base64;
import asyncactivitycompletion.model.TranslationActivityInput;
import asyncactivitycompletion.model.TranslationActivityOutput;

public class TranslationActivitiesImpl implements TranslationActivities {

private static final Logger logger = LoggerFactory.getLogger(TranslationActivitiesImpl.class);

@Override
public TranslationActivityOutput translateTerm(TranslationActivityInput input) {

logger.info("translateTerm Activity received input: {}", input);

// TODO PART A: Add the call to `getExecutionContext()`

// TODO: PART A: Get the task token using the context object defined in the previous step.
// The taskToken should be a byte[]

// TODO: PART A: Uncomment me
//String encoded = new String(Base64.getEncoder().encode(taskToken));

// TODO: PART A: Log the encoded task token

String term = input.getTerm();
String lang = input.getLanguageCode();

logger.info("[ACTIVITY INVOKED] translateTerm invoked with input term: {} language code: {}",
term, lang);

// construct the URL, with supplied input parameters, for accessing the
// microservice
URL url = null;
try {
String baseUrl = "http://localhost:9999/translate?term=%s&lang=%s";
url = URI.create(
String.format(baseUrl,
URLEncoder.encode(term, "UTF-8"),
URLEncoder.encode(lang, "UTF-8")))
.toURL();
} catch (IOException e) {
logger.error(e.getMessage());
throw Activity.wrap(e);
}

TranslationActivityOutput result = new TranslationActivityOutput();

try {
// Open a connection to the URL
HttpURLConnection connection = (HttpURLConnection) url.openConnection();

// Set the HTTP request method (GET, POST, etc.)
connection.setRequestMethod("GET");

// Get the response code
int responseCode = connection.getResponseCode();

if (responseCode == HttpURLConnection.HTTP_OK) {
// If the response code is 200 (HTTP OK), the request was successful
BufferedReader reader = new BufferedReader(
new InputStreamReader(connection.getInputStream()));
String line;
StringBuilder response = new StringBuilder();

while ((line = reader.readLine()) != null) {
response.append(line);
}

reader.close();

connection.disconnect();
result.setTranslation(response.toString());

} else {
// If the response code is not 200, there was an error
BufferedReader errorReader = new BufferedReader(
new InputStreamReader(connection.getErrorStream()));
String line;
StringBuilder errorResponse = new StringBuilder();

while ((line = errorReader.readLine()) != null) {
errorResponse.append(line);
}

errorReader.close();

connection.disconnect();
// Print the error response
throw ApplicationFailure.newFailure(errorResponse.toString(), IOException.class.getName());
}

} catch (IOException e) {
logger.error(e.getMessage());
throw Activity.wrap(e);
}

logger.info("Translation Service returned: {}", result.getTranslation());

// TODO: PART B: Use the `context` object from above to call `doNotCompleteOnReturn()`;
// This notifies Temporal that the Activity should not be completed on return and will be completed asynchronously.


// Since we have set doNotCompleteOnReturn(), the return value is ignored.
return new TranslationActivityOutput("this will be ignored");
}

}
Loading

0 comments on commit 1535e81

Please sign in to comment.