Orchestrate the agents

New to Akka? Start here:

Use the Build your first agent guide to get a simple agentic service running locally and interact with it.

Overview

We have two agents, the ActivityAgent and the WeatherAgent. We could make a request to the WeatherAgent from the endpoint before calling the ActivityAgent but a better approach is to introduce a workflow that orchestrates the calls between the agents.

In this part of the guide you will:

  • Create a workflow that calls the agents

  • Adjust the endpoint to use the workflow

Prerequisites

Orchestrate the agents

Agents make external calls to the AI model and possibly other services, and therefore it is important to have solid error handling and durable execution steps when calling agents. In many cases it is a good recommendation to call agents from a workflow. The workflow will automatically execute the steps in a reliable and durable way. This means that if a call in a step fails, it will be retried until it succeeds or the retry limit of the recovery strategy is reached and separate error handling can be performed. The state machine of the workflow is durable, which means that if the workflow is restarted for some reason it will continue from where it left off, i.e. execute the current non-completed step again.

Our workflow should first retrieve the weather forecast and then find suitable activities. Add a new file AgentTeamWorkflow.java to src/main/java/com/example/application/

AgentTeamWorkflow.java
import akka.Done;
import akka.javasdk.annotations.ComponentId;
import akka.javasdk.client.ComponentClient;
import akka.javasdk.workflow.Workflow;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ComponentId("agent-team")
public class AgentTeamWorkflow extends Workflow<AgentTeamWorkflow.State> {

  private static final Logger logger = LoggerFactory.getLogger(AgentTeamWorkflow.class);

  public record Request(String userId, String message) {}

  public record State(String userId, String userQuery, String finalAnswer) {
    State withAnswer(String a) {
      return new State(userId, userQuery, a);
    }
  }

  private final ComponentClient componentClient;

  public AgentTeamWorkflow(ComponentClient componentClient) {
    this.componentClient = componentClient;
  }

  public Effect<Done> start(Request request) {
    return effects()
      .updateState(new State(request.userId(), request.message(), "")) (1)
      .transitionTo("weather") (2)
      .thenReply(Done.getInstance());
  }

  public Effect<String> getAnswer() {
    if (currentState() == null || currentState().finalAnswer.isEmpty()) {
      return effects()
        .error(
          "Workflow '" + commandContext().workflowId() + "' not started, or not completed"
        );
    } else {
      return effects().reply(currentState().finalAnswer);
    }
  }

  @Override
  public WorkflowDef<State> definition() {
    return workflow()
      .addStep(askWeather())
      .addStep(suggestActivities())
      .addStep(error())
      .defaultStepRecoverStrategy(maxRetries(2).failoverTo("error"));
  }

  private Step askWeather() { (3)
    return step("weather")
      .call(
        () ->
          componentClient
            .forAgent()
            .inSession(sessionId())
            .method(WeatherAgent::query)
            .invoke(currentState().userQuery)
      )
      .andThen(String.class, forecast -> {
        logger.info("Weather forecast: {}", forecast);

        return effects().transitionTo("activities"); (4)
      })
      .timeout(Duration.ofSeconds(60));
  }

  private Step suggestActivities() {
    return step("activities")
      .call(() ->
        componentClient
          .forAgent()
          .inSession(sessionId())
          .method(ActivityAgent::query) (5)
          .invoke(
            new ActivityAgent.Request(currentState().userId(), currentState().userQuery())
          ))
      .andThen(String.class, suggestion -> {
        logger.info("Activities: {}", suggestion);

        return effects()
          .updateState(currentState().withAnswer(suggestion)) (6)
          .end();
      })
      .timeout(Duration.ofSeconds(60));
  }

  private Step error() {
    return step("error").call(() -> null).andThen(() -> effects().end());
  }

  private String sessionId() {
    // the workflow corresponds to the session
    return commandContext().workflowId();
  }
}
1 The workflow starts, and keeps track of the user id and original query in the state of the workflow.
2 First step is for the weather forecast.
3 Weather forecast is retrieved by the WeatherAgent, which must extract the location from the user query.
4 Next step is to find activities.
5 Request to the ActivityAgent.
6 The final result is stored in the workflow state.

You might have noticed that we don’t pass the forecast as a parameter to the ActivityAgent. How would it then know about the weather? The WeatherAgent and Activity agents share the same session memory and thereby the ActivityAgent will have the weather forecast in the context that is sent to the AI model.

Adjust the endpoint

Let’s modify the endpoint to use the AgentTeamWorkflow workflow instead of calling the agent directly.

ActivityEndpoint.java
@Post("/activities/{userId}")
public HttpResponse suggestActivities(String userId, Request request) {
  var sessionId = UUID.randomUUID().toString();

  var res = componentClient
    .forWorkflow(sessionId)
    .method(AgentTeamWorkflow::start) (1)
    .invoke(new AgentTeamWorkflow.Request(userId, request.message()));

  return HttpResponses.created(res, "/activities/" + userId + "/" + sessionId); (2)
}
1 Spawn the workflow by calling the start method.
2 Since the workflow is running in the background we can’t wait for the final answer, but instead reply with a HTTP CREATED status code.

This is returning akka.http.javadsl.model.HttpResponse, which is created with akka.javasdk.http.HttpResponses.

We need another method to retrieve the actual answer:

ActivityEndpoint.java
@Get("/activities/{userId}/{sessionId}")
public HttpResponse getAnswer(String userId, String sessionId) {
  var res = componentClient
    .forWorkflow(sessionId)
    .method(AgentTeamWorkflow::getAnswer)
    .invoke();

  if (res.isEmpty()) return HttpResponses.notFound(
    "Answer for '" + sessionId + "' not available (yet)"
  );
  else return HttpResponses.ok(res);
}

Running the service

Start your service locally:

mvn compile exec:java

Ask for activities.

curl -i -XPOST --location "http://localhost:9000/activities/alice" \
  --header "Content-Type: application/json" \
  --data '{"message": "I am in Madrid. What should I do? Beware of the weather."}'

Retrieve the suggested activities with the sessionId from the previous response:

curl -i -XGET --location "http://localhost:9000/activities/alice/{sessionId}"

Does it take the current weather forecast into account? You should see the Weather forecast in the logs of the service.

Next steps