跳转至

第五节 Kafka API

1、使用 Kafka 原生的 API

消费者自动提交.

1-1 定义自己的生产者:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Minimal Kafka producer demo built on the native client API.
 *
 * <p>Records carry an {@code Integer} key and a {@code String} value. Batching is
 * configured through {@code batch.size} and {@code linger.ms}, so a record handed to
 * {@code send} may still be waiting in the client when {@link #sendMsg()} returns.
 * Call {@link #close()} before the JVM exits so that pending records are delivered.
 *
 * @author lingxiangxiang
 * @version 1.0
 */
public class MyKafkaProducer implements AutoCloseable {
    private final KafkaProducer<Integer, String> producer;

    /** Creates the underlying producer with the demo cluster's connection settings. */
    public MyKafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Batch size in bytes.
        properties.put("batch.size", 16384);
        // Wait up to 50 ms for a batch to fill; after that it is sent even if not full.
        properties.put("linger.ms", 50);
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Sends the same demo record three times: fire-and-forget, with an anonymous
     * callback, and with a {@code MyCallback} instance.
     *
     * @return {@code true} if all three {@code send} calls were accepted without
     *         throwing, {@code false} otherwise; asynchronous delivery failures are
     *         reported to the callbacks, not through this return value
     */
    public boolean sendMsg() {
        try {
            // "test2" is the topic, 0 the partition, 1 the key, "hello world" the payload.
            final ProducerRecord<Integer, String> record = new ProducerRecord<>("test2", 0, 1, "hello world");

            // Plain send without a callback.
            producer.send(record);

            // Send with an inline callback.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // On failure the metadata may be null or carry no usable values,
                    // so report the error instead of reading it.
                    if (e != null) {
                        System.err.println("send failed: " + e);
                        return;
                    }
                    System.out.println(recordMetadata.topic());
                    System.out.println(recordMetadata.partition());
                    System.out.println(recordMetadata.offset());
                }
            });

            // Send with a reusable callback class.
            producer.send(record, new MyCallback(record));
            return true;
        } catch (Exception e) {
            // Surface the reason instead of silently returning false.
            System.err.println("send msg failed: " + e);
            return false;
        }
    }

    /** Closes the underlying producer, waiting for previously sent records to complete. */
    @Override
    public void close() {
        producer.close();
    }
}

定义生产者发送成功的回调函数:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer callback that prints where a record was written, together with the
 * message object it was created for.
 *
 * @author lingxiangxiang
 * @version 1.0
 */
public class MyCallback implements Callback {
    private final Object msg;

    /**
     * @param msg the message this callback belongs to; printed when the send completes
     */
    public MyCallback(Object msg) {
        this.msg = msg;
    }

    /**
     * Prints topic, partition and offset on success, or the failure cause on error.
     *
     * @param metadata metadata of the written record; only read when {@code e} is null
     * @param e        the send failure, or {@code null} if the send succeeded
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // A failed send must not touch the metadata: it may be null or meaningless.
        if (e != null) {
            System.err.println("send failed, msg = " + msg + ", exception = " + e);
            return;
        }
        System.out.println("topic = " + metadata.topic());
        System.out.println("partiton = " + metadata.partition());
        System.out.println("offset = " + metadata.offset());
        System.out.println(msg);
    }
}

生产者测试类:编写生产者测试类时,我踩过一个坑:最后没有加 sleep,代码怎么检查都没有问题,但消息就是发送不成功;加上 sleep 之后就正常了。

原因是 send() 是异步的:消息先进入客户端缓冲区,再由后台线程批量发送。主函数 main 执行完退出时,消息可能还没有真正发送出去,所以需要等待一下(更规范的做法是在退出前调用 producer.flush() 或 producer.close())。当然,生产环境中的服务一般是常驻进程,通常不会遇到这个问题。

代码如下:

import static java.lang.Thread.sleep;

/**
 * @ClassName MyKafkaProducerTest
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 3:46 PM
 * @Version 1.0
 **/
public class MyKafkaProducerTest {
    public static void main(String[] args) throws InterruptedException {
        MyKafkaProducer producer = new MyKafkaProducer();
        boolean result = producer.sendMsg();
        System.out.println("send msg " + result);
        sleep(1000);
    }
}

消费者类:

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer demo with automatic offset commits.
 *
 * <p>Runs as a {@code ShutdownableThread}; each {@link #doWork()} pass polls topic
 * {@code test2} once and prints every record received.
 *
 * @author lingxiangxiang
 * @version 1.0
 */
public class MyKafkaConsumer extends ShutdownableThread {

    private final KafkaConsumer<Integer, String> consumer;

    /** Builds the consumer and subscribes it to the demo topic. */
    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Offsets are committed automatically, once per second.
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
        // Subscribe once here instead of on every doWork() pass.
        this.consumer.subscribe(Collections.singletonList("test2"));
    }

    /** Polls for up to one second and prints topic, partition, key and value of each record. */
    @Override
    public void doWork() {
        // poll(long) is deprecated; the Duration overload is its replacement.
        ConsumerRecords<Integer, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
    }
}

1-2 消费者的测试类

/**
 * @ClassName MyConsumerTest
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 4:23 PM
 * @Version 1.0
 **/
public class MyConsumerTest {
    public static void main(String[] args) {
        MyKafkaConsumer consumer = new MyKafkaConsumer();
        consumer.start();
        System.out.println("==================");
    }
}

1-3 消费者同步手动提交

前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的,但自动提交可能会出现消息重复消费的情况。

所以在生产环境下,很多时候需要对 Offset 进行手动提交, 以解决重复消费的问题。

手动提交又可以划分为同步提交、异步提交,同异步联合提交。这些提交方式仅仅是 doWork() 方法不相同,其构造器是相同的。

所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现三种不同的提交方式。

同步提交方式是,消费者向 Broker 提交 Offset 后等待 Broker 成功响应。若没有收到响应,则会重新提交,直到获取到响应。

而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。

修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer demo with manual, synchronous offset commits.
 *
 * <p>Runs as a {@code ShutdownableThread}; each {@link #doWork()} pass polls topic
 * {@code test2} once, prints every record received, and then commits the batch.
 *
 * @author lingxiangxiang
 * @version 1.0
 */
public class MyKafkaConsumer extends ShutdownableThread {

    private final KafkaConsumer<Integer, String> consumer;

    /** Builds the consumer with auto-commit disabled and subscribes it to the demo topic. */
    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Switch to manual commits; auto.commit.interval.ms is therefore not set.
        properties.put("enable.auto.commit", "false");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
        // Subscribe once here instead of on every doWork() pass.
        this.consumer.subscribe(Collections.singletonList("test2"));
    }

    /**
     * Polls for up to one second, prints each record, then synchronously commits the
     * offsets of the batch. Nothing is committed when the poll returns no records.
     */
    @Override
    public void doWork() {
        // poll(long) is deprecated; the Duration overload is its replacement.
        ConsumerRecords<Integer, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
        if (records.isEmpty()) {
            return;
        }
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
        // commitSync() commits the offsets of the whole polled batch, so it must run
        // only after every record has been handled. Calling it per record would commit
        // unprocessed records and block once for each of them.
        consumer.commitSync();
    }
}

1-4 消费者异步手工提交

手动同步提交方式需要等待 Broker 的成功响应,效率太低,影响消费者的吞吐量。异步提交方式是,消费者向 Broker 提交 Offset 后不必等待响应即可继续消费,提交结果通过回调函数通知,从而提高了吞吐量。

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer demo with manual, asynchronous offset commits.
 *
 * <p>Runs as a {@code ShutdownableThread}; each {@link #doWork()} pass polls topic
 * {@code test2} once, prints every record received, and then commits the batch
 * without waiting for the broker's reply.
 *
 * @author lingxiangxiang
 * @version 1.0
 */
public class MyKafkaConsumer extends ShutdownableThread {

    private final KafkaConsumer<Integer, String> consumer;

    /** Builds the consumer with auto-commit disabled and subscribes it to the demo topic. */
    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Switch to manual commits; auto.commit.interval.ms is therefore not set.
        properties.put("enable.auto.commit", "false");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
        // Subscribe once here instead of on every doWork() pass.
        this.consumer.subscribe(Collections.singletonList("test2"));
    }

    /**
     * Polls for up to one second, prints each record, then asynchronously commits the
     * offsets of the batch. Nothing is committed when the poll returns no records.
     */
    @Override
    public void doWork() {
        // poll(long) is deprecated; the Duration overload is its replacement.
        ConsumerRecords<Integer, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
        if (records.isEmpty()) {
            return;
        }
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
        // Alternatives: consumer.commitSync() for a blocking commit, or
        // consumer.commitAsync() without a callback.
        // Commit once per batch, after all records are handled; the callback only
        // reports commits that failed.
        consumer.commitAsync((offsets, e) -> {
            if (e != null) {
                System.out.println("提交次数, offsets = " + offsets);
                System.out.println("exception = " + e);
            }
        });
    }
}

2、Spring Boot 使用 Kafka

现在大家的开发过程中,很多都用的是 Spring Boot 的项目,直接启动了,如果还是用原生的 API,就是有点 Low 了啊,那 Kafka 是如何和 Spring Boot 进行联合的呢?

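Maven 配置(注意:下文用到的 KafkaTemplate 和 @KafkaListener 来自 spring-kafka,除 kafka-clients 外还需要引入 org.springframework.kafka:spring-kafka 依赖):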

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>

添加配置文件,在 application.properties 中加入如下配置信息:

Kafka 连接地址:

spring.kafka.bootstrap-servers = 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094

生产者:

spring.kafka.producer.acks = 0
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries = 3
spring.kafka.producer.batch-size = 4096
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.compression-type = gzip

消费者:

spring.kafka.consumer.group-id = mygroup
spring.kafka.consumer.auto-commit-interval = 5000
spring.kafka.consumer.heartbeat-interval = 3000
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.enable-auto-commit = true
# listener, 表示消费者监听线程的个数(并发数)
spring.kafka.listener.concurrency = 8
# topic的名字
kafka.topic1 = topic1

生产者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * Spring service that publishes a fixed demo message to the configured Kafka topic.
 */
@Service
@Slf4j
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {
    /** Injected template used to send String key/value messages. */
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Target topic, read from the {@code kafka.topic1} configuration property. */
    @Value("${kafka.topic1}")
    private String topic;

    /** Sends the demo payload to {@link #topic}. */
    @Override
    public void sendKafka() {
        final String payload = "hell world";
        kafkaTemplate.send(topic, payload);
    }
}

消费者:

@Component
@Slf4j
public class MyKafkaConsumer {
  @KafkaListener(topics = "${kafka.topic1}")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + kafkaMessage.get());
}