Kafka: producing and consuming data in code
This article shows how to write data into Kafka and consume it back in code. The examples are written in Scala, using the Flink Kafka connector and the plain Kafka client API.
Kafka
1. Import the required dependency before writing any code
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
```
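The Flink connector transitively pulls in the plain `kafka-clients` library that the producer/consumer examples in sections 3–5 use. If you only need those examples and not Flink, depending on `kafka-clients` directly also works — a minimal sketch; the version number here is an assumption, pick the one matching your cluster:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- version is an assumption; use the one matching your brokers -->
    <version>2.8.1</version>
</dependency>
```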
2. Reading data from Kafka (Flink KafkaSource)
```scala
package com.wt.flink.source

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object Demo5KafkaSource {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * build the Kafka source
     */
    val source: KafkaSource[String] = KafkaSource
      .builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092") // list of Kafka brokers in the cluster
      .setTopics("test_topic2")                                 // topic to read
      .setGroupId("my_group")                                   // consumer group; a record is consumed only once within a group
      .setStartingOffsets(OffsetsInitializer.earliest())        // where to start reading: earliest = all data, latest = only new data
      .setValueOnlyDeserializer(new SimpleStringSchema())       // deserializer for the value
      .build()

    // use the Kafka source
    val kafkaDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka Source")

    kafkaDS.print()

    env.execute()
  }
}
```
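The same connector also works in the opposite direction: a `KafkaSink` can write a `DataStream` back to Kafka. A minimal sketch, assuming the Flink 1.14+ unified sink API and reusing `kafkaDS` and the broker list from above (the target topic is just the example topic):

```scala
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

// build a Kafka sink that writes each String element as the record value
val sink: KafkaSink[String] = KafkaSink.builder[String]()
  .setBootstrapServers("master:9092,node1:9092,node2:9092")
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder()
      .setTopic("test_topic2") // target topic (assumption)
      .setValueSerializationSchema(new SimpleStringSchema())
      .build()
  )
  .build()

// attach the sink to the stream read above
kafkaDS.sinkTo(sink)
```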
3. Writing data into Kafka with a producer
```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

object Demo1KafkaProducer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. create the producer
     */
    val properties = new Properties()
    // addresses of the Kafka brokers
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // serializer classes for the key and the value
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    val record = new ProducerRecord[String, String]("test_topic2", "woaini,zhongguo")

    // send the record to Kafka
    producer.send(record)
    producer.flush()

    // close the connection
    producer.close()
  }
}
```
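The `send()` above is fire-and-forget. To find out whether a record actually reached the broker, pass a `Callback` to `send()` — a minimal sketch reusing the `producer` and `record` from the example:

```scala
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      // the send failed after retries; log or handle the error
      exception.printStackTrace()
    } else {
      // the broker acknowledged the record: topic-partition@offset
      println(s"${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")
    }
  }
})
```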
4. Writing student data into Kafka in batches
```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import scala.io.Source

object Demo2StudentToKafka {
  def main(args: Array[String]): Unit = {
    /**
     * create the producer
     */
    val properties = new Properties()
    // addresses of the Kafka brokers
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // serializer classes for the key and the value
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    /**
     * write the student table to Kafka in batches
     */
    val studentList: List[String] = Source.fromFile("data/students.txt").getLines().toList

    // send the records to Kafka
    for (student <- studentList) {
      val record = new ProducerRecord[String, String]("student", student)
      producer.send(record)
    }
    // flush once after the loop rather than per record, so the producer can batch sends
    producer.flush()

    producer.close()
  }
}
```
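These records have no key, so they are spread across partitions. Giving each record a key keeps records with the same key in the same partition. A sketch — the file format of `data/students.txt` is not shown in the article, so the comma-separated layout and the id field here are hypothetical:

```scala
// hypothetical: assume each line is comma-separated and field 0 is the student id
val key: String = student.split(",")(0)
val keyedRecord = new ProducerRecord[String, String]("student", key, student)
producer.send(keyedRecord)
```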
5. Pulling data from Kafka in batches with a consumer
```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import java.time.Duration
import java.util.Properties
import java.{lang, util}

object Demo3KafkaConsumer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. create the consumer
     */
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // deserializer classes for the key and the value
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /**
     * auto.offset.reset:
     * earliest
     *   if the group has committed offsets for the partitions, resume from them; otherwise start from the beginning
     * latest (default)
     *   if the group has committed offsets for the partitions, resume from them; otherwise consume only newly produced data
     * none
     *   if every partition has a committed offset, resume from it; if any partition has none, throw an exception
     */
    properties.setProperty("auto.offset.reset", "earliest")

    // consumer group
    properties.setProperty("group.id", "suibian_mingzi")

    val consumer = new KafkaConsumer[String, String](properties)

    /**
     * 2. subscribe to a topic; several topics can be subscribed to at once
     */
    val topics = new util.ArrayList[String]()
    topics.add("student")
    consumer.subscribe(topics)

    while (true) {
      println("consuming...")
      /**
       * poll for data; a timeout must be given
       */
      val consumerRecords: ConsumerRecords[String, String] = consumer
        .poll(Duration.ofSeconds(2))

      // unpack the polled batch
      val records: lang.Iterable[ConsumerRecord[String, String]] = consumerRecords.records("student")
      val iterRecord: util.Iterator[ConsumerRecord[String, String]] = records.iterator()

      while (iterRecord.hasNext) {
        // take one record
        val record: ConsumerRecord[String, String] = iterRecord.next()
        val topic: String = record.topic()   // topic name
        val offset: Long = record.offset()   // offset of the record
        val key: String = record.key()       // record key; null if none was set
        val value: String = record.value()   // record value
        val ts: Long = record.timestamp()    // timestamp, by default the time the record was written
        println(s"$topic\t$offset\t$key\t$value\t$ts")
      }
    }

    // close the connection (never reached here because of the infinite loop)
    consumer.close()
  }
}
```
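With the settings above, offsets are committed automatically (`enable.auto.commit` defaults to true). If you want offsets committed only after a polled batch has been fully processed, turn auto-commit off and commit manually — a minimal sketch of just the changed lines, assuming the `properties` and `consumer` from the example:

```scala
// before creating the consumer: disable automatic offset commits
properties.setProperty("enable.auto.commit", "false")

// inside the poll loop, after processing a batch: commit its offsets synchronously
consumer.commitSync()
```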