Using Kafka's Common APIs in Java

2021/5/22 20:26:10

This article walks through the Kafka APIs most commonly used from Java. It is meant as a practical reference for everyday programming tasks; follow along with the examples below.

  1. Create a Maven project
    Import the following Maven dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mashibing</groupId>
    <artifactId>kafka_mq_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>12</maven.compiler.source>
        <maven.compiler.target>12</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.11</version>
        </dependency>
    </dependencies>

</project>

2. Create a log4j.properties file to configure logging:

log4j.rootLogger = info,console

log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n
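
With slf4j-api and the slf4j-log4j12 binding from the pom on the classpath, this configuration also applies to SLF4J loggers, so the examples could log through SLF4J instead of System.out.println. A minimal sketch (the class name here is just for illustration):

package com.mashibing.quickstart;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDemo {
    //SLF4J delegates to log4j via the slf4j-log4j12 binding declared in the pom
    private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

    public static void main(String[] args) {
        //Printed through the ConsoleAppender configured in log4j.properties
        LOG.info("logging is configured");
    }
}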

3. Common Kafka API usage: DML-style management
(1) Topic management API

package com.mashibing.dml;

import org.apache.kafka.clients.admin.*;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * @Author: ZhouYun
 * @Date: 2021/5/21 - 21 -19:54
 * @Description:
 * @version: 1.0
 */
public class KafkaTopicDML {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. Create the AdminClient
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "VM-4-9-centos:9092");
        AdminClient adminClient = AdminClient.create(properties);

        //Create a topic (uncomment to run)
        //CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(new NewTopic("topic02", 3, (short) 1)));
        //createTopicsResult.all().get(); //block until the creation completes (synchronous)

        //List all topics
        ListTopicsResult topicsResult = adminClient.listTopics();
        Set<String> names = topicsResult.names().get();
        for (String name : names) {
            System.out.println(name);
        }

        //Delete a topic (uncomment to run)
        //DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList("topic02"));
        //deleteTopicsResult.all().get(); //block until the deletion completes, turning the asynchronous delete into a synchronous one

        //Describe a topic in detail
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList("topic01"));
        Map<String, TopicDescription> topicDescriptionMap = describeTopicsResult.all().get();
        for (Map.Entry<String, TopicDescription> entry : topicDescriptionMap.entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }

        //Close the AdminClient
        adminClient.close();
    }
}
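
Beyond creating, listing, and deleting topics, AdminClient can also read a topic's configuration from the broker. The fragment below could be added to the same main method before adminClient.close(); it needs one extra import, org.apache.kafka.common.config.ConfigResource (Config, ConfigEntry, and DescribeConfigsResult are already covered by the wildcard import), and assumes topic01 exists:

        //Read the configuration of topic01 from the broker
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "topic01");
        DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singletonList(resource));
        Config config = configsResult.all().get().get(resource);
        for (ConfigEntry entry : config.entries()) {
            System.out.println(entry.name() + " = " + entry.value());
        }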

(2) Producer & consumer
Producer class:

package com.mashibing.quickstart;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @Author: ZhouYun
 * @Date: 2021/5/21 - 21 -23:51
 * @Description:
 * @version: 1.0
 */
public class KafkaProducerQuickStart {
    public static void main(String[] args) {
        //1. Create the KafkaProducer
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "VM-4-9-centos:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            final ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            //Send the record to the broker (asynchronous)
            producer.send(record);
        }

        //Close the producer; this flushes any buffered records
        producer.close();
    }
}
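
Note that producer.send(record) is asynchronous: it buffers the record and returns immediately, and close() flushes whatever is still in flight. To confirm delivery (or surface failures) per record, the send in the loop above could take a completion callback; a minimal sketch:

            //Send with a callback: metadata on success, exception on failure
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    //e.g. the broker was unreachable or the topic does not exist
                    exception.printStackTrace();
                } else {
                    System.out.println("sent to " + metadata.topic() + "-" + metadata.partition()
                            + " @ offset " + metadata.offset());
                }
            });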

Consumer class:

package com.mashibing.quickstart;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;

import static java.util.regex.Pattern.compile;

/**
 * @Author: ZhouYun
 * @Date: 2021/5/22 - 22 -0:25
 * @Description:
 * @version: 1.0
 */
public class KafkaConsumerQuickStart_1 {
    public static void main(String[] args) {
        //1. Create the KafkaConsumer
        final Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "VM-4-9-centos:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");

        final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2. Subscribe to all topics matching the pattern
        kafkaConsumer.subscribe(compile("^topic.*"));

        //Poll for records in a loop
        while(true){
            final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //Records were returned by this poll
            if (!consumerRecords.isEmpty()) {
                Iterator<ConsumerRecord<String, String>> records = consumerRecords.iterator();
                while (records.hasNext()) {
                    ConsumerRecord<String, String> record = records.next();
                    final String topic = record.topic();
                    final int partition = record.partition();
                    final long offset = record.offset();
                    final String key = record.key();
                    final String value = record.value();
                    final long timestamp = record.timestamp();
                    System.out.println(topic + "," + partition + "," + offset + "," + key + "," + value + "," + timestamp);
                }
            }
        }

    }
}
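
With the configuration above, the consumer commits offsets automatically in the background (enable.auto.commit defaults to true). If records must not be lost when processing fails, a common pattern is to disable auto-commit and commit manually once a batch has been handled. A sketch of the two changes, assuming the same setup as above:

        //When building the Properties: turn off background offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        //In the poll loop: commit synchronously after the batch is processed
        if (!consumerRecords.isEmpty()) {
            for (ConsumerRecord<String, String> record : consumerRecords) {
                //...process the record...
            }
            //Marks the offsets returned by the last poll() as consumed
            kafkaConsumer.commitSync();
        }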

(3) Custom partitioner
Producer class using the custom partitioner:

package com.mashibing.partitioner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @Author: ZhouYun
 * @Date: 2021/5/21 - 21 -23:51
 * @Description:
 * @version: 1.0
 */
public class KafkaProducerPartitioner {
    public static void main(String[] args) {
        //1. Create the KafkaProducer
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "VM-4-9-centos:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UserDefinePartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 6; i++) {
            final ProducerRecord<String, String> record =
                    new ProducerRecord<>("topic01", "key" + i, "value" + i);
                    //new ProducerRecord<>("topic01", "value" + i); //no key: the custom partitioner falls back to round-robin
            //Send the record to the broker (asynchronous)
            producer.send(record);
        }

        //Close the producer
        producer.close();
    }
}

Consumer class for the custom-partitioner example:

package com.mashibing.quickstart;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

import static java.util.regex.Pattern.compile;

/**
 * @Author: ZhouYun
 * @Date: 2021/5/22 - 22 -0:25
 * @Description:
 * @version: 1.0
 */
public class KafkaConsumerQuickStart_2 {
    public static void main(String[] args) {
        //1. Create the KafkaConsumer
        final Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "VM-4-9-centos:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2. Manually assign a partition instead of subscribing; this bypasses consumer-group management (no group id, no rebalancing)
        final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic01", 0));
        kafkaConsumer.assign(partitions);

        //Position the consumer within the assigned partition
        //kafkaConsumer.seekToBeginning(partitions);
        kafkaConsumer.seek(new TopicPartition("topic01", 0), 1);

        //Poll for records in a loop
        while(true){
            final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //Records were returned by this poll
            if (!consumerRecords.isEmpty()) {
                Iterator<ConsumerRecord<String, String>> records = consumerRecords.iterator();
                while (records.hasNext()) {
                    ConsumerRecord<String, String> record = records.next();
                    final String topic = record.topic();
                    final int partition = record.partition();
                    final long offset = record.offset();
                    final String key = record.key();
                    final String value = record.value();
                    final long timestamp = record.timestamp();
                    System.out.println(topic + "," + partition + "," + offset + "," + key + "," + value + "," + timestamp);
                }
            }
        }

    }
}
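
Here seek() restarts reading topic01 partition 0 at offset 1 on every run. An assigned consumer can start elsewhere too; for example, to skip history and read only records produced from now on, seekToEnd could replace the seek above. A sketch:

        //Jump to the current end of each assigned partition
        kafkaConsumer.seekToEnd(partitions);

        //position() shows where the next poll() will start reading
        for (TopicPartition partition : partitions) {
            System.out.println(partition + " resumes at offset " + kafkaConsumer.position(partition));
        }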

Implementation of the user-defined partitioner:

package com.mashibing.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: ZhouYun
 * @Date: 2021/5/22 - 22 -14:06
 * @Description:
 * @version: 1.0
 */
public class UserDefinePartitioner implements Partitioner {
    private AtomicInteger counter = new AtomicInteger(0);

    /**
     * Return the partition number for a record.
     *
     * @param topic      the topic name
     * @param key        the record key (may be null)
     * @param keyBytes   the serialized key (may be null)
     * @param value      the record value
     * @param valueBytes the serialized value
     * @param cluster    the current cluster metadata
     * @return the partition the record should be sent to
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //Get all partitions for the topic
        final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        final int numPartitions = partitionInfos.size();
        if (keyBytes == null) {
            //No key: spread records round-robin using an atomic counter
            final int andIncrement = counter.getAndIncrement();
            return (andIncrement & Integer.MAX_VALUE) % numPartitions;
        } else {
            //Keyed record: murmur2-hash the serialized key, the same scheme as Kafka's default partitioner
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("configure");
    }
}
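
The keyed branch above reproduces the hashing scheme of Kafka's classic default partitioner: murmur2 over the serialized key, mod the partition count. Which partition a given key lands on can therefore be checked without a broker; a small standalone sketch (the class name is ours, and it assumes topic01 has 3 partitions):

package com.mashibing.partitioner;

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class PartitionCheck {
    public static void main(String[] args) {
        int numPartitions = 3; //assumed partition count of topic01
        for (int i = 0; i < 6; i++) {
            byte[] keyBytes = ("key" + i).getBytes(StandardCharsets.UTF_8);
            //Same computation as UserDefinePartitioner's keyed branch
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key" + i + " -> partition " + partition);
        }
    }
}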



This concludes our look at Kafka's common Java APIs. We hope the examples above are helpful; thanks for reading, and for supporting 为之网!

