Consuming registration events

Before a temperature entity can be queried for its state, or observations can be posted to it, the fictitious temperature sensor must be registered as described previously in “Running the sample”.

The following section describes connecting to a remote producer of registration events and forwarding them as registration commands to our temperature entity. The consumer’s offset is saved so that our service can resume near where it left off if it is restarted.

Note

We use the term “near” deliberately: the guarantees are “at least once”, so the last offset processed may not have been saved, although it usually will have been.

The offset store

When consuming events from a remote producer, we must keep track of the offset we have processed. This is so that we can resume near where we left off in the case of a restart. An offset store is used for this purpose, and we will also use one when consuming from the GrpcSourceProvider, discussed in the next section.

Rust
let stream_id = StreamId::from("registration-events");

let offset_store_id = stream_id.clone();
let offset_store =
    offset_store::task(commit_log, EXPECTED_DISTINCT_REGISTRATIONS, offset_store_id);

We also declare the commit log to use and the expected number of distinct device registrations. The expected number of registrations determines how many entity id offsets are held in memory at any one time. Each offset store must also have a distinct identity, hence the offset_store_id.
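For reference, the commit log in this sample is streambed-logged’s file-based FileLog. The following is a minimal sketch of constructing one, assuming FileLog::new accepts a root directory path; the path shown is illustrative:

Rust
use std::path::PathBuf;

use streambed_logged::FileLog;

// File-based commit log rooted at a local directory (path is illustrative).
let commit_log = FileLog::new(PathBuf::from("/var/lib/iot-service/commit-log"));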

The gRPC source provider

We use a GrpcSourceProvider to source events from a remote producer. The remote endpoint is declared as a function that establishes a gRPC connection. This function can be enhanced to establish the connection flexibly, e.g. over TLS or even a Unix domain socket if desired; a TLS sketch follows the provider code below.

Rust
let source_provider = GrpcSourceProvider::new(
    move || {
        let event_producer = Channel::builder(event_producer_addr.clone());
        async move { event_producer.connect().await }
    },
    stream_id,
);

The gRPC source provider will manage the remote connection, including exponential backoff and retries in the case of failure.
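As an example of that flexibility, the connection function can configure TLS on the endpoint before connecting. The following is a minimal sketch, assuming tonic’s "tls" feature is enabled; the domain name is illustrative:

Rust
use tonic::transport::{Channel, ClientTlsConfig};

let source_provider = GrpcSourceProvider::new(
    move || {
        // TLS configuration is illustrative and requires tonic's "tls" feature.
        let tls = ClientTlsConfig::new().domain_name("event-producer.example.com");
        let event_producer = Channel::builder(event_producer_addr.clone());
        async move { event_producer.tls_config(tls)?.connect().await }
    },
    stream_id,
);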

The handler

Receiving events from a remote producer means little if we don’t do anything with them. To this end, we declare a handler to forward these events on to the temperature entity via its entity manager’s message channel for commands.

Rust
let handler = move |envelope: EventEnvelope<registration::Registered>| {
    let temperature = temperature.clone();
    async move {
        let (entity_id, secret) = {
            let secret = {
                let registration::Registered {
                    secret: Some(secret),
                    ..
                } = envelope.event
                else {
                    return Ok(());
                };
                secret.value.into()
            };
            (envelope.persistence_id.entity_id, secret)
        };

        temperature
            .send(Message::new(
                entity_id,
                temperature::Command::Register { secret },
            ))
            .await
            .map(|_| ())
            .map_err(|_| HandlerError)
    }
};

Note that we are only interested in Registered events. If our remote producer were capable of producing other types of events, they would be filtered out here.

Note

We can also declare a “consumer filter” on the GrpcSourceProvider so that the producer avoids transmitting events that we do not wish to consume. This is a great way to save network bandwidth, which is an important concern at the edge, particularly over wireless communications.

Run the projection

We are now in a position to start up a projection that will use the offset store to remember the offset consumed, and invoke the handler for each remote event received:

Rust
let (consumer_task, consumer_kill_switch) =
    consumer::task(offset_store, source_provider, handler);
tokio::spawn(consumer_task);

A kill switch is also provided so that the consumer task may be terminated from elsewhere.
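For example, terminating the consumer from elsewhere amounts to sending on the kill switch. A minimal sketch, ignoring the result in case the task has already finished:

Rust
// Later, e.g. during shutdown, terminate the consumer task.
let _ = consumer_kill_switch.send(());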

Putting it all together

The following code puts all of the above together as a task that can be spawned:

Rust
const EXPECTED_DISTINCT_REGISTRATIONS: usize = 1000;

// Apply sensor registrations to the temperature sensor entity.
pub fn spawn(
    commit_log: FileLog,
    event_producer_addr: Uri,
    temperature: mpsc::Sender<Message<temperature::Command>>,
) -> oneshot::Sender<()> {
    let stream_id = StreamId::from("registration-events");

    let offset_store_id = stream_id.clone();
    let offset_store =
        offset_store::task(commit_log, EXPECTED_DISTINCT_REGISTRATIONS, offset_store_id);

    let source_provider = GrpcSourceProvider::new(
        move || {
            let event_producer = Channel::builder(event_producer_addr.clone());
            async move { event_producer.connect().await }
        },
        stream_id,
    );

    let handler = move |envelope: EventEnvelope<registration::Registered>| {
        let temperature = temperature.clone();
        async move {
            let (entity_id, secret) = {
                let secret = {
                    let registration::Registered {
                        secret: Some(secret),
                        ..
                    } = envelope.event
                    else {
                        return Ok(());
                    };
                    secret.value.into()
                };
                (envelope.persistence_id.entity_id, secret)
            };

            info!("Consuming {entity_id}");

            temperature
                .send(Message::new(
                    entity_id,
                    temperature::Command::Register { secret },
                ))
                .await
                .map(|_| ())
                .map_err(|_| HandlerError)
        }
    };

    let (consumer_task, consumer_kill_switch) =
        consumer::task(offset_store, source_provider, handler);
    tokio::spawn(consumer_task);

    consumer_kill_switch
}
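As a usage sketch, the returned kill switch can be kept by the caller. The wiring below is illustrative rather than part of the sample: the commit log and temperature sender are assumed to come from elsewhere in the service, and the producer address is made up:

Rust
use tokio::sync::{mpsc, oneshot};
use tonic::transport::Uri;

// Hypothetical call site: commit_log and temperature are assumed to have been
// created elsewhere in the service's main function.
fn wire_up_registrations(
    commit_log: FileLog,
    temperature: mpsc::Sender<Message<temperature::Command>>,
) -> oneshot::Sender<()> {
    // The producer address is illustrative.
    let event_producer_addr = Uri::from_static("http://127.0.0.1:8101");
    spawn(commit_log, event_producer_addr, temperature)
}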

What’s next?

  • Producing temperature events