kafka java api
发布日期:2021-06-28 18:42:14 浏览次数:3 分类:技术文章

本文共 4895 字,大约阅读时间需要 16 分钟。

1、消费者

package com.asiainfo.group.kafka.consumer;import java.io.FileReader;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.clients.consumer.OffsetCommitCallback;import org.apache.kafka.common.TopicPartition;public class ConsumerDemo {	private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/";	private static KafkaConsumer
consumer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"consumer.properties")); consumer = new KafkaConsumer
(p); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { List
topics = new ArrayList
(); topics.add("okk"); consumer.subscribe(topics); try { while(true){ ConsumerRecords
records = consumer.poll(3000); System.err.println("收到了"+records.count()+"条消息"); for (ConsumerRecord
record : records) { System.err.println("topic:"+record.topic()); System.err.println("partition:"+record.partition()); System.err.println("offset:"+record.offset()); System.err.println("key:"+record.key()); System.err.println("value:"+record.value()); } //同步提交:会一直尝试直至提交成功,会一直阻塞 //consumer.commitSync(); //异步提交:不会重试,原因是因为重试过程中可能有更大的偏移量提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map
arg0, Exception arg1) { System.err.println("异步提交偏移量成功!"); } }); //同步提交结合异步提交 //如果一切正常,就用异步提交,即使此次提交不成功,下次提交总会成功的 //如果关闭消费者,就没有下一次了,则用同步提交,一直尝试到提交成功为止,类似下面的代码 /*try{ consumer.commitSync(); } finally{ consumer.close(); }*/ } } catch (Exception e) { e.printStackTrace(); } finally{ consumer.close(); } } }).start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //wakeup方法是唯一能够打断consumer.poll的方法,并使其抛出异常跳出while(true),然后进入finally关闭consumer consumer.wakeup(); }}

 

2、生产者

package com.asiainfo.group.kafka.producer;import java.io.FileReader;import java.util.Properties;import java.util.concurrent.ExecutionException;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class ProductDemo {	private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/";	private static KafkaProducer
producer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"producer.properties")); producer = new KafkaProducer
(p); } catch (Exception e) { e.printStackTrace(); } } /** * 发送并忘记(不关心是否到达) * @throws ExecutionException * @throws InterruptedException */ public void sendAndForget() throws Exception{ for (int i = 0; i < 10; i++) { ProducerRecord
record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i); producer.send(record); } producer.close(); } public void syncSend() throws Exception{ long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { ProducerRecord
record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i); RecordMetadata recordMetadata = producer.send(record).get(); System.err.println("同步发送成功!"); } System.err.println("同步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } public void asyncSend(){ long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { ProducerRecord
record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { System.err.println("异步发送成功!"); } }); } System.err.println("异步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ new ProductDemo().sendAndForget(); //new ProductDemo().syncSend(); //new ProductDemo().asyncSend(); }}

3、生产者配置文件

bootstrap.servers=192.168.0.108:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

4、消费者配置文件

bootstrap.servers=192.168.0.108:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=groupByJava1
enable.auto.commit=false

 

转载地址:https://blog.csdn.net/xl_1803/article/details/104134011 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:pom文件出现unkown error解决办法
下一篇:maven-assembly-plugin插件实现自定义打包

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年04月15日 21时53分12秒