//! An offset store for Akka persistence entities, backed by the Streambed
//! commit log. Offsets are persisted as events of an event-sourced entity,
//! keyed by the hash of the persistence id they belong to.

use akka_persistence_rs::{
effect::{persist_event, reply, Effect, EffectExt},
entity::{Context, EventSourcedBehavior},
entity_manager, EntityId, EntityType, Message, Offset, PersistenceId,
};
use akka_persistence_rs_commitlog::{CommitLogMarshaller, CommitLogTopicAdapter, EventEnvelope};
use akka_projection_rs::offset_store::{self, LastOffset};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use std::{cmp::Ordering, sync::Arc};
use std::{future::Future, io};
use streambed::commit_log::{ConsumerRecord, Header, HeaderKey, Key, ProducerRecord, Topic};
use streambed_logged::{compaction::KeyBasedRetention, FileLog};
use tokio::sync::{mpsc, oneshot, watch, Notify};
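
// The entity, commands, events and marshalling that implement the offset
// store, kept private so that only `task` and its command channel are exposed.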
mod internal {
use std::sync::Arc;
use akka_persistence_rs_commitlog::{CannotConsume, CannotProduce};
use tokio::sync::Notify;
use super::*;
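
    /// Offset store entity state: the latest offset recorded per persistence
    /// id, with one entity tracking all ids that hash to it.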
#[derive(Clone, Default)]
pub struct State {
pub offsets: Vec<(PersistenceId, Offset)>,
}
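
    /// Commands for getting and saving offsets.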
pub enum Command {
Get {
persistence_id: PersistenceId,
reply_to: oneshot::Sender<Option<Offset>>,
},
Save {
persistence_id: PersistenceId,
offset: Offset,
},
}
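
    /// The event persisted when an offset is saved.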
#[derive(Clone, Deserialize, Serialize)]
pub enum Event {
Saved {
persistence_id: PersistenceId,
offset: Offset,
},
}
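
    /// The offset store's event-sourced behavior. The latest offset observed
    /// is published via a watch channel, and `ready` is notified once initial
    /// recovery completes.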
pub struct Behavior {
pub last_offset: watch::Sender<Option<LastOffset>>,
pub ready: Arc<Notify>,
}
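
    /// An upper bound on persistence ids sharing the same timestamp, and on
    /// ids hashing to the same entity; beyond it we warn and discard rather
    /// than grow without bound.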
const HASH_COLLISION_WARNING_THRESHOLD: usize = 10;
impl Behavior {
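        /// Update the published last offset given that `persistence_id` has a
        /// new offset in `state`: a later offset replaces the current one, an
        /// equal offset adds the id to the shared list, and earlier or
        /// incomparable offsets are ignored.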
fn update_last_offset(
&self,
state: Option<&<Behavior as EventSourcedBehavior>::State>,
persistence_id: PersistenceId,
) {
self.last_offset.send_if_modified(|last_offset| {
if let Some(state) = state {
let state_offset = state.offsets.iter().find_map(|(pid, offset)| {
if pid == &persistence_id {
Some(offset.clone())
} else {
None
}
});
match (state_offset, last_offset) {
(Some(state_offset), Some((pids, last_offset))) => {
match state_offset.partial_cmp(last_offset) {
Some(Ordering::Equal) => {
if !pids.contains(&persistence_id) {
let pids_len = pids.len();
if pids_len < HASH_COLLISION_WARNING_THRESHOLD {
pids.push(persistence_id);
true
} else {
log::warn!("Exceeded hash collision threshold with {pids_len} entities sharing the same timestamp for the latest offset. Discarding {persistence_id} from here.");
false
}
} else {
false
}
}
Some(Ordering::Greater) => {
*pids = vec![persistence_id];
*last_offset = state_offset;
true
}
Some(Ordering::Less) => false,
None => false,
}
}
(None, None) => false,
(None, Some(_)) => false,
(Some(state_offset), last_offset @ None) => {
*last_offset = Some((vec![persistence_id], state_offset));
true
}
}
} else {
false
}
});
}
}
#[async_trait]
impl EventSourcedBehavior for Behavior {
type State = State;
type Command = Command;
type Event = Event;
fn for_command(
_context: &Context,
state: &Self::State,
command: Self::Command,
) -> Box<dyn Effect<Self>> {
match command {
Command::Get {
persistence_id,
reply_to,
} => {
let offset = state.offsets.iter().find_map(|(pid, offset)| {
if pid == &persistence_id {
Some(offset.clone())
} else {
None
}
});
reply(reply_to, offset).boxed()
}
Command::Save {
persistence_id,
offset,
} => persist_event(Event::Saved {
persistence_id: persistence_id.clone(),
offset,
})
.and_then(|behavior: &Self, state, result| {
if result.is_ok() {
behavior.update_last_offset(state, persistence_id);
}
async { result }
})
.boxed(),
}
}
fn on_event(_context: &Context, state: &mut Self::State, event: Self::Event) {
let Event::Saved {
persistence_id,
offset,
} = event;
match state
.offsets
.iter_mut()
.find(|(pid, _)| pid == &persistence_id)
{
Some((_, o)) => *o = offset,
None => {
let state_offsets_len = state.offsets.len();
if state_offsets_len < HASH_COLLISION_WARNING_THRESHOLD {
state.offsets.push((persistence_id, offset))
} else {
log::error!("Too many entities are hashing to the same value. Discarding offset history for {persistence_id}.");
}
}
}
}
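
        // Recovery replays all saved offsets; re-derive the last offset from
        // the recovered state.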
async fn on_recovery_completed(&self, _context: &Context, state: &Self::State) {
for (persistence_id, _) in state.offsets.iter() {
self.update_last_offset(Some(state), persistence_id.clone());
}
}
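
        // Signal that the entity manager has completed its initial recovery.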
async fn on_initial_recovery_completed(&self) {
self.ready.notify_one();
}
}
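
    /// Marshals offset store events to and from the commit log as CBOR. The
    /// entity id travels in a record header and the compaction key is
    /// supplied by the caller.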
pub struct OffsetStoreEventMarshaller<F> {
pub entity_type: EntityType,
pub to_compaction_key: F,
}
#[async_trait]
impl<F> CommitLogMarshaller<Event> for OffsetStoreEventMarshaller<F>
where
F: Fn(&EntityId, &Event) -> Key + Send + Sync,
{
fn entity_type(&self) -> EntityType {
self.entity_type.clone()
}
fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Key {
(self.to_compaction_key)(entity_id, event)
}
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
let Header { value, .. } = record
.headers
.iter()
.find(|header| header.key == "entity-id")?;
std::str::from_utf8(value).ok().map(EntityId::from)
}
async fn envelope(
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Result<EventEnvelope<Event>, CannotConsume> {
let event = ciborium::de::from_reader(record.value.as_slice()).map_err(|e| {
CannotConsume::new(
entity_id.clone(),
format!("CBOR deserialization issue: {e}"),
)
})?;
let envelope = record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
                seq_nr: 0, // sequence numbers are not tracked by the offset store
                timestamp,
event,
offset: record.offset,
tags: vec![],
});
envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
}
async fn producer_record(
&self,
topic: Topic,
entity_id: EntityId,
_seq_nr: u64,
timestamp: DateTime<Utc>,
event: &Event,
) -> Result<ProducerRecord, CannotProduce> {
let mut value = Vec::new();
ciborium::ser::into_writer(event, &mut value).map_err(|e| {
CannotProduce::new(entity_id.clone(), format!("CBOR serialization issue: {e}"))
})?;
let headers = vec![Header {
key: HeaderKey::from("entity-id"),
value: entity_id.as_bytes().into(),
}];
let key = self.to_compaction_key(&entity_id, event);
Ok(ProducerRecord {
topic,
headers,
timestamp: Some(timestamp),
key,
value,
partition: 0,
})
}
}
}
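
/// Uniquely identifies an offset store.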
pub type OffsetStoreId = SmolStr;
use internal::*;
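
/// Provides a task that runs an offset store over a commit log, along with a
/// channel of commands for interacting with it. `keys_expected` sizes both
/// commit log compaction and the entity manager's capacity.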
pub fn task(
mut commit_log: FileLog,
keys_expected: usize,
offset_store_id: OffsetStoreId,
) -> (
impl Future<Output = io::Result<()>>,
mpsc::Sender<offset_store::Command>,
) {
let (offset_store, mut offset_store_receiver) = mpsc::channel(1);
let task = async move {
let events_entity_type = EntityType::from(offset_store_id.clone());
let events_topic = Topic::from(offset_store_id.clone());
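        // Key-based compaction retains the latest record per key, so size it
        // to the number of distinct offsets we expect to retain.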
commit_log
.register_compaction(events_topic.clone(), KeyBasedRetention::new(keys_expected))
.await
.unwrap();
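        // Records are compacted by the JDK-style string hash of their
        // persistence id, matching how commands are routed to entities below.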
let to_compaction_key = |_: &EntityId, event: &Event| -> Key {
let Event::Saved { persistence_id, .. } = event;
persistence_id.jdk_string_hash() as u64
};
let file_log_topic_adapter = CommitLogTopicAdapter::new(
commit_log,
OffsetStoreEventMarshaller {
entity_type: events_entity_type,
to_compaction_key,
},
&offset_store_id,
events_topic,
);
let (last_offset, last_offset_receiver) = watch::channel(None);
let ready = Arc::new(Notify::new());
let (entity_manager_runner, offset_store_entities) = entity_manager::task(
Behavior {
last_offset,
ready: ready.clone(),
},
file_log_topic_adapter,
keys_expected,
keys_expected,
);
let offset_command_handler = async {
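            // Hold off until initial recovery completes so queries never see a
            // partially recovered store, then re-notify for any other waiter.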
ready.notified().await;
ready.notify_one();
while let Some(command) = offset_store_receiver.recv().await {
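                // GetLastOffset is answered from the watch channel; the other
                // commands route to the entity keyed by the hash of the
                // persistence id, so colliding ids share an entity.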
match command {
offset_store::Command::GetLastOffset { reply_to } => {
let _ = reply_to.send(last_offset_receiver.borrow().clone());
}
offset_store::Command::GetOffset {
persistence_id,
reply_to,
} => {
let _ = offset_store_entities
.send(Message::new(
EntityId::from(persistence_id.jdk_string_hash().to_string()),
Command::Get {
persistence_id,
reply_to,
},
))
.await;
}
offset_store::Command::SaveOffset {
persistence_id,
offset,
} => {
let _ = offset_store_entities
.send(Message::new(
EntityId::from(persistence_id.jdk_string_hash().to_string()),
Command::Save {
persistence_id,
offset,
},
))
.await;
}
}
}
};
let (_, r2) = tokio::join!(offset_command_handler, entity_manager_runner);
r2
};
(task, offset_store)
}
#[cfg(test)]
mod tests {
use super::*;
use akka_persistence_rs::TimestampOffset;
use std::{env, fs};
use test_log::test;
#[test(tokio::test)]
async fn test_get_last_offsets() {
let logged_dir = env::temp_dir().join("test_get_last_offsets");
let _ = fs::remove_dir_all(&logged_dir);
let _ = fs::create_dir_all(&logged_dir);
println!("Writing to {}", logged_dir.to_string_lossy());
let commit_log = FileLog::new(logged_dir);
let (offset_store_task, offset_store) =
task(commit_log, 10, OffsetStoreId::from("some-offset-id"));
tokio::spawn(offset_store_task);
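
        // With no offsets saved yet, there is no last offset.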
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.unwrap();
assert_eq!(reply_to_receiver.await, Ok(None));
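
        // Save an offset and expect to read it back, both for the persistence
        // id itself and as the last offset.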
let persistence_id_0 = PersistenceId::new(
EntityType::from("entity-type"),
EntityId::from("entity-id0"),
);
let timestamp_0 = Utc::now();
let seq_nr_0 = 2;
offset_store
.send(offset_store::Command::SaveOffset {
persistence_id: persistence_id_0.clone(),
offset: Offset::Timestamp(TimestampOffset {
timestamp: timestamp_0,
seq_nr: seq_nr_0,
}),
})
.await
.unwrap();
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetOffset {
persistence_id: persistence_id_0.clone(),
reply_to,
})
.await
.unwrap();
assert_eq!(
reply_to_receiver.await,
Ok(Some(Offset::Timestamp(TimestampOffset {
timestamp: timestamp_0,
seq_nr: seq_nr_0
})))
);
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.unwrap();
assert_eq!(
reply_to_receiver.await,
Ok(Some((
vec![persistence_id_0.clone()],
Offset::Timestamp(TimestampOffset {
timestamp: timestamp_0,
seq_nr: seq_nr_0
})
)))
);
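
        // Saving an earlier offset for the same persistence id must not move
        // the last offset backwards.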
let timestamp_1 = timestamp_0 - chrono::Duration::minutes(1);
let seq_nr_1 = 1;
offset_store
.send(offset_store::Command::SaveOffset {
persistence_id: persistence_id_0.clone(),
offset: Offset::Timestamp(TimestampOffset {
timestamp: timestamp_1,
seq_nr: seq_nr_1,
}),
})
.await
.unwrap();
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.unwrap();
assert_eq!(
reply_to_receiver.await,
Ok(Some((
vec![persistence_id_0.clone()],
Offset::Timestamp(TimestampOffset {
timestamp: timestamp_0,
seq_nr: seq_nr_0
})
)))
);
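
        // A later offset becomes both the entity's offset and the last offset.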
let timestamp_2 = timestamp_0 + chrono::Duration::minutes(1);
let seq_nr_2 = 3;
offset_store
.send(offset_store::Command::SaveOffset {
persistence_id: persistence_id_0.clone(),
offset: Offset::Timestamp(TimestampOffset {
timestamp: timestamp_2,
seq_nr: seq_nr_2,
}),
})
.await
.unwrap();
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetOffset {
persistence_id: persistence_id_0.clone(),
reply_to,
})
.await
.unwrap();
assert_eq!(
reply_to_receiver.await,
Ok(Some(Offset::Timestamp(TimestampOffset {
timestamp: timestamp_2,
seq_nr: seq_nr_2
})))
);
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.unwrap();
assert_eq!(
reply_to_receiver.await,
Ok(Some((
vec![persistence_id_0.clone()],
Offset::Timestamp(TimestampOffset {
timestamp: timestamp_2,
seq_nr: seq_nr_2
})
)))
);
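
        // An earlier offset from a different persistence id leaves the last
        // offset untouched.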
let persistence_id_1 = PersistenceId::new(
EntityType::from("entity-type"),
EntityId::from("entity-id1"),
);
let timestamp_3 = timestamp_0 - chrono::Duration::minutes(1);
let seq_nr_3 = 1;
offset_store
.send(offset_store::Command::SaveOffset {
persistence_id: persistence_id_1,
offset: Offset::Timestamp(TimestampOffset {
timestamp: timestamp_3,
seq_nr: seq_nr_3,
}),
})
.await
.unwrap();
let (reply_to, reply_to_receiver) = oneshot::channel();
offset_store
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.unwrap();
assert_eq!(
reply_to_receiver.await,
Ok(Some((
vec![persistence_id_0],
Offset::Timestamp(TimestampOffset {
timestamp: timestamp_2,
seq_nr: seq_nr_2
})
)))
);
}
}