Class VirtualProcessor<T>

  • All Implemented Interfaces:
    java.io.Serializable, org.reactivestreams.Processor<T,​T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>

    public final class VirtualProcessor<T>
    extends java.util.concurrent.atomic.AtomicReference<java.lang.Object>
    implements org.reactivestreams.Processor<T,​T>
    INTERNAL API

    This is a transparent processor that shall consume as little resources as possible. Due to the possibility of receiving uncoordinated inputs from both downstream and upstream, this needs an atomic state machine which looks a little like this:

    +--------+ (2) +---------------+ | null +------------>+ Subscriber | +---+----+ +-----+---------+ | | (1)| | (1) v v +---+----------+ (2) +-----+---------+ | Subscription +------>+ Establishing | +---+----------+ +-----+---------+ | | | | (4) | v | +-----+---------+ --- | (3) | Both | | (5) | +-----+---------+ <-- | | | | v v +---+----------+ (2) +-----+---------+ --- | Publisher +-----> | Inert | | (5, *) +--------------+ +---------------+ <--

    The idea is to keep the major state in only one atomic reference. The actions that can happen are:

    (1) onSubscribe (2) subscribe (3) onError / onComplete (4) establishing subscription completes (5) onNext (*) Inert can be reached also by cancellation after which onNext is still fine so we just silently ignore possible spec violations here

    Any event that occurs in a state where no matching outgoing arrow can be found is a spec violation, leading to the shutdown of this processor (meaning that the state is updated such that all following actions match that of a failed Publisher or a cancelling Subscriber, and the non-guilty party is informed if already connected).

    request() can only be called after the Subscriber has received the Subscription and that also means that onNext() will only happen after having transitioned into the Both state as well. The Publisher state means that if the real Publisher terminates before we get the Subscriber, we can just forget about the real one and keep an already finished one around for the Subscriber.

    The Subscription that is offered to the Subscriber must cancel the original Publisher if things go wrong (like request(0) coming in from downstream) and it must ensure that we drop the Subscriber reference when cancel is invoked.

    See Also:
    Serialized Form
    • Constructor Detail

      • VirtualProcessor

        public VirtualProcessor()
    • Method Detail

      • Debug

        public static final boolean Debug()
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.util.concurrent.atomic.AtomicReference<java.lang.Object>
      • subscribe

        public void subscribe​(org.reactivestreams.Subscriber<? super T> s)
        Specified by:
        subscribe in interface org.reactivestreams.Publisher<T>
      • onSubscribe

        public void onSubscribe​(org.reactivestreams.Subscription s)
        Specified by:
        onSubscribe in interface org.reactivestreams.Subscriber<T>
      • onError

        public void onError​(java.lang.Throwable t)
        Specified by:
        onError in interface org.reactivestreams.Subscriber<T>
      • onComplete

        public void onComplete()
        Specified by:
        onComplete in interface org.reactivestreams.Subscriber<T>
      • onNext

        public void onNext​(T t)
        Specified by:
        onNext in interface org.reactivestreams.Subscriber<T>