查看: 2082|回复: 0

[Java代码] 消息队列专题之 Kafka

发表于 2018-2-1 08:00:03
Kafka 特点

Kafka 最早是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,之后成为 Apache 的顶级项目。主要特点如下:

同时为发布和订阅提供高吞吐量
Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB 级以上数据也能保证常数时间的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。

消息持久化
将消息持久化到磁盘,因此可用于批量消费,例如 ETL 以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。

分布式
支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输。这样易于向外扩展,所有的producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。

消费消息采用 pull 模式
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。

支持 online 和 offline 的场景。
同时支持离线数据处理和实时数据处理。 Kafka 中的基本概念

Broker
Kafka 集群中的一台或多台服务器统称为 Broker

Topic
每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不同
Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)

Partition
Topic 物理上的分组,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)

Producer
消息和数据的生产者,可以理解为往 Kafka 发消息的客户端

Consumer
消息和数据的消费者,可以理解为从 Kafka 取消息的客户端

Consumer Group
每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group)。
这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。如果要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。 Kafka 安装

Mac 用户用 HomeBrew 来安装,安装前要先更新 brew

  1. brew update
复制代码

接着安装 kafka

  1. brew install kafka
复制代码

安装完成之后可以查看 kafka 的配置文件

  1. cd /usr/local/etc/kafka
复制代码

kafka
我的电脑通过 HomeBrew 安装的 kafka 位置在 /usr/local/Cellar/kafka/0.11.0.1/bin ,可以看到 HomeBrew 安装下来的 kafka 的版本已经是 0.11.0.1 的了。

kafka 需要用到 zookeeper,HomeBrew 安装kafka 的时候会同时安装 zookeeper。下面先启动 zookeeper:

  1. zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
复制代码

接着启动 kafka

  1. cd /usr/local/Cellar/kafka/0.11.0.1
  2. ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
复制代码

创建 topic,设置 partition 数量为2,topic 的名字叫 test-topic,下面的例子都用这个 topic

  1. cd /usr/local/Cellar/kafka/0.11.0.1
  2. ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic
复制代码

查看创建的 topic

  1. cd /usr/local/Cellar/kafka/0.11.0.1
  2. ./bin/kafka-topics --list --zookeeper localhost:2181
复制代码
Kafka 命令行测试

发送消息

  1. cd /usr/local/Cellar/kafka/0.11.0.1/bin
  2. kafka-console-producer --broker-list localhost:9092 --topic test-topic
复制代码

消费消息

  1. cd /usr/local/Cellar/kafka/0.11.0.1/bin
  2. kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
复制代码

删除 topic

  1. cd /usr/local/Cellar/kafka/0.11.0.1/bin
  2. ./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic
复制代码

如果 kafka 启动时加载的配置文件中 server.properties 没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把 topic 标记为:marked for deletion

可以通过命令来查看所有 topic

  1. cd /usr/local/Cellar/kafka/0.11.0.1/bin
  2. ./bin/kafka-topics --zookeeper localhost:2181 --list
复制代码

如果想真正删除它,可以如下操作

  1. 登录zookeeper客户端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli
  2. 找到topic所在的目录:ls /brokers/topics
  3. 找到要删除的topic,执行命令:rmr /brokers/topics/test-topic 即可,此时topic被彻底删除
复制代码
Java 客户端访问 maven工程的pom文件中添加依赖
  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.11.0.1</version>
  5. </dependency>
复制代码
消息生产者
  1. package org.study.kafka;
复制代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class ProducerSample {

  1. public static void main(String[] args) {
  2. Map<String, Object> props = new HashMap<String, Object>();
  3. props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址
  4. props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。
  5. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  9. String topic = "test-topic";
  10. Producer<String, String> producer = new KafkaProducer<String, String>(props);
  11. producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1"));
  12. producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2"));
  13. producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3"));
  14. producer.close();
  15. }
复制代码

}

  1. 3. 消息消费者
复制代码

package org.study.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSample {

  1. public static void main(String[] args) {
  2. String topic = "test-topic";// topic name
  3. Properties props = new Properties();
  4. props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。
  5. props.put("group.id", "testGroup1");// Consumer Group Name
  6. props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交
  7. props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒
  8. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  9. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  10. Consumer<String, String> consumer = new KafkaConsumer(props);
  11. consumer.subscribe(Arrays.asList(topic));
  12. while (true) {
  13. ConsumerRecords<String, String> records = consumer.poll(100);
  14. for (ConsumerRecord<String, String> record : records)
  15. System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
  16. }
  17. }
复制代码

}

  1. 4. 启动 zookeeper
复制代码

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

  1. 5. 启动 kafka 服务器
复制代码

kafka-server-start /usr/local/etc/kafka/server.properties

  1. 6. 运行 Consumer
  2. 先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
  3. 7. 运行 Producer
  4. 运行 Producer,发布几条消息,在 Consumer 的控制台能看到接收的消息
  5. ![Consumer 控制台](http://upload-images.jianshu.io/upload_images/5015984-a942182a87cb3929.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  6. # Kafka 集群配置
  7. kafka 的集群配置一般有三种,即: single node - single broker ,single node - multiple broker ,multiple node - multiple broker
  8. 前两种实际上[官网有介绍](http://kafka.apache.org/0110/documentation.html#quickstart)。
  9. - single node - single broker
  10. ![单节点单 broker ](http://upload-images.jianshu.io/upload_images/5015984-a1c5f682e7532ffe.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  11. 1. 启动 zookeeper
复制代码

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

  1. 2. 启动 kafka broker
复制代码

kafka-server-start /usr/local/etc/kafka/server.properties

  1. 3. 创建一个 kafka topic
复制代码

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker

  1. 4. 启动 producer 发送信息
复制代码

kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker

  1. broker-list 和 topic 这两个参数是必须的,broker-list 指定要连接的 broker 的地址,格式为 node_address:port 。topic 是必须的,因为需要发送消息给订阅了该
  2. topic 的 consumer group 。
  3. 现在可以在命令行里输入一些信息,每一行会被作为一个消息。
  4. ![发送消息](http://upload-images.jianshu.io/upload_images/5015984-9767c36f6b425e68.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  5. 5. 启动 consumer 消费消息
复制代码

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker

  1. 在不同的终端窗口里分别启动 zookeeper、broker、producer、consumer 后,在
  2. producer 终端里输入消息,消息就会在 consumer 终端中显示了。
  3. ![消息显示](http://upload-images.jianshu.io/upload_images/5015984-197223d8d1f23b22.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  4. - single node - multiple broker
  5. ![单节点多 broker ](http://upload-images.jianshu.io/upload_images/5015984-a56e71d74199ab10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  6. 1. 启动 zookeeper
复制代码

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

  1. 2. 启动broker
  2. 如果需要在单个节点(即一台机子)上面启动多个 broker(这里作为例子启动三个 broker),需要准备多个server.properties文件即可,所以需要复制 /usr/local/etc/kafka/server.properties 文件。因为需要为每个 broker 指定单独的属性配置文件,其中 broker.id 、 port 、 log.dir 这三个属性必须是不同的。
  3. 新建一个 kafka-example 目录和三个存放日志的目录
复制代码

mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3

  1. 复制 /usr/local/etc/kafka/server.properties 文件三份
复制代码

cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties

  1. 在 broker1 的配置文件 server-1.properties 中,相关要修改的参数为:
复制代码

broker.id=1
port=9093
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1

  1. broker2 的配置文件 server-2.properties 中,相关要修改的参数为:
复制代码

broker.id=2
port=9094
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2

  1. broker3 的配置文件 server-3.properties 中,相关要修改的参数为:
复制代码

broker.id=3
port=9095
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3

  1. 启动每个 broker
复制代码

cd /Users/niwei/Downloads/kafka-example
kafka-server-start server-1.properties
kafka-server-start server-2.properties
kafka-server-start server-3.properties

  1. 3. 创建 topic
  2. 创建一个名为 topic-singlenode-multiplebroker 的topic
复制代码

kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker

  1. 4. 启动 producer 发送信息
  2. 如果一个 producer 需要连接多个 broker 则需要传递参数 broker-list
复制代码

kafka-console-producer --broker-list localhost:9093, localhost:9094, localhost:9095 --topic topic-singlenode-multiplebroker

  1. 5. 启动 consumer 消费消息
复制代码

kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker

  1. ![单节点多 broker 消费消息](http://upload-images.jianshu.io/upload_images/5015984-fa8dcd2c131a5692.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  2. - multiple node - multiple broker
  3. ![多节点多 broker](http://upload-images.jianshu.io/upload_images/5015984-e7c461fb33427a73.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  4. 在多节点多 broker 集群中,每个节点都需要安装 Kafka,且所有的 broker 都连接到同一个 zookeeper 。这里 zookeeper 当然也是可以配置成集群方式的,具体步骤参见我之前写的[搭建 zookeeper 集群](https://www.jianshu.com/p/286079c634fb)。
  5. 1. Kafka 的集群配置
复制代码

broker.id=1 #当前机器在集群中的唯一标识
port=9093 #当前 kafka 对外提供服务的端口,默认是 9092
host.name=192.168.121.101 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1 #消息存放的目录,这个目录可以配置为逗号分割的表达式
zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 #设置 zookeeper 集群的连接端口

num.network.threads=3 #这个是 borker 进行网络处理的线程数
num.io.threads=5 #这个是 borker 进行 IO 处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区的大小,数据先回存储到缓冲区了到达一定的大小后在发送能提高性能
socket.receive.buffer.bytes=102400 #接收缓冲区的大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向 kafka 请求消息或者向 kafka 发送消息的请求的最大数,这个值不能超过 jvm 的堆栈大小
num.partitions=1 #默认的分区数,一个 topic 默认1个分区数
log.retention.hours=24 #默认消息的最大持久化时间,24小时
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka 保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是因为 kafka 的消息是以追加的形式落地到文件,当超过这个值的时候,kafka 会新建一个文件
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去检查上面配置的 log 失效时间(log.retention.hours=24 ),到目录查看是否有过期的消息如果有则删除
log.cleaner.enable=false #是否启用 log 压缩,一般不用启用,启用的话可以提高性能

  1. 由于是多节点多 broker 的,所以每个 broker 的配置文件 server.properties 都要按以上说明修改
  2. 2. producer 的配置修改
复制代码

kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker

  1. 3. consumer 的配置修改
复制代码

kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker

  1. # Kafka 高可靠性配置
  2. Kafka 提供了很高的数据冗余弹性,对于需要数据高可靠性的场景可以增加数据冗余备份数(replication.factor),调高最小写入副本数的个数(min.insync.replicas)等等,但是这样会影响性能。反之,性能提高而可靠性则降低,用户需要自身业务特性在彼此之间做一些权衡性选择。
  3. 要保证数据写入到 Kafka 是安全的、高可靠的,需要如下的配置:
  4. 1. topic 的配置
  5. replication.factor>=3,即副本数至少是3个2<=min.insync.replicas<=replication.factor
  6. 2. broker 的配置
  7. leader 的选举条件 unclean.leader.election.enable=false
  8. 3. producer 的配置
  9. request.required.acks=-1,producer.type=sync
复制代码


回复

使用道具 举报