#![doc = include_str!("../README.md")]
use akka_persistence_rs::{
entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
EntityId, EntityType, Offset, PersistenceId, Source, Tag, WithOffset, WithPersistenceId,
WithSeqNr, WithSource, WithTags, WithTimestamp,
};
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
commit_log::{
CommitLog, ConsumerRecord, Key, Offset as CommitLogOffset, ProducerRecord, Subscription,
Topic,
},
secret_store::SecretStore,
};
use tokio_stream::{Stream, StreamExt};
/// An event together with the metadata it carries when read from, or written to, a commit log.
#[derive(Clone, Debug, PartialEq)]
pub struct EventEnvelope<E> {
    /// The entity type and entity id the event belongs to.
    pub persistence_id: PersistenceId,
    /// The sequence number of the event.
    pub seq_nr: u64,
    /// The event's timestamp.
    pub timestamp: DateTime<Utc>,
    /// The event itself.
    pub event: E,
    /// The offset of the record within the commit log.
    pub offset: CommitLogOffset,
    /// Tags associated with the event.
    pub tags: Vec<Tag>,
}
/// Returns a reference to the envelope's persistence id.
impl<E> WithPersistenceId for EventEnvelope<E> {
    fn persistence_id(&self) -> &PersistenceId {
        &self.persistence_id
    }
}
/// Exposes the commit log offset wrapped as an `Offset::Sequence`.
impl<E> WithOffset for EventEnvelope<E> {
    fn offset(&self) -> Offset {
        Offset::Sequence(self.offset)
    }
}
/// Returns the envelope's sequence number.
impl<E> WithSeqNr for EventEnvelope<E> {
    fn seq_nr(&self) -> u64 {
        self.seq_nr
    }
}
/// Envelopes produced here always report `Source::Regular`.
impl<E> WithSource for EventEnvelope<E> {
    fn source(&self) -> akka_persistence_rs::Source {
        Source::Regular
    }
}
/// Returns the envelope's tags as a slice.
impl<E> WithTags for EventEnvelope<E> {
    fn tags(&self) -> &[Tag] {
        &self.tags
    }
}
/// Returns a reference to the envelope's timestamp.
impl<E> WithTimestamp for EventEnvelope<E> {
    fn timestamp(&self) -> &DateTime<Utc> {
        &self.timestamp
    }
}
/// An error raised when a commit log record cannot be turned into an [`EventEnvelope`].
#[derive(Debug)]
pub struct CannotConsume {
    _entity_id: EntityId,
    _cause: String,
}

impl CannotConsume {
    /// Creates an error for `entity_id`, describing the problem with `cause`.
    pub fn new<S>(entity_id: EntityId, cause: S) -> Self
    where
        S: ToString,
    {
        Self {
            _entity_id: entity_id,
            _cause: cause.to_string(),
        }
    }
}

// Implementing `Display` + `Error` lets callers propagate this with `?` into
// `Box<dyn std::error::Error>` and surface the cause without relying on `Debug`.
impl std::fmt::Display for CannotConsume {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let entity_id: &str = &self._entity_id;
        write!(f, "cannot consume event for entity {entity_id}: {}", self._cause)
    }
}

impl std::error::Error for CannotConsume {}
/// An error raised when an event cannot be turned into a commit log `ProducerRecord`.
#[derive(Debug)]
pub struct CannotProduce {
    _entity_id: EntityId,
    _cause: String,
}

impl CannotProduce {
    /// Creates an error for `entity_id`, describing the problem with `cause`.
    pub fn new<S>(entity_id: EntityId, cause: S) -> Self
    where
        S: ToString,
    {
        Self {
            _entity_id: entity_id,
            _cause: cause.to_string(),
        }
    }
}

// Implementing `Display` + `Error` lets callers propagate this with `?` into
// `Box<dyn std::error::Error>` and surface the cause without relying on `Debug`.
impl std::fmt::Display for CannotProduce {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let entity_id: &str = &self._entity_id;
        write!(f, "cannot produce event for entity {entity_id}: {}", self._cause)
    }
}

impl std::error::Error for CannotProduce {}
/// Converts between an entity's events and commit log records.
///
/// The higher-ranked bound `for<'async_trait> E: ... + 'async_trait` effectively requires
/// `E: 'static`.
#[async_trait]
pub trait CommitLogMarshaller<E>
where
    for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
    /// The entity type used when building persistence ids for envelopes.
    fn entity_type(&self) -> EntityType;
    /// Produces the commit log key for a record holding `event` for `entity_id`.
    fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key;
    /// Extracts the entity id a consumed record belongs to, or `None` if it cannot be determined.
    fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId>;
    /// Turns a consumed record for `entity_id` into an event envelope.
    async fn envelope(
        &self,
        entity_id: EntityId,
        record: ConsumerRecord,
    ) -> Result<EventEnvelope<E>, CannotConsume>;
    /// Builds the record to append to `topic` for `event` with the given `seq_nr` and `timestamp`.
    async fn producer_record(
        &self,
        topic: Topic,
        entity_id: EntityId,
        seq_nr: u64,
        timestamp: DateTime<Utc>,
        event: &E,
    ) -> Result<ProducerRecord, CannotProduce>;
}
#[async_trait]
pub trait EncryptedCommitLogMarshaller<E>: CommitLogMarshaller<E>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
type SecretStore: SecretStore;
fn secret_store(&self) -> &Self::SecretStore;
fn secret_path(&self, entity_id: &EntityId) -> Arc<str>;
#[cfg(feature = "cbor")]
async fn decrypted_envelope(
&self,
entity_id: EntityId,
mut record: ConsumerRecord,
) -> Result<EventEnvelope<E>, CannotConsume> {
use streambed::commit_log::{Header, HeaderKey};
let secret_path = self.secret_path(&entity_id);
let event = streambed::decrypt_buf(
self.secret_store(),
&secret_path,
&mut record.value,
|value| ciborium::de::from_reader(value),
)
.await
.ok_or(CannotConsume::new(
entity_id.clone(),
format!("Cannot decrypt with key: {secret_path}"),
))?;
let seq_nr = record
.headers
.iter()
.find_map(|Header { key, value }| {
if key == &HeaderKey::from("seq_nr") {
if value.len() >= 8 {
if let Ok(value) = value[0..8].try_into() {
Some(u64::from_be_bytes(value))
} else {
None
}
} else {
None
}
} else {
None
}
})
.ok_or(CannotConsume::new(
entity_id.clone(),
"Cannot find seq_nr header",
))?;
let envelope = record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
});
envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
}
#[cfg(not(feature = "cbor"))]
async fn decrypted_envelope(
&self,
entity_id: EntityId,
mut record: ConsumerRecord,
) -> Option<EventEnvelope<E>>;
#[cfg(feature = "cbor")]
async fn encrypted_producer_record(
&self,
topic: Topic,
entity_id: EntityId,
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Result<ProducerRecord, CannotProduce> {
use streambed::commit_log::{Header, HeaderKey};
let key = self.to_compaction_key(&entity_id, event);
let secret_path = self.secret_path(&entity_id);
let buf = streambed::encrypt_struct(
self.secret_store(),
&secret_path,
|event| {
let mut buf = Vec::new();
ciborium::ser::into_writer(event, &mut buf).map(|_| buf)
},
rand::thread_rng,
&event,
)
.await
.ok_or(CannotProduce::new(
entity_id,
format!("Problem encrypting and serializing with secret path: {secret_path}"),
))?;
Ok(ProducerRecord {
topic,
headers: vec![Header {
key: HeaderKey::from("seq_nr"),
value: u64::to_be_bytes(seq_nr).to_vec(),
}],
timestamp: Some(timestamp),
key,
value: buf,
partition: 0,
})
}
#[cfg(not(feature = "cbor"))]
async fn encrypted_producer_record(
&self,
topic: Topic,
entity_id: EntityId,
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord>;
}
/// Adapts a single commit log topic so it can serve as an entity manager's event source
/// ([`SourceProvider`]) and event handler ([`Handler`]).
///
/// Events are converted to and from records by the supplied marshaller.
pub struct CommitLogTopicAdapter<CL, E, M>
where
    CL: CommitLog,
    M: CommitLogMarshaller<E>,
    for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
    commit_log: CL,
    consumer_group_name: String,
    marshaller: M,
    topic: Topic,
    phantom: PhantomData<E>,
}

impl<CL, E, M> CommitLogTopicAdapter<CL, E, M>
where
    CL: CommitLog,
    M: CommitLogMarshaller<E>,
    for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
    /// Builds an adapter over `topic` of `commit_log`.
    ///
    /// Subscriptions are made under `consumer_group_name`, and `marshaller` converts
    /// between events and records.
    pub fn new(commit_log: CL, marshaller: M, consumer_group_name: &str, topic: Topic) -> Self {
        let consumer_group_name = consumer_group_name.to_owned();
        Self {
            phantom: PhantomData,
            topic,
            marshaller,
            consumer_group_name,
            commit_log,
        }
    }
}
#[async_trait]
impl<CL, E, M> SourceProvider<E> for CommitLogTopicAdapter<CL, E, M>
where
    CL: CommitLog,
    M: CommitLogMarshaller<E> + Send + Sync,
    for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
    /// Replays the topic's events for every entity, in commit log order.
    ///
    /// The stream ends after yielding the record at the topic's end offset as observed when
    /// this method is called. Records the marshaller cannot map to an entity id are skipped.
    ///
    /// # Errors
    ///
    /// Propagates any error from subscribing to the topic. It is not converted into an empty
    /// stream, so callers cannot mistake it for a genuinely empty topic.
    ///
    /// # Panics
    ///
    /// The returned stream panics if a record cannot be unmarshalled into an envelope.
    async fn source_initial(
        &mut self,
    ) -> io::Result<Pin<Box<dyn Stream<Item = EntityManagerEventEnvelope<E>> + Send + 'async_trait>>>
    {
        let mut consumer_records = produce_to_last_offset(
            &self.commit_log,
            &self.consumer_group_name,
            self.topic.clone(),
        )
        .await?;
        let marshaller = &self.marshaller;
        Ok(Box::pin(stream!({
            while let Some(consumer_record) = consumer_records.next().await {
                if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) {
                    match marshaller.envelope(record_entity_id, consumer_record).await {
                        Ok(envelope) => {
                            yield EntityManagerEventEnvelope::new(
                                envelope.persistence_id.entity_id,
                                envelope.seq_nr,
                                envelope.timestamp,
                                envelope.event,
                            );
                        }
                        Err(e) => {
                            panic!("When initially consuming: {e:?}. Having to abort as this is unrecoverable.");
                        }
                    }
                }
            }
        })))
    }

    /// Replays the topic's events for `entity_id` only, in commit log order.
    ///
    /// The stream ends after the record at the topic's end offset as observed when this method
    /// is called. Records belonging to other entities, or that cannot be mapped to an entity id,
    /// are skipped.
    ///
    /// # Errors
    ///
    /// Propagates any error from subscribing to the topic. It is not converted into an empty
    /// stream, so callers cannot mistake it for an entity with no history.
    ///
    /// # Panics
    ///
    /// The returned stream panics if a matching record cannot be unmarshalled into an envelope.
    async fn source(
        &mut self,
        entity_id: &EntityId,
    ) -> io::Result<Pin<Box<dyn Stream<Item = EntityManagerEventEnvelope<E>> + Send + 'async_trait>>>
    {
        let mut consumer_records = produce_to_last_offset(
            &self.commit_log,
            &self.consumer_group_name,
            self.topic.clone(),
        )
        .await?;
        let marshaller = &self.marshaller;
        Ok(Box::pin(stream!({
            while let Some(consumer_record) = consumer_records.next().await {
                if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) {
                    if &record_entity_id == entity_id {
                        match marshaller.envelope(record_entity_id, consumer_record).await {
                            Ok(envelope) => {
                                yield EntityManagerEventEnvelope::new(
                                    envelope.persistence_id.entity_id,
                                    envelope.seq_nr,
                                    envelope.timestamp,
                                    envelope.event,
                                );
                            }
                            Err(e) => {
                                panic!("When consuming: {e:?}. Having to abort as this is unrecoverable.");
                            }
                        }
                    }
                }
            }
        })))
    }
}
/// Streams `topic` (partition 0) from the start, under `consumer_group_name`, stopping after
/// the record whose offset equals the end offset reported when this function is called.
///
/// Any record found beyond that end offset also terminates the stream, without being yielded.
/// If the commit log reports no offsets for the topic, an empty stream is returned.
async fn produce_to_last_offset<'a>(
    commit_log: &'a impl CommitLog,
    consumer_group_name: &str,
    topic: Topic,
) -> io::Result<Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>>> {
    let end_offset = match commit_log.offsets(topic.clone(), 0).await {
        Some(partition_offsets) => partition_offsets.end_offset,
        None => return Ok(Box::pin(tokio_stream::empty())),
    };

    let mut records = commit_log.scoped_subscribe(
        consumer_group_name,
        vec![],
        vec![Subscription { topic }],
        None,
    );

    Ok(Box::pin(stream!({
        while let Some(record) = records.next().await {
            if record.offset > end_offset {
                break;
            }
            // Decide before yielding, since the record is moved out by `yield`.
            let reached_end = record.offset == end_offset;
            yield record;
            if reached_end {
                break;
            }
        }
    })))
}
#[async_trait]
impl<CL, E, M> Handler<E> for CommitLogTopicAdapter<CL, E, M>
where
    CL: CommitLog,
    M: CommitLogMarshaller<E> + Send + Sync,
    for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
    /// Marshals `envelope` into a record for this adapter's topic and appends it to the
    /// commit log, returning the envelope unchanged on success.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::Other` error if marshalling fails or the commit log rejects
    /// the record. The message includes the underlying cause so failures are diagnosable.
    async fn process(
        &mut self,
        envelope: EntityManagerEventEnvelope<E>,
    ) -> io::Result<EntityManagerEventEnvelope<E>> {
        let producer_record = self
            .marshaller
            .producer_record(
                self.topic.clone(),
                envelope.entity_id.clone(),
                envelope.seq_nr,
                envelope.timestamp,
                &envelope.event,
            )
            .await
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("A problem occurred converting an envelope when producing: {e:?}"),
                )
            })?;
        self.commit_log
            .produce(producer_record)
            .await
            .map(|_| envelope)
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("A problem occurred producing an envelope: {e:?}"),
                )
            })
    }
}
#[cfg(feature = "cbor")]
pub mod cbor {
    //! An encrypting, CBOR-encoding marshaller for entities whose ids are decimal `u64`s.

    use super::*;

    /// Marshals events as encrypted CBOR and packs each record's compaction key from the
    /// event's record type and the entity's numeric id.
    ///
    /// The key layout is as follows:
    /// - The top `RTB` bits of a key hold the record type returned by `to_record_type`.
    /// - The low `64 - RTB` bits hold the entity id.
    ///
    /// `RTB` must be in `1..=64`; other values make the associated constants fail to evaluate.
    /// A record type wider than `RTB` bits loses its excess high bits to the shift.
    pub struct Marshaller<E, F, SS, const RTB: u64> {
        /// The entity type used for every envelope's persistence id.
        pub entity_type: EntityType,
        /// Secret store path of the key used for every entity's events.
        pub events_key_secret_path: Arc<str>,
        /// Maps an event to the record type stored in the key's top `RTB` bits.
        pub to_record_type: F,
        /// The secret store that encryption keys are read from.
        pub secret_store: SS,
        phantom: PhantomData<E>,
    }

    impl<E, F, SS, const RTB: u64> Marshaller<E, F, SS, RTB> {
        /// How far the record type is shifted to occupy the key's top `RTB` bits.
        const EVENT_TYPE_BIT_SHIFT: u64 = 64 - RTB;
        /// Mask selecting the low `64 - RTB` bits, which hold the entity id.
        const ENTITY_ID_BIT_MASK: u64 = !(0xFFFFFFFF_FFFFFFFF << Self::EVENT_TYPE_BIT_SHIFT);
    }

    #[async_trait]
    impl<E, F, SS, const RTB: u64> CommitLogMarshaller<E> for Marshaller<E, F, SS, RTB>
    where
        for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
        F: Fn(&E) -> u64 + Sync,
        SS: SecretStore,
    {
        fn entity_type(&self) -> EntityType {
            self.entity_type.clone()
        }

        /// Packs `event`'s record type and `entity_id` into a single compaction key.
        ///
        /// # Panics
        ///
        /// Panics in either of these cases:
        /// - `entity_id` is not a decimal `u64`;
        /// - the id does not fit within the low `64 - RTB` bits. Allowing it would corrupt the
        ///   record type and make the key decode to a different entity.
        fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key {
            let record_type = (self.to_record_type)(event);
            let entity_id = entity_id
                .parse::<u64>()
                .expect("Entity ids must be decimal u64 values for this marshaller.");
            // Any bit outside the entity-id mask would overlap the record type bits.
            assert!(
                (entity_id & !Self::ENTITY_ID_BIT_MASK) == 0,
                "Entity id occupies too many bits."
            );
            (record_type << Self::EVENT_TYPE_BIT_SHIFT) | entity_id
        }

        /// Recovers the entity id from the low `64 - RTB` bits of the record's key.
        fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
            let entity_id = record.key & Self::ENTITY_ID_BIT_MASK;
            let mut buffer = itoa::Buffer::new();
            Some(EntityId::from(buffer.format(entity_id)))
        }

        async fn envelope(
            &self,
            entity_id: EntityId,
            record: ConsumerRecord,
        ) -> Result<EventEnvelope<E>, CannotConsume> {
            self.decrypted_envelope(entity_id, record).await
        }

        async fn producer_record(
            &self,
            topic: Topic,
            entity_id: EntityId,
            seq_nr: u64,
            timestamp: DateTime<Utc>,
            event: &E,
        ) -> Result<ProducerRecord, CannotProduce> {
            self.encrypted_producer_record(topic, entity_id, seq_nr, timestamp, event)
                .await
        }
    }

    #[async_trait]
    impl<E, F, SS, const RTB: u64> EncryptedCommitLogMarshaller<E> for Marshaller<E, F, SS, RTB>
    where
        for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
        F: Fn(&E) -> u64 + Sync,
        SS: SecretStore,
    {
        type SecretStore = SS;

        fn secret_store(&self) -> &Self::SecretStore {
            &self.secret_store
        }

        /// Every entity shares the single configured events key.
        fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
            self.events_key_secret_path.clone()
        }
    }

    /// Convenience constructor for a [`Marshaller`].
    ///
    /// `RTB` is the number of high key bits reserved for the record type.
    pub fn marshaller<E, F, S, SS, const RTB: u64>(
        entity_type: EntityType,
        events_key_secret_path: S,
        secret_store: SS,
        to_record_type: F,
    ) -> Marshaller<E, F, SS, RTB>
    where
        for<'a> E: DeserializeOwned + Serialize + Send + Sync + 'a,
        F: Fn(&E) -> u64 + Sync,
        SS: SecretStore,
        S: ToString,
    {
        Marshaller {
            entity_type,
            events_key_secret_path: Arc::from(events_key_secret_path.to_string()),
            to_record_type,
            secret_store,
            phantom: PhantomData,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{env, fs, time::Duration};

    use super::*;
    use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager};
    use serde::Deserialize;
    use streambed::commit_log::{Header, HeaderKey};
    use streambed_logged::FileLog;
    use test_log::test;
    use tokio::time;

    // A test event carrying a single string payload.
    #[derive(Clone, Deserialize, Serialize)]
    struct MyEvent {
        value: String,
    }

    // A minimal behavior. Its handlers are `todo!()` because these tests are not expected to
    // dispatch commands or apply events through it.
    struct MyBehavior;

    impl EventSourcedBehavior for MyBehavior {
        type State = ();

        type Command = ();

        type Event = MyEvent;

        fn for_command(
            _context: &akka_persistence_rs::entity::Context,
            _state: &Self::State,
            _command: Self::Command,
        ) -> Box<dyn akka_persistence_rs::effect::Effect<Self>> {
            todo!()
        }

        fn on_event(
            _context: &akka_persistence_rs::entity::Context,
            _state: &mut Self::State,
            _event: Self::Event,
        ) {
            todo!()
        }
    }

    // Stores the entity id in an "entity-id" header and the event value as raw UTF-8 bytes.
    struct MyEventMarshaller;

    #[async_trait]
    impl CommitLogMarshaller<MyEvent> for MyEventMarshaller {
        fn entity_type(&self) -> EntityType {
            EntityType::from("some-entity-type")
        }

        fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Key {
            panic!("should not be called")
        }

        fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
            let Header { value, .. } = record
                .headers
                .iter()
                .find(|header| header.key == "entity-id")?;
            std::str::from_utf8(value).ok().map(EntityId::from)
        }

        async fn envelope(
            &self,
            entity_id: EntityId,
            record: ConsumerRecord,
        ) -> Result<EventEnvelope<MyEvent>, CannotConsume> {
            // Read the offset before `record.value` is moved out below.
            let offset = record.offset;
            // The failure here is in decoding the event payload, not the entity id.
            let value = String::from_utf8(record.value)
                .map_err(|_| CannotConsume::new(entity_id.clone(), "bad event value"))?;
            let event = MyEvent { value };
            let envelope = record.timestamp.map(|timestamp| EventEnvelope {
                persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
                seq_nr: 1,
                timestamp,
                event,
                offset,
                tags: vec![],
            });
            envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
        }

        async fn producer_record(
            &self,
            topic: Topic,
            entity_id: EntityId,
            _seq_nr: u64,
            timestamp: DateTime<Utc>,
            event: &MyEvent,
        ) -> Result<ProducerRecord, CannotProduce> {
            let headers = vec![Header {
                key: HeaderKey::from("entity-id"),
                value: entity_id.as_bytes().into(),
            }];
            Ok(ProducerRecord {
                topic,
                headers,
                timestamp: Some(timestamp),
                key: 0,
                value: event.value.clone().into_bytes(),
                partition: 0,
            })
        }
    }

    #[test(tokio::test)]
    async fn can_source_and_flow() {
        // Start from an empty on-disk log. The directory may not exist yet, so a removal
        // failure is fine.
        let logged_dir = env::temp_dir().join("can_source_and_flow");
        let _ = fs::remove_dir_all(&logged_dir);
        fs::create_dir_all(&logged_dir).expect("failed to create the test log directory");
        println!("Writing to {}", logged_dir.to_string_lossy());

        let commit_log = FileLog::new(logged_dir);

        let marshaller = MyEventMarshaller;

        let mut adapter = CommitLogTopicAdapter::new(
            commit_log.clone(),
            marshaller,
            "some-consumer",
            Topic::from("some-topic"),
        );

        let entity_id = EntityId::from("some-entity");
        let timestamp = Utc::now();

        // An empty topic yields no initial events.
        {
            let mut envelopes = adapter.source_initial().await.unwrap();
            assert!(envelopes.next().await.is_none());
        }

        // Produce two events for our entity, and one for another entity.
        let envelope = adapter
            .process(EntityManagerEventEnvelope::new(
                entity_id.clone(),
                1,
                timestamp,
                MyEvent {
                    value: "first-event".to_string(),
                },
            ))
            .await
            .unwrap();
        assert_eq!(envelope.entity_id, entity_id);

        let envelope = adapter
            .process(EntityManagerEventEnvelope::new(
                entity_id.clone(),
                2,
                timestamp,
                MyEvent {
                    value: "second-event".to_string(),
                },
            ))
            .await
            .unwrap();
        assert_eq!(envelope.entity_id, entity_id);

        adapter
            .process(EntityManagerEventEnvelope::new(
                "some-other-entity-id",
                1,
                timestamp,
                MyEvent {
                    value: "third-event".to_string(),
                },
            ))
            .await
            .unwrap();

        // Give the log up to ~1s to make all three records visible.
        for _ in 0..10 {
            let last_offset = commit_log
                .offsets(Topic::from("some-topic"), 0)
                .await
                .map(|lo| lo.end_offset);
            if last_offset == Some(3) {
                break;
            }
            time::sleep(Duration::from_millis(100)).await;
        }

        // Sourcing our entity yields only its two events, in order.
        {
            let mut envelopes = adapter.source(&entity_id).await.unwrap();

            let envelope = envelopes.next().await.unwrap();
            assert_eq!(envelope.entity_id, entity_id);
            assert_eq!(envelope.seq_nr, 1);
            assert_eq!(envelope.event.value, "first-event");

            let envelope = envelopes.next().await.unwrap();
            assert_eq!(envelope.entity_id, entity_id);
            assert_eq!(envelope.event.value, "second-event");

            assert!(envelopes.next().await.is_none());
        }
    }

    #[test(tokio::test)]
    async fn can_establish_an_entity_manager() {
        let commit_log = FileLog::new("/dev/null");

        let marshaller = MyEventMarshaller;

        let file_log_topic_adapter = CommitLogTopicAdapter::new(
            commit_log,
            marshaller,
            "some-consumer",
            Topic::from("some-topic"),
        );

        let (entity_manager, _) = entity_manager::task(MyBehavior, file_log_topic_adapter, 10, 1);

        assert!(entity_manager.await.is_ok());
    }
}