ActorSource.actorRef
Materialize an `ActorRef[T]` (Scala) / `ActorRef<T>` (Java) of the new actors API; sending messages to it will emit them on the stream only if they are of the same type as the stream.
Actor interop operators
Dependency
This operator is included in:
sbt val AkkaVersion = "2.6.21"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
Maven <properties>
<scala.binary.version> 2.13 </scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId> com.typesafe.akka </groupId>
<artifactId> akka-bom_${scala.binary.version} </artifactId>
<version> 2.6.21 </version>
<type> pom </type>
<scope> import </scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId> com.typesafe.akka </groupId>
<artifactId> akka-stream-typed_${scala.binary.version} </artifactId>
</dependency>
</dependencies>
Gradle def versions = [
ScalaBinary : "2.13"
]
dependencies {
implementation platform ( "com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.21" )
implementation "com.typesafe.akka:akka-stream-typed_${versions.ScalaBinary}"
}
Signature
ActorSource.actorRef
ActorSource.actorRef
Description
Materialize an `ActorRef[T]` (Scala) / `ActorRef<T>` (Java) which only accepts messages that are of the same type as the stream.
See also:
Examples
Scala
copy source import akka . actor . typed . ActorRef
import akka . stream . OverflowStrategy
import akka . stream . scaladsl .{ Sink , Source }
import akka . stream . typed . scaladsl . ActorSource
// Common supertype of every message used in this example.
trait Protocol

// A message carrying a text payload.
case class Message(msg: String) extends Protocol

// A payload-free marker message.
case object Complete extends Protocol

// A message wrapping an exception.
case class Fail(ex: Exception) extends Protocol
// A source of Protocol elements whose materialized value is an
// ActorRef[Protocol].
val source: Source[Protocol, ActorRef[Protocol]] =
  ActorSource.actorRef[Protocol](
    // Complete is the only message the completion matcher is defined for.
    completionMatcher = { case Complete => },
    // Fail(ex) is the only message the failure matcher is defined for; it yields ex.
    failureMatcher = { case Fail(ex) => ex },
    bufferSize = 8,
    overflowStrategy = OverflowStrategy.fail)

// Keep only the text of Message elements, print each one, and run the graph;
// running it yields the ActorRef used to send messages into the stream.
val ref: ActorRef[Protocol] =
  source
    .collect { case Message(msg) => msg }
    .to(Sink.foreach(println))
    .run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
Java
copy source import akka . actor . typed . ActorRef ;
import akka . actor . typed . ActorSystem ;
import akka . japi . JavaPartialFunction ;
import akka . stream . OverflowStrategy ;
import akka . stream . javadsl . Sink ;
import akka . stream . javadsl . Source ;
import akka . stream . typed . javadsl . ActorSource ;
import java . util . Optional ;
/** Common supertype of every message used in this example. */
interface Protocol {}

/** A message carrying a text payload. */
class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

/** A payload-free marker message. */
class Complete implements Protocol {}

/** A message wrapping an exception. */
class Fail implements Protocol {
  private final Exception ex;

  public Fail(Exception ex) {
    this.ex = ex;
  }
}
// A source of Protocol elements whose materialized value is an
// ActorRef<Protocol>.
final Source<Protocol, ActorRef<Protocol>> source =
    ActorSource.actorRef(
        // First matcher: true only for Complete messages.
        m -> m instanceof Complete,
        // Second matcher: the exception carried by a Fail, otherwise empty.
        m -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(),
        8,
        OverflowStrategy.fail());

// Keep only the text of Message elements, print each one, and run the graph;
// running it yields the ActorRef used to send messages into the stream.
final ActorRef<Protocol> ref =
    source
        .collect(
            new JavaPartialFunction<Protocol, String>() {
              public String apply(Protocol p, boolean isCheck) {
                if (p instanceof Message) {
                  return ((Message) p).msg;
                } else {
                  // Any other Protocol value is reported as not matched.
                  throw noMatch();
                }
              }
            })
        .to(Sink.foreach(System.out::println))
        .run(system);

ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile
Found an error in this documentation? The source code for this page can be found here.
Please feel free to edit and contribute a pull request.