rabin há 3 dias atrás
pai
commit
3f6c9188d9
1 ficheiros alterados com 52 adições e 21 exclusões
  1. 52 21
      src/dai/seller/app/Lib/Get.php

+ 52 - 21
src/dai/seller/app/Lib/Get.php

@@ -78,15 +78,29 @@ class Get
                 '0-0',
                 $count
             );
-
+            $startFrom = $result[0] ?? '0-0';
             $messages = $result[1] ?? [];
-            if (empty($messages)) {
-                return;
-            }
 
-            echo "Worker#$workerId 自动接管 Pending:".count($messages)." 条\n";
-            foreach ($messages as $id => $data) {
-                $this->processOrderSafe($id, $data);
+            while (!empty($messages)) {
+                echo "Worker#$workerId 自动接管 Pending:".count($messages)." 条\n";
+                foreach ($messages as $id => $data) {
+                    $this->processOrderSafe($id, $data);
+                }
+
+                if (!isset($result[0]) || $result[0] === '0-0' || count($messages) < $count) {
+                    break;
+                }
+
+                $result = $redis->xAutoClaim(
+                    $this->streamKey,
+                    $this->group,
+                    $consumer,
+                    $minIdle,
+                    $startFrom,
+                    $count
+                );
+                $startFrom = $result[0] ?? '0-0';
+                $messages = $result[1] ?? [];
             }
             return;
         }
@@ -118,22 +132,39 @@ class Get
             return;
         }
 
-        $claimed = $redis->xClaim(
-            $this->streamKey,
-            $this->group,
-            $consumer,
-            $minIdle,
-            $ids
-        );
+        do {
+            $claimed = $redis->xClaim(
+                $this->streamKey,
+                $this->group,
+                $consumer,
+                $minIdle,
+                $ids
+            );
 
-        if (!$claimed) {
-            return;
-        }
+            if (!$claimed) {
+                break;
+            }
 
-        echo "Worker#$workerId 兼容模式接管 Pending:".count($claimed)." 条\n";
-        foreach ($claimed as $id => $data) {
-            $this->processOrderSafe($id, $data);
-        }
+            echo "Worker#$workerId 兼容模式接管 Pending:".count($claimed)." 条\n";
+            foreach ($claimed as $id => $data) {
+                $this->processOrderSafe($id, $data);
+            }
+
+            // 清空已处理 ID,避免重复 claim
+            $ids = [];
+
+            // 检查是否还有更多 pending
+            $entries = $redis->xPending($this->streamKey, $this->group, '-', '+', $count);
+            $ids = [];
+            foreach ($entries as $entry) {
+                if (is_array($entry)) {
+                    $ids[] = $entry[0] ?? ($entry['message_id'] ?? null);
+                } else {
+                    $ids[] = $entry;
+                }
+            }
+            $ids = array_values(array_filter($ids));
+        } while ($ids);
     }
 
     // =========================================================