public final class VirtualProcessor<T>
extends java.util.concurrent.atomic.AtomicReference<java.lang.Object>
implements org.reactivestreams.Processor<T,T>
This is a transparent processor that should consume as few resources as possible. Because it can receive uncoordinated inputs from both downstream and upstream, it needs an atomic state machine, which looks roughly like this:
    +--------------+      (2)    +------------+
    |     null     | ----------> | Subscriber |
    +--------------+             +------------+
           |                           |
       (1) |                           | (1)
          \|/                         \|/
    +--------------+      (2)    +------------+ --\
    | Subscription | ----------> |    Both    |    | (4)
    +--------------+             +------------+ <-/
           |                           |
       (3) |                           | (3)
          \|/                         \|/
    +--------------+      (2)    +------------+ --\
    |   Publisher  | ----------> |   Inert    |    | (4, *)
    +--------------+             +------------+ <-/
The idea is to keep the major state in only one atomic reference. The actions that can happen are:
    (1) onSubscribe
    (2) subscribe
    (3) onError / onComplete
    (4) onNext
    (*) Inert can also be reached by cancellation, after which onNext is
        still fine, so we just silently ignore possible spec violations here
Any event that occurs in a state where no matching outgoing arrow can be found is a spec violation, leading to the shutdown of this processor (meaning that the state is updated so that all subsequent actions match those of a failed Publisher or a cancelling Subscriber, and the non-guilty party is informed if it is already connected).
request() can only be called after the Subscriber has received the Subscription and that also means that onNext() will only happen after having transitioned into the Both state as well. The Publisher state means that if the real Publisher terminates before we get the Subscriber, we can just forget about the real one and keep an already finished one around for the Subscriber.
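The transitions in the diagram can be sketched as a compare-and-set loop over a single atomic reference. This is a minimal illustration, not the actual implementation: the names `StateMachineSketch`, `State`, `Event`, `next` and `handle` are hypothetical, the states are modelled as enum constants rather than the real Subscriber, Subscription and Publisher objects, and a spec violation is simplified to a jump straight into `INERT` without informing the other party.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: all names here are illustrative, not part of VirtualProcessor.
final class StateMachineSketch {
    // EMPTY stands for the "null" box in the diagram.
    enum State { EMPTY, SUBSCRIBER, SUBSCRIPTION, BOTH, PUBLISHER, INERT }

    // ON_SUBSCRIBE = (1), SUBSCRIBE = (2), TERMINATE = (3) onError/onComplete, ON_NEXT = (4).
    enum Event { ON_SUBSCRIBE, SUBSCRIBE, TERMINATE, ON_NEXT }

    // The whole major state lives in this one reference.
    private final AtomicReference<State> state = new AtomicReference<>(State.EMPTY);

    // Returns the target state, or null when the diagram has no matching outgoing arrow.
    static State next(State s, Event e) {
        switch (e) {
            case ON_SUBSCRIBE:
                if (s == State.EMPTY) return State.SUBSCRIPTION;
                if (s == State.SUBSCRIBER) return State.BOTH;
                return null;
            case SUBSCRIBE:
                if (s == State.EMPTY) return State.SUBSCRIBER;
                if (s == State.SUBSCRIPTION) return State.BOTH;
                if (s == State.PUBLISHER) return State.INERT;
                return null;
            case TERMINATE:
                if (s == State.SUBSCRIPTION) return State.PUBLISHER;
                if (s == State.BOTH) return State.INERT;
                return null;
            case ON_NEXT:
                // Self-loops on Both and Inert; onNext in Inert is silently tolerated.
                if (s == State.BOTH || s == State.INERT) return s;
                return null;
            default:
                return null;
        }
    }

    // Applies an event atomically. Returns false for a spec violation, in which case
    // the state is forced to INERT (an assumption made to keep the sketch short).
    boolean handle(Event e) {
        while (true) {
            State current = state.get();
            State target = next(current, e);
            boolean legal = target != null;
            if (state.compareAndSet(current, legal ? target : State.INERT)) {
                return legal;
            }
            // Another thread changed the state in between; re-read and retry.
        }
    }

    State current() {
        return state.get();
    }
}
```

Because every transition goes through `compareAndSet`, an `onSubscribe` from upstream racing with a `subscribe` from downstream ends up in `BOTH` regardless of which call wins.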
The Subscription that is offered to the Subscriber must cancel the original Publisher if things go wrong (like request(0) coming in from downstream), and it must ensure that we drop the Subscriber reference when cancel is invoked.
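The two obligations of the wrapping Subscription could look roughly like the sketch below. It rests on several assumptions: `WrappedSubscriptionSketch`, `Wrapped`, `ErrorSink` and `hasSubscriber` are hypothetical names, the nested `Subscription` interface is a local stand-in for `org.reactivestreams.Subscription` so that the example compiles without the library, and signalling an `IllegalArgumentException` downstream follows the Reactive Streams rule for non-positive requests rather than anything stated on this page.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: not the actual VirtualProcessor.WrappedSubscription.
final class WrappedSubscriptionSketch {
    // Local stand-in mirroring org.reactivestreams.Subscription.
    interface Subscription {
        void request(long n);
        void cancel();
    }

    // Local stand-in for the error channel of a Subscriber.
    interface ErrorSink {
        void onError(Throwable t);
    }

    static final class Wrapped implements Subscription {
        private final Subscription real;
        private final AtomicReference<ErrorSink> subscriber;

        Wrapped(Subscription real, ErrorSink subscriber) {
            this.real = real;
            this.subscriber = new AtomicReference<>(subscriber);
        }

        @Override
        public void request(long n) {
            if (n < 1) {
                // Things went wrong downstream: cancel the original Publisher ...
                real.cancel();
                // ... and release the Subscriber, telling it why (assumed behaviour).
                ErrorSink s = subscriber.getAndSet(null);
                if (s != null) {
                    s.onError(new IllegalArgumentException("non-positive request: " + n));
                }
            } else {
                real.request(n);
            }
        }

        @Override
        public void cancel() {
            // Drop the Subscriber reference so it can be garbage collected,
            // then propagate the cancellation upstream.
            subscriber.set(null);
            real.cancel();
        }

        boolean hasSubscriber() {
            return subscriber.get() != null;
        }
    }
}
```

Holding the Subscriber in its own atomic reference is a choice made for this sketch only; the description above suggests the real class keeps that reference inside the processor's single state reference.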
Modifier and Type | Class and Description |
---|---|
static class | VirtualProcessor.Both |
static class | VirtualProcessor.Both$ |
static class | VirtualProcessor.Inert$ |
class | VirtualProcessor.WrappedSubscription$ |
Constructor and Description |
---|
VirtualProcessor() |
Modifier and Type | Method and Description |
---|---|
static V | accumulateAndGet(V x$1, java.util.function.BinaryOperator<V> x$2) |
static boolean | compareAndSet(V x$1, V x$2) |
static V | get() |
static V | getAndAccumulate(V x$1, java.util.function.BinaryOperator<V> x$2) |
static V | getAndSet(V x$1) |
static V | getAndUpdate(java.util.function.UnaryOperator<V> x$1) |
static void | lazySet(V x$1) |
void | onComplete() |
void | onError(java.lang.Throwable t) |
void | onNext(T t) |
void | onSubscribe(org.reactivestreams.Subscription s) |
static void | set(V x$1) |
void | subscribe(org.reactivestreams.Subscriber<? super T> s) |
static java.lang.String | toString() |
static V | updateAndGet(java.util.function.UnaryOperator<V> x$1) |
static boolean | weakCompareAndSet(V x$1, V x$2) |
VirtualProcessor.WrappedSubscription$ | WrappedSubscription() Accessor for nested Scala object |
public static final V get()
public static final void set(V x$1)
public static final void lazySet(V x$1)
public static final boolean compareAndSet(V x$1, V x$2)
public static final boolean weakCompareAndSet(V x$1, V x$2)
public static final V getAndSet(V x$1)
public static final V getAndUpdate(java.util.function.UnaryOperator<V> x$1)
public static final V updateAndGet(java.util.function.UnaryOperator<V> x$1)
public static final V getAndAccumulate(V x$1, java.util.function.BinaryOperator<V> x$2)
public static final V accumulateAndGet(V x$1, java.util.function.BinaryOperator<V> x$2)
public static java.lang.String toString()
public VirtualProcessor.WrappedSubscription$ WrappedSubscription()
public void subscribe(org.reactivestreams.Subscriber<? super T> s)
Specified by: subscribe in interface org.reactivestreams.Publisher<T>
public final void onSubscribe(org.reactivestreams.Subscription s)
Specified by: onSubscribe in interface org.reactivestreams.Subscriber<T>
public void onError(java.lang.Throwable t)
Specified by: onError in interface org.reactivestreams.Subscriber<T>
public final void onComplete()
Specified by: onComplete in interface org.reactivestreams.Subscriber<T>