use async_trait::async_trait;
use chrono::Utc;
use std::future::{self, Ready};
use std::result::Result as StdResult;
use std::{future::Future, io, marker::PhantomData};
use tokio::sync::oneshot;
use crate::{
entity::EventSourcedBehavior,
entity_manager::{EntityOps, EventEnvelope, Handler},
EntityId,
};
/// Errors that can be produced while processing an effect.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`] so that
/// it can be boxed as a `dyn Error`, and `From<io::Error>` so that I/O
/// failures can be lifted with `?`.
#[derive(Debug)]
pub enum Error {
    /// An I/O failure, e.g. one reported by the event handler while
    /// processing an event envelope.
    IoError(io::Error),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::IoError(e) => write!(f, "I/O error: {}", e),
        }
    }
}

impl std::error::Error for Error {
    /// Exposes the wrapped I/O error as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::IoError(e) => Some(e),
        }
    }
}

impl From<io::Error> for Error {
    fn from(e: io::Error) -> Self {
        Error::IoError(e)
    }
}
/// The outcome of processing an effect: `Ok(())` on success, otherwise an [`Error`].
///
/// Note that within this module the alias shadows the prelude's `Result`;
/// the standard type is available as `StdResult`.
pub type Result = StdResult<(), Error>;
/// A unit of work performed on behalf of an entity of behavior `B`.
///
/// Effects are chained: each one receives the result of the effect that ran
/// before it (`prev_result`) and returns its own result.
#[async_trait]
pub trait Effect<B>: Send
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
/// Processes this effect.
///
/// * `behavior` - the behavior the effect is being run for.
/// * `handler` - processes event envelopes (used by the persisting effects).
/// * `entities` - access to entity state (`get`) and applying envelopes (`update`).
/// * `entity_id` - the entity the effect applies to.
/// * `last_seq_nr` - the last sequence number; persisting effects write the new value back.
/// * `prev_result` - the result of the previously processed effect.
// The argument list is long by design; silence the clippy lint rather than bundle them.
#[allow(clippy::too_many_arguments)]
async fn process(
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result;
}
/// An effect composed of two effects that are processed one after the other.
pub struct And<E, L, R> {
/// The effect processed first.
_l: L,
/// The effect processed second; it receives the first effect's result.
_r: R,
/// Ties the composite to its behavior type without storing a value of it.
phantom: PhantomData<E>,
}
// Sequential composition: the left effect runs first and its result becomes
// the "previous result" seen by the right effect.
#[async_trait]
impl<B, L, R> Effect<B> for And<B, L, R>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
    B::State: Send,
    L: Effect<B>,
    R: Effect<B>,
{
    async fn process(
        &mut self,
        behavior: &B,
        handler: &mut (dyn Handler<B::Event> + Send),
        entities: &mut (dyn EntityOps<B> + Send + Sync),
        entity_id: &EntityId,
        last_seq_nr: &mut u64,
        prev_result: Result,
    ) -> Result {
        // The right-hand effect is always invoked, even if the left one
        // failed; it decides for itself how to react to an error.
        let left_outcome = self
            ._l
            .process(behavior, handler, entities, entity_id, last_seq_nr, prev_result)
            .await;

        self._r
            .process(
                behavior,
                handler,
                entities,
                entity_id,
                last_seq_nr,
                left_outcome,
            )
            .await
    }
}
// Composite effects get the chaining combinators too, so chains can be
// extended indefinitely. All methods come from the trait's defaults.
impl<B, L, R> EffectExt<B> for And<B, L, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
L: Effect<B>,
R: Effect<B>,
{
}
/// Combinators for chaining effects together.
///
/// Every method consumes `self` and returns a new effect that first processes
/// `self` and then the effect being appended.
pub trait EffectExt<B>: Effect<B>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
{
    /// Processes `self` and then `r`, passing the result of `self` on to `r`.
    fn and<R>(self, r: R) -> And<B, Self, R>
    where
        Self: Sized,
        R: Effect<B>,
    {
        And {
            phantom: PhantomData,
            _l: self,
            _r: r,
        }
    }

    /// Processes `self` and then calls `f`, awaiting the future it returns.
    ///
    /// `f` is given the behavior, the entity's state if there is any, and the
    /// result of `self`. The future's output becomes the result of the chain.
    fn and_then<F, R>(self, f: F) -> And<B, Self, Then<B, F, R>>
    where
        Self: Sized,
        B::State: Send + Sync,
        F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
        R: Future<Output = Result>,
    {
        let then_effect = Then {
            f: Some(f),
            phantom: PhantomData,
        };
        And {
            _l: self,
            _r: then_effect,
            phantom: PhantomData,
        }
    }

    /// Processes `self` and then persists the event produced by `f`, if any.
    ///
    /// `f` is only called when `self` succeeded; it receives the entity's
    /// state if there is any. Returning `None` persists nothing. When `self`
    /// failed, its error is passed through instead.
    #[allow(clippy::type_complexity)]
    fn then_persist_event<F>(
        self,
        f: F,
    ) -> And<
        B,
        Self,
        ThenPersistEvent<
            B,
            Box<
                dyn FnOnce(
                        &B,
                        Option<&B::State>,
                        Result,
                    ) -> Ready<StdResult<Option<B::Event>, Error>>
                    + Send,
            >,
            Ready<StdResult<Option<B::Event>, Error>>,
        >,
    >
    where
        Self: Sized,
        B::State: Send + Sync,
        F: FnOnce(Option<&B::State>) -> Option<B::Event> + Send + 'static,
    {
        // Adapt the synchronous, state-only closure to the asynchronous,
        // three-argument shape that `ThenPersistEvent` expects.
        let make_event = Box::new(move |_behavior: &B, state: Option<&B::State>, prev: Result| {
            future::ready(prev.map(|()| f(state)))
        });
        // The struct literal stays nested in the returned expression so that
        // the boxed closure coerces to the trait object named in the signature.
        And {
            _l: self,
            _r: ThenPersistEvent {
                deletion_event: false,
                f: Some(make_event),
                phantom: PhantomData,
            },
            phantom: PhantomData,
        }
    }

    /// Processes `self` and then sends the reply produced by `f`, if any.
    ///
    /// `f` is only called when `self` succeeded; it receives the entity's
    /// state if there is any and returns the sender along with the value to
    /// send, or `None` to send nothing.
    #[allow(clippy::type_complexity)]
    fn then_reply<F, T>(
        self,
        f: F,
    ) -> And<
        B,
        Self,
        ThenReply<
            B,
            Box<dyn FnOnce(&B, Option<&B::State>, Result) -> Ready<ReplyResult<T>> + Send>,
            Ready<ReplyResult<T>>,
            T,
        >,
    >
    where
        Self: Sized,
        B::State: Send + Sync,
        F: FnOnce(Option<&B::State>) -> Option<ReplyTo<T>> + Send + 'static,
        T: Send,
    {
        // Same adaptation as for `then_persist_event`, yielding a reply.
        let make_reply = Box::new(move |_behavior: &B, state: Option<&B::State>, prev: Result| {
            future::ready(prev.map(|()| f(state)))
        });
        And {
            _l: self,
            _r: ThenReply {
                f: Some(make_reply),
                phantom: PhantomData,
            },
            phantom: PhantomData,
        }
    }

    /// Erases the concrete type of the effect by boxing it as a trait object.
    fn boxed(self) -> Box<dyn Effect<B>>
    where
        Self: Sized + 'static,
    {
        Box::new(self)
    }
}
/// An effect that persists a single event.
pub struct PersistEvent<B>
where
B: EventSourcedBehavior,
{
/// Copied into the envelope's `deletion_event` field when the event is persisted.
deletion_event: bool,
/// The event to persist; taken (leaving `None`) when the effect is processed.
event: Option<B::Event>,
}
#[async_trait]
impl<B> Effect<B> for PersistEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
B::Event: Send,
{
async fn process(
&mut self,
_behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
if prev_result.is_ok() {
if let Some(event) = self.event.take() {
let seq_nr = last_seq_nr.wrapping_add(1);
let envelope = EventEnvelope {
entity_id: entity_id.clone(),
seq_nr,
timestamp: Utc::now(),
event,
deletion_event: self.deletion_event,
};
let result = handler.process(envelope).await.map_err(Error::IoError);
if let Ok(envelope) = result {
*last_seq_nr = entities.update(envelope);
Ok(())
} else {
result.map(|_| ())
}
} else {
prev_result
}
} else {
prev_result
}
}
}
// Lets further effects be chained onto a persist effect using the trait's
// default combinators.
impl<B> EffectExt<B> for PersistEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
B::Event: Send,
{
}
/// Creates an effect that persists `event` with the envelope's
/// `deletion_event` flag cleared.
pub fn persist_event<B>(event: B::Event) -> PersistEvent<B>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
{
    let event = Some(event);
    PersistEvent {
        event,
        deletion_event: false,
    }
}
/// Creates an effect that persists `event` with the envelope's
/// `deletion_event` flag set.
pub fn persist_deletion_event<B>(event: B::Event) -> PersistEvent<B>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
{
    let event = Some(event);
    PersistEvent {
        event,
        deletion_event: true,
    }
}
/// A pending reply: the channel to reply on, paired with the value to send.
pub type ReplyTo<T> = (oneshot::Sender<T>, T);
/// The result of computing a reply: an optional pending reply, or an error.
pub type ReplyResult<T> = StdResult<Option<ReplyTo<T>>, Error>;
/// An effect that sends a value over a oneshot channel.
pub struct Reply<B, T> {
/// The sender and the value to send; taken (leaving `None`) once the reply is sent.
replier: Option<ReplyTo<T>>,
/// Ties the effect to its behavior type without storing a value of it.
phantom: PhantomData<B>,
}
// Replies are sent only when everything before them succeeded; in every case
// the previous result is handed on unchanged.
#[async_trait]
impl<B, T> Effect<B> for Reply<B, T>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
    T: Send,
{
    async fn process(
        &mut self,
        _behavior: &B,
        _handler: &mut (dyn Handler<B::Event> + Send),
        _entities: &mut (dyn EntityOps<B> + Send + Sync),
        _entity_id: &EntityId,
        _last_seq_nr: &mut u64,
        prev_result: Result,
    ) -> Result {
        // On a prior failure the pending reply is left where it is.
        if prev_result.is_err() {
            return prev_result;
        }

        if let Some((sender, value)) = self.replier.take() {
            // A send can only fail when the receiver is gone; there is
            // nobody left to tell, so the outcome is deliberately ignored.
            let _ = sender.send(value);
        }

        prev_result
    }
}
// Lets further effects be chained onto a reply effect using the trait's
// default combinators.
impl<B, T> EffectExt<B> for Reply<B, T>
where
B: EventSourcedBehavior + Send + Sync + 'static,
T: Send,
{
}
/// Creates an effect that sends `reply` to `reply_to` when processed.
pub fn reply<B, T>(reply_to: oneshot::Sender<T>, reply: T) -> Reply<B, T> {
    let replier = Some((reply_to, reply));
    Reply {
        replier,
        phantom: PhantomData,
    }
}
/// An effect that runs an arbitrary asynchronous function.
pub struct Then<B, F, R> {
/// The function to run; taken (leaving `None`) when the effect is processed.
f: Option<F>,
/// Ties the effect to its behavior and future types without storing values of them.
phantom: PhantomData<(B, R)>,
}
// Runs the stored function at most once; the function's own result replaces
// the previous result.
#[async_trait]
impl<B, F, R> Effect<B> for Then<B, F, R>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
    B::State: Send + Sync,
    F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
    R: Future<Output = Result> + Send,
{
    async fn process(
        &mut self,
        behavior: &B,
        _handler: &mut (dyn Handler<B::Event> + Send),
        entities: &mut (dyn EntityOps<B> + Send + Sync),
        entity_id: &EntityId,
        _last_seq_nr: &mut u64,
        prev_result: Result,
    ) -> Result {
        let pending = self.f.take();
        match pending {
            // The function sees the entity's state as it is right now, along
            // with whatever the preceding effect reported.
            Some(callback) => callback(behavior, entities.get(entity_id), prev_result).await,
            // Already run: there is nothing left to do.
            None => Ok(()),
        }
    }
}
// Lets further effects be chained onto a `Then` effect using the trait's
// default combinators.
impl<B, F, R> EffectExt<B> for Then<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = Result> + Send,
{
}
/// Creates an effect that calls `f` and awaits the future it returns.
///
/// `f` is given the behavior, the entity's state if there is any, and the
/// result of the previously processed effect.
pub fn then<B, F, R>(f: F) -> Then<B, F, R>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
    B::State: Send + Sync,
    F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
    R: Future<Output = Result>,
{
    let f = Some(f);
    Then {
        f,
        phantom: PhantomData,
    }
}
/// An effect that runs a function to obtain an optional event and then persists it.
pub struct ThenPersistEvent<B, F, R>
where
B: EventSourcedBehavior,
{
/// Forwarded to the `PersistEvent` effect that performs the persistence.
deletion_event: bool,
/// The function producing the event; taken (leaving `None`) when the effect is processed.
f: Option<F>,
/// Ties the effect to its behavior and future types without storing values of them.
phantom: PhantomData<(B, R)>,
}
// Obtains an event from the stored function and delegates the actual
// persistence to `PersistEvent`.
#[async_trait]
impl<B, F, R> Effect<B> for ThenPersistEvent<B, F, R>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
    B::State: Send + Sync,
    B::Event: Send,
    F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
    R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
    async fn process(
        &mut self,
        behavior: &B,
        handler: &mut (dyn Handler<B::Event> + Send),
        entities: &mut (dyn EntityOps<B> + Send + Sync),
        entity_id: &EntityId,
        last_seq_nr: &mut u64,
        prev_result: Result,
    ) -> Result {
        // The function runs at most once; later calls are no-ops.
        let producer = match self.f.take() {
            Some(producer) => producer,
            None => return Ok(()),
        };

        // An error from the function ends processing here and is returned
        // as-is. Otherwise we hold the (optional) event to persist.
        let event = producer(behavior, entities.get(entity_id), prev_result).await?;

        let mut persist = PersistEvent::<B> {
            deletion_event: self.deletion_event,
            event,
        };
        // The function has already accounted for the previous result, so the
        // inner effect starts from a clean `Ok`.
        persist
            .process(behavior, handler, entities, entity_id, last_seq_nr, Ok(()))
            .await
    }
}
// Lets further effects be chained onto a `ThenPersistEvent` effect using the
// trait's default combinators.
impl<B, F, R> EffectExt<B> for ThenPersistEvent<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
B::Event: Send,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
}
/// An effect that runs a function to obtain an optional reply and then sends it.
pub struct ThenReply<B, F, R, T> {
/// The function producing the reply; taken (leaving `None`) when the effect is processed.
f: Option<F>,
/// Ties the effect to its behavior, future and reply types without storing values of them.
phantom: PhantomData<(B, R, T)>,
}
// Obtains a reply from the stored function and delegates the sending to
// `Reply`.
#[async_trait]
impl<B, F, R, T> Effect<B> for ThenReply<B, F, R, T>
where
    B: EventSourcedBehavior + Send + Sync + 'static,
    B::State: Send + Sync,
    F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
    R: Future<Output = ReplyResult<T>> + Send,
    T: Send,
{
    /// Runs the stored function and sends the reply it yields, if any.
    ///
    /// Returns `Ok(())` when the function has already been consumed, when it
    /// yields no reply, or once the reply has been handed to the channel.
    /// An error yielded by the function is returned to the caller. This
    /// includes the error of a failed preceding effect, which the
    /// `then_reply` combinator forwards through the function.
    async fn process(
        &mut self,
        behavior: &B,
        handler: &mut (dyn Handler<B::Event> + Send),
        entities: &mut (dyn EntityOps<B> + Send + Sync),
        entity_id: &EntityId,
        last_seq_nr: &mut u64,
        prev_result: Result,
    ) -> Result {
        // The function runs at most once; later calls are no-ops.
        let f = match self.f.take() {
            Some(f) => f,
            None => return Ok(()),
        };

        let outcome = f(behavior, entities.get(entity_id), prev_result).await;
        match outcome {
            Ok(replier) => {
                let mut effect = Reply {
                    replier,
                    phantom: PhantomData,
                };
                // The function has already accounted for the previous
                // result, so the inner effect starts from a clean `Ok`.
                effect
                    .process(behavior, handler, entities, entity_id, last_seq_nr, Ok(()))
                    .await
            }
            // Propagate the failure instead of reporting success: otherwise
            // an earlier error (e.g. a failed persist) would be silently
            // lost whenever a reply is chained after it.
            Err(e) => Err(e),
        }
    }
}
// Lets further effects be chained onto a `ThenReply` effect using the
// trait's default combinators.
impl<B, F, R, T> EffectExt<B> for ThenReply<B, F, R, T>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = ReplyResult<T>> + Send,
T: Send,
{
}
/// An effect that does nothing when processed.
pub struct Unhandled<E> {
/// Ties the effect to its behavior type without storing a value of it.
phantom: PhantomData<E>,
}
// Processing an unhandled effect performs no work at all.
#[async_trait]
impl<B> Effect<B> for Unhandled<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
/// Ignores every argument, including `_prev_result`, and always returns `Ok(())`.
async fn process(
&mut self,
_behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
_entities: &mut (dyn EntityOps<B> + Send + Sync),
_entity_id: &EntityId,
_last_seq_nr: &mut u64,
_prev_result: Result,
) -> Result {
Ok(())
}
}
/// Creates a boxed effect that does nothing when processed.
pub fn unhandled<B>() -> Box<Unhandled<B>> {
    let effect = Unhandled {
        phantom: PhantomData,
    };
    Box::new(effect)
}
// Unit tests exercising effect chains against mock handler and entity
// operations that assert on the envelopes they receive.
#[cfg(test)]
mod tests {
use super::*;
use crate::entity::Context;
use test_log::test;
// Minimal state/command/event types; the tests only care about the plumbing.
#[derive(Default)]
struct TestState;
struct TestCommand;
#[derive(Copy, Clone, Debug, PartialEq)]
struct TestEvent;
struct TestBehavior;
// The behavior's callbacks are never invoked by these tests, hence `todo!()`.
impl EventSourcedBehavior for TestBehavior {
type State = TestState;
type Command = TestCommand;
type Event = TestEvent;
fn for_command(
_context: &Context,
_state: &Self::State,
_command: Self::Command,
) -> Box<dyn Effect<Self>> {
todo!()
}
fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {
todo!()
}
}
// A handler that checks each envelope against the expected one and echoes it back.
struct TestHandler {
expected: EventEnvelope<TestEvent>,
}
#[async_trait]
impl Handler<TestEvent> for TestHandler {
async fn process(
&mut self,
envelope: EventEnvelope<TestEvent>,
) -> io::Result<EventEnvelope<TestEvent>> {
// The timestamp is not compared; it is taken from the clock at processing time.
assert_eq!(envelope.deletion_event, self.expected.deletion_event);
assert_eq!(envelope.entity_id, self.expected.entity_id);
assert_eq!(envelope.seq_nr, self.expected.seq_nr);
assert_eq!(envelope.event, self.expected.event);
Ok(envelope)
}
}
// Entity operations that assert on the ids and envelopes they are called with.
struct TestEntityOps {
expected_get_entity_id: EntityId,
get_result: TestState,
expected_update: EventEnvelope<TestEvent>,
}
impl EntityOps<TestBehavior> for TestEntityOps {
fn get(&mut self, entity_id: &EntityId) -> Option<&TestState> {
assert_eq!(entity_id, &self.expected_get_entity_id);
Some(&self.get_result)
}
fn update(&mut self, envelope: EventEnvelope<TestEvent>) -> u64 {
assert_eq!(envelope.deletion_event, self.expected_update.deletion_event);
assert_eq!(envelope.entity_id, self.expected_update.entity_id);
assert_eq!(envelope.seq_nr, self.expected_update.seq_nr);
assert_eq!(envelope.event, self.expected_update.event);
// Report the envelope's sequence number as the new last sequence number.
envelope.seq_nr
}
}
// Persisting an event and then replying should persist with seq_nr 1 and deliver the reply.
#[test(tokio::test)]
async fn test_persist_then_reply() {
let entity_id = EntityId::from("entity-id");
let expected = EventEnvelope {
deletion_event: false,
entity_id: entity_id.clone(),
seq_nr: 1,
event: TestEvent,
timestamp: Utc::now(),
};
let mut handler = TestHandler {
expected: expected.clone(),
};
let mut entity_ops = TestEntityOps {
expected_get_entity_id: entity_id.clone(),
get_result: TestState,
expected_update: expected,
};
let (reply_to, reply_to_receiver) = oneshot::channel();
let reply_value = 1;
// Start from sequence number 0 so that the persisted envelope carries 1.
assert!(persist_event(TestEvent)
.then_reply(move |_s| Some((reply_to, reply_value)))
.process(
&TestBehavior,
&mut handler,
&mut entity_ops,
&entity_id,
&mut 0,
Ok(()),
)
.await
.is_ok());
assert_eq!(reply_to_receiver.await, Ok(reply_value));
}
// Replying first and then persisting should likewise deliver the reply and persist with seq_nr 1.
#[test(tokio::test)]
async fn test_reply_then_persist() {
let entity_id = EntityId::from("entity-id");
let expected = EventEnvelope {
deletion_event: false,
entity_id: entity_id.clone(),
seq_nr: 1,
event: TestEvent,
timestamp: Utc::now(),
};
let mut handler = TestHandler {
expected: expected.clone(),
};
let mut entity_ops = TestEntityOps {
expected_get_entity_id: entity_id.clone(),
get_result: TestState,
expected_update: expected,
};
let (reply_to, reply_to_receiver) = oneshot::channel();
let reply_value = 1;
assert!(reply(reply_to, reply_value)
.then_persist_event(|_s| Some(TestEvent))
.process(
&TestBehavior,
&mut handler,
&mut entity_ops,
&entity_id,
&mut 0,
Ok(()),
)
.await
.is_ok());
assert_eq!(reply_to_receiver.await, Ok(reply_value));
}
}