actorRef
Materialize an ActorRef; sending messages to it will emit them on the stream.
Description
Materialize an ActorRef; sending messages to it will emit them on the stream. The actor contains a buffer, but because communication is one-way there is no back pressure. Overflow is handled either by dropping elements or by failing the stream; the user chooses the strategy.
emits when there is demand and there are messages in the buffer or a message is sent to the ActorRef
completes when the ActorRef is sent akka.actor.Status.Success
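The overflow handling described above can be illustrated with a small plain-Java model. This is only a sketch of the idea, not Akka's implementation: the names `OverflowSketch`, `BoundedBuffer`, and `Strategy` are hypothetical, and the model covers just two behaviours, dropping the oldest buffered element (as `OverflowStrategy.dropHead` does) and failing on overflow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OverflowSketch {

    /** Hypothetical stand-ins for two overflow behaviours. */
    enum Strategy { DROP_HEAD, FAIL }

    /** A bounded buffer whose producer is never blocked (no back pressure). */
    static final class BoundedBuffer<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;

        BoundedBuffer(int capacity, Strategy strategy) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be positive");
            }
            this.capacity = capacity;
            this.strategy = strategy;
        }

        /** Accepts an element immediately; applies the strategy when full. */
        void offer(T element) {
            if (queue.size() == capacity) {
                if (strategy == Strategy.FAIL) {
                    throw new IllegalStateException("buffer overflow");
                }
                queue.removeFirst(); // DROP_HEAD: discard the oldest element
            }
            queue.addLast(element);
        }

        /** Returns the buffered elements in order and empties the buffer. */
        List<T> drain() {
            List<T> out = new ArrayList<>(queue);
            queue.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(3, Strategy.DROP_HEAD);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i); // the sender is never slowed down
        }
        System.out.println(buffer.drain()); // prints [3, 4, 5]
    }
}
```

With a capacity of 3 and five messages sent while nothing is consumed, the two oldest messages are lost. Under the failing strategy the fourth message would raise an error instead, which corresponds to failing the stream.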
Examples
- Scala
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Status.Success
import akka.stream.ActorMaterializer
import akka.stream.CompletionStrategy
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val bufferSize = 100

// When the buffer is full, dropHead discards the oldest buffered element
val source: Source[Any, ActorRef] = Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead)

// Running the stream materializes the ActorRef that feeds it
val actorRef: ActorRef = source.to(Sink.foreach(println)).run()

actorRef ! "hello"
actorRef ! "hello"

// The stream completes successfully with the following message
actorRef ! Success(CompletionStrategy.immediately)
- Java
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Status.Success;
import akka.stream.ActorMaterializer;
import akka.stream.CompletionStrategy;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

int bufferSize = 100;

// When the buffer is full, dropHead discards the oldest buffered element
Source<Object, ActorRef> source = Source.actorRef(bufferSize, OverflowStrategy.dropHead());

// Running the stream materializes the ActorRef that feeds it
ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(materializer);

actorRef.tell("hello", ActorRef.noSender());
actorRef.tell("hello", ActorRef.noSender());

// The stream completes successfully with the following message
actorRef.tell(new Success(CompletionStrategy.draining()), ActorRef.noSender());