The domain model represents the core business concepts of our Kafka-compatible system. Following Domain-Driven Design principles, these entities capture the essential behavior and invariants of message streaming.
Entity: Topic
Purpose: A named channel that organizes and categorizes messages.
Attributes:
name: TopicName - Unique identifier for the topic
partitions: Vec<Partition> - List of partitions (simplified to 1)
created_at: DateTime - When the topic was created
message_count: u64 - Total messages ever sent to this topic
Invariants:
Behavior:
impl Topic {
    pub fn new(name: TopicName) -> Self;
    pub fn add_message(&mut self, message: Message) -> Result<Offset>;
    pub fn get_messages(&self, from_offset: Offset, limit: usize) -> Vec<Message>;
    pub fn get_partition(&self, partition_id: u32) -> Option<&Partition>;
}
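A minimal sketch of how `Topic` might delegate to its single partition. It assumes simplified stand-in types (`String` for `TopicName`, `u64` for `Offset`, and a message reduced to its payload); the `TopicError` type and the `name`/`message_count` accessors are hypothetical. `add_message` keeps the documented `Result` return even though, with exactly one partition created in `new`, the error path is never taken.

```rust
/// Simplified message: only the payload is modeled here.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub value: Vec<u8>,
}

/// One append-only log; a message's offset is its index in `messages`.
#[derive(Debug, Default)]
pub struct Partition {
    messages: Vec<Message>,
}

#[derive(Debug)]
pub enum TopicError {
    // Hypothetical error type for this sketch.
    PartitionNotFound(u32),
}

#[derive(Debug)]
pub struct Topic {
    name: String,
    partitions: Vec<Partition>,
    message_count: u64,
}

impl Topic {
    /// Creates a topic with exactly one partition, matching the simplified design.
    pub fn new(name: &str) -> Self {
        Topic {
            name: name.to_string(),
            partitions: vec![Partition::default()],
            message_count: 0,
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Appends to partition 0 and returns the offset assigned to the message.
    pub fn add_message(&mut self, message: Message) -> Result<u64, TopicError> {
        let partition = self
            .partitions
            .get_mut(0)
            .ok_or(TopicError::PartitionNotFound(0))?;
        let offset = partition.messages.len() as u64;
        partition.messages.push(message);
        self.message_count += 1;
        Ok(offset)
    }

    /// Returns up to `limit` messages from partition 0, starting at `from_offset`.
    pub fn get_messages(&self, from_offset: u64, limit: usize) -> Vec<Message> {
        match self.partitions.first() {
            Some(p) => p
                .messages
                .iter()
                .skip(from_offset as usize)
                .take(limit)
                .cloned()
                .collect(),
            None => Vec::new(),
        }
    }

    pub fn message_count(&self) -> u64 {
        self.message_count
    }
}
```

Reading past the end simply yields an empty vector, so a consumer polling at the latest offset gets no messages rather than an error.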
Entity: Message
Purpose: The fundamental unit of data that flows through the system.
Attributes:
id: MessageId - Unique identifier (UUID)
key: Option<String> - Optional message key for partitioning
value: Vec<u8> - The actual message payload
timestamp: DateTime - When the message was created
offset: Option<Offset> - Position in partition (set when stored)
headers: HashMap<String, String> - Optional message metadata
Invariants:
Behavior:
impl Message {
    pub fn new(key: Option<String>, value: Vec<u8>) -> Self;
    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self;
    pub fn assign_offset(&mut self, offset: Offset);
    pub fn size(&self) -> usize;
}
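A minimal sketch of the `Message` behavior, assuming `u64` in place of `Offset` and omitting `id` and `timestamp` so it needs no external crates. The document does not define what `size` measures; here it is assumed to be key, payload, and header bytes combined. The `debug_assert!` in `assign_offset` is an assumed way to enforce "set when stored" as a set-once rule.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
pub struct Message {
    pub key: Option<String>,
    pub value: Vec<u8>,
    pub offset: Option<u64>,
    pub headers: HashMap<String, String>,
}

impl Message {
    /// New messages have no offset until a partition stores them.
    pub fn new(key: Option<String>, value: Vec<u8>) -> Self {
        Message { key, value, offset: None, headers: HashMap::new() }
    }

    /// Builder style: consumes the message and returns it with headers attached.
    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
        self.headers = headers;
        self
    }

    /// Called by the partition on append; assumed to happen only once.
    pub fn assign_offset(&mut self, offset: u64) {
        debug_assert!(self.offset.is_none(), "offset is assigned only once");
        self.offset = Some(offset);
    }

    /// Assumed definition: key bytes + payload bytes + header key/value bytes.
    pub fn size(&self) -> usize {
        let key_len = self.key.as_ref().map_or(0, |k| k.len());
        let header_len: usize = self.headers.iter().map(|(k, v)| k.len() + v.len()).sum();
        key_len + self.value.len() + header_len
    }
}
```

Taking `self` by value in `with_headers` lets callers chain construction, e.g. `Message::new(key, payload).with_headers(h)`.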
Entity: Partition
Purpose: An ordered, append-only sequence of messages within a topic; once stored, a message is never modified.
Attributes:
id: PartitionId - Identifier within the topic
messages: Vec<Message> - Ordered list of messages
high_watermark: Offset - Highest committed offset
created_at: DateTime - When the partition was created
Invariants:
Behavior:
impl Partition {
    pub fn new(id: PartitionId) -> Self;
    pub fn append_message(&mut self, message: Message) -> Offset;
    pub fn get_messages(&self, from: Offset, limit: usize) -> &[Message];
    pub fn get_high_watermark(&self) -> Offset;
    pub fn message_count(&self) -> usize;
}
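A minimal sketch of the `Partition` behavior with `u32`/`u64` standing in for `PartitionId`/`Offset`. One deliberate deviation: the attribute list describes the high watermark as the highest committed offset, while this sketch follows Kafka's convention of one past the last committed offset, which gives an empty partition a well-defined value of 0. It also assumes a single replica, so every appended message counts as committed.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub value: Vec<u8>,
    pub offset: Option<u64>,
}

#[derive(Debug)]
pub struct Partition {
    id: u32,
    messages: Vec<Message>,
}

impl Partition {
    pub fn new(id: u32) -> Self {
        Partition { id, messages: Vec::new() }
    }

    pub fn id(&self) -> u32 {
        self.id
    }

    /// Appends a message; its offset is its position in the log.
    /// Existing messages are never modified, only new ones added at the end.
    pub fn append_message(&mut self, mut message: Message) -> u64 {
        let offset = self.messages.len() as u64;
        message.offset = Some(offset);
        self.messages.push(message);
        offset
    }

    /// Returns a borrowed slice of at most `limit` messages starting at `from`.
    /// Out-of-range offsets yield an empty slice instead of panicking.
    pub fn get_messages(&self, from: u64, limit: usize) -> &[Message] {
        let start = (from as usize).min(self.messages.len());
        let end = start.saturating_add(limit).min(self.messages.len());
        &self.messages[start..end]
    }

    /// Assumption: single replica, so the high watermark is the next offset to assign.
    pub fn get_high_watermark(&self) -> u64 {
        self.messages.len() as u64
    }

    pub fn message_count(&self) -> usize {
        self.messages.len()
    }
}
```

Returning `&[Message]` avoids copying on reads; the trade-off is that callers hold a borrow of the partition while they use the slice.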
Entity: Consumer
Purpose: Represents a client that reads messages from topics.
Attributes:
id: ConsumerId - Unique consumer identifier
group_id: Option<String> - Consumer group (simplified)
subscriptions: HashSet<TopicName> - Subscribed topics
offsets: HashMap<TopicPartition, Offset> - Current positions
last_heartbeat: DateTime - For connection management
Invariants:
Behavior:
impl Consumer {
    pub fn new(id: ConsumerId, group_id: Option<String>) -> Self;
    pub fn subscribe(&mut self, topics: Vec<TopicName>);
    pub fn fetch_messages(&self, topic: &TopicName, limit: usize) -> Vec<Message>;
    pub fn commit_offset(&mut self, topic_partition: TopicPartition, offset: Offset);
    pub fn get_current_offset(&self, topic_partition: &TopicPartition) -> Offset;
}
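A minimal sketch of the consumer's subscription and offset bookkeeping, leaving out `fetch_messages` because it needs access to storage. It assumes `String` for the ID and topic types and `u64` for `Offset`; `is_subscribed` is a hypothetical helper (the kind of check behind `ConsumerNotSubscribed`). A committed offset follows Kafka's convention of naming the next message to read, and an uncommitted topic-partition is assumed to start at 0.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    pub topic: String,
    pub partition: u32,
}

#[derive(Debug)]
pub struct Consumer {
    id: String,
    group_id: Option<String>,
    subscriptions: HashSet<String>,
    offsets: HashMap<TopicPartition, u64>,
}

impl Consumer {
    pub fn new(id: String, group_id: Option<String>) -> Self {
        Consumer { id, group_id, subscriptions: HashSet::new(), offsets: HashMap::new() }
    }

    /// Adds topics to the subscription set; the HashSet ignores duplicates.
    pub fn subscribe(&mut self, topics: Vec<String>) {
        self.subscriptions.extend(topics);
    }

    /// Hypothetical helper for rejecting fetches on unsubscribed topics.
    pub fn is_subscribed(&self, topic: &str) -> bool {
        self.subscriptions.contains(topic)
    }

    /// Records the next offset to read for this topic-partition.
    pub fn commit_offset(&mut self, tp: TopicPartition, offset: u64) {
        self.offsets.insert(tp, offset);
    }

    /// Assumption: with nothing committed yet, start from the beginning.
    pub fn get_current_offset(&self, tp: &TopicPartition) -> u64 {
        self.offsets.get(tp).copied().unwrap_or(0)
    }
}
```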
Entity: Producer
Purpose: Represents a client that sends messages to topics.
Attributes:
id: ProducerId - Unique producer identifier
client_id: String - Human-readable client name
transaction_id: Option<String> - For transactional producers
created_at: DateTime - When the producer was created
message_count: u64 - Total messages sent
Invariants:
Behavior:
impl Producer {
    pub fn new(id: ProducerId, client_id: String) -> Self;
    pub fn send_message(&mut self, topic: TopicName, message: Message) -> Result<Offset>;
    pub fn send_batch(&mut self, topic: TopicName, messages: Vec<Message>) -> Result<Vec<Offset>>;
}
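One way to express `send_batch` in terms of `send_message` is to collect an iterator of `Result`s into a `Result<Vec<_>, _>`, which stops at the first error. This is a sketch under simplifying assumptions: the producer owns a hypothetical in-memory `logs` map standing in for the broker connection, messages are reduced to payload bytes, and `ProduceError::UnknownTopic` is an invented error case.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum ProduceError {
    UnknownTopic(String),
}

pub struct Producer {
    client_id: String,
    message_count: u64,
    // Hypothetical stand-in for a broker connection: topic name -> log of payloads.
    logs: HashMap<String, Vec<Vec<u8>>>,
}

impl Producer {
    pub fn new(client_id: String, topics: &[&str]) -> Self {
        let logs = topics.iter().map(|t| (t.to_string(), Vec::new())).collect();
        Producer { client_id, message_count: 0, logs }
    }

    pub fn client_id(&self) -> &str {
        &self.client_id
    }

    pub fn message_count(&self) -> u64 {
        self.message_count
    }

    /// Appends one payload and returns its offset in the topic's log.
    pub fn send_message(&mut self, topic: &str, value: Vec<u8>) -> Result<u64, ProduceError> {
        let log = self
            .logs
            .get_mut(topic)
            .ok_or_else(|| ProduceError::UnknownTopic(topic.to_string()))?;
        log.push(value);
        self.message_count += 1;
        Ok(log.len() as u64 - 1)
    }

    /// Sends in order; collecting into Result short-circuits on the first error.
    /// Messages sent before an error stay written, so the batch is not atomic.
    pub fn send_batch(&mut self, topic: &str, values: Vec<Vec<u8>>) -> Result<Vec<u64>, ProduceError> {
        values.into_iter().map(|v| self.send_message(topic, v)).collect()
    }
}
```

The non-atomic behavior is the gap that the `transaction_id` attribute points at: all-or-nothing batches need transactional support on top of plain sends.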
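use std::fmt;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationError {
    InvalidTopicName,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicName(String);
impl TopicName {
    pub fn new(name: String) -> Result<Self, ValidationError> {
        // Reject empty names and names longer than 255 bytes
        if name.is_empty() || name.len() > 255 {
            return Err(ValidationError::InvalidTopicName);
        }
        // Additional validation rules...
        Ok(TopicName(name))
    }
}
// Display is required by the thiserror messages in DomainError
impl fmt::Display for TopicName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}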
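The placeholder for additional validation rules could be filled in many ways. The sketch below borrows Kafka's own rules as an assumption: names may contain only ASCII letters, digits, `.`, `_` and `-`, and `.` and `..` are reserved. It keeps this document's 255-byte limit, although Kafka itself caps topic names at 249 characters. The `as_str` accessor is hypothetical.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationError {
    InvalidTopicName,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicName(String);

impl TopicName {
    pub fn new(name: String) -> Result<Self, ValidationError> {
        // Length rule from the original: 1..=255 bytes.
        if name.is_empty() || name.len() > 255 {
            return Err(ValidationError::InvalidTopicName);
        }
        // Assumed rule (from Kafka): "." and ".." are reserved names.
        if name == "." || name == ".." {
            return Err(ValidationError::InvalidTopicName);
        }
        // Assumed rule (from Kafka): restricted ASCII character set.
        let allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-');
        if !name.chars().all(allowed) {
            return Err(ValidationError::InvalidTopicName);
        }
        Ok(TopicName(name))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}
```

Validating in the constructor means any `TopicName` value that exists is already known to be valid, so downstream code never re-checks it.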
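#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Offset(u64);
impl Offset {
    pub fn new(value: u64) -> Self { Offset(value) }
    pub fn next(&self) -> Self { Offset(self.0 + 1) }
    pub fn value(&self) -> u64 { self.0 }
}
// Display is required by the thiserror messages in DomainError
impl std::fmt::Display for Offset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}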
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageId(Uuid);
impl MessageId {
    // Random (version 4) UUID; requires the uuid crate's "v4" feature
    pub fn new() -> Self { MessageId(Uuid::new_v4()) }
    pub fn from_string(s: &str) -> Result<Self, uuid::Error> {
        Ok(MessageId(Uuid::parse_str(s)?))
    }
}
Purpose: Determines which partition a message should go to.
pub struct MessageRoutingService;
impl MessageRoutingService {
    pub fn route_message(
        &self,
        _message: &Message,
        _topic: &Topic,
    ) -> PartitionId {
        // Simplified: every message goes to partition 0, so both
        // parameters are unused for now (hence the leading underscores)
        PartitionId(0)
    }
}
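If the design later grows beyond one partition, a common routing rule is to hash the message key modulo the partition count, so every message with the same key lands on the same partition and per-key ordering is preserved. The sketch below is an assumption, not this system's router: `KeyHashRouter` is hypothetical, it uses the standard library's `DefaultHasher` (whose algorithm is not guaranteed stable across Rust releases, unlike the murmur2 hash Kafka's default partitioner uses), and keyless messages simply rotate round-robin, whereas Kafka's producer uses a sticky strategy for them.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical router: keyed messages map to a fixed partition,
/// keyless messages rotate across partitions.
pub struct KeyHashRouter {
    next_keyless: u32,
}

impl KeyHashRouter {
    pub fn new() -> Self {
        KeyHashRouter { next_keyless: 0 }
    }

    pub fn route(&mut self, key: Option<&str>, partition_count: u32) -> u32 {
        assert!(partition_count > 0, "a topic needs at least one partition");
        match key {
            Some(k) => {
                // DefaultHasher::new() uses fixed keys, so the same key maps to the
                // same partition within one build; it is not stable across Rust releases.
                let mut hasher = DefaultHasher::new();
                k.hash(&mut hasher);
                (hasher.finish() % partition_count as u64) as u32
            }
            None => {
                let p = self.next_keyless % partition_count;
                self.next_keyless = self.next_keyless.wrapping_add(1);
                p
            }
        }
    }
}
```

With `partition_count` of 1 every branch returns 0, which matches the current simplified behavior.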
Purpose: Manages consumer offset tracking and commits.
pub struct OffsetManagementService {
    offsets: Arc<DashMap<(ConsumerId, TopicPartition), Offset>>,
}
impl OffsetManagementService {
    pub fn get_offset(&self, consumer: ConsumerId, tp: TopicPartition) -> Offset;
    pub fn commit_offset(&self, consumer: ConsumerId, tp: TopicPartition, offset: Offset);
    pub fn reset_offset(&self, consumer: ConsumerId, tp: TopicPartition, offset: Offset);
}
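A minimal sketch of the offset service using only the standard library: a `Mutex<HashMap>` replaces `Arc<DashMap>`, with `&str` and `u64` standing in for `ConsumerId` and `Offset`. The policy that separates `commit_offset` from `reset_offset` is an assumption: a commit only moves the offset forward, so a late or duplicate commit cannot rewind the consumer, while a reset overwrites unconditionally (for example, to replay from an earlier position). Uncommitted positions are assumed to default to 0.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    pub topic: String,
    pub partition: u32,
}

/// std-only stand-in for the DashMap-backed service: one Mutex guards the map.
#[derive(Default)]
pub struct OffsetManagementService {
    offsets: Mutex<HashMap<(String, TopicPartition), u64>>,
}

impl OffsetManagementService {
    /// Offsets never committed default to 0 (assumption: start from the beginning).
    pub fn get_offset(&self, consumer: &str, tp: &TopicPartition) -> u64 {
        let map = self.offsets.lock().unwrap();
        map.get(&(consumer.to_string(), tp.clone())).copied().unwrap_or(0)
    }

    /// Assumed policy: commits only advance the stored offset.
    pub fn commit_offset(&self, consumer: &str, tp: TopicPartition, offset: u64) {
        let mut map = self.offsets.lock().unwrap();
        let current = map.entry((consumer.to_string(), tp)).or_insert(0);
        if offset > *current {
            *current = offset;
        }
    }

    /// Reset overwrites unconditionally, allowing the consumer to rewind.
    pub fn reset_offset(&self, consumer: &str, tp: TopicPartition, offset: u64) {
        self.offsets.lock().unwrap().insert((consumer.to_string(), tp), offset);
    }
}
```

A single lock is simpler but serializes all consumers; `DashMap` shards its locks, which is presumably why the original design reaches for it.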
Purpose: Handles topic lifecycle and metadata.
pub struct TopicManagementService {
    topics: Arc<DashMap<TopicName, Topic>>,
}
impl TopicManagementService {
    pub fn create_topic(&self, name: TopicName) -> Result<(), DomainError>;
    pub fn get_topic(&self, name: &TopicName) -> Option<Topic>;
    pub fn list_topics(&self) -> Vec<TopicName>;
    pub fn topic_exists(&self, name: &TopicName) -> bool;
}
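The subtle part of `create_topic` is that checking for an existing topic and inserting the new one must happen atomically, or two concurrent calls could both succeed. A minimal std-only sketch, assuming an `RwLock<HashMap>` in place of `DashMap` (whose `entry` API serves the same purpose), `String` for `TopicName`, a `Topic` reduced to name and partition count, and a hypothetical `TopicAlreadyExists` error, since `DomainError` has no such variant.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, Clone, PartialEq)]
pub struct Topic {
    pub name: String,
    pub partition_count: u32,
}

#[derive(Debug, PartialEq)]
pub enum TopicError {
    // Hypothetical variant for this sketch.
    TopicAlreadyExists(String),
}

#[derive(Default)]
pub struct TopicManagementService {
    topics: RwLock<HashMap<String, Topic>>,
}

impl TopicManagementService {
    /// Check and insert happen under one write lock, so two concurrent
    /// calls cannot both create the same topic.
    pub fn create_topic(&self, name: &str) -> Result<(), TopicError> {
        let mut topics = self.topics.write().unwrap();
        if topics.contains_key(name) {
            return Err(TopicError::TopicAlreadyExists(name.to_string()));
        }
        topics.insert(name.to_string(), Topic { name: name.to_string(), partition_count: 1 });
        Ok(())
    }

    /// Returns a clone, mirroring the documented `Option<Topic>` return type.
    pub fn get_topic(&self, name: &str) -> Option<Topic> {
        self.topics.read().unwrap().get(name).cloned()
    }

    /// Sorted for deterministic output; HashMap iteration order is unspecified.
    pub fn list_topics(&self) -> Vec<String> {
        let mut names: Vec<String> = self.topics.read().unwrap().keys().cloned().collect();
        names.sort();
        names
    }

    pub fn topic_exists(&self, name: &str) -> bool {
        self.topics.read().unwrap().contains_key(name)
    }
}
```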
use async_trait::async_trait;
#[async_trait]
pub trait TopicRepository: Send + Sync {
    async fn save(&self, topic: &Topic) -> Result<(), RepositoryError>;
    async fn find_by_name(&self, name: &TopicName) -> Result<Option<Topic>, RepositoryError>;
    async fn list_all(&self) -> Result<Vec<Topic>, RepositoryError>;
    async fn delete(&self, name: &TopicName) -> Result<(), RepositoryError>;
}
#[async_trait]
pub trait MessageRepository: Send + Sync {
    async fn store_message(
        &self,
        topic: &TopicName,
        partition: PartitionId,
        message: Message,
    ) -> Result<Offset, RepositoryError>;
    async fn fetch_messages(
        &self,
        topic: &TopicName,
        partition: PartitionId,
        from_offset: Offset,
        limit: usize,
    ) -> Result<Vec<Message>, RepositoryError>;
}
#[async_trait]
pub trait OffsetRepository: Send + Sync {
    async fn save_offset(
        &self,
        consumer: ConsumerId,
        topic_partition: TopicPartition,
        offset: Offset,
    ) -> Result<(), RepositoryError>;
    async fn load_offset(
        &self,
        consumer: ConsumerId,
        topic_partition: &TopicPartition,
    ) -> Result<Option<Offset>, RepositoryError>;
}
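An in-memory implementation of a repository trait is a handy test double. The sketch below mirrors `OffsetRepository` under several assumptions: it uses native `async fn` in traits (Rust 1.75 or later) instead of `#[async_trait]`, so the trait is not usable as `dyn OffsetRepository` as written; `String` and `u64` stand in for `ConsumerId` and `Offset`; and `RepositoryError::LockPoisoned` is hypothetical. Because these futures never wait on I/O, a tiny polling `block_on` is enough to drive them here; a real service would use a runtime such as Tokio.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    pub topic: String,
    pub partition: u32,
}

#[derive(Debug)]
pub enum RepositoryError {
    // Hypothetical variant for this sketch.
    LockPoisoned,
}

/// Same shape as the documented trait, using native async fn in traits.
trait OffsetRepository {
    async fn save_offset(&self, consumer: String, tp: TopicPartition, offset: u64) -> Result<(), RepositoryError>;
    async fn load_offset(&self, consumer: String, tp: &TopicPartition) -> Result<Option<u64>, RepositoryError>;
}

/// In-memory implementation, suitable for unit tests.
#[derive(Default)]
struct InMemoryOffsetRepository {
    offsets: Mutex<HashMap<(String, TopicPartition), u64>>,
}

impl OffsetRepository for InMemoryOffsetRepository {
    async fn save_offset(&self, consumer: String, tp: TopicPartition, offset: u64) -> Result<(), RepositoryError> {
        let mut map = self.offsets.lock().map_err(|_| RepositoryError::LockPoisoned)?;
        map.insert((consumer, tp), offset);
        Ok(())
    }

    async fn load_offset(&self, consumer: String, tp: &TopicPartition) -> Result<Option<u64>, RepositoryError> {
        let map = self.offsets.lock().map_err(|_| RepositoryError::LockPoisoned)?;
        Ok(map.get(&(consumer, tp.clone())).copied())
    }
}

/// Minimal executor for futures that complete without waiting: polls until Ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    struct NoopWake;
    impl Wake for NoopWake {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```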
use chrono::{DateTime, Utc};
#[derive(Debug, Clone)]
pub struct MessageProduced {
    pub topic: TopicName,
    pub partition: PartitionId,
    pub offset: Offset,
    pub message_id: MessageId,
    pub producer_id: ProducerId,
    pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub struct MessageConsumed {
    pub topic: TopicName,
    pub partition: PartitionId,
    pub offset: Offset,
    pub message_id: MessageId,
    pub consumer_id: ConsumerId,
    pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub struct TopicCreated {
    pub topic: TopicName,
    pub partition_count: u32,
    pub timestamp: DateTime<Utc>,
}
#[derive(Debug, thiserror::Error)]
pub enum DomainError {
    #[error("Topic '{0}' not found")]
    TopicNotFound(TopicName),
    #[error("Invalid topic name: {0}")]
    InvalidTopicName(String),
    #[error("Message offset {0} not found")]
    OffsetNotFound(Offset),
    #[error("Consumer '{0}' not subscribed to topic '{1}'")]
    ConsumerNotSubscribed(ConsumerId, TopicName),
    #[error("Partition {0} does not exist")]
    PartitionNotFound(u32),
}
This domain model covers the core Kafka concepts (topics, partitions, messages, offsets, producers, and consumers) while staying small enough to study end to end. Its main simplifications are one partition per topic, routing every message to partition 0, and a reduced consumer-group model.