Java代码测试Kafka集群收发消息
2021/4/15 20:25:30
本文主要是介绍Java代码测试Kafka集群收发消息,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
consumer:
package cn.miaoying.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class TestConsumer { private final ConsumerConnector consumer; private TestConsumer() { Properties props = new Properties(); // zookeeper 配置 props.put("zookeeper.connect", "20.21.1.xxx:2182,20.21.1.xxx:2183,20.21.1.xxx:2184"); // group 代表一个消费组 props.put("group.id", "jd-group"); // zk连接超时 props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); // 序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map topicCountMap = new HashMap(); //topicCountMap.put(KafkaProducerDemo.TOPIC, new Integer(1)); topicCountMap.put("test_miaoying", new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream stream = consumerMap.get("test_miaoying").get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()) System.out.println(it.next().message()); } public static void main(String[] args) { new TestConsumer().consume(); } }
provider:
package cn.miaoying.consumer; import java.util.Date; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProvider { public static void main(String[] args) { long events = Long.parseLong("1"); Properties properties = new Properties(); properties.put("metadata.broker.list", "20.21.1.xxx:9093,20.21.1.xxx:9091,20.21.1.xxx:9092"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(properties); Producer producer = new Producer(config); for (int i = 0; i < 5; i++) { long runtime = new Date().getTime(); String ip = "127.0.0.1"; String msg = "test~test~test"; KeyedMessage keyedMessage = new KeyedMessage("test_miaoying", ip, msg); System.out.println(events + "---" + runtime); producer.send(keyedMessage); } producer.close(); } }
这篇关于Java代码测试Kafka集群收发消息的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?
- 2024-05-09企业src漏洞挖掘-有意思的命令执行