本文实现了以下功能:消息消费失败后写入 MySQL 数据库;提供将失败消息重新入队的 command;支持手动重试机制以及防止消息重复消费的逻辑。暂未实现延时队列。
1. 安装扩展包
composer require hyperf/amqp
2. 生成配置文件
运行命令自动生成配置文件
php bin/hyperf.php vendor:publish hyperf/amqp
修改下配置项 /config/autoload/amqp.php
可在 producer 或者 consumer 的 __construct 函数中设置不同的 pool(即下面配置返回数组中的连接池名称),例如下面的 default 和 merchant
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
use Hyperf\Amqp\IO\IOFactory;
use function Hyperf\Support\env;
// Base connection settings shared by every pool. To isolate a project on its
// own vhost, derive a copy of this array and override only the `vhost` entry.
$baseConnection = [
    'host' => env('AMQP_HOST', 'localhost'),
    'port' => (int) env('AMQP_PORT', 5672),
    'user' => env('AMQP_USER', 'guest'),
    'password' => env('AMQP_PASSWORD', 'guest'),
    'vhost' => env('AMQP_VHOST', 'center'),
    'open_ssl' => false,
    'concurrent' => [
        // Upper bound on concurrent consumption inside a single consumer process.
        'limit' => 2,
    ],
    'pool' => [
        // Number of connections kept in the pool.
        'connections' => 2,
    ],
    'io' => IOFactory::class,
    'params' => [
        'insist' => false,
        'login_method' => 'AMQPLAIN',
        'login_response' => null,
        'locale' => 'en_US',
        'connection_timeout' => 10,
        // Keep this at roughly twice the heartbeat value.
        'read_write_timeout' => 60,
        'context' => null,
        // Network-level keepalive; complements the heartbeat as a fallback check.
        'keepalive' => true,
        // Try to keep the handling time of each message below the heartbeat interval.
        'heartbeat' => 30,
        'channel_rpc_timeout' => 0.0,
        // Whether the connection is closed automatically when the object is destructed.
        'close_on_destruct' => false,
        // Maximum number of idle channels that may be reused per connection.
        'max_idle_channels' => 10,
        'connection_name' => null,
    ],
];

// Merchant pool: identical to the base settings except for the vhost.
$merchantConnection = array_replace($baseConnection, [
    'vhost' => env('MERCHANT_AMQP_VHOST', 'merchant'),
]);

return [
    'enable' => true,
    'default' => $baseConnection,
    'merchant' => $merchantConnection,
];
3. 配置.env文件
# RabbitMQ
AMQP_HOST=127.0.0.1
AMQP_PORT=5672
AMQP_USER=guest
AMQP_PASSWORD=guest
AMQP_VHOST=center
MERCHANT_AMQP_VHOST=merchant
4. 创建队列常量
<?php
declare (strict_types=1);
namespace App\Common\Enum;
/**
 * Constants shared by the AMQP producers, consumers and the failed-message table.
 */
class AmqpEnum
{
    // Default number of manual retries before a message is given up on.
    public const AMQP_RETRY_COUNT = 3;

    // Status values stored in the failed-message table.
    public const STATUS_INIT = 0;
    public const STATUS_PADDING = 1; // "processing"; the name is kept as-is because callers reference it
    public const STATUS_SUCCESS = 2;
    public const STATUS_FAILED = 3;

    // Human-readable label for each status value.
    public const STATUS_DESC = [
        self::STATUS_INIT => '待处理',
        self::STATUS_PADDING => '处理中',
        self::STATUS_SUCCESS => '处理成功',
        self::STATUS_FAILED => '处理失败',
    ];

    // Config key of the default vhost.
    public const DEFAULT_VHOST_CONFIG = 'default';
    // Config key of the merchant vhost.
    public const MERCHANT_VHOST_CONFIG = 'merchant';

    // Config name of the demo queue.
    public const DEMO_QUEUE_NAME = 'demo';
    // Config name of the merchant-side demo queue.
    public const M_DEMO_QUEUE_NAME = 'm_demo';
}
5.创建队列相关配置文件
创建配置文件
/config/autoload/amqp_queue.php
<?php
declare(strict_types=1);
use App\Amqp\Producer\Merchant\DyCallBackProducer;
use Hyperf\Amqp\Message\Type;
use function Hyperf\Support\env;
use App\Amqp\Producer\Demo\DemoProducer;
use App\Common\Enum\AmqpEnum;
// Environment suffix appended to every exchange / queue / routing-key name.
$appEnv = env('APP_ENV', 'dev');

return [
    // Queues living on the default vhost.
    AmqpEnum::DEFAULT_VHOST_CONFIG => [
        'vhost' => env('AMQP_VHOST', 'center'),
        'queues' => [
            // Naming rule: <queue name>.<project name (center by default)>.<environment>
            AmqpEnum::DEMO_QUEUE_NAME => [
                'exchange' => 'test.demo.center.' . $appEnv,
                // Exchange type: direct | fanout | topic | headers
                'exchange_type' => Type::DIRECT->value,
                'queue' => 'test.demo.queue.center.' . $appEnv,
                'routing_key' => 'test.demo.key.center.' . $appEnv,
                // Producer class used when a failed message is re-queued.
                'producer' => DemoProducer::class,
            ],
        ],
    ],
    // Queues living on the merchant vhost.
    AmqpEnum::MERCHANT_VHOST_CONFIG => [
        'vhost' => env('MERCHANT_AMQP_VHOST', 'merchant'),
        'queues' => [
            // Naming rule: <queue name>.<project name (merchant)>.<environment>
            AmqpEnum::M_DEMO_QUEUE_NAME => [
                'exchange' => 'test.m_demo.merchant.' . $appEnv,
                // Exchange type: direct | fanout | topic | headers
                'exchange_type' => Type::FANOUT->value,
                'queue' => 'test.m_demo.queue.merchant.' . $appEnv,
                'routing_key' => 'test.m_demo.key.merchant.' . $appEnv,
                // NOTE(review): DemoProducer::setConfig() is hard-wired to the default
                // vhost's demo queue, so re-queueing through this entry would publish
                // there. Presumably a placeholder; point it at a merchant producer.
                'producer' => DemoProducer::class,
            ],
        ],
    ],
];
6.创建缓存键常量
<?php
declare (strict_types=1);
namespace App\Common\Enum;
/**
 * Cache key constants.
 */
class CacheEnum
{
    // Key prefix of the marker used to detect repeated consumption of a queue message.
    public const CHECK_QUEUE_REPEAT_CONSUMING_KEY = 'check_queue_repeat_consuming';
}
7. 创建失败队列消息表
-- Failure log for AMQP messages: one row per (message_id, queue_config).
-- The application looks rows up by message_id + queue_config (insert-or-update)
-- and by message_id alone (re-queue command), so that pair is indexed to avoid
-- full table scans. The index is deliberately non-unique because rows can be
-- soft-deleted via deleted_at.
CREATE TABLE `ufo_m_amqp_dead_letter_message` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `message_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息ID',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '原始交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '原始路由键',
  `queue_name` varchar(255) NOT NULL DEFAULT '' COMMENT '队列名称',
  `queue_config` varchar(255) NOT NULL DEFAULT '' COMMENT '队列配置名(区分业务)',
  `exchange_type` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机类型:direct fanout topic headers',
  `message_content` json COMMENT '消息内容',
  `retry_count` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
  `last_retry_time` datetime DEFAULT NULL COMMENT '最后重试时间',
  `status` tinyint(4) unsigned NOT NULL DEFAULT '0' COMMENT '状态(0-待处理,1-处理中,2-处理成功,3-处理失败)',
  `error_msg` json DEFAULT NULL COMMENT '错误信息',
  `created_at` datetime DEFAULT NULL COMMENT '创建时间',
  `updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted_at` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_message_id_queue_config` (`message_id`,`queue_config`),
  KEY `idxes_queue_config_status` (`queue_config`,`status`),
  KEY `idx_create_time` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Amqp失败队列消息表';
8. 创建AmqpMessageTrait特征
其中 用到了snowflake包(IdGeneratorInterface) 生成唯一消息ID
运行
composer require hyperf/snowflake安装扩展
<?php
declare (strict_types=1);
namespace App\Common\Trait;
use App\Common\Enum\AmqpEnum;
use App\Common\Enum\CacheEnum;
use App\Module\Merchant\Repository\AmqpDeadLetterMessageRepository as MerchantAmqpDeadLetterMessageRepository;
use Hyperf\Amqp\Constants;
use Hyperf\Amqp\Result;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use Hyperf\Snowflake\IdGeneratorInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use function Hyperf\Config\config;
/**
 * Shared AMQP helpers for producers, consumers and the re-queue service:
 * queue-config lookup, duplicate-consumption guard, manual retry, and
 * persistence of failed messages.
 */
trait AmqpMessageTrait
{
    #[Inject]
    public Redis $redis;

    #[Inject]
    public MerchantAmqpDeadLetterMessageRepository $merchantAmqpDeadLetterMessageRepository;

    #[Inject]
    public IdGeneratorInterface $idGenerator;

    /**
     * Build the message settings for one configured queue.
     *
     * @param string $pool_name  connection pool / vhost config name (first-level key of amqp_queue)
     * @param string $queue_name queue config name under that pool
     * @param string $message_id unique message id; generated when empty
     * @return array keys: poolName, exchange, type, routingKey, properties, queue
     * @throws \InvalidArgumentException when the queue is not configured
     */
    public function getConfig(string $pool_name, string $queue_name, string $message_id = ''): array
    {
        $queue_config = config("amqp_queue.{$pool_name}.queues." . $queue_name);
        // Fail with a clear message instead of propagating null settings downstream.
        if (!is_array($queue_config)) {
            throw new \InvalidArgumentException(
                "AMQP queue config not found: amqp_queue.{$pool_name}.queues.{$queue_name}"
            );
        }

        $poolName = $pool_name;
        $exchange = $queue_config['exchange'];
        $type = $queue_config['exchange_type'];
        $routingKey = $queue_config['routing_key'];
        $queue = $queue_config['queue'];
        $properties = [
            'content_type' => 'text/plain',
            'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
            'timestamp' => time(),
            'application_headers' => new AMQPTable([
                'retry' => '0', // number of retries performed so far
                'message_id' => strval($message_id ?: $this->idGenerator->generate()), // unique message id
                'queue_config' => $queue_name,
                'exchange_type' => (string) $type,
                'queue_name' => $queue,
                'vhost_config' => $pool_name,
            ]),
        ];

        return compact('poolName', 'exchange', 'type', 'routingKey', 'properties', 'queue');
    }

    /**
     * Duplicate-consumption guard.
     *
     * @param AMQPMessage $msg
     * @return bool true when the message should be processed (first delivery, a retry,
     *              or a message without an id); false when the id was already seen
     * @throws \RedisException
     */
    public function checkMessageRepeat(AMQPMessage $msg): bool
    {
        $msgData = $this->messageHandle($msg, '', false);
        $message_id = $msgData['message_id'];
        $vhost_config = $msgData['vhost_config'];

        // Retries re-use the original id on purpose, so they bypass the check.
        if (empty($message_id) || $msgData['retry_count'] > 0) {
            return true;
        }

        $redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ":{$vhost_config}:" . config('app_env') . ":{$message_id}";

        // SET NX EX is a single atomic command: the marker can never be left behind
        // without a TTL. The same id is rejected for one day.
        return (bool) $this->redis->set($redis_key, $message_id, ['nx', 'ex' => 24 * 3600]);
    }

    /**
     * Extract the routing information and custom headers of a message.
     *
     * @param AMQPMessage $msg
     * @param string $error_msg error text to attach to the result
     * @param bool $needGenerateId generate a message_id when the headers carry none
     * @return array keys: exchange, routing_key, message_id, queue_name, error_msg, vhost_config,
     *               queue_config, exchange_type, retry_count, headersArray, message_content, last_retry_time
     */
    public function messageHandle(AMQPMessage $msg, string $error_msg, bool $needGenerateId = true): array
    {
        $exchange = $msg->getExchange();
        $routing_key = $msg->getRoutingKey();
        $message_content = $msg->getBody();

        // application_headers is an AMQPTable; a message may carry no such property.
        $headersObject = $msg->get_properties()['application_headers'] ?? null;
        $headersArray = $headersObject instanceof AMQPTable ? $headersObject->getNativeData() : [];

        if ($needGenerateId && !isset($headersArray['message_id'])) {
            $headersArray['retry'] = 0;
            $headersArray['message_id'] = strval($this->idGenerator->generate());
        }

        $message_id = $headersArray['message_id'] ?? '';
        $queue_name = $headersArray['queue_name'] ?? '';
        $queue_config = $headersArray['queue_config'] ?? '';
        $exchange_type = $headersArray['exchange_type'] ?? '';
        $vhost_config = $headersArray['vhost_config'] ?? '';
        // The header is published as the string '0'; normalise to int for comparisons and storage.
        $retry_count = (int) ($headersArray['retry'] ?? 0);
        $last_retry_time = date('Y-m-d H:i:s');
        $error_msg = compact('error_msg');

        return compact('exchange', 'routing_key', 'message_id', 'queue_name', 'error_msg', 'vhost_config',
            'queue_config', 'exchange_type', 'retry_count', 'headersArray', 'message_content', 'last_retry_time');
    }

    /**
     * Drop a message and record it as failed.
     *
     * @param AMQPMessage $msg
     * @param string $error_msg reason for the failure
     * @return Result always Result::DROP
     */
    public function rejectHandle(AMQPMessage $msg, string $error_msg): Result
    {
        $msgData = $this->messageHandle($msg, $error_msg);
        $msgData['status'] = AmqpEnum::STATUS_FAILED;
        $this->saveDlxMessage($msgData);

        return Result::DROP;
    }

    /**
     * Acknowledge a message; an existing failure record (if any) is marked successful.
     *
     * @param AMQPMessage $msg
     * @return Result always Result::ACK
     */
    public function ackHandle(AMQPMessage $msg): Result
    {
        $msgData = $this->messageHandle($msg, '');
        $msgData['status'] = AmqpEnum::STATUS_SUCCESS;
        // Update only: a message that never failed gets no record.
        $this->saveDlxMessage($msgData, false);

        return Result::ACK;
    }

    /**
     * Manual retry: republish the message with an incremented `retry` header until
     * the limit is reached, then drop it.
     *
     * @param AMQPMessage $msg
     * @param string $error_msg reason for the failure
     * @param int $retry_num maximum number of retries
     * @return Result Result::ACK when the message was republished, Result::DROP otherwise
     */
    public function retryHandle(AMQPMessage $msg, string $error_msg, int $retry_num = AmqpEnum::AMQP_RETRY_COUNT): Result
    {
        $msgData = $this->messageHandle($msg, $error_msg);
        $headersArray = $msgData['headersArray'];
        // Tolerate messages whose headers carry an id but no `retry` counter.
        $retried = (int) ($headersArray['retry'] ?? 0);

        if ($retried < $retry_num) {
            $headersArray['retry'] = $retried + 1;
            // Republish a copy to the original exchange and routing key.
            $msg->getChannel()->basic_publish(
                new AMQPMessage($msgData['message_content'], [
                    'content_type' => 'text/plain',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'timestamp' => time(),
                    'application_headers' => new AMQPTable($headersArray),
                ]),
                $msgData['exchange'],
                $msgData['routing_key']
            );
            // The copy is back in the queue, so the current delivery is acknowledged.
            $result = Result::ACK;
            $msgData['status'] = AmqpEnum::STATUS_PADDING;
        } else {
            $result = Result::DROP;
            $msgData['status'] = AmqpEnum::STATUS_FAILED;
        }

        $this->saveDlxMessage($msgData);

        return $result;
    }

    /**
     * Persist a failed-message record, matched by message_id + queue_config.
     *
     * @param array $saveData record fields; `vhost_config` selects the repository
     * @param bool $needInsert insert a new record when none exists
     * @return bool false when no message_id is given, otherwise the repository result
     */
    public function saveDlxMessage(array $saveData, bool $needInsert = true): bool
    {
        if (empty($saveData) || empty($saveData['message_id'])) {
            return false;
        }

        $saveWhere = [
            'message_id' => $saveData['message_id'],
            'queue_config' => $saveData['queue_config'],
        ];
        // The table column (and the model's fillable field) is `updated_at`.
        $saveData['updated_at'] = date('Y-m-d H:i:s');
        $saveData['error_msg'] = $saveData['error_msg'] ?? [];

        // Normalise the payload only when the caller supplied one; a status-only
        // update must not overwrite the stored payload with null.
        if (array_key_exists('message_content', $saveData)) {
            $content = $saveData['message_content'];
            if (empty($content)) {
                $content = null;
            } elseif (is_string($content)) {
                $decoded = json_decode($content, true);
                // Keep the raw body when it is not valid JSON instead of losing it.
                $content = json_last_error() === JSON_ERROR_NONE ? $decoded : $content;
            }
            $saveData['message_content'] = $content;
        }

        // Each project writes to its own table.
        $vhost_config = $saveData['vhost_config'] ?? AmqpEnum::DEFAULT_VHOST_CONFIG;

        return $this->getDlxMessageRepository($vhost_config)->insertOrUpdate($saveWhere, $saveData, $needInsert);
    }

    /**
     * Resolve the failed-message repository for a vhost config name.
     *
     * @param string $vhost_config
     */
    public function getDlxMessageRepository(string $vhost_config)
    {
        return match ($vhost_config) {
            // Merchant side.
            AmqpEnum::MERCHANT_VHOST_CONFIG => $this->merchantAmqpDeadLetterMessageRepository,
            // Every other vhost currently shares the merchant repository as well.
            default => $this->merchantAmqpDeadLetterMessageRepository,
        };
    }
}
$repository->insertOrUpdate()方法类似这样
/**
 * Update the record matching $where, or insert a new one.
 *
 * @param array $where lookup conditions; skipped when empty
 * @param array $data attributes to write
 * @param bool $needInsert insert a new record when no match is found
 * @return bool result of the save; true when nothing matched and inserting is disabled
 */
public function insertOrUpdate(array $where, array $data, bool $needInsert = true): bool
{
    // Only look for an existing record when conditions were supplied.
    $existing = empty($where) ? null : AmqpDeadLetterMessage::query()->where($where)->first();

    if ($existing) {
        return $existing->fill($data)->save();
    }

    if (!$needInsert) {
        return true;
    }

    return (new AmqpDeadLetterMessage())->fill($data)->save();
}
模型需要定义以下属性
主要是message_content、error_msg两个json类型的字段 交给hyperf处理数组
// Attributes that may be mass-assigned through fill().
protected array $fillable = [
    'id',
    'message_id',
    'exchange',
    'routing_key',
    'queue_name',
    'queue_config',
    'exchange_type',
    'message_content',
    'retry_count',
    'last_retry_time',
    'status',
    'error_msg',
    'created_at',
    'updated_at',
    'deleted_at',
];

// message_content and error_msg are JSON columns; the `array` cast lets the
// model convert between PHP arrays and JSON.
protected array $casts = [
    'id' => 'integer',
    'retry_count' => 'integer',
    'status' => 'integer',
    'created_at' => 'datetime',
    'updated_at' => 'datetime',
    'message_content' => 'array',
    'error_msg' => 'array',
];
9. 创建生产者
运行 php bin/hyperf.php gen:amqp-producer DemoProducer 生成官方 demo
修改代码
<?php
declare(strict_types=1);
namespace App\Amqp\Producer\Demo;
use App\Common\Enum\AmqpEnum;
use App\Common\Trait\AmqpMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;
/**
 * Demo producer bound to the demo queue of the default vhost.
 *
 * Class DemoProducer
 * @package App\Amqp\Producer\Demo
 */
class DemoProducer extends ProducerMessage
{
    use AmqpMessageTrait;

    /**
     * @param mixed $data payload to publish
     * @param string $message_id unique message id; one is generated when empty
     */
    public function __construct($data, string $message_id = '')
    {
        $this->setConfig($message_id);
        $this->payload = $data;
    }

    /**
     * Copy the queue settings (exchange, routing key, properties, ...) onto this message.
     *
     * @param string $message_id
     * @return void
     */
    public function setConfig(string $message_id = ''): void
    {
        fillProperty(
            $this->getConfig(AmqpEnum::DEFAULT_VHOST_CONFIG, AmqpEnum::DEMO_QUEUE_NAME, $message_id),
            $this
        );
    }
}
其中用到了填充属性的方法fillProperty
if (!function_exists('fillProperty')) {
    /**
     * Assign values to an object's properties by name using reflection.
     *
     * Keys with no matching property are ignored. Non-public properties are
     * written as well: since PHP 8.1 reflection can access them without
     * setAccessible(), which is a no-op there and deprecated in PHP 8.5.
     *
     * @param array $params property name => value
     * @param object $instance object to fill
     */
    function fillProperty(array $params, object $instance): void
    {
        $ref = new ReflectionObject($instance);
        foreach ($params as $key => $value) {
            if (!$ref->hasProperty($key)) {
                continue;
            }
            $ref->getProperty($key)->setValue($instance, $value);
        }
    }
}
10.创建一个控制器测试生产者
<?php
declare (strict_types=1);
namespace App\Controller;
use App\Amqp\Producer\Demo\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\PostMapping;
#[Controller(prefix: 'demo')]
class DemoController extends AbstractController
{
    #[Inject]
    protected Producer $producer;

    /**
     * Publish one demo message and report whether the producer accepted it.
     *
     * The message id is fixed, so repeated calls publish the same id each time.
     *
     * @return array
     */
    #[PostMapping(path: 'demoProduce')]
    public function demoProduce()
    {
        $message = new DemoProducer(['tenant_id' => 1, 'product_list' => [1, 2]], 'test_message_id');
        $sent = $this->producer->produce($message);

        return ['message' => $sent ? "发送成功" : '发送失败'];
    }
}
post方式调用127.0.0.1:9501/demo/demoProduce即可在rabbitmq后台看到exchange生成了,但队列没有生成(正常现象,设计理念就是生产者只负责消息的封装与发送,不会自动帮你创建队列,队列的声明和绑定动作需要在消费者里实现),消费者启动后队列就生成了(消费者跟随项目启动 无需手动启动)
11.创建消费者
运行
php bin/hyperf.php gen:amqp-consumer DemoConsumer 生成官方消费者 demo,然后修改代码
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer\Demo;
use App\Amqp\Consumer\BasicConsumer;
use App\Common\Enum\AmqpEnum;
use App\Exception\BusinessException;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Demo consumer for the demo queue of the default vhost.
 */
#[Consumer(name: "DemoConsumer", nums: 1)]
class DemoConsumer extends BasicConsumer
{
    public function __construct()
    {
        $this->setConfig();
    }

    /**
     * Copy the queue settings (exchange, queue, routing key, ...) onto this consumer.
     *
     * @return void
     */
    public function setConfig(): void
    {
        $params = $this->getConfig(AmqpEnum::DEFAULT_VHOST_CONFIG, AmqpEnum::DEMO_QUEUE_NAME);
        fillProperty($params, $this);
    }

    /**
     * Handle one message.
     *
     * - duplicate delivery: acknowledged and skipped, no failure record is touched
     * - BusinessException: the message is dropped and recorded as failed
     * - any other exception: the message goes through the manual retry mechanism
     *
     * @param mixed $data decoded payload
     * @param AMQPMessage $message raw AMQP message
     * @return Result
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        $json_data = json_encode($data, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
        try {
            $this->logger->info('接收到消息:' . $json_data);

            // A duplicate means this id was already accepted for processing. Routing it
            // through rejectHandle() would insert or overwrite a FAILED record for it,
            // so it is acknowledged directly instead.
            if (!$this->checkMessageRepeat($message)) {
                $this->logger->warning('[测试消费者]消息重复消费,本次跳过:' . $json_data);
                return Result::ACK;
            }

            // todo business logic

            return $this->ackHandle($message);
        } catch (BusinessException $e) {
            $this->logger->error("[测试消费者]发生业务性异常 消息丢弃:" . $json_data . ",错误原因:" . var_export($e->getMessage(), true));
            $error_msg = "[业务异常]错误原因:" . var_export($e->getMessage(), true);
            // Business errors are not retried: drop the message and record it.
            return $this->rejectHandle($message, $error_msg);
        } catch (\Exception $e) {
            $this->logger->error("[测试消费者]发生致命异常 消息重新入队:" . $json_data . ",错误原因:" . var_export($e->getMessage(), true));
            $error_msg = "[致命异常]错误原因:" . var_export($e->getMessage(), true);
            // Everything else goes through the manual retry mechanism.
            return $this->retryHandle($message, $error_msg);
        }
    }
}
其中用到BusinessException异常类 主要是为了区分 消息是丢弃还是重试
<?php
namespace App\Exception;
use Throwable;
/**
 * Exception for business-rule failures. The consumers drop (rather than retry)
 * a message when this type is thrown.
 */
class BusinessException extends \RuntimeException
{
    // Code used when the caller does not supply one.
    protected $code = 400;

    /**
     * @param string $message error message
     * @param int $code error code; 0 falls back to the default of 400
     * @param Throwable|null $previous previous exception in the chain
     */
    public function __construct($message = "", $code = 0, ?Throwable $previous = null)
    {
        // The nullable type is explicit: implicit nullability via a null default
        // is deprecated as of PHP 8.4.
        $this->code = $code ?: $this->code;
        parent::__construct($message, $this->code, $previous);
    }
}
这里还修改了继承类为BasicConsumer
<?php
declare (strict_types=1);
namespace App\Amqp\Consumer;
use App\Common\Trait\AmqpMessageTrait;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Di\Annotation\Inject;
use Psr\Log\LoggerInterface;
/**
 * Base class of the project's consumers: adds the shared AMQP helpers and a logger.
 */
class BasicConsumer extends ConsumerMessage
{
    use AmqpMessageTrait;

    #[Inject]
    protected LoggerInterface $logger;

    /**
     * Whether the consumer is allowed to start automatically.
     *
     * NOTE(review): this always returns true; presumably it then takes precedence
     * over any enable flag kept by the parent class — confirm against ConsumerMessage.
     *
     * @return bool
     */
    public function isEnable(): bool
    {
        return true;
    }
}
如果需要禁止所有消费者启动,修改 BasicConsumer 的 isEnable 方法使其返回 false 即可;如果只想控制单个消费者是否启动,在该消费者类中重写 isEnable 方法即可
12.失败消息重新入队脚本
创建一个命令行
<?php
declare (strict_types=1);
namespace App\Command;
use App\Command\Service\DeadLetterMessageService;
use App\Common\Enum\AmqpEnum;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Di\Annotation\Inject;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
/**
 * Console command that puts failed messages back into their queues.
 *
 * Usage: dlxMessage:requeue <vhost_config> <message_ids> [--debug[=1]]
 *
 * Class DeadLetterMessageCommand
 * @package App\Command
 */
#[Command(
    name: "dlxMessage:requeue",
    arguments: [
        ['vhost_config', InputArgument::REQUIRED, 'vhost配置名称:default|merchant'],
        ['message_ids', InputArgument::REQUIRED, '消息ID,多个用英文逗号隔开'],
    ],
    options: [
        ['debug', null, InputOption::VALUE_OPTIONAL, '是否调试模式', '0'],
    ],
    description: "失败消息重新入队"
)]
class DeadLetterMessageCommand extends HyperfCommand
{
    #[Inject]
    protected LoggerInterface $logger;

    #[Inject]
    protected DeadLetterMessageService $service;

    /**
     * Parse the arguments and delegate to the re-queue service. Any error is
     * logged and reported on the console.
     */
    public function handle()
    {
        try {
            $start = microtime(true);
            $vhost_config = $this->input->getArgument('vhost_config') ?: AmqpEnum::DEFAULT_VHOST_CONFIG;

            // Split the comma-separated list and discard blanks and duplicates,
            // so input such as "1, 2,,2" becomes ['1', '2'].
            $message_ids = array_values(array_unique(array_filter(
                array_map(trim(...), explode(',', (string) $this->input->getArgument('message_ids'))),
                static fn (string $id): bool => $id !== ''
            )));

            // With VALUE_OPTIONAL a bare `--debug` yields null (the default '0' only
            // applies when the option is absent), so null counts as enabled.
            $debug_option = $this->input->getOption('debug');
            $debug = $debug_option === null || $debug_option == 1;

            $this->service->requeueDlxMessages($vhost_config, $message_ids, $debug);

            $duration = round(microtime(true) - $start, 2);
            $this->info("失败消息重新入队操作完成,耗时:{$duration}秒");
        } catch (\Throwable $e) {
            $this->logger->error('失败消息重新入队系统错误', [
                'error' => $e->getMessage(),
                'file' => $e->getFile(),
                'line' => $e->getLine()
            ]);
            // Report failures with the error style rather than as an info line.
            $this->error('失败消息重新入队系统错误!' . var_export($e->getMessage(), true));
        }
    }
}
创建业务类
<?php
namespace App\Command\Service;
use App\Common\Enum\AmqpEnum;
use App\Common\Enum\CacheEnum;
use App\Common\Trait\AmqpMessageTrait;
use App\Exception\BusinessException;
use Hyperf\Amqp\Producer;
use Hyperf\Context\ApplicationContext;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use Psr\Log\LoggerInterface;
use function Hyperf\Config\config;
/**
 * Re-publishes failed messages stored in the failed-message table.
 */
class DeadLetterMessageService
{
    use AmqpMessageTrait;

    #[Inject]
    protected LoggerInterface $logger;

    #[Inject]
    protected Producer $producer;

    #[Inject]
    public Redis $redis;

    /**
     * Put failed messages back into their queues.
     *
     * Each message is handled independently: a failure is logged and the loop
     * continues with the next record.
     *
     * @param string $vhost_config vhost config name
     * @param array $message_ids message ids to re-queue
     * @param bool $debug dry run: validate and load the records, but change nothing
     * @return void
     * @throws BusinessException when the arguments are empty or no record matches
     */
    public function requeueDlxMessages(string $vhost_config, array $message_ids, bool $debug): void
    {
        if (empty($message_ids) || empty($vhost_config)) {
            $this->logger->error('参数非法');
            throw new BusinessException('参数非法');
        }

        $list = $this->getDlxMessageRepository($vhost_config)->getListByMessageIds($message_ids);
        if (empty($list)) {
            $this->logger->error('没有符合条件的死信消息');
            throw new BusinessException('没有符合条件的死信消息');
        }

        // Debug mode never publishes or writes anything.
        if ($debug) {
            return;
        }

        $container = ApplicationContext::getContainer();
        foreach ($list as $item) {
            $item_message_id = strval($item['message_id']);
            $marked = false;
            try {
                $queue_config = config("amqp_queue.{$vhost_config}.queues." . $item['queue_config']);
                if (empty($queue_config['producer'])) {
                    throw new BusinessException("未找到队列配置或生产者:{$item['queue_config']}");
                }

                // Remove the duplicate-consumption marker so the consumer accepts the id again.
                $redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ":{$vhost_config}:" . config('app_env') . ":{$item_message_id}";
                $this->redis->del($redis_key);

                // Mark the record as processing BEFORE publishing; otherwise a fast
                // consumer could store its final status first and have it overwritten.
                $marked = $this->updateDlxStatus($vhost_config, $item, AmqpEnum::STATUS_PADDING, '');

                $message = $container->make($queue_config['producer'], [
                    $item['message_content'] ?: [],
                    $item_message_id
                ]);
                if (!$this->producer->produce($message)) {
                    throw new BusinessException('消息发布失败');
                }
            } catch (\Throwable $e) {
                // Throwable also covers TypeError, e.g. from a broken producer config.
                $this->logger->error("[死信消息重回队列异常]消息ID:{$item['message_id']},队列配置为:{$item['queue_config']}" .
                    "消息内容为:" . json_encode($item['message_content'], JSON_UNESCAPED_UNICODE) .
                    ",错误原因:" . var_export($e->getMessage(), true));

                // Nothing was published, so do not leave the record stuck in "processing".
                if ($marked) {
                    try {
                        $this->updateDlxStatus($vhost_config, $item, AmqpEnum::STATUS_FAILED, $e->getMessage());
                    } catch (\Throwable $restoreError) {
                        $this->logger->error("[死信消息重回队列异常]恢复失败状态出错,消息ID:{$item_message_id},错误原因:" .
                            var_export($restoreError->getMessage(), true));
                    }
                }
            }
        }
    }

    /**
     * Update the status of an existing failed-message record (never inserts).
     *
     * vhost_config is passed so the record is written through the repository of the
     * requested vhost, and message_content is passed so the stored payload is kept.
     *
     * @param string $vhost_config vhost config name
     * @param array $item record as returned by getListByMessageIds()
     * @param int $status one of the AmqpEnum::STATUS_* values
     * @param string $error_msg error text to store
     * @return bool result of saveDlxMessage()
     */
    private function updateDlxStatus(string $vhost_config, array $item, int $status, string $error_msg): bool
    {
        return $this->saveDlxMessage([
            'message_id' => strval($item['message_id']),
            'queue_config' => $item['queue_config'],
            'vhost_config' => $vhost_config,
            'message_content' => $item['message_content'] ?? null,
            'retry_count' => 0,
            'last_retry_time' => date('Y-m-d H:i:s'),
            'status' => $status,
            'error_msg' => ['error_msg' => $error_msg],
        ], false);
    }
}
其中用到了获取信息列表的方法 类似这样
/**
 * Load the failed-message records whose message_id is in the given list.
 *
 * @param array $message_ids
 * @return array matching records as arrays; empty when nothing matches
 */
public function getListByMessageIds(array $message_ids): array
{
    $rows = AmqpDeadLetterMessage::query()->whereIn('message_id', $message_ids)->get();

    return $rows->isEmpty() ? [] : $rows->toArray();
}
运行php bin/hyperf.php dlxMessage:requeue default "1,2,3"就可以将message_id为1、2、3的记录重新入队了。
评论区