Producing temperature events
Once registered, the temperature entity can receive observations. These observations are then produced as events to a remote consumer. This page describes how we produce events over gRPC; we can also produce events over HTTP, which we do later in the guide.
An interesting aspect of edge connectivity is that the edge typically connects to the cloud, and not the other way round. This is largely due to network traffic restrictions: the cloud cannot “see in” to the edge, but the edge can “reach out”. Akka provides a “projection producer” for establishing a connection and then producing events over it.
The offset store
We wish to avoid producing events that we have produced before. As with consuming registration events, we establish an offset store to keep track of the events we have already produced.
```rust
let stream_id = StreamId::from("temperature-events");
let offset_store_id = stream_id.clone();
let offset_store = offset_store::task(
    commit_log.clone(),
    temperature::MAX_ENTITIES,
    offset_store_id,
);
```
We also declare the commit log to use and the expected number of distinct entities for sizing memory usage. As with consuming registrations, the expected number of temperature entities determines how many entity id offsets are held in memory at any one time. Each offset store must also have a distinct identity, hence the offset_store_id.
The source provider
We establish our source of events as a commit log, using a source provider with the same marshaller we declared for running the entity manager earlier in the guide:
```rust
let source_provider = CommitLogSourceProvider::new(
    commit_log,
    temperature::marshaller(events_key_secret_path, secret_store),
    "iot-service-projection",
    Topic::from(temperature::EVENTS_TOPIC),
);
```
The producer
A “sink” of envelopes is established that forwards them on to a remote consumer via a producer flow. The flow is used to bridge between the source of events and this sink.
A great deal of flexibility is provided in expressing how the remote consumer’s endpoint is established via the consumer_connector. Quite often this will be an HTTP TLS endpoint, but it can also be plaintext (as it is in the example), or even a Unix Domain Socket.
```rust
let consumer_connector = move || {
    let consumer_endpoint = Channel::builder(event_consumer_addr.clone());
    async move { consumer_endpoint.connect().await }
};

let (producer_task, producer_flow, producer_kill_switch) = producer::task(
    consumer_connector,
    OriginId::from("edge-iot-service"),
    stream_id,
    entity_type,
    MAX_IN_FLIGHT,
);

tokio::spawn(producer_task);
```
MAX_IN_FLIGHT represents the maximum number of event publications awaiting acknowledgement that will be buffered before back-pressuring the producer.
A producer is independent of the source of events in that it produces at a different rate from the rate at which events are sourced. Consequently, the producer is spawned into its own task, and it can buffer events from the source across the flow.
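If the remote consumer’s endpoint is served over TLS rather than plaintext, the connector can configure the channel accordingly. The following is a minimal sketch, assuming tonic’s `tls` feature is enabled and that `ca_pem` holds the consumer’s CA certificate in PEM form; the certificate and domain name are illustrative and not part of the guide’s example:

```rust
use tonic::transport::{Certificate, Channel, ClientTlsConfig};

// A hypothetical TLS variant of the connector: same shape as the plaintext
// example above, but the endpoint is given a client TLS configuration before
// connecting. Requires tonic's "tls" feature.
let consumer_connector = move || {
    let tls = ClientTlsConfig::new()
        .ca_certificate(Certificate::from_pem(ca_pem.clone()))
        .domain_name("consumer.example.com");
    let consumer_endpoint = Channel::builder(event_consumer_addr.clone());
    async move { consumer_endpoint.tls_config(tls)?.connect().await }
};
```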
The transformer and handler
We must declare how our events are to be transformed into their gRPC form, and then obtain a handler from the producer flow, supplying this transformer along with any required filtering.
```rust
let transformer = |envelope: &EventEnvelope<temperature::Event>| {
    let temperature::Event::TemperatureRead { temperature } = envelope.event else {
        return None;
    };

    let event = proto::TemperatureRead {
        temperature: temperature as i32,
    };

    Some(event)
};

let producer_filter = |_: &EventEnvelope<temperature::Event>| true;

let handler = producer_flow.handler(producer_filter, transformer);
```
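The filter above passes every envelope through. As a hypothetical alternative, a filter can inspect the event itself. The sketch below produces only readings above a threshold; the threshold value is illustrative, and we assume the reading is an integer, as suggested by the cast in the transformer:

```rust
// A hypothetical filter: only produce temperature readings above a threshold
// rather than passing everything through.
let producer_filter = |envelope: &EventEnvelope<temperature::Event>| {
    matches!(
        &envelope.event,
        temperature::Event::TemperatureRead { temperature } if *temperature > 100
    )
};
```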
The consumer
At this stage we have a means to produce events over gRPC. The final step is to start up a projection that sources events from the commit log and passes them to the handler. Given the offset store, the projection is able to resume from where it left off.
```rust
let (consumer_task, consumer_kill_switch) =
    consumer::task(offset_store, source_provider, handler);

tokio::spawn(consumer_task);
```
Putting it all together
The following code puts all of the above together as a function that spawns the producer and consumer tasks and returns their kill switches:
```rust
const MAX_IN_FLIGHT: usize = 10;

// Apply sensor observations to a remote consumer.
pub fn spawn(
    commit_log: FileLog,
    event_consumer_addr: Uri,
    secret_store: FileSecretStore,
    events_key_secret_path: String,
) -> (oneshot::Sender<()>, oneshot::Sender<()>) {
    let entity_type = EntityType::from(temperature::ENTITY_TYPE);

    let stream_id = StreamId::from("temperature-events");
    let offset_store_id = stream_id.clone();
    let offset_store = offset_store::task(
        commit_log.clone(),
        temperature::MAX_ENTITIES,
        offset_store_id,
    );

    let source_provider = CommitLogSourceProvider::new(
        commit_log,
        temperature::marshaller(events_key_secret_path, secret_store),
        "iot-service-projection",
        Topic::from(temperature::EVENTS_TOPIC),
    );

    let consumer_connector = move || {
        let consumer_endpoint = Channel::builder(event_consumer_addr.clone());
        async move { consumer_endpoint.connect().await }
    };

    let (producer_task, producer_flow, producer_kill_switch) = producer::task(
        consumer_connector,
        OriginId::from("edge-iot-service"),
        stream_id,
        entity_type,
        MAX_IN_FLIGHT,
    );

    tokio::spawn(producer_task);

    let transformer = |envelope: &EventEnvelope<temperature::Event>| {
        let temperature::Event::TemperatureRead { temperature } = envelope.event else {
            return None;
        };

        info!(
            "Producing {} with temperature of {}",
            envelope.persistence_id, temperature
        );

        let event = proto::TemperatureRead {
            temperature: temperature as i32,
        };

        Some(event)
    };

    let producer_filter = |_: &EventEnvelope<temperature::Event>| true;

    let handler = producer_flow.handler(producer_filter, transformer);

    let (consumer_task, consumer_kill_switch) =
        consumer::task(offset_store, source_provider, handler);

    tokio::spawn(consumer_task);

    (producer_kill_switch, consumer_kill_switch)
}
```
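As a usage sketch, this function might be called from the service’s main setup. The module name temperature_production is an assumption, the arguments are assumed to have been constructed earlier in the service as in previous parts of the guide, and we assume that sending a unit value on a kill switch (or dropping it) ends the corresponding task:

```rust
// Hypothetical call site: commit_log, event_consumer_addr, secret_store and
// events_key_secret_path are assumed to exist from earlier setup.
let (producer_kill_switch, consumer_kill_switch) = temperature_production::spawn(
    commit_log.clone(),
    event_consumer_addr,
    secret_store,
    events_key_secret_path,
);

// ... run the rest of the service ...

// On shutdown, signal both tasks to stop.
let _ = producer_kill_switch.send(());
let _ = consumer_kill_switch.send(());
```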
What’s next?
- HTTP temperature events