rabin hai 3 días
pai
achega
407422f161
Modificáronse 1 ficheiros con 88 adicións e 34 borrados
  1. 88 34
      src/dai/seller/app/Lib/Get.php

+ 88 - 34
src/dai/seller/app/Lib/Get.php

@@ -8,6 +8,7 @@ class Get
     private $group = 'order_group';
     private int $pendingMinIdleMs = 60000;
     private int $pendingBatchSize = 50;
+    private ?bool $autoClaimSupported = null;
     public function run($workerId)
     {
         $redis = Redis::connect();
@@ -69,40 +70,15 @@ class Get
         $minIdle = $this->pendingMinIdleMs;
         $count = $this->pendingBatchSize;
 
-        if (method_exists($redis, 'xAutoClaim')) {
-            $result = $redis->xAutoClaim(
-                $this->streamKey,
-                $this->group,
-                $consumer,
-                $minIdle,
-                '0-0',
-                $count
-            );
-            $startFrom = $result[0] ?? '0-0';
-            $messages = $result[1] ?? [];
-
-            while (!empty($messages)) {
-                echo "Worker#$workerId 自动接管 Pending:".count($messages)." 条\n";
-                foreach ($messages as $id => $data) {
-                    $this->processOrderSafe($id, $data);
-                }
-
-                if (!isset($result[0]) || $result[0] === '0-0' || count($messages) < $count) {
-                    break;
+        if ($this->canUseAutoClaim($redis)) {
+            try {
+                if ($this->autoClaimPending($redis, $consumer, $minIdle, $count, $workerId)) {
+                    return;
                 }
-
-                $result = $redis->xAutoClaim(
-                    $this->streamKey,
-                    $this->group,
-                    $consumer,
-                    $minIdle,
-                    $startFrom,
-                    $count
-                );
-                $startFrom = $result[0] ?? '0-0';
-                $messages = $result[1] ?? [];
+            } catch (\Throwable $e) {
+                $this->autoClaimSupported = false;
+                echo "XAUTOCLAIM 不可用,使用兼容模式:" . $e->getMessage() . "\n";
             }
-            return;
         }
 
         $pending = $redis->xPending($this->streamKey, $this->group);
@@ -133,7 +109,7 @@ class Get
         }
 
         do {
-            $claimed = $redis->xClaim(
+            $claimedRaw = $redis->xClaim(
                 $this->streamKey,
                 $this->group,
                 $consumer,
@@ -141,6 +117,7 @@ class Get
                 $ids
             );
 
+            $claimed = $this->normalizeStreamMessages($claimedRaw);
             if (!$claimed) {
                 break;
             }
@@ -158,7 +135,7 @@ class Get
             $ids = [];
             foreach ($entries as $entry) {
                 if (is_array($entry)) {
-                    $ids[] = $entry[0] ?? ($entry['message_id'] ?? null);
+                    $ids[] = $entry[0] ?? ($entry['message_id'] ?? $entry['id'] ?? null);
                 } else {
                     $ids[] = $entry;
                 }
@@ -167,6 +144,83 @@ class Get
         } while ($ids);
     }
 
+    private function canUseAutoClaim($redis): bool
+    {
+        if ($this->autoClaimSupported === null) {
+            $this->autoClaimSupported = method_exists($redis, 'xAutoClaim');
+        }
+        return $this->autoClaimSupported;
+    }
+
+    private function autoClaimPending($redis, string $consumer, int $minIdle, int $count, int $workerId): bool
+    {
+        $handled = false;
+        $start = '0-0';
+
+        while (true) {
+            $result = $redis->xAutoClaim(
+                $this->streamKey,
+                $this->group,
+                $consumer,
+                $minIdle,
+                $start,
+                $count
+            );
+            if (!is_array($result)) {
+                break;
+            }
+            $nextStart = $result[0] ?? '0-0';
+            $rawEntries = $result[1] ?? [];
+            $messages = $this->normalizeStreamMessages($rawEntries);
+            if (!$messages) {
+                if ($nextStart === '0-0') {
+                    break;
+                }
+                $start = $nextStart;
+                continue;
+            }
+
+            echo "Worker#$workerId 自动接管 Pending:" . count($messages) . " 条\n";
+            foreach ($messages as $id => $data) {
+                $this->processOrderSafe($id, $data);
+            }
+            $handled = true;
+
+            $rawCount = is_array($rawEntries) ? count($rawEntries) : 0;
+            if ($nextStart === '0-0' || $rawCount < $count) {
+                break;
+            }
+            $start = $nextStart;
+        }
+
+        return $handled;
+    }
+
+    private function normalizeStreamMessages($messages): array
+    {
+        $normalized = [];
+
+        foreach ((array)$messages as $key => $value) {
+            if (is_string($key)) {
+                $normalized[$key] = $value;
+                continue;
+            }
+
+            if (is_array($value)) {
+                if (isset($value['id'], $value['message']) && is_array($value['message'])) {
+                    $normalized[$value['id']] = $value['message'];
+                    continue;
+                }
+                if (count($value) === 2 && isset($value[0]) && is_array($value[1])) {
+                    $normalized[$value[0]] = $value[1];
+                    continue;
+                }
+            }
+        }
+
+        return $normalized;
+    }
+
     // =========================================================
     // 订单处理(安全封装)
     // =========================================================