Camel

Dependency

To use Camel, you must add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-camel" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-camel_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-camel_2.12:2.5.32"
}

Camel depends on jaxb-api and javax.activation that were removed from the JDK. If running on a version of the JDK 9 or above also add the following dependencies:

sbt
libraryDependencies += "javax.xml.bind" % "jaxb-api" % "2.3.0"
Maven
<dependencies>
  <dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.3.0</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "javax.xml.bind:jaxb-api:2.3.0"
}
sbt
libraryDependencies += "com.sun.activation" % "javax.activation" % "1.2.0"
Maven
<dependencies>
  <dependency>
    <groupId>com.sun.activation</groupId>
    <artifactId>javax.activation</artifactId>
    <version>1.2.0</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.sun.activation:javax.activation:1.2.0"
}

Introduction

Warning

Akka Camel is deprecated in favour of Alpakka , the Akka Streams based collection of integrations to various endpoints (including Camel).

Introduction

The akka-camel module allows Untyped Actors to receive and send messages over a great variety of protocols and APIs. In addition to the native Scala and Java actor API, actors can now exchange messages with other systems over large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported.

Apache Camel

The akka-camel module is based on Apache Camel, a powerful and light-weight integration framework for the JVM. For an introduction to Apache Camel you may want to read this Apache Camel article. Camel comes with a large number of components that provide bindings to different protocols and APIs. The camel-extra project provides further components.

Consumer

Here’s an example of using Camel’s integration components in Akka.

Scala
sourceimport akka.camel.{ CamelMessage, Consumer }

class MyEndpoint extends Consumer {
  def endpointUri = "mina2:tcp://localhost:6200?textline=true"

  def receive = {
    case msg: CamelMessage => { /* ... */ }
    case _                 => { /* ... */ }
  }
}

// start and expose actor via tcp
import akka.actor.{ ActorSystem, Props }

val system = ActorSystem("some-system")
val mina = system.actorOf(Props[MyEndpoint])
Java
sourceimport akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;

public class MyEndpoint extends UntypedConsumerActor {
  private String uri;

  public String getEndpointUri() {
    return uri;
  }

  public void onReceive(Object message) throws Exception {
    if (message instanceof CamelMessage) {
      /* ... */
    } else unhandled(message);
  }

  // Extra constructor to change the default uri,
  // for instance to "jetty:http://localhost:8877/example"
  public MyEndpoint(String uri) {
    this.uri = uri;
  }

  public MyEndpoint() {
    this.uri = "mina2:tcp://localhost:6200?textline=true";
  }
}

The above example exposes an actor over a TCP endpoint via Apache Camel’s Mina component. The actor implements the endpointUrigetEndpointUri method to define an endpoint from which it can receive messages. After starting the actor, TCP clients can immediately send messages to and receive responses from that actor. If the message exchange should go over HTTP (via Camel’s Jetty component), the actor’s endpointUrigetEndpointUri method should return a different URI, for instance jetty:http://localhost:8877/example.

sourceimport akka.camel.{ CamelMessage, Consumer }

class MyEndpoint extends Consumer {
  def endpointUri = "jetty:http://localhost:8877/example"

  def receive = {
    case msg: CamelMessage => { /* ... */ }
    case _                 => { /* ... */ }
  }
}

In the above case an extra constructor is added that can set the endpoint URI, which would result in the getEndpointUri returning the URI that was set using this constructor.

Producer

Actors can also trigger message exchanges with external systems i.e. produce to Camel endpoints.

Scala
sourceimport akka.actor.{ ActorSystem, Props }
import akka.camel.CamelExtension

import language.postfixOps
import akka.util.Timeout
import akka.actor.Actor
import akka.camel.{ Oneway, Producer }
import akka.actor.{ ActorSystem, Props }

class Orders extends Actor with Producer with Oneway {
  def endpointUri = "jms:queue:Orders"
}

val sys = ActorSystem("some-system")
val orders = sys.actorOf(Props[Orders])

orders ! <order amount="100" currency="PLN" itemId="12345"/>
Java
sourceimport akka.camel.javaapi.UntypedProducerActor;

public class Orders extends UntypedProducerActor {
  public String getEndpointUri() {
    return "jms:queue:Orders";
  }
}

In the above example, any message sent to this actor will be sent to the JMS queue ordersOrders. Producer actors may choose from the same set of Camel components as Consumer actors do.

Below an example of how to send a message to the Orders producer.

sourceActorSystem system = ActorSystem.create("some-system");
Props props = Props.create(Orders.class);
ActorRef producer = system.actorOf(props, "jmsproducer");
producer.tell("<order amount=\"100\" currency=\"PLN\" itemId=\"12345\"/>", ActorRef.noSender());

CamelMessage

The number of Camel components is constantly increasing. The akka-camel module can support these in a plug-and-play manner. Just add them to your application’s classpath, define a component-specific endpoint URI and use it to exchange messages over the component-specific protocols or APIs. This is possible because Camel components bind protocol-specific message formats to a Camel-specific normalized message format. The normalized message format hides protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. The akka-camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage. In the above example of the Orders Producer, the XML message is put in the body of a newly created Camel Message with an empty set of headers. You can also create a CamelMessage yourself with the appropriate body and headers as you see fit.

CamelExtension

The akka-camel module is implemented as an Akka Extension, the CamelExtension object. Extensions will only be loaded once per ActorSystem, which will be managed by Akka. The CamelExtension object provides access to the Camel traitinterface. The Camel traitinterface in turn provides access to two important Apache Camel objects, the CamelContext and the ProducerTemplate. Below you can see how you can get access to these Apache Camel objects.

Scala
sourceval system = ActorSystem("some-system")
val camel = CamelExtension(system)
val camelContext = camel.context
val producerTemplate = camel.template
Java
sourceActorSystem system = ActorSystem.create("some-system");
Camel camel = CamelExtension.get(system);
CamelContext camelContext = camel.context();
ProducerTemplate producerTemplate = camel.template();

One CamelExtension is only loaded once for every one ActorSystem, which makes it safe to call the CamelExtension at any point in your code to get to the Apache Camel objects associated with it. There is one CamelContext and one ProducerTemplate for every one ActorSystem that uses a CamelExtension. By Default, a new CamelContext is created when the CamelExtension starts. If you want to inject your own context instead, you can extendimplement the ContextProvider traitinterface and add the FQCN of your implementation in the config, as the value of the “akka.camel.context-provider”. This interface define a single method getContext() used to load the CamelContext.

Below an example on how to add the ActiveMQ component to the CamelContext, which is required when you would like to use the ActiveMQ component.

Scala
source// import org.apache.activemq.camel.component.ActiveMQComponent
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val camelContext = camel.context
// camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(
//   "vm://localhost?broker.persistent=false"))
Java
sourceActorSystem system = ActorSystem.create("some-system");
Camel camel = CamelExtension.get(system);
CamelContext camelContext = camel.context();
// camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(
//   "vm://localhost?broker.persistent=false"));

The CamelContext joins the lifecycle of the ActorSystem and CamelExtension it is associated with; the CamelContext is started when the CamelExtension is created, and it is shut down when the associated ActorSystem is shut down. The same is true for the ProducerTemplate.

The CamelExtension is used by both Producer and Consumer actors to interact with Apache Camel internally. You can access the CamelExtension inside a Producer or a Consumer using the camel definitionmethod, or get straight at the CamelContext using the camelContext definitiongetCamelContext method or to the ProducerTemplate using the getProducerTemplate method. Actors are created and started asynchronously. When a Consumer actor is created, the Consumer is published at its Camel endpoint (more precisely, the route is added to the CamelContext from the Endpoint to the actor). When a Producer actor is created, a SendProcessor and Endpoint are created so that the Producer can send messages to it. Publication is done asynchronously; setting up an endpoint may still be in progress after you have requested the actor to be created. Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used. The Camel traitinterface allows you to find out when the endpoint is activated or deactivated.

Scala
sourceimport akka.camel.{ CamelMessage, Consumer }
import scala.concurrent.duration._

class MyEndpoint extends Consumer {
  def endpointUri = "mina2:tcp://localhost:6200?textline=true"

  def receive = {
    case msg: CamelMessage => { /* ... */ }
    case _                 => { /* ... */ }
  }
}
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val actorRef = system.actorOf(Props[MyEndpoint])
// get a future reference to the activation of the endpoint of the Consumer Actor
val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher)
Java
sourceimport akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import jdocs.AbstractJavaTest;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import static java.util.concurrent.TimeUnit.SECONDS;

// ..
ActorSystem system = ActorSystem.create("some-system");
Props props = Props.create(MyConsumer.class);
ActorRef producer = system.actorOf(props, "myproducer");
Camel camel = CamelExtension.get(system);
// get a future reference to the activation of the endpoint of the Consumer Actor
Timeout timeout = new Timeout(Duration.create(10, SECONDS));
Future<ActorRef> activationFuture =
    camel.activationFutureFor(producer, timeout, system.dispatcher());

The above code shows that you can get a Future to the activation of the route from the endpoint to the actor, or you can wait in a blocking fashion on the activation of the route. An ActivationTimeoutException is thrown if the endpoint could not be activated within the specified timeout. Deactivation works in a similar fashion:

Scala
sourcesystem.stop(actorRef)
// get a future reference to the deactivation of the endpoint of the Consumer Actor
val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher)
Java
source// ..
system.stop(producer);
// get a future reference to the deactivation of the endpoint of the Consumer Actor
Future<ActorRef> deactivationFuture =
    camel.deactivationFutureFor(producer, timeout, system.dispatcher());

Deactivation of a Consumer or a Producer actor happens when the actor is terminated. For a Consumer, the route to the actor is stopped. For a Producer, the SendProcessor is stopped. A DeActivationTimeoutException is thrown if the associated camel objects could not be deactivated within the specified timeout.

Consumer Actors

For objects to receive messages, they must mixin the Consumer traitinherit from the UntypedConsumerActor class. For example, the following actor class (Consumer1) implements the endpointUrigetEndpointUri method, which is declared in the Consumer traitUntypedConsumerActor class, in order to receive messages from the file:data/input/actor Camel endpoint.

Scala
sourceimport akka.camel.{ CamelMessage, Consumer }

class Consumer1 extends Consumer {
  def endpointUri = "file:data/input/actor"

  def receive = {
    case msg: CamelMessage => println("received %s".format(msg.bodyAs[String]))
  }
}
Java
sourceimport akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Consumer1 extends UntypedConsumerActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public String getEndpointUri() {
    return "file:data/input/actor";
  }

  public void onReceive(Object message) {
    if (message instanceof CamelMessage) {
      CamelMessage camelMessage = (CamelMessage) message;
      String body = camelMessage.getBodyAs(String.class, getCamelContext());
      log.info("Received message: {}", body);
    } else unhandled(message);
  }
}

Whenever a file is put into the data/input/actor directory, its content is picked up by the Camel file component and sent as message to the actor. Messages consumed by actors from Camel endpoints are of type CamelMessage. These are immutable representations of Camel messages.

Here’s another example that sets the endpointUri to jetty:http://localhost:8877/camel/default. It causes Camel’s Jetty component to start an embedded Jetty server, accepting HTTP connections from localhost on port 8877.

Scala
sourceimport akka.camel.{ CamelMessage, Consumer }

class Consumer2 extends Consumer {
  def endpointUri = "jetty:http://localhost:8877/camel/default"

  def receive = {
    case msg: CamelMessage => sender() ! ("Hello %s".format(msg.bodyAs[String]))
  }
}
Java
sourceimport akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;

public class Consumer2 extends UntypedConsumerActor {
  public String getEndpointUri() {
    return "jetty:http://localhost:8877/camel/default";
  }

  public void onReceive(Object message) {
    if (message instanceof CamelMessage) {
      CamelMessage camelMessage = (CamelMessage) message;
      String body = camelMessage.getBodyAs(String.class, getCamelContext());
      getSender().tell(String.format("Received message: %s", body), getSelf());
    } else unhandled(message);
  }
}

After starting the actor, clients can send messages to that actor by POSTing to http://localhost:8877/camel/default. The actor sends a response by using the sender !getSender().tell method. For returning a message body and headers to the HTTP client the response type should be CamelMessage. For any other response type, a new CamelMessage object is created by akka-camel with the actor response as message body.

Delivery acknowledgements

With in-out message exchanges, clients usually know that a message exchange is done when they receive a reply from a consumer actor. The reply message can be a CamelMessage (or any object which is then internally converted to a CamelMessage) on success, and a Failure message on failure.

With in-only message exchanges, by default, an exchange is done when a message is added to the consumer actor’s mailbox. Any failure or exception that occurs during processing of that message by the consumer actor cannot be reported back to the endpoint in this case. To allow consumer actors to positively or negatively acknowledge the receipt of a message from an in-only message exchange, they need to override the autoAck method to return false. In this case, consumer actors must reply either with a special akka.camel.Ack message (positive acknowledgement) or a akka.actor.Status.Failure (negative acknowledgement).

Scala
sourceimport akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure

class Consumer3 extends Consumer {
  override def autoAck = false

  def endpointUri = "jms:queue:test"

  def receive = {
    case msg: CamelMessage =>
      sender() ! Ack
      // on success
      // ..
      val someException = new Exception("e1")
      // on failure
      sender() ! Failure(someException)
  }
}
Java
sourceimport akka.actor.Status;
import akka.camel.Ack;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;

public class Consumer3 extends UntypedConsumerActor {

  @Override
  public boolean autoAck() {
    return false;
  }

  public String getEndpointUri() {
    return "jms:queue:test";
  }

  public void onReceive(Object message) {
    if (message instanceof CamelMessage) {
      getSender().tell(Ack.getInstance(), getSelf());
      // on success
      // ..
      Exception someException = new Exception("e1");
      // on failure
      getSender().tell(new Status.Failure(someException), getSelf());
    } else unhandled(message);
  }
}

Consumer timeout

Camel Exchanges (and their corresponding endpoints) that support two-way communications need to wait for a response from an actor before returning it to the initiating client. For some endpoint types, timeout values can be defined in an endpoint-specific way which is described in the documentation of the individual Camel components. Another option is to configure timeouts on the level of consumer actors.

Two-way communications between a Camel endpoint and an actor are initiated by sending the request message to the actor with the askask pattern and the actor replies to the endpoint when the response is ready. The ask request to the actor can timeout, which will result in the Exchange failing with a TimeoutException set on the failure of the Exchange. The timeout on the consumer actor can be overridden with the replyTimeout, as shown below.

Scala
sourceimport akka.camel.{ CamelMessage, Consumer }
import scala.concurrent.duration._

class Consumer4 extends Consumer {
  def endpointUri = "jetty:http://localhost:8877/camel/default"
  override def replyTimeout = 500 millis
  def receive = {
    case msg: CamelMessage => sender() ! ("Hello %s".format(msg.bodyAs[String]))
  }
}
Java
sourceimport akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

public class Consumer4 extends UntypedConsumerActor {
  private static final FiniteDuration timeout = Duration.create(500, TimeUnit.MILLISECONDS);

  @Override
  public FiniteDuration replyTimeout() {
    return timeout;
  }

  public String getEndpointUri() {
    return "jetty:http://localhost:8877/camel/default";
  }

  public void onReceive(Object message) {
    if (message instanceof CamelMessage) {
      CamelMessage camelMessage = (CamelMessage) message;
      String body = camelMessage.getBodyAs(String.class, getCamelContext());
      getSender().tell(String.format("Hello %s", body), getSelf());
    } else unhandled(message);
  }
}

Producer Actors

For sending messages to Camel endpoints, actors need to mixin the Producer trait inherit from the UntypedProducerActor class and implement the getEndpointUri method.

Scala
sourceimport akka.actor.Actor
import akka.actor.{ ActorSystem, Props }
import akka.camel.{ CamelMessage, Producer }
import akka.util.Timeout

class Producer1 extends Actor with Producer {
  def endpointUri = "http://localhost:8080/news"
}
Java
sourceimport akka.camel.javaapi.UntypedProducerActor;

public class Producer1 extends UntypedProducerActor {
  public String getEndpointUri() {
    return "http://localhost:8080/news";
  }
}

Producer1 inherits a default implementation of the receiveonReceive method from the Producer traitUntypedProducerActor class. To customize a producer actor’s default behavior you must override the Producer.transformResponseUntypedProducerActor.onTransformResponse and Producer.transformOutgoingMessage methodsUntypedProducerActor.onTransformOutgoingMessage methods. This is explained later in more detail. Producer Actors cannot override the default Producer.receiveUntypedProducerActor.onReceive method.

Any message sent to a ProducerProducer actor will be sent to the associated Camel endpoint, in the above example to http://localhost:8080/news. The ProducerUntypedProducerActor always sends messages asynchronously. Response messages (if supported by the configured endpoint) will, by default, be returned to the original sender. The following example uses the ask pattern to send a message to a Producer actor and waits for a response.

Scala
sourceimport akka.pattern.ask
import scala.concurrent.duration._
implicit val timeout = Timeout(10 seconds)

val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer1])
val future = producer.ask("some request").mapTo[CamelMessage]
Java
sourceActorSystem system = ActorSystem.create("some-system");
Props props = Props.create(FirstProducer.class);
ActorRef producer = system.actorOf(props, "myproducer");
CompletionStage<Object> future =
    Patterns.ask(producer, "some request", Duration.ofMillis(1000L));

The future contains the response CamelMessage, or an AkkaCamelException when an error occurred, which contains the headers of the response.

Custom Processing

Instead of replying to the initial sender, producer actors can implement custom response processing by overriding the routeResponseonRouteResponse method. In the following example, the response message is forwarded to a target actor instead of being replied to the original sender.

Scala
sourceimport akka.actor.{ Actor, ActorRef }
import akka.camel.{ CamelMessage, Producer }
import akka.actor.{ ActorSystem, Props }

class ResponseReceiver extends Actor {
  def receive = {
    case msg: CamelMessage =>
    // do something with the forwarded response
  }
}

class Forwarder(uri: String, target: ActorRef) extends Actor with Producer {
  def endpointUri = uri

  override def routeResponse(msg: Any): Unit = { target.forward(msg) }
}
val system = ActorSystem("some-system")
val receiver = system.actorOf(Props[ResponseReceiver])
val forwardResponse = system.actorOf(Props(classOf[Forwarder], this, "http://localhost:8080/news/akka", receiver))
// the Forwarder sends out a request to the web page and forwards the response to
// the ResponseReceiver
forwardResponse ! "some request"
Java
sourceimport akka.actor.UntypedAbstractActor;
import akka.camel.CamelMessage;

public class ResponseReceiver extends UntypedAbstractActor {
  public void onReceive(Object message) {
    if (message instanceof CamelMessage) {
      // do something with the forwarded response
    }
  }
}
sourceimport akka.actor.ActorRef;
import akka.camel.javaapi.UntypedProducerActor;

public class Forwarder extends UntypedProducerActor {
  private String uri;
  private ActorRef target;

  public Forwarder(String uri, ActorRef target) {
    this.uri = uri;
    this.target = target;
  }

  public String getEndpointUri() {
    return uri;
  }

  @Override
  public void onRouteResponse(Object message) {
    target.forward(message, getContext());
  }
}
sourceActorSystem system = ActorSystem.create("some-system");
Props receiverProps = Props.create(ResponseReceiver.class);
final ActorRef receiver = system.actorOf(receiverProps, "responseReceiver");
ActorRef forwardResponse =
    system.actorOf(Props.create(Forwarder.class, "http://localhost:8080/news/akka", receiver));
// the Forwarder sends out a request to the web page and forwards the response to
// the ResponseReceiver
forwardResponse.tell("some request", ActorRef.noSender());

Before producing messages to endpoints, producer actors can pre-process them by overriding the Producer.transformOutgoingMessage UntypedProducerActor.onTransformOutgoingMessag method.

Scala
sourceimport akka.actor.Actor
import akka.camel.{ CamelMessage, Producer }

class Transformer(uri: String) extends Actor with Producer {
  def endpointUri = uri

  def upperCase(msg: CamelMessage) = msg.mapBody { body: String =>
    body.toUpperCase
  }

  override def transformOutgoingMessage(msg: Any) = msg match {
    case msg: CamelMessage => upperCase(msg)
  }
}
Java
sourceimport akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedProducerActor;
import akka.dispatch.Mapper;

public class Transformer extends UntypedProducerActor {
  private String uri;

  public Transformer(String uri) {
    this.uri = uri;
  }

  public String getEndpointUri() {
    return uri;
  }

  private CamelMessage upperCase(CamelMessage msg) {
    return msg.mapBody(
        new Mapper<String, String>() {
          @Override
          public String apply(String body) {
            return body.toUpperCase();
          }
        });
  }

  @Override
  public Object onTransformOutgoingMessage(Object message) {
    if (message instanceof CamelMessage) {
      CamelMessage camelMessage = (CamelMessage) message;
      return upperCase(camelMessage);
    } else {
      return message;
    }
  }
}

Producer configuration options

The interaction of producer actors with Camel endpoints can be configured to be one-way or two-way (by initiating in-only or in-out message exchanges, respectively). By default, the producer initiates an in-out message exchange with the endpoint. For initiating an in-only exchange, producer actors have to override the onewayisOneway method to return true.

Scala
sourceimport akka.actor.{ Actor, ActorSystem, Props }
import akka.camel.Producer

class OnewaySender(uri: String) extends Actor with Producer {
  def endpointUri = uri
  override def oneway: Boolean = true
}

val system = ActorSystem("some-system")
val producer = system.actorOf(Props(classOf[OnewaySender], this, "activemq:FOO.BAR"))
producer ! "Some message"
Java
sourceimport akka.camel.javaapi.UntypedProducerActor;

public class OnewaySender extends UntypedProducerActor {
  private String uri;

  public OnewaySender(String uri) {
    this.uri = uri;
  }

  public String getEndpointUri() {
    return uri;
  }

  @Override
  public boolean isOneway() {
    return true;
  }
}

Message correlation

To correlate request with response messages, applications can set the Message.MessageExchangeId message header.

Scala
sourceimport akka.camel.{ CamelMessage, Producer }
import akka.actor.Actor
import akka.actor.{ ActorSystem, Props }

class Producer2 extends Actor with Producer {
  def endpointUri = "activemq:FOO.BAR"
}
val system = ActorSystem("some-system")
val producer = system.actorOf(Props[Producer2])

producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123"))
Java
sourceActorSystem system = ActorSystem.create("some-system");
Props props = Props.create(Orders.class);
ActorRef producer = system.actorOf(props, "jmsproducer");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(CamelMessage.MessageExchangeId(), "123");
producer.tell(
    new CamelMessage("<order amount=\"100\" currency=\"PLN\" " + "itemId=\"12345\"/>", headers),
    ActorRef.noSender());

ProducerTemplate

The Producer traitUntypedProducerActor class is a very convenient way for actors to produce messages to Camel endpoints. Actors may also use a Camel ProducerTemplate for producing messages to endpoints.

Scala
sourceimport akka.actor.Actor
class MyActor extends Actor {
  def receive = {
    case msg =>
      val template = CamelExtension(context.system).template
      template.sendBody("direct:news", msg)
  }
}
Java
sourceimport akka.actor.UntypedAbstractActor;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import org.apache.camel.ProducerTemplate;

public class MyActor extends UntypedAbstractActor {
  public void onReceive(Object message) {
    Camel camel = CamelExtension.get(getContext().getSystem());
    ProducerTemplate template = camel.template();
    template.sendBody("direct:news", message);
  }
}

For initiating a two-way message exchange, one of the ProducerTemplate.request* methods must be used.

Scala
sourceimport akka.actor.Actor
class MyActor extends Actor {
  def receive = {
    case msg =>
      val template = CamelExtension(context.system).template
      sender() ! template.requestBody("direct:news", msg)
  }
}
Java
sourceimport akka.actor.AbstractActor;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import org.apache.camel.ProducerTemplate;

public class RequestBodyActor extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(
            message -> {
              Camel camel = CamelExtension.get(getContext().getSystem());
              ProducerTemplate template = camel.template();
              getSender().tell(template.requestBody("direct:news", message), getSelf());
            })
        .build();
  }
}

Asynchronous routing

In-out message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both, consumer and producer actors.

  • A consumer endpoint sends request messages to its consumer actor using the ! (tell) operator tell method and the actor returns responses with sender !getSender().tell once they are ready.
  • A producer actor sends request messages to its endpoint using Camel’s asynchronous routing engine. Asynchronous responses are wrapped and added to the producer actor’s mailbox for later processing. By default, response messages are returned to the initial sender but this can be overridden by Producer implementations (see also description of the routeResponseonRouteResponse method in Custom Processing).

However, asynchronous two-way message exchanges, without allocating a thread for the full duration of exchange, cannot be generically supported by Camel’s asynchronous routing engine alone. This must be supported by the individual Camel components (from which endpoints are created) as well. They must be able to suspend any work started for request processing (thereby freeing threads to do other work) and resume processing when the response is ready. This is currently the case for a subset of components such as the Jetty component. All other Camel components can still be used, but they will cause allocation of a thread for the duration of an in-out message exchange.

If the used Camel component is blocking it might be necessary to use a separate dispatcher for the producer. The Camel processor is invoked by a child actor of the producer and the dispatcher can be defined in the deployment section of the configuration. For example, if your producer actor has path /user/integration/output the dispatcher of the child actor can be defined with:

akka.actor.deployment {
  /integration/output/* {
    dispatcher = my-dispatcher
  }
}

Custom Camel routes

In all the examples so far, routes to consumer actors have been automatically constructed by akka-camel, when the actor was started. Although the default route construction templates, used by akka-camel internally, are sufficient for most use cases, some applications may require more specialized routes to actors. The akka-camel module provides two mechanisms for customizing routes to actors, which will be explained in this section. These are:

  • Usage of Akka Camel components to access actors. Any Camel route can use these components to access Akka actors.
  • Intercepting route construction to actors. This option gives you the ability to change routes that have already been added to Camel. Consumer actors have a hook into the route definition process which can be used to change the route.

Akka Camel components

Akka actors can be accessed from Camel routes using the actor Camel component. This component can be used to access any Akka actor (not only consumer actors) from Camel routes, as described in the following sections.

Access to actors

To access actors from custom Camel routes, the actor Camel component should be used. It fully supports Camel’s asynchronous routing engine.

This component accepts the following endpoint URI format:

  • [<actor-path>]?<options>

where <actor-path> is the ActorPath to the actor. The <options> are name-value pairs separated by & (i.e. name1=value1&name2=value2&...).

URI options

The following URI options are supported:

Name Type Default Description
replyTimeout Duration false The reply timeout, specified in the same way that you use the duration in akka, for instance 10 seconds except that in the url it is handy to use a + between the amount and the unit, like for example 200+millis See also Consumer timeout.
autoAck Boolean true If set to true, in-only message exchanges are auto-acknowledged when the message is added to the actor’s mailbox. If set to false, actors must acknowledge the receipt of the message. See also Delivery acknowledgements.

Here’s an actor endpoint URI example containing an actor path:

akka://some-system/user/myconsumer?autoAck=false&replyTimeout=100+millis

In the following example, a custom route to an actor is created, using the actor’s path.

The Akka camel package contains an implicit toActorRouteDefinition that allows for a route to reference an ActorRef directly as shown in the below example, The route starts from a Jetty endpoint and ends at the target actor.

Scala
sourceimport akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.camel.{ CamelExtension, CamelMessage }
import org.apache.camel.builder.RouteBuilder
import akka.camel._
class Responder extends Actor {
  def receive = {
    case msg: CamelMessage =>
      sender() ! (msg.mapBody { body: String =>
        "received %s".format(body)
      })
  }
}

class CustomRouteBuilder(system: ActorSystem, responder: ActorRef) extends RouteBuilder {
  def configure: Unit = {
    from("jetty:http://localhost:8877/camel/custom").to(responder)
  }
}
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val responder = system.actorOf(Props[Responder], name = "TestResponder")
camel.context.addRoutes(new CustomRouteBuilder(system, responder))
Java
sourceimport akka.actor.UntypedAbstractActor;
import akka.camel.CamelMessage;
import akka.dispatch.Mapper;

public class Responder extends UntypedAbstractActor {

  public void onReceive(Object message) {
    if (message instanceof CamelMessage) {
      CamelMessage camelMessage = (CamelMessage) message;
      getSender().tell(createResponse(camelMessage), getSelf());
    } else unhandled(message);
  }

  private CamelMessage createResponse(CamelMessage msg) {
    return msg.mapBody(
        new Mapper<String, String>() {
          @Override
          public String apply(String body) {
            return String.format("received %s", body);
          }
        });
  }
}
sourceimport akka.actor.ActorRef;
import akka.camel.internal.component.CamelPath;
import org.apache.camel.builder.RouteBuilder;

public class CustomRouteBuilder extends RouteBuilder {
  private String uri;

  public CustomRouteBuilder(ActorRef responder) {
    uri = CamelPath.toUri(responder);
  }

  public void configure() throws Exception {
    from("jetty:http://localhost:8877/camel/custom").to(uri);
  }
}
sourceActorSystem system = ActorSystem.create("some-system");
try {
  Camel camel = CamelExtension.get(system);
  ActorRef responder = system.actorOf(Props.create(Responder.class), "TestResponder");
  camel.context().addRoutes(new CustomRouteBuilder(responder));

The CamelPath.toCamelUri converts the ActorRef to the Camel actor component URI format which points to the actor endpoint as described above. When a message is received on the jetty endpoint, it is routed to the Responder actor, which in return replies back to the client of the HTTP request.

Intercepting route construction

The previous section, camel components, explained how to setup a route to an actor manually. It was the application’s responsibility to define the route and add it to the current CamelContext. This section explains a more convenient way to define custom routes: akka-camel is still setting up the routes to consumer actors (and adds these routes to the current CamelContext) but applications can define extensions to these routes. Extensions can be defined with Camel’s Java DSL or Scala DSL. For example, an extension could be a custom error handler that redelivers messages from an endpoint to an actor’s bounded mailbox when the mailbox was full.

The following examples demonstrate how to extend a route to a consumer actor for handling exceptions thrown by that actor.

Scala
sourceimport akka.camel.Consumer

import org.apache.camel.builder.Builder
import org.apache.camel.model.RouteDefinition

class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
  def receive = {
    case msg: CamelMessage => throw new Exception("error: %s".format(msg.body))
  }
  override def onRouteDefinition =
    (rd) => rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end

  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    sender() ! Failure(reason)
  }
}
Java
sourceimport akka.actor.Status;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.dispatch.Mapper;
import org.apache.camel.builder.Builder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import scala.Option;

public class ErrorThrowingConsumer extends UntypedConsumerActor {
  private String uri;

  private static Mapper<RouteDefinition, ProcessorDefinition<?>> mapper =
      new Mapper<RouteDefinition, ProcessorDefinition<?>>() {
        public ProcessorDefinition<?> apply(RouteDefinition rd) {
          // Catch any exception and handle it by returning the exception message
          // as response
          return rd.onException(Exception.class)
              .handled(true)
              .transform(Builder.exceptionMessage())
              .end();
        }
      };

  public ErrorThrowingConsumer(String uri) {
    this.uri = uri;
  }

  public String getEndpointUri() {
    return uri;
  }

  public void onReceive(Object message) throws Exception {
    if (message instanceof CamelMessage) {
      CamelMessage camelMessage = (CamelMessage) message;
      String body = camelMessage.getBodyAs(String.class, getCamelContext());
      throw new Exception(String.format("error: %s", body));
    } else unhandled(message);
  }

  @Override
  public Mapper<RouteDefinition, ProcessorDefinition<?>> getRouteDefinitionHandler() {
    return mapper;
  }

  @Override
  public void preRestart(Throwable reason, Option<Object> message) {
    getSender().tell(new Status.Failure(reason), getSelf());
  }
}

The above ErrorThrowingConsumer sends the Failure back to the sender in preRestart because the Exception that is thrown in the actor would otherwise just crash the actor, by default the actor would be restarted, and the response would never reach the client of the Consumer.

The akka-camel module creates a RouteDefinition instance by calling from(endpointUri) on a Camel RouteBuilder (where endpointUri is the endpoint URI of the consumer actor) and passes that instance as argument to the route definition handler *). The route definition handler then extends the route and returns a ProcessorDefinition (in the above example, the ProcessorDefinition returned by the end method. See the org.apache.camel.model package for details). After executing the route definition handler, akka-camel finally calls a to(targetActorUri) on the returned ProcessorDefinition to complete the route to the consumer actor (where targetActorUri is the actor component URI as described in Access to actors). If the actor cannot be found, a ActorNotRegisteredException is thrown.

*) Before passing the RouteDefinition instance to the route definition handler, akka-camel may make some further modifications to it.

Configuration

There are several configuration properties for the Camel module, please refer to the reference configuration.

Additional Resources

For an introduction to akka-camel 2, see also the Peter Gabryanczyk’s talk Migrating akka-camel module to Akka 2.x.

For an introduction to akka-camel 1, see also the Appendix E - Akka and Camel (pdf) of the book Camel in Action.

Other, more advanced external articles (for version 1) are:

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.