use std::fmt::Display;
use akka_persistence_rs::{EntityId, Tag, WithPersistenceId, WithTags};
use mqtt::{TopicFilter, TopicNameRef};
use regex::Regex;
/// Pairs an entity id with a sequence number.
///
/// Used as the payload of [`FilterCriteria::IncludeEntityIds`].
#[derive(Clone)]
pub struct EntityIdOffset {
/// The entity this offset refers to.
pub entity_id: EntityId,
/// Sequence number associated with the entity.
/// NOTE(review): presumably the sequence number from which events are
/// wanted; `Filter::update` in this file discards it - confirm with callers.
pub seq_nr: u64,
}
/// Newtype around [`Regex`] that can be compared and ordered.
///
/// Equality and ordering are defined on the pattern source text
/// (`Regex::as_str`), which is what allows regexes to be stored in the
/// sorted, de-duplicated lists held by [`Filter`].
#[derive(Clone)]
pub struct ComparableRegex(pub Regex);
impl PartialEq for ComparableRegex {
fn eq(&self, other: &Self) -> bool {
self.0.as_str() == other.0.as_str()
}
}
// Equality compares pattern strings, which is a total equivalence relation,
// so the marker trait `Eq` can be implemented.
impl Eq for ComparableRegex {}
/// Partial ordering that always defers to the total ordering of the `Ord`
/// implementation, so it never yields `None`.
impl PartialOrd for ComparableRegex {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Orders wrapped regexes by comparing their pattern source text as strings.
impl Ord for ComparableRegex {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs_pattern: &str = self.0.as_str();
        let rhs_pattern: &str = other.0.as_str();
        lhs_pattern.cmp(rhs_pattern)
    }
}
/// A validated topic filter expression, wrapping an `mqtt` [`TopicFilter`].
///
/// Instances are created through [`TopicMatcher::new`], which rejects
/// expressions that `TopicFilter::new` does not accept.
#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)]
pub struct TopicMatcher(TopicFilter);
/// Error returned by [`TopicMatcher::new`] when the supplied expression is not
/// accepted as a topic filter. It carries no further detail.
#[derive(Debug)]
pub struct BadTopicMatcher;
impl TopicMatcher {
    /// Builds a matcher from a topic filter expression.
    ///
    /// # Errors
    ///
    /// Returns [`BadTopicMatcher`] when `TopicFilter::new` rejects the
    /// expression; the underlying error detail is dropped.
    pub fn new<S: Into<String>>(matcher: S) -> Result<Self, BadTopicMatcher> {
        match TopicFilter::new(matcher) {
            Ok(filter) => Ok(Self(filter)),
            Err(_) => Err(BadTopicMatcher),
        }
    }
}
impl Display for TopicMatcher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Instructions for changing the state of a [`Filter`].
///
/// Each variant either adds entries to, or removes entries from, one of the
/// filter's lists; see `Filter::update` for how they are applied. When
/// matching, an envelope that hits an exclude list is dropped unless it also
/// hits one of the include lists.
#[derive(Clone)]
pub enum FilterCriteria {
/// Add tags to the exclude-tags list.
ExcludeTags { tags: Vec<Tag> },
/// Remove tags from the exclude-tags list.
RemoveExcludeTags { tags: Vec<Tag> },
/// Add tags to the include-tags list.
IncludeTags { tags: Vec<Tag> },
/// Remove tags from the include-tags list.
RemoveIncludeTags { tags: Vec<Tag> },
/// Add regular expressions to the list of excluded entity-id patterns.
ExcludeRegexEntityIds { matching: Vec<ComparableRegex> },
/// Remove regular expressions from the list of excluded entity-id patterns.
RemoveExcludeRegexEntityIds { matching: Vec<ComparableRegex> },
/// Add regular expressions to the list of included entity-id patterns.
IncludeRegexEntityIds { matching: Vec<ComparableRegex> },
/// Remove regular expressions from the list of included entity-id patterns.
RemoveIncludeRegexEntityIds { matching: Vec<ComparableRegex> },
/// Add entity ids to the exclude list.
ExcludeEntityIds { entity_ids: Vec<EntityId> },
/// Remove entity ids from the exclude list.
RemoveExcludeEntityIds { entity_ids: Vec<EntityId> },
/// Add entity ids to the include list. Each entry also carries a sequence
/// number, but `Filter::update` keeps only the entity id.
IncludeEntityIds {
entity_id_offsets: Vec<EntityIdOffset>,
},
/// Remove entity ids from the include list.
RemoveIncludeEntityIds { entity_ids: Vec<EntityId> },
/// Add topic expressions to the include-topics list.
IncludeTopics { expressions: Vec<TopicMatcher> },
/// Remove topic expressions from the include-topics list.
RemoveIncludeTopics { expressions: Vec<TopicMatcher> },
}
/// Returns a criterion that excludes every entity id by registering the
/// catch-all regular expression `.*` as an exclude pattern.
///
/// Envelopes can still pass the filter if an include criterion matches them.
pub fn exclude_all() -> FilterCriteria {
    // The pattern is a constant, valid regex, so compilation cannot fail.
    let match_everything = Regex::new(".*").unwrap();
    let matching = vec![ComparableRegex(match_everything)];
    FilterCriteria::ExcludeRegexEntityIds { matching }
}
/// Decides whether an envelope should pass, based on exclude and include
/// lists of tags, entity ids, entity-id regexes and topic expressions.
///
/// The lists start empty and are changed through `Filter::update`; the
/// `max_*` fields are the limits handed to the merge step when entries are
/// added.
pub struct Filter {
/// Prefix marking tags that carry a topic name; the remainder of such a tag
/// is parsed as the topic name when matching include-topic expressions.
topic_tag_prefix: Tag,
/// Limit applied when adding to `exclude_tags` / `include_tags`.
max_tags: usize,
/// Envelopes carrying any of these tags are excluded.
exclude_tags: Vec<Tag>,
/// Envelopes carrying any of these tags are included even if excluded.
include_tags: Vec<Tag>,
/// Limit applied when adding to the regex entity-id lists.
max_regex_entity_ids: usize,
/// Entity ids matching any of these patterns are excluded.
exclude_regex_entity_ids: Vec<ComparableRegex>,
/// Entity ids matching any of these patterns are included even if excluded.
include_regex_entity_ids: Vec<ComparableRegex>,
/// Limit applied when adding to the entity-id lists.
max_entity_ids: usize,
/// Entity ids that are excluded.
exclude_entity_ids: Vec<EntityId>,
/// Entity ids that are included even if excluded.
include_entity_ids: Vec<EntityId>,
/// Limit applied when adding to `include_topics`.
max_topics: usize,
/// Topic expressions; a topic tag matching any of them includes the
/// envelope even if excluded.
include_topics: Vec<TopicMatcher>,
}
impl Default for Filter {
    /// Creates an empty filter that uses `t:` as the topic tag prefix and a
    /// limit of 10 for tags, regex entity ids, entity ids and topics.
    fn default() -> Self {
        const DEFAULT_LIMIT: usize = 10;
        Self::new(
            Tag::from("t:"),
            DEFAULT_LIMIT,
            DEFAULT_LIMIT,
            DEFAULT_LIMIT,
            DEFAULT_LIMIT,
        )
    }
}
impl Filter {
pub fn new(
topic_tag_prefix: Tag,
max_tags: usize,
max_regex_entity_ids: usize,
max_entity_ids: usize,
max_topics: usize,
) -> Self {
Self {
topic_tag_prefix,
max_tags,
exclude_tags: vec![],
include_tags: vec![],
max_regex_entity_ids,
exclude_regex_entity_ids: vec![],
include_regex_entity_ids: vec![],
max_entity_ids,
exclude_entity_ids: vec![],
include_entity_ids: vec![],
max_topics,
include_topics: vec![],
}
}
pub fn matches<Envelope>(&self, envelope: &Envelope) -> bool
where
Envelope: WithPersistenceId + WithTags,
{
let tags = envelope.tags();
let persistence_id = envelope.persistence_id();
let entity_id = &persistence_id.entity_id;
if self.matches_exclude_tags(tags)
|| self.matches_exclude_entity_ids(entity_id)
|| self.matches_exclude_regex_entity_ids(entity_id)
{
self.matches_include_tags(tags)
|| self.matches_include_topics(tags)
|| self.matches_include_entity_ids(entity_id)
|| self.matches_include_regex_entity_ids(entity_id)
} else {
true
}
}
fn matches_exclude_regex_entity_ids(&self, entity_id: &EntityId) -> bool {
Self::matches_regex_entity_ids(&self.exclude_regex_entity_ids, entity_id)
}
fn matches_include_regex_entity_ids(&self, entity_id: &EntityId) -> bool {
Self::matches_regex_entity_ids(&self.include_regex_entity_ids, entity_id)
}
fn matches_exclude_entity_ids(&self, entity_id: &EntityId) -> bool {
Self::matches_entity_ids(&self.exclude_entity_ids, entity_id)
}
fn matches_include_entity_ids(&self, entity_id: &EntityId) -> bool {
Self::matches_entity_ids(&self.include_entity_ids, entity_id)
}
fn matches_exclude_tags(&self, tags: &[Tag]) -> bool {
Self::matches_tags(&self.exclude_tags, tags)
}
fn matches_include_tags(&self, tags: &[Tag]) -> bool {
Self::matches_tags(&self.include_tags, tags)
}
fn matches_include_topics(&self, tags: &[Tag]) -> bool {
Self::matches_topics(&self.include_topics, &self.topic_tag_prefix, tags)
}
fn matches_regex_entity_ids(matching: &[ComparableRegex], entity_id: &EntityId) -> bool {
matching.iter().any(|r| r.0.is_match(entity_id))
}
fn matches_entity_ids(entity_ids: &[EntityId], entity_id: &EntityId) -> bool {
entity_ids.iter().any(|pi| pi == entity_id)
}
fn matches_tags(match_tags: &[Tag], tags: &[Tag]) -> bool {
match_tags.iter().any(|mt| tags.iter().any(|t| t == mt))
}
fn matches_topics(expressions: &[TopicMatcher], topic_tag_prefix: &Tag, tags: &[Tag]) -> bool {
let topic_tag_prefix_len = topic_tag_prefix.len();
expressions.iter().any(|r| {
let matcher = r.0.get_matcher();
tags.iter()
.filter(|t| t.starts_with(topic_tag_prefix.as_str()))
.any(|t| {
let topic_name = TopicNameRef::new(&t[topic_tag_prefix_len..]);
if let Ok(topic_name) = topic_name {
matcher.is_match(topic_name)
} else {
false
}
})
})
}
pub fn update(&mut self, criteria: Vec<FilterCriteria>) {
for criterion in criteria {
match criterion {
FilterCriteria::ExcludeTags { mut tags } => {
merge(&mut self.exclude_tags, &mut tags, self.max_tags)
}
FilterCriteria::RemoveExcludeTags { tags } => remove(&mut self.exclude_tags, &tags),
FilterCriteria::IncludeTags { mut tags } => {
merge(&mut self.include_tags, &mut tags, self.max_tags)
}
FilterCriteria::RemoveIncludeTags { tags } => remove(&mut self.include_tags, &tags),
FilterCriteria::ExcludeRegexEntityIds { mut matching } => merge(
&mut self.exclude_regex_entity_ids,
&mut matching,
self.max_regex_entity_ids,
),
FilterCriteria::RemoveExcludeRegexEntityIds { matching } => {
remove(&mut self.exclude_regex_entity_ids, &matching)
}
FilterCriteria::IncludeRegexEntityIds { mut matching } => merge(
&mut self.include_regex_entity_ids,
&mut matching,
self.max_regex_entity_ids,
),
FilterCriteria::RemoveIncludeRegexEntityIds { matching } => {
remove(&mut self.include_regex_entity_ids, &matching)
}
FilterCriteria::ExcludeEntityIds { mut entity_ids } => merge(
&mut self.exclude_entity_ids,
&mut entity_ids,
self.max_entity_ids,
),
FilterCriteria::RemoveExcludeEntityIds { entity_ids } => {
remove(&mut self.exclude_entity_ids, &entity_ids)
}
FilterCriteria::IncludeEntityIds { entity_id_offsets } => merge(
&mut self.include_entity_ids,
&mut entity_id_offsets
.into_iter()
.map(|EntityIdOffset { entity_id, .. }| entity_id)
.collect(),
self.max_entity_ids,
),
FilterCriteria::RemoveIncludeEntityIds { entity_ids } => {
remove(&mut self.include_entity_ids, &entity_ids)
}
FilterCriteria::IncludeTopics { mut expressions } => {
merge(&mut self.include_topics, &mut expressions, self.max_topics)
}
FilterCriteria::RemoveIncludeTopics { expressions } => {
remove(&mut self.include_topics, &expressions)
}
};
}
}
}
/// Merges `r` into `l`, leaving `l` sorted and free of duplicates, as long as
/// the merged result holds at most `max_len` elements.
///
/// * `l` - the list to extend; expected to be free of duplicates already.
/// * `r` - the elements to add; emptied when the merge is applied and left
///   untouched when it is rejected.
/// * `max_len` - the maximum number of elements `l` may hold afterwards.
///
/// When the merge would make `l` exceed `max_len`, nothing is changed. The
/// limit is checked against the merged result rather than against `l` and `r`
/// separately, so `l` can never grow past `max_len`, and an update that
/// exactly fills the remaining capacity is accepted.
fn merge<T>(l: &mut Vec<T>, r: &mut Vec<T>, max_len: usize)
where
    T: Ord,
{
    // Count the distinct elements of `r` that `l` does not hold yet; these are
    // the only ones that make `l` grow once duplicates are removed.
    let new_distinct = {
        let mut incoming: Vec<&T> = r.iter().filter(|item| !l.contains(item)).collect();
        incoming.sort_unstable();
        incoming.dedup();
        incoming.len()
    };
    if l.len() + new_distinct > max_len {
        return;
    }
    l.append(r);
    l.sort();
    l.dedup();
}
/// Removes from `l` every element that is equal to some element of `r`,
/// preserving the order of the elements that remain.
fn remove<T>(l: &mut Vec<T>, r: &[T])
where
    T: PartialEq,
{
    l.retain(|candidate| {
        let unwanted = r.iter().any(|entry| entry == candidate);
        !unwanted
    });
}
// Unit tests for `Filter`. Each test excludes an envelope, re-includes it with
// a matching include criterion, and then removes criteria step by step,
// checking the outcome of `Filter::matches` after every update.
#[cfg(test)]
mod tests {
use akka_persistence_rs::PersistenceId;
use super::*;
// Minimal envelope providing just the persistence id and tags that
// `Filter::matches` reads.
struct TestEnvelope {
persistence_id: PersistenceId,
tags: Vec<Tag>,
}
impl WithPersistenceId for TestEnvelope {
fn persistence_id(&self) -> &PersistenceId {
&self.persistence_id
}
}
impl WithTags for TestEnvelope {
fn tags(&self) -> &[Tag] {
&self.tags
}
}
// Tag criteria: include overrides exclude; removing the include leaves the
// envelope excluded; removing the exclude lets it pass again.
#[test]
fn exclude_include_and_remove_include_tag_and_remove_exclude_tag() {
let persistence_id = "a|1".parse::<PersistenceId>().unwrap();
let tag = Tag::from("a");
let envelope = TestEnvelope {
persistence_id: persistence_id.clone(),
tags: vec![tag.clone()],
};
let mut filter = Filter::default();
let criteria = vec![
FilterCriteria::ExcludeTags {
tags: vec![tag.clone()],
},
FilterCriteria::IncludeTags {
tags: vec![tag.clone()],
},
];
filter.update(criteria);
// Excluded and included by the same tag: the include wins.
assert!(filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveIncludeTags {
tags: vec![tag.clone()],
}];
filter.update(criteria);
// Only the exclude remains.
assert!(!filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveExcludeTags { tags: vec![tag] }];
filter.update(criteria);
// No criteria left: everything passes.
assert!(filter.matches(&envelope));
}
// Same sequence as above, using exact entity-id criteria.
#[test]
fn exclude_include_and_remove_include_entity_id_and_remove_exclude_entity_id() {
let persistence_id = "a|1".parse::<PersistenceId>().unwrap();
let entity_id = persistence_id.entity_id.clone();
let envelope = TestEnvelope {
persistence_id: persistence_id.clone(),
tags: vec![],
};
let mut filter = Filter::default();
let criteria = vec![
FilterCriteria::ExcludeEntityIds {
entity_ids: vec![entity_id.clone()],
},
FilterCriteria::IncludeEntityIds {
entity_id_offsets: vec![EntityIdOffset {
entity_id: entity_id.clone(),
seq_nr: 0,
}],
},
];
filter.update(criteria);
assert!(filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveIncludeEntityIds {
entity_ids: vec![entity_id.clone()],
}];
filter.update(criteria);
assert!(!filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveExcludeEntityIds {
entity_ids: vec![entity_id.clone()],
}];
filter.update(criteria);
assert!(filter.matches(&envelope));
}
// Same sequence as above, using a regex ("1") that matches the entity id.
#[test]
fn exclude_include_and_remove_include_regex_entity_id_and_remove_exclude_regex_entity_id() {
let persistence_id = "a|1".parse::<PersistenceId>().unwrap();
let matching = ComparableRegex(Regex::new("1").unwrap());
let envelope = TestEnvelope {
persistence_id: persistence_id.clone(),
tags: vec![],
};
let mut filter = Filter::default();
let criteria = vec![
FilterCriteria::ExcludeRegexEntityIds {
matching: vec![matching.clone()],
},
FilterCriteria::IncludeRegexEntityIds {
matching: vec![matching.clone()],
},
];
filter.update(criteria);
assert!(filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveIncludeRegexEntityIds {
matching: vec![matching.clone()],
}];
filter.update(criteria);
assert!(!filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveExcludeRegexEntityIds {
matching: vec![matching.clone()],
}];
filter.update(criteria);
assert!(filter.matches(&envelope));
}
// Topic criteria: with everything excluded, a tag carrying the default "t:"
// prefix passes while the expression `sport/+/player1` is included, and is
// filtered out once that expression is removed.
#[test]
fn include_and_remove_include_topic() {
let persistence_id = "a|1".parse::<PersistenceId>().unwrap();
let tag = Tag::from("t:sport/abc/player1");
let expression = TopicMatcher::new("sport/+/player1").unwrap();
let envelope = TestEnvelope {
persistence_id: persistence_id.clone(),
tags: vec![tag.clone()],
};
let mut filter = Filter::default();
let criteria = vec![
exclude_all(),
FilterCriteria::IncludeTopics {
expressions: vec![expression.clone()],
},
];
filter.update(criteria);
assert!(filter.matches(&envelope));
let criteria = vec![FilterCriteria::RemoveIncludeTopics {
expressions: vec![expression.clone()],
}];
filter.update(criteria);
// The exclude-all criterion is still in place, so the envelope is dropped.
assert!(!filter.matches(&envelope));
}
}