Class VirtualProcessor<T>
- java.lang.Object
  - java.util.concurrent.atomic.AtomicReference<java.lang.Object>
    - akka.stream.impl.VirtualProcessor<T>

All Implemented Interfaces:
java.io.Serializable, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
public final class VirtualProcessor<T> extends java.util.concurrent.atomic.AtomicReference<java.lang.Object> implements org.reactivestreams.Processor<T,T>
INTERNAL API
This is a transparent processor that shall consume as few resources as possible. Due to the possibility of receiving uncoordinated inputs from both downstream and upstream, this needs an atomic state machine which looks a little like this:
    +--------+      (2)      +---------------+
    |  null  +-------------->+  Subscriber   |
    +---+----+               +-----+---------+
        |                          |
        | (1)                      | (1)
        v                          v
    +---+----------+   (2)   +-----+---------+
    | Subscription +-------->+ Establishing  |
    +---+----------+         +-----+---------+
        |                          |
        |                          | (4)
        |                          v
        |                    +-----+---------+ ---
        | (3)                |     Both      |    | (5)
        |                    +-----+---------+ <--
        |                          |
        |                          |
        v                          v
    +---+----------+   (2)   +-----+---------+ ---
    |  Publisher   +-------->+     Inert     |    | (5, *)
    +--------------+         +---------------+ <--
The idea is to keep the major state in only one atomic reference. The actions that can happen are:
(1) onSubscribe
(2) subscribe
(3) onError / onComplete
(4) establishing subscription completes
(5) onNext
(*) Inert can also be reached by cancellation, after which onNext is still fine, so we just silently ignore possible spec violations here
Any event that occurs in a state where no matching outgoing arrow can be found is a spec violation, leading to the shutdown of this processor (meaning that the state is updated such that all following actions match that of a failed Publisher or a cancelling Subscriber, and the non-guilty party is informed if already connected).
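The transition rules above can be sketched as a small table-driven state machine. This is only an illustration under stated assumptions, not the Akka implementation: the class `StateSketch`, its enums, and the `fire` method are hypothetical names, the real class stores richer state objects (not enum constants) in the atomic reference, and the unlabelled Both to Inert arrow is assumed here to be triggered by termination. A spec violation is simplified to "move to Inert and report false".

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the diagram; names are illustrative only. */
final class StateSketch {
    enum State { NULL, SUBSCRIBER, SUBSCRIPTION, ESTABLISHING, BOTH, PUBLISHER, INERT }
    enum Event { ON_SUBSCRIBE, SUBSCRIBE, TERMINATE, ESTABLISHED, ON_NEXT }

    // The whole major state lives in one atomic reference.
    private final AtomicReference<State> state = new AtomicReference<>(State.NULL);

    /** Returns the target state, or null when the diagram has no matching arrow. */
    static State next(State s, Event e) {
        switch (s) {
            case NULL:
                if (e == Event.ON_SUBSCRIBE) return State.SUBSCRIPTION; // (1)
                if (e == Event.SUBSCRIBE) return State.SUBSCRIBER;      // (2)
                return null;
            case SUBSCRIBER:
                return e == Event.ON_SUBSCRIBE ? State.ESTABLISHING : null; // (1)
            case SUBSCRIPTION:
                if (e == Event.SUBSCRIBE) return State.ESTABLISHING;    // (2)
                if (e == Event.TERMINATE) return State.PUBLISHER;       // (3)
                return null;
            case ESTABLISHING:
                return e == Event.ESTABLISHED ? State.BOTH : null;      // (4)
            case BOTH:
                if (e == Event.ON_NEXT) return State.BOTH;              // (5)
                if (e == Event.TERMINATE) return State.INERT;           // assumption: unlabelled arrow
                return null;
            case PUBLISHER:
                return e == Event.SUBSCRIBE ? State.INERT : null;       // (2)
            case INERT:
                return e == Event.ON_NEXT ? State.INERT : null;         // (5, *)
            default:
                return null;
        }
    }

    /** Applies the event with a compare-and-set loop; false signals a spec violation. */
    boolean fire(Event e) {
        while (true) {
            State current = state.get();
            State target = next(current, e);
            // Simplification: a violation shuts the machine down by moving to INERT.
            State update = (target == null) ? State.INERT : target;
            if (state.compareAndSet(current, update)) {
                return target != null;
            }
        }
    }

    State current() {
        return state.get();
    }
}
```

The compare-and-set loop is what lets upstream and downstream signals race safely: whichever thread loses the race re-reads the state and re-evaluates its transition against the new value.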
request() can only be called after the Subscriber has received the Subscription and that also means that onNext() will only happen after having transitioned into the Both state as well. The Publisher state means that if the real Publisher terminates before we get the Subscriber, we can just forget about the real one and keep an already finished one around for the Subscriber.
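To illustrate the Publisher state, here is a minimal sketch of an "already finished" publisher that can be kept around until a Subscriber arrives. It assumes the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for `org.reactivestreams`, and the names `FinishedPublisherSketch`, `completed`, `failed`, and `NOOP` are hypothetical; how Akka represents this internally is not shown in this page.

```java
import java.util.concurrent.Flow;

/** Hypothetical sketch: publishers that only replay a terminal signal. */
final class FinishedPublisherSketch {
    // A Subscriber must receive onSubscribe before any terminal signal,
    // so a subscription that does nothing is handed out first.
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { /* nothing left to deliver */ }
        @Override public void cancel() { /* nothing to cancel */ }
    };

    /** Stands in for an upstream that completed before anyone subscribed. */
    static <T> Flow.Publisher<T> completed() {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onComplete();
        };
    }

    /** Stands in for an upstream that failed before anyone subscribed. */
    static <T> Flow.Publisher<T> failed(Throwable cause) {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onError(cause);
        };
    }
}
```

Because such a publisher holds no reference to the real upstream, the original Publisher can be dropped as soon as its terminal signal has been recorded.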
The Subscription that is offered to the Subscriber must cancel the original Publisher if things go wrong (like request(0) coming in from downstream) and it must ensure that we drop the Subscriber reference when cancel is invoked.
See Also:
- Serialized Form
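The two obligations placed on the Subscription handed to the Subscriber (cancel upstream on an illegal request, and drop the Subscriber reference on cancel) can be sketched as follows. This is an illustration only: `GuardedSubscription` and `holdsSubscriber` are hypothetical names, the JDK's `java.util.concurrent.Flow` interfaces stand in for `org.reactivestreams`, and signalling `IllegalArgumentException` for non-positive demand follows Reactive Streams rule 3.9, not anything stated on this page.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a subscription wrapper; not Akka's WrappedSubscription. */
final class GuardedSubscription<T> implements Flow.Subscription {
    private final Flow.Subscription upstream;
    // Cleared on cancel or failure so the Subscriber can be garbage collected.
    private final AtomicReference<Flow.Subscriber<? super T>> downstream;

    GuardedSubscription(Flow.Subscription upstream, Flow.Subscriber<? super T> downstream) {
        this.upstream = upstream;
        this.downstream = new AtomicReference<>(downstream);
    }

    @Override
    public void request(long n) {
        if (n < 1) {
            // Illegal demand: cancel the real upstream and fail the Subscriber once.
            Flow.Subscriber<? super T> s = downstream.getAndSet(null);
            if (s != null) {
                upstream.cancel();
                s.onError(new IllegalArgumentException("non-positive demand: " + n));
            }
        } else if (downstream.get() != null) {
            upstream.request(n);
        }
    }

    @Override
    public void cancel() {
        // Drop the reference first; only the first cancel reaches upstream.
        if (downstream.getAndSet(null) != null) {
            upstream.cancel();
        }
    }

    boolean holdsSubscriber() {
        return downstream.get() != null;
    }
}
```

Using `getAndSet(null)` makes both paths idempotent: whichever of `cancel()` or the failing `request` wins the race performs the upstream cancellation, and later calls become no-ops.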
-
Nested Class Summary
Nested Classes
Modifier and Type    Class
static class         VirtualProcessor.Both
static class         VirtualProcessor.Both$
static class         VirtualProcessor.Establishing
static class         VirtualProcessor.Establishing$
static interface     VirtualProcessor.HasActualSubscriber
static class         VirtualProcessor.Inert$
class                VirtualProcessor.WrappedSubscription$
-
Constructor Summary
Constructors
Constructor          Description
VirtualProcessor()
-
Method Summary
Modifier and Type                        Method                                                    Description
static boolean                           Debug()
void                                     onComplete()
void                                     onError(java.lang.Throwable t)
void                                     onNext(T t)
void                                     onSubscribe(org.reactivestreams.Subscription s)
void                                     subscribe(org.reactivestreams.Subscriber<? super T> s)
java.lang.String                         toString()
VirtualProcessor.WrappedSubscription$    WrappedSubscription()                                     Accessor for nested Scala object
-
Methods inherited from class java.util.concurrent.atomic.AtomicReference
accumulateAndGet, compareAndExchange, compareAndExchangeAcquire, compareAndExchangeRelease, compareAndSet, get, getAcquire, getAndAccumulate, getAndSet, getAndUpdate, getOpaque, getPlain, lazySet, set, setOpaque, setPlain, setRelease, updateAndGet, weakCompareAndSet, weakCompareAndSetAcquire, weakCompareAndSetPlain, weakCompareAndSetRelease, weakCompareAndSetVolatile
-
Method Detail
-
Debug
public static final boolean Debug()
-
WrappedSubscription
public VirtualProcessor.WrappedSubscription$ WrappedSubscription()
Accessor for nested Scala object
- Returns: (undocumented)
-
toString
public java.lang.String toString()
- Overrides: toString in class java.util.concurrent.atomic.AtomicReference<java.lang.Object>
-
subscribe
public void subscribe(org.reactivestreams.Subscriber<? super T> s)
- Specified by: subscribe in interface org.reactivestreams.Publisher<T>
-
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
- Specified by: onSubscribe in interface org.reactivestreams.Subscriber<T>
-
onError
public void onError(java.lang.Throwable t)
- Specified by: onError in interface org.reactivestreams.Subscriber<T>
-
onComplete
public void onComplete()
- Specified by: onComplete in interface org.reactivestreams.Subscriber<T>