Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample - Custom Search Attributes #3

Merged
merged 5 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions samples/custom-search-attributes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Custom Search Attributes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meta comment, this course seems to use the deprecated old search attribute API, we now recommend users use the typed search attribute API

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what other language courses did, but I think it is worth a broader discussion on what API we should be teaching.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated this. Ready for review again


This example shows how custom Search Attributes can be used in your Workflow. This sample uses the Pizza Workflow, and creates a custom Search Attribute called `isOrderFailed` which is a boolean. The user can then use this Search Attribute to query for Workflows where the pizza order has failed.
MasonEgger marked this conversation as resolved.
Show resolved Hide resolved

## Part A: Create a Custom Search Attribute

1. First, you will create your custom Search Attribute, `isOrderFailed`, a boolean type. You can do this in one of your terminals with the following command: `temporal operator search-attribute create --namespace default --name isOrderFailed --type bool`.
2. Make sure you can see all the Search Attributes you have with the command: `temporal operator search-attribute list`. You should now see `isOrderFailed` in this list. It may take up to ten seconds before the attribute appears.

## Part B: Setting a Custom Search Attribute Value While Starting a Workflow

This is not a necessary step. In the `Starter.java` file, you can set Custom Search Attribute by adding them to the options when starting a Workflow execution using [`WorkflowOptions.newBuilder().setTypedSearchAttributes()`](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/client/WorkflowOptions.Builder.html#setTypedSearchAttributes(io.temporal.common.SearchAttributes)).

Keep in mind that setting attributes is optional in some attributes' case, and this is more for setting Search Attributes that are known at the start of the Workflow or may not change through the Workflow logic.

## Part C: Upserting Attributes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to show removing a search attribute?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for this sample we won't, but I'll include it in the course content.


Within the Pizza Workflow code, we will now dynamically update Search Attributes using [`upsertTypedSearchAttributes`](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/workflow/Workflow.html#upsertTypedSearchAttributes(io.temporal.common.SearchAttributeUpdate...)).

1. In `PizzaWorkflowImpl.java`, locate `Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(true));` which is used to indicate that the order has not failed. It is marked not failed, because it is in the part of the logic when the Workflow has received the Signal that the order has been fulfilled.

2. In `PizzaWorkflowImpl.java`, locate `Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(false));` which is used to indicate that the order has failed. It is marked failed in the part of the Workflow code when the Workflow has received the Signal that the order has not been fulfilled successfully.

## Part D: Running Your Workflow

1. In one terminal, run `mvn exec:java -Dexec.mainClass='customsearchattributes.PizzaWorker` to start the Worker.
1. In another terminal and run `mvn exec:java -Dexec.mainClass='customsearchattributes.Starter'` to initiate execution of the Workflow. Now, try your query to retrieve the results. If you don't immediately see the correct result, wait a few seconds
and try again. Recall that it is possible to send a query before the updated Search Attribute has been recorded.

## Part E: Querying Workflows by Search Attributes
MasonEgger marked this conversation as resolved.
Show resolved Hide resolved

Once you have Workflows tagged with Custom Search Attributes, you can query them based on these attributes. For example, using the CLI:

```shell
temporal workflow list -q 'isOrderFailed=false'
```

This lists all the Workflows that fulfill this query.

### This is the end of the sample.
115 changes: 115 additions & 0 deletions samples/custom-search-attributes/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.temporal.learn</groupId>
<artifactId>custom-search-attributes-sample</artifactId>
<version>1.0-SNAPSHOT</version>

<name>custom search attributes (sample)</name>
<url>https://learn.temporal.io</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.20.1</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.8</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>

<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-testing</artifactId>
<version>1.20.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.1.1</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package customsearchattributes;

public class Constants {

public static final String TASK_QUEUE_NAME = "pizza-tasks";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package customsearchattributes;

import io.temporal.activity.ActivityInterface;
import customsearchattributes.model.Distance;
import customsearchattributes.exceptions.InvalidChargeAmountException;
import customsearchattributes.model.Address;
import customsearchattributes.model.OrderConfirmation;
import customsearchattributes.model.Bill;

@ActivityInterface
public interface PizzaActivities {

Distance getDistance(Address address);

OrderConfirmation sendBill(Bill bill) throws InvalidChargeAmountException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package customsearchattributes;

import customsearchattributes.model.OrderConfirmation;
import customsearchattributes.model.Address;
import customsearchattributes.model.Distance;
import customsearchattributes.model.Bill;

import customsearchattributes.exceptions.InvalidChargeAmountException;

import java.time.Instant;

import io.temporal.activity.Activity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PizzaActivitiesImpl implements PizzaActivities {

private static final Logger logger = LoggerFactory.getLogger(PizzaActivitiesImpl.class);

@Override
public Distance getDistance(Address address) {

logger.info("getDistance invoked; determining distance to customer address");

// this is a simulation, which calculates a fake (but consistent)
// distance for a customer address based on its length. The value
// will therefore be different when called with different addresses,
// but will be the same across all invocations with the same address.

int kilometers = address.getLine1().length() + address.getLine2().length() - 10;
if (kilometers < 1) {
kilometers = 5;
}

Distance distance = new Distance(kilometers);

logger.info("getDistance complete: {}", distance.getKilometers());
return distance;
}

@Override
public OrderConfirmation sendBill(Bill bill) {
int amount = bill.getAmount();

logger.info("sendBill invoked: customer: {} amount: {}", bill.getCustomerID(), amount);

int chargeAmount = amount;

// This month's special offer: Get $5 off all orders over $30
if (amount > 3000) {
logger.info("Applying discount");

chargeAmount -= 500; // reduce amount charged by 500 cents
}

// reject invalid amounts before calling the payment processor
if (chargeAmount < 0) {
logger.error("invalid charge amount: {%d} (must be above zero)", chargeAmount);
String errorMessage = "invalid charge amount: " + chargeAmount;
throw Activity.wrap(new InvalidChargeAmountException(errorMessage));
}

// pretend we called a payment processing service here
OrderConfirmation confirmation = new OrderConfirmation(bill.getOrderNumber(), "SUCCESS",
"P24601", Instant.now().getEpochSecond(), chargeAmount);

logger.debug("Sendbill complete: Confirmation Number: {}",
confirmation.getConfirmationNumber());

return confirmation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package customsearchattributes;

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

public class PizzaWorker {
public static void main(String[] args) {

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);

Worker worker = factory.newWorker(Constants.TASK_QUEUE_NAME);

worker.registerWorkflowImplementationTypes(PizzaWorkflowImpl.class);

worker.registerActivitiesImplementations(new PizzaActivitiesImpl());

factory.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package customsearchattributes;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import customsearchattributes.model.PizzaOrder;
import customsearchattributes.model.OrderConfirmation;

@WorkflowInterface
public interface PizzaWorkflow {

@WorkflowMethod
OrderConfirmation orderPizza(PizzaOrder order);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package customsearchattributes;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.failure.ApplicationFailure;
import io.temporal.common.SearchAttributeKey;

import customsearchattributes.model.Address;
import customsearchattributes.model.Bill;
import customsearchattributes.model.Customer;
import customsearchattributes.model.Distance;
import customsearchattributes.model.OrderConfirmation;
import customsearchattributes.model.Pizza;
import customsearchattributes.model.PizzaOrder;
import customsearchattributes.exceptions.InvalidChargeAmountException;
import customsearchattributes.exceptions.OutOfServiceAreaException;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;

public class PizzaWorkflowImpl implements PizzaWorkflow {

public static final Logger logger = Workflow.getLogger(PizzaWorkflowImpl.class);

ActivityOptions options =
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build();

private final PizzaActivities activities =
Workflow.newActivityStub(PizzaActivities.class, options);

@Override
public OrderConfirmation orderPizza(PizzaOrder order) {

String orderNumber = order.getOrderNumber();
Customer customer = order.getCustomer();
List<Pizza> items = order.getItems();
boolean isDelivery = order.isDelivery();
Address address = order.getAddress();

final SearchAttributeKey<Boolean> IS_ORDER_FAILED = SearchAttributeKey.forBoolean("isOrderFailed");
MasonEgger marked this conversation as resolved.
Show resolved Hide resolved

logger.info("orderPizza Workflow Invoked");

int totalPrice = 0;
for (Pizza pizza : items) {
totalPrice += pizza.getPrice();
}

Distance distance;
try {
distance = activities.getDistance(address);
} catch (NullPointerException e) {
logger.error("Unable to get distance");

Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(true));

throw new NullPointerException("Unable to get distance");
}

if (isDelivery && (distance.getKilometers() > 25)) {
logger.error("Customer lives outside the service area");
Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(true));
throw ApplicationFailure.newFailure("Customer lives outside the service area",
OutOfServiceAreaException.class.getName());
}

logger.info("distance is {}", distance.getKilometers());

// Use a short Timer duration here to simulate the passage of time
// while avoiding delaying the exercise.
Workflow.sleep(Duration.ofSeconds(3));

Bill bill = new Bill(customer.getCustomerID(), orderNumber, "Pizza", totalPrice);

OrderConfirmation confirmation;
try {
confirmation = activities.sendBill(bill);
} catch (InvalidChargeAmountException e) {
logger.error("Unable to bill customer");
Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(true));
throw Workflow.wrap(new InvalidChargeAmountException("Unable to bill customer"));
}

Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(false));
return confirmation;
}
}
Loading