Kafka
kafka
当前kafka版本为2.4.0
慎用apachecn.org
建议不要图方便使用国内中文网站,这个网站内容停留在1.0版本, 会导致客户端、服务端、文档等多头不一致问题,出现莫名其妙的问题;直接使用官方网站即可;
--bootstrap-server
指定域名出错却打印127.0.0.1的问题
关于使用源码配置config/server.properties
启动起来的服务,
在远端使用脚本客户端./bin/kafka-topics.sh --bootstrap-server kafka.qidizi.com:9092 --list
连接,
出错时打印127.0.0.1:9092
这种莫名问题,经过半天研究发现,它是受某配置影响:
如果服务端的config/server.properties
没有指定#advertised.listeners=PLAINTEXT://kafka.qidizi.com:9092
,在通过
--bootstrap-server kafka.qidizi.com:9092
连接时,返回的node地址将是listeners=PLAINTEXT://10.10.10.10:9092
设置
解析出来的值,因为它的值是局域网的ip,肯定连接不上;但是使用弃用的--zookeeper <String: hosts>
指定却能正常使用;
出错时的提示如下:
[2020-02-28 17:12:32,429] WARN [AdminClient clientId=adminclient-1] Connection to node 0 (/10.10.10.10:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
kafka的php扩展
它叫rdkafka,通过pecl search kafka得到全名后,换成install指令安装即可,若出错,查看提示,可能是需要先安装librdkafka,若是mac,使用brew安装这个依赖库; php扩展文档的在这,但是它的文档写得比较差,比如set的可用列表都没有提及,需要参考它的依赖库的文档,所以建议结合kafka文档、库文档、php扩展文档才能搞明白用法,建议先使用kafka的bin目录下带的sh脚本来测试搞明白之后再使用;
// 2天研究理解写的php示例代码
/*
// https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-kafkaconsumer.offsetsfortimes.html 用例
$a = new \RdKafka\TopicPartition(self::TOPIC_NAME,0,0) ;
$a->setOffset(1583258770007); 使用该消息的时间取它的offset,这个值=== public $timestamp =>
int(1583259283584),差一秒都不行,它查找方式是通过 === 来查找,然后返回这个offset的,所以你若理解为随便填写个时间,它会返回最
* 接近那个消息,就错了,正确的应该是先找到这个offset所对应的消息的timestamp,然后赋值
$ar = $kf->offsetsForTimes([
1=>$a
],1000);
var_dump($a,$ar);exit;
*/
<?php
class kafka_test
{
const BROKER = 'qidizi.com:9092';
const TOPIC_NAME = 'topic1';
static public function topic_list()
{
echo "\n打开队列所有主题:\n\n";
$conf = new \RdKafka\Conf();
// set 的参数列表在此 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
$conf->set('log_level', (string)LOG_NOTICE);
//$conf->set('debug', 'all');
// 指定服务集地址
$conf->set('bootstrap.servers', self::BROKER);
//var_dump($conf->dump());exit;
// metadata 不能直接实例化,必须使用该类的2个子类来实例化
$kf = new RdKafka\Producer($conf);
$md = $kf->getMetadata(true, null, 10e3);
$result = $md->getTopics();
var_dump($result);
}
static public function add_msg()
{
echo "\n添加一些消息\n\n";
echo "\n可以使用如下命令来监听消息队列是否有它:\n
./bin/kafka-console-consumer.sh --bootstrap-server qidizi.com:9092 --topic topic1
\n\n";
$conf = new \RdKafka\Conf();
// set 的参数列表在此 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
$conf->set('log_level', (string)LOG_NOTICE);
// 若结果未预期,可以打开这个查看交互详情来解决问题
// $conf->set('debug', 'all');
// 注意这个用法是错误的,它是用在消费端
// $conf->set('bootstrap.servers', self::BROKER);
// 增加集群,也可以在创建实例后使用addBrokers在运行时创建
$conf->set('metadata.broker.list', self::BROKER);
$kf = new RdKafka\Producer($conf);
// 这是另外一个增加集群的方式
//$kf->addBrokers(self::BROKER);
// 选定一个主题,如果不存在会自动创建;当然服务上的设置允许才行;
$topic = $kf->newTopic(self::TOPIC_NAME);
// 如果队列满了,行为未明;RD_KAFKA_MSG_F_BLOCK 是阻塞到有位或是超时
$on_full_undefine = 0;
$msg = 'msg_' . date_format(date_create(), 'Y-m-d H:i:s u ') . __FILE__;
$msg_id = uniqid('', true);
// 内容存放到自动分区中
$topic->produce(RD_KAFKA_PARTITION_UA, $on_full_undefine, $msg, $msg_id);
// 发送事件消息,比如使用之类的钩子挂载了处理fun Delivery report callbacks RdKafka\Conf::setDrMsgCb() [producer]
$kf->poll(0);
// 等待消息入队成功才退出进程,否则可能会面临消息丢失风险
$result = $kf->flush(10e3);
echo '消息入队执行:' . (RD_KAFKA_RESP_ERR_NO_ERROR === $result ? '成功' : '失败');
}
static public function get_msg()
{
echo "\n获取消息\n\n";
echo "\n可以使用如下命令来生产消息:\n
./bin/kafka-console-producer.sh --broker-list qidizi.com:9092 --topic topic1
\n\n";
$conf = new \RdKafka\Conf();
// set 的参数列表在此 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
$conf->set('log_level', (string)LOG_NOTICE);
// 若结果未预期,可以打开这个查看交互详情来解决问题
// $conf->set('debug', 'all');
// 注意这个用法是错误的,它是用在消费端
// $conf->set('bootstrap.servers', self::BROKER);
// 增加集群,也可以在创建实例后使用addBrokers在运行时创建
$conf->set('metadata.broker.list', self::BROKER);
$kf = new RdKafka\Consumer($conf);
// 这是另外一个增加集群的方式
//$kf->addBrokers(self::BROKER);
// 选定一个主题,如果不存在会自动创建;当然服务上的设置允许才行;
// 它返回 RdKafka\ConsumerTopic
$topic = $kf->newTopic(self::TOPIC_NAME);
$partition = 0;
// 从首个分区首个读起
$topic->consumeStart($partition, RD_KAFKA_OFFSET_BEGINNING);
// 读取次数
$max_read = 10;
// 读取间歇
$sleep_sec = 2;
$partition = 0;
$read_timeout = 1e3;
while ($max_read > 0) {
$msg = $topic->consume($partition, $read_timeout);
// todo 需要考虑读取超时
if (null === $msg or $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
echo "可能是没有消息了\n";
break;
}
if ($msg->err) {
echo "读取出错了,稍候再试:", $msg->errstr(), "\n";
sleep($sleep_sec);
continue;
}
echo "消息如下,稍候会再读取\n", $msg->payload, "\n";
sleep($sleep_sec);
}
}
}
kafka_test::get_msg();
很简单的一个控制脚本
#!bash
# kafka的控制脚本
if [[ "$UID" -ne "$(id -ur kafka)" ]];then
# 必须用用户 kafka 来运行
echo ""
echo "切换成用户 kafka 来运行本脚本..."
echo ""
sudo -u kafka bash "$0" $1
exit 0;
fi
kafka_dir="/var/web/kafka/kafka_2.13-2.4.0/"
cd $kafka_dir
echo "kafka相关控制脚本 by qidizi"
echo ""
input="$1"
echo ""
cmd=("")
function add_cmd() {
len="${#cmd[*]}"
cmd[$len]=$2
printf "%-4s %s\n" "${len}" "${1}"
}
function server_status() {
echo ""
echo "kafka运行状态:"
echo ""
netstat -anp -t |grep 49092
echo ""
ps aux |grep "config\/server\.properties"
echo ""
echo ""
echo ""
echo "zookeeper运行状态:"
netstat -anp -t |grep 42181
echo ""
ps aux|grep "config\/zookeeper\.properties"
echo ""
}
function server_start(){
echo ""
echo "启动服务..."
ps aux|grep "config\/server\.properties"
if [[ "$?" -eq "0" ]];then
echo "服务已在运行"
echo ""
exit 0
fi
ps aux|grep "config\/zookeeper\.properties"
if [[ "$?" -eq "0" ]];then
echo "zookeeper还在运行,请先停止"
echo ""
exit 0
fi
echo "启动zookeeper服务..."
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
if [[ "$?" -ne "0" ]];then
echo ""
echo ""
echo "zookeeper启动失败"
exit 0
fi
echo ""
echo "zookeeper启动成功"
echo ""
echo "启动kafka服务..."
./bin/kafka-server-start.sh -daemon ./config/server.properties
if [[ "$?" -ne "0" ]];then
echo ""
echo ""
echo "kafka启动失败"
exit 0
fi
echo ""
echo "kafka启动成功"
}
function server_stop(){
echo ""
echo "停止服务..."
ps aux|grep "config\/server\.properties"
if [[ "$?" -eq "0" ]];then
echo ""
echo "停止kafka服务..."
./bin/kafka-server-stop.sh ./config/server.properties
if [[ "$?" -ne "0" ]];then
echo ""
echo ""
echo "kafka停止失败"
exit 0
fi
echo ""
echo "kafka停止成功"
echo "如果服务被使用中,它需要保存数据才会退出,所以需要一定时间才消失"
echo ""
else
echo ""
echo "kafka未运行"
echo ""
fi
ps aux|grep "config\/zookeeper\.properties"
if [[ "$?" -eq "0" ]];then
echo "停止zookeeper服务..."
./bin/zookeeper-server-stop.sh ./config/zookeeper.properties
if [[ "$?" -ne "0" ]];then
echo ""
echo ""
echo "zookeeper停止失败"
exit 0
fi
echo ""
echo "zookeeper停止成功"
echo ""
else
echo ""
echo "zookeeper未运行"
echo ""
fi
}
add_cmd '查看服务运行状态' 'server_status';
add_cmd '启动服务' 'server_start';
add_cmd '停止服务' 'server_stop';
if [[ -z "${input}" ]];then
read -r -t 10 -p "请输入数字:" input
fi
if [[ "${cmd[${input}]}" == "" ]]; then
echo "选择有误"
exit 0
fi
printf "执行\n%s" "${cmd[${input}]}"
echo ""
echo ""
${cmd[$input]}
单机,无安全授权的kafka服务配置
# cat config/server.properties
# 本配置没有考虑优化
# 服务与客户连接
listeners=broker://127.0.0.1:9093,client://127.0.0.1:9092
advertised.listeners=broker://127.0.0.1:9093,client://127.0.0.1:9092
inter.broker.listener.name=broker
# 协议别称
listener.security.protocol.map=client:PLAINTEXT,broker:PLAINTEXT
# 证书配置未启用,关于ssl的配置不会生效
ssl.keystore.location=/data/kafka_home/kafka_2.13-2.4.0/cert/kafka.server.keystore
ssl.keystore.password=qidizi_kafka
ssl.key.password=qidizi_kafka
ssl.truststore.location=/data/kafka_home/kafka_2.13-2.4.0/cert/kafka.trust.ca.keystore
ssl.truststore.password=qidizi_kafka
ssl.endpoint.identification.algorithm=PLAINTEXT
ssl.client.auth=none
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
# 相对于多台集群,当前服务在集群中唯一的id,必须人工防止冲突
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka_home/logs_kafka
# 每个topic存放到多少个分区中,按kafka,建议是与硬盘个数相同
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:42181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
简单php守护消费实现
当前代码环境前提:
kafka_2.13-2.4.0
PHP 7.2.24 (cli) (built: Nov 4 2019 10:23:08) ( NTS )
librdkafka version (build) => 1.3.0.255
php rdkafka version => 4.0.3
<?php
namespace project\core\cli\run;
use Exception;
use project\core\cli\start;
use project\core\includes\config;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\TopicPartition;
/**
* 无序队列 {@see config::KAFKA_TOPIC_ASYNC_TASK} 消费者(守护式),功能:
* 从队列中拉取消息,判断该消息由哪个php负责,通过linux的后台进程启动式启动该php后,
* 标志该消息在本组中已消费,然后就不管该php执行结果(该php自行维护消费情况),继续提取后续消息重复以上步骤;
* 如果本php挂掉,会继续从挂前保持的offset继续消息(挂掉/重启后立刻启动由supervisord实现,正常退出不会被启动)
*
* 执行代码方式
* /usr/local/bin/php core/cli/start.php core/cli/run/fork_php_worker.php
*
* 检查需要后台执行的任务(如从kafka中),fork php进程来处理
*
* 删除 主题 ../../kafka_2.13-2.4.0/bin/kafka-topics.sh
* --bootstrap-server t.t.1bom.net:49092 --topic async_task --delete
*
* 监视主题
*
* ~/web/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh
* --bootstrap-server kafka.qidizi.com:49092 --topic async_task
*
* 给本文件发送 sigterm 信号
*
* kill -s SIGTERM "$(ps aux|grep 'fork_php_worker\.php'|awk '{print $2}')"
*
* TODO 注意
* 如果主题不存在,启动本进程,然后再启动生产者并生产了数据,本进程依然无法获得数据,需要重启,
* 意思就是必须保证该主题存在,本问题未细化研究做处理
*
*
*
*/
class fork_php_worker extends start
{
// 收到退出信号
private static string $exit_signal = '';
// 是否在sleep中
private bool $sleeping = false;
private KafkaConsumer $consumer;
static int $start_time = 0;
// 最大php并发数
const MAX_RUNNING = 100;
// fork php时加入特殊字符,方便ps aux|grep 时进行统计
const PS_GREP_SIGNAL = "'fork_php_child=1'";
/**
* 测试fork 是否成功
*/
protected function run()
{
self::$start_time = microtime(true);
// 有信号,立即回调信号处理func(这个执行是异步的)
pcntl_async_signals(true);
// 关闭信号
pcntl_signal(SIGTERM, ['self', 'signal_handler']);
// 用户自定义信号
pcntl_signal(SIGUSR1, ['self', 'signal_handler']);
pcntl_signal(SIGUSR2, ['self', 'signal_handler']);
// 重启信号
pcntl_signal(SIGHUP, ['self', 'signal_handler']);
// 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出,用于通知前台进程组终止进程。
pcntl_signal(SIGINT, ['self', 'signal_handler']);
// 虽然上方注册了信息处理方法,但是关于退出本php的信号捕捉,大概有几种方式
// 首先考虑几个问题:php是顺序执行的,那么正在执行其它代码时,收到信号怎么办?答案是收到信号,是先存放到信号池中
// 如果使用了sleep方法,收到信号会怎么样?进程立刻会恢复进入后续可继续执行的特殊方法,比如echo 之类在收到
// SIGTERM 信号并不会再解释了,但是 pcntl_signal_dispatch 却会执行;所以具体哪些会执行需要了解;
// 那问题来了,收到了信号,应该什么时候执行?
// 用 declare(ticks=1){/* 其它代码 */},这个方法是每解析{}中的“一行"代码(比如$a=1;算一行,$a=1 and $b=2;算一行),
// 就会去执行一次{}的“其它代码“,那你就可以把exit之类放到这里了,当然要配合收信号处理,但是它有一个性能问题:每一行都去跑一通;
// 二,使用 pcntl_signal_dispatch();,就是把它放到sleep后面,它就会看信号池中是否有信号,
// 然后call相应的pcntl_signal绑定方法来处理;
// 三,使用 pcntl_async_signals(true);,这个是php7.1才支持,它意思就是,只要有信号发生,立刻执行处理func,注意它最多也只是能把信号放入php可以接触到的信号池中,并不能干扰php串行解析器当前正在执行的代码:比如你想判断当前php代码查库中是否应该停止,这种事情应该是业务代码需要考虑的,当前最多只能设置类中的“是否有停止信号“标志成真,至于业务怎么使用,是业务的事,
// 不管解析器当前处于执行哪个代码
// 比如系统关机,会在先发出所有“请立刻终止执行。。。否则我就强制关闭你“信号后等待n久,立刻直接使用sigKill,就是-9,
// 来强制退出,这种信号是不可以捕捉和延后处理的(比如等我操作完再干掉我)。
// 任务为空时,暂停时间
$empty_to_sleep_sec = 5;
// 并发数不足暂停秒数
$running_full_sleep_sec = 5;
self::echo_block('start ' . date('Y-m-d H:i:s ') . __FILE__);
while (true) {
if (self::$exit_signal) {
// Close down the KafkaConsumer.
// This call will block until the consumer has revoked its assignment,
// calling the rebalance_cb if it is configured, committed offsets to broker,
// and left the consumer group.
// The maximum blocking time is roughly limited to session.timeout.ms.
if (!empty($this->consumer)) {
// TODO 待解决的异常,已经上报bug,关注及时修复
// https://github.com/arnaud-lb/php-rdkafka/issues/357
// https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination
// 提到在销毁前,不用退订
//$this->consumer->unsubscribe();
// 目前了解没有明确的关闭机制, https://github.com/arnaud-lb/php-rdkafka/issues/75
$this->consumer->close();
// 通过发信号方式,加快kafka终止;目前这个机制不是很清楚,只是见issues中有提到放进来,暂未细研究
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// 根据讨论,只有这样,才能触发destroy;暂时也没做实际跟踪测试
unset($this->consumer);
}
// 捕获退出信号,正常退出,注意 supervisord 的autorestart设置必须正确,否则本php又被立即启动
// 见 http://supervisord.org/configuration.html#program-x-section-settings
// php正常结束时的exit code === 0,而 supervisord 守护检测到 code === 0,它就不会重新启动本php。
// 当然也可以修改它的 autorestart 为true 达到无条件重启动
// 所以,可以模拟异常异常退出
self::echo_block('exit_0_at_while_by_signal_' . self::$exit_signal);
// 退出并不会影响(终止) fork 的php;因为启动php处理类方式类似于ssh 中后台运行进程
return;
}
if (self::MAX_RUNNING - $this->running_count() <= 0) {
// 剩余并发数不足了,要暂停再试,这种暂停可以短点
self::echo_block('running_full_' . self::MAX_RUNNING);
$this->sleep($running_full_sleep_sec);
continue;
}
// 队列取得一个任务
$result = $this->get_task();
if (is_null($result) or false === $result) {
$this->sleep($empty_to_sleep_sec);
}
self::echo_block('next task');
}
self::echo_block('php_finish');
}
/**
* 注意收到如sigterm信号sleep状态将终止并恢复,剩余的未休眠秒数可以自己用时间来扣减得到
* 对于目前只是休眠担心浪费cpu,这种行为不影响
* @param int $sec
*/
private function sleep($sec = 0)
{
$this->sleeping = true;
sleep($sec);
$this->sleeping = false;
}
public function signal_handler($signal_no, $signal_info)
{
// 注意,当前信号本处理func异步产生信号时立即触发,无需等其它代码阻塞后再执行
// 其它信号可以不理会,只对需要停止的信号打上要停止标志,让业务代码适时停止,这样,信号捕捉处理就这么多
switch ($signal_no) {
case SIGTERM:
self::$exit_signal = "SIGTERM";
break;
case SIGINT:
self::$exit_signal = "SIGINT";
break;
default:
break;
}
// self::echo_block('signal_handler_' . $signal_no . ' ' . var_export($signal_info, true));
if (self::$exit_signal) {
// 你可能会看到这个输出很久了,php才退出
self::echo_block('get ' . self::$exit_signal . ',wait for quit ...');
}
}
/**
* 返回正在进行中的work个数
*
* 经过测试, php运行 1000次 ps aux|grep "php\-fpm" 平均耗时 0.058秒一次;
*
* @return bool|int
*/
private function running_count()
{
$cmd = 'ps aux|grep '
. str_replace('=', '\=', self::PS_GREP_SIGNAL)
. '|wc -l';
exec($cmd, $output, $exit_code);
return 0 === $exit_code ? intval($output[0]) : false;
}
private function fork_process($php_worker = '', $offset = 0)
{
// 类命名空间,大概是 \project\core\cli\run\worker_test2,php_worker 就是test2部分
if (!preg_match("/^\w{1,100}$/", $php_worker)) {
self::echo_line('php_worker后缀名不允许');
return false;
}
// 取当前系统名,因为window不支持本启动方式
$os = php_uname('s');
if (false === strpos(',Linux,Darwin,', ",{$os},")) {
self::echo_block('只支持linux与mac系统');
return false;
}
// 防止 路径包含特殊字符,传入到exec
chdir(dirname(__DIR__));
$params = [
'offset' => $offset,
];
$params = http_build_query(
$params
);
// 这个命令只能在linux下运行,window会出错
$command = [
// 不要挂断
'nohup',
'php',
'start.php',
// 加前缀防止误指向不安全的文件
$php_worker . '.php',
// 用单引号,防止bash发生转换
"'{$params}'",
self::PS_GREP_SIGNAL,
// > /dev/null 2>&1 必须有这句,否则本php会等到子进程结束才返回;
// 注意这个日志文件会不停的增长,除非使用 log rotate 来处理,其实也可以考虑不同的处理类一个日志,
// 因为这些文件不是长期运行的,其实也可以在路径上加上日期,比如按启动时的月份来切割
// 然后实现一个处理基类格式化日志输出方便从日志看出启动的整个流程,毕竟后台启动的php很难调试
'>>./../../cache/' . basename(__FILE__) . '.log',
// 重定向错误到另一个文件
'2>>./../../cache/' . basename(__FILE__) . '.err',
// 后台运行
'&',
// 返回当前进程id
'echo',
'$!'
];
$command = implode(' ', $command);
exec($command, $op, $exit_code);
// 不管运行成功与否,这个值是都是该fork进程的pid
$child_pid = $op[0];
//var_dump($op);
self::echo_block('fork pid ' . $child_pid . ' : ' . $command);
// TODO 这里目前无法判断是否运行成功
return $child_pid;
}
/**
* TODO 未完全了解kafka的消费流程,暂把处理内置
* 关于消费时offset写入机制,见
* https://github.com/edenhill/librdkafka/wiki/Consumer-offset-management
* 及
* https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#offset-management
* @return bool|mixed|null
* @throws Exception
*/
private function get_task()
{
if (empty($this->consumer)) {
$conf = new Conf();
// 保证重启也能保持原offset
$conf->set('group.id', fork_php_worker::class);
// Emit RD_KAFKA_RESP_ERR__PARTITION_EOF
// event whenever the consumer reaches the end of a partition.
$conf->set('enable.partition.eof', 'true');
// 如果发现kafka有问题,可以打开这个,会得到kafka的互动细节
// $conf->set('debug', 'all');
// 自己控制 offset 保存
$conf->set('enable.auto.offset.store', 'false');
// 自己提交 offset
$conf->set('enable.auto.commit', 'false');
// 本地内存型缓存队列个数,这个缓冲意义就是它会提前主动提取远程数据到本地,你的消费进程就能随时从这个池子中拿
// 不用考虑tcp耗时,因为队列是不支持修改与删除,就不用担心近/远不同步问题了
$conf->set('queued.min.messages', '1000');
// 服务器地址
$conf->set('metadata.broker.list', config::KAFKA_BROKER);
// offset未设定,或是超过了怎么处理:使用0
$conf->set('auto.offset.reset', 'smallest');
// 因为rd kafka关闭的机制比较慢,比如它可能会等待超时才结束,而不是close,见rdkafka的讨论区,有提到过很多处理无法让消费者控制
// 然后使用 发送信息的方式让它立刻退出,这个做法是见讨论区有人提到采用超时退出方式会导致消费者退出了,进程还在运行
// 因为没有句柄可以直接close它们,所以只能采用信号通知模式
$conf->set('internal.termination.signal', SIGIO);
// 如果提交offset时会触发,方便监听该事件产生时机
$conf->setOffsetCommitCb([$this, 'offset_commit_cb']);
// 消费者退出组,会导致broker重新分配消费者负责的主题或是分区,分配时不可用
//$conf->setRebalanceCb([$this, 're_balance_cb']);
// $conf->setErrorCb([$this, 'setRebalanceCb']);
$this->consumer = new KafkaConsumer($conf);
// 关于高级消费用法,如何设定offset是从最后储存位置开始问题,见
// https://github.com/edenhill/librdkafka/issues/1475
// 意思,默认行为就是,但是如果需要自定义控制,需要自己保存和加载然后用
// RdKafka\KafkaConsumer::assign 及 RdKafka\TopicPartition::setOffset 来设定
// 这个无法指定分区,使用手工指定 assign
// $this->consumer->subscribe([
// config::KAFKA_TOPIC_ASYNC_TASK
// ]);
$this->consumer->assign(
[
new TopicPartition(
config::KAFKA_TOPIC_ASYNC_TASK,
// 为防止多分区乱序,当前生产只能指定固定的分区0
config::KAFKA_TOPIC_ASYNC_TASK_PARTITION,
// 最后退出时保存的指针,从这个位置开始消费
RD_KAFKA_OFFSET_STORED
)
]
);
}
// 设置得太短,将取不到数据就超时;注意当队列为空时,它会阻塞到有或是超时
// 所以设定需要权衡,比如阻塞太久,会导致kill信号超时被干掉
$time_out = 60e3;
$msg = $this->consumer->consume($time_out);
switch ($msg->err) {
// 成功
case RD_KAFKA_RESP_ERR_NO_ERROR:
// 立刻保存offset
// offset 管理只支持高级消费方式,低级的应该是不支持,用当前消息offset保存
// 这个是阻塞直到提交成功或是失败
// 根据调试顺序发现 get_task_new get_task_consume signal_handler2 get_task_commit __destruct
// todo ctrl+c 或是 sigterm 退出时提示kafka错误原因未知,错误信息;感觉就是上方造成的
// %3|1584010609.753|ERROR|rdkafka#consumer-1| [thrd:GroupCoordinator]: 1/1 brokers are down
$this->consumer->commit($msg);
break;
// 队列空了
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
self::echo_block('task_empty');
return null;
// 超时,这几个常量,有在php rdkafka中看到可能不同版本行为会变化,请留意
case RD_KAFKA_RESP_ERR__TIMED_OUT:
self::echo_block('task_timeout');
return false;
default:
throw new Exception($msg->errstr(), $msg->err);
}
$task_header = $msg->headers;
$this->fork_process($task_header['php_worker'], $msg->offset);
return true;
}
public function offset_commit_cb($kafka, $err, $partitions)
{
// 如果offset 提交失败是否要处理?
//var_dump('offset_commit_cb', $kafka, $err, $partitions);
}
public function __destruct()
{
self::echo_block(
"cost sec:" . (time() - self::$start_time)
);
}
}
另外一个php
<?php
namespace project\core\includes\classes;
use Exception;
use project\core\cli\run\fork_php_worker;
use project\core\cli\run\worker_test;
use project\core\includes\config;
use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\Producer;
use RdKafka\TopicConf;
use RdKafka\TopicPartition;
use function project\cache\echo_block;
class kafka
{
// 当前 msg flag 可设置0 与 RD_KAFKA_MSG_F_BLOCK,本常量假设它是 RD_KAFKA_MSG_F_BLOCK 反义
// 意思是不阻塞等待,直接返回,那就可能意味着直接把消息丢掉?待确认
const RD_KAFKA_MSG_F_BLOCK_enable = 0;
// 通过offset,获取指定topic的消息
static public function async_task_consume_by_offset($offset = 0)
{
$conf = new Conf();
// Emit RD_KAFKA_RESP_ERR__PARTITION_EOF
// event whenever the consumer reaches the end of a partition.
$conf->set('enable.partition.eof', 'true');
// 如果发现kafka有问题,可以打开这个,会得到kafka的互动细节
// $conf->set('debug', 'all');
// 自己控制 offset 保存
$conf->set('enable.auto.offset.store', 'false');
// 自己提交 offset
$conf->set('enable.auto.commit', 'false');
// 本地内存型缓存队列个数
$conf->set('queued.min.messages', '1');
$conf->set('metadata.broker.list', config::KAFKA_BROKER);
// 因为rd kafka关闭的机制比较慢,比如它可能会等待超时才结束,而不是close
// 然后使用 发送信息的方式让它立刻退出
$conf->set('internal.termination.signal', SIGIO);
$consumer = new Consumer($conf);
$topic = $consumer->newTopic(config::KAFKA_TOPIC_ASYNC_TASK);
$topic->consumeStart(config::KAFKA_TOPIC_ASYNC_TASK_PARTITION, $offset);
// 设置得太短,将取不到数据就超时
$time_out = 120e3;
$msg = $topic->consume(config::KAFKA_TOPIC_ASYNC_TASK_PARTITION, $time_out);
$result = null;
switch ($msg->err) {
// 成功
case RD_KAFKA_RESP_ERR_NO_ERROR:
$result = json_decode($msg->payload, true);
break;
// 队列空了
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$result = null;
break;
}
$topic->consumeStop(config::KAFKA_TOPIC_ASYNC_TASK_PARTITION);
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
unset($topic);
unset($consumer);
return $result;
}
// 生产消息
static public function async_task_produce($task_array = [], $php_work_class = self::class)
{
if (!class_exists($php_work_class)) {
echo $php_work_class . ' 不存在';
return false;
}
$php_work_class = preg_replace('/^.*\W/', '', $php_work_class);
$json_msg = json_encode($task_array, JSON_UNESCAPED_SLASHES);
$headers = [
'php_worker' => $php_work_class,
];
$conf = new Conf();
$conf->set('metadata.broker.list', config::KAFKA_BROKER);
$producer = new Producer($conf);
$producer_topic = $producer->newTopic(config::KAFKA_TOPIC_ASYNC_TASK);
$producer_topic->producev(
config::KAFKA_TOPIC_ASYNC_TASK_PARTITION,
self::RD_KAFKA_MSG_F_BLOCK_enable,
$json_msg,
null,
$headers
);
// 触发事件
$producer->poll(0);
// 从缓存放到broker中
$result = $producer->flush(10e3);
// 如果失败,可能该消息失败,也可能成功
return RD_KAFKA_RESP_ERR_NO_ERROR === $result;
}
}
心得
- topic是什么?它就是“一个队列”;
- 我应该如何规划topic?创始人建议,某网友分析,我的个人理解:消息间无任何依赖,消费足够快不担心某消息过多积压其它消息的,可以全部放在一个topic中;其它情况可能需要考虑不同topic+合理分区;
- 队列是否支持修改或删除某个消息?目前根据我的理解,好像kafka没有此功能;
- topic与partition的概念:topic是队列名,而partition指的是队列消息存放在几个分区中,提出这个概念是因为–1个partition只支持一个消费者消费,而1个topic支持把消息存放到足够多的partition中,那么这就能解决了1个topic允许n个消费者同时消费的场景,但是需要注意的是kafka不保证不同partition之间消费者是有序消费的,意思就是如果你把“生成订单后台处理“消息放到partition 0(消费者a处理),把“支付订单后台处理“消息放到partition 1(消费者b处理),有可能消费者b会先执行,具体看相应消费者是否有空;kafka没有任何策略保证;所以需要保证有序,建议是消息只放到同一个分区中(推荐在生产时指定分区号;若采用指定固定key–好像是kafka hash后取余得出分区号,那消费时指定分区号又是一个麻烦事),然后再采用本地内存型队列达到多消费者处理,kafka更多的是解决队列与集群,业务介入比较少;
- 像php rdkafka实现的queue是什么?根据我的理解,它是本地内存式的前置缓存队列,一是为了提前提取(缓冲)远程数据到本地,加快消费响应,消费者结束用不完就扔掉就是,并不会影响“已消费”进度,它只是一个copy(毕竟远程数据是不可删除/修改的),当然这块自动释放机制可能在实现上并非消费者一结束就立即同步清除;二是可以支持把多个topic+partition导入本地缓存队列,然后消费者直接逐个消费即可,当然,根据我的测试导入的顺序–应该是没有保证的;
- 建议添加专用用户kafka来运行kafka,并把所有相关信息都放到它的home下,当前配置未考虑集群;
- 单机配置,且无安全配置的情况,建议配置成仅本机通讯方式;
config/zookeeper.properties
修改/增加clientPortAddress=127.0.0.1
; - 使用kafka提供的脚本启动服务时,先启动zookeeper,再启动kafka,停止时,先停止kafka再停止zookeeper,如果启动后发现没有进程,说明启动失败,可以把守护启动模式 -daemon 去掉;解决问题后再用守护模式;遇到比较多的问题就是不同的用户来启动,导致启动失败,解决方法是先删除配置文件中指定的zookeeper与kafka的日志文件(修改所有者也行,不清楚内容中是否有限定用户),再使用固定的用户来启动即可;