use async_trait::async_trait;
use chrono::Utc;
use std::result::Result as StdResult;
use std::{future::Future, io, marker::PhantomData};
use tokio::sync::oneshot;
use crate::{
entity::EventSourcedBehavior,
entity_manager::{EntityOps, EventEnvelope, Handler},
EntityId,
};
#[derive(Debug)]
pub enum Error {
IoError(io::Error),
}
pub type Result = StdResult<(), Error>;
#[async_trait]
pub trait Effect<B>: Send
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
async fn process(
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result;
}
pub struct And<E, L, R> {
_l: L,
_r: R,
phantom: PhantomData<E>,
}
#[async_trait]
impl<B, L, R> Effect<B> for And<B, L, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
L: Effect<B>,
R: Effect<B>,
{
async fn process(
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
let r = self
._l
.process(
behavior,
handler,
entities,
entity_id,
last_seq_nr,
prev_result,
)
.await;
if r.is_ok() {
self._r
.process(behavior, handler, entities, entity_id, last_seq_nr, r)
.await
} else {
r
}
}
}
impl<B, L, R> EffectExt<B> for And<B, L, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
L: Effect<B>,
R: Effect<B>,
{
}
pub trait EffectExt<B>: Effect<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
fn and<R>(self, r: R) -> And<B, Self, R>
where
Self: Sized,
R: Effect<B>,
{
And {
_l: self,
_r: r,
phantom: PhantomData,
}
}
fn and_then<F, R>(self, f: F) -> And<B, Self, Then<B, F, R>>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = Result>,
{
And {
_l: self,
_r: Then {
f: Some(f),
phantom: PhantomData,
},
phantom: PhantomData,
}
}
fn and_then_emit_event<F, R>(self, f: F) -> And<B, Self, ThenEmitEvent<B, F, R>>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
And {
_l: self,
_r: ThenEmitEvent {
deletion_event: false,
f: Some(f),
phantom: PhantomData,
},
phantom: PhantomData,
}
}
fn and_then_emit_deletion_event<F, R>(self, f: F) -> And<B, Self, ThenEmitEvent<B, F, R>>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
And {
_l: self,
_r: ThenEmitEvent {
deletion_event: true,
f: Some(f),
phantom: PhantomData,
},
phantom: PhantomData,
}
}
fn and_then_reply<F, R, T>(self, f: F) -> And<B, Self, ThenReply<B, F, R, T>>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<(oneshot::Sender<T>, T)>, Error>> + Send,
T: Send,
{
And {
_l: self,
_r: ThenReply {
f: Some(f),
phantom: PhantomData,
},
phantom: PhantomData,
}
}
fn boxed(self) -> Box<dyn Effect<B>>
where
Self: Sized + 'static,
{
Box::new(self)
}
}
pub struct EmitEvent<B>
where
B: EventSourcedBehavior,
{
deletion_event: bool,
event: Option<B::Event>,
}
#[async_trait]
impl<B> Effect<B> for EmitEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
B::Event: Send,
{
async fn process(
&mut self,
_behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
if prev_result.is_ok() {
if let Some(event) = self.event.take() {
let seq_nr = last_seq_nr.wrapping_add(1);
let envelope = EventEnvelope {
entity_id: entity_id.clone(),
seq_nr,
timestamp: Utc::now(),
event,
deletion_event: self.deletion_event,
};
let result = handler.process(envelope).await.map_err(Error::IoError);
if let Ok(envelope) = result {
*last_seq_nr = entities.update(envelope);
Ok(())
} else {
result.map(|_| ())
}
} else {
prev_result
}
} else {
prev_result
}
}
}
impl<B> EffectExt<B> for EmitEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send,
B::Event: Send,
{
}
pub fn emit_event<B>(event: B::Event) -> EmitEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
EmitEvent {
deletion_event: false,
event: Some(event),
}
}
pub fn emit_deletion_event<B>(event: B::Event) -> EmitEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
EmitEvent {
deletion_event: true,
event: Some(event),
}
}
pub struct Reply<B, T> {
replier: Option<(oneshot::Sender<T>, T)>,
phantom: PhantomData<B>,
}
#[async_trait]
impl<B, T> Effect<B> for Reply<B, T>
where
B: EventSourcedBehavior + Send + Sync + 'static,
T: Send,
{
async fn process(
&mut self,
_behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
_entities: &mut (dyn EntityOps<B> + Send + Sync),
_entity_id: &EntityId,
_last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
if prev_result.is_ok() {
if let Some((reply_to, reply)) = self.replier.take() {
let _ = reply_to.send(reply);
}
}
prev_result
}
}
impl<B, T> EffectExt<B> for Reply<B, T>
where
B: EventSourcedBehavior + Send + Sync + 'static,
T: Send,
{
}
pub fn reply<B, T>(reply_to: oneshot::Sender<T>, reply: T) -> Reply<B, T> {
Reply {
replier: Some((reply_to, reply)),
phantom: PhantomData,
}
}
pub struct Then<B, F, R> {
f: Option<F>,
phantom: PhantomData<(B, R)>,
}
#[async_trait]
impl<B, F, R> Effect<B> for Then<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = Result> + Send,
{
async fn process(
&mut self,
behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
_last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
let f = self.f.take();
if let Some(f) = f {
f(behavior, entities.get(entity_id), prev_result).await
} else {
Ok(())
}
}
}
impl<B, F, R> EffectExt<B> for Then<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = Result> + Send,
{
}
pub fn then<B, F, R>(f: F) -> Then<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = Result>,
{
Then {
f: Some(f),
phantom: PhantomData,
}
}
pub struct ThenEmitEvent<B, F, R>
where
B: EventSourcedBehavior,
{
deletion_event: bool,
f: Option<F>,
phantom: PhantomData<(B, R)>,
}
#[async_trait]
impl<B, F, R> Effect<B> for ThenEmitEvent<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
B::Event: Send,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
async fn process(
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
let f = self.f.take();
if let Some(f) = f {
let result = f(behavior, entities.get(entity_id), prev_result).await;
if let Ok(event) = result {
let mut effect = EmitEvent::<B> {
deletion_event: self.deletion_event,
event,
};
effect
.process(behavior, handler, entities, entity_id, last_seq_nr, Ok(()))
.await
} else {
result.map(|_| ())
}
} else {
Ok(())
}
}
}
impl<B, F, R> EffectExt<B> for ThenEmitEvent<B, F, R>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
B::Event: Send,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
}
pub struct ThenReply<B, F, R, T> {
f: Option<F>,
phantom: PhantomData<(B, R, T)>,
}
#[async_trait]
impl<B, F, R, T> Effect<B> for ThenReply<B, F, R, T>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<(oneshot::Sender<T>, T)>, Error>> + Send,
T: Send,
{
async fn process(
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
) -> Result {
let f = self.f.take();
if let Some(f) = f {
let result = f(behavior, entities.get(entity_id), prev_result).await;
if let Ok(replier) = result {
let mut effect = Reply {
replier,
phantom: PhantomData,
};
effect
.process(behavior, handler, entities, entity_id, last_seq_nr, Ok(()))
.await
} else {
Ok(())
}
} else {
Ok(())
}
}
}
impl<B, F, R, T> EffectExt<B> for ThenReply<B, F, R, T>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<(oneshot::Sender<T>, T)>, Error>> + Send,
T: Send,
{
}
pub struct Unhandled<E> {
phantom: PhantomData<E>,
}
#[async_trait]
impl<B> Effect<B> for Unhandled<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
{
async fn process(
&mut self,
_behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
_entities: &mut (dyn EntityOps<B> + Send + Sync),
_entity_id: &EntityId,
_last_seq_nr: &mut u64,
_prev_result: Result,
) -> Result {
Ok(())
}
}
pub fn unhandled<B>() -> Box<Unhandled<B>> {
Box::new(Unhandled {
phantom: PhantomData,
})
}