|
@@ -8,7 +8,6 @@ class Get
|
|
|
private $group = 'order_group';
|
|
private $group = 'order_group';
|
|
|
private int $pendingMinIdleMs = 60000;
|
|
private int $pendingMinIdleMs = 60000;
|
|
|
private int $pendingBatchSize = 50;
|
|
private int $pendingBatchSize = 50;
|
|
|
- private ?bool $autoClaimSupported = null;
|
|
|
|
|
public function run($workerId)
|
|
public function run($workerId)
|
|
|
{
|
|
{
|
|
|
$redis = Redis::connect();
|
|
$redis = Redis::connect();
|
|
@@ -70,130 +69,54 @@ class Get
|
|
|
$minIdle = $this->pendingMinIdleMs;
|
|
$minIdle = $this->pendingMinIdleMs;
|
|
|
$count = $this->pendingBatchSize;
|
|
$count = $this->pendingBatchSize;
|
|
|
|
|
|
|
|
- if ($this->canUseAutoClaim($redis)) {
|
|
|
|
|
- try {
|
|
|
|
|
- if ($this->autoClaimPending($redis, $consumer, $minIdle, $count, $workerId)) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
- $this->autoClaimSupported = false;
|
|
|
|
|
- echo "XAUTOCLAIM 不可用,使用兼容模式:" . $e->getMessage() . "\n";
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ $pendingSummary = $redis->xPending($this->streamKey, $this->group);
|
|
|
|
|
|
|
|
- $pending = $redis->xPending($this->streamKey, $this->group);
|
|
|
|
|
-
|
|
|
|
|
- if (empty($pending) || empty($pending['count'])) {
|
|
|
|
|
|
|
+ if (empty($pendingSummary) || empty($pendingSummary['count'])) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- echo "Worker#$workerId 检测到 Pending 数量:" . $pending['count'] . "\n";
|
|
|
|
|
|
|
+ echo "Worker#$workerId 检测到 Pending 数量:" . $pendingSummary['count'] . "\n";
|
|
|
|
|
|
|
|
- // 旧版本兼容:使用 xPending + xClaim 方式
|
|
|
|
|
- $entries = $redis->xPending($this->streamKey, $this->group, '-', '+', $count);
|
|
|
|
|
- if (!$entries) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- $ids = [];
|
|
|
|
|
- foreach ($entries as $entry) {
|
|
|
|
|
- if (is_array($entry)) {
|
|
|
|
|
- $ids[] = $entry[0] ?? ($entry['message_id'] ?? null);
|
|
|
|
|
- } else {
|
|
|
|
|
- $ids[] = $entry;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- $ids = array_values(array_filter($ids));
|
|
|
|
|
|
|
+ $ids = $this->fetchPendingIds($redis, $count);
|
|
|
if (!$ids) {
|
|
if (!$ids) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- do {
|
|
|
|
|
- $claimedRaw = $redis->xClaim(
|
|
|
|
|
- $this->streamKey,
|
|
|
|
|
- $this->group,
|
|
|
|
|
- $consumer,
|
|
|
|
|
- $minIdle,
|
|
|
|
|
- $ids
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- $claimed = $this->normalizeStreamMessages($claimedRaw);
|
|
|
|
|
- if (!$claimed) {
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- echo "Worker#$workerId 兼容模式接管 Pending:".count($claimed)." 条\n";
|
|
|
|
|
- foreach ($claimed as $id => $data) {
|
|
|
|
|
- $this->processOrderSafe($id, $data);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 清空已处理 ID,避免重复 claim
|
|
|
|
|
- $ids = [];
|
|
|
|
|
|
|
+ $claimedRaw = $redis->xClaim(
|
|
|
|
|
+ $this->streamKey,
|
|
|
|
|
+ $this->group,
|
|
|
|
|
+ $consumer,
|
|
|
|
|
+ $minIdle,
|
|
|
|
|
+ $ids
|
|
|
|
|
+ );
|
|
|
|
|
|
|
|
- // 检查是否还有更多 pending
|
|
|
|
|
- $entries = $redis->xPending($this->streamKey, $this->group, '-', '+', $count);
|
|
|
|
|
- $ids = [];
|
|
|
|
|
- foreach ($entries as $entry) {
|
|
|
|
|
- if (is_array($entry)) {
|
|
|
|
|
- $ids[] = $entry[0] ?? ($entry['message_id'] ?? $entry['id'] ?? null);
|
|
|
|
|
- } else {
|
|
|
|
|
- $ids[] = $entry;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- $ids = array_values(array_filter($ids));
|
|
|
|
|
- } while ($ids);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ $claimed = $this->normalizeStreamMessages($claimedRaw);
|
|
|
|
|
+ if (!$claimed) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- private function canUseAutoClaim($redis): bool
|
|
|
|
|
- {
|
|
|
|
|
- if ($this->autoClaimSupported === null) {
|
|
|
|
|
- $this->autoClaimSupported = method_exists($redis, 'xAutoClaim');
|
|
|
|
|
|
|
+ echo "Worker#$workerId 接管 Pending:".count($claimed)." 条\n";
|
|
|
|
|
+ foreach ($claimed as $id => $data) {
|
|
|
|
|
+ $this->processOrderSafe($id, $data);
|
|
|
}
|
|
}
|
|
|
- return $this->autoClaimSupported;
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private function autoClaimPending($redis, string $consumer, int $minIdle, int $count, int $workerId): bool
|
|
|
|
|
|
|
+ private function fetchPendingIds($redis, int $limit): array
|
|
|
{
|
|
{
|
|
|
- $handled = false;
|
|
|
|
|
- $start = '0-0';
|
|
|
|
|
-
|
|
|
|
|
- while (true) {
|
|
|
|
|
- $result = $redis->xAutoClaim(
|
|
|
|
|
- $this->streamKey,
|
|
|
|
|
- $this->group,
|
|
|
|
|
- $consumer,
|
|
|
|
|
- $minIdle,
|
|
|
|
|
- $start,
|
|
|
|
|
- $count
|
|
|
|
|
- );
|
|
|
|
|
- if (!is_array($result)) {
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- $nextStart = $result[0] ?? '0-0';
|
|
|
|
|
- $rawEntries = $result[1] ?? [];
|
|
|
|
|
- $messages = $this->normalizeStreamMessages($rawEntries);
|
|
|
|
|
- if (!$messages) {
|
|
|
|
|
- if ($nextStart === '0-0') {
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- $start = $nextStart;
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- echo "Worker#$workerId 自动接管 Pending:" . count($messages) . " 条\n";
|
|
|
|
|
- foreach ($messages as $id => $data) {
|
|
|
|
|
- $this->processOrderSafe($id, $data);
|
|
|
|
|
- }
|
|
|
|
|
- $handled = true;
|
|
|
|
|
|
|
+ $entries = $redis->xPending($this->streamKey, $this->group, '-', '+', $limit);
|
|
|
|
|
+ if (!$entries) {
|
|
|
|
|
+ return [];
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- $rawCount = is_array($rawEntries) ? count($rawEntries) : 0;
|
|
|
|
|
- if ($nextStart === '0-0' || $rawCount < $count) {
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ $ids = [];
|
|
|
|
|
+ foreach ($entries as $entry) {
|
|
|
|
|
+ if (is_array($entry)) {
|
|
|
|
|
+ $ids[] = $entry[0] ?? ($entry['message_id'] ?? $entry['id'] ?? null);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ $ids[] = $entry;
|
|
|
}
|
|
}
|
|
|
- $start = $nextStart;
|
|
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- return $handled;
|
|
|
|
|
|
|
+ return array_values(array_filter($ids));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private function normalizeStreamMessages($messages): array
|
|
private function normalizeStreamMessages($messages): array
|