The main function
The main function brings everything together and is the entry point into the service.
Arguments
Command line arguments are declared using clap, which is popular within the Rust community.
#[derive(Parser, Debug)]
#[clap(author, about, long_about = None)]
struct Args {
    // Logged commit log args
    #[clap(flatten)]
    cl_args: CommitLogArgs,

    // A socket address for connecting to a GRPC event consuming
    // service for temperature observations.
    #[clap(env, long, default_value = "http://127.0.0.1:8101")]
    event_consumer_addr: Uri,

    // A socket address for connecting to a GRPC event producing
    // service for registrations.
    #[clap(env, long, default_value = "http://127.0.0.1:8101")]
    event_producer_addr: Uri,

    // A socket address for serving our HTTP web service requests.
    #[clap(env, long, default_value = "127.0.0.1:8080")]
    http_addr: SocketAddr,

    // Secret store args
    #[clap(flatten)]
    ss_args: SsArgs,

    // A socket address for receiving telemetry from our fictitious
    // sensor.
    #[clap(env, long, default_value = "127.0.0.1:8081")]
    udp_addr: SocketAddr,
}
Some of the arguments used here are declared by Streambed, i.e. cl_args and ss_args for the commit log and secret store respectively. A reasonable set of defaults is supplied above such that the commit log and secret store are the only arguments we need to supply to run the sample.
You might also use git-version to supply the version in the args.
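The env attribute used on each argument above means clap will fall back to an environment variable derived from the flag name (e.g. HTTP_ADDR for --http-addr) before applying the default. As an illustration of that resolution order, here is a std-only sketch; resolve is a hypothetical helper, not part of clap:

```rust
use std::env;

// Sketch of the resolution order clap applies to an argument declared
// with `#[clap(env, long, default_value = "...")]`: an explicit command
// line value wins, then the environment variable, then the default.
fn resolve(cli_value: Option<&str>, env_var: &str, default: &str) -> String {
    cli_value
        .map(str::to_string)
        .or_else(|| env::var(env_var).ok())
        .unwrap_or_else(|| default.to_string())
}

fn main() {
    // With no flag given and the env var unset, the default wins.
    assert_eq!(
        resolve(None, "EXAMPLE_HTTP_ADDR", "127.0.0.1:8080"),
        "127.0.0.1:8080"
    );
    // An explicit command line value overrides everything else.
    assert_eq!(
        resolve(Some("0.0.0.0:80"), "EXAMPLE_HTTP_ADDR", "127.0.0.1:8080"),
        "0.0.0.0:80"
    );
    println!("ok");
}
```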
Establishing the secret store
We set up and authenticate our service with the secret store by consuming a “root secret” along with another secret to be used as a passcode for authenticating our service. As a security consideration, these secrets are provided via standard input rather than as command line arguments so that they are never written to disk. The expectation is that a supervisory service, perhaps systemd, sources and provides these secrets from a hardware-based secure enclave.
let ss = {
    let line = streambed::read_line(std::io::stdin()).unwrap();
    assert!(!line.is_empty(), "Failed to source a line from stdin");
    let (root_secret, ss_secret_id) = line.split_at(32);
    let root_secret = hex::decode(root_secret).unwrap();
    let ss = FileSecretStore::new(
        args.ss_args.ss_root_path,
        &root_secret.try_into().unwrap(),
        args.ss_args.ss_unauthorized_timeout.into(),
        args.ss_args.ss_max_secrets,
        args.ss_args.ss_ttl_field.as_deref(),
    );
    ss.approle_auth(&args.ss_args.ss_role_id, ss_secret_id)
        .await
        .unwrap();
    ss
};
The service’s root secret is used to encrypt and decrypt data within its secret store. The secret store being used here is Streambed Confidant, which provides a file-system-based store.
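The line read from stdin above packs two values: split_at(32) takes the first 32 hex characters as the 16-byte root secret and leaves the remainder as the secret id for AppRole authentication. A std-only sketch of that decomposition; parse_bootstrap_line is a hypothetical helper that hand-rolls what hex::decode does:

```rust
// Hypothetical helper illustrating how the bootstrap line from stdin
// decomposes: 32 hex chars (a 16-byte root secret) followed by the
// secret store secret id.
fn parse_bootstrap_line(line: &str) -> Option<([u8; 16], &str)> {
    if line.len() < 32 || !line.is_char_boundary(32) {
        return None;
    }
    let (root_hex, ss_secret_id) = line.split_at(32);
    let mut root_secret = [0u8; 16];
    for (i, chunk) in root_hex.as_bytes().chunks(2).enumerate() {
        let pair = std::str::from_utf8(chunk).ok()?;
        root_secret[i] = u8::from_str_radix(pair, 16).ok()?;
    }
    Some((root_secret, ss_secret_id))
}

fn main() {
    let line = "000102030405060708090a0b0c0d0e0fmy-secret-id";
    let (root_secret, secret_id) = parse_bootstrap_line(line).unwrap();
    assert_eq!(root_secret[0], 0x00);
    assert_eq!(root_secret[15], 0x0f);
    assert_eq!(secret_id, "my-secret-id");
    println!("ok");
}
```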
Generating keys
The first time a service is run it must generate keys for encrypting the commit log when appending to it and decrypting it when consuming.
let temperature_events_key_secret_path =
    format!("{}/secrets.temperature-events.key", args.ss_args.ss_ns);
if let Ok(None) = ss.get_secret(&temperature_events_key_secret_path).await {
    // If we can't write this initial secret then all bets are off
    let mut key = vec![0; 16];
    rand::thread_rng().fill_bytes(&mut key);
    let data = HashMap::from([("value".to_string(), hex::encode(key))]);
    ss.create_secret(&temperature_events_key_secret_path, SecretData { data })
        .await
        .unwrap();
}
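The key is stored hex-encoded in the secret's value field: hex::encode produces two lowercase hex digits per byte. A std-only sketch of that encoding; hex_encode here is an illustrative stand-in for the crate function:

```rust
// Hex-encode a key the way hex::encode would: two lowercase hex
// digits per byte, suitable for storing as the secret's "value".
fn hex_encode(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

fn main() {
    // A fixed 16-byte key stands in for the rand-generated one.
    let key: [u8; 16] = [
        0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77,
        0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff,
    ];
    assert_eq!(hex_encode(&key), "00112233445566778899aabbccddeeff");
    println!("ok");
}
```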
Establishing the commit log
The first instance of the commit log receives the location where its storage should reside. This instance is then cloned and passed to the other tasks that need it:
let cl = FileLog::new(args.cl_args.cl_root_path.clone());
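Cloning a commit log handle is cheap because the clones share underlying state, in the manner of an Arc-wrapped structure. A std-only analogue of handing one handle to several tasks; LogHandle is a hypothetical stand-in for FileLog:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// A hypothetical log handle: clones share the same underlying
// storage, analogous to cloning FileLog for each task.
#[derive(Clone)]
struct LogHandle {
    records: Arc<Mutex<Vec<String>>>,
}

// Spawn `tasks` threads, each appending one record via its own clone.
fn append_from_tasks(log: &LogHandle, tasks: usize) {
    let handles: Vec<_> = (0..tasks)
        .map(|i| {
            let log = log.clone(); // cheap: only the Arc is cloned
            thread::spawn(move || {
                log.records.lock().unwrap().push(format!("record-{i}"))
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}

fn main() {
    let log = LogHandle {
        records: Arc::new(Mutex::new(Vec::new())),
    };
    append_from_tasks(&log, 3);
    // All clones wrote to the same shared storage.
    assert_eq!(log.records.lock().unwrap().len(), 3);
    println!("ok");
}
```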
Putting it all together
The remaining code spawns the other tasks covered by this guide. With the exception of the argument declarations, here is the main function in its entirety:
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let args = Args::parse();

    env_logger::builder().format_timestamp_millis().init();

    let ss = {
        let line = streambed::read_line(std::io::stdin()).unwrap();
        assert!(!line.is_empty(), "Failed to source a line from stdin");
        let (root_secret, ss_secret_id) = line.split_at(32);
        let root_secret = hex::decode(root_secret).unwrap();
        let ss = FileSecretStore::new(
            args.ss_args.ss_root_path,
            &root_secret.try_into().unwrap(),
            args.ss_args.ss_unauthorized_timeout.into(),
            args.ss_args.ss_max_secrets,
            args.ss_args.ss_ttl_field.as_deref(),
        );
        ss.approle_auth(&args.ss_args.ss_role_id, ss_secret_id)
            .await
            .unwrap();
        ss
    };

    let temperature_events_key_secret_path =
        format!("{}/secrets.temperature-events.key", args.ss_args.ss_ns);
    if let Ok(None) = ss.get_secret(&temperature_events_key_secret_path).await {
        // If we can't write this initial secret then all bets are off
        let mut key = vec![0; 16];
        rand::thread_rng().fill_bytes(&mut key);
        let data = HashMap::from([("value".to_string(), hex::encode(key))]);
        ss.create_secret(&temperature_events_key_secret_path, SecretData { data })
            .await
            .unwrap();
    }

    let cl = FileLog::new(args.cl_args.cl_root_path.clone());

    // Establish the task and command sender for the temperature entity
    let (temperature_entity_manager, temperature_commands) = temperature::spawn(
        cl.clone(),
        ss.clone(),
        temperature_events_key_secret_path.clone(),
    )
    .await;

    // Start up a task to manage registration projections
    let _registration_projection_kill_switch = registration_projection::spawn(
        cl.clone(),
        args.event_producer_addr,
        temperature_commands.clone(),
    );

    // Start up a task to manage temperature productions
    let _temperature_projection_kill_switch = temperature_production::spawn(
        cl.clone(),
        args.event_consumer_addr,
        ss.clone(),
        temperature_events_key_secret_path.clone(),
    );

    // Start up the http service
    let routes = http_server::routes(cl, ss, temperature_events_key_secret_path);
    tokio::spawn(warp::serve(routes).run(args.http_addr));
    info!("HTTP listening on {}", args.http_addr);

    // Start up the UDP service
    let socket = UdpSocket::bind(args.udp_addr).await?;
    tokio::spawn(udp_server::task(socket, temperature_commands));
    info!("UDP listening on {}", args.udp_addr);

    // All things started. Wait for the entity manager to complete.
    info!("IoT service ready");

    let _ = temperature_entity_manager.await?;

    // If we get here then we are shutting down. Any other task,
    // such as the projection one, will stop automatically given
    // that its sender will be dropped.

    Ok(())
}
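The shutdown remark at the end of main relies on channel semantics: once every sender is dropped, a receiving task's loop ends of its own accord. A std-channel sketch of the same idea (the real code uses tokio channels; run_projection is a hypothetical stand-in for a projection task):

```rust
use std::sync::mpsc;
use std::thread;

// A stand-in projection task: it processes commands until the
// channel closes, i.e. until every sender has been dropped.
fn run_projection() -> usize {
    let (commands, receiver) = mpsc::channel::<&str>();
    let task = thread::spawn(move || receiver.iter().count());

    commands.send("register").unwrap();
    commands.send("observe").unwrap();

    // Dropping the last sender closes the channel; the task then
    // stops by itself, just as the projection tasks do at shutdown.
    drop(commands);
    task.join().unwrap()
}

fn main() {
    // The task saw both commands and then terminated cleanly.
    assert_eq!(run_projection(), 2);
    println!("ok");
}
```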
Single core machines
Edge-based services tend to operate on single-core machines. If this is the case with yours then consider establishing the executor as a single-threaded one, reserving a number of blocking IO threads and providing a reasonable stack size:
fn main() -> Result<(), Box<dyn Error>> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .max_blocking_threads(x)
        .thread_stack_size(2 * 1024 * 1024)
        .build()?;
    rt.block_on(main_task())
}

async fn main_task() -> Result<(), Box<dyn Error>> {
    ...
}