目 录CONTENT

文章目录

🍥Hyperf3.1使用RabbitMQ

柯基
2025-11-09 / 0 评论 / 0 点赞 / 14 阅读 / 4,639 字

实现了消息消费失败入mysql数据库,提供了消息重新入队的command,有手动重试机制、防止消息重复消费逻辑。暂未实现延时队列

1. 安装扩展包

composer require hyperf/amqp

2. 生成配置文件

运行命令自动生成配置文件

php bin/hyperf.php vendor:publish hyperf/amqp

修改下配置项 /config/autoload/amqp.php

可在 producer 或者 consumer__construct 函数中,设置不同 pool,例如下面的 defaultcrm

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
use Hyperf\Amqp\IO\IOFactory;

use function Hyperf\Support\env;

// 默认配置 如果需要项目隔离vhost 赋值新数组并更改vhost参数项
$default_config = [
    'host' => env('AMQP_HOST', 'localhost'),
    'port' => (int) env('AMQP_PORT', 5672),
    'user' => env('AMQP_USER', 'guest'),
    'password' => env('AMQP_PASSWORD', 'guest'),
    'vhost' => env('AMQP_VHOST', 'center'),
    'open_ssl' => false,
    'concurrent' => [
        // 单个消费者进程内的并发消费上限
        'limit' => 2,
    ],
    'pool' => [
        // 连接池数量
        'connections' => 2,
    ],
    'io' => IOFactory::class,
    'params' => [
        'insist' => false,
        'login_method' => 'AMQPLAIN',
        'login_response' => null,
        'locale' => 'en_US',
        'connection_timeout' => 10,
        // 尽量保持是 heartbeat 数值的两倍
        'read_write_timeout' => 60,
        'context' => null,
        // 网络层保活 与心跳互补 兜底检测
        'keepalive' => true,
        // 尽量保证每个消息的消费时间小于心跳时间
        'heartbeat' => 30,
        'channel_rpc_timeout' => 0.0,
        // 是否在对象析构时自动关闭连接
        'close_on_destruct' => false,
        // 连接内可复用的空闲 Channel 数上限
        'max_idle_channels' => 10,
        'connection_name' => null,
    ],
];

// 如果需要隔离项目vhost
// 商家
$merchant_config = $default_config;
$merchant_config['vhost'] = env('MERCHANT_AMQP_VHOST', 'merchant');

return [
    'enable' => true,
    'default' => $default_config,
    'merchant' => $merchant_config,
];

3. 配置.env文件

# RabbitMQ
AMQP_HOST=127.0.0.1
AMQP_PORT=5672
AMQP_USER=guest
AMQP_PASSWORD=guest
AMQP_VHOST=center
MERCHANT_AMQP_VHOST=merchant

4. 创建队列常量

<?php
declare (strict_types=1);

namespace App\Common\Enum;
class AmqpEnum
{
    // AMQP重试次数
    const AMQP_RETRY_COUNT = 3;

    const STATUS_INIT = 0;
    const STATUS_PADDING = 1;
    const STATUS_SUCCESS = 2;
    const STATUS_FAILED = 3;
    const STATUS_DESC = [
        self::STATUS_INIT => '待处理',
        self::STATUS_PADDING => '处理中',
        self::STATUS_SUCCESS => '处理成功',
        self::STATUS_FAILED => '处理失败'
    ];

    // 默认vhost配置项
    const DEFAULT_VHOST_CONFIG = 'default';

    // 商家vhost配置项
    const MERCHANT_VHOST_CONFIG = 'merchant';

    // 测试队列配置名称
    const DEMO_QUEUE_NAME = 'demo';
    // 商家端 测试队列配置名称
    const M_DEMO_QUEUE_NAME = 'm_demo';



}

5.创建队列相关配置文件

创建配置文件/config/autoload/amqp_queue.php

<?php
declare(strict_types=1);

use App\Amqp\Producer\Merchant\DyCallBackProducer;
use Hyperf\Amqp\Message\Type;
use function Hyperf\Support\env;
use App\Amqp\Producer\Demo\DemoProducer;
use App\Common\Enum\AmqpEnum;

$env = env('APP_ENV', 'dev');
return [
    // 默认配置
    AmqpEnum::DEFAULT_VHOST_CONFIG => [
        'vhost' => env('AMQP_VHOST', 'center'),
        'queues' => [
            // 命名规则 队列名称.项目名称(默认center).环境
            AmqpEnum::DEMO_QUEUE_NAME => [
                'exchange' => "test.demo.center.{$env}",
                'exchange_type' => Type::DIRECT->value, // 交换机类型 direct fanout topic headers
                'queue' => "test.demo.queue.center.{$env}", // 队列名
                'routing_key' => "test.demo.key.center.{$env}", // 路由键
                'producer' => DemoProducer::class, // 生产者
            ]
        ]
    ],
    // 商家配置
    AmqpEnum::MERCHANT_VHOST_CONFIG => [
        'vhost' => env('MERCHANT_AMQP_VHOST', 'merchant'),
        'queues' => [
            // 命名规则 队列名称.项目名称(merchant).环境
            AmqpEnum::M_DEMO_QUEUE_NAME => [
                'exchange' => "test.m_demo.merchant.{$env}",
                'exchange_type' => Type::FANOUT->value, // 交换机类型 direct fanout topic headers
                'queue' => "test.m_demo.queue.merchant.{$env}", // 队列名
                'routing_key' => "test.m_demo.key.merchant.{$env}", // 路由键
              	'producer' => DemoProducer::class, // 生产者
            ]
        ]
    ],
];

6.创建缓存键常量

<?php
declare (strict_types=1);

namespace App\Common\Enum;

class CacheEnum
{
    // 防止队列重复消费键
    const CHECK_QUEUE_REPEAT_CONSUMING_KEY = 'check_queue_repeat_consuming';
}

7. 创建失败队列消息表

CREATE TABLE `ufo_m_amqp_dead_letter_message` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `message_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息ID',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '原始交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '原始路由键',
  `queue_name` varchar(255) NOT NULL DEFAULT '' COMMENT '队列名称',
  `queue_config` varchar(255) NOT NULL DEFAULT '' COMMENT '队列配置名(区分业务)',
  `exchange_type` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机类型:direct fanout topic headers',
  `message_content` json  COMMENT '消息内容',
  `retry_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
  `last_retry_time` datetime DEFAULT NULL COMMENT '最后重试时间',
  `status` tinyint(4) unsigned NOT NULL DEFAULT '0' COMMENT '状态(0-待处理,1-处理中,2-处理成功,3-处理失败)',
  `error_msg` json DEFAULT NULL COMMENT '错误信息',
  `created_at` datetime DEFAULT NULL COMMENT '创建时间',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idxes_queue_config_status` (`queue_config`,`status`),
  KEY `idx_create_time` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Amqp失败队列消息表';

8. 创建AmqpMessageTrait特征

其中 用到了snowflake包(IdGeneratorInterface) 生成唯一消息ID

运行composer require hyperf/snowflake安装扩展

<?php
declare (strict_types=1);

namespace App\Common\Trait;

use App\Common\Enum\AmqpEnum;
use App\Common\Enum\CacheEnum;
use App\Module\Merchant\Repository\AmqpDeadLetterMessageRepository as MerchantAmqpDeadLetterMessageRepository;
use Hyperf\Amqp\Constants;
use Hyperf\Amqp\Result;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use Hyperf\Snowflake\IdGeneratorInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use function Hyperf\Config\config;

trait AmqpMessageTrait
{
    #[Inject]
    public Redis $redis;

    #[Inject]
    public MerchantAmqpDeadLetterMessageRepository $merchantAmqpDeadLetterMessageRepository;

    #[Inject]
    public IdGeneratorInterface $idGenerator;

    /**
     * 生成队列配置
     * @param string $pool_name 队列池名称
     * @param string $queue_name 队列配置名称
     * @param string $message_id 唯一消息ID
     * @return array
     */
    public function getConfig(string $pool_name, string $queue_name, string $message_id = ''): array
    {
        $poolName = $pool_name;

        // 队列配置
        $queue_config = config("amqp_queue.{$pool_name}.queues." . $queue_name);
        $exchange = $queue_config['exchange'];
        $type = $queue_config['exchange_type'];
        $routingKey = $queue_config['routing_key'];
        $queue = $queue_config['queue'];
        $properties = [
            'content_type' => 'text/plain',
            'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
            'timestamp' => time(),
            'application_headers'=> new AMQPTable([
                'retry' => '0', // 重试次数
                'message_id' => strval($message_id ?: $this->idGenerator->generate()), // 消息唯一ID
                'queue_config' => $queue_name,
                'exchange_type' => (string) $type,
                'queue_name' => $queue_config['queue'],
                'vhost_config' => $pool_name
            ]),
        ];

        return compact('poolName', 'exchange', 'type', 'routingKey', 'properties', 'queue');
    }


    /**
     * 检查消息是否重复消费
     * @param AMQPMessage $msg
     * @return bool
     * @throws \RedisException
     */
    public function checkMessageRepeat(AMQPMessage $msg): bool
    {
        $msgData = $this->messageHandle($msg, '', false);
        $message_id = $msgData['message_id'];
        $vhost_config = $msgData['vhost_config'];
        // 重试的话不检查重复消费
        if (empty($message_id) || ($msgData['retry_count'] ?? 0) > 0) {
            return true;
        }
        $redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ":{$vhost_config}:" . config('app_env') . ":{$message_id}";

        // 一天内不能重复消费
        return $this->redis->setnx($redis_key, $message_id) && $this->redis->expire($redis_key, 24 * 3600);
    }


    /**
     * 获取队列消息详细信息
     * @param AMQPMessage $msg
     * @param string $error_msg
     * @param bool $needGenerateId 不存在message_id时是否生成
     * @return array
     */
    public function messageHandle(AMQPMessage $msg, string $error_msg, bool $needGenerateId = true): array
    {
        $exchange = $msg->getExchange();
        $routing_key = $msg->getRoutingKey();
        $message_content = $msg->getBody();
        //headersObject 是一个AMQPTable对象
        $headersObject = $msg->get_properties()['application_headers'];
        //调用getNativeData()得到一个数组
        $headersArray  = $headersObject->getNativeData();

        if ($needGenerateId && !isset($headersArray['message_id'])) {
            $headersArray['retry'] = 0;
            $headersArray['message_id'] = strval($this->idGenerator->generate());
        }

        $message_id = $headersArray['message_id'];
        $queue_name = $headersArray['queue_name'] ?? '';
        $queue_config = $headersArray['queue_config'] ?? '';
        $exchange_type = $headersArray['exchange_type'] ?? '';
        $vhost_config = $headersArray['vhost_config'] ?? '';
        $retry_count = ($headersArray['retry'] ?? 0);
        $last_retry_time = date('Y-m-d H:i:s');
        $error_msg = compact('error_msg');

        return compact('exchange', 'routing_key', 'message_id', 'queue_name', 'error_msg', 'vhost_config',
            'queue_config', 'exchange_type', 'retry_count', 'headersArray', 'message_content', 'last_retry_time');
    }

    /**
     * 丢弃消息
     * @param AMQPMessage $msg
     * @param string $error_msg 错误信息
     * @return Result
     */
    public function rejectHandle(AMQPMessage $msg, string $error_msg): Result
    {
        $msgData = $this->messageHandle($msg, $error_msg);
        $msgData['status'] = AmqpEnum::STATUS_FAILED;

        // 记录失败日志
        $this->saveDlxMessage($msgData);
        return Result::DROP;
    }

    /**
     * 确认应答
     * @param AMQPMessage $msg
     * @return Result
     */
    public function ackHandle(AMQPMessage $msg): Result
    {
        $msgData = $this->messageHandle($msg, '');
        $msgData['status'] = AmqpEnum::STATUS_SUCCESS;

        // 更新失败日志
        $this->saveDlxMessage($msgData, false);
        return Result::ACK;
    }

    /**
     * 手动重试
     * @param AMQPMessage $msg
     * @param string $error_msg 错误信息
     * @param int $retry_num 重试次数
     * @return Result
     */
    public function retryHandle(AMQPMessage $msg, string $error_msg, int $retry_num = AmqpEnum::AMQP_RETRY_COUNT): Result
    {
        $msgData = $this->messageHandle($msg, $error_msg);
        $exchange      = $msgData['exchange'];
        $routing_key   = $msgData['routing_key'];
        $channel       = $msg->getChannel();
        $body          = $msgData['message_content'];
        $headersArray  = $msgData['headersArray'];


        if ($headersArray['retry'] < $retry_num) {
            $headersArray['retry']++;
            // 重新入队列
            $channel->basic_publish(
                new AMQPMessage($body, [
                    'content_type' => 'text/plain',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'timestamp' => time(),
                    'application_headers'=> new AMQPTable($headersArray)
                ]),
                $exchange,
                $routing_key
            );
            // 因为重新入队了 所以这条消息直接确认
            $result = Result::ACK;
            $msgData['status'] = AmqpEnum::STATUS_PADDING;
        } else {
            // 丢弃
            $result = Result::DROP;
            $msgData['status'] = AmqpEnum::STATUS_FAILED;
        }
        // 记录失败日志
        $this->saveDlxMessage($msgData);
        return $result;
    }

    /**
     * 保存失败队列消息
     * @param array $saveData
     * @param bool $needInsert 记录不存在是否新增
     * @return bool
     */
    public function saveDlxMessage(array $saveData, bool $needInsert = true): bool
    {
        if (empty($saveData) || empty($saveData['message_id'])) {
            return false;
        }
        $saveWhere = [
            'message_id' => $saveData['message_id'],
            'queue_config' => $saveData['queue_config'],
        ];

        $saveData['update_time'] = date('Y-m-d H:i:s');
        $saveData['error_msg'] = $saveData['error_msg'] ?? [];


        $saveData['message_content'] = empty($saveData['message_content']) ? null : (is_string($saveData['message_content'])
            ? json_decode($saveData['message_content'], true) : $saveData['message_content']);

        // 根据不同项目 写入不同表
        $vhost_config = $saveData['vhost_config'] ?? AmqpEnum::DEFAULT_VHOST_CONFIG;
        $repository = $this->getDlxMessageRepository($vhost_config);

        return $repository->insertOrUpdate($saveWhere, $saveData, $needInsert);
    }


    /**
     * 根据不同vhost配置 返回不同项目的失败消息表
     * @param string $vhost_config
     */
    public function getDlxMessageRepository(string $vhost_config)
    {
        switch ($vhost_config) {
            case AmqpEnum::MERCHANT_VHOST_CONFIG:
                // 商家端
                $repository = $this->merchantAmqpDeadLetterMessageRepository;
                break;
            default:
                $repository = $this->merchantAmqpDeadLetterMessageRepository;
        }
        return $repository;
    }
}

$repository->insertOrUpdate()方法类似这样

/**
 * 插入或更新
 * @param array $where
 * @param array $data
 * @param bool $needInsert 记录不存在是否新增
 * @return bool
 */
public function insertOrUpdate(array $where, array $data, bool $needInsert = true): bool
{
    // 如果传入了 $where 条件,尝试查找记录
    if (!empty($where)) {
        $record = AmqpDeadLetterMessage::query()->where($where)->first();
        if ($record) {
            // 记录存在,执行更新
            return $record->fill($data)->save();
        }
    }

    // 记录不存在,执行新增
    if ($needInsert) {
        return (new AmqpDeadLetterMessage())->fill($data)->save();
    }
    return true;
}

模型需要定义以下属性

主要是message_content、error_msg两个json类型的字段 交给hyperf处理数组

protected array $fillable = ['id', 'message_id', 'exchange', 'routing_key', 'queue_name', 'queue_config', 'exchange_type', 'message_content', 'retry_count', 'last_retry_time', 'status', 'error_msg', 'created_at', 'updated_at', 'deleted_at'];


protected array $casts = ['id' => 'integer', 'retry_count' => 'integer', 'status' => 'integer', 'created_at' => 'datetime', 'updated_at' => 'datetime', 'message_content' => 'array', 'error_msg' => 'array'];

9. 创建生产者

运行php bin/hyperf.php gen:amqp-producer DemoProducer 下载官方demo

修改代码

<?php

declare(strict_types=1);

namespace App\Amqp\Producer\Demo;

use App\Common\Enum\AmqpEnum;
use App\Common\Trait\AmqpMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * 测试生产者
 * Class DemoProducer
 * @package App\Amqp\Producer\Demo
 */
class DemoProducer extends ProducerMessage
{
    use AmqpMessageTrait;
    public function __construct($data, string $message_id = '')
    {
        $this->setConfig($message_id);
        // 需要发送的数据
        $this->payload = $data;
    }

    /**
     * 设置队列配置
     * @param string $message_id
     * @return void
     */
    public function setConfig(string $message_id = ''): void
    {
        $params = $this->getConfig(AmqpEnum::DEFAULT_VHOST_CONFIG, AmqpEnum::DEMO_QUEUE_NAME, $message_id);
        fillProperty($params, $this);
    }
}

其中用到了填充属性的方法fillProperty

if (!function_exists('fillProperty')) {
    /**
     * 使用反射填充属性
     * @param array $params
     * @param object $instance
     */
    function fillProperty(array $params, object $instance): void
    {
        $ref = new ReflectionObject($instance);
        foreach ($params as $key => $value) {
            if ($ref->hasProperty($key)) {
                $prop = $ref->getProperty($key);
                $prop->setAccessible(true); // 允许访问 protected/private
                $prop->setValue($instance, $value);
            }
        }
    }
}

10.创建一个控制器测试生产者

<?php
declare (strict_types=1);

namespace App\Controller;

use App\Amqp\Producer\Demo\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\PostMapping;

#[Controller(prefix: 'demo')]
class DemoController extends AbstractController
{
    #[Inject]
    protected Producer $producer;

    /**
     * demo 生产者
     * @return array
     */
    #[PostMapping(path: 'demoProduce')]
    public function demoProduce()
    {
        $res = $this->producer->produce(new DemoProducer(['tenant_id' => 1, 'product_list' => [1,2]], 'test_message_id'));

        return [
            'message' => $res ? "发送成功" : '发送失败',
        ];
    }
}

post方式调用127.0.0.1:9501/demo/demoProduce即可在rabbitmq后台看到exchange生成了,但队列没有生成(正常现象,设计理念就是生产者只负责消息的封装与发送,不会自动帮你创建队列,队列的声明和绑定动作需要在消费者里实现),消费者启动后队列就生成了(消费者跟随项目启动 无需手动启动)

11.创建消费者

运行php bin/hyperf.php gen:amqp-consumer DemoConsumer下载官方消费者demo

修改代码

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer\Demo;

use App\Amqp\Consumer\BasicConsumer;
use App\Common\Enum\AmqpEnum;
use App\Exception\BusinessException;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;

#[Consumer(name: "DemoConsumer", nums: 1)]
class DemoConsumer extends BasicConsumer
{
    public function __construct()
    {
        $this->setConfig();
    }

    /**
     * 设置队列配置
     * @return void
     */
    public function setConfig(): void
    {
        $params = $this->getConfig(AmqpEnum::DEFAULT_VHOST_CONFIG, AmqpEnum::DEMO_QUEUE_NAME);
        fillProperty($params, $this);
    }

    public function consumeMessage($data, AMQPMessage $message): Result
    {
        $json_data = json_encode($data, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
        try {

            $this->logger->info('接收到消息:' . $json_data);

            // 检查消息是否重复消费
            if (!$this->checkMessageRepeat($message)) {
                throw new BusinessException("消息重复消费,本次跳过");
            }

            // todo 业务处理


            // 确认应答
            return $this->ackHandle($message);
        }  catch (BusinessException|AMQPIOException|AMQPConnectionClosedException|AMQPConnectionBlockedException|AMQPChannelClosedException|\Exception $e) {
            if ($e instanceof BusinessException) {
                $this->logger->error("[测试消费者]发生业务性异常 消息丢弃:" . $json_data . ",错误原因:" . var_export($e->getMessage(), true));
                $error_msg = "[业务异常]错误原因:" . var_export($e->getMessage(), true);
                // 丢弃消息
                return $this->rejectHandle($message, $error_msg);
            } else {
                $this->logger->error("[测试消费者]发生致命异常 消息重新入队:" . $json_data . ",错误原因:" . var_export($e->getMessage(), true));
                $error_msg = "[致命异常]错误原因:" . var_export($e->getMessage(), true);
                // 手动重试机制
                return $this->retryHandle($message, $error_msg);
            }
        }

    }

}

其中用到BusinessException异常类 主要是为了区分 消息是丢弃还是重试

<?php
namespace App\Exception;

use Throwable;

class BusinessException extends \RuntimeException
{
    protected $code = 400;

    public function __construct($message = "", $code = 0, Throwable $previous = null)
    {
        $this->code = $code ?: $this->code;
        parent::__construct($message, $this->code, $previous);
    }
}

这里还修改了继承类为BasicConsumer

<?php
declare (strict_types=1);

namespace App\Amqp\Consumer;

use App\Common\Trait\AmqpMessageTrait;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Di\Annotation\Inject;
use Psr\Log\LoggerInterface;

class BasicConsumer extends ConsumerMessage
{
    use AmqpMessageTrait;

    #[Inject]
    protected LoggerInterface $logger;

    /**
     * 判断是否允许自启动
     * @return bool
     */
    public function isEnable(): bool
    {
        return true;
    }
}

如果需要禁止所有消费者启动 修改isEnable方法即可,如果需要单个消费者启动 重写isEnable方法即可

12.失败消息重新入队脚本

创建一个命令行

<?php
declare (strict_types=1);

namespace App\Command;

use App\Command\Service\DeadLetterMessageService;
use App\Common\Enum\AmqpEnum;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Di\Annotation\Inject;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;

/**
 * 失败消息重新入队
 * Class DeadLetterMessageCommand
 * @package App\Command
 */
#[Command(
    name: "dlxMessage:requeue",
    arguments:[
        ['vhost_config', InputArgument::REQUIRED, 'vhost配置名称:default|merchant'],
        ['message_ids', InputArgument::REQUIRED, '消息ID,多个用英文逗号隔开']
    ],
    options:[
        ['debug', null, InputOption::VALUE_OPTIONAL, '是否调试模式', '0']
    ],
    description:"失败消息重新入队")
]
class DeadLetterMessageCommand extends HyperfCommand
{
    #[Inject]
    protected LoggerInterface $logger;

    #[Inject]
    protected DeadLetterMessageService $service;

    public function handle()
    {
        try {
            $start = microtime(true);
            $vhost_config = $this->input->getArgument('vhost_config') ?: AmqpEnum::DEFAULT_VHOST_CONFIG;
            $message_ids = $this->input->getArgument('message_ids');
            $debug = $this->input->getOption('debug');

            $message_ids = !empty($message_ids) ? explode(',', $message_ids) : [];

            $this->service->requeueDlxMessages($vhost_config, $message_ids, $debug == 1);

            $duration = round(microtime(true) - $start, 2);
            $this->info("失败消息重新入队操作完成,耗时:{$duration}秒");
        } catch (\Throwable $e) {
            // 简单记录错误
            $this->logger->error('失败消息重新入队系统错误', [
                'error' => $e->getMessage(),
                'file' => $e->getFile(),
                'line' => $e->getLine()
            ]);
            $this->info('失败消息重新入队系统错误!' . var_export($e->getMessage(), true));
        }

    }
}

创建业务类

<?php

namespace App\Command\Service;

use App\Common\Enum\AmqpEnum;
use App\Common\Enum\CacheEnum;
use App\Common\Trait\AmqpMessageTrait;
use App\Exception\BusinessException;
use Hyperf\Amqp\Producer;
use Hyperf\Context\ApplicationContext;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use Psr\Log\LoggerInterface;
use function Hyperf\Config\config;

class DeadLetterMessageService
{
    use AmqpMessageTrait;

    #[Inject]
    protected LoggerInterface $logger;

    #[Inject]
    protected Producer $producer;

    #[Inject]
    public Redis $redis;

    /**
     * 重新入队死信消息
     * @param string $vhost_config vhost配置名称
     * @param array $message_ids 消息ID
     * @param bool $debug 是否调试模式
     * @return void
     */
    public function requeueDlxMessages(string $vhost_config, array $message_ids, bool $debug): void
    {
        if (empty($message_ids) || empty($vhost_config)) {
            $this->logger->error('参数非法');
            throw new BusinessException('参数非法');
        }

        $repository = $this->getDlxMessageRepository($vhost_config);
        $list = $repository->getListByMessageIds($message_ids);
        $container = ApplicationContext::getContainer();

        if (empty($list)) {
            $this->logger->error('没有符合条件的死信消息');
            throw new BusinessException('没有符合条件的死信消息');
        }

        foreach ($list as $item) {
            try {
                $item_message_id = strval($item['message_id']);
                if (!$debug) {
                    // 删除防止重复消费键

                    $redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ":{$vhost_config}:" . config('app_env') . ":{$item_message_id}";
                    $this->redis->del($redis_key);


                    // 获取队列配置
                    $queue_config = config("amqp_queue.{$vhost_config}.queues." . $item['queue_config']);

                    $producer = $container->make($queue_config['producer'], [
                        $item['message_content'] ?: [],
                        $item_message_id
                    ]);

                    // 重新入队列
                    $this->producer->produce($producer);
                }

                // 更新死信消息为处理中 因为重新入队了
                $saveData = [
                    'message_id' => $item_message_id,
                    'queue_config' => $item['queue_config'],
                    'retry_count' => 0,
                    'last_retry_time' => date('Y-m-d H:i:s'),
                    'status' => AmqpEnum::STATUS_PADDING,
                    'error_msg' => ['error_msg' => '']
                ];
                !$debug && $this->saveDlxMessage($saveData, false);

            } catch (\Exception $e) {
                $this->logger->error("[死信消息重回队列异常]消息ID:{$item['message_id']},队列配置为:{$item['queue_config']}" .
                    "消息内容为:" . json_encode($item['message_content'], JSON_UNESCAPED_UNICODE) .
                    ",错误原因:" . var_export($e->getMessage(), true));
            }
        }
    }
}

其中用到了获取信息列表的方法 类似这样

/**
 * 根据消息ID获取列表
 * @param array $message_ids
 * @return array
 */
public function getListByMessageIds(array $message_ids): array
{
    $list = AmqpDeadLetterMessage::query()->whereIn('message_id', $message_ids)->get();
    if ($list->isEmpty()) {
        return [];
    }
    return $list->toArray();
}

运行php bin/hyperf.php dlxMessage:requeue default "1,2,3"就可以将message_id为1、2、3的记录重新入队了。

0

评论区