Source.actorRef
Materialize an ActorRef
of the classic actors API; sending messages to it will emit them on the stream.
Signature
Source.actorRef
Source.actorRef
Description
Materialize an ActorRef
, sending messages to it will emit them on the stream. The actor contains a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping elements or failing the stream; the strategy is chosen by the user.
The stream can be completed successfully by sending the actor reference a akka.actor.Status.Success
. If the content is akka.stream.CompletionStrategy.immediately
the completion will be signaled immediately. Otherwise, if the content is akka.stream.CompletionStrategy.draining
(or anything else) already buffered elements will be sent out before signaling completion. Sending akka.actor.PoisonPill
will signal completion immediately but this behavior is deprecated and scheduled to be removed. Using akka.actor.ActorSystem.stop
to stop the actor and complete the stream is not supported.
See also:
- Source.actorRefWithBackpressure This operator, but with backpressure control
- ActorSource.actorRef The corresponding operator for the new actors API
- ActorSource.actorRefWithBackpressure The operator for the new actors API with backpressure control
- Source.queue Materialize a
SourceQueue
onto which elements can be pushed for emitting from the source
Examples
- Scala
-
source
import akka.Done import akka.actor.ActorRef import akka.stream.OverflowStrategy import akka.stream.CompletionStrategy import akka.stream.scaladsl._ val source: Source[Any, ActorRef] = Source.actorRef( completionMatcher = { case Done => // complete stream immediately if we send it Done CompletionStrategy.immediately }, // never fail the stream because of a message failureMatcher = PartialFunction.empty, bufferSize = 100, overflowStrategy = OverflowStrategy.dropHead) val actorRef: ActorRef = source.to(Sink.foreach(println)).run() actorRef ! "hello" actorRef ! "hello" // The stream completes successfully with the following message actorRef ! Done
- Java
-
source
import akka.actor.ActorRef; import akka.stream.OverflowStrategy; import akka.stream.CompletionStrategy; import akka.stream.javadsl.Sink; import akka.testkit.TestProbe; int bufferSize = 100; Source<Object, ActorRef> source = Source.actorRef( elem -> { // complete stream immediately if we send it Done if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately()); else return Optional.empty(); }, // never fail the stream because of a message elem -> Optional.empty(), bufferSize, OverflowStrategy.dropHead()); ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system); actorRef.tell("hello", ActorRef.noSender()); actorRef.tell("hello", ActorRef.noSender()); // The stream completes successfully with the following message actorRef.tell(Done.done(), ActorRef.noSender());
Reactive Streams semantics
emits when there is demand and there are messages in the buffer or a message is sent to the ActorRef
completes when the actor is stopped by sending it a particular message as described above