kafka java api
发布日期:2021-06-28 18:42:14
浏览次数:3
分类:技术文章
本文共 4895 字,大约阅读时间需要 16 分钟。
1、消费者
package com.asiainfo.group.kafka.consumer;import java.io.FileReader;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.clients.consumer.OffsetCommitCallback;import org.apache.kafka.common.TopicPartition;public class ConsumerDemo { private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/"; private static KafkaConsumerconsumer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"consumer.properties")); consumer = new KafkaConsumer (p); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { List topics = new ArrayList (); topics.add("okk"); consumer.subscribe(topics); try { while(true){ ConsumerRecords records = consumer.poll(3000); System.err.println("收到了"+records.count()+"条消息"); for (ConsumerRecord record : records) { System.err.println("topic:"+record.topic()); System.err.println("partition:"+record.partition()); System.err.println("offset:"+record.offset()); System.err.println("key:"+record.key()); System.err.println("value:"+record.value()); } //同步提交:会一直尝试直至提交成功,会一直阻塞 //consumer.commitSync(); //异步提交:不会重试,原因是因为重试过程中可能有更大的偏移量提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map arg0, Exception arg1) { System.err.println("异步提交偏移量成功!"); } }); //同步提交结合异步提交 //如果一切正常,就用异步提交,即使此次提交不成功,下次提交总会成功的 //如果关闭消费者,就没有下一次了,则用同步提交,一直尝试到提交成功为止,类似下面的代码 /*try{ consumer.commitSync(); } finally{ consumer.close(); }*/ } } catch (Exception e) { e.printStackTrace(); } finally{ consumer.close(); } } }).start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //wakeup方法是唯一能够打断consumer.poll的方法,并使其抛出异常跳出while(true),然后进入finally关闭consumer consumer.wakeup(); }}
2、生产者
package com.asiainfo.group.kafka.producer;import java.io.FileReader;import java.util.Properties;import java.util.concurrent.ExecutionException;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class ProductDemo { private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/"; private static KafkaProducerproducer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"producer.properties")); producer = new KafkaProducer (p); } catch (Exception e) { e.printStackTrace(); } } /** * 发送并忘记(不关心是否到达) * @throws ExecutionException * @throws InterruptedException */ public void sendAndForget() throws Exception{ for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i); producer.send(record); } producer.close(); } public void syncSend() throws Exception{ long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { ProducerRecord record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i); RecordMetadata recordMetadata = producer.send(record).get(); System.err.println("同步发送成功!"); } System.err.println("同步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } public void asyncSend(){ long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { System.err.println("异步发送成功!"); } }); } System.err.println("异步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ new ProductDemo().sendAndForget(); //new ProductDemo().syncSend(); //new ProductDemo().asyncSend(); }}
3、生产者配置文件
bootstrap.servers=192.168.0.108:9092key.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializer
4、消费者配置文件
bootstrap.servers=192.168.0.108:9092key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializergroup.id=groupByJava1enable.auto.commit=false
转载地址:https://blog.csdn.net/xl_1803/article/details/104134011 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
表示我来过!
[***.240.166.169]2024年04月15日 21时53分12秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
android课程表!大厂offer手到擒来,满满干货指导
2019-04-29
android网!2021中级Android开发面试解答,进阶学习资料!
2019-04-29
android自动化测试工具!为什么有人说Android开发不再吃香?建议收藏
2019-04-29
android系统架构五层!最详细的解释小白也能听懂,2年以上经验必看
2019-04-29
android线刷包!跟我一起手写EventBus吧,大厂内部资料
2019-04-29
android实战!百度、阿里、滴滴、新浪的面试心经总结,满满干货指导
2019-04-29
Android小技巧:一线互联网移动架构师NDK模块开发!含BATJM大厂
2019-04-29
docker搭建postgresql9.4主从同步复制集群
2019-04-29
docker下postgis12+postgis3.0搭建
2019-04-29
什么是函数式编程
2019-04-29
Java开发必用的工具包
2019-04-29
世界500强公司要求员工必须熟练掌握的七种工作方法
2019-04-29
九个做事的顺序,你会更加优秀
2019-04-29
史上最详细的Hadoop环境搭建
2019-04-29
最近经历的一些大数据(Spark/Hadoop)面试题
2019-04-29
Hadoop MapReduce原理及实例
2019-04-29
Java 集合系列目录(Category)
2019-04-29
redis永久设置或取消密码
2019-04-29
Git .gitignore配置学习
2019-04-29