实现了消息消费失败入mysql数据库,提供了消息重新入队的command,有手动重试机制、防止消息重复消费逻辑。
暂未实现延时队列
1.进入到项目根目录安装rabbitmq包
composer require php-amqplib/php-amqplib
如需忽略版本安装 --ignore-platform-reqs,例如:
composer require --ignore-platform-reqs php-amqplib/php-amqplib
2.在.env配置文件加入以下配置
# Rabbitmq
RABBITMQ_HOST = 127.0.0.1
RABBITMQ_PORT = 5672
RABBITMQ_LOGIN = admin
RABBITMQ_PASSWORD = admin
RABBITMQ_VHOST = /
3. 在config文件夹下创建rabbitmq.php配置文件
<?php
$env = app('env')->get('app_env') ?? 'dev';
return [
// 默认配置项
'default' => [
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'login' => env('RABBITMQ_LOGIN', 'admin'),
'password' => env('RABBITMQ_PASSWORD', 'admin'),
'vhost' => env('RABBITMQ_VHOST', '/'),
],
'queues' => [
// 测试直连队列
'hello' => [
'exchange' => "test.hello.{$env}", // 交换机名称,不填表示使用默认的
'exchange_type' => 'direct', // 交换机类型 direct fanout topic headers
'queue' => "test.hello.queue.{$env}", //队列名
'routing_key' => "test.hello.key.{$env}", // 路由键
],
// 测试广播队列
'promoter_bind_result' => [
'exchange' => "promoter.bind.result.change.{$env}",
'exchange_type' => 'fanout',
'queue' => "promoter.bind.result.change.queue.{$env}",
'routing_key' => '',
]
],
];
4.创建RabbitMQ帮助类
<?php
declare (strict_types=1);
namespace core\Library;
use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Config;
use think\facade\Log;
/**
* Rabbitmq消息队列基类
* Class Rabbitmq
* @package core\Library
*/
class Rabbitmq
{
/**
* 连接
* @var AMQPStreamConnection
*/
private AMQPStreamConnection $connection;
/**
* 通道
* @var AbstractChannel|AMQPChannel
*/
private AbstractChannel|AMQPChannel $channel;
public function __construct()
{
$this->open();
}
public function __destruct()
{
$this->close();
}
/**
* 打开连接
* @param string $config_name
* @return void
* @throws Exception
*/
protected function open(string $config_name = 'default'): void
{
if (!empty($this->channel) && $this->channel->is_open()) {
return;
}
$config = Config::get("rabbitmq.{$config_name}");
$this->connection = new AMQPStreamConnection(
$config['host'],
$config['port'],
$config['login'],
$config['password'],
$config['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
60.0,
60.0,
null,
false,
30
);
$this->channel = $this->connection->channel();
}
/**
* 生产者发布消息
* @param $message
* @param string $queue 队列配置名称
* @param string $message_id 指定消息ID
* @return void
* @throws Exception
*/
public function sendMessage($message, string $queue = 'hello', string $message_id = ''): void
{
$this->open();
$config = Config::get("rabbitmq.queues.{$queue}");
/**
* 创建队列(Queue)
* queue: hello // 队列名称
* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
* durable: true // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
* 设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
* exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
*/
$this->channel->queue_declare(
$config['queue'],
false,
true,
false,
false
);
/**
* 创建交换机(Exchange)
* exchange: exchange// 交换机名称
* type: direct // 交换机类型,分别为direct/fanout/topic
* passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
* durable: false // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
*/
$this->channel->exchange_declare(
$config['exchange'],
$config['exchange_type'],
false,
true,
false
);
/**
* 绑定队列和交换机
* $queue 队列名称
* $exchange 交换器名称
* $routing_key 路由key
*/
$this->channel->queue_bind($config['queue'], $config['exchange'], $config['routing_key']);
$messageBody = is_array($message) ? json_encode($message, JSON_UNESCAPED_UNICODE) : $message;
/**
* 发送消息
* messageBody:消息体
* content_type:消息的类型 可以不指定
* application_headers:公共头
* delivery_mode:消息持久化最关键的参数
* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1; 不持久化
* AMQPMessage::DELIVERY_MODE_PERSISTENT = 2; 持久化
*/
$msg = new AMQPMessage($messageBody, [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'timestamp' => time(),
'application_headers'=> new AMQPTable([
'retry' => 0, // 重试次数
'message_id' => $message_id ?: Snowflake::uuid(), // 消息唯一ID
'queue_config' => $queue,
'exchange_type' => $config['exchange_type'],
'queue_name' => $config['queue'],
]),
]);
/**
* 发送消息
* msg // AMQP消息内容
* exchange // 交换机名称
* routing key // 路由键名称
*/
$this->channel->basic_publish($msg, $config['exchange'], $config['routing_key']);
$this->close();
}
/**
* 消费者接收消息
* @param $callback
* @param string $queue
* @param bool $no_ack
* @return void
* @throws Exception
*/
public function receiveMessage($callback, string $queue = 'hello', bool $no_ack = false): void
{
$this->open();
$config = Config::get("rabbitmq.queues.{$queue}");
/**
* 设置消费者(Consumer)客户端同时只处理一条队列
* 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),
* 直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。
*/
$this->channel->basic_qos(0, 1, false);
/**
* 同样是创建路由和队列,以及绑定路由队列,注意要跟publisher的一致
* 这里其实可以不用,但是为了防止队列没有被创建所以做的容错处理
*/
$this->channel->queue_declare(
$config['queue'],
false,
true,
false,
false
);
$this->channel->exchange_declare(
$config['exchange'],
$config['exchange_type'],
false,
true,
false
);
$this->channel->queue_bind($config['queue'], $config['exchange'], $config['routing_key']);
/**
queue: 从哪里获取消息的队列
consumer_tag: 消费者标识符,用于区分多个客户端
no_local: 不接收此使用者发布的消息
no_ack: 设置为true,则使用者将使用自动确认模式。详情请参见.
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
exclusive:是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
nowait: 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
callback: :回调逻辑处理函数,PHP回调 array($this, 'process_message') 调用本对象的process_message方法
*/
$this->channel->basic_consume($config['queue'], '', false, $no_ack, false, false, $callback);
register_shutdown_function(function () {
$this->close();
});
// 阻塞队列监听事件
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
$this->close();
}
/**
* 关闭连接
* @return void
*/
protected function close(): void
{
if (empty($this->channel)) {
return;
}
$this->channel->close();
try {
$this->connection->close();
} catch (\Throwable $e) {
throw new AMQPConnectionClosedException("关闭连接失败!");
}
}
}
其中生成消息唯一ID有用到一个生成uuid的函数,也可以自己生成唯一数,主要是为了防止消息重复消费
<?php
declare(strict_types=1);
namespace core\Library;
class Snowflake
{
// 开始时间戳(2023-01-01)
const EPOCH = 1672531200000;
// 机器ID所占位数
const WORKER_ID_BITS = 5;
// 数据中心ID所占位数
const DATACENTER_ID_BITS = 5;
// 序列号所占位数
const SEQUENCE_BITS = 12;
// 静态配置属性
private static $isConfigured = false;
private static $workerId = 1;
private static $datacenterId = 1;
private static $maxWorkerId;
private static $maxDatacenterId;
private static $workerIdShift;
private static $datacenterIdShift;
private static $timestampLeftShift;
private static $sequenceMask;
// 实例属性
private $sequence = 0;
private $lastTimestamp = -1;
/**
* 配置Snowflake参数
* @param int $workerId 工作机器ID (0-31)
* @param int $datacenterId 数据中心ID (0-31)
* @throws \InvalidArgumentException
*/
public static function config(int $workerId = 1, int $datacenterId = 1): void
{
self::$maxWorkerId = -1 ^ (-1 << self::WORKER_ID_BITS);
self::$maxDatacenterId = -1 ^ (-1 << self::DATACENTER_ID_BITS);
if ($workerId > self::$maxWorkerId || $workerId < 0) {
throw new \InvalidArgumentException("Worker ID must be between 0 and " . self::$maxWorkerId);
}
if ($datacenterId > self::$maxDatacenterId || $datacenterId < 0) {
throw new \InvalidArgumentException("Datacenter ID must be between 0 and " . self::$maxDatacenterId);
}
self::$workerId = $workerId;
self::$datacenterId = $datacenterId;
self::$workerIdShift = self::SEQUENCE_BITS;
self::$datacenterIdShift = self::SEQUENCE_BITS + self::WORKER_ID_BITS;
self::$timestampLeftShift = self::SEQUENCE_BITS + self::WORKER_ID_BITS + self::DATACENTER_ID_BITS;
self::$sequenceMask = -1 ^ (-1 << self::SEQUENCE_BITS);
self::$isConfigured = true;
}
/**
* 生成雪花算法ID
* @return string 64位整数字符串
* @throws \RuntimeException
*/
public function generate(): string
{
// 延迟初始化配置
if (!self::$isConfigured) {
self::config();
}
$timestamp = $this->milliseconds();
if ($timestamp < $this->lastTimestamp) {
$offset = $this->lastTimestamp - $timestamp;
if ($offset <= 5) { // 允许5ms的时钟回拨
usleep($offset * 1000);
$timestamp = $this->milliseconds();
} else {
throw new \RuntimeException("Clock moved backwards. Refusing to generate id for {$offset} milliseconds");
}
}
if ($this->lastTimestamp == $timestamp) {
$this->sequence = ($this->sequence + 1) & self::$sequenceMask;
if ($this->sequence == 0) {
$timestamp = $this->tilNextMillis($this->lastTimestamp);
}
} else {
$this->sequence = random_int(0, 9); // 引入随机数降低冲突概率
}
$this->lastTimestamp = $timestamp;
return (string)(
(($timestamp - self::EPOCH) << self::$timestampLeftShift) |
(self::$datacenterId << self::$datacenterIdShift) |
(self::$workerId << self::$workerIdShift) |
$this->sequence
);
}
/**
* 静态方式生成ID
* @return string
* @throws \RuntimeException
*/
public static function uuid(): string
{
return static::getInstance()->generate(); // 使用 static 代替 self
}
/**
* 获取当前毫秒时间戳
* @return int
*/
private function milliseconds(): int
{
return (int)(microtime(true) * 1000);
}
/**
* 等待下一毫秒
* @param int $lastTimestamp
* @return int
*/
private function tilNextMillis(int $lastTimestamp): int
{
$timestamp = $this->milliseconds();
while ($timestamp <= $lastTimestamp) {
$timestamp = $this->milliseconds();
}
return $timestamp;
}
}
5.队列常量定义
该常量为config/rabbitmq.php下queues配置的键名
<?php
declare (strict_types=1);
namespace app\common\enum;
/**
* 队列配置的键名
* Class AmqpQueueEnum
* @package app\common\enum
*/
class AmqpQueueEnum
{
const STATUS_INIT = 0;
const STATUS_PADDING = 1;
const STATUS_SUCCESS = 2;
const STATUS_FAILED = 3;
const STATUS_DESC = [
self::STATUS_INIT => '待处理',
self::STATUS_PADDING => '处理中',
self::STATUS_SUCCESS => '处理成功',
self::STATUS_FAILED => '处理失败'
];
// 测试队列
const TEST_QUEUE_NAME = 'hello';
// 测试广播队列
const PROMOTER_BIND_RESULT_QUEUE_NAME = 'promoter_bind_result';
}
6.生产者测试
command代码
<?php
declare (strict_types=1);
namespace app\command\definition\producer;
use app\common\enum\AmqpQueueEnum;
use core\Library\Rabbitmq;
use think\annotation\Inject;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;
class TestProducerCommand extends Command
{
protected function configure()
{
$this->setName('producer:test_producer')
->setDescription('测试生产者')
->addArgument('task', null, '业务类型');
}
protected function execute(Input $input, Output $output)
{
$exitCode = 0;
try {
// 在执行前记录日志,第二个参数设置是否捕获输出
$this->beforeExecute($input, false); // 设置为 false 则不捕获输出
// 业务逻辑
$output->writeln("command is start...");
$task = $input->getArgument('task');
if (method_exists($this, $task)) {
$this->$task($output);
} else {
$output->writeln("Task '$task' not found.");
}
$output->writeln('successfully!');
} catch (\Throwable $e) {
dump($e);
$exitCode = $e->getCode() ?: 1;
$this->handleException($e);
if (Db::connect()->getPdo()->inTransaction()) {
Db::rollback();
}
} finally {
$errorMessage = $this->lastError ? $this->lastError['message'] : '';
if ($errorMessage) {
$output->writeln($errorMessage);
}
$this->afterExecute($output, $exitCode, $errorMessage);
}
return $exitCode;
}
/**
* @command php think producer:test_producer publishMessage
* @taskname 生产者测试
*/
protected function publishMessage(): void
{
try {
$rabbitmq = new Rabbitmq();
$rabbitmq->sendMessage(['a' => 1, 'b' => 2], AmqpQueueEnum::TEST_QUEUE_NAME);
} catch (\Exception $e) {
echo '发生错误:' . var_export($e->getMessage(), true) . PHP_EOL;
}
}
}
配置好console文件(第12点),运行命令
php think producer:test_producer publishMessage后可以在rabbitmq面板上看到有test.hello.queue.dev队列生成了
7. 消费者测试
以下代码仅测试消费者是否正常 不包含重试、失败消息入库逻辑
<?php
declare (strict_types=1);
namespace app\command\definition\consumer;
use app\common\enum\AmqpQueueEnum;
use core\Library\Rabbitmq;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;
use think\facade\Log;
class TestConsumerCommand extends Command
{
protected function configure()
{
$this->setName('consumer:test_consumer')
->setDescription('测试消费者')
->addArgument('task', null, '业务类型');
}
protected function execute(Input $input, Output $output)
{
$exitCode = 0;
try {
// 在执行前记录日志,第二个参数设置是否捕获输出
$this->beforeExecute($input, false); // 设置为 false 则不捕获输出
// 业务逻辑
$output->writeln("command is start...");
$task = $input->getArgument('task');
if (method_exists($this, $task)) {
$this->$task($output);
} else {
$output->writeln("Task '$task' not found.");
}
$output->writeln('successfully!');
} catch (\Throwable $e) {
dump($e);
$exitCode = $e->getCode() ?: 1;
$this->handleException($e);
if (Db::connect()->getPdo()->inTransaction()) {
Db::rollback();
}
} finally {
$errorMessage = $this->lastError ? $this->lastError['message'] : '';
if ($errorMessage) {
$output->writeln($errorMessage);
}
$this->afterExecute($output, $exitCode, $errorMessage);
}
return $exitCode;
}
/**
* @command php think consumer:test_consumer consumeMessage
* @taskname 消费者测试
*/
protected function consumeMessage(): void
{
$callback = function ($msg) {
$this->callbackFunction($msg);
};
// 常驻内存 消费消息
while (true) {
try {
$rabbitmq = new Rabbitmq();
// 不可使用[$this, 'callbackFunction']方式,该方式会报错 只能使用闭包函数
$rabbitmq->receiveMessage($callback, AmqpQueueEnum::TEST_QUEUE_NAME);
} catch (\Exception $e) {
sleep(5);
Log::error("外部发生异常:" . var_export($e->getMessage(), true));
}
}
}
/**
* 处理消息
* @param AMQPMessage $msg
* @return void
*/
protected function callbackFunction(AMQPMessage $msg): void
{
try {
$body = json_decode($msg->getBody(), true);
Log::error("接收到消息:" . $msg->getBody());
$msg->ack();
} catch (AMQPIOException|AMQPConnectionClosedException|AMQPConnectionBlockedException|AMQPChannelClosedException|\Exception $e) {
Log::error("发生异常:" . var_export($e->getMessage(), true));
$msg->reject(false);
}
}
}
8. 创建失败消息记录表
CREATE TABLE `tk_amqp_dead_letter_message` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`message_id` bigint unsigned NOT NULL DEFAULT '0' COMMENT '消息ID',
`exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '原始交换机',
`routing_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '原始路由键',
`queue_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '队列名称',
`queue_config` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '队列配置名(区分业务)',
`exchange_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '交换机类型:direct fanout topic headers',
`message_content` json NOT NULL COMMENT '消息内容',
`retry_count` int unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
`last_retry_time` datetime DEFAULT NULL COMMENT '最后重试时间',
`status` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '状态(0-待处理,1-处理中,2-处理成功,3-处理失败)',
`error_msg` json DEFAULT NULL COMMENT '错误信息',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`delete_time` datetime DEFAULT NULL COMMENT '删除时间',
PRIMARY KEY (`id`),
KEY `idxes_queue_config_status` (`queue_config`,`status`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=147826 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Amqp失败队列消息表';
9.失败消息常量定义
<?php
declare (strict_types=1);
namespace app\common\enum;
class CacheEnum
{
// 防止队列重复消费键
const CHECK_QUEUE_REPEAT_CONSUMING_KEY = 'check_queue_repeat_consuming';
}
10. 生产可用消费者
command代码
<?php
declare (strict_types=1);
namespace app\command\definition\consumer;
use app\command\logic\ChangeNotifyLogic;
use app\common\enum\AmqpQueueEnum;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;
class ChangeNotifyCommand extends Command
{
protected function configure()
{
$this->setName('consumer:change_notify')
->setDescription('消费者')
->addArgument('task', null, '业务类型');
}
protected function execute(Input $input, Output $output)
{
$exitCode = 0;
try {
// 在执行前记录日志,第二个参数设置是否捕获输出
$this->beforeExecute($input, false); // 设置为 false 则不捕获输出
// 业务逻辑
$output->writeln("command is start...");
$task = $input->getArgument('task');
if (method_exists($this, $task)) {
$this->$task($output);
} else {
$output->writeln("Task '$task' not found.");
}
$output->writeln('successfully!');
} catch (\Throwable $e) {
dump($e);
$exitCode = $e->getCode() ?: 1;
$this->handleException($e);
if (Db::connect()->getPdo()->inTransaction()) {
Db::rollback();
}
} finally {
$errorMessage = $this->lastError ? $this->lastError['message'] : '';
if ($errorMessage) {
$output->writeln($errorMessage);
}
$this->afterExecute($output, $exitCode, $errorMessage);
}
return $exitCode;
}
/**
* @command php think consumer:change_notify consumePromoterBindResult
* @taskname 消费者
*/
protected function consumePromoterBindResult(): void
{
// $msg = new AMQPMessage(json_encode(['MsgType' => 'event', 'Event' => 'promoter_bind_result', 'bind_status' => 1, 'sharer_appid' => '', 'sharer_openid' => '', 'CreateTime' => 1750876217]));
// ChangeNotifyLogic::getInstance()->promoterBindResultConsume($msg);
$option = ['promoterBindResultConsume', AmqpQueueEnum::PROMOTER_BIND_RESULT_QUEUE_NAME];
ChangeNotifyLogic::getInstance()->changeConsume(...$option);
}
}
业务层代码
<?php
namespace app\command\logic;
use app\business\ProductDetailBusiness;
use app\common\enum\AmqpQueueEnum;
use app\common\enum\ChangeNotifyCallbackEnum;
use app\common\enum\SphProductEnum;
use app\repository\SphProductDetailRepository;
use app\repository\SphProductRepository;
use app\repository\SphUserSharerRepository;
use app\service\TkBoostService;
use core\Exception\App\BusinessException;
use core\Library\Rabbitmq;
use core\Trait\AmqpDlxTrait;
use core\Trait\SingletonTrait;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Config;
use think\facade\Db;
use think\facade\Log;
class ChangeNotifyLogic
{
use SingletonTrait;
use AmqpDlxTrait;
/**
* 开始监听队列消息
* @param \Closure $callback 回调函数
* @param string $queue_name 配置键名
* @return void
*/
protected function startListen(\Closure $callback, string $queue_name): void
{
// 常驻内存 消费消息
while (true) {
try {
$rabbitmq = new Rabbitmq();
// 不可使用[$this, 'callbackFunction']方式,该方式会报错 只能使用闭包函数
$rabbitmq->receiveMessage($callback, $queue_name);
} catch (\Exception $e) {
sleep(5);
Log::error("[监听消息{$queue_name}]外部发生异常:" . var_export($e->getMessage(), true));
}
}
}
/**
* 变更通知事件队列消费
* @param string $callback_function_name 回调函数名称
* @param string $queue_name 队列配置名
* @return void
*/
public function changeConsume(string $callback_function_name, string $queue_name): void
{
$callback = function ($msg) use ($callback_function_name) {
$this->$callback_function_name($msg);
};
$this->startListen($callback, $queue_name);
}
/**
* 消费者
* @param AMQPMessage $msg
* @return void
*/
protected function promoterBindResultConsume(AMQPMessage $msg): void
{
try {
Log::info("接收到消息:" . $msg->getBody());
$body = json_decode($msg->getBody(), true);
if (empty($body)) {
throw new BusinessException("消息不合法");
}
// 检查消息是否重复消费
if (!$this->checkMessageRepeat($msg)) {
throw new BusinessException("[推客绑定机构的回调消费者]消息重复消费,本次跳过");
}
// todo 业务处理 ba la ba la
// 确认应答
$this->ackHandle($msg);
// 主动断开连接
Db::close();
// 适当延迟,避免接口限流
usleep(200000); // 200ms
} catch (BusinessException|AMQPIOException|AMQPConnectionClosedException|AMQPConnectionBlockedException|AMQPChannelClosedException|\Exception $e) {
if ($e instanceof BusinessException) {
Log::error("发生业务性异常 消息丢弃:" . $msg->getBody() . ",错误原因:" . var_export($e->getMessage(), true));
$error_msg = "[业务异常]错误原因:" . var_export($e->getMessage(), true);
// 丢弃消息
$this->rejectHandle($msg, $error_msg);
} else {
Log::error("发生致命异常 消息重新入队:" . $msg->getBody() . ",错误原因:" . var_export($e->getMessage(), true));
$error_msg = "[致命异常]错误原因:" . var_export($e->getMessage(), true);
// 手动重试机制
$this->retryHandle($msg, $error_msg);
}
// 主动断开连接
Db::close();
// 适当延迟,避免接口限流
usleep(200000); // 200ms
}
}
}
其中用到单例模式的特征
<?php
declare (strict_types = 1);
namespace core\Trait;
trait SingletonTrait
{
private static $instance = null; // 使用 nullable 类型并初始化为 null
private function __construct() {}
private function __clone(): void {}
public function __wakeup(): void {
throw new \RuntimeException("Cannot unserialize singleton");
}
public static function getInstance(): static
{
if (self::$instance === null) {
self::$instance = new static(); // 使用 static 代替 self 以支持继承
}
return self::$instance;
}
}
失败消息处理特征
<?php
declare (strict_types=1);
namespace core\Trait;
use app\common\enum\AmqpQueueEnum;
use app\common\enum\CacheEnum;
use app\repository\AmqpDeadLetterMessageRepository;
use core\Library\CacheHelper;
use core\Library\Snowflake;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Cache;
trait AmqpDlxTrait
{
/**
* 手动重试
* @param AMQPMessage $msg
* @param string $error_msg 错误信息
* @param int $retry_num 重试次数
* @return bool
*/
public function retryHandle(AMQPMessage $msg, string $error_msg, int $retry_num = 3): bool
{
$msgData = $this->messageHandle($msg, $error_msg);
$exchange = $msgData['exchange'];
$routing_key = $msgData['routing_key'];
$channel = $msg->getChannel();
$body = $msgData['message_content'];
$headersArray = $msgData['headersArray'];
if ($headersArray['retry'] < $retry_num) {
$headersArray['retry']++;
// 重新入队列
$channel->basic_publish(
new AMQPMessage($body, [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'timestamp' => time(),
'application_headers'=> new AMQPTable($headersArray)
]),
$exchange,
$routing_key
);
// 因为重新入队了 所以这条消息直接确认
$msg->ack();
$msgData['status'] = AmqpQueueEnum::STATUS_PADDING;
} else {
// 丢弃
$msg->reject(false);
$msgData['status'] = AmqpQueueEnum::STATUS_FAILED;
}
// 记录失败日志
return $this->saveDlxMessage($msgData);
}
/**
* 丢弃消息
* @param AMQPMessage $msg
* @param string $error_msg 错误信息
* @return bool
*/
public function rejectHandle(AMQPMessage $msg, string $error_msg): bool
{
$msgData = $this->messageHandle($msg, $error_msg);
$msgData['status'] = AmqpQueueEnum::STATUS_FAILED;
// 丢弃消息
$msg->reject(false);
// 记录失败日志
return $this->saveDlxMessage($msgData);
}
/**
* 确认应答
* @param AMQPMessage $msg
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function ackHandle(AMQPMessage $msg): bool
{
$msgData = $this->messageHandle($msg, '');
$msgData['status'] = AmqpQueueEnum::STATUS_SUCCESS;
// 确认消息
$msg->ack();
// 更新失败日志
return $this->saveDlxMessage($msgData, false);
}
/**
* 获取队列消息详细信息
* @param AMQPMessage $msg
* @param string $error_msg
* @param bool $needGenerateId 不存在message_id时是否生成
* @return array
*/
public function messageHandle(AMQPMessage $msg, string $error_msg, bool $needGenerateId = true): array
{
$exchange = $msg->getExchange();
$routing_key = $msg->getRoutingKey();
$message_content = $msg->getBody();
//headersObject 是一个AMQPTable对象
$headersObject = $msg->get_properties()['application_headers'];
//调用getNativeData()得到一个数组
$headersArray = $headersObject->getNativeData();
if ($needGenerateId && !isset($headersArray['message_id'])) {
$headersArray['retry'] = 0;
$headersArray['message_id'] = Snowflake::uuid();
}
$message_id = $headersArray['message_id'];
$queue_name = $headersArray['queue_name'] ?? '';
$queue_config = $headersArray['queue_config'] ?? '';
$exchange_type = $headersArray['exchange_type'] ?? '';
$retry_count = ($headersArray['retry'] ?? 0);
$last_retry_time = date('Y-m-d H:i:s');
$error_msg = compact('error_msg');
return compact('exchange', 'routing_key', 'message_id', 'queue_name', 'error_msg',
'queue_config', 'exchange_type', 'retry_count', 'headersArray', 'message_content', 'last_retry_time');
}
/**
* 检查消息是否重复消费
* @return bool
*/
public function checkMessageRepeat(AMQPMessage $msg): bool
{
$msgData = $this->messageHandle($msg, '', false);
$message_id = intval($msgData['message_id']);
// 重试的话不检查重复消费
if (empty($message_id) || ($msgData['retry_count'] ?? 0) > 0) {
return true;
}
$redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ':' . app('env')->get('app_env') . ":{$message_id}";
$redis = Cache::store('redis')->handler();
// 一天内不能重复消费
return $redis->setnx($redis_key, $message_id) && $redis->expire($redis_key, 24 * 3600);
}
/**
* 保存失败队列消息
* @param array $saveData
* @param bool $needInsert 记录不存在是否新增
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function saveDlxMessage(array $saveData, bool $needInsert = true): bool
{
if (empty($saveData) || empty($saveData['message_id'])) {
return false;
}
$saveWhere = [
'message_id' => $saveData['message_id'],
'queue_config' => $saveData['queue_config'],
];
$saveData['update_time'] = date('Y-m-d H:i:s');
// 这里可以修改成自己的插入或更新tk_amqp_dead_letter_message表的方法
return AmqpDeadLetterMessageRepository::getInstance()->insertOrUpdate($saveWhere, $saveData, $needInsert);
}
}
插入或更新方法类似下面这样
public function insertOrUpdate(array $where, array $data, bool $needInsert = true): bool
{
$this->model = $this->model->newInstance();
// 如果传入了 $where 条件,尝试查找记录
if (!empty($where)) {
$record = $this->model->where($where)->find();
// 记录存在,执行更新
return $record && $record->save($data);
}
// 记录不存在,执行新增
if ($needInsert) {
return $this->model->save($data);
}
return true;
}
11.失败消息重新入队命令
command代码
<?php
declare (strict_types=1);
namespace app\command\definition\producer;
use app\command\logic\DeadLetterMessageLogic;
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Db;
class DeadLetterMessageCommand extends Command
{
// 消息唯一ID列表
protected string $message_ids = '';
// 是否调试模式 0否 1是
protected string $debug = '0';
protected function configure()
{
$this->setName('producer:dead_letter_message')
->setDescription('失败消息生产者')
->addOption('debug', null, Option::VALUE_OPTIONAL, '是否调试:1是 0否')
->addOption('message_ids', null, Option::VALUE_OPTIONAL, '消息唯一ID列表')
->addArgument('task', null, '业务类型');
}
protected function execute(Input $input, Output $output)
{
$exitCode = 0;
try {
// 在执行前记录日志,第二个参数设置是否捕获输出
$this->beforeExecute($input, false); // 设置为 false 则不捕获输出
// 业务逻辑
$output->writeln("command is start...");
$task = $input->getArgument('task');
$this->debug = $input->getOption('debug') ?? '0';
$this->message_ids = $input->getOption('message_ids') ?? '';
if (method_exists($this, $task)) {
$this->$task($output);
} else {
$output->writeln("Task '$task' not found.");
}
$output->writeln('successfully!');
} catch (\Throwable $e) {
dump($e);
$exitCode = $e->getCode() ?: 1;
$this->handleException($e);
if (Db::connect()->getPdo()->inTransaction()) {
Db::rollback();
}
} finally {
$errorMessage = $this->lastError ? $this->lastError['message'] : '';
if ($errorMessage) {
$output->writeln($errorMessage);
}
$this->afterExecute($output, $exitCode, $errorMessage);
}
return $exitCode;
}
/**
* @command php think producer:dead_letter_message dlxMessageRequeue --message_ids="1,2"
* @taskname 失败消息重新入队生产者
*/
protected function dlxMessageRequeue(): void
{
$service = DeadLetterMessageLogic::getInstance();
// 处理参数
$service->message_ids = !empty($this->message_ids) ? explode(',', $this->message_ids) : [];
$service->debug = $this->debug == 1;
$service->requeueDlxMessages();
}
}
业务层代码
<?php
declare (strict_types=1);
namespace app\command\logic;
use app\common\enum\AmqpQueueEnum;
use app\common\enum\CacheEnum;
use app\repository\AmqpDeadLetterMessageRepository;
use core\Library\CacheHelper;
use core\Library\Rabbitmq;
use core\Trait\AmqpDlxTrait;
use core\Trait\SingletonTrait;
use think\facade\Cache;
use think\facade\Log;
class DeadLetterMessageLogic
{
use SingletonTrait;
use AmqpDlxTrait;
// 唯一消息ID列表
public array $message_ids = [];
// 是否调试模式
public bool $debug = false;
/**
* 重新入队死信消息
* @return void
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function requeueDlxMessages(): void
{
try {
if (empty($this->message_ids)) {
$this->debugLog('参数为空', 'error');
return;
}
$repository = AmqpDeadLetterMessageRepository::getInstance();
$list_collect = $repository->getModel()
->field(['id', 'message_id', 'exchange', 'routing_key', 'queue_name', 'queue_config', 'exchange_type', 'message_content'])
->whereIn('message_id', $this->message_ids)
->select();
if ($list_collect->isEmpty()) {
$this->debugLog('没有符合条件的死信消息', 'error');
return;
}
$list = $list_collect->toArray();
$this->debugLog('开始处理死信消息:' . count($list));
$redis = Cache::store('redis')->handler();
foreach ($list as $item) {
try {
$this->debugLog("[死信消息重回队列]正在处理消息ID:{$item['message_id']},队列配置为:{$item['queue_config']}" .
"消息内容为:" . json_encode($item['message_content'], JSON_UNESCAPED_UNICODE));
$item_message_id = strval($item['message_id']);
if (!$this->debug) {
// 删除防止重复消费键
$redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ':' . app('env')->get('app_env') . ":{$item_message_id}";
$redis->del($redis_key);
// 重新入队列
$rabbitmq = new Rabbitmq();
$rabbitmq->sendMessage($item['message_content'], $item['queue_config'], $item_message_id);
}
// 更新死信消息为处理中 因为重新入队了
$saveData = [
'message_id' => $item_message_id,
'queue_config' => $item['queue_config'],
'retry_count' => 0,
'last_retry_time' => date('Y-m-d H:i:s'),
'status' => AmqpQueueEnum::STATUS_PADDING,
'error_msg' => ['error_msg' => '']
];
!$this->debug && $this->saveDlxMessage($saveData, false);
} catch (\Exception $e) {
$this->debugLog("[死信消息重回队列异常]消息ID:{$item['message_id']},队列配置为:{$item['queue_config']}" .
"消息内容为:" . json_encode($item['message_content'], JSON_UNESCAPED_UNICODE) .
",错误原因:" . var_export($e->getMessage(), true), 'error');
}
}
} catch (\Exception $e) {
$this->debugLog("[死信消息重回队列致命错误]" . var_export($e->getMessage(), true), 'error');
}
}
/**
* 输出信息 并记录日志
* @param string $msg
* @return void
*/
protected function debugLog(string $msg, $func = 'info'): void
{
if (!$this->debug && $func != 'error') {
return;
}
echo $msg . PHP_EOL;
Log::$func($msg);
}
}
如果有某个失败消息 希望重新入队消费 则执行php think producer:dead_letter_message dlxMessageRequeue --message_ids="1,2"即可重新入队。
12. 配置命令行
配置好config/console.php即可运行以上命令
<?php
// +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
// 指令定义
'commands' => [
//消费者
'consumer:test_consumer' => \app\command\definition\consumer\TestConsumerCommand::class,
'consumer:change_notify' => \app\command\definition\consumer\ChangeNotifyCommand::class,
// 生产者
'producer:test_producer' => \app\command\definition\producer\TestProducerCommand::class,
'producer:dead_letter_message' => \app\command\definition\producer\DeadLetterMessageCommand::class,
],
];
评论区