目 录CONTENT

文章目录

🍶ThinkPHP8中使用RabbitMQ

柯基
2025-11-10 / 0 评论 / 0 点赞 / 14 阅读 / 6,607 字

实现了消息消费失败入mysql数据库,提供了消息重新入队的command,有手动重试机制、防止消息重复消费逻辑。

暂未实现延时队列

1.进入到项目根目录安装rabbitmq包

composer require php-amqplib/php-amqplib

如需忽略版本安装 --ignore-platform-reqs,例如:composer require --ignore-platform-reqs php-amqplib/php-amqplib

2.在.env配置文件加入以下配置

# Rabbitmq
RABBITMQ_HOST = 127.0.0.1
RABBITMQ_PORT = 5672
RABBITMQ_LOGIN = admin
RABBITMQ_PASSWORD = admin
RABBITMQ_VHOST = /

3. 在config文件夹下创建rabbitmq.php配置文件

<?php
$env = app('env')->get('app_env') ?? 'dev';
return [
    // 默认配置项
    'default' => [
        'host' => env('RABBITMQ_HOST', '127.0.0.1'),
        'port' => env('RABBITMQ_PORT', 5672),
        'login' => env('RABBITMQ_LOGIN', 'admin'),
        'password' => env('RABBITMQ_PASSWORD', 'admin'),
        'vhost' => env('RABBITMQ_VHOST', '/'),
    ],
    'queues' => [
        // 测试直连队列
        'hello' => [
            'exchange' => "test.hello.{$env}", // 交换机名称,不填表示使用默认的
            'exchange_type' => 'direct', // 交换机类型 direct fanout topic headers
            'queue' => "test.hello.queue.{$env}", //队列名
            'routing_key' => "test.hello.key.{$env}", // 路由键
        ],
        // 测试广播队列
        'promoter_bind_result' => [
            'exchange' => "promoter.bind.result.change.{$env}",
            'exchange_type' => 'fanout',
            'queue' => "promoter.bind.result.change.queue.{$env}",
            'routing_key' => '',
        ]
    ],
];

4.创建RabbitMQ帮助类

<?php
declare (strict_types=1);

namespace core\Library;

use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Config;
use think\facade\Log;

/**
 * Rabbitmq消息队列基类
 * Class Rabbitmq
 * @package core\Library
 */
class Rabbitmq
{
    /**
     * 连接
     * @var AMQPStreamConnection
     */
    private AMQPStreamConnection $connection;

    /**
     * 通道
     * @var AbstractChannel|AMQPChannel
     */
    private AbstractChannel|AMQPChannel $channel;

    public function __construct()
    {
        $this->open();
    }

    public function __destruct()
    {
        $this->close();
    }

    /**
     * 打开连接
     * @param string $config_name
     * @return void
     * @throws Exception
     */
    protected function open(string $config_name = 'default'): void
    {
        if (!empty($this->channel) && $this->channel->is_open()) {
            return;
        }
        $config = Config::get("rabbitmq.{$config_name}");

        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['login'],
            $config['password'],
            $config['vhost'],
            false,
            'AMQPLAIN',
            null,
            'en_US',
            60.0,
            60.0,
            null,
            false,
            30
        );

        $this->channel = $this->connection->channel();

    }

    /**
     * 生产者发布消息
     * @param $message
     * @param string $queue 队列配置名称
     * @param string $message_id 指定消息ID
     * @return void
     * @throws Exception
     */
    public function sendMessage($message, string $queue = 'hello', string $message_id = ''): void
    {
        $this->open();

        $config = Config::get("rabbitmq.queues.{$queue}");

        /**
         * 创建队列(Queue)
         * queue: hello         // 队列名称
         * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
         * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
         *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
         * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
         * auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
         */
        $this->channel->queue_declare(
            $config['queue'],
            false,
            true,
            false,
            false
        );

        /**
         * 创建交换机(Exchange)
         * exchange: exchange// 交换机名称
         * type: direct        // 交换机类型,分别为direct/fanout/topic
         * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
         * durable: false      // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失
         * auto_delete: false  // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
         */
        $this->channel->exchange_declare(
            $config['exchange'],
            $config['exchange_type'],
            false,
            true,
            false
        );

        /**
         * 绑定队列和交换机
         *  $queue 队列名称
         *  $exchange  交换器名称
         *  $routing_key   路由key
         */
        $this->channel->queue_bind($config['queue'], $config['exchange'], $config['routing_key']);

        $messageBody = is_array($message) ? json_encode($message, JSON_UNESCAPED_UNICODE) : $message;
        /**
         * 发送消息
         * messageBody:消息体
         * content_type:消息的类型 可以不指定
         * application_headers:公共头
         * delivery_mode:消息持久化最关键的参数
         * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1; 不持久化
         * AMQPMessage::DELIVERY_MODE_PERSISTENT = 2; 持久化
         */
        $msg = new AMQPMessage($messageBody, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'timestamp' => time(),
            'application_headers'=> new AMQPTable([
                'retry' => 0, // 重试次数
                'message_id' => $message_id ?: Snowflake::uuid(), // 消息唯一ID
                'queue_config' => $queue,
                'exchange_type' => $config['exchange_type'],
                'queue_name' => $config['queue'],
            ]),
        ]);

        /**
         * 发送消息
         * msg       // AMQP消息内容
         * exchange  // 交换机名称
         * routing key     // 路由键名称
         */
        $this->channel->basic_publish($msg, $config['exchange'], $config['routing_key']);

        $this->close();
    }

    /**
     * 消费者接收消息
     * @param $callback
     * @param string $queue
     * @param bool $no_ack
     * @return void
     * @throws Exception
     */
    public function receiveMessage($callback, string $queue = 'hello', bool $no_ack = false): void
    {
        $this->open();

        $config = Config::get("rabbitmq.queues.{$queue}");

        /**
         * 设置消费者(Consumer)客户端同时只处理一条队列
         * 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),
         * 直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。
         */
        $this->channel->basic_qos(0, 1, false);

        /**
         * 同样是创建路由和队列,以及绑定路由队列,注意要跟publisher的一致
         * 这里其实可以不用,但是为了防止队列没有被创建所以做的容错处理
         */
        $this->channel->queue_declare(
            $config['queue'],
            false,
            true,
            false,
            false
        );
        $this->channel->exchange_declare(
            $config['exchange'],
            $config['exchange_type'],
            false,
            true,
            false
        );
        $this->channel->queue_bind($config['queue'], $config['exchange'], $config['routing_key']);

        /**
           queue: 从哪里获取消息的队列
           consumer_tag: 消费者标识符,用于区分多个客户端
           no_local: 不接收此使用者发布的消息
           no_ack: 设置为true,则使用者将使用自动确认模式。详情请参见.
                       自动ACK:消息一旦被接收,消费者自动发送ACK
                       手动ACK:消息接收后,不会发送ACK,需要手动调用
           exclusive:是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
           nowait: 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
           callback: :回调逻辑处理函数,PHP回调 array($this, 'process_message') 调用本对象的process_message方法
       */

        $this->channel->basic_consume($config['queue'], '', false, $no_ack, false, false, $callback);

        register_shutdown_function(function () {
            $this->close();
        });

        // 阻塞队列监听事件
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }

        $this->close();
    }



    /**
     * 关闭连接
     * @return void
     */
    protected function close(): void
    {
        if (empty($this->channel)) {
            return;
        }
        $this->channel->close();
        try {
            $this->connection->close();
        } catch (\Throwable $e) {
            throw new AMQPConnectionClosedException("关闭连接失败!");
        }
    }

}

其中生成消息唯一ID有用到一个生成uuid的函数,也可以自己生成唯一数,主要是为了防止消息重复消费

<?php
declare(strict_types=1);

namespace core\Library;


class Snowflake
{

    // 开始时间戳(2023-01-01)
    const EPOCH = 1672531200000;

    // 机器ID所占位数
    const WORKER_ID_BITS = 5;

    // 数据中心ID所占位数
    const DATACENTER_ID_BITS = 5;

    // 序列号所占位数
    const SEQUENCE_BITS = 12;

    // 静态配置属性
    private static $isConfigured = false;
    private static $workerId = 1;
    private static $datacenterId = 1;
    private static $maxWorkerId;
    private static $maxDatacenterId;
    private static $workerIdShift;
    private static $datacenterIdShift;
    private static $timestampLeftShift;
    private static $sequenceMask;

    // 实例属性
    private $sequence = 0;
    private $lastTimestamp = -1;

    /**
     * 配置Snowflake参数
     * @param int $workerId 工作机器ID (0-31)
     * @param int $datacenterId 数据中心ID (0-31)
     * @throws \InvalidArgumentException
     */
    public static function config(int $workerId = 1, int $datacenterId = 1): void
    {
        self::$maxWorkerId = -1 ^ (-1 << self::WORKER_ID_BITS);
        self::$maxDatacenterId = -1 ^ (-1 << self::DATACENTER_ID_BITS);

        if ($workerId > self::$maxWorkerId || $workerId < 0) {
            throw new \InvalidArgumentException("Worker ID must be between 0 and " . self::$maxWorkerId);
        }

        if ($datacenterId > self::$maxDatacenterId || $datacenterId < 0) {
            throw new \InvalidArgumentException("Datacenter ID must be between 0 and " . self::$maxDatacenterId);
        }

        self::$workerId = $workerId;
        self::$datacenterId = $datacenterId;
        self::$workerIdShift = self::SEQUENCE_BITS;
        self::$datacenterIdShift = self::SEQUENCE_BITS + self::WORKER_ID_BITS;
        self::$timestampLeftShift = self::SEQUENCE_BITS + self::WORKER_ID_BITS + self::DATACENTER_ID_BITS;
        self::$sequenceMask = -1 ^ (-1 << self::SEQUENCE_BITS);

        self::$isConfigured = true;
    }

    /**
     * 生成雪花算法ID
     * @return string 64位整数字符串
     * @throws \RuntimeException
     */
    public function generate(): string
    {
        // 延迟初始化配置
        if (!self::$isConfigured) {
            self::config();
        }

        $timestamp = $this->milliseconds();

        if ($timestamp < $this->lastTimestamp) {
            $offset = $this->lastTimestamp - $timestamp;
            if ($offset <= 5) { // 允许5ms的时钟回拨
                usleep($offset * 1000);
                $timestamp = $this->milliseconds();
            } else {
                throw new \RuntimeException("Clock moved backwards. Refusing to generate id for {$offset} milliseconds");
            }
        }

        if ($this->lastTimestamp == $timestamp) {
            $this->sequence = ($this->sequence + 1) & self::$sequenceMask;
            if ($this->sequence == 0) {
                $timestamp = $this->tilNextMillis($this->lastTimestamp);
            }
        } else {
            $this->sequence = random_int(0, 9); // 引入随机数降低冲突概率
        }

        $this->lastTimestamp = $timestamp;

        return (string)(
            (($timestamp - self::EPOCH) << self::$timestampLeftShift) |
            (self::$datacenterId << self::$datacenterIdShift) |
            (self::$workerId << self::$workerIdShift) |
            $this->sequence
        );
    }

    /**
     * 静态方式生成ID
     * @return string
     * @throws \RuntimeException
     */
    public static function uuid(): string
    {
        return static::getInstance()->generate();  // 使用 static 代替 self
    }

    /**
     * 获取当前毫秒时间戳
     * @return int
     */
    private function milliseconds(): int
    {
        return (int)(microtime(true) * 1000);
    }

    /**
     * 等待下一毫秒
     * @param int $lastTimestamp
     * @return int
     */
    private function tilNextMillis(int $lastTimestamp): int
    {
        $timestamp = $this->milliseconds();
        while ($timestamp <= $lastTimestamp) {
            $timestamp = $this->milliseconds();
        }
        return $timestamp;
    }
}

5.队列常量定义

该常量为config/rabbitmq.php下queues配置的键名

<?php
declare (strict_types=1);

namespace app\common\enum;

/**
 * 队列配置的键名
 * Class AmqpQueueEnum
 * @package app\common\enum
 */
class AmqpQueueEnum
{
    const STATUS_INIT = 0;
    const STATUS_PADDING = 1;
    const STATUS_SUCCESS = 2;
    const STATUS_FAILED = 3;
    const STATUS_DESC = [
        self::STATUS_INIT => '待处理',
        self::STATUS_PADDING => '处理中',
        self::STATUS_SUCCESS => '处理成功',
        self::STATUS_FAILED => '处理失败'
    ];

    // 测试队列
    const TEST_QUEUE_NAME = 'hello';
    // 测试广播队列
    const PROMOTER_BIND_RESULT_QUEUE_NAME = 'promoter_bind_result';

}

6.生产者测试

command代码

<?php
declare (strict_types=1);

namespace app\command\definition\producer;

use app\common\enum\AmqpQueueEnum;
use core\Library\Rabbitmq;
use think\annotation\Inject;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;

class TestProducerCommand extends Command
{

    protected function configure()
    {
        $this->setName('producer:test_producer')
            ->setDescription('测试生产者')
            ->addArgument('task', null,  '业务类型');

    }

    protected function execute(Input $input, Output $output)
    {
        $exitCode = 0;

        try {
            // 在执行前记录日志,第二个参数设置是否捕获输出
            $this->beforeExecute($input, false); // 设置为 false 则不捕获输出

            // 业务逻辑
            $output->writeln("command is start...");
            $task = $input->getArgument('task');

            if (method_exists($this, $task)) {
                $this->$task($output);
            } else {
                $output->writeln("Task '$task' not found.");
            }
            $output->writeln('successfully!');
        } catch (\Throwable $e) {
            dump($e);
            $exitCode = $e->getCode() ?: 1;
            $this->handleException($e);
            if (Db::connect()->getPdo()->inTransaction()) {
                Db::rollback();
            }
        } finally {
            $errorMessage = $this->lastError ? $this->lastError['message'] : '';
            if ($errorMessage) {
                $output->writeln($errorMessage);
            }
            $this->afterExecute($output, $exitCode, $errorMessage);
        }

        return $exitCode;
    }

    /**
     * @command php think producer:test_producer publishMessage
     * @taskname 生产者测试
     */
    protected function publishMessage(): void
    {
        try {
            $rabbitmq = new Rabbitmq();
            $rabbitmq->sendMessage(['a' => 1, 'b' => 2], AmqpQueueEnum::TEST_QUEUE_NAME);
        } catch (\Exception $e) {
            echo '发生错误:' . var_export($e->getMessage(), true) . PHP_EOL;
        }

    }
}

配置好console文件(第12点),运行命令php think producer:test_producer publishMessage后可以在rabbitmq面板上看到有test.hello.queue.dev队列生成了

7. 消费者测试

以下代码仅测试消费者是否正常 不包含重试、失败消息入库逻辑

<?php
declare (strict_types=1);

namespace app\command\definition\consumer;

use app\common\enum\AmqpQueueEnum;
use core\Library\Rabbitmq;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;
use think\facade\Log;

class TestConsumerCommand extends Command
{

    protected function configure()
    {
        $this->setName('consumer:test_consumer')
            ->setDescription('测试消费者')
            ->addArgument('task', null,  '业务类型');

    }

    protected function execute(Input $input, Output $output)
    {
        $exitCode = 0;

        try {
            // 在执行前记录日志,第二个参数设置是否捕获输出
            $this->beforeExecute($input, false); // 设置为 false 则不捕获输出

            // 业务逻辑
            $output->writeln("command is start...");
            $task = $input->getArgument('task');

            if (method_exists($this, $task)) {
                $this->$task($output);
            } else {
                $output->writeln("Task '$task' not found.");
            }
            $output->writeln('successfully!');
        } catch (\Throwable $e) {
            dump($e);
            $exitCode = $e->getCode() ?: 1;
            $this->handleException($e);
            if (Db::connect()->getPdo()->inTransaction()) {
                Db::rollback();
            }
        } finally {
            $errorMessage = $this->lastError ? $this->lastError['message'] : '';
            if ($errorMessage) {
                $output->writeln($errorMessage);
            }
            $this->afterExecute($output, $exitCode, $errorMessage);
        }

        return $exitCode;
    }


    /**
     * @command php think consumer:test_consumer consumeMessage
     * @taskname 消费者测试
     */
    protected function consumeMessage(): void
    {
        $callback = function ($msg) {
            $this->callbackFunction($msg);
        };

        // 常驻内存 消费消息
        while (true) {
            try {
                $rabbitmq = new Rabbitmq();
                // 不可使用[$this, 'callbackFunction']方式,该方式会报错 只能使用闭包函数
                $rabbitmq->receiveMessage($callback, AmqpQueueEnum::TEST_QUEUE_NAME);

            } catch (\Exception $e) {
                sleep(5);
                Log::error("外部发生异常:" . var_export($e->getMessage(), true));
            }
        }
    }


    /**
     * 处理消息
     * @param AMQPMessage $msg
     * @return void
     */
    protected function callbackFunction(AMQPMessage $msg): void
    {
        try {
            $body = json_decode($msg->getBody(), true);
            Log::error("接收到消息:" . $msg->getBody());
            $msg->ack();
        } catch (AMQPIOException|AMQPConnectionClosedException|AMQPConnectionBlockedException|AMQPChannelClosedException|\Exception $e) {
            Log::error("发生异常:" . var_export($e->getMessage(), true));
            $msg->reject(false);
        }
    }
}

8. 创建失败消息记录表

CREATE TABLE `tk_amqp_dead_letter_message` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `message_id` bigint unsigned NOT NULL DEFAULT '0' COMMENT '消息ID',
  `exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '原始交换机',
  `routing_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '原始路由键',
  `queue_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '队列名称',
  `queue_config` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '队列配置名(区分业务)',
  `exchange_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '交换机类型:direct fanout topic headers',
  `message_content` json NOT NULL COMMENT '消息内容',
  `retry_count` int unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
  `last_retry_time` datetime DEFAULT NULL COMMENT '最后重试时间',
  `status` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '状态(0-待处理,1-处理中,2-处理成功,3-处理失败)',
  `error_msg` json DEFAULT NULL COMMENT '错误信息',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `delete_time` datetime DEFAULT NULL COMMENT '删除时间',
  PRIMARY KEY (`id`),
  KEY `idxes_queue_config_status` (`queue_config`,`status`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=147826 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Amqp失败队列消息表';

9.失败消息常量定义

<?php
declare (strict_types=1);

namespace app\common\enum;

class CacheEnum
{
    // 防止队列重复消费键
    const CHECK_QUEUE_REPEAT_CONSUMING_KEY = 'check_queue_repeat_consuming';
}

10. 生产可用消费者

command代码

<?php
declare (strict_types=1);

namespace app\command\definition\consumer;

use app\command\logic\ChangeNotifyLogic;
use app\common\enum\AmqpQueueEnum;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;

class ChangeNotifyCommand extends Command
{

    protected function configure()
    {
        $this->setName('consumer:change_notify')
            ->setDescription('消费者')
            ->addArgument('task', null,  '业务类型');

    }

    protected function execute(Input $input, Output $output)
    {
        $exitCode = 0;

        try {
            // 在执行前记录日志,第二个参数设置是否捕获输出
            $this->beforeExecute($input, false); // 设置为 false 则不捕获输出

            // 业务逻辑
            $output->writeln("command is start...");
            $task = $input->getArgument('task');

            if (method_exists($this, $task)) {
                $this->$task($output);
            } else {
                $output->writeln("Task '$task' not found.");
            }
            $output->writeln('successfully!');
        } catch (\Throwable $e) {
            dump($e);
            $exitCode = $e->getCode() ?: 1;
            $this->handleException($e);
            if (Db::connect()->getPdo()->inTransaction()) {
                Db::rollback();
            }
        } finally {
            $errorMessage = $this->lastError ? $this->lastError['message'] : '';
            if ($errorMessage) {
                $output->writeln($errorMessage);
            }
            $this->afterExecute($output, $exitCode, $errorMessage);
        }

        return $exitCode;
    }


    /**
     * @command php think consumer:change_notify consumePromoterBindResult
     * @taskname 消费者
     */
    protected function consumePromoterBindResult(): void
    {
//        $msg = new AMQPMessage(json_encode(['MsgType' => 'event', 'Event' => 'promoter_bind_result', 'bind_status' => 1, 'sharer_appid' => '', 'sharer_openid' => '', 'CreateTime' => 1750876217]));
//        ChangeNotifyLogic::getInstance()->promoterBindResultConsume($msg);
        $option = ['promoterBindResultConsume', AmqpQueueEnum::PROMOTER_BIND_RESULT_QUEUE_NAME];
        ChangeNotifyLogic::getInstance()->changeConsume(...$option);
    }
}

业务层代码

<?php

namespace app\command\logic;

use app\business\ProductDetailBusiness;
use app\common\enum\AmqpQueueEnum;
use app\common\enum\ChangeNotifyCallbackEnum;
use app\common\enum\SphProductEnum;
use app\repository\SphProductDetailRepository;
use app\repository\SphProductRepository;
use app\repository\SphUserSharerRepository;
use app\service\TkBoostService;
use core\Exception\App\BusinessException;
use core\Library\Rabbitmq;
use core\Trait\AmqpDlxTrait;
use core\Trait\SingletonTrait;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Config;
use think\facade\Db;
use think\facade\Log;

class ChangeNotifyLogic
{
    use SingletonTrait;
    use AmqpDlxTrait;

    /**
     * 开始监听队列消息
     * @param \Closure $callback 回调函数
     * @param string $queue_name 配置键名
     * @return void
     */
    protected function startListen(\Closure $callback, string $queue_name): void
    {
        // 常驻内存 消费消息
        while (true) {
            try {
                $rabbitmq = new Rabbitmq();
                // 不可使用[$this, 'callbackFunction']方式,该方式会报错 只能使用闭包函数
                $rabbitmq->receiveMessage($callback, $queue_name);

            } catch (\Exception $e) {
                sleep(5);
                Log::error("[监听消息{$queue_name}]外部发生异常:" . var_export($e->getMessage(), true));
            }
        }
    }


    /**
     * 变更通知事件队列消费
     * @param string $callback_function_name 回调函数名称
     * @param string $queue_name 队列配置名
     * @return void
     */
    public function changeConsume(string $callback_function_name, string $queue_name): void
    {
        $callback = function ($msg) use ($callback_function_name) {
            $this->$callback_function_name($msg);
        };

        $this->startListen($callback, $queue_name);
    }


    /**
     * 消费者
     * @param AMQPMessage $msg
     * @return void
     */
    protected function promoterBindResultConsume(AMQPMessage $msg): void
    {
        try {
            Log::info("接收到消息:" . $msg->getBody());
            $body = json_decode($msg->getBody(), true);

            if (empty($body)) {
                throw new BusinessException("消息不合法");
            }

            // 检查消息是否重复消费
            if (!$this->checkMessageRepeat($msg)) {
                throw new BusinessException("[推客绑定机构的回调消费者]消息重复消费,本次跳过");
            }

            // todo 业务处理 ba la ba la

            // 确认应答
            $this->ackHandle($msg);
            // 主动断开连接
            Db::close();
            // 适当延迟,避免接口限流
            usleep(200000); // 200ms
        } catch (BusinessException|AMQPIOException|AMQPConnectionClosedException|AMQPConnectionBlockedException|AMQPChannelClosedException|\Exception $e) {
            if ($e instanceof BusinessException) {
                Log::error("发生业务性异常 消息丢弃:" . $msg->getBody() . ",错误原因:" . var_export($e->getMessage(), true));
                $error_msg = "[业务异常]错误原因:" . var_export($e->getMessage(), true);
                // 丢弃消息
                $this->rejectHandle($msg, $error_msg);
            } else {
                Log::error("发生致命异常 消息重新入队:" . $msg->getBody() . ",错误原因:" . var_export($e->getMessage(), true));
                $error_msg = "[致命异常]错误原因:" . var_export($e->getMessage(), true);
                // 手动重试机制
                $this->retryHandle($msg, $error_msg);
            }

            // 主动断开连接
            Db::close();
            // 适当延迟,避免接口限流
            usleep(200000); // 200ms
        }
    }
}

其中用到单例模式的特征

<?php
declare (strict_types = 1);

namespace core\Trait;

trait SingletonTrait
{
    private static $instance = null;  // 使用 nullable 类型并初始化为 null

    private function __construct() {}
    private function __clone(): void {}
    public function __wakeup(): void {
        throw new \RuntimeException("Cannot unserialize singleton");
    }

    public static function getInstance(): static
    {
        if (self::$instance === null) {
            self::$instance = new static();  // 使用 static 代替 self 以支持继承
        }

        return self::$instance;
    }
}

失败消息处理特征

<?php
declare (strict_types=1);

namespace core\Trait;

use app\common\enum\AmqpQueueEnum;
use app\common\enum\CacheEnum;
use app\repository\AmqpDeadLetterMessageRepository;
use core\Library\CacheHelper;
use core\Library\Snowflake;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Cache;

trait AmqpDlxTrait
{
    /**
     * 手动重试
     * @param AMQPMessage $msg
     * @param string $error_msg 错误信息
     * @param int $retry_num 重试次数
     * @return bool
     */
    public function retryHandle(AMQPMessage $msg, string $error_msg, int $retry_num = 3): bool
    {
        $msgData = $this->messageHandle($msg, $error_msg);
        $exchange      = $msgData['exchange'];
        $routing_key   = $msgData['routing_key'];
        $channel       = $msg->getChannel();
        $body          = $msgData['message_content'];
        $headersArray  = $msgData['headersArray'];


        if ($headersArray['retry'] < $retry_num) {
            $headersArray['retry']++;
            // 重新入队列
            $channel->basic_publish(
                new AMQPMessage($body, [
                    'content_type' => 'text/plain',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'timestamp' => time(),
                    'application_headers'=> new AMQPTable($headersArray)
                ]),
                $exchange,
                $routing_key
            );
            // 因为重新入队了 所以这条消息直接确认
            $msg->ack();
            $msgData['status'] = AmqpQueueEnum::STATUS_PADDING;
        } else {
            // 丢弃
            $msg->reject(false);
            $msgData['status'] = AmqpQueueEnum::STATUS_FAILED;
        }
        // 记录失败日志
        return $this->saveDlxMessage($msgData);
    }

    /**
     * 丢弃消息
     * @param AMQPMessage $msg
     * @param string $error_msg 错误信息
     * @return bool
     */
    public function rejectHandle(AMQPMessage $msg, string $error_msg): bool
    {
        $msgData = $this->messageHandle($msg, $error_msg);
        $msgData['status'] = AmqpQueueEnum::STATUS_FAILED;
        // 丢弃消息
        $msg->reject(false);
        // 记录失败日志
        return $this->saveDlxMessage($msgData);
    }

    /**
     * 确认应答
     * @param AMQPMessage $msg
     * @return bool
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function ackHandle(AMQPMessage $msg): bool
    {
        $msgData = $this->messageHandle($msg, '');
        $msgData['status'] = AmqpQueueEnum::STATUS_SUCCESS;
        // 确认消息
        $msg->ack();
        // 更新失败日志
        return $this->saveDlxMessage($msgData, false);
    }

    /**
     * 获取队列消息详细信息
     * @param AMQPMessage $msg
     * @param string $error_msg
     * @param bool $needGenerateId 不存在message_id时是否生成
     * @return array
     */
    public function messageHandle(AMQPMessage $msg, string $error_msg, bool $needGenerateId = true): array
    {
        $exchange = $msg->getExchange();
        $routing_key = $msg->getRoutingKey();
        $message_content = $msg->getBody();
        //headersObject 是一个AMQPTable对象
        $headersObject = $msg->get_properties()['application_headers'];
        //调用getNativeData()得到一个数组
        $headersArray  = $headersObject->getNativeData();

        if ($needGenerateId && !isset($headersArray['message_id'])) {
            $headersArray['retry'] = 0;
            $headersArray['message_id'] = Snowflake::uuid();
        }

        $message_id = $headersArray['message_id'];
        $queue_name = $headersArray['queue_name'] ?? '';
        $queue_config = $headersArray['queue_config'] ?? '';
        $exchange_type = $headersArray['exchange_type'] ?? '';
        $retry_count = ($headersArray['retry'] ?? 0);
        $last_retry_time = date('Y-m-d H:i:s');
        $error_msg = compact('error_msg');

        return compact('exchange', 'routing_key', 'message_id', 'queue_name', 'error_msg',
            'queue_config', 'exchange_type', 'retry_count', 'headersArray', 'message_content', 'last_retry_time');
    }

    /**
     * 检查消息是否重复消费
     * @return bool
     */
    public function checkMessageRepeat(AMQPMessage $msg): bool
    {
        $msgData = $this->messageHandle($msg, '', false);
        $message_id = intval($msgData['message_id']);
        // 重试的话不检查重复消费
        if (empty($message_id) || ($msgData['retry_count'] ?? 0) > 0) {
            return true;
        }
        $redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ':' . app('env')->get('app_env') . ":{$message_id}";
        $redis = Cache::store('redis')->handler();
        // 一天内不能重复消费
        return $redis->setnx($redis_key, $message_id) && $redis->expire($redis_key, 24 * 3600);
    }

    /**
     * 保存失败队列消息
     * @param array $saveData
     * @param bool $needInsert 记录不存在是否新增
     * @return bool
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function saveDlxMessage(array $saveData, bool $needInsert = true): bool
    {
        if (empty($saveData) || empty($saveData['message_id'])) {
            return false;
        }
        $saveWhere = [
            'message_id' => $saveData['message_id'],
            'queue_config' => $saveData['queue_config'],
        ];

        $saveData['update_time'] = date('Y-m-d H:i:s');
       // 这里可以修改成自己的插入或更新tk_amqp_dead_letter_message表的方法
        return AmqpDeadLetterMessageRepository::getInstance()->insertOrUpdate($saveWhere, $saveData, $needInsert);
    }
}

插入或更新方法类似下面这样

public function insertOrUpdate(array $where, array $data, bool $needInsert = true): bool
{
    $this->model = $this->model->newInstance();
    // 如果传入了 $where 条件,尝试查找记录
    if (!empty($where)) {
        $record = $this->model->where($where)->find();
        // 记录存在,执行更新
        return $record && $record->save($data);
    }

    // 记录不存在,执行新增
    if ($needInsert) {
        return $this->model->save($data);
    }
    return true;
}

11.失败消息重新入队命令

command代码

<?php
declare (strict_types=1);

namespace app\command\definition\producer;

use app\command\logic\DeadLetterMessageLogic;
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Db;

class DeadLetterMessageCommand extends Command
{

    // 消息唯一ID列表
    protected string $message_ids = '';
    // 是否调试模式 0否 1是
    protected string $debug = '0';

    protected function configure()
    {
        $this->setName('producer:dead_letter_message')
            ->setDescription('失败消息生产者')
            ->addOption('debug', null, Option::VALUE_OPTIONAL, '是否调试:1是 0否')
            ->addOption('message_ids', null, Option::VALUE_OPTIONAL, '消息唯一ID列表')
            ->addArgument('task', null,  '业务类型');
    }

    protected function execute(Input $input, Output $output)
    {
        $exitCode = 0;

        try {
            // 在执行前记录日志,第二个参数设置是否捕获输出
            $this->beforeExecute($input, false); // 设置为 false 则不捕获输出

            // 业务逻辑
            $output->writeln("command is start...");
            $task = $input->getArgument('task');
            $this->debug = $input->getOption('debug') ?? '0';
            $this->message_ids = $input->getOption('message_ids') ?? '';

            if (method_exists($this, $task)) {
                $this->$task($output);
            } else {
                $output->writeln("Task '$task' not found.");
            }
            $output->writeln('successfully!');
        } catch (\Throwable $e) {
            dump($e);
            $exitCode = $e->getCode() ?: 1;
            $this->handleException($e);
            if (Db::connect()->getPdo()->inTransaction()) {
                Db::rollback();
            }
        } finally {
            $errorMessage = $this->lastError ? $this->lastError['message'] : '';
            if ($errorMessage) {
                $output->writeln($errorMessage);
            }
            $this->afterExecute($output, $exitCode, $errorMessage);
        }

        return $exitCode;
    }

    /**
     * @command php think producer:dead_letter_message dlxMessageRequeue --message_ids="1,2"
     * @taskname 失败消息重新入队生产者
     */
    protected function dlxMessageRequeue(): void
    {
        $service = DeadLetterMessageLogic::getInstance();
        // 处理参数
        $service->message_ids = !empty($this->message_ids) ? explode(',', $this->message_ids) : [];
        $service->debug = $this->debug == 1;

        $service->requeueDlxMessages();
    }

}

业务层代码

<?php
declare (strict_types=1);

namespace app\command\logic;

use app\common\enum\AmqpQueueEnum;
use app\common\enum\CacheEnum;
use app\repository\AmqpDeadLetterMessageRepository;
use core\Library\CacheHelper;
use core\Library\Rabbitmq;
use core\Trait\AmqpDlxTrait;
use core\Trait\SingletonTrait;
use think\facade\Cache;
use think\facade\Log;

class DeadLetterMessageLogic
{
    use SingletonTrait;
    use AmqpDlxTrait;

    // 唯一消息ID列表
    public array $message_ids = [];

    // 是否调试模式
    public bool $debug = false;

    /**
     * 重新入队死信消息
     * @return void
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function requeueDlxMessages(): void
    {
        try {
            if (empty($this->message_ids)) {
                $this->debugLog('参数为空', 'error');
                return;
            }

            $repository = AmqpDeadLetterMessageRepository::getInstance();
            $list_collect = $repository->getModel()
                ->field(['id', 'message_id', 'exchange', 'routing_key', 'queue_name', 'queue_config', 'exchange_type', 'message_content'])
                ->whereIn('message_id', $this->message_ids)
                ->select();
            if ($list_collect->isEmpty()) {
                $this->debugLog('没有符合条件的死信消息', 'error');
                return;
            }
            $list = $list_collect->toArray();
            $this->debugLog('开始处理死信消息:' . count($list));

            $redis = Cache::store('redis')->handler();

            foreach ($list as $item) {
                try {
                    $this->debugLog("[死信消息重回队列]正在处理消息ID:{$item['message_id']},队列配置为:{$item['queue_config']}" .
                        "消息内容为:" . json_encode($item['message_content'], JSON_UNESCAPED_UNICODE));
                    $item_message_id = strval($item['message_id']);
                    if (!$this->debug) {
                        // 删除防止重复消费键
                        $redis_key = CacheEnum::CHECK_QUEUE_REPEAT_CONSUMING_KEY . ':' . app('env')->get('app_env') . ":{$item_message_id}";
                        $redis->del($redis_key);

                        // 重新入队列
                        $rabbitmq = new Rabbitmq();
                        $rabbitmq->sendMessage($item['message_content'], $item['queue_config'], $item_message_id);
                    }

                    // 更新死信消息为处理中 因为重新入队了
                    $saveData = [
                        'message_id' => $item_message_id,
                        'queue_config' => $item['queue_config'],
                        'retry_count' => 0,
                        'last_retry_time' => date('Y-m-d H:i:s'),
                        'status' => AmqpQueueEnum::STATUS_PADDING,
                        'error_msg' => ['error_msg' => '']
                    ];
                    !$this->debug && $this->saveDlxMessage($saveData, false);

                } catch (\Exception $e) {
                    $this->debugLog("[死信消息重回队列异常]消息ID:{$item['message_id']},队列配置为:{$item['queue_config']}" .
                        "消息内容为:" . json_encode($item['message_content'], JSON_UNESCAPED_UNICODE) .
                        ",错误原因:" . var_export($e->getMessage(), true),  'error');
                }
            }
        } catch (\Exception $e) {
            $this->debugLog("[死信消息重回队列致命错误]" . var_export($e->getMessage(), true), 'error');
        }

    }


    /**
     * 输出信息 并记录日志
     * @param string $msg
     * @return void
     */
    protected function debugLog(string $msg, $func = 'info'): void
    {
        if (!$this->debug && $func != 'error') {
            return;
        }
        echo $msg . PHP_EOL;
        Log::$func($msg);
    }
}

如果有某个失败消息 希望重新入队消费 则执行php think producer:dead_letter_message dlxMessageRequeue --message_ids="1,2"即可重新入队。

12. 配置命令行

配置好config/console.php即可运行以上命令

<?php
// +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
    // 指令定义
    'commands' => [
        //消费者
        'consumer:test_consumer' => \app\command\definition\consumer\TestConsumerCommand::class,
        'consumer:change_notify' => \app\command\definition\consumer\ChangeNotifyCommand::class,
        // 生产者
        'producer:test_producer' => \app\command\definition\producer\TestProducerCommand::class,
        'producer:dead_letter_message' => \app\command\definition\producer\DeadLetterMessageCommand::class,
    ],
];

0

评论区