Encoding and decoding binary data

Warning

The IO implementation is marked as “experimental” as of its introduction in Akka 2.2.0. We will continue to improve this API based on our users’ feedback, which implies that, while we try to keep incompatible changes to a minimum, the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.io package.

Akka adopted and adapted the implementation of data processing pipelines found in the spray-io module. The idea is that encoding and decoding often go hand in hand, and keeping the code pertaining to one protocol layer together is deemed more important than writing down the complete read side, say, in the iteratee style in one go; pipelines encourage packaging the stages in a form which lends itself better to reuse in a protocol stack. Another reason for choosing this abstraction is that it is at times necessary to change the behavior of encoding and decoding within a stage based on the state of the message stream, and pipeline stages allow communication between the read and write halves quite naturally.

The actual byte-fiddling can be done within pipeline stages, for example using the rich API of ByteIterator and ByteStringBuilder as shown below. All these activities are synchronous transformations which benefit greatly from CPU affinity in order to make good use of the data caches. Therefore the design of the pipeline infrastructure is completely synchronous: every stage’s handler code can only directly return the events and/or commands resulting from an input; there are no callbacks. Exceptions thrown within a pipeline stage will abort processing of the whole pipeline under the assumption that recoverable error conditions will be signaled in-band to the next stage instead of raising an exception.
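All of this byte-level work can be exercised outside of any pipeline. The following is a minimal sketch (not part of the original example) which round-trips primitive values through a ByteString using the builder and iterator APIs:

import java.nio.ByteOrder;
import akka.util.ByteIterator;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;

public class ByteStringRoundTrip {
  public static void main(String[] args) {
    final ByteOrder order = ByteOrder.BIG_ENDIAN;

    // append primitive values to a builder and freeze the result
    final ByteStringBuilder builder = new ByteStringBuilder();
    builder.putInt(42, order);
    builder.putDouble(1.25, order);
    final ByteString bytes = builder.result();

    // read them back in the same order using a ByteIterator
    final ByteIterator iter = bytes.iterator();
    assert iter.getInt(order) == 42;
    assert iter.getDouble(order) == 1.25;
    assert iter.isEmpty();
  }
}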

An overall “logical” pipeline can span multiple execution contexts, for example starting with the low-level protocol layers directly within an actor handling the reads and writes to a TCP connection and then being passed to a number of higher-level actors which do the costly application level processing. This is supported by feeding the generated events into a sink which sends them to another actor, and that other actor will then upon reception feed them into its own pipeline.

Introducing the Sample Protocol

In the following sections the process of implementing a protocol stack using pipelines is demonstrated with this simple example:

frameLen: Int
persons: Int
persons times {
  first: String
  last: String
}
points: Int
points times Double

mapping to the following data type:

public class Message {
  
  public static class Person {
    private final String first;
    private final String last;

    public Person(String first, String last) {
      this.first = first;
      this.last = last;
    }

    public String getFirst() {
      return first;
    }

    public String getLast() {
      return last;
    }

  }

  private final Person[] persons;
  private final double[] happinessCurve;

  public Message(Person[] persons, double[] happinessCurve) {
    this.persons = persons;
    this.happinessCurve = happinessCurve;
  }

  public Person[] getPersons() {
    return persons;
  }

  public double[] getHappinessCurve() {
    return happinessCurve;
  }
}

We will split the handling of this protocol into two parts: the frame-length encoding, which handles the buffering necessary on the read side, and the actual encoding of the frame contents, which is done in a separate stage.

Building a Pipeline Stage

As a common example, which is also included in the akka-actor package, let us look at a framing protocol which works by prepending a length field to each message (the following is a simplified version for demonstration purposes; the real implementation is more configurable and is written in Scala).

import java.nio.ByteOrder;
import java.util.ArrayList;

import scala.util.Either;
import akka.io.AbstractSymmetricPipePair;
import akka.io.PipePairFactory;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;
import akka.io.SymmetricPipelineStage;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;

public class LengthFieldFrame extends
    SymmetricPipelineStage<PipelineContext, ByteString, ByteString> {

  final int maxSize;

  public LengthFieldFrame(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  public SymmetricPipePair<ByteString, ByteString> apply(final PipelineContext ctx) {
    return PipePairFactory
        .create(ctx, new AbstractSymmetricPipePair<ByteString, ByteString>() {

          final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
          ByteString buffer = null;

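          // write side: prefix each outgoing frame with a four-byte
          // length field; frames exceeding maxSize are silently dropped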
          @Override
          public Iterable<Either<ByteString, ByteString>> onCommand(
              ByteString cmd) {
            final int length = cmd.length() + 4;
            if (length > maxSize) {
              return new ArrayList<Either<ByteString, ByteString>>(0);
            }
            final ByteStringBuilder bb = new ByteStringBuilder();
            bb.putInt(length, byteOrder);
            bb.append(cmd);
            return singleCommand(bb.result());
          }

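          // read side: accumulate bytes until at least one complete
          // frame is available, then emit every complete frame found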
          @Override
          public Iterable<Either<ByteString, ByteString>> onEvent(
              ByteString event) {
            final ArrayList<Either<ByteString, ByteString>> res =
                new ArrayList<Either<ByteString, ByteString>>();
            ByteString current = buffer == null ? event : buffer.concat(event);
            while (true) {
              if (current.length() == 0) {
                buffer = null;
                return res;
              } else if (current.length() < 4) {
                buffer = current;
                return res;
              } else {
                final int length = current.iterator().getInt(byteOrder);
                if (length > maxSize)
                  throw new IllegalArgumentException(
                      "received too large frame of size " + length + " (max = "
                          + maxSize + ")");
                if (current.length() < length) {
                  buffer = current;
                  return res;
                } else {
                  res.add(makeEvent(current.slice(4, length)));
                  current = current.drop(length);
                }
              }
            }
          }

        });
  }

}

In the end a pipeline stage is nothing more than a set of three methods: one transforming commands arriving from above, one transforming events arriving from below, and the third transforming incoming management commands (not shown here; see below for more information). The result of the transformation can in either case be a sequence of commands flowing downwards or events flowing upwards (or a combination thereof).

In the case above the data types for commands and events are equal, as both functions operate only on ByteString, and the transformation does not change that type because it only adds or removes four octets at the front.

The pair of command and event transformation functions is represented by an object of type AbstractPipePair, or in this case an AbstractSymmetricPipePair. This object could benefit from knowledge about the context it is running in, for example an Actor, and this context is introduced by making a PipelineStage be a factory for producing a PipePair. The factory method is called apply (a Scala tradition) and receives the context object as its argument. The implementation of this factory method may make use of the context in whatever way it sees fit; you will see an example further down.
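To make the shape of a stage concrete, here is a minimal sketch (not part of the sample protocol) of a stage whose transformations do nothing at all; it only demonstrates the factory structure described above:

public class IdentityStage extends
    SymmetricPipelineStage<PipelineContext, ByteString, ByteString> {

  @Override
  public SymmetricPipePair<ByteString, ByteString> apply(final PipelineContext ctx) {
    return PipePairFactory
        .create(ctx, new AbstractSymmetricPipePair<ByteString, ByteString>() {

          @Override
          public Iterable<Either<ByteString, ByteString>> onCommand(ByteString cmd) {
            // commands pass downwards unchanged
            return singleCommand(cmd);
          }

          @Override
          public Iterable<Either<ByteString, ByteString>> onEvent(ByteString evt) {
            // events pass upwards unchanged
            return singleEvent(evt);
          }
        });
  }
}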

Manipulating ByteStrings

The second stage of our sample protocol stack illustrates in more depth what was only hinted at in the pipeline stage built above: constructing and deconstructing byte strings. Let us first take a look at the encoder:

public class MessageStage extends
    SymmetricPipelineStage<HasByteOrder, Message, ByteString> {

  @Override
  public SymmetricPipePair<Message, ByteString> apply(final HasByteOrder context) {

    return PipePairFactory
        .create(context, new AbstractSymmetricPipePair<Message, ByteString>() {

          final ByteOrder byteOrder = context.byteOrder();

          private void putString(ByteStringBuilder builder, String str) {
            final byte[] bytes = ByteString.fromString(str, "UTF-8").toArray();
            builder.putInt(bytes.length, byteOrder);
            builder.putBytes(bytes);
          }

          @Override
          public Iterable<Either<Message, ByteString>> onCommand(Message cmd) {
            final ByteStringBuilder builder = new ByteStringBuilder();

            builder.putInt(cmd.getPersons().length, byteOrder);
            for (Message.Person p : cmd.getPersons()) {
              putString(builder, p.getFirst());
              putString(builder, p.getLast());
            }

            builder.putInt(cmd.getHappinessCurve().length, byteOrder);
            builder.putDoubles(cmd.getHappinessCurve(), byteOrder);

            return singleCommand(builder.result());
          }

          // decoding omitted ...

        });

  }

}

Note how the byte order to be used by this stage is fixed in exactly one place, making it impossible to get it wrong between commands and events; the way the byte order is passed into the stage demonstrates one possible use for the stage’s context parameter.
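The HasByteOrder context type is not spelled out in the listing above; it is presumably just a PipelineContext with a single accessor, mirroring the HasActorContext interface shown further below:

public interface HasByteOrder extends PipelineContext {

  public ByteOrder byteOrder();

}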

The basic tool for constructing a ByteString is a ByteStringBuilder. This builder is specialized for concatenating byte representations of primitive data types like Int and Double, or arrays thereof. Encoding a String requires a bit more work, because not only the sequence of bytes but also its length needs to be encoded; otherwise the decoding stage would not know where the String terminates. When all values making up the Message have been appended to the builder, we simply pass the resulting ByteString on to the next stage as a command using the optimized singleCommand facility.

Warning

The singleCommand and singleEvent methods provide a way to generate responses which transfer exactly one result from one pipeline stage to the next without suffering the overhead of object allocations. This means that the returned collection object will not work for anything else (you will get ClassCastExceptions!) and this facility can only be used EXACTLY ONCE during the processing of one input (command or event).
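If a stage needs to emit more than one command or event per input, it must instead fill a normal collection using the makeCommand and makeEvent helpers, as the frame decoder above does with makeEvent. A sketch (part1 and part2 are hypothetical ByteString fragments):

final ArrayList<Either<ByteString, ByteString>> res =
    new ArrayList<Either<ByteString, ByteString>>();
res.add(makeCommand(part1)); // hypothetical first fragment
res.add(makeCommand(part2)); // hypothetical second fragment
return res;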

Now let us look at the decoder side:

private String getString(ByteIterator iter) {
  final int length = iter.getInt(byteOrder);
  final byte[] bytes = new byte[length];
  iter.getBytes(bytes);
  return ByteString.fromArray(bytes).utf8String();
}

@Override
public Iterable<Either<Message, ByteString>> onEvent(ByteString evt) {
  final ByteIterator iter = evt.iterator();

  final int personLength = iter.getInt(byteOrder);
  final Message.Person[] persons = new Message.Person[personLength];
  for (int i = 0; i < personLength; ++i) {
    persons[i] = new Message.Person(getString(iter), getString(iter));
  }

  final int curveLength = iter.getInt(byteOrder);
  final double[] curve = new double[curveLength];
  iter.getDoubles(curve, byteOrder);

  // verify that this was all; could be left out to allow future
  // extensions
  assert iter.isEmpty();

  return singleEvent(new Message(persons, curve));
}

The decoding side does the same things as the encoder and in the same order; it just uses a ByteIterator to retrieve primitive data types or arrays thereof from the underlying ByteString. In the end it hands the assembled Message as an event to the next stage using the optimized singleEvent facility (see the warning above).

Building a Pipeline

Given the two pipeline stages introduced in the sections above we can now put them to some use. First we define a message to be encoded:

final Message msg = new Message(
    new Message.Person[] { 
        new Message.Person("Alice", "Gibbons"),
        new Message.Person("Bob", "Sparseley")
    },
    new double[] { 1.0, 3.0, 5.0 });

Then we need to create a pipeline context which satisfies our declared needs:

class Context extends AbstractPipelineContext implements HasByteOrder {

  @Override
  public ByteOrder byteOrder() {
    return java.nio.ByteOrder.BIG_ENDIAN;
  }
  
}

final Context ctx = new Context();

Building the pipeline and encoding this message then is quite simple:

final PipelineStage<Context, Message, ByteString, Message, ByteString> stages =
    PipelineStage.sequence(
        new MessageStage(),
        new LengthFieldFrame(10000)
    );

final PipelineSink<ByteString, Message> sink =
    new PipelineSink<ByteString, Message>() {

      @Override
      public void onCommand(ByteString cmd) throws Throwable {
        commandHandler.tell(cmd, ActorRef.noSender());
      }

      @Override
      public void onEvent(Message evt) throws Throwable {
        eventHandler.tell(evt, ActorRef.noSender());
      }
    };
    
final PipelineInjector<Message, ByteString> injector =
    PipelineFactory.buildWithSink(ctx, stages, sink);

injector.injectCommand(msg);

First we sequence the two stages, i.e. attach them such that the output of one becomes the input of the other. Next we create a PipelineSink, which is essentially a callback interface determining what shall happen with the encoded commands or decoded events, respectively. Finally we build the pipeline using the PipelineFactory, which returns an interface for feeding commands and events into this pipeline instance. As a demonstration we simply encode the message shown above; the resulting ByteString will then be sent to the commandHandler actor. Decoding works in the same way, only using injectEvent.
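For example, feeding a received frame into the read side looks like this (bytes is a hypothetical ByteString that arrived from the network):

// decode a frame received from the wire; the resulting Message
// is delivered to the sink's onEvent callback
injector.injectEvent(bytes);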

Injecting into a pipeline using a PipelineInjector will catch exceptions resulting from processing the input, in which case the exception (there can only be one per injection) is passed into the respective sink. The default implementations of onCommandFailure and onEventFailure re-throw the exception (hence the throws declaration of the inject* methods).
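As a sketch (assuming the same handler actors as above, and the failure hook signatures implied by the previous paragraph), a sink may override these hooks to report failures instead of re-throwing:

final PipelineSink<ByteString, Message> sink =
    new PipelineSink<ByteString, Message>() {

      @Override
      public void onCommand(ByteString cmd) throws Throwable {
        commandHandler.tell(cmd, ActorRef.noSender());
      }

      @Override
      public void onEvent(Message evt) throws Throwable {
        eventHandler.tell(evt, ActorRef.noSender());
      }

      @Override
      public void onCommandFailure(Throwable thr) {
        System.err.println("could not encode command: " + thr);
      }

      @Override
      public void onEventFailure(Throwable thr) {
        System.err.println("could not decode event: " + thr);
      }
    };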

Using the Pipeline’s Context

Up to this point there was always a parameter ctx which was used when constructing a pipeline, but it was not explained in full. The context is a piece of information which is made available to all stages of a pipeline. It may also carry behavior, provide infrastructure, helper methods and so on. Note that the context is bound to the pipeline and as such must not be accessed concurrently from different threads unless care is taken to properly synchronize such access. Since the context will in many cases be provided by an actor, it is not recommended to share it with code executing outside of the actor’s message handling.

Warning

A PipelineContext instance MUST NOT be used by two different pipelines since it contains mutable fields which are used during message processing.

Using Management Commands

Since pipeline stages do not have any reference to the pipeline or even to their neighbors, they cannot directly effect the injection of commands or events outside of their normal processing. But sometimes things need to happen driven by a timer, for example. In this case the timer would need to cause tick messages to be sent to the whole pipeline, and the stages which want to receive them would act upon those. In order to keep the type signatures for events and commands useful, such external triggers are sent out-of-band, via a different channel: the management port. One example which makes use of this facility is the TickGenerator included with akka-actor (the following is a transcription of the Scala version which is actually included in the akka-actor JAR):

public interface HasActorContext extends PipelineContext {
  
  public ActorContext getContext();

}

public class TickGenerator<Cmd, Evt> extends
    PipelineStage<HasActorContext, Cmd, Cmd, Evt, Evt> {

  public interface Trigger {}
  
  public static class Tick implements Trigger {
    final FiniteDuration timestamp;

    public Tick(FiniteDuration timestamp) {
      super();
      this.timestamp = timestamp;
    }

    public FiniteDuration getTimestamp() {
      return timestamp;
    }
  }

  private final FiniteDuration interval;

  public TickGenerator(FiniteDuration interval) {
    this.interval = interval;
  }

  @Override
  public PipePair<Cmd, Cmd, Evt, Evt> apply(final HasActorContext ctx) {
    return PipePairFactory.create(ctx,
        new AbstractPipePair<Cmd, Cmd, Evt, Evt>() {
      
          private final Trigger trigger = new Trigger() {
            public String toString() {
              return "Tick[" + ctx.getContext().self().path() + "]";
            }
          };
          
          private void schedule() {
            final ActorSystem system = ctx.getContext().system();
            system.scheduler().scheduleOnce(interval,
                ctx.getContext().self(), trigger, system.dispatcher(), null);
          }

          {
            schedule();
          }

          @Override
          public Iterable<Either<Evt, Cmd>> onCommand(Cmd cmd) {
            return singleCommand(cmd);
          }

          @Override
          public Iterable<Either<Evt, Cmd>> onEvent(Evt evt) {
            return singleEvent(evt);
          }

          @Override
          public Iterable<Either<Evt, Cmd>> onManagementCommand(Object cmd) {
            if (cmd == trigger) {
              ctx.getContext().self().tell(new Tick(Deadline.now().time()),
                  ActorRef.noSender());
              schedule();
            }
            return Collections.emptyList();
          }

        });
  }
}

This pipeline stage is to be used within an actor, and it will make use of this context in order to schedule the delivery of Tick messages; the actor is then supposed to feed these messages into the management port of the pipeline. An example could look like this:

public class Processor extends UntypedActor {

  private class Context extends AbstractPipelineContext 
      implements HasByteOrder, HasActorContext {

    @Override
    public ActorContext getContext() {
      return Processor.this.getContext();
    }

    @Override
    public ByteOrder byteOrder() {
      return java.nio.ByteOrder.BIG_ENDIAN;
    }

  }

  final Context ctx = new Context();

  final FiniteDuration interval = Duration.apply(1, TimeUnit.SECONDS);

  final PipelineStage<Context, Message, ByteString, Message, ByteString> stages =
      PipelineStage.sequence(
          // Java 7 can infer these types, Java 6 cannot
          PipelineStage.<Context, Message, Message, ByteString,
              Message, Message, ByteString> sequence(
                  new TickGenerator<Message, Message>(interval),
                  new MessageStage()),
          new LengthFieldFrame(10000));

  private final ActorRef evts;
  private final ActorRef cmds;

  final PipelineInjector<Message, ByteString> injector = PipelineFactory
      .buildWithSink(ctx, stages, new PipelineSink<ByteString, Message>() {

        @Override
        public void onCommand(ByteString cmd) {
          cmds.tell(cmd, getSelf());
        }

        @Override
        public void onEvent(Message evt) {
          evts.tell(evt, getSelf());
        }
      });

  public Processor(ActorRef cmds, ActorRef evts) throws Exception {
    this.cmds = cmds;
    this.evts = evts;
  }
  
  // omitted ...
  
  @Override
  public void onReceive(Object obj) throws Exception {
    if (obj instanceof Message) {
      injector.injectCommand((Message) obj);
    } else if (obj instanceof ByteString) {
      injector.injectEvent((ByteString) obj);
    } else if (obj instanceof TickGenerator.Trigger) {
      injector.managementCommand(obj);
    }
  }

}

This actor extends our well-known pipeline with the tick generator and attaches the outputs to functions which send commands and events to actors for further processing. The pipeline stages will then all receive one Tick per second, which can be used like so:

private FiniteDuration lastTick = Duration.Zero();

@Override
public Iterable<Either<Message, ByteString>> onManagementCommand(Object cmd) {
  // omitted ...
  if (cmd instanceof TickGenerator.Tick) {
    final FiniteDuration timestamp = ((TickGenerator.Tick) cmd)
        .getTimestamp();
    System.out.println("time since last tick: "
        + timestamp.minus(lastTick));
    lastTick = timestamp;
  }
  return Collections.emptyList();
}

Note

Management commands are delivered to all stages of a pipeline “effectively parallel”, like on a broadcast medium. No code will actually run concurrently since a pipeline is strictly single-threaded, but the order in which these commands are processed is not specified.

The intended purpose of management commands is for each stage to define its special command types and then listen only to those (the aforementioned Tick message is a useful example), exactly like sending packets on a wifi network where every station receives all traffic but reacts only to those messages which are destined for it.

If you need all stages to react upon something in their defined order, then this must be modeled either as a command or event, i.e. it will be part of the “business” type of the pipeline.
