beanstalkd 消息队列的使用 非原创 转发
出自https://yusure.cn/linux/257.html#comment-40 这个大佬的代码!!!
https://learnku.com/articles/18687
应用场景
为什么要用呢,有什么好处?这应该放在最开头说,一件东西你只有了解它是干什么的,适合干什么,才能更好的与自己的项目相结合,用到哪里学到哪里,学了不用等于不会,我们平时就应该多考虑一些这样的问题:自己做个什么项目功能能跟 xx 技术相结合呢?这个 xx 技术放在这种业务场景下行不行呢?而不是 “学了这个 xx 技术能干嘛呢,公司现在也没有用这个的呀,学了也没用啊”,带着这样心情去学习 xx 技术,肯定很痛苦。
队列大家都知道是将一些耗时的操作先不去做,先埋点,再异步去处理,这样对一些发邮件发短信之类的耗时操作,用户是感觉不到的,因为埋点结束,操作也就结束了,消费队列都是在服务器上做的。主要应用在短信或邮件通知,访问第三方接口订阅消息,商城的一些秒杀活动,都可以结合队列来完成。
Beanstalkd 介绍
Beanstalkd是一个高性能,轻量级的分布式内存队列,C 代码,典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。
beanstalkd 的最初设计意图是在高并发的网络请求下,通过异步执行耗时较多的请求,及时返回结果,减少请求的响应延迟。
Ubuntu 安装
1 | sudo apt-get install beanstalkd |
Ubuntu 配置文件
1 | vim /etc/default/beanstalkd |
CentOS 安装
1 | yum install beanstalkd |
CentOS 配置文件
1 | vim /etc/sysconfig/beanstalkd |
查看状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | service beanstalkd status # 命令回显 # root@:/www/server/php/72/etc# service beanstalkd status ● beanstalkd.service - Simple, fast work queue Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled) Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago Docs: man:beanstalkd(1) Main PID: 7033 (beanstalkd) Tasks: 1 (limit: 4634) CGroup: /system.slice/beanstalkd.service └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue. |
配置连通性 + 持久化
/usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd/binlog -F &
ip 用 0.0.0.0 允许所有连接,靠配置安全组或防火墙去约束连接,放开 -b 参数 (默认没有持久化),内存的队列消息可以落地到硬盘 binlog 实现持久化,断电可重新读取队列消息。
1 2 3 4 5 | vim /etc/default/beanstalkd BEANSTALKD_LISTEN_ADDR=0.0.0.0 BEANSTALKD_LISTEN_PORT=11300 BEANSTALKD_EXTRA="-b /var/lib/beanstalkd" |
beanstalkd 任务状态
状态 | 注释 |
---|---|
delayed | 延迟状态 |
ready | 准备好状态 |
reserved | 消费者把任务读出来,处理时 |
buried | 预留状态 |
delete | 删除状态 |
管理工具
亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个web的。
命令行:https://github.com/src-d/beanstool
web 界面:https://github.com/ptrofimov/beanstalk_console
编程语言客户端
PHP 客户端
https://packagist.org/packages/pda/pheanstalk
1 | composer require pda/pheanstalk |
写入 job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | <?php //创建队列消息 require_once ( './vendor/autoload.php' ); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk( '127.0.0.1' ,11300); $tubeName = 'email_list' ; $jobData = [ 'email' => '123456@163.com' , 'message' => 'Hello World !!' , 'dtime' => date ( 'Y-m-d H:i:s' ), ]; $pheanstalk ->useTube( $tubeName )->put( json_encode( $jobData ) ); |
消费 job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | <?php ini_set ( 'default_socket_timeout' , 86400*7); ini_set ( 'memory_limit' , '256M' ); // 消费队列消息 require_once ( './vendor/autoload.php' ); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk( '127.0.0.1' ,11300); $tubeName = 'email_list' ; while ( true ) { // 获取队列信息, reserve 阻塞获取 $job = $pheanstalk ->watch( $tubeName )->ignore( 'default' )->reserve(); if ( $job !== false ) { $data = $job ->getData(); /* TODO 逻辑操作 */ /* 处理完成,删除 job */ $pheanstalk -> delete ( $job ); } } |
default_socket_timeout
这个参数是一定要加的,php默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。
就是这样的代码,供参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public function watchJob() { $job = $this ->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve(); if ( $job !== false ) { $job_data = $job ->getData(); $this ->subscribe( $job_data ); $this ->pheanstalk-> delete ( $job ); /* 继续 Watch 下一个 job */ $this ->watchJob(); } else { $this ->log->error( 'reserve false' , 'reserve false' ); } } |
监控 beanstalkd 状态
1 2 3 4 5 6 7 8 9 10 | <?php //监控服务状态 require_once ( './vendor/autoload.php' ); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk( '127.0.0.1' ,11300); $isAlive = $pheanstalk ->getConnection()->isServiceListening(); var_dump( $isAlive ); |
可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。
一些相关命令
开启,关闭 Beanstalkd 命令
1 2 | /etc/init.d/beanstalkd start /etc/init.d/beanstalkd stop |
重启 Beanstalkd 命令
1 | /etc/init.d/beanstalkd restart |
查看 beanstalkd 服务内存占用
1 | top -u beanstalkd |
后台运行 consumer 脚本
1 | nohup php googlehome_subscribe.php & |
查看 consumer 脚本运行时间
1 | ps -A -opid,stime,etime,args | grep consumer.php |
手工重启 consumer 脚本
1 2 | ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 nohup php googlehome_subscribe.php & |
一些总结
php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | try { /* TODO: 逻辑操作 */ } catch ( RequestException $e ) { $results [ 'code' ] = $e ->getResponse()->getStatusCode(); $results [ 'reason' ] = $e ->getResponse()->getReasonPhrase(); $this ->log->error( 'properties-changed RequestException' , $results ); } catch ( ClientException $e ) { $results [ 'mid' ] = $this ->mid; $results [ 'code' ] = $e ->getResponse()->getStatusCode(); $results [ 'reason' ] = $e ->getResponse()->getReasonPhrase(); $this ->log->error( 'properties-changed ClientException' , $results ); } catch ( ServerException $e ) { $results [ 'mid' ] = $this ->mid; $results [ 'code' ] = $e ->getResponse()->getStatusCode(); $results [ 'reason' ] = $e ->getResponse()->getReasonPhrase(); $this ->log->error( 'properties-changed ServerException' , $results ); } catch ( ConnectException $e ) { $results [ 'mid' ] = $this ->mid; $results [ 'code' ] = $e ->getCode(); $results [ 'reason' ] = $e ->getMessage(); $this ->log->error( 'properties-changed ConnectException' , $results ); } |
job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。
参考资料
Beanstalkd Message Queue
http://ongx1e5a0.bkt.clouddn.com/Beanstalkd-share-slides.pdf
Beanstalkd FAQ 中文版
http://www.fzb.me/2015-7-31-beanstalkd-faq.html
PHP消息队列beanstalkd
https://www.kancloud.cn/daiji/beanstalkd/735176
Beanstalkd-带你玩转消息队列
https://www.imooc.com/learn/912