🎉 Sealos 首充折扣,限时返场!最高立返 10000,活动日期 4月22日-4月28日
Sealos Logo

Node.js

在 Sealos DevBox 中使用 Node.js 连接 Kafka 的完整指南

本文将指导您如何在 Sealos DevBox 中使用 Node.js 连接和操作 Kafka。

准备工作

安装依赖包

在运行环境终端中执行以下命令安装所需依赖:

npm install kafkajs dotenv

该命令将安装以下包:

  • kafkajs:一个现代化的 Node.js Kafka 客户端
  • dotenv:用于从 .env 文件加载环境变量的模块

配置与实现

配置环境变量

首先,在项目根目录创建 .env 文件,添加 Kafka 连接所需的环境变量:

.env
KAFKA_BROKERS=your_kafka_host:9092
KAFKA_CLIENT_ID=my-app
KAFKA_TOPIC=my-topic

请将上述配置中的占位符替换为您从 Sealos 数据库应用中获取的实际 Kafka 连接信息。

创建 Kafka 客户端

新建 kafkaClient.js 文件,添加以下代码:

const { Kafka } = require('kafkajs');
require('dotenv').config();
 
const kafka = new Kafka({
  clientId: process.env.KAFKA_CLIENT_ID,
  brokers: process.env.KAFKA_BROKERS.split(','),
});
 
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });
 
module.exports = { kafka, producer, consumer };

该文件创建并导出 Kafka 客户端实例及其生产者和消费者。

实现消息生产者

创建 producer.js 文件,实现消息发送功能:

const { kafka, producer } = require('./kafkaClient');
require('dotenv').config();
 
async function produceMessage() {
  try {
    await producer.connect();
    
    // Check if the topic exists
    const admin = kafka.admin();
    await admin.connect();
    const topics = await admin.listTopics();
    
    if (!topics.includes(process.env.KAFKA_TOPIC)) {
      console.log(`Topic ${process.env.KAFKA_TOPIC} does not exist. Creating it...`);
      await admin.createTopics({
        topics: [{ topic: process.env.KAFKA_TOPIC, numPartitions: 1, replicationFactor: 1 }]
      });
      console.log(`Topic ${process.env.KAFKA_TOPIC} created successfully.`);
    }
    
    await admin.disconnect();
 
    // Send the message
    const result = await producer.send({
      topic: process.env.KAFKA_TOPIC,
      messages: [
        { value: 'Hello from Sealos DevBox!' },
      ],
    });
    console.log('Message sent successfully', result);
  } catch (error) {
    console.error('Error producing message:', error);
    if (error.name === 'KafkaJSNumberOfRetriesExceeded') {
      console.error('Connection details:', {
        clientId: process.env.KAFKA_CLIENT_ID,
        brokers: process.env.KAFKA_BROKERS,
      });
    }
  } finally {
    await producer.disconnect();
  }
}
 
produceMessage();

生产者脚本的主要功能:

  1. 连接 Kafka 服务
  2. 检查并创建主题 (如不存在)
  3. 发送消息
  4. 错误处理和日志记录
  5. 完成后断开连接

实现消息消费者

创建 consumer.js 文件,实现消息接收功能:

const { consumer } = require('./kafkaClient');
require('dotenv').config();
 
async function consumeMessages() {
  try {
    await consumer.connect();
    await consumer.subscribe({ topic: process.env.KAFKA_TOPIC, fromBeginning: true });
 
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          topic,
          partition,
          offset: message.offset,
          value: message.value.toString(),
        });
      },
    });
  } catch (error) {
    console.error('Error consuming messages:', error);
  }
}
 
consumeMessages();

消费者脚本的主要功能: 1。连接 Kafka 服务 2。订阅指定主题 3。持续监听并处理新消息 4。记录接收到的消息详情 5。错误处理

运行程序

在运行环境终端中,执行以下命令启动生产者:

node producer.js

打开新的终端窗口,运行消费者:

node consumer.js

此时消费者将开始监听消息。当生产者发送消息后,您可以在消费者终端中看到接收到的消息内容。

最佳实践

  1. 使用环境变量管理 Kafka 配置信息
  2. 实现完善的错误处理和日志记录机制
  3. 充分利用 kafkajs 提供的重试机制
  4. 为消费者实现优雅的关闭处理
  5. 在处理大量消息时考虑使用压缩以提升性能

常见问题排查

如果遇到连接问题,请检查:

  1. .env 文件中的 Kafka 连接信息是否正确
  2. Kafka 集群是否正常运行且可访问
  3. 运行环境的网络配置是否有限制
  4. 相关依赖包是否正确安装

要了解更多关于在 Node.js 中使用 Kafka 的信息,请参考 KafkaJS 官方文档

在 GitHub 上编辑

最后更新于

本页导航