use std::{collections::HashMap, io};
use futures::Future;
use tokio::sync::mpsc;
use crate::offset_store;
/// Creates an offset store that keeps every offset in an in-memory map.
///
/// Returns a pair of:
/// * a future that processes [`offset_store::Command`]s until the command
///   channel yields no more commands, then resolves to `Ok(())` (this
///   implementation never produces an `Err`);
/// * the sender used to submit commands. The channel is bounded with a
///   capacity of 1, so `send` waits while a command is already queued.
///
/// The future is inert until it is polled, e.g. by spawning it on a runtime.
///
/// `keys_expected` is the initial capacity reserved for the map of
/// persistence ids to offsets; the map still grows beyond it when needed.
pub fn task(
    keys_expected: usize,
) -> (
    impl Future<Output = io::Result<()>>,
    mpsc::Sender<offset_store::Command>,
) {
    let (sender, mut receiver) = mpsc::channel(1);
    let task = async move {
        let mut offsets = HashMap::with_capacity(keys_expected);
        while let Some(command) = receiver.recv().await {
            match command {
                offset_store::Command::GetLastOffset { reply_to } => {
                    // This store never reports a last offset. A failed reply
                    // is deliberately ignored.
                    let _ = reply_to.send(None);
                    // Asking for the last offset also forgets every saved offset.
                    // NOTE(review): presumably because a consumer asking for the
                    // last offset is starting over from the beginning - confirm.
                    offsets.clear();
                }
                offset_store::Command::GetOffset {
                    persistence_id,
                    reply_to,
                } => {
                    // Reply with a copy of the stored offset, or `None` when
                    // nothing has been saved for this persistence id.
                    let _ = reply_to.send(offsets.get(&persistence_id).cloned());
                }
                offset_store::Command::SaveOffset {
                    persistence_id,
                    offset,
                } => {
                    // `insert` overwrites any previous offset in place, so no
                    // clone of the offset is needed.
                    offsets.insert(persistence_id, offset);
                }
            }
        }
        Ok(())
    };
    (task, sender)
}
#[cfg(test)]
mod tests {
    use super::*;

    use akka_persistence_rs::{EntityId, EntityType, Offset, PersistenceId};
    use test_log::test;
    use tokio::sync::oneshot;

    /// Walks the store through its three commands: an empty store has no
    /// last offset, a saved offset can be read back, and asking for the
    /// last offset afterwards wipes what was saved.
    #[test(tokio::test)]
    async fn test_basic_ops() {
        let (store_task, store) = task(1);
        tokio::spawn(store_task);

        // Step 1: a fresh store reports no last offset.
        let (tx, rx) = oneshot::channel();
        let sent = store
            .send(offset_store::Command::GetLastOffset { reply_to: tx })
            .await;
        assert!(sent.is_ok());
        assert_eq!(rx.await, Ok(None));

        let entity_type = EntityType::from("entity-type");
        let entity_id = EntityId::from("entity-id");
        let persistence_id = PersistenceId::new(entity_type, entity_id);
        let offset = Offset::Sequence(10);

        // Step 2: save an offset for the persistence id.
        let sent = store
            .send(offset_store::Command::SaveOffset {
                persistence_id: persistence_id.clone(),
                offset: offset.clone(),
            })
            .await;
        assert!(sent.is_ok());

        // Step 3: the saved offset can be read back.
        let (tx, rx) = oneshot::channel();
        let sent = store
            .send(offset_store::Command::GetOffset {
                persistence_id: persistence_id.clone(),
                reply_to: tx,
            })
            .await;
        assert!(sent.is_ok());
        assert_eq!(rx.await, Ok(Some(offset)));

        // Step 4: the last offset is still reported as absent.
        let (tx, rx) = oneshot::channel();
        let sent = store
            .send(offset_store::Command::GetLastOffset { reply_to: tx })
            .await;
        assert!(sent.is_ok());
        assert_eq!(rx.await, Ok(None));

        // Step 5: the previous query cleared the saved offset.
        let (tx, rx) = oneshot::channel();
        let sent = store
            .send(offset_store::Command::GetOffset {
                persistence_id,
                reply_to: tx,
            })
            .await;
        assert!(sent.is_ok());
        assert_eq!(rx.await, Ok(None));
    }
}