Kafka 安装与启动

1. 下载代码

下载 2.3.0 版本并解压缩:

tar -zxvf kafka_2.12-2.3.0.tgz -C .

创建软连接便于升级:

ln -s kafka_2.12-2.3.0/ kafka

配置环境变量:

export KAFKA_HOME=/Users/smartsi/opt/kafka
export PATH=${KAFKA_HOME}/bin:$PATH

2. 安装ZooKeeper

Kafka 依赖 ZooKeeper,如果你还没有 ZooKeeper 服务器,你需要先启动一个 ZooKeeper 服务器。可以先参考ZooKeeper 安装与启动来安装 ZooKeeper。ZooKeeper 配置如下:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/smartsi/opt/zookeeper/data
clientPort=2181
server.1=localhost:2888:3888

你也可以通过与 kafka 打包在一起的便捷脚本来快速简单地创建一个单节点 ZooKeeper 实例:

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

3. 配置Kafka

第一个 broker 配置 server-9092.properties 如下:

broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

运行起来至少要配置四项。上面的前四项。

第二个 broker 配置 server-9093.properties 如下:

broker.id=1
listeners=PLAINTEXT://127.0.0.1:9093
log.dirs=/Users/smartsi/opt/kafka/logs/log-9093
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

第三个 broker 配置 server-9094.properties 如下:

broker.id=2
listeners=PLAINTEXT://127.0.0.1:9094
log.dirs=/Users/smartsi/opt/kafka/logs/log-9094
zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有都在同一个端口注册,或者覆盖彼此的数据。所以用端口号9092、9093、9094分别代表三个 broker。

下面具体解释一下我们的配置项:

(1) Broker相关:

broker.id=0

broker 的 Id。每一个 broker 在集群中的唯一标示,要求是正数。每个 broker 都不相同。

(2) Socket服务设置:

listeners=PLAINTEXT://127.0.0.1:9092

Socket服务器监听的地址,如果没有设置,则监听 java.net.InetAddress.getCanonicalHostName() 返回的地址。

(3) ZooKeeper相关:

zookeeper.connect=localhost:2181/kafka-2.3.0
zookeeper.connection.timeout.ms=6000

zookeeper.connect 是一个逗号分隔的 host:port 键值对,每个对应一个 zk 服务器。例如 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002。你还可以将可选的客户端命名空间 Chroot 字符串追加到 URL 上以指定所有 kafka 的 Znode 的根目录。另外这个 kafka-2.3.0 这个节点需要你提前建立。让 Kafka 把他需要的数据结构都建立在这个节点下,否则会建立在根节点 / 节点下。

(3) 日志相关:

log.dirs=/Users/smartsi/opt/kafka/logs/log-9092

Kafka存储Log的目录。

4. 启动Kafka服务器

有两种方式可以启动 Kafka 服务器:

# 第一种方式(推荐)
bin/kafka-server-start.sh -daemon config/server.properties
# 第二种方式
nohup bin/kafka-server-start.sh config/server.properties &

我们以第一种方式启动 Kafka 服务器:

bin/kafka-server-start.sh -daemon config/server-9092.properties
bin/kafka-server-start.sh -daemon config/server-9093.properties
bin/kafka-server-start.sh -daemon config/server-9094.properties

查看进程和端口:

smartsi:kafka smartsi$ jps
8914 DataNode
42802 Jps
9252 NodeManager
41253 Kafka
41541 Kafka
42790 Kafka
41670 ZooKeeperMain
16731
9164 ResourceManager
1997

我们现在看一下 Kafka 在 ZooKeeper 上创建的节点:

[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

看一下我们在ZooKeeper上注册的两个 broker:

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids
[0, 1, 2]
[zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4}
cZxid = 0x92
ctime = Mon Sep 02 10:08:41 CST 2019
mZxid = 0x92
mtime = Mon Sep 02 10:08:41 CST 2019
pZxid = 0x92
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560012
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4}
cZxid = 0xa7
ctime = Mon Sep 02 10:08:48 CST 2019
mZxid = 0xa7
mtime = Mon Sep 02 10:08:48 CST 2019
pZxid = 0xa7
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560014
dataLength = 188
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4}
cZxid = 0xbd
ctime = Mon Sep 02 10:19:09 CST 2019
mZxid = 0xbd
mtime = Mon Sep 02 10:19:09 CST 2019
pZxid = 0xbd
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100009088560018
dataLength = 188
numChildren = 0

5. 测试Kafka

5.1 创建Topic

让我们创建一个名为 test 的 Topic,它有一个分区和一个副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test

现在我们可以运行 list 命令来查看这个 Topic:

smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0
test

或者,你也可将代理配置为:在发布的topic不存在时,自动创建topic,而不是手动创建。

5.2 启动生产者

Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。

运行 Producer (生产者),然后在控制台输入一些消息以发送到服务器:

smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is my first message
>this is my second message

5.3 启动消费者

Kafka 还有一个命令行 Consumer(消费者),将消息转储到标准输出:

smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is my first message
this is my second message

如果你将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。

欢迎关注我的公众号和博客:

原文:Quickstart

赏几毛白!