Class VirtualProcessor<T>
- java.lang.Object
  - java.util.concurrent.atomic.AtomicReference<java.lang.Object>
    - akka.stream.impl.VirtualProcessor<T>
- All Implemented Interfaces:
java.io.Serializable, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
public final class VirtualProcessor<T> extends java.util.concurrent.atomic.AtomicReference<java.lang.Object> implements org.reactivestreams.Processor<T,T>

INTERNAL API

This is a transparent processor that shall consume as few resources as possible. Due to the possibility of receiving uncoordinated inputs from both downstream and upstream, this needs an atomic state machine which looks a little like this:
+--------+      (2)    +---------------+
|  null  +------------>+  Subscriber   |
+---+----+             +-----+---------+
    |                        |
 (1)|                        | (1)
    v                        v
+---+----------+ (2)   +-----+---------+
| Subscription +------>+  Establishing |
+---+----------+       +-----+---------+
    |                        |
    |                        | (4)
    |                        v
    |                  +-----+---------+ ---
    | (3)              |     Both      |    | (5)
    |                  +-----+---------+ <--
    |                        |
    |                        |
    v                        v
+---+----------+ (2)   +-----+---------+ ---
|  Publisher   +-----> |     Inert     |    | (5, *)
+--------------+       +---------------+ <--
The idea is to keep the major state in only one atomic reference. The actions that can happen are:
(1) onSubscribe
(2) subscribe
(3) onError / onComplete
(4) establishing subscription completes
(5) onNext
(*) Inert can be reached also by cancellation, after which onNext is still fine, so we just silently ignore possible spec violations here
Any event that occurs in a state where no matching outgoing arrow can be found is a spec violation, leading to the shutdown of this processor (meaning that the state is updated such that all following actions match that of a failed Publisher or a cancelling Subscriber, and the non-guilty party is informed if already connected).
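The transition rules above can be sketched as a small table-driven state machine held in a single atomic reference. This is only an illustrative model, not Akka's implementation: the class `StateMachineSketch`, the `State` and `Event` enums, and the `fire` method are hypothetical names, the unlabeled Both -> Inert arrow is assumed to be taken on onError / onComplete, and a spec violation is modeled simply as a move to Inert without informing any connected party.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the diagram; not the actual VirtualProcessor code.
final class StateMachineSketch {
    // EMPTY stands for the "null" box in the diagram.
    enum State { EMPTY, SUBSCRIBER, SUBSCRIPTION, ESTABLISHING, BOTH, PUBLISHER, INERT }

    // (1) ON_SUBSCRIBE, (2) SUBSCRIBE, (3) TERMINATE, (4) ESTABLISHED, (5) ON_NEXT
    enum Event { ON_SUBSCRIBE, SUBSCRIBE, TERMINATE, ESTABLISHED, ON_NEXT }

    // The whole major state lives in one atomic reference.
    private final AtomicReference<State> state = new AtomicReference<>(State.EMPTY);

    // Returns the target state, or null if the diagram has no matching outgoing arrow.
    static State next(State s, Event e) {
        switch (s) {
            case EMPTY:
                if (e == Event.ON_SUBSCRIBE) return State.SUBSCRIPTION;
                if (e == Event.SUBSCRIBE) return State.SUBSCRIBER;
                return null;
            case SUBSCRIBER:
                return e == Event.ON_SUBSCRIBE ? State.ESTABLISHING : null;
            case SUBSCRIPTION:
                if (e == Event.SUBSCRIBE) return State.ESTABLISHING;
                if (e == Event.TERMINATE) return State.PUBLISHER;
                return null;
            case ESTABLISHING:
                return e == Event.ESTABLISHED ? State.BOTH : null;
            case BOTH:
                if (e == Event.ON_NEXT) return State.BOTH;
                // Assumption: the unlabeled Both -> Inert arrow is taken on termination.
                if (e == Event.TERMINATE) return State.INERT;
                return null;
            case PUBLISHER:
                return e == Event.SUBSCRIBE ? State.INERT : null;
            case INERT:
                // onNext after cancellation is silently tolerated.
                return e == Event.ON_NEXT ? State.INERT : null;
            default:
                return null;
        }
    }

    // Applies an event with a compare-and-set retry loop.
    // Returns false on a spec violation, after shutting down by moving to INERT.
    boolean fire(Event e) {
        while (true) {
            State current = state.get();
            State target = next(current, e);
            if (target == null) {
                state.set(State.INERT);
                return false;
            }
            if (state.compareAndSet(current, target)) {
                return true;
            }
            // Another thread changed the state concurrently; re-read and retry.
        }
    }

    State current() {
        return state.get();
    }
}
```

The retry loop is what lets upstream and downstream signals race safely: whichever thread loses the compare-and-set re-evaluates its event against the new state instead of overwriting it.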
request() can only be called after the Subscriber has received the Subscription and that also means that onNext() will only happen after having transitioned into the Both state as well. The Publisher state means that if the real Publisher terminates before we get the Subscriber, we can just forget about the real one and keep an already finished one around for the Subscriber.
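The "already finished" Publisher mentioned above might look roughly like the following. This is a sketch under assumptions: it uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the `org.reactivestreams` ones, and `FinishedPublisherSketch` is a hypothetical name, not a class from this API.

```java
import java.util.concurrent.Flow;

// Hypothetical stand-in for a Publisher that remembers how the real upstream terminated.
final class FinishedPublisherSketch<T> implements Flow.Publisher<T> {
    // null means the upstream completed normally; otherwise it failed with this error.
    private final Throwable failure;

    FinishedPublisherSketch(Throwable failure) {
        this.failure = failure;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // onSubscribe must be signalled before any terminal signal.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* nothing left to deliver */ }
            @Override public void cancel() { /* nothing to cancel */ }
        });
        if (failure == null) {
            subscriber.onComplete();
        } else {
            subscriber.onError(failure);
        }
    }
}
```

Because the terminal signal is replayed to whichever Subscriber arrives later, no reference to the real upstream Publisher has to be retained.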
The Subscription that is offered to the Subscriber must cancel the original Publisher if things go wrong (like request(0) coming in from downstream) and it must ensure that we drop the Subscriber reference when cancel is invoked.
- See Also:
- Serialized Form
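The requirements on the Subscription handed to the Subscriber (cancel upstream on an illegal request, drop the Subscriber reference on cancel) can be illustrated with a small wrapper. This is a sketch only, written against the JDK's `java.util.concurrent.Flow` interfaces rather than `org.reactivestreams`; `WrappedSubscriptionSketch` and `hasSubscriber` are hypothetical names, and signalling `IllegalArgumentException` for a non-positive request follows the Reactive Streams specification rather than anything stated on this page.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical wrapper; not the actual VirtualProcessor.WrappedSubscription.
final class WrappedSubscriptionSketch<T> implements Flow.Subscription {
    private final Flow.Subscription upstream;
    // Cleared on cancel or failure so the Subscriber can be garbage collected.
    private final AtomicReference<Flow.Subscriber<? super T>> downstream;

    WrappedSubscriptionSketch(Flow.Subscription upstream, Flow.Subscriber<? super T> subscriber) {
        this.upstream = upstream;
        this.downstream = new AtomicReference<Flow.Subscriber<? super T>>(subscriber);
    }

    @Override
    public void request(long n) {
        if (n < 1) {
            // Illegal demand: cancel the real upstream and fail the downstream once.
            Flow.Subscriber<? super T> s = downstream.getAndSet(null);
            if (s != null) {
                upstream.cancel();
                s.onError(new IllegalArgumentException("non-positive request: " + n));
            }
        } else if (downstream.get() != null) {
            upstream.request(n);
        }
    }

    @Override
    public void cancel() {
        // Drop the Subscriber reference first; cancel upstream only once.
        if (downstream.getAndSet(null) != null) {
            upstream.cancel();
        }
    }

    boolean hasSubscriber() {
        return downstream.get() != null;
    }
}
```

Using `getAndSet(null)` makes the terminal paths idempotent: a second `cancel()` or a late `request()` finds no Subscriber and does nothing.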
Nested Class Summary
Nested Classes
Modifier and Type    Class                                    Description
static class         VirtualProcessor.Both
static class         VirtualProcessor.Both$
static class         VirtualProcessor.Establishing
static class         VirtualProcessor.Establishing$
static interface     VirtualProcessor.HasActualSubscriber
static class         VirtualProcessor.Inert$
class                VirtualProcessor.WrappedSubscription$
-
Constructor Summary
Constructors
Constructor           Description
VirtualProcessor()
-
Method Summary
Modifier and Type                        Method                                                    Description
static boolean                           Debug()
void                                     onComplete()
void                                     onError(java.lang.Throwable t)
void                                     onNext(T t)
void                                     onSubscribe(org.reactivestreams.Subscription s)
void                                     subscribe(org.reactivestreams.Subscriber<? super T> s)
java.lang.String                         toString()
VirtualProcessor.WrappedSubscription$    WrappedSubscription()                                     Accessor for nested Scala object
Methods inherited from class java.util.concurrent.atomic.AtomicReference
accumulateAndGet, compareAndExchange, compareAndExchangeAcquire, compareAndExchangeRelease, compareAndSet, get, getAcquire, getAndAccumulate, getAndSet, getAndUpdate, getOpaque, getPlain, lazySet, set, setOpaque, setPlain, setRelease, updateAndGet, weakCompareAndSet, weakCompareAndSetAcquire, weakCompareAndSetPlain, weakCompareAndSetRelease, weakCompareAndSetVolatile
Method Detail
-
Debug
public static final boolean Debug()
-
WrappedSubscription
public VirtualProcessor.WrappedSubscription$ WrappedSubscription()
Accessor for nested Scala object
- Returns:
- (undocumented)
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.util.concurrent.atomic.AtomicReference<java.lang.Object>
-
subscribe
public void subscribe(org.reactivestreams.Subscriber<? super T> s)
- Specified by:
subscribe in interface org.reactivestreams.Publisher<T>
-
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
- Specified by:
onSubscribe in interface org.reactivestreams.Subscriber<T>
-
onError
public void onError(java.lang.Throwable t)
- Specified by:
onError in interface org.reactivestreams.Subscriber<T>
-
onComplete
public void onComplete()
- Specified by:
onComplete in interface org.reactivestreams.Subscriber<T>