Producing temperature events

Once registered, the temperature entity can receive observations. These observations are then produced to a remote consumer, and this page describes how we produce events over gRPC. We can also produce events over HTTP and do so later in the guide.

An interesting aspect of connectivity from the edge is that it typically connects to the cloud, and not the other way round. This is largely due to network traffic restrictions where the cloud cannot “see in” to the edge, but the edge can “reach out”. Akka provides a “projection producer” for the purposes of establishing a connection and then producing events.

The offset store

We wish to avoid producing events that we have produced before. As with consuming registration events, we establish an offset store to keep track of events that we produced.

Rust
sourcelet stream_id = StreamId::from("temperature-events");

let offset_store_id = stream_id.clone();
let offset_store = offset_store::task(
    commit_log.clone(),
    temperature::MAX_ENTITIES,
    offset_store_id,
);

We also declare the commit log to use and the expected number of distinct entities for sizing memory usage. As with consuming registrations, the number of temperature productions is used to determine how many entity id offsets are held in memory at any one time. Each offset store must also have a distinct identity, hence the offset-store-id.

The source provider

We establish our source of events as a commit log using a source provider with the same marshaller declared for running the entity manager earlier in the guide:

Rust
sourcelet source_provider = CommitLogSourceProvider::new(
    commit_log,
    temperature::marshaller(events_key_secret_path, secret_store),
    "iot-service-projection",
    Topic::from(temperature::EVENTS_TOPIC),
);

The producer

A “sink” of envelopes is established that forward on to a remote consumer via a producer flow. The flow will be used to bridge between source of events and this sink.

A great deal of flexibility is provided in terms of expressing how the remote consumer’s endpoint is established via the consumer_connector. Quite often, this will be the establishment of an HTTP TLS endpoint, but it can also be plaintext (as it is in the example), or even Unix Domain Sockets.

Rust
sourcelet consumer_connector = move || {
    let consumer_endpoint = Channel::builder(event_consumer_addr.clone());
    async move { consumer_endpoint.connect().await }
};
let (producer_task, producer_flow, producer_kill_switch) = producer::task(
    consumer_connector,
    OriginId::from("edge-iot-service"),
    stream_id,
    entity_type,
    MAX_IN_FLIGHT,
);
tokio::spawn(producer_task);

MAX_IN_FLIGHT represents the maximum number of publications of an event waiting for an acknowledgement that will buffer before back-pressuring the producer.

A producer is independent from the source of events in that it produces at a different rate to what is sourced. Consequently, the producer is spawned into its own task, and it can buffer the source of events across the flow.

The transformer and handler

We must declare how our events are to be transformed into their gRPC form, and then obtain a handler from any required filtering.

Rust
sourcelet transformer = |envelope: &EventEnvelope<temperature::Event>| {
    let temperature::Event::TemperatureRead { temperature } = envelope.event else {
        return None;
    };

    let event = proto::TemperatureRead {
        temperature: temperature as i32,
    };

    Some(event)
};
let producer_filter = |_: &EventEnvelope<temperature::Event>| true;
let handler = producer_flow.handler(producer_filter, transformer);

The consumer

At this stage, we have a means to produce events over a gRPC producer. The final step is to start up a projection that uses the flow to source events from the commit log. The projection is able to resume from where we left off given the offset store.

Rust
sourcelet (consumer_task, consumer_kill_switch) =
    consumer::task(offset_store, source_provider, handler);
tokio::spawn(consumer_task);

Putting it all together

The following code puts all of the above together as a task that can be spawned:

Rust
sourceconst MAX_IN_FLIGHT: usize = 10;

// Apply sensor observations to a remote consumer.
pub fn spawn(
    commit_log: FileLog,
    event_consumer_addr: Uri,
    secret_store: FileSecretStore,
    events_key_secret_path: String,
) -> (oneshot::Sender<()>, oneshot::Sender<()>) {
    let entity_type = EntityType::from(temperature::ENTITY_TYPE);

    let stream_id = StreamId::from("temperature-events");

    let offset_store_id = stream_id.clone();
    let offset_store = offset_store::task(
        commit_log.clone(),
        temperature::MAX_ENTITIES,
        offset_store_id,
    );

    let source_provider = CommitLogSourceProvider::new(
        commit_log,
        temperature::marshaller(events_key_secret_path, secret_store),
        "iot-service-projection",
        Topic::from(temperature::EVENTS_TOPIC),
    );

    let consumer_connector = move || {
        let consumer_endpoint = Channel::builder(event_consumer_addr.clone());
        async move { consumer_endpoint.connect().await }
    };
    let (producer_task, producer_flow, producer_kill_switch) = producer::task(
        consumer_connector,
        OriginId::from("edge-iot-service"),
        stream_id,
        entity_type,
        MAX_IN_FLIGHT,
    );
    tokio::spawn(producer_task);

    let transformer = |envelope: &EventEnvelope<temperature::Event>| {
        let temperature::Event::TemperatureRead { temperature } = envelope.event else {
            return None;
        };

        info!(
            "Producing {} with temperature of {}",
            envelope.persistence_id, temperature
        );

        let event = proto::TemperatureRead {
            temperature: temperature as i32,
        };

        Some(event)
    };
    let producer_filter = |_: &EventEnvelope<temperature::Event>| true;
    let handler = producer_flow.handler(producer_filter, transformer);

    let (consumer_task, consumer_kill_switch) =
        consumer::task(offset_store, source_provider, handler);
    tokio::spawn(consumer_task);

    (producer_kill_switch, consumer_kill_switch)
}

What’s next?

  • HTTP temperature events
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.