ActorSource.actorRef
Materialize an ActorRef<T> (Java) or ActorRef[T] (Scala) of the new actors API; messages sent to it are emitted on the stream, and only messages of the same type as the stream are accepted.
Actor interop operators
Dependency
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
This operator is included in:
sbt
val AkkaVersion = "2.10.2"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.10.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.10.2")
  implementation "com.typesafe.akka:akka-stream-typed_${versions.ScalaBinary}"
}
Signature
ActorSource.actorRef
Description
Materialize an ActorRef<T> (Java) or ActorRef[T] (Scala) which only accepts messages that are of the same type as the stream.
Examples
Scala
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

// Complete ends the stream; Fail(ex) fails it with ex.
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](
  completionMatcher = {
    case Complete =>
  },
  failureMatcher = {
    case Fail(ex) => ex
  },
  bufferSize = 8,
  overflowStrategy = OverflowStrategy.fail)

// run() needs an implicit ActorSystem (or Materializer) in scope.
val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
Java
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.japi.JavaPartialFunction;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;
import java.util.Optional;

interface Protocol {}

class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Exception ex;

  public Fail(Exception ex) {
    this.ex = ex;
  }
}

// Complete ends the stream; Fail fails it with the wrapped exception.
final Source<Protocol, ActorRef<Protocol>> source =
    ActorSource.actorRef(
        (m) -> m instanceof Complete,
        (m) -> (m instanceof Fail) ? Optional.of(((Fail) m).ex) : Optional.empty(),
        8,
        OverflowStrategy.fail());

// `system` is an ActorSystem that is assumed to be in scope.
final ActorRef<Protocol> ref =
    source
        .collect(
            new JavaPartialFunction<Protocol, String>() {
              public String apply(Protocol p, boolean isCheck) {
                if (p instanceof Message) {
                  return ((Message) p).msg;
                } else {
                  throw noMatch();
                }
              }
            })
        .to(Sink.foreach(System.out::println))
        .run(system);

ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile
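The examples above define Complete and Fail messages but never send them. The following is a minimal Scala sketch of how the stream might be ended through the materialized ActorRef. It is an illustration under stated assumptions, not part of the original example: the object name ActorSourceCompletionSketch, the actor system name, the use of Sink.seq with Keep.both, and the 3 second timeout are all choices made here for the sake of a self-contained program.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Keep, Sink }
import akka.stream.typed.scaladsl.ActorSource

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, introduced only for this sketch.
object ActorSourceCompletionSketch {
  sealed trait Protocol
  final case class Message(msg: String) extends Protocol
  case object Complete extends Protocol
  final case class Fail(ex: Exception) extends Protocol

  def run(): Seq[String] = {
    // An empty guardian is enough here; the system only hosts the stream.
    implicit val system: ActorSystem[Nothing] =
      ActorSystem(Behaviors.empty, "actor-source-sketch")
    try {
      // Keep both materialized values: the ActorRef to send to and the
      // Future that completes once the stream has finished.
      val (ref, done) = ActorSource
        .actorRef[Protocol](
          completionMatcher = { case Complete => },
          failureMatcher = { case Fail(ex) => ex },
          bufferSize = 8,
          overflowStrategy = OverflowStrategy.fail)
        .collect { case Message(msg) => msg }
        .toMat(Sink.seq)(Keep.both)
        .run()

      ref ! Message("msg1")
      ref ! Message("msg2")
      ref ! Complete // matched by completionMatcher, so the stream completes

      // Blocking is acceptable in a short demo; avoid it in production code.
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }
}
```

Sending Fail(new RuntimeException("boom")) instead of Complete would be matched by failureMatcher, so the Future from Sink.seq would be expected to fail with that exception rather than complete with the collected elements.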