The temperature entity

As the other features of Akka Edge are build on top of Event Sourcing, let us start by implementing a digital twin for temperature sensors using the same approach as the Akka Event Sourced Behavior API, but written using Akka Edge Rust.

We will represent a temperature sensor as an Event Sourced entity. If you are unfamiliar with Event Sourcing, refer to the Event Sourcing section in the Akka guide for an explanation. The Event Sourcing with Akka video is also a good starting point for learning about Event Sourcing.

Commands and events

Commands are the public API of an entity that other parts of the system use to interact with it. Entity state can only be changed by an entity’s events as a result of commands. A command can request state changes, and different events might be generated depending on the current state of the entity. A command can also be rejected if it has invalid input or cannot be handled due to the current state of the entity.

The Temperature sensor accepts three commands: Post, and Register, for retrieving the current state, posting a new temperature observation, and registering a sensor respectively. When posting a new observation, a TemperatureRead is emitted and then persisted in a local event journal. Any Post command where a sensor has not been registered will be ignored.

The definition of the commands looks like this:

Rust
sourcepub enum Command {
    Post { temperature: u32 },
    Register { secret: SecretDataValue },
}

Temperature values in our sample are represented as integers, which is often the convention when receiving small packets of data from sensors. Quite often, sensors communicate over a very low bandwidth connection. The less bits to transmit, the more chance a sensor’s data will reach its destination, particularly over Low-Powered-Wide-Area-Networks (LPWANs).

The registration command conveys a SecretDataValue which, in our case, is a hexadecimal string that will be conveyed out-of-band, via Akka Edge JVM. These types of secrets are often used by sensors to produce a “session key” that both a sensor can encrypt with, and an edge service can decrypt with. For example, AES-CCM 128 bit encryption is often used to provide authentication and validation of the message. To keep things simple, our sample does nothing more with the secret than accepting temperature observations given the presence of it.

A shared model

The definition of the events is in a separate module so that it they can be shared with a browser-based application, a “frontend”. Sharing the data structures improves code maintenance and the assurance that the frontend and backend will continue to be compatible with each other. The event declarations look like this:

Rust
source#[derive(Clone, Deserialize, Serialize)]
pub enum Event {
    Registered { secret: SecretDataValue },
    TemperatureRead { temperature: u32 },
}

As events are persisted in a local event journal, we use Serde declarations to generate code for serializing and deserializing the events to and from bytes. Events are also required to be cloneable so that they may be processed by various handlers e.g. when sending an event via gRPC.

State

Up to 10 temperature observations are kept in memory. The amount of history to be retained in memory is an application-specific concern, and will depend on the amount of memory to be reserved also considering the number of entities held in memory at any one time.

Here is the declaration of the state structure, again in a separate module so that it can be shared between the frontend and the backend of the sample:

Rust
sourceconst MAX_HISTORY_EVENTS: usize = 10;

#[derive(Default)]
pub struct State {
    pub history: VecDeque<u32>,
    pub secret: SecretDataValue,
}

We use a VecDeque for history as we can add and remove observations from the queue in constant time. The implementation of State shows how this is put to effect with a method that updates it given an event:

Rust
sourceimpl State {
    pub fn on_event(&mut self, _context: &Context, event: Event) {
        match event {
            Event::Registered { secret } => {
                self.secret = secret;
            }
            Event::TemperatureRead { temperature } => {
                if self.history.len() == MAX_HISTORY_EVENTS {
                    self.history.pop_front();
                }
                self.history.push_back(temperature);
            }
        }
    }
}

This method will be called by our backend’s entity behavior declaration described in the next section. It is also used by the frontend to construct the state it requires to display to our user.

Behavior

The temperature entity will receive commands and produce “effects” that an “entity manager” will apply. These effects can cause events to be emitted and then persisted to an event journal.

The structure for our temperature entity is as follows:

Rust
sourcepub struct Behavior;

An EventSourcedBehavior declaration associates the types used for the state, commands, and events of an entity instances. The declaration also includes how the entity’s commands are processed, and how events are applied to state. Note that it is impossible to update state from a command handler as it is immutable by design. Updating state only via the event handler enables entities to be sourced from their stored events without effects.

Here is the behavior declaration of our entity, noting that the event handler is calling upon a shared model declaration described earlier:

Rust
sourceimpl EventSourcedBehavior for Behavior {
    type State = State;

    type Command = Command;

    type Event = Event;

    fn for_command(
        _context: &Context,
        state: &Self::State,
        command: Self::Command,
    ) -> Box<dyn Effect<Self>> {
        match command {
            Command::Post { temperature } if !state.secret.is_empty() => {
                persist_event(Event::TemperatureRead { temperature }).boxed()
            }

            Command::Register { secret } => persist_event(Event::Registered { secret }).boxed(),

            _ => unhandled(),
        }
    }

    fn on_event(context: &Context, state: &mut Self::State, event: Self::Event) {
        state.on_event(context, event);
    }
}

Each command handler declares an effect to be performed. When posting an observation in our sample, the TemperatureRead event is persisted.

Serialization and storage

The events of the entity must be serializable because they are written to an event journal. We are using the Streambed Logged commit log as an event journal. Streambed Logged has a particular emphasis on managing limited amounts of storage. The notion of “compaction” is important to Streambed Logged, and ensures that only so many records of a given event type for a given entity id are retained over time. Akka Edge Rust provides a crate to support Streambed Logged, but we must still describe how to marshal our events to and from it. The following code illustrates this:

Rust
source// A namespace for our entity's events when constructing persistence ids.
pub const ENTITY_TYPE: &str = "Sensor";

// Produces a marshaller for our file based commit log and uses the
// upper 12 bits of the log's 64 bit key for holding up to 4K event
// types. The remainder of the key is for the numeric entity id (52 bits).
pub fn marshaller(
    events_key_secret_path: String,
    secret_store: FileSecretStore,
) -> Marshaller<Event, impl Fn(&Event) -> u64, FileSecretStore, 12> {
    let to_record_type = |event: &Event| match event {
        Event::TemperatureRead { .. } => 0,
        Event::Registered { .. } => 1,
    };

    cbor::marshaller(
        EntityType::from(ENTITY_TYPE),
        events_key_secret_path,
        secret_store,
        to_record_type,
    )
}

We provide the marshaller as a function so that we can re-use it when projecting events (later).

With the above, Akka Edge Rust provides out-of-the-box functionality to encrypt and decrypt the records of Streamed Logged, and use CBOR for serialization.

Compaction

Streambed Logged compaction manages the amount of storage consumed by a commit log. Compaction is used to manage the space occupied by storage, which is important, particularly at the edge where resources are limited.

We size compaction to the typical number of devices we expect to have in the system. Note though that it will impact memory, so there is a trade-off. Let’s suppose this was some LoRaWAN system and that our gateway cannot handle more than 1,000 devices being connected. We can work out that 1,000 is therefore a reasonable limit. The overhead is small, but should be calculated and measured for a production scenario.

Rust
sourcepub const EVENTS_TOPIC: &str = "temperature";

const MAX_HISTORY_EVENTS: usize = 10;

// This is the number of keys that "compaction" will produce in one pass. If there are more
// keys than that, then there will be another pass. You should size it to what you think
// is a reasonable number of entities for your application.
const MAX_TOPIC_COMPACTION_KEYS: usize = 1_000;

    let events_topic = Topic::from(EVENTS_TOPIC);

    let mut task_commit_log = commit_log.clone();
    let task_events_topic = events_topic.clone();
    tokio::spawn(async move {
        task_commit_log
            .register_compaction(
                task_events_topic,
                NthKeyBasedRetention::new(MAX_TOPIC_COMPACTION_KEYS, MAX_HISTORY_EVENTS),
            )
            .await
            .unwrap();
    });

We register a compaction strategy for our topic such that when we use up 64KB of disk space (the default), we will run compaction in the background so that unwanted events are removed. In our scenario, unwanted events can be removed when the exceed MAX_HISTORY_EVENTS as we do not have a requirement to ever return more than that.

The file log adapter

This adapter permits us to source events from the commit log and append them to it as they are emitted by an entity.

Rust
sourcelet file_log_topic_adapter = CommitLogTopicAdapter::new(
    commit_log,
    marshaller(events_key_secret_path, secret_store),
    "iot-service",
    events_topic,
);

Running the entity manager

We are now ready to run the entity manager. An entity manager task handles the lifecycle and routing of messages per type of entity.

Rust
sourcelet (entity_manager_task, commands) =
    entity_manager::task(Behavior, file_log_topic_adapter, MAX_COMMANDS, MAX_ENTITIES);
let handle = tokio::spawn(entity_manager_task);

The above runs the entity manager given a behavior, the commit log adapter, and a means to receive commands via a channel. We inform it of the MAX_COMMANDS that will be buffered before back-pressuring any senders. Similarly, we dimension the “working set” of entities that are cached in memory at any one time via MAX_ENTITIES. A channel is returned so that we can send commands to entities.

Putting it all together

The following code represents putting all of the above together as a task that can be spawned:

Rust
sourcepub const EVENTS_TOPIC: &str = "temperature";

const MAX_HISTORY_EVENTS: usize = 10;

// This is the number of keys that "compaction" will produce in one pass. If there are more
// keys than that, then there will be another pass. You should size it to what you think
// is a reasonable number of entities for your application.
const MAX_TOPIC_COMPACTION_KEYS: usize = 1_000;

// The maximum number of temperature commands to retain in memory at any one time.
// Exceeding this number will back-pressure any senders of commands.
pub const MAX_COMMANDS: usize = 10;

// The maximum number of temperature entities to retain in memory at any one time.
// Exceeding this number will evict the last entity used and source any new one
// required.
pub const MAX_ENTITIES: usize = 10;

// A task that will be run to manage the temperature sensor.
pub async fn spawn(
    commit_log: FileLog,
    secret_store: FileSecretStore,
    events_key_secret_path: String,
) -> (JoinHandle<io::Result<()>>, mpsc::Sender<Message<Command>>) {

    let events_topic = Topic::from(EVENTS_TOPIC);

    let mut task_commit_log = commit_log.clone();
    let task_events_topic = events_topic.clone();
    tokio::spawn(async move {
        task_commit_log
            .register_compaction(
                task_events_topic,
                NthKeyBasedRetention::new(MAX_TOPIC_COMPACTION_KEYS, MAX_HISTORY_EVENTS),
            )
            .await
            .unwrap();
    });

    let file_log_topic_adapter = CommitLogTopicAdapter::new(
        commit_log,
        marshaller(events_key_secret_path, secret_store),
        "iot-service",
        events_topic,
    );

    let (entity_manager_task, commands) =
        entity_manager::task(Behavior, file_log_topic_adapter, MAX_COMMANDS, MAX_ENTITIES);
    let handle = tokio::spawn(entity_manager_task);

    (handle, commands)
}

What’s next?

  • Consuming registration events
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.