rabin 3 روز پیش
والد
کامیت
9f048f6f17
1فایلهای تغییر یافته به همراه54 افزوده شده و 6 حذف شده
  1. 54 6
      src/dai/seller/app/Lib/Get.php

+ 54 - 6
src/dai/seller/app/Lib/Get.php

@@ -6,6 +6,8 @@ class Get
 {
     private $streamKey = 'order_queue';
     private $group = 'order_group';
+    private int $pendingMinIdleMs = 60000;
+    private int $pendingBatchSize = 50;
     public function run($workerId)
     {
         $redis = Redis::connect();
@@ -63,6 +65,31 @@ class Get
     private function checkPending($workerId)
     {
         $redis = Redis::connect();
+        $consumer = "consumer_$workerId";
+        $minIdle = $this->pendingMinIdleMs;
+        $count = $this->pendingBatchSize;
+
+        if (method_exists($redis, 'xAutoClaim')) {
+            $result = $redis->xAutoClaim(
+                $this->streamKey,
+                $this->group,
+                $consumer,
+                $minIdle,
+                '0-0',
+                $count
+            );
+
+            $messages = $result[1] ?? [];
+            if (empty($messages)) {
+                return;
+            }
+
+            echo "Worker#$workerId 自动接管 Pending:".count($messages)." 条\n";
+            foreach ($messages as $id => $data) {
+                $this->processOrderSafe($id, $data);
+            }
+            return;
+        }
 
         $pending = $redis->xPending($this->streamKey, $this->group);
 
@@ -72,18 +99,39 @@ class Get
 
         echo "Worker#$workerId 检测到 Pending 数量:" . $pending['count'] . "\n";
 
-        // 拉取 10 条 pending 的消息
-        if (!isset($pending['min'], $pending['max'])) {
+        // 旧版本兼容:使用 xPending + xClaim 方式
+        $entries = $redis->xPending($this->streamKey, $this->group, '-', '+', $count);
+        if (!$entries) {
+            return;
+        }
+
+        $ids = [];
+        foreach ($entries as $entry) {
+            if (is_array($entry)) {
+                $ids[] = $entry[0] ?? ($entry['message_id'] ?? null);
+            } else {
+                $ids[] = $entry;
+            }
+        }
+        $ids = array_values(array_filter($ids));
+        if (!$ids) {
             return;
         }
-        $msgs = $redis->xRange($this->streamKey, $pending['min'], $pending['max'], 10);
 
-        if (!$msgs) {
+        $claimed = $redis->xClaim(
+            $this->streamKey,
+            $this->group,
+            $consumer,
+            $minIdle,
+            $ids
+        );
+
+        if (!$claimed) {
             return;
         }
 
-        foreach ($msgs as $id => $data) {
-            echo "Worker#$workerId 重新处理 Pending 订单:$id\n";
+        echo "Worker#$workerId 兼容模式接管 Pending:".count($claimed)." 条\n";
+        foreach ($claimed as $id => $data) {
             $this->processOrderSafe($id, $data);
         }
     }