3.kafka生产者消费者测试
发布日期:2021-07-30 03:26:39 浏览次数:3 分类:技术文章

本文共 10860 字,大约阅读时间需要 36 分钟。

1.eclipse新建maven项目

eclipse新建maven项目,pom.xml文件添加kafka-clients依赖

在这里插入图片描述

2.修改配置

idea 之前已经启动了kafka的core模块,修改server.properties

添加localhostlisteners=PLAINTEXT://localhost:9092配置文件尽可能使用主机名,不使用ip

3.生产者代码

import java.util.Properties;import java.util.concurrent.Future;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class); @SuppressWarnings("resource") public static void main(String[] args) throws Exception {
Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer
producer = new KafkaProducer
(properties); String data = "hi"; ProducerRecord
record = new ProducerRecord
("hi", data); Future
future = producer.send(record); RecordMetadata recordMetadata = future.get(); logger.info("send data: {},offset :{}", data, recordMetadata.offset()); }}

注意

  1. server.properties的listeners=PLAINTEXT://localhost:9092注释要放开,添加主机名
  2. 生产者配置信息bootstrap.servers,主机名和端口要配置正确

否则报错

MINAServer Logger--> INFO{
AppInfoParser.java:83}-Kafka version : 0.11.0.3 MINAServer Logger--> INFO{
AppInfoParser.java:84}-Kafka commitId : 26ddb9e3197be39a MINAServer Logger--> WARN{
NetworkClient.java:589}-Connection to node -1 could not be established. Broker may not be available. MINAServer Logger--> WARN{
NetworkClient.java:589}-Connection to node -1 could not be established. Broker may not be available. MINAServer Logger--> WARN{
NetworkClient.java:589}-Connection to node -1 could not be established. Broker may not be available.

4.运行生产者

执行两次

运行部分日志如下

MINAServer Logger--> INFO{
AbstractConfig.java:223}-ProducerConfig values: 略 org.apache.kafka.clients.producer.internals.DefaultPartitioner 略 MINAServer Logger--> INFO{
AppInfoParser.java:83}-Kafka version : 0.11.0.3 MINAServer Logger--> INFO{
AppInfoParser.java:84}-Kafka commitId : 26ddb9e3197be39a MINAServer Logger--> INFO{
Producer.java:27}-send data: hi,offset :1

5. 消费者代码

import java.util.Collections;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class); @SuppressWarnings("resource") public static void main(String[] args) {
Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "hi"); properties.put("client.id", "consumer-client"); properties.put("auto.offset.reset", "earliest"); KafkaConsumer
consumer = new KafkaConsumer
(properties); consumer.subscribe(Collections.singletonList("hi")); while(true) {
ConsumerRecords
records = consumer.poll(1000L); records.forEach(record -> {
logger.info("offset is :{}, value is :{}", record.offset(), record.value()); }); } }}

6. 运行消费者

部分日志如下

MINAServer Logger--> INFO{
AbstractConfig.java:223}-ConsumerConfig values: 略 MINAServer Logger--> INFO{
AppInfoParser.java:83}-Kafka version : 0.11.0.3 MINAServer Logger--> INFO{
AppInfoParser.java:84}-Kafka commitId : 26ddb9e3197be39a MINAServer Logger--> INFO{
AbstractCoordinator.java:607}-Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group hi. MINAServer Logger--> INFO{
ConsumerCoordinator.java:419}-Revoking previously assigned partitions [] for group hi MINAServer Logger--> INFO{
AbstractCoordinator.java:442}-(Re-)joining group hi MINAServer Logger--> INFO{
AbstractCoordinator.java:652}-Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group hi MINAServer Logger--> INFO{
AbstractCoordinator.java:607}-Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group hi. MINAServer Logger--> INFO{
AbstractCoordinator.java:442}-(Re-)joining group hi MINAServer Logger--> INFO{
AbstractCoordinator.java:409}-Successfully joined group hi with generation 1 MINAServer Logger--> INFO{
ConsumerCoordinator.java:262}-Setting newly assigned partitions [hi-0] for group hi MINAServer Logger--> INFO{
Consumer.java:30}-offset is :0, value is :hi MINAServer Logger--> INFO{
Consumer.java:30}-offset is :1, value is :hi

7. 观察idea中启动的kafka core日志

略 INFO Loading producer state from offset 0 for partition __consumer_offsets-3 with message format version 2 (kafka.log.Log) INFO Completed load of log __consumer_offsets-3 with 1 log segments, log start offset 0 and log end offset 0 in 7 ms (kafka.log.Log) INFO Created log for partition [__consumer_offsets,3] in F:\resources\kafka\logs with properties {
compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 104857600, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager) INFO Partition [__consumer_offsets,3] on broker 0: No checkpointed highwatermark is found for partition __consumer_offsets-3 (kafka.cluster.Partition) [ INFO Replica loaded for partition __consumer_offsets-3 with initial high watermark 0 (kafka.cluster.Replica) INFO Partition [__consumer_offsets,3] on broker 0: __consumer_offsets-3 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition) INFO Loading producer state from offset 0 for partition __consumer_offsets-13 with message format version 2 (kafka.log.Log) [ INFO Completed load of log __consumer_offsets-13 with 1 log segments, log start offset 0 and log end offset 0 in 8 ms (kafka.log.Log) INFO Created log for partition [__consumer_offsets,13] in F:\resources\kafka\logs with properties {
compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 104857600, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager) INFO Partition [__consumer_offsets,13] on broker 0: No checkpointed highwatermark is found for partition __consumer_offsets-13 (kafka.cluster.Partition) INFO Replica loaded for partition __consumer_offsets-13 with initial high watermark 0 (kafka.cluster.Replica) INFO Partition [__consumer_offsets,13] on broker 0: __consumer_offsets-13 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition) 略 INFO [Group Metadata Manager on Broker 0]: Finished loading offsets and group metadata from __consumer_offsets-36 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) INFO [Group Metadata Manager on Broker 0]: Finished loading offsets and group metadata from __consumer_offsets-39 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) INFO [Group Metadata Manager on Broker 0]: Finished loading offsets and group metadata from __consumer_offsets-42 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) INFO [Group Metadata Manager on Broker 0]: Finished loading offsets and group metadata from __consumer_offsets-45 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) INFO [Group Metadata Manager on Broker 0]: Finished loading offsets and group metadata from __consumer_offsets-48 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) INFO [GroupCoordinator 0]: Preparing to rebalance group hi with old generation 0 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Stabilized group hi generation 1 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Assignment received from leader for group hi for generation 1 (kafka.coordinator.group.GroupCoordinator) INFO Updated PartitionLeaderEpoch. New: {
epoch:0, offset:0}, Current: {
epoch:-1, offset-1} for Partition: __consumer_offsets-29. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache) INFO [GroupCoordinator 0]: Preparing to rebalance group hi with old generation 1 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Stabilized group hi generation 2 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Assignment received from leader for group hi for generation 2 (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Preparing to rebalance group hi with old generation 2 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Member consumer-client-0b3a48bb-5f12-4ec9-ab89-bd019ca1d339 in group hi has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Member consumer-client-8b6972ab-56a3-4584-b382-f7e8d88c52b4 in group hi has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Stabilized group hi generation 3 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) INFO [GroupCoordinator 0]: Assignment received from leader for group hi for generation 3 (kafka.coordinator.group.GroupCoordinator) INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 7 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

转载地址:https://blog.csdn.net/u010895512/article/details/118369280 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:erlang 环境配置
下一篇:2.调试kafka源码

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月23日 02时42分41秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

java多线程初学者指南_Java多线程初学者指南(4):线程的生命周期 2019-04-21
java进程user是jenkins_java 学习:在java中启动其他应用,由jenkins想到的 2019-04-21
java添加资源文件_如何在eclipse中将资源文件夹添加到我的Java项目中 2021-06-24
java的三种修饰符_3分钟弄明白JAVA三大修饰符 2021-06-24
mysql source skip_redis mysql 中的跳表(skip list) 查找树(btree) 2021-06-24
java sun.org.mozilla_maven编译找不到符号 sun.org.mozilla.javascript.internal 2021-06-24
php curl 输出到文件,PHP 利用CURL(HTTP)实现服务器上传文件至另一服务器 2021-06-24
PHP字符串运算结果,PHP运算符(二)"字符串运算符"实例详解 2021-06-24
PHP实现 bcrypt,如何使php中的bcrypt和Java中的jbcrypt兼容 2021-06-24
php8安全,PHP八大安全函数解析 2021-06-24
php基础语法了解和熟悉的表现,PHP第二课 了解PHP的基本语法以及目录结构 2021-06-24
matlab中lag函数用法,MATLAB movavg函数用法 2021-06-24
matlab变形监测,基于matlab的变形监测数据处理与分析_毕业设计论文 2021-06-24
opencv matlab编程,在Matlab中调用OpenCV函数 | 学步园 2021-06-24
c语言文件wt,c语言,wt和rt中的t是什么意思 2021-06-24
c语言运行几进制,【C语言】求已知等式在几进制条件下成立 2021-06-24
电梯运行仿真c语言代码,电梯调度算法模拟(示例代码) 2021-06-24
android组件动态接收数据库,Android开发——fragment中数据传递与刷新UI(更改控件)... 2021-06-24
云麦小米华为体脂秤怎么样_云康宝和华为智能体脂秤对比评测,实际体验告诉你哪款更好... 2021-06-24
linux 条件判断 取非_Linux awk 系列文章之 awk 多重条件判断 2021-06-24