git clone https://github.com/elsonwu/kafka-rs.git
cd kafka-rs
cargo build
cargo test
cargo run
This starts the Kafka-RS server on localhost:9092 (the default Kafka port).
RUST_LOG=debug cargo run
cargo run -- --host 0.0.0.0 --port 9093
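If the server is started on a non-default host or port as above, the client's brokers list has to match it. A minimal sketch of one way to keep that in a single place, assuming a comma-separated environment variable named KAFKA_BROKERS (a name chosen for this example, not something Kafka-RS or KafkaJS defines) and a hypothetical helper brokersFromEnv:

```javascript
// Hypothetical helper, not part of Kafka-RS or KafkaJS.
// Builds the KafkaJS `brokers` array from a comma-separated string and
// falls back to the default address used by a plain `cargo run`.
const DEFAULT_BROKER = 'localhost:9092';

function brokersFromEnv(raw) {
  if (!raw || raw.trim() === '') return [DEFAULT_BROKER];
  return raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
}

// KAFKA_BROKERS is an assumed variable name for this sketch.
const brokers = brokersFromEnv(process.env.KAFKA_BROKERS);
console.log('Using brokers:', brokers);
```

The resulting array can be passed as the brokers option of new Kafka({ ... }), for example after starting a client with KAFKA_BROKERS=localhost:9093 node producer.js.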
npm install kafkajs
// producer.js
// CommonJS require, so `node producer.js` runs without extra module configuration
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();

async function run() {
  await producer.connect();
  console.log('Sending message...');

  // The value is sent as a JSON string; the key is a plain string
  await producer.send({
    topic: 'test-topic',
    messages: [
      {
        key: 'user-1',
        value: JSON.stringify({
          userId: 1,
          action: 'login',
          timestamp: Date.now(),
        }),
      },
    ],
  });

  console.log('Message sent!');
  await producer.disconnect();
}

run().catch(console.error);
// consumer.js
// CommonJS require, so `node consumer.js` runs without extra module configuration
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'test-group' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic' });

  // Keeps running and logs every message received on the subscribed topic
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        value: message.value?.toString(),
      });
    },
  });
}

run().catch(console.error);
# Terminal 1: start the server
cargo run
# Terminal 2: start the consumer
node consumer.js
# Terminal 3: send a message
node producer.js
You should see the message flow from producer to consumer!
[INFO] Starting Kafka-RS server on 127.0.0.1:9092
[INFO] Received connection from 127.0.0.1:52341
[DEBUG] Processing produce request for topic: test-topic
[DEBUG] Created new topic: test-topic with 1 partition
[DEBUG] Stored message with offset 0 in partition 0
[INFO] Producer sent 1 message(s) to test-topic
[DEBUG] Processing fetch request from consumer: test-group
[DEBUG] Fetching messages from offset 0 for topic: test-topic
[INFO] Consumer test-group fetched 1 message(s) from test-topic
{
  topic: 'test-topic',
  partition: 0,
  offset: '0',
  key: 'user-1',
  value: '{"userId":1,"action":"login","timestamp":1703123456789}'
}
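In the consumer output above, the value is still the JSON string that the producer created with JSON.stringify. A small sketch of how a consumer might turn a message back into plain JavaScript values, assuming a hypothetical helper named decodeMessage (not part of KafkaJS) and relying on KafkaJS delivering key and value as Buffers or null:

```javascript
// Hypothetical helper, not part of KafkaJS.
// Converts the Buffer fields of a KafkaJS message into strings and, when the
// value is valid JSON, into the parsed JavaScript value.
function decodeMessage(message) {
  const key = message.key ? message.key.toString() : null;
  const text = message.value ? message.value.toString() : null;

  let value = text;
  if (text !== null) {
    try {
      value = JSON.parse(text);
    } catch {
      // Not valid JSON: keep the raw string as-is.
    }
  }
  return { key, offset: message.offset, value };
}

// Sample message mirroring the consumer output shown above.
const sample = {
  key: Buffer.from('user-1'),
  value: Buffer.from('{"userId":1,"action":"login","timestamp":1703123456789}'),
  offset: '0',
};
console.log(decodeMessage(sample));
```

Inside eachMessage, the same helper could be called as decodeMessage(message) before logging or further processing.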
For comprehensive testing, Kafka-RS includes automated integration tests using real Kafka JavaScript clients.
The integration test verifies protocol compatibility with actual Kafka clients:
# Install Node.js dependencies
cd integration/kafka-client-test
npm install
# Start the server (in another terminal)
cargo run --release -- --port 9092
# Run the automated test
npm test
The automated test performs a complete producer-consumer cycle against the topic integration-test-topic, using the consumer group integration-test-group.
When successful, you’ll see:
🎯 Starting Kafka Client Integration Test
📡 Connecting to Kafka broker: localhost:9092
🚀 Testing Kafka Producer...
✅ Producer connected successfully
✅ Sent 3 messages
✅ Producer disconnected successfully
📥 Testing Kafka Consumer...
✅ Consumer connected successfully
✅ Subscribed to topic: integration-test-topic
📩 Received message: {"key":"key1","value":"Hello from KafkaJS client!"}
📩 Received message: {"key":"key2","value":"Testing Kafka-RS server compatibility"}
📩 Received message: {"key":"key3","value":"{\"test\":true,\"timestamp\":...}"}
✅ Received 3 messages (expected 3)
✅ Consumer disconnected successfully
🎉 Integration Test Results:
✅ Producer: Successfully sent 3 messages
✅ Consumer: Successfully received 3 messages
✅ Server compatibility: Verified with real Kafka JavaScript client
🎯 All integration tests passed! Kafka-RS server is compatible with KafkaJS client.
This integration test runs automatically in GitHub Actions as part of the kafka-client-integration
job, ensuring ongoing compatibility with real Kafka clients.
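The core of such a round-trip check is comparing what was sent with what came back. The following is only an illustrative sketch of that comparison, not the repository's actual test code; the function name findMissing and the message shape ({ key, value } as strings) are assumptions made for this example:

```javascript
// Hypothetical check, not taken from the Kafka-RS integration test.
// Returns the sent messages that have no matching received message.
// Each received message can satisfy at most one sent message.
function findMissing(sent, received) {
  const fingerprint = (m) => `${m.key}\u0000${m.value}`;
  const remaining = received.map(fingerprint);
  const missing = [];

  for (const message of sent) {
    const index = remaining.indexOf(fingerprint(message));
    if (index === -1) {
      missing.push(message);
    } else {
      remaining.splice(index, 1); // consume the match
    }
  }
  return missing;
}

// Two of the messages shown in the sample output above.
const sent = [
  { key: 'key1', value: 'Hello from KafkaJS client!' },
  { key: 'key2', value: 'Testing Kafka-RS server compatibility' },
];

console.log(findMissing(sent, sent).length === 0 ? 'all received' : 'messages missing');
```

An empty result means every produced message was consumed; a non-empty result lists the messages to investigate.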
Error: Connection refused (os error 61)
Solution: Make sure the Kafka-RS server is running and listening on the port your client connects to.
Error: Topic 'test-topic' not found
Solution: Topics are auto-created on first produce. Send a message first.
Error: Address already in use (os error 48)
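Solution: Stop the process that is holding the port, or start the server on a different port.
# Stop the running process (note: this matches any command line containing "kafka")
pkill -f kafka
# Or start Kafka-RS on a different port
cargo run -- --port 9093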
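For both the "Connection refused" and "Address already in use" errors, it helps to know whether anything is currently listening on the port. A minimal sketch using Node's built-in net module; the helper names isPortOpen and parseBroker are hypothetical and chosen for this example:

```javascript
const net = require('node:net');

// Hypothetical helper: splits "host:port" into its parts and validates the port.
function parseBroker(address) {
  const index = address.lastIndexOf(':');
  const port = Number(address.slice(index + 1));
  if (index <= 0 || !Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Expected host:port, got "${address}"`);
  }
  return { host: address.slice(0, index), port };
}

// Hypothetical helper: resolves to true if something accepts a TCP connection
// on host:port within timeoutMs, and to false on error or timeout.
function isPortOpen(host, port, timeoutMs = 1000) {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port });
    const finish = (open) => {
      socket.destroy();
      resolve(open);
    };
    socket.setTimeout(timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('timeout', () => finish(false));
    socket.once('error', () => finish(false));
  });
}
```

For example, `const { host, port } = parseBroker('localhost:9092'); isPortOpen(host, port).then(console.log);` prints true when a listener is present. A true result before starting Kafka-RS suggests the port is already taken; a false result while a client reports "Connection refused" suggests the server is not running on that address.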
cargo test
src/domain/
src/application/
src/infrastructure/