Skip to content

Commit

Permalink
Merge pull request #3 from temporalio/sample-custom-search-attribute
Browse files Browse the repository at this point in the history
Sample - Custom Search Attributes
  • Loading branch information
MasonEgger authored May 3, 2024
2 parents 64c122a + 48d6d0d commit 4e406d7
Show file tree
Hide file tree
Showing 19 changed files with 863 additions and 0 deletions.
39 changes: 39 additions & 0 deletions samples/custom-search-attributes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Custom Search Attributes

This example shows how custom Search Attributes can be used in your Workflow. This sample uses the Pizza Workflow, and creates a custom Search Attribute called `isOrderFailed` which is a boolean. The user can then use this Search Attribute to query for Workflows where the pizza order has failed.

## Part A: Create a Custom Search Attribute

1. First, you will create your custom Search Attribute, `isOrderFailed`, a boolean type. You can do this in one of your terminals with the following command: `temporal operator search-attribute create --namespace default --name isOrderFailed --type bool`.
2. Make sure you can see all the Search Attributes you have with the command: `temporal operator search-attribute list`. You should now see `isOrderFailed` in this list. It may take up to ten seconds before the attribute appears.

## Part B: Setting a Custom Search Attribute Value While Starting a Workflow

This is not a necessary step. In the `Starter.java` file, you can set Custom Search Attribute by adding them to the options when starting a Workflow execution using [`WorkflowOptions.newBuilder().setTypedSearchAttributes()`](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/client/WorkflowOptions.Builder.html#setTypedSearchAttributes(io.temporal.common.SearchAttributes)).

Keep in mind that setting attributes is optional in some attributes' case, and this is more for setting Search Attributes that are known at the start of the Workflow or may not change through the Workflow logic.

## Part C: Upserting Attributes

Within the Pizza Workflow code, we will now dynamically update Search Attributes using [`upsertTypedSearchAttributes`](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/workflow/Workflow.html#upsertTypedSearchAttributes(io.temporal.common.SearchAttributeUpdate...)).

1. In `PizzaWorkflowImpl.java`, locate `Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(true));` which is used to indicate that the order has not failed. It is marked not failed, because it is in the part of the logic when the Workflow has received the Signal that the order has been fulfilled.

2. In `PizzaWorkflowImpl.java`, locate `Workflow.upsertTypedSearchAttributes(IS_ORDER_FAILED.valueSet(false));` which is used to indicate that the order has failed. It is marked failed in the part of the Workflow code when the Workflow has received the Signal that the order has not been fulfilled successfully.

## Part D: Running Your Workflow

1. In one terminal, run `mvn exec:java -Dexec.mainClass='customsearchattributes.PizzaWorker` to start the Worker.
2. In another terminal and run `mvn exec:java -Dexec.mainClass='customsearchattributes.Starter'` to initiate execution of the Workflow. Now, try your query to retrieve the results. Recall that visibility is eventually consistent, so not all updates are going to be immediately reflected. If you don't see your update, wait a bit an try again.

## Part E: Querying Workflows by Search Attributes

Once you have Workflows tagged with Custom Search Attributes, you can query them based on these attributes. For example, using the CLI:

```shell
temporal workflow list -q 'isOrderFailed=false'
```

This lists all the Workflows that fulfill this query.

### This is the end of the sample.
115 changes: 115 additions & 0 deletions samples/custom-search-attributes/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.temporal.learn</groupId>
<artifactId>custom-search-attributes-sample</artifactId>
<version>1.0-SNAPSHOT</version>

<name>custom search attributes (sample)</name>
<url>https://learn.temporal.io</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.20.1</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.8</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>

<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-testing</artifactId>
<version>1.20.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.1.1</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package customsearchattributes;

import io.temporal.common.SearchAttributeKey;

public class Constants {

public static final String TASK_QUEUE_NAME = "pizza-tasks";
public static final SearchAttributeKey<Boolean> IS_ORDER_FAILED = SearchAttributeKey.forBoolean("isOrderFailed");

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package customsearchattributes;

import io.temporal.activity.ActivityInterface;
import customsearchattributes.model.Distance;
import customsearchattributes.exceptions.InvalidChargeAmountException;
import customsearchattributes.model.Address;
import customsearchattributes.model.OrderConfirmation;
import customsearchattributes.model.Bill;

@ActivityInterface
public interface PizzaActivities {

Distance getDistance(Address address);

OrderConfirmation sendBill(Bill bill) throws InvalidChargeAmountException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package customsearchattributes;

import customsearchattributes.model.OrderConfirmation;
import customsearchattributes.model.Address;
import customsearchattributes.model.Distance;
import customsearchattributes.model.Bill;

import customsearchattributes.exceptions.InvalidChargeAmountException;

import java.time.Instant;

import io.temporal.activity.Activity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PizzaActivitiesImpl implements PizzaActivities {

private static final Logger logger = LoggerFactory.getLogger(PizzaActivitiesImpl.class);

@Override
public Distance getDistance(Address address) {

logger.info("getDistance invoked; determining distance to customer address");

// this is a simulation, which calculates a fake (but consistent)
// distance for a customer address based on its length. The value
// will therefore be different when called with different addresses,
// but will be the same across all invocations with the same address.

int kilometers = address.getLine1().length() + address.getLine2().length() - 10;
if (kilometers < 1) {
kilometers = 5;
}

Distance distance = new Distance(kilometers);

logger.info("getDistance complete: {}", distance.getKilometers());
return distance;
}

@Override
public OrderConfirmation sendBill(Bill bill) {
int amount = bill.getAmount();

logger.info("sendBill invoked: customer: {} amount: {}", bill.getCustomerID(), amount);

int chargeAmount = amount;

// This month's special offer: Get $5 off all orders over $30
if (amount > 3000) {
logger.info("Applying discount");

chargeAmount -= 500; // reduce amount charged by 500 cents
}

// reject invalid amounts before calling the payment processor
if (chargeAmount < 0) {
logger.error("invalid charge amount: {%d} (must be above zero)", chargeAmount);
String errorMessage = "invalid charge amount: " + chargeAmount;
throw Activity.wrap(new InvalidChargeAmountException(errorMessage));
}

// pretend we called a payment processing service here
OrderConfirmation confirmation = new OrderConfirmation(bill.getOrderNumber(), "SUCCESS",
"P24601", Instant.now().getEpochSecond(), chargeAmount);

logger.debug("Sendbill complete: Confirmation Number: {}",
confirmation.getConfirmationNumber());

return confirmation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package customsearchattributes;

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

public class PizzaWorker {
public static void main(String[] args) {

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);

Worker worker = factory.newWorker(Constants.TASK_QUEUE_NAME);

worker.registerWorkflowImplementationTypes(PizzaWorkflowImpl.class);

worker.registerActivitiesImplementations(new PizzaActivitiesImpl());

factory.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package customsearchattributes;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import customsearchattributes.model.PizzaOrder;
import customsearchattributes.model.OrderConfirmation;

@WorkflowInterface
public interface PizzaWorkflow {

@WorkflowMethod
OrderConfirmation orderPizza(PizzaOrder order);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package customsearchattributes;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.failure.ApplicationFailure;
import io.temporal.common.SearchAttributeKey;

import customsearchattributes.model.Address;
import customsearchattributes.model.Bill;
import customsearchattributes.model.Customer;
import customsearchattributes.model.Distance;
import customsearchattributes.model.OrderConfirmation;
import customsearchattributes.model.Pizza;
import customsearchattributes.model.PizzaOrder;
import customsearchattributes.exceptions.InvalidChargeAmountException;
import customsearchattributes.exceptions.OutOfServiceAreaException;

import java.time.Duration;
import java.util.List;

import org.slf4j.Logger;

public class PizzaWorkflowImpl implements PizzaWorkflow {

public static final Logger logger = Workflow.getLogger(PizzaWorkflowImpl.class);

ActivityOptions options =
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build();

private final PizzaActivities activities =
Workflow.newActivityStub(PizzaActivities.class, options);

@Override
public OrderConfirmation orderPizza(PizzaOrder order) {

String orderNumber = order.getOrderNumber();
Customer customer = order.getCustomer();
List<Pizza> items = order.getItems();
boolean isDelivery = order.isDelivery();
Address address = order.getAddress();



logger.info("orderPizza Workflow Invoked");

int totalPrice = 0;
for (Pizza pizza : items) {
totalPrice += pizza.getPrice();
}

Distance distance;
try {
distance = activities.getDistance(address);
} catch (NullPointerException e) {
logger.error("Unable to get distance");

Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueSet(true));

throw new NullPointerException("Unable to get distance");
}

if (isDelivery && (distance.getKilometers() > 25)) {
logger.error("Customer lives outside the service area");
Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueSet(true));
throw ApplicationFailure.newFailure("Customer lives outside the service area",
OutOfServiceAreaException.class.getName());
}

logger.info("distance is {}", distance.getKilometers());

// Use a short Timer duration here to simulate the passage of time
// while avoiding delaying the exercise.
Workflow.sleep(Duration.ofSeconds(3));

Bill bill = new Bill(customer.getCustomerID(), orderNumber, "Pizza", totalPrice);

OrderConfirmation confirmation;
try {
confirmation = activities.sendBill(bill);
} catch (InvalidChargeAmountException e) {
logger.error("Unable to bill customer");
Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueSet(true));
throw Workflow.wrap(new InvalidChargeAmountException("Unable to bill customer"));
}

Workflow.upsertTypedSearchAttributes(Constants.IS_ORDER_FAILED.valueSet(false));
return confirmation;
}
}
Loading

0 comments on commit 4e406d7

Please sign in to comment.