Kafka-RS implements a subset of the Apache Kafka wire protocol to maintain compatibility with existing Kafka clients. This document explains how the protocol is implemented and which simplifications were made for educational purposes.
All Kafka protocol messages follow this structure:
Message => Size MessageContent
  Size => INT32 // Big-endian message size
  MessageContent => RequestMessage | ResponseMessage

RequestMessage => RequestHeader RequestBody
  RequestHeader => ApiKey ApiVersion CorrelationId ClientId
    ApiKey => INT16 // Identifies the request type
    ApiVersion => INT16 // Protocol version
    CorrelationId => INT32 // Matches request with response
    ClientId => NULLABLE_STRING // Client identifier
  RequestBody => (varies by API)

ResponseMessage => ResponseHeader ResponseBody
  ResponseHeader => CorrelationId
    CorrelationId => INT32 // Matches the request
  ResponseBody => (varies by API)
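The framing rules above can be sketched in a few lines. This is a minimal illustration, not the Kafka-RS implementation: the `RequestHeader` struct and `frame_request` function are hypothetical names, and a plain `Vec<u8>` stands in for `BytesMut` so the example needs no external crates.

```rust
/// Hypothetical header type mirroring the RequestHeader grammar above.
#[derive(Debug, PartialEq)]
pub struct RequestHeader {
    pub api_key: i16,
    pub api_version: i16,
    pub correlation_id: i32,
    pub client_id: Option<String>,
}

/// Serialize the header and body, then prepend the INT32 size prefix.
pub fn frame_request(header: &RequestHeader, body: &[u8]) -> Vec<u8> {
    let mut content = Vec::new();
    content.extend_from_slice(&header.api_key.to_be_bytes());
    content.extend_from_slice(&header.api_version.to_be_bytes());
    content.extend_from_slice(&header.correlation_id.to_be_bytes());
    match &header.client_id {
        Some(id) => {
            // Assumption: the client ID fits in an INT16 length prefix.
            content.extend_from_slice(&(id.len() as i16).to_be_bytes());
            content.extend_from_slice(id.as_bytes());
        }
        // A length of -1 marks a null string.
        None => content.extend_from_slice(&(-1i16).to_be_bytes()),
    }
    content.extend_from_slice(body);

    // Size counts the bytes that follow it, not the prefix itself.
    let mut frame = Vec::with_capacity(4 + content.len());
    frame.extend_from_slice(&(content.len() as i32).to_be_bytes());
    frame.extend_from_slice(&content);
    frame
}
```

A reader on the other side of the connection would first read four bytes, interpret them as a big-endian `i32`, and then read exactly that many further bytes before decoding the header.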
| Type | Size | Encoding | Description |
|---|---|---|---|
| INT8 | 1 byte | Single byte (byte order does not apply) | Signed 8-bit integer |
| INT16 | 2 bytes | Big-endian | Signed 16-bit integer |
| INT32 | 4 bytes | Big-endian | Signed 32-bit integer |
| INT64 | 8 bytes | Big-endian | Signed 64-bit integer |
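As a rough sketch of how the primitive types in the table map to Rust, the standard library's `to_be_bytes` and `from_be_bytes` cover every case. The helper names below (`put_i8`, `put_i16`, `put_i32`, `put_i64`, `take_i32`) are illustrative and are not claimed to match the helpers Kafka-RS actually uses.

```rust
/// Append a single signed byte; byte order is irrelevant here.
pub fn put_i8(buf: &mut Vec<u8>, v: i8) {
    buf.push(v as u8);
}

/// Append a big-endian INT16.
pub fn put_i16(buf: &mut Vec<u8>, v: i16) {
    buf.extend_from_slice(&v.to_be_bytes());
}

/// Append a big-endian INT32.
pub fn put_i32(buf: &mut Vec<u8>, v: i32) {
    buf.extend_from_slice(&v.to_be_bytes());
}

/// Append a big-endian INT64.
pub fn put_i64(buf: &mut Vec<u8>, v: i64) {
    buf.extend_from_slice(&v.to_be_bytes());
}

/// Read a big-endian INT32 from the front of `input`.
/// Returns the value and the unread remainder, or `None` if fewer
/// than four bytes are available.
pub fn take_i32(input: &[u8]) -> Option<(i32, &[u8])> {
    if input.len() < 4 {
        return None;
    }
    let (head, rest) = input.split_at(4);
    Some((i32::from_be_bytes([head[0], head[1], head[2], head[3]]), rest))
}
```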
// Nullable string encoding
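pub fn encode_string(buf: &mut BytesMut, value: Option<&str>) -> io::Result<()> {
    match value {
        Some(s) => {
            let bytes = s.as_bytes();
            // The length prefix is an INT16, so reject strings that would overflow it
            if bytes.len() > i16::MAX as usize {
                return Err(io::Error::new(io::ErrorKind::InvalidInput, "string exceeds INT16 length prefix"));
            }
            encode_i16(buf, bytes.len() as i16); // Length prefix
            buf.put_slice(bytes); // UTF-8 bytes
        }
        None => {
            encode_i16(buf, -1); // Null marker
        }
    }
    Ok(())
}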
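The decoding side of a nullable string is the mirror image of the encoder above. The following is a sketch under stated assumptions: `decode_string` is a hypothetical name, and it works on a byte slice rather than `BytesMut` so that it compiles with the standard library alone.

```rust
use std::io;

/// Decode a NULLABLE_STRING from the front of `input`.
/// Returns the decoded value and the unread remainder.
pub fn decode_string(input: &[u8]) -> io::Result<(Option<String>, &[u8])> {
    if input.len() < 2 {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "missing string length"));
    }
    let len = i16::from_be_bytes([input[0], input[1]]);
    let rest = &input[2..];
    if len == -1 {
        // Null marker: no payload bytes follow.
        return Ok((None, rest));
    }
    if len < -1 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "negative string length"));
    }
    let len = len as usize;
    if rest.len() < len {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "truncated string"));
    }
    let (raw, rest) = rest.split_at(len);
    let s = String::from_utf8(raw.to_vec())
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    Ok((Some(s), rest))
}
```

Returning the remainder lets callers chain decoders field by field without tracking a separate cursor.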
// Array with length prefix
encode_i32(buf, array.len() as i32); // Element count
for item in array {
    // Encode each item
    item.encode(buf)?;
}
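To make the array rule concrete, here is a small self-contained round trip for an array of INT32 values. The function names are hypothetical. The handling of a count of -1 as a null array follows the Kafka protocol's convention for non-compact arrays; whether Kafka-RS ever emits null arrays is not stated here.

```rust
/// Encode a non-null ARRAY of INT32: element count, then each element.
pub fn encode_i32_array(buf: &mut Vec<u8>, items: &[i32]) {
    buf.extend_from_slice(&(items.len() as i32).to_be_bytes());
    for item in items {
        buf.extend_from_slice(&item.to_be_bytes());
    }
}

/// Decode an ARRAY of INT32. A count of -1 is treated as a null array.
pub fn decode_i32_array(input: &[u8]) -> Result<Option<Vec<i32>>, String> {
    if input.len() < 4 {
        return Err("missing element count".to_string());
    }
    let count = i32::from_be_bytes([input[0], input[1], input[2], input[3]]);
    if count == -1 {
        return Ok(None);
    }
    if count < 0 {
        return Err(format!("invalid element count {}", count));
    }
    let count = count as usize;
    let body = &input[4..];
    // Check the declared count against the bytes actually present.
    if body.len() / 4 < count {
        return Err("truncated array".to_string());
    }
    let items = body
        .chunks_exact(4)
        .take(count)
        .map(|c| i32::from_be_bytes([c[0], c[1], c[2], c[3]]))
        .collect();
    Ok(Some(items))
}
```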
// Nullable bytes encoding
pub fn encode_bytes(buf: &mut BytesMut, value: Option<&[u8]>) -> io::Result<()> {
    match value {
        Some(bytes) => {
            encode_i32(buf, bytes.len() as i32); // Length prefix
            buf.put_slice(bytes); // Raw bytes
        }
        None => {
            encode_i32(buf, -1); // Null marker
        }
    }
    Ok(())
}
Purpose: Send messages to topics
Request Processing:
async fn handle_produce_request(&mut self, header: RequestHeader, buf: &mut BytesMut) -> anyhow::Result<()> {
    // 1. Decode the produce request
    let request = ProduceRequest::decode(buf)?;
    // 2. Process each message, collecting the offset assigned to it
    let mut offsets = Vec::new();
    for msg in request.messages {
        let key = msg.key.map(|k| String::from_utf8_lossy(&k).to_string());
        let value = msg.value.unwrap_or_default();
        // 3. Store the message (creates topic if needed)
        let offset = self.send_message_use_case.execute(
            request.topic.clone(),
            key,
            value
        ).await?;
        offsets.push(offset);
    }
    // 4. Send acknowledgment response
    self.send_produce_response(header.correlation_id, &request.topic, offsets).await?;
    Ok(())
}
Simplified Request Format:
ProduceRequest =>
  TransactionalId => NULLABLE_STRING // Ignored (no transactions)
  Acks => INT16 // Always treated as 1
  TimeoutMs => INT32 // Ignored (synchronous)
  TopicData => ARRAY
    Topic => STRING
    PartitionData => ARRAY
      Partition => INT32 // Always 0 (single partition)
      RecordSet => RECORDS
Response Format:
ProduceResponse =>
  ThrottleTimeMs => INT32 // Always 0
  Responses => ARRAY
    Topic => STRING
    PartitionResponses => ARRAY
      Partition => INT32 // Always 0
      ErrorCode => INT16 // 0 for success
      BaseOffset => INT64 // First message offset
      LogAppendTimeMs => INT64 // -1 (not used)
      LogStartOffset => INT64 // Always 0
Purpose: Retrieve messages from topics
Request Processing:
async fn handle_fetch_request(&mut self, header: RequestHeader, buf: &mut BytesMut) -> anyhow::Result<()> {
    // 1. Decode fetch request
    let request = FetchRequest::decode(buf)?;
    // 2. Generate consumer ID from correlation ID
    let consumer_id = format!("consumer-{}", header.correlation_id);
    // 3. Fetch messages from the topic
    let messages = self.consume_messages_use_case.execute(
        consumer_id,
        request.topic.clone(),
        100 // Max 100 messages
    ).await?;
    // 4. Send fetch response
    self.send_fetch_response(
        header.correlation_id,
        &request.topic,
        messages,
        request.offset as u64
    ).await?;
    Ok(())
}
Message Record Format:
Record =>
  Length => INT32
  Attributes => INT8 // Always 0 (no compression)
  TimestampDelta => VARINT // Simplified to full timestamp
  OffsetDelta => VARINT // Always 0 (absolute offsets)
  KeyLength => VARINT
  Key => BYTES
  ValueLength => VARINT
  Value => BYTES
  Headers => ARRAY // Empty array
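The VARINT fields in the record layout use a variable-length, ZigZag-encoded integer, as in the Kafka protocol. The sketch below shows one way to encode and decode 32-bit values. The function names are hypothetical and this is not taken from the Kafka-RS source. It covers only the 32-bit case, so a full millisecond timestamp would need a 64-bit variant.

```rust
/// Encode a signed 32-bit value as a ZigZag varint.
pub fn encode_varint(buf: &mut Vec<u8>, value: i32) {
    // ZigZag interleaves negatives and positives: 0 -> 0, -1 -> 1, 1 -> 2, ...
    let mut v = ((value << 1) ^ (value >> 31)) as u32;
    // Emit 7 bits per byte, low bits first; the high bit flags "more follows".
    while v >= 0x80 {
        buf.push((v as u8 & 0x7F) | 0x80);
        v >>= 7;
    }
    buf.push(v as u8);
}

/// Decode a ZigZag varint from the front of `input`.
/// Returns the value and the number of bytes consumed, or `None` if the
/// input ends early or the varint is longer than five bytes.
pub fn decode_varint(input: &[u8]) -> Option<(i32, usize)> {
    let mut raw: u32 = 0;
    for (i, &byte) in input.iter().enumerate().take(5) {
        raw |= ((byte & 0x7F) as u32) << (7 * i);
        if byte & 0x80 == 0 {
            // Undo ZigZag: the lowest bit carries the sign.
            let value = ((raw >> 1) as i32) ^ -((raw & 1) as i32);
            return Some((value, i + 1));
        }
    }
    None
}
```

Small magnitudes, positive or negative, take a single byte, which is why record fields such as key and value lengths are usually cheap to encode.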
Purpose: Get cluster and topic information
Response Structure:
async fn send_metadata_response(&mut self, correlation_id: i32, topics: Vec<String>) -> anyhow::Result<()> {
    // Buffer and header setup, following the same pattern as the other responses
    let mut response = BytesMut::new();
    let header = ResponseHeader { correlation_id };
    header.encode(&mut response)?;
    // Single broker metadata
    encode_i32(&mut response, 1); // Broker count
    encode_i32(&mut response, 0); // Broker ID
    encode_string(&mut response, Some("localhost"))?;
    encode_i32(&mut response, 9092); // Port
    // Topic metadata (arrays carry an element count prefix)
    encode_i32(&mut response, topics.len() as i32); // Topic count
    for topic in topics {
        encode_i16(&mut response, 0); // Error code
        encode_string(&mut response, Some(&topic))?;
        encode_i8(&mut response, 0); // Not internal
        // Single partition per topic
        encode_i32(&mut response, 1); // Partition count
        encode_i16(&mut response, 0); // Error code
        encode_i32(&mut response, 0); // Partition ID
        encode_i32(&mut response, 0); // Leader (broker 0)
        // Replica and ISR (both contain just broker 0)
        encode_i32(&mut response, 1); // Replica count
        encode_i32(&mut response, 0); // Replica broker
        encode_i32(&mut response, 1); // ISR count
        encode_i32(&mut response, 0); // ISR broker
    }
    self.send_response(response).await
}
OffsetCommit (Key: 8) and OffsetFetch (Key: 9) are implemented with basic responses:
// Simple acknowledgment for offset commits
async fn send_offset_commit_response(&mut self, correlation_id: i32) -> anyhow::Result<()> {
    let mut response = BytesMut::new();
    let header = ResponseHeader { correlation_id };
    header.encode(&mut response)?;
    encode_i32(&mut response, 0); // Throttle time
    encode_i32(&mut response, 0); // Empty topics array
    self.send_response(response).await
}
Real Kafka: Topics can have multiple partitions for scalability.
Kafka-RS: Each topic has exactly one partition (ID: 0).
// Simplified partition assignment
pub fn route_message(&self, _message: &Message, _topic: &Topic) -> PartitionId {
    PartitionId(0) // Always partition 0
}
Benefits:
Real Kafka: Asynchronous batching with configurable acknowledgments.
Kafka-RS: Synchronous processing with immediate acknowledgment.
// Immediate acknowledgment after storage
let offset = topic.add_message(message)?;
self.topic_repo.save(&topic).await?;
// Response sent immediately
Benefits:
Real Kafka: Persistent log files with configurable retention.
Kafka-RS: In-memory storage with HashMap/DashMap.
pub struct InMemoryTopicRepository {
    topics: Arc<DashMap<TopicName, Topic>>, // Lost on restart
}
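To illustrate what in-memory storage implies for offsets, here is a minimal append-only log keyed by topic name. It is a sketch only: `InMemoryLog` and its methods are hypothetical, and a `Mutex<HashMap>` from the standard library stands in for `DashMap` so the example has no dependencies.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an in-memory topic store.
#[derive(Default)]
pub struct InMemoryLog {
    topics: Arc<Mutex<HashMap<String, Vec<Vec<u8>>>>>,
}

impl InMemoryLog {
    /// Append a message, creating the topic on first use.
    /// The returned offset is the message's index in the topic's log.
    pub fn append(&self, topic: &str, value: Vec<u8>) -> u64 {
        let mut topics = self.topics.lock().unwrap();
        let log = topics.entry(topic.to_string()).or_insert_with(Vec::new);
        log.push(value);
        (log.len() - 1) as u64
    }

    /// Read up to `max` messages starting at `offset`.
    /// Unknown topics yield an empty result.
    pub fn read_from(&self, topic: &str, offset: u64, max: usize) -> Vec<Vec<u8>> {
        let topics = self.topics.lock().unwrap();
        match topics.get(topic) {
            Some(log) => log.iter().skip(offset as usize).take(max).cloned().collect(),
            None => Vec::new(),
        }
    }
}
```

Because the log lives in a `Vec`, offsets are simply indices, and everything is lost when the process exits.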
Benefits:
Real Kafka: Complex coordinator protocol with rebalancing.
Kafka-RS: Simple offset tracking per consumer ID.
// Basic consumer identification
let consumer_id = format!("consumer-{}", header.correlation_id);
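Offset tracking per consumer ID can be pictured as a map from (consumer, topic) to the next offset to read. The `OffsetTracker` type below is a hypothetical sketch of that idea, not the structure Kafka-RS uses internally.

```rust
use std::collections::HashMap;

/// Hypothetical tracker: (consumer ID, topic) -> next offset to read.
#[derive(Default)]
pub struct OffsetTracker {
    positions: HashMap<(String, String), u64>,
}

impl OffsetTracker {
    /// Current position; consumers that have not read anything start at 0.
    pub fn position(&self, consumer_id: &str, topic: &str) -> u64 {
        self.positions
            .get(&(consumer_id.to_string(), topic.to_string()))
            .copied()
            .unwrap_or(0)
    }

    /// Advance the position after `consumed` messages were delivered,
    /// returning the new position.
    pub fn advance(&mut self, consumer_id: &str, topic: &str, consumed: u64) -> u64 {
        let pos = self
            .positions
            .entry((consumer_id.to_string(), topic.to_string()))
            .or_insert(0);
        *pos += consumed;
        *pos
    }
}
```

Note that an ID derived from the correlation ID only identifies the same consumer across requests if the client reuses that correlation ID.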
Benefits:
Every protocol operation is logged for educational visibility:
debug!("Processing {} request (correlation_id: {})",
    match header.api_key {
        ApiKey::Produce => "PRODUCE",
        ApiKey::Fetch => "FETCH",
        ApiKey::Metadata => "METADATA",
        _ => "UNKNOWN",
    },
    header.correlation_id);
Each request-response pair is tracked:
// Request received
[DEBUG] Processing PRODUCE request (correlation_id: 123)
// Message processing
[INFO] Creating new topic: test-topic
[DEBUG] Stored message at offset 0 in topic test-topic
// Response sent
[DEBUG] Sent response (87 bytes)
Errors are handled gracefully with educational logging:
match ProduceRequest::decode(buf) {
    Ok(request) => { /* process */ }
    Err(e) => {
        error!("Failed to decode produce request: {}", e);
        // -1 is the Kafka error code UNKNOWN_SERVER_ERROR
        self.send_error_response(header.correlation_id, -1).await?;
    }
}
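Rather than passing a bare `-1`, error codes can be given names. The sketch below lists a few codes from the Kafka protocol's error table and shows one possible way to classify decode failures. The `KafkaError` enum and `classify_decode_error` function are hypothetical, and the mapping from `io::ErrorKind` to a code is an illustrative choice, not Kafka-RS behavior.

```rust
use std::io;

/// A few error codes from the Kafka protocol's error table.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KafkaError {
    UnknownServerError,
    CorruptMessage,
    UnknownTopicOrPartition,
    UnsupportedVersion,
    InvalidRequest,
}

impl KafkaError {
    /// The INT16 value sent on the wire.
    pub fn code(self) -> i16 {
        match self {
            KafkaError::UnknownServerError => -1,
            KafkaError::CorruptMessage => 2,
            KafkaError::UnknownTopicOrPartition => 3,
            KafkaError::UnsupportedVersion => 35,
            KafkaError::InvalidRequest => 42,
        }
    }
}

/// Illustrative classification of a decode failure (an assumption, not
/// the project's policy): truncated or malformed input is reported as
/// CORRUPT_MESSAGE, anything else as UNKNOWN_SERVER_ERROR.
pub fn classify_decode_error(err: &io::Error) -> KafkaError {
    match err.kind() {
        io::ErrorKind::UnexpectedEof | io::ErrorKind::InvalidData => KafkaError::CorruptMessage,
        _ => KafkaError::UnknownServerError,
    }
}
```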
For optimal compatibility with KafkaJS:
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  // Recommended settings for Kafka-RS
  connectionTimeout: 3000,
  requestTimeout: 30000,
  enforceRequestTimeout: false,
  // Disable features not implemented
  ssl: false,
  sasl: false,
});

const consumer = kafka.consumer({
  groupId: 'my-group',
  // Timeouts that work well with the simplified implementation
  sessionTimeout: 30000,
  rebalanceTimeout: 60000,
  heartbeatInterval: 3000,
  // Fetch settings suited to a single partition
  maxWaitTimeInMs: 5000,
  minBytes: 1,
  maxBytes: 1024 * 1024,
});
// Add new API handler
ApiKey::NewApi => {
    self.handle_new_api_request(header, buf).await?;
}
// Implement request decoder
impl KafkaDecodable for NewApiRequest {
    fn decode(buf: &mut BytesMut) -> io::Result<Self> {
        // Protocol-specific decoding
        todo!()
    }
}
// Add response encoder
async fn send_new_api_response(&mut self, correlation_id: i32) -> anyhow::Result<()> {
    // Build and send response
    todo!()
}
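Before a new handler can be dispatched, the raw INT16 API key has to be recognized. A minimal sketch of such a mapping follows. The enum shape, `from_wire`, and `name` are assumptions made for illustration, while the numeric values are the standard Kafka API keys (Produce 0, Fetch 1, Metadata 3, OffsetCommit 8, OffsetFetch 9).

```rust
/// Hypothetical API key enum with a catch-all for unsupported requests.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ApiKey {
    Produce,
    Fetch,
    Metadata,
    OffsetCommit,
    OffsetFetch,
    Unknown(i16),
}

impl ApiKey {
    /// Map the INT16 from the request header to a known API.
    pub fn from_wire(raw: i16) -> Self {
        match raw {
            0 => ApiKey::Produce,
            1 => ApiKey::Fetch,
            3 => ApiKey::Metadata,
            8 => ApiKey::OffsetCommit,
            9 => ApiKey::OffsetFetch,
            other => ApiKey::Unknown(other),
        }
    }

    /// Label suitable for log lines.
    pub fn name(self) -> &'static str {
        match self {
            ApiKey::Produce => "PRODUCE",
            ApiKey::Fetch => "FETCH",
            ApiKey::Metadata => "METADATA",
            ApiKey::OffsetCommit => "OFFSET_COMMIT",
            ApiKey::OffsetFetch => "OFFSET_FETCH",
            ApiKey::Unknown(_) => "UNKNOWN",
        }
    }
}
```

Supporting another API then means adding a variant, a `from_wire` arm, and a matching arm in the dispatcher, while `Unknown` keeps unsupported keys from being silently misrouted.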
This protocol implementation demonstrates how a complex distributed system protocol can be simplified for educational purposes while maintaining compatibility with real clients.