actorRef

Materialize an ActorRef; sending messages to it will emit them on the stream.

Source operators

Signature

def actorRef[T](completionMatcher: PartialFunction[Any, CompletionStrategy],failureMatcher: PartialFunction[Any, Throwable],bufferSize: Int,overflowStrategy: OverflowStrategy): Source[T, ActorRef]
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef]

Description

Materialize an ActorRef, sending messages to it will emit them on the stream. The actor contains a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping elements or failing the stream; the strategy is chosen by the user.

The stream can be completed successfully by sending the actor reference a akka.actor.Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately. Otherwise, if the content is akka.stream.CompletionStrategy.draining (or anything else) already buffered elements will be sent out before signaling completion. Sending akka.actor.PoisonPill will signal completion immediately but this behavior is deprecated and scheduled to be removed. Using akka.actor.ActorSystem.stop to stop the actor and complete the stream is not supported.

Reactive Streams semantics

emits when there is demand and there are messages in the buffer or a message is sent to the ActorRef

completes when the actor is stopped by sending it a particular message as described above

Examples

Scala

import akka.actor.Status.Success import akka.actor.ActorRef import akka.stream.OverflowStrategy import akka.stream.CompletionStrategy import akka.stream.scaladsl._ val bufferSize = 100 val source: Source[Any, ActorRef] = Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead) val actorRef: ActorRef = source.to(Sink.foreach(println)).run() actorRef ! "hello" actorRef ! "hello" // The stream completes successfully with the following message actorRef ! Success(CompletionStrategy.immediately)
Java
import akka.actor.ActorRef;
import akka.actor.Status.Success;
import akka.stream.OverflowStrategy;
import akka.stream.CompletionStrategy;
import akka.stream.javadsl.Sink;
import akka.testkit.TestProbe;

int bufferSize = 100;
Source<Object, ActorRef> source = Source.actorRef(bufferSize, OverflowStrategy.dropHead());

ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);
actorRef.tell("hello", ActorRef.noSender());
actorRef.tell("hello", ActorRef.noSender());

// The stream completes successfully with the following message
actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender());
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.