Kafka-RS implements a subset of the Apache Kafka wire protocol to maintain compatibility with existing Kafka clients like KafkaJS. This document describes the supported APIs and their behavior.
API Key | Name | Description | Status |
---|---|---|---|
0 | Produce | Send messages to topics | ✅ Implemented |
1 | Fetch | Retrieve messages from topics | ✅ Implemented |
3 | Metadata | Get topic and broker information | ✅ Implemented |
8 | OffsetCommit | Commit consumer offsets | 🔄 Basic implementation |
9 | OffsetFetch | Retrieve consumer offsets | 🔄 Basic implementation |
Purpose: Send messages to a topic
Request Format:
ProduceRequest =>
TransactionalId => NULLABLE_STRING
Acks => INT16
TimeoutMs => INT32
TopicData => ARRAY
Topic => STRING
PartitionData => ARRAY
Partition => INT32
RecordSet => RECORDS
Response Format:
ProduceResponse =>
ThrottleTimeMs => INT32
Responses => ARRAY
Topic => STRING
PartitionResponses => ARRAY
Partition => INT32
ErrorCode => INT16
BaseOffset => INT64
LogAppendTimeMs => INT64
LogStartOffset => INT64
Behavior:
Error Codes:
Purpose: Retrieve messages from a topic
Request Format:
FetchRequest =>
ReplicaId => INT32
MaxWaitMs => INT32
MinBytes => INT32
MaxBytes => INT32
IsolationLevel => INT8
SessionId => INT32
SessionEpoch => INT32
Topics => ARRAY
Topic => STRING
Partitions => ARRAY
Partition => INT32
CurrentLeaderEpoch => INT32
FetchOffset => INT64
LogStartOffset => INT64
PartitionMaxBytes => INT32
Response Format:
FetchResponse =>
ThrottleTimeMs => INT32
ErrorCode => INT16
SessionId => INT32
Responses => ARRAY
Topic => STRING
Partitions => ARRAY
Partition => INT32
ErrorCode => INT16
HighWatermark => INT64
LastStableOffset => INT64
LogStartOffset => INT64
AbortedTransactions => ARRAY
PreferredReadReplica => INT32
Records => RECORDS
Behavior:
Purpose: Get cluster and topic metadata
Request Format:
MetadataRequest =>
Topics => NULLABLE_ARRAY
Topic => STRING
AllowAutoTopicCreation => BOOLEAN
IncludeClusterAuthorizedOperations => BOOLEAN
IncludeTopicAuthorizedOperations => BOOLEAN
Response Format:
MetadataResponse =>
ThrottleTimeMs => INT32
Brokers => ARRAY
NodeId => INT32
Host => STRING
Port => INT32
Rack => NULLABLE_STRING
ClusterId => NULLABLE_STRING
ControllerId => INT32
Topics => ARRAY
ErrorCode => INT16
Name => STRING
IsInternal => BOOLEAN
Partitions => ARRAY
ErrorCode => INT16
PartitionIndex => INT32
LeaderId => INT32
LeaderEpoch => INT32
ReplicaNodes => ARRAY
IsrNodes => ARRAY
OfflineReplicas => ARRAY
Behavior:
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
// Producer
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'my-topic',
messages: [
{ value: 'Hello World!' },
{ key: 'user-123', value: 'User data' }
],
});
await producer.disconnect();
// Consumer
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic,
partition,
offset: message.offset,
value: message.value.toString(),
});
},
});
Connection Lifecycle:
Error Handling:
pub struct Message {
pub id: MessageId, // UUID
pub key: Option<String>, // Optional message key
pub value: Vec<u8>, // Message payload
pub timestamp: DateTime<Utc>, // Creation timestamp
pub offset: Option<Offset>, // Set when stored
pub headers: HashMap<String, String>, // Message headers
}
Messages on the wire follow Kafka’s record format:
Each API request is logged with:
[INFO] Starting Kafka-RS server on 127.0.0.1:9092
[INFO] New connection from: 127.0.0.1:52341
[DEBUG] Processing PRODUCE request (correlation_id: 1)
[DEBUG] Produce request for topic: test-topic
[INFO] Creating new topic: test-topic
[DEBUG] Message stored at offset 0 in topic test-topic
[DEBUG] Sent response (45 bytes)
[DEBUG] Processing FETCH request (correlation_id: 2)
[DEBUG] Fetch request for topic: test-topic, offset: 0, max_bytes: 1048576
[DEBUG] Retrieved 1 messages from topic test-topic
[DEBUG] Sent response (89 bytes)
Code | Name | Description |
---|---|---|
0 | NONE | No error |
-1 | UNKNOWN | Generic error |
3 | UNKNOWN_TOPIC_OR_PARTITION | Topic does not exist |
6 | NOT_LEADER_FOR_PARTITION | This server is not the leader |
Error responses maintain the same correlation ID as the request and include appropriate error codes in the response structure.
This implementation is intentionally simplified for educational purposes:
To extend this implementation: