[知识总结] 【Kafka】Springboot + Kafka(docker)简单demo
作者:CC下载站 日期:2021-10-29 08:50:00 浏览:83 分类:编程开发
长篇大论的原理概念放后面补充,先把使用放上
部署与使用
这里使用docker部署, Docker Hub上有两个kafka镜像比较流行,一个是wurstmeister/kafka
,另一个是bitnami/kafka
,这里以wurstmeister/kafka
为例:
docker部署
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
# 先启动zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 假设你的服务器公网ip为192.168.1.1
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
(Topic的创建我们用代码实现)
如果你想进入kafka下:
docker exec -it kafka bash
cd opt/kafka
SpringBoot整合
配置
依赖:(springBoot版本我使用的2.3.1)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置:
spring:
kafka:
# kafka服务器ip和端口号,多个用逗号隔开
bootstrap-servers: 192.168.1.1:9092
# 生产者
producer:
# 如果该值大于零时,表示启用重试失败的发送次数
retries: 0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
acks: 1
# 每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
batch-size: 16384
# 生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
buffer-memory: 33554432
# Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 提交延时
linger:
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
ms: 0
# 自定义分区器
# partitioner:
# class: com.xxx.xxx.xx
# 消费者
consumer:
# 用于标识此使用者所属的使用者组的唯一字符串
group-id: log
# 消费者的偏移量将在后台定期提交,默认值为true
enable-auto-commit: true
# 如果'enable-auto-commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据)
# none:只要有一个分区不存在已提交的offset,就抛出异常
auto-offset-reset: earliest
# Kafka提供的序列化和反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties: {session.timeout.ms: 12000, request.timeout.ms: 18000}
# 批量消费每次最多消费多少条消息
# max-poll-records: 50
# 消费端监听的topic不存在时,项目启动会报错(关掉)
listener:
missing-topics-fatal: false
# concurrency: 10
# ack-mode: MANUAL_IMMEDIATE
# poll-timeout: 1500
# 设置批量消费
# type: batch
创建Topic
@Configuration
public class CreateTopic {
// 创建一个名为log的Topic并设置分区数为4,分区副本数为2
@Bean
public NewTopic initialTopic() {
return new NewTopic("log",4, (short) 2 );
}
}
生产者
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
@Slf4j
public class KafkaController {
//类似于RabbitTemplate
private final KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/{message}")
public void sendMessage(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("log", normalMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
}
消费者
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"log"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
自定义工具类
@Component
@RequiredArgsConstructor
public class KafkaUtil {
@Value("${spring.kafka.bootstrap-servers}")
private String springKafkaBootstrapServers;
private AdminClient adminClient;
private final KafkaTemplate kafkaTemplate;
/**
* 初始化AdminClient
* '@PostConstruct该注解被用来修饰一个非静态的void()方法。
* 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
* PostConstruct在构造函数之后执行,init()方法之前执行。
*/
@PostConstruct
private void initAdminClient() {
Map<String, Object> props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
adminClient = KafkaAdminClient.create(props);
}
/**
* 新增topic,支持批量
*/
public void createTopic(Collection<NewTopic> newTopics) {
adminClient.createTopics(newTopics);
}
/**
* 删除topic,支持批量
*/
public void deleteTopic(Collection<String> topics) {
adminClient.deleteTopics(topics);
}
/**
* 获取指定topic的信息
*/
public String getTopicInfo(Collection<String> topics) {
AtomicReference<String> info = new AtomicReference<>("");
try {
adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
for (TopicPartitionInfo partition : description.partitions()) {
info.set(info + partition.toString() + "\n");
}
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return info.get();
}
/**
* 获取全部topic
*/
public List<String> getAllTopic() {
try {
return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return Lists.newArrayList();
}
/**
* 往topic中发送消息
*/
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
概念
为什么吞吐量大、速度快
顺序读写
不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写
基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写
Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升。
Page Cache
为了优化读写性能,Kafka利用了操作系统本身的Page Cache
(页高速缓冲存储器,简称页高缓),就是利用操作系统自身的内存而不是JVM空间内存
- 避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
- 避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题
通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
零拷贝
linux操作系统 “零拷贝” 机制使用了sendfile
方法, 允许操作系统将数据从Page Cache
直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。
通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。
Kafka使用了零拷贝技术,也就是直接将数据从内核空间的读缓冲区直接拷贝到内核空间的socket缓冲区,然后再写入到NIC缓冲区,避免了在内核空间和用户空间之间穿梭。
这里的零拷贝并非指一次拷贝都没有,而是避免了在内核空间和用户空间之间的拷贝。
分区分段+索引
Kafka的message
是按topic
分类存储的,topic
中的数据又是按照一个一个的partition
即分区存储到不同broker
节点。每个partition
对应了操作系统上的一个文件夹,partition
实际上又是按照segment
分段存储的。这也非常符合分布式系统分区分桶的设计思想。
通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。
为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
批量读写批量压缩
Kafka数据读写也是批量的而不是单条的。
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
- 如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
- Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
- Kafka支持多种压缩协议,包括
Gzip
和Snappy
压缩协议
Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
猜你还喜欢
- 03-29 [编程相关] Winform窗体圆角以及描边完美解决方案
- 03-29 [前端问题] has been blocked by CORS policy跨域问题解决
- 03-29 [编程相关] GitHub Actions 入门教程
- 03-29 [编程探讨] CSS Grid 网格布局教程
- 10-12 [编程相关] python实现文件夹所有文件编码从GBK转为UTF8
- 10-11 [编程算法] opencv之霍夫变换:圆
- 10-11 [编程算法] OpenCV Camshift算法+目标跟踪源码
- 10-11 [Python] python 创建 Telnet 客户端
- 10-11 [编程相关] Python 基于 Yolov8 + CPU 实现物体检测
- 03-15 [脚本工具] 使用go语言开发自动化脚本 - 一键定场、抢购、预约、捡漏
- 01-08 [编程技术] 秒杀面试官系列 - Redis zset底层是怎么实现的
- 01-05 [编程技术] 《Redis设计与实现》pdf
取消回复欢迎 你 发表评论:
- 精品推荐!
-
- 最新文章
- 热门文章
- 热评文章
[书籍] 【帛书版】合集
[老照片] 一万张珍贵历史老照片【jpg 40.4GB】
[素材] 2024新年春节烟花素材合集【PSD格式+PNG格式】
[美剧] 《生活大爆炸》S01-S12季合集 【1080P 蓝光原盘REMUX】 DTS-HD.MA.5.1 【外挂简英双语字幕】 742.8G
[电影] 茶馆(1982)蓝光原盘REMUX 内封简繁英.简中简繁四字幕【33.9G】本片根据老舍同名原著改编
[电视剧] 永夜星河(2024)【4K 2160P 杜比音效】国语中字【全32集完结】爱情,古装 又名 :黑莲花攻略手册
[影视合集] 《霍比特人》三部曲加长版合集 【4K 蓝光 HDR】 TrueHD.7.1 国语次世代+导评 【国配简繁英特效+导评中字五字幕】134G
[课程] 2024邓诚高三数学视频课【MP4 12.2GB】
[电视剧] 宿敌(2024)【完结】【4K / 臻彩视听 / 杜比音效】【廖凡/朱珠】【17.8G】
[影视合集] 【鹿鼎记 7个版本合集】【1984-2020】【4K、1080P、720P】【中文字幕】【278.5G】
[书籍] 彭子益医书合集 [PDF/DOC]
[动画] 2002《火影忍者》720集全【4K典藏版】+11部剧场版+OVA+漫画 内嵌简日字幕
[剧集] 《斯巴达克斯》1-4季合集 无删减版 1080P 内嵌简英特效字幕
[电影] 《变形金刚系列》七部合集 [4K HDR 蓝光] 国英双语音轨 [内封精品特效字幕]【典藏版】235G
[CG剧情] 《黑神话:悟空》158分钟CG完整剧情合集 4K120帧最高画质
[动画] 收藏版:1996-2024年名侦探柯南全系列1080P,含国配、日配双语版+26部剧场作品
[游戏] 黑神话悟空离线完整版+修改器
[电影] 《神奇动物在哪里三部合集》 4K REMUX原盘 [杜比视界] [国英双语音轨] 特效字幕 [171.1G]
[动画] 西游记 (1999) 动画版 4K 全52集 高清修复版 童年回忆
[电影] 我的阿勒泰 (2024) 4K内封简繁 全8集 9.57G
[电影] 《黄飞鸿》全系列合集
[Android] 开罗游戏 ▎像素风格的模拟经营的游戏厂商安卓游戏大合集
[游戏合集] 要战便战 v0.9.107 免安装绿色中文版
[电影] 【珍藏版】20世纪电影合集从1922年到1990年代,看看爷爷辈的电影是什么样合集约212G
[书籍] 彭子益医书合集 [PDF/DOC]
[系统]【黑果小兵】macOS Big Sur 11.0.1 20B50 正式版 with Clover 5126 黑苹果系统镜像下载
[美图] 【经典收藏美图集合】1500多张韩国美女高清图片让你的收藏夹更加丰富多彩
[瓜] 青岛【路虎女】插队、逆行、追尾、打人未删减【完整版视频】
[电视剧] 灵魂摆渡(1-3季合集)【未删减】【4K.无水印】【剧情/恐怖/惊悚】【豆瓣8.7】
[书籍资料] 《玉房秘诀》《玉房秘典》《古代房中术》
- 最新评论
-
电影很不错谢谢分享贪睡的猫 评论于:11-18 一部不错的经典科幻kelvin 评论于:11-13 找了好久的资源,终于在这里找到了。感谢本站的资源和分享。谢谢285552528 评论于:11-09 找了好久的资源bjzchzch12 评论于:11-07 谢谢分享感谢ppy2016 评论于:11-05 谢谢分享感谢ppy2016 评论于:11-05 有靳东!嘻嘻奥古斯都.凯撒 评论于:10-28 流星花园是F4处女作也是4人集体搭配的唯一一部!奥古斯都.凯撒 评论于:10-28 找了好久的资源,终于在这里找到了。感谢本站的资源和分享。谢谢AAAAA 评论于:10-26 找了好久的资源,终于在这里找到了。感谢本站的资源和分享。谢谢password63 评论于:10-26
- 热门tag