当前位置:网站首页 > 更多 > 编程开发 > 正文

[知识总结] 【Kafka】Springboot + Kafka(docker)简单demo

作者:CC下载站 日期:2021-10-29 08:50:00 浏览:83 分类:编程开发

长篇大论的原理概念放后面补充,先把使用放上

部署与使用

这里使用docker部署, Docker Hub上有两个kafka镜像比较流行,一个是wurstmeister/kafka,另一个是bitnami/kafka,这里以wurstmeister/kafka为例:

docker部署

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
# 先启动zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 假设你的服务器公网ip为192.168.1.1
docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

(Topic的创建我们用代码实现)

如果你想进入kafka下:

docker exec -it kafka bash
cd opt/kafka

SpringBoot整合

配置

依赖:(springBoot版本我使用的2.3.1)

<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
</dependency>

配置:

spring:
  kafka:
    # kafka服务器ip和端口号,多个用逗号隔开
    bootstrap-servers: 192.168.1.1:9092
    # 生产者
    producer:
      # 如果该值大于零时,表示启用重试失败的发送次数
      retries: 0
      # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      acks: 1
      # 每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
      batch-size: 16384
      # 生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
      buffer-memory: 33554432
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 提交延时
        linger:
          # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
          # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
          ms: 0
          # 自定义分区器
#        partitioner:
#          class: com.xxx.xxx.xx
    # 消费者
    consumer:
      # 用于标识此使用者所属的使用者组的唯一字符串
      group-id: log
      # 消费者的偏移量将在后台定期提交,默认值为true
      enable-auto-commit: true
      # 如果'enable-auto-commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 1000
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: earliest
      # Kafka提供的序列化和反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: {session.timeout.ms: 12000, request.timeout.ms: 18000}
      # 批量消费每次最多消费多少条消息
    #      max-poll-records: 50
    # 消费端监听的topic不存在时,项目启动会报错(关掉)
    listener:
      missing-topics-fatal: false
#      concurrency: 10
#      ack-mode: MANUAL_IMMEDIATE
#      poll-timeout: 1500
      # 设置批量消费
#      type: batch

创建Topic

@Configuration
public class CreateTopic {
    // 创建一个名为log的Topic并设置分区数为4,分区副本数为2
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("log",4, (short) 2 );
    }
}

生产者

@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
@Slf4j
public class KafkaController {
    //类似于RabbitTemplate
    private final KafkaTemplate<String, Object> kafkaTemplate;
    // 发送消息
    @GetMapping("/{message}")
    public void sendMessage(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("log", normalMessage).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }
}

消费者

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"log"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}

自定义工具类

@Component
@RequiredArgsConstructor
public class KafkaUtil {

    @Value("${spring.kafka.bootstrap-servers}")
    private String springKafkaBootstrapServers;

    private AdminClient adminClient;

    private final KafkaTemplate kafkaTemplate;


    /**
     * 初始化AdminClient
     * '@PostConstruct该注解被用来修饰一个非静态的void()方法。
     * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
     * PostConstruct在构造函数之后执行,init()方法之前执行。
     */
    @PostConstruct
    private void initAdminClient() {
        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
        adminClient = KafkaAdminClient.create(props);
    }

    /**
     * 新增topic,支持批量
     */
    public void createTopic(Collection<NewTopic> newTopics) {
        adminClient.createTopics(newTopics);
    }

    /**
     * 删除topic,支持批量
     */
    public void deleteTopic(Collection<String> topics) {
        adminClient.deleteTopics(topics);
    }

    /**
     * 获取指定topic的信息
     */
    public String getTopicInfo(Collection<String> topics) {
        AtomicReference<String> info = new AtomicReference<>("");
        try {
            adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
                for (TopicPartitionInfo partition : description.partitions()) {
                    info.set(info + partition.toString() + "\n");
                }
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return info.get();
    }

    /**
     * 获取全部topic
     */
    public List<String> getAllTopic() {
        try {
            return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return Lists.newArrayList();
    }

    /**
     * 往topic中发送消息
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

概念

为什么吞吐量大、速度快

顺序读写

不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写

基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写

Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升。

Page Cache

为了优化读写性能,Kafka利用了操作系统本身的Page Cache页高速缓冲存储器,简称页高缓),就是利用操作系统自身的内存而不是JVM空间内存

  • 避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
  • 避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题

通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。

零拷贝

linux操作系统 “零拷贝” 机制使用了sendfile方法, 允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。

通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。

Kafka使用了零拷贝技术,也就是直接将数据从内核空间读缓冲区直接拷贝到内核空间的socket缓冲区,然后再写入到NIC缓冲区,避免了在内核空间和用户空间之间穿梭。

这里的零拷贝并非指一次拷贝都没有,而是避免了在内核空间和用户空间之间的拷贝。

分区分段+索引

Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。

通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。

为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

批量读写批量压缩

Kafka数据读写也是批量的而不是单条的。

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。

  • 如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
  • Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
  • Kafka支持多种压缩协议,包括GzipSnappy压缩协议

Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

您需要 登录账户 后才能发表评论

取消回复欢迎 发表评论:

关灯