🎉 Sealos 首充折扣,限时返场!最高立返 10000,活动日期 4月22日-4月28日
Sealos Logo

Java

在 Sealos DevBox 中使用 Java 连接 Kafka 的完整指南

本文介绍如何在 Sealos DevBox 环境中使用 Java 连接和操作 Kafka。

准备工作

项目配置

创建 Maven 项目

在 Cursor 终端中,执行以下命令初始化 Maven 项目:

mvn archetype:generate -DgroupId=com.example -DartifactId=kafka-java-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
mv kafka-java-example/* .
rm -rf kafka-java-example
rm -rf test

配置 pom.xml

将项目的 pom.xml 文件替换为以下内容:

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>kafka-java-example</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.5</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.App</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

这个 pom.xml 文件包含必需的依赖项 (Kafka 客户端和用于日志记录的 SLF4J),并配置 Maven Shade 插件以创建可执行的 JAR。

添加配置文件

src/main/resources 目录下创建 kafka.properties 配置文件:

kafka.properties
bootstrap.servers=your_kafka_bootstrap_servers:9092
topic=your_topic_name
group.id=your_consumer_group_id

将占位符替换为您从 Sealos 桌面中数据库应用获取的实际 Kafka 凭据。

编写 Java 代码

src/main/java/com/example 目录下创建以下 Java 类:

  1. KafkaProducerExample.java
package com.example;
 
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
 
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
 
public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = loadConfig();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        Producer<String, String> producer = new KafkaProducer<>(props);
 
        String topic = props.getProperty("topic");
        String message = "Hello from Sealos DevBox!";
 
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
 
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent successfully. Topic: " + metadata.topic() +
                        ", Partition: " + metadata.partition() +
                        ", Offset: " + metadata.offset());
            } else {
                System.err.println("Error sending message: " + exception.getMessage());
            }
        });
 
        producer.flush();
        producer.close();
    }
 
    private static Properties loadConfig() {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream("src/main/resources/kafka.properties")) {
            props.load(fis);
        } catch (IOException e) {
            throw new RuntimeException("Error loading Kafka configuration", e);
        }
        return props;
    }
}

该类演示如何创建 Kafka 生产者、发送消息以及异步处理结果。

  1. KafkaConsumerExample.java
KafkaConsumerExample.java
package com.example;
 
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
 
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = loadConfig();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
 
        String topic = props.getProperty("topic");
        consumer.subscribe(Collections.singletonList(topic));
 
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value() +
                            " from topic: " + record.topic() +
                            ", partition: " + record.partition() +
                            ", offset: " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
 
    private static Properties loadConfig() {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream("src/main/resources/kafka.properties")) {
            props.load(fis);
        } catch (IOException e) {
            throw new RuntimeException("Error loading Kafka configuration", e);
        }
        return props;
    }
}

该类展示如何创建 Kafka 消费者、订阅主题并持续轮询新消息。

以上两个类都通过 loadConfig() 方法从 kafka.properties 文件中读取 Kafka 配置信息,这样可以在不修改代码的情况下灵活更改配置。

构建与运行

在运行环境终端中执行以下命令来构建和运行项目:

mvn clean package
java -cp target/kafka-java-example-1.0-SNAPSHOT.jar com.example.KafkaProducerExample
java -cp target/kafka-java-example-1.0-SNAPSHOT.jar com.example.KafkaConsumerExample

建议在不同的终端窗口中分别运行生产者和消费者程序,以便观察消息的发送和接收过程。

最佳实践

  1. 将 Kafka 配置信息统一存储在配置文件中
  2. 实现合适的错误处理和日志记录机制
  3. 使用 try-with-resources 语句确保 Kafka 生产者和消费者正确关闭
  4. 考虑使用 Kafka AdminClient 来管理主题和其他 Kafka 资源
  5. 为消息的键值对实现合适的序列化和反序列化方法

常见问题排查

如果遇到连接问题,请检查以下几点:

  1. 确认 kafka.properties 文件中的 Kafka 连接信息是否正确
  2. 验证 Kafka 集群是否正常运行且可以从运行环境访问
  3. 检查运行环境的网络配置是否有限制
  4. 确保 pom.xml 文件中的所有依赖项配置正确

更多关于 Java 操作 Kafka 的详细信息,请参考 Apache Kafka 官方文档

在 GitHub 上编辑

最后更新于

本页导航