HTTP temperature events

To serve requests made by a user interface, we stream events in much the same way as we produce events over gRPC, at least in terms of how we source the events. There are a variety of technologies that can push a stream of events to a user interface, and here we illustrate Server Sent Events (SSE). SSE is attractive for browser-based user interfaces given its ease of use, and because the browser takes care of maintaining a connection with a server.
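To make the wire format concrete, the following is a minimal sketch of how a single SSE message is framed as text: an optional `id:` field, one `data:` field per payload line, and a blank line to end the message. The `format_sse_message` helper is hypothetical and uses only the standard library; in this guide the framing is handled by the HTTP toolkit rather than written by hand.

```rust
/// Frames one Server Sent Events message as it appears on the wire.
/// `format_sse_message` is a hypothetical helper for illustration only.
pub fn format_sse_message(id: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(id) = id {
        out.push_str("id: ");
        out.push_str(id);
        out.push('\n');
    }
    // Each line of the payload gets its own `data:` field.
    for line in data.lines() {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the message.
    out.push('\n');
    out
}
```

A browser's `EventSource` reassembles the `data:` lines into one payload and remembers the most recent `id:` it has seen.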

For our example, we use Warp as the toolkit for serving HTTP. There are a number of HTTP toolkits available, so you may prefer another for your own reasons. Our choice here is based simply on familiarity and the fact that Warp is authored by the same team that provides Hyper, a very popular lower-level HTTP toolkit in the Rust community.

The offset store

We have an offset store (again!). This time, things are slightly different. In our example, we assume that our clients are stateless and will not request events given a starting offset. We can get away with this because Streambed Logged is our storage medium, and its compaction function keeps the number of events manageable. However, we still need an offset store so that we can correctly track offset sequence numbers. For this particular use-case, we use a volatile_offset_store:

Rust
let offset_store = volatile_offset_store::task(temperature::MAX_ENTITIES);

We size the offset store with the number of entities held in memory at any one time. This determines the amount of memory initially reserved for the offset store, which will grow dynamically beyond this size if required. However, it is good practice to size it with the maximum number of entities anticipated so that memory consumption is reasonably deterministic.
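The sizing idea can be sketched with a plain map that is pre-allocated for the expected number of entities. This is not how volatile_offset_store is implemented; `VolatileOffsets` and its methods are hypothetical names, and the sketch assumes an offset is simply the last sequence number seen per entity.

```rust
use std::collections::HashMap;

/// A stand-in for an in-memory offset store: it remembers the last
/// sequence number seen for each entity. Hypothetical, for illustration.
pub struct VolatileOffsets {
    last_seq_nr: HashMap<String, u64>,
}

impl VolatileOffsets {
    /// Reserves room for `max_entities` up front; the map still grows
    /// beyond that if more entities turn up.
    pub fn with_capacity(max_entities: usize) -> Self {
        Self {
            last_seq_nr: HashMap::with_capacity(max_entities),
        }
    }

    /// Records `seq_nr` and reports whether it advanced the entity's offset.
    pub fn record(&mut self, entity_id: &str, seq_nr: u64) -> bool {
        let advanced = self.last(entity_id).map_or(true, |last| seq_nr > last);
        if advanced {
            self.last_seq_nr.insert(entity_id.to_string(), seq_nr);
        }
        advanced
    }

    /// The last sequence number recorded for `entity_id`, if any.
    pub fn last(&self, entity_id: &str) -> Option<u64> {
        self.last_seq_nr.get(entity_id).copied()
    }

    /// How many entries can currently be held without reallocating.
    pub fn reserved(&self) -> usize {
        self.last_seq_nr.capacity()
    }
}
```

Because nothing is persisted, the offsets are lost when the store is dropped, which suits stateless clients that never ask to resume from a given offset.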

Note

In all applications, constraining the number of external requests at any one time is important so that a DDoS attack will not bring a process down. This is best done with an application-level proxy.

The source provider

A source provider is established in the same way as when producing over gRPC, again using the same marshaller declared for running the entity manager earlier in the guide:

Rust
let source_provider = CommitLogSourceProvider::new(
    commit_log.clone(),
    temperature::marshaller(events_key_secret_path.clone(), secret_store.clone()),
    "http-projection",
    Topic::from(temperature::EVENTS_TOPIC),
);

The handler

A handler function is provided to send envelopes from the commit log over a channel that we will subsequently stream via SSE. Only envelopes for the requested entity are forwarded; all others are acknowledged and skipped.

Rust
let (temperature_events, temperature_events_receiver) = mpsc::channel(1);
let handler = move |envelope: EventEnvelope<temperature::Event>| {
    let entity_id = entity_id.clone();
    let temperature_events = temperature_events.clone();
    async move {
        if envelope.persistence_id.entity_id == entity_id {
            temperature_events
                .send((entity_id, envelope.event))
                .await
                .map(|_| ())
                .map_err(|_| HandlerError)
        } else {
            Ok(())
        }
    }
};
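The filtering logic of the handler can be exercised on its own with a synchronous sketch. It uses only the standard library's bounded channel in place of the async one; `Envelope`, `HandlerError` and `handle` here are simplified, hypothetical stand-ins rather than the types used by the guide.

```rust
use std::sync::mpsc::SyncSender;

/// Hypothetical stand-in for the event envelope used in the guide.
pub struct Envelope {
    pub entity_id: String,
    pub event: String,
}

/// Hypothetical stand-in for the handler's error type.
#[derive(Debug, PartialEq)]
pub struct HandlerError;

/// Forwards an envelope only if it belongs to `wanted`; everything else is
/// acknowledged and skipped. A closed channel surfaces as `HandlerError`.
pub fn handle(
    wanted: &str,
    events: &SyncSender<(String, String)>,
    envelope: Envelope,
) -> Result<(), HandlerError> {
    if envelope.entity_id == wanted {
        events
            .send((envelope.entity_id, envelope.event))
            .map_err(|_| HandlerError)
    } else {
        Ok(())
    }
}
```

With a channel capacity of 1, a send waits once the buffer is full, so a slow receiver naturally slows the handler down rather than letting events pile up in memory.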

The consumer

Set up the consumer task that will source events using our in-memory offset store, source provider and handler.

Rust
let (consumer_task, consumer_kill_switch) =
    consumer::task(offset_store, source_provider, handler);
tokio::spawn(consumer_task);
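The kill switch returned alongside the task follows a common pattern: the task keeps running until the switch is dropped. The sketch below shows that pattern with a thread and a standard library channel. It is an analogy only, not the consumer's actual implementation, and `KillSwitch` and `spawn_worker` are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Dropping this value tells the worker to stop. Hypothetical stand-in type.
pub struct KillSwitch {
    _tx: Sender<()>,
}

/// Spawns a worker that keeps polling until its kill switch is dropped,
/// then returns how many polls it made.
pub fn spawn_worker() -> (JoinHandle<u64>, KillSwitch) {
    let (tx, rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let mut polls = 0u64;
        loop {
            match rx.recv_timeout(Duration::from_millis(1)) {
                // No signal yet: do one unit of work and carry on.
                Err(RecvTimeoutError::Timeout) => polls += 1,
                // The switch was dropped (or a signal arrived): stop.
                _ => return polls,
            }
        }
    });
    (handle, KillSwitch { _tx: tx })
}
```

Tying shutdown to a drop means that whichever piece of code owns the switch decides the worker's lifetime, with no explicit stop call to forget.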

Server Sent Events (SSE)

The final step is to consume the events produced by the consumer’s handler. We do this by wrapping the receiver end of the channel created earlier. Events are then serialized into JSON.

Rust
let event_stream = ReceiverStream::new(temperature_events_receiver)
    .map(|(entity_id, event)| {
        let sse_event = sse::Event::default();
        let sse_event = if let temperature::Event::Registered { .. } = event {
            sse_event.id(entity_id)
        } else {
            sse_event
        };
        sse_event.json_data(event)
    })
    .take_until(async {
        let result =
            time::timeout(MAX_SSE_CONNECTION_TIME, future::pending::<()>()).await;
        drop(consumer_kill_switch);
        result
    });

let event_stream = sse::keep_alive().stream(event_stream);
sse::reply(event_stream)

We use MAX_SSE_CONNECTION_TIME to limit the maximum amount of time that an SSE connection can be held. Any consumer of the SSE events will automatically re-connect if they are still able to. This avoids keeping a TCP socket open for many minutes when the client is no longer actually connected. While producing our SSE events should be relatively efficient, we do not do it unless we really need to, and TCP is not always fast in detecting the loss of a network connection.
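The effect of capping the connection time can be sketched without any async machinery: keep delivering items until a deadline passes, then stop regardless of whether more might arrive. The `drain_until_deadline` function is a hypothetical, blocking analogue of the take_until step above and uses only the standard library.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

/// Collects items from `events` until `max_connection_time` has elapsed or
/// the sender goes away, whichever comes first.
pub fn drain_until_deadline<T>(
    events: &Receiver<T>,
    max_connection_time: Duration,
) -> Vec<T> {
    let deadline = Instant::now() + max_connection_time;
    let mut delivered = Vec::new();
    loop {
        // Time left on this "connection"; zero once the deadline has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match events.recv_timeout(remaining) {
            Ok(item) => delivered.push(item),
            // Either the deadline was reached or the sender was dropped.
            Err(_) => break,
        }
    }
    delivered
}
```

A client that is still present simply reconnects after the cut-off, while a client that has silently gone away stops costing the server any work.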

Putting it all together

The following code puts all of the above together as a route that can be served:

Rust
// We use this to limit the maximum amount of time that an SSE connection can be held.
// Any consumer of the SSE events will automatically re-connect if they are still able
// to. This avoids keeping a TCP socket open for many minutes when the client is no
// longer actually connected. While producing our SSE events should be relatively
// efficient, we don't do it unless we really need to, and TCP isn't great about
// detecting the loss of a network connection.
const MAX_SSE_CONNECTION_TIME: Duration = Duration::from_secs(60);

// Declares routes to serve our HTTP interface.
pub fn routes(
    commit_log: FileLog,
    secret_store: FileSecretStore,
    events_key_secret_path: String,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
    let get_temperature_route = {
        warp::get()
            .and(warp::path("events"))
            .and(warp::path::param())
            .and(warp::path::end())
            .map(move |entity_id: String| {
                let entity_id = EntityId::from(entity_id);

                let offset_store = volatile_offset_store::task(temperature::MAX_ENTITIES);

                let source_provider = CommitLogSourceProvider::new(
                    commit_log.clone(),
                    temperature::marshaller(events_key_secret_path.clone(), secret_store.clone()),
                    "http-projection",
                    Topic::from(temperature::EVENTS_TOPIC),
                );

                let (temperature_events, temperature_events_receiver) = mpsc::channel(1);
                let handler = move |envelope: EventEnvelope<temperature::Event>| {
                    let entity_id = entity_id.clone();
                    let temperature_events = temperature_events.clone();
                    async move {
                        if envelope.persistence_id.entity_id == entity_id {
                            temperature_events
                                .send((entity_id, envelope.event))
                                .await
                                .map(|_| ())
                                .map_err(|_| HandlerError)
                        } else {
                            Ok(())
                        }
                    }
                };

                let (consumer_task, consumer_kill_switch) =
                    consumer::task(offset_store, source_provider, handler);
                tokio::spawn(consumer_task);

                let event_stream = ReceiverStream::new(temperature_events_receiver)
                    .map(|(entity_id, event)| {
                        let sse_event = sse::Event::default();
                        let sse_event = if let temperature::Event::Registered { .. } = event {
                            sse_event.id(entity_id)
                        } else {
                            sse_event
                        };
                        sse_event.json_data(event)
                    })
                    .take_until(async {
                        let result =
                            time::timeout(MAX_SSE_CONNECTION_TIME, future::pending::<()>()).await;
                        drop(consumer_kill_switch);
                        result
                    });

                let event_stream = sse::keep_alive().stream(event_stream);
                sse::reply(event_stream)
            })
    };

    let routes = get_temperature_route;

    warp::path("api").and(warp::path("temperature").and(routes))
}

What’s next?

  • UDP observations