|
|
@@ -1,6 +1,7 @@
|
|
|
<?php namespace Seller\Lib;
|
|
|
use Dever;
|
|
|
use Dever\Helper\Redis;
|
|
|
+use Workerman\Coroutine;
|
|
|
use Workerman\Timer;
|
|
|
class Get
|
|
|
{
|
|
|
@@ -25,13 +26,20 @@ class Get
|
|
|
// 每次拉起 Worker 都重置渠道计数,确保调度公平
|
|
|
Dever::load(Channel::class)->resetUsageCounters();
|
|
|
|
|
|
- // 开始循环消费
|
|
|
- Timer::add(0.001, function() use ($workerId) {
|
|
|
- $this->consumeNewMessages($workerId);
|
|
|
+ // 启动阻塞式消费协程
|
|
|
+ Coroutine::create(function () use ($workerId) {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ $this->consumeNewMessages($workerId);
|
|
|
+ } catch (\Throwable $e) {
|
|
|
+ echo "Worker#$workerId 拉取订单失败:" . $e->getMessage() . "\n";
|
|
|
+ Timer::sleep(0.5);
|
|
|
+ }
|
|
|
+ }
|
|
|
});
|
|
|
|
|
|
- // 每 10 秒处理 Pending (未 ACK 的消息)
|
|
|
- Timer::add(10, function() use ($workerId) {
|
|
|
+ // 每 30 秒处理 Pending (未 ACK 的消息)
|
|
|
+ Timer::add(30, function() use ($workerId) {
|
|
|
$this->checkPending($workerId);
|
|
|
});
|
|
|
}
|
|
|
@@ -44,8 +52,8 @@ class Get
|
|
|
$this->group,
|
|
|
"consumer_$workerId",
|
|
|
[$this->streamKey => '>'],
|
|
|
- 10, // 一次读 1 条
|
|
|
- 500 // 阻塞 0.5 秒
|
|
|
+ 10, // 一次读 10 条
|
|
|
+ 2000 // 阻塞 2 秒
|
|
|
);
|
|
|
|
|
|
if (!$msgs || !isset($msgs[$this->streamKey])) {
|