public interface ActorPublisher<T> extends Actor
Extend or mix this trait into your Actor to make it a stream publisher that keeps track of the subscription life cycle and requested elements.
Create a Publisher backed by this actor with the Scala API ActorPublisher#apply, the Java API UntypedActorPublisher.create(akka.actor.ActorRef), or the Java API compatible with lambda expressions, AbstractActorPublisher.create(akka.actor.ActorRef). It can be attached to a Subscriber or be used as an input source for a Flow. You can only attach one subscriber to this publisher.
The life cycle state of the subscription is tracked with the following boolean members: isActive(), isCompleted(), isErrorEmitted(), and isCanceled().
You send elements to the stream by calling onNext(T). You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be queried with totalDemand(). onNext may only be called when isActive is true and totalDemand > 0; otherwise onNext will throw IllegalStateException.
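The demand rule can be illustrated with a small stand-alone model. This is a simplified sketch in plain Java, not the Akka implementation: the class `DemandModel` and its `request` and `terminate` methods are hypothetical names introduced here, and cancellation is deliberately left out.

```java
// Simplified stand-alone model of the onNext rule; NOT the Akka implementation.
// DemandModel, request(long) and terminate() are hypothetical names.
final class DemandModel {
    private long totalDemand = 0;   // elements requested but not yet sent
    private boolean active = true;  // false once the stream has been terminated
    private long sent = 0;          // elements accepted by onNext so far

    // Stands in for the subscriber requesting n more elements.
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        totalDemand += n;
    }

    // Stands in for completing or failing the stream.
    void terminate() {
        active = false;
    }

    // Allowed only while active and totalDemand > 0.
    void onNext(Object element) {
        if (!active || totalDemand <= 0) {
            throw new IllegalStateException(
                "onNext requires isActive and totalDemand > 0");
        }
        totalDemand--;
        sent++;
    }

    long totalDemand() { return totalDemand; }

    long sent() { return sent; }
}
```

In this model, calling `request(2)` allows exactly two `onNext` calls; a third one throws `IllegalStateException` until more demand arrives.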
When the stream subscriber requests more elements, the ActorPublisher#Request message is delivered to this actor, and you can act on that event. The totalDemand() is updated automatically.
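One common way to act on a request is to keep produced elements in a buffer and drain it whenever demand is available. The following is a minimal sketch of that idea in plain Java, not Akka code: `BufferedPublisherModel`, `offer`, `onRequest`, and `delivered` are hypothetical names, and the model updates its own demand counter by hand, whereas the text above states that the real `totalDemand()` is updated automatically.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified stand-alone model of reacting to a request; NOT the Akka implementation.
// BufferedPublisherModel, offer, onRequest and delivered are hypothetical names.
final class BufferedPublisherModel {
    private final Queue<String> buffer = new ArrayDeque<>();   // elements waiting for demand
    private final List<String> delivered = new ArrayList<>();  // stands in for the subscriber
    private long totalDemand = 0;

    // New element produced by the application: send it now or keep it for later.
    void offer(String element) {
        buffer.add(element);
        deliver();
    }

    // Stands in for handling the Request message: demand grows, then the buffer is drained.
    void onRequest(long n) {
        totalDemand += n;
        deliver();
    }

    // Sends buffered elements while there is outstanding demand.
    private void deliver() {
        while (totalDemand > 0 && !buffer.isEmpty()) {
            delivered.add(buffer.poll()); // the place where onNext would be called
            totalDemand--;
        }
    }

    long totalDemand() { return totalDemand; }

    int buffered() { return buffer.size(); }

    List<String> delivered() { return delivered; }
}
```

Draining in both `offer` and `onRequest` means an element is sent as soon as both the element and the demand exist, regardless of which arrives first.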
When the stream subscriber cancels the subscription, the ActorPublisher#Cancel message is delivered to this actor. After that, subsequent calls to onNext will be ignored.
You can complete the stream by calling onComplete(). After that you are not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete().
You can terminate the stream with failure by calling onError(java.lang.Throwable). After that you are not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete().
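The four life cycle flags and the rules for the terminal calls can be pictured as a small state machine. The sketch below is a plain-Java model, not the Akka implementation: `LifecycleModel`, its `State` enum, and `cancel()` are hypothetical names, and throwing `IllegalStateException` for a disallowed terminal call is an assumption of this sketch (the text above only says such calls are not allowed).

```java
// Simplified stand-alone model of the life cycle states; NOT the Akka implementation.
// LifecycleModel, State and cancel() are hypothetical names. Throwing
// IllegalStateException for disallowed terminal calls is an assumption.
final class LifecycleModel {
    enum State { ACTIVE, COMPLETED, ERROR_EMITTED, CANCELED }

    private State state = State.ACTIVE;

    boolean isActive() { return state == State.ACTIVE; }

    boolean isCompleted() { return state == State.COMPLETED; }

    boolean isErrorEmitted() { return state == State.ERROR_EMITTED; }

    boolean isCanceled() { return state == State.CANCELED; }

    // Stands in for the subscriber canceling the subscription.
    void cancel() {
        if (state == State.ACTIVE) {
            state = State.CANCELED;
        }
    }

    void onComplete() {
        if (signalAllowed()) {
            state = State.COMPLETED;
        }
    }

    void onError(Throwable cause) {
        if (signalAllowed()) {
            state = State.ERROR_EMITTED;
        }
    }

    // True when active, false when canceled (the call is ignored),
    // and an exception when the stream was already completed or failed.
    private boolean signalAllowed() {
        switch (state) {
            case ACTIVE:
                return true;
            case CANCELED:
                return false;
            default:
                throw new IllegalStateException("stream already terminated: " + state);
        }
    }
}
```

Exactly one flag is true at any time in this model, and the only transitions are from the active state to one of the other three.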
If you suspect that this ActorPublisher may never get subscribed to, you can override the subscriptionTimeout() method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when the timeout triggers via an ActorPublisherMessage.SubscriptionTimeoutExceeded message and MUST then perform cleanup and stop itself.
If the actor is stopped, the stream will be completed, unless it was already terminated with failure, completed, or canceled.
Modifier and Type | Interface | Description |
---|---|---|
static class | ActorPublisher.Internal$ | INTERNAL API |
Actor.emptyBehavior$, Actor.ignoringBehavior$
Modifier and Type | Method | Description |
---|---|---|
void | aroundPostRestart(java.lang.Throwable reason) | INTERNAL API |
void | aroundPostStop() | INTERNAL API |
void | aroundPreRestart(java.lang.Throwable reason, scala.Option<java.lang.Object> message) | INTERNAL API |
void | aroundPreStart() | INTERNAL API |
void | aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive, java.lang.Object msg) | INTERNAL API |
void | cancelSelf() | |
boolean | isActive() | The state when the publisher is active. |
boolean | isCanceled() | The state after the stream subscriber has canceled the subscription. |
boolean | isCompleted() | The terminal state after calling onComplete(). |
boolean | isErrorEmitted() | The terminal state after calling onError(java.lang.Throwable). |
void | onComplete() | Complete the stream. |
void | onCompleteThenStop() | Complete the stream. |
void | onError(java.lang.Throwable cause) | Terminate the stream with failure. |
void | onErrorThenStop(java.lang.Throwable cause) | Terminate the stream with failure. |
void | onNext(T element) | Send an element to the stream subscriber. |
scala.concurrent.duration.Duration | subscriptionTimeout() | Subscription timeout after which this actor will become Canceled and reject any incoming "late" subscriber. |
long | totalDemand() | Total number of requested elements from the stream subscriber. |
context, postRestart, postStop, preRestart, preStart, receive, self, sender, supervisorStrategy, unhandled
scala.concurrent.duration.Duration subscriptionTimeout()
Subscription timeout after which this actor will become Canceled and reject any incoming "late" subscriber. The actor will receive a SubscriptionTimeoutExceeded message, upon which it MUST react by performing all necessary cleanup and stopping itself.
Use this feature to avoid leaking actors when you suspect that this Publisher may never get subscribed to by some Subscriber.
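boolean isActive()
The state when the publisher is active. It is allowed to call onComplete() and onError(java.lang.Throwable) in this state. It is allowed to call onNext(T) in this state when totalDemand() is greater than zero.
long totalDemand()
Total number of requested elements from the stream subscriber. This actor automatically keeps track of this amount based on incoming request messages and outgoing onNext.
boolean isCompleted()
The terminal state after calling onComplete(). It is not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete() in this state.
boolean isErrorEmitted()
The terminal state after calling onError(java.lang.Throwable). It is not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete() in this state.
boolean isCanceled()
The state after the stream subscriber has canceled the subscription. It is allowed to call onNext(T), onError(java.lang.Throwable), and onComplete() in this state, but the calls will not perform anything.
void onNext(T element)
Send an element to the stream subscriber. You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be queried with totalDemand(). onNext may only be called when isActive is true and totalDemand > 0; otherwise onNext will throw IllegalStateException.
Parameters: element - (undocumented)
void onComplete()
Complete the stream. After that you are not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete().
void onCompleteThenStop()
Complete the stream. After that you are not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete().
After signaling completion the Actor will then stop itself, as it has completed the protocol. When onComplete() is called before any Subscriber has had the chance to subscribe to this ActorPublisher, the completion signal (and therefore the stopping of the Actor as well) will be delayed until such a Subscriber arrives.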
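The delayed completion described above can be pictured with a small stand-alone model. This is only a sketch in plain Java under stated assumptions, not the Akka implementation: `DeferredCompletionModel`, `subscribe`, and the accessor names are hypothetical, and the model simply remembers a pending completion until a subscriber is attached.

```java
// Simplified stand-alone model of delayed completion; NOT the Akka implementation.
// DeferredCompletionModel, subscribe() and the accessors are hypothetical names.
final class DeferredCompletionModel {
    private boolean subscriberAttached = false;
    private boolean completionPending = false;
    private boolean completionSignaled = false;
    private boolean stopped = false;

    // Stands in for onCompleteThenStop(): signal now if possible, otherwise remember it.
    void onCompleteThenStop() {
        if (subscriberAttached) {
            signalAndStop();
        } else {
            completionPending = true;
        }
    }

    // Stands in for a Subscriber arriving; a pending completion is flushed.
    void subscribe() {
        subscriberAttached = true;
        if (completionPending) {
            completionPending = false;
            signalAndStop();
        }
    }

    // The completion signal and the stop happen together.
    private void signalAndStop() {
        completionSignaled = true;
        stopped = true;
    }

    boolean isCompletionSignaled() { return completionSignaled; }

    boolean isStopped() { return stopped; }
}
```

The same shape would apply to the failure case, with the error signal taking the place of the completion signal.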
void onError(java.lang.Throwable cause)
Terminate the stream with failure. After that you are not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete().
Parameters: cause - (undocumented)
void onErrorThenStop(java.lang.Throwable cause)
Terminate the stream with failure. After that you are not allowed to call onNext(T), onError(java.lang.Throwable), or onComplete().
After signaling the error the Actor will then stop itself, as it has completed the protocol. When onError(java.lang.Throwable) is called before any Subscriber has had the chance to subscribe to this ActorPublisher, the error signal (and therefore the stopping of the Actor as well) will be delayed until such a Subscriber arrives.
Parameters: cause - (undocumented)
void aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive, java.lang.Object msg)
Specified by: aroundReceive in interface Actor
Parameters: receive - (undocumented); msg - (undocumented)
void cancelSelf()
void aroundPreStart()
Specified by: aroundPreStart in interface Actor
void aroundPreRestart(java.lang.Throwable reason, scala.Option<java.lang.Object> message)
Specified by: aroundPreRestart in interface Actor
Parameters: reason - (undocumented); message - (undocumented)
void aroundPostRestart(java.lang.Throwable reason)
Specified by: aroundPostRestart in interface Actor
Parameters: reason - (undocumented)
void aroundPostStop()
Specified by: aroundPostStop in interface Actor