目 录CONTENT

文章目录

🍜Hyperf3.1使用websocket实现数据大屏

柯基
2025-11-10 / 0 评论 / 0 点赞 / 15 阅读 / 9,820 字

基于 Hyperf + WebSocket + Redis 实现数据大屏的实时推送

1. 安装扩展

安装 websocket 服务端

composer require hyperf/websocket-server

2. 配置server

在config/autoload/server.php增加配置

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
use Hyperf\Server\Event;
use Hyperf\Server\Server;
use Swoole\Constant;

// Hyperf server definition: the stock HTTP server plus a dedicated WebSocket
// server ("wsDataScreen") used by the data screen.
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 9501,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
            'options' => [
                // Whether to enable request lifecycle event
                'enable_request_lifecycle' => false,
            ],
        ],
        // Added for this article: WebSocket server for the data screen.
        // The name must match the one used in Router::addServer().
        [
            'name' => 'wsDataScreen',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 9503,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
            'settings' => [
                // Heartbeat detection
                'heartbeat_idle_time' => 600,  // maximum idle time of a connection (seconds)
                'heartbeat_check_interval' => 60, // heartbeat check interval (seconds)
                'open_websocket_ping_frame' => false, // false: per the Swoole docs, protocol-level PING frames are answered automatically and are not delivered to onMessage
//                'open_websocket_pong_frame' => true, // deliver PONG frames to the application
            ],
        ],
    ],
    'settings' => [
        Constant::OPTION_ENABLE_COROUTINE => true, // enable built-in coroutines
        Constant::OPTION_WORKER_NUM => swoole_cpu_num(), // number of worker processes to start
        Constant::OPTION_PID_FILE => BASE_PATH . '/runtime/hyperf.pid', // PID file of the master process
        Constant::OPTION_OPEN_TCP_NODELAY => true, // disable Nagle's algorithm so data is sent to the client immediately
        Constant::OPTION_MAX_COROUTINE => 100000, // maximum number of coroutines per worker process
        Constant::OPTION_OPEN_HTTP2_PROTOCOL => true, // enable HTTP/2 protocol parsing
        Constant::OPTION_MAX_REQUEST => 100000, // maximum number of tasks handled by a worker process
        Constant::OPTION_SOCKET_BUFFER_SIZE => 2 * 1024 * 1024, // buffer length for client connections
        Constant::OPTION_BUFFER_OUTPUT_SIZE => 2 * 1024 * 1024,

        // Task worker configuration
        'task_worker_num' => swoole_cpu_num(), // sized to the number of CPU cores
        'task_enable_coroutine' => true, // enable coroutine support in task workers
    ],
    'callbacks' => [
        Event::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'],
        Event::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'],
        Event::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'],

        // Required because task workers are enabled above.
        Event::ON_TASK => [Hyperf\Framework\Bootstrap\TaskCallback::class, 'onTask'],
        Event::ON_FINISH => [Hyperf\Framework\Bootstrap\FinishCallback::class, 'onFinish'],
    ],
];

3. 配置路由

在config/routes.php下新增

// Routes served by the server named "wsDataScreen" in config/autoload/server.php.
Router::addServer('wsDataScreen', function () {
    // WebSocket endpoint; the controller handles open, message and close events.
    Router::get('/wsDataScreen/ws', \App\Module\DataScreen\Controller\WebSocketController::class);
});

4. 中间件配置

可以在config/autoload/middlewares.php配置中间件,但是只有握手阶段会走该中间件,所以我这里不配置,因为我需要发送消息阶段也需要鉴权

// Middleware list for the "wsDataScreen" server. Left empty on purpose: middleware
// only runs during the handshake, while this project also authenticates on every message.
'wsDataScreen' => [
//        \App\Middleware\DataScreenAuthMiddleware::class
    ]

5. 创建控制器

在Module/DataScreen/Controller/WebSocketController.php下创建控制器

<?php
//declare(strict_types=1);

namespace App\Module\DataScreen\Controller;

use App\Common\Enum\AuthGuardEnum;
use App\Common\Trait\RedisKeyTrait;
use App\Middleware\DataScreenAuthMiddleware;
use App\Module\DataScreen\Business\WebSocketBusiness;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use App\Module\DataScreen\Repository\AdminDataRuleRepository;
use App\Module\DataScreen\Request\WebSocketRequest;
use Hyperf\WebSocketServer\Context;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\Engine\WebSocket\Response;
use Hyperf\Redis\Redis;
use Hyperf\Validation\Contract\ValidatorFactoryInterface;
use Hyperf\WebSocketServer\Constant\Opcode;
use Qbhy\HyperfAuth\AuthManager;


/**
 * WebSocket controller of the data screen.
 *
 * - onOpen authenticates the client, caches the user and its data rules in the
 *   WebSocket context and registers the fd in the Redis "alive" set.
 * - onMessage answers application-level "ping" text frames and otherwise treats
 *   the payload as a JSON filter: validates it, stores it per fd and pushes the
 *   current data back.
 * - onClose removes the fd from the alive set and drops its stored filter.
 */
class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    use RedisKeyTrait;

    #[Inject]
    protected AuthManager $auth;

    #[Inject]
    private Redis $redis;

    #[Inject]
    private ValidatorFactoryInterface $validationFactory;

    #[Inject]
    private WebSocketRequest $webSocketRequest;

    #[Inject]
    private AdminDataRuleRepository $adminDataRuleRepository;

    #[Inject]
    private WebSocketBusiness $webSocketBusiness;

    /**
     * Handle a frame sent by a connected client.
     *
     * @param mixed $server server instance handed over by the framework
     * @param mixed $frame  incoming frame; `fd`, `opcode` and `data` are read from it
     */
    public function onMessage($server, $frame): void
    {
        $response = (new Response($server))->init($frame);
        // Client (connection) id.
        $fd = $frame->fd;

        // The user is stored by onOpen; without it the connection is not authenticated.
        $user = Context::get("user:{$fd}");
        if (!$user) {
            // NOTE(review): 401 is outside the close-code range allowed by RFC 6455
            // (1000 / 3000-4999); confirm what clients actually observe before
            // relying on it (an application code such as 4401 may be safer).
            $server->disconnect($fd, 401, '请先登录');
            return;
        }

        if ($frame->opcode === Opcode::TEXT && $frame->data === 'ping') {
            // Application-level heartbeat: reply and refresh the alive set.
            $response->push(new Frame(opcode: Opcode::TEXT, payloadData: 'pong'));
            $this->markAlive($fd);
            return;
        }

        try {
            $data = json_decode($frame->data, true, 512, JSON_THROW_ON_ERROR);

            // Validate the filter payload.
            $validator = $this->validationFactory->make(
                $data,
                $this->webSocketRequest->rules($data),
                $this->webSocketRequest->messages()
            );

            if ($validator->fails()) {
                throw new \InvalidArgumentException($validator->errors()->first());
            }

            // Data rules are cached by onOpen. If that lookup failed there (the
            // connection stays open in that case) reload them here instead of
            // handing null to formatParams(), which requires an array.
            $rule_context_key = "user_data_rule:{$user->id}";
            $data_rule = Context::get($rule_context_key);
            if (!is_array($data_rule)) {
                $data_rule = $this->adminDataRuleRepository->getAdminDataRuleByAdminId($user->id);
                Context::set($rule_context_key, $data_rule);
            }

            // Normalise the parameters.
            $params = $this->webSocketBusiness->formatParams($data, $data_rule);

            // Remember the filter of this client in Redis (fd => filter).
            $this->webSocketBusiness->storeClientFilters($fd, $params);

            // Push the current data right away.
            $census_data = $this->webSocketBusiness->getCurrentData($params);

            $response->push(new Frame(payloadData: responseSuccess($census_data)));
        } catch (\Throwable $e) {
            $response->push(new Frame(payloadData: responseError($e->getMessage())));
        }
    }

    /**
     * Clean up the Redis state of a closed connection.
     *
     * @param mixed $server    server instance handed over by the framework
     * @param int   $fd        id of the closed connection
     * @param int   $reactorId reactor thread id (unused)
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        // Remove the fd from the alive set.
        $alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
        $this->redis->sRem($alive_key, $fd);

        // Remove the stored filter. The hash field is written as a string by
        // WebSocketBusiness::storeClientFilters(), so delete it as a string too.
        $filter_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER);
        $this->redis->hDel($filter_key, (string) $fd);
    }

    /**
     * Authenticate a freshly opened connection and register it as alive.
     *
     * Authentication failures (code 401) disconnect the client; any other
     * error is reported to the client and the connection is kept open.
     *
     * @param mixed $server  server instance handed over by the framework
     * @param mixed $request handshake request; `fd` is read from it
     */
    public function onOpen($server, $request): void
    {
        $response = (new Response($server))->init($request);
        try {
            $guard = $this->auth->guard(AuthGuardEnum::DATA_SCREEN_JWT);
            if (!$guard->check()) {
                throw new \RuntimeException('请先登录', 401);
            }

            $user = $guard->user();
            if (empty($user)) {
                throw new \RuntimeException('请先登录', 401);
            }

            // Keep the user in the WebSocket context for later messages.
            Context::set("user:{$request->fd}", $user);

            // Load and cache the data rules of the user.
            $data_rule = $this->adminDataRuleRepository->getAdminDataRuleByAdminId($user->id);
            Context::set("user_data_rule:{$user->id}", $data_rule);

            $this->markAlive($request->fd);

            $response->push(new Frame(payloadData: responseSuccess('连接成功')));
        } catch (\Throwable $e) {
            if ($e->getCode() === 401) {
                // NOTE(review): see onMessage() regarding the 401 close code.
                $server->disconnect($request->fd, 401, $e->getMessage());
                return;
            }
            $response->push(new Frame(payloadData: responseError($e->getMessage())));
        }
    }

    /**
     * Add the fd to the Redis alive set and renew the TTL of the set.
     */
    private function markAlive(int $fd): void
    {
        $alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
        $this->redis->sAdd($alive_key, $fd);
        $this->redis->expire($alive_key, RedisKeyEnum::ALIVE_TTL);
    }
}

其中使用到了 RedisKeyTrait 特征,主要是为了生成用户筛选条件对应的唯一 key

<?php
declare (strict_types=1);

namespace App\Common\Trait;

use App\Module\DataScreen\Enum\RedisKeyEnum;
use function Hyperf\Config\config;

trait RedisKeyTrait
{
    /**
     * Build a Redis key shaped as "{module}_{env}_{key}{other_key}" followed by
     * "_v{version}" unless the version is 0.
     *
     * @param string $key       base key name
     * @param string $module    module prefix
     * @param string $other_key extra segment appended directly after the key (no separator is added)
     * @param int    $version   key version; 0 omits the version suffix
     * @return string
     */
    public function generateKeyPrefix(string $key, string $module = RedisKeyEnum::MODULE_DATA_SCREEN, string $other_key = '', int $version = 1): string
    {
        // Environment name taken from the "app_env" config entry.
        $environment = config('app_env');

        $suffix = '';
        if ($version !== 0) {
            $suffix = "_v{$version}";
        }

        return "{$module}_{$environment}_{$key}{$other_key}{$suffix}";
    }

    /**
     * Flatten a filter array into "name1_value1_name2_value2_...", keeping the
     * order of the given array.
     *
     * @param array $params filter parameters
     * @return string
     */
    public function generateFilterGroupKey(array $params): string
    {
        $segments = [];
        foreach ($params as $name => $value) {
            $segments[] = $name;
            $segments[] = $value;
        }

        return implode('_', $segments);
    }
}

使用到常量

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Enum;

/**
 * Redis key names, module prefixes and TTLs used by the data screen.
 */
class RedisKeyEnum
{
    /** Time to live, in seconds. */
    public const ALIVE_TTL = 7200;

    /** Module prefix: data screen. */
    public const MODULE_DATA_SCREEN = 'DataScreen';

    /** Module prefix: data center. */
    public const MODULE_DATA_CENTER = 'DataCenter';

    /** Set of live client fds. */
    public const REDIS_KEY_WEBSOCKET_FD = 'websocket_active_fd';

    /** Filter of each client. */
    public const REDIS_KEY_WEBSOCKET_FD_FILTER = 'websocket_fd_filter';

    /** Data cached per filter group. */
    public const REDIS_KEY_WEBSOCKET_FD_FILTER_GROUP = 'websocket_fd_filter_group';

    /** Cache: platform goods linked to stock items. */
    public const REDIS_KEY_WDT_GOODS_LINK = 'wdt_goods_link';

    /** Cache: order conversion funnel. */
    public const REDIS_KEY_WDT_ORDER_TRANSFORM = 'wdt_order_transform';

    /** Core transactions. */
    public const REDIS_KEY_WDT_ORDER_CORE_API = 'wdt_order_core_api';

    /** Order structure. */
    public const REDIS_KEY_WDT_ORDER_STRUCT = 'wdt_order_struct';

    /** After-sale. */
    public const REDIS_KEY_WDT_ORDER_AFTER_SALE = 'wdt_order_after_sale';

    /** Refund top list. */
    public const REDIS_KEY_WDT_ORDER_REFUND_TOP = 'wdt_order_refund_top';

    /** Stock. */
    public const REDIS_KEY_WDT_CENSUS_STOCK = 'wdt_census_stock';

    /** Stock per warehouse location. */
    public const REDIS_KEY_WDT_WAREHOUSE_STOCK = 'wdt_warehouse_stock';

    /** Map. */
    public const REDIS_KEY_WDT_OVERVIEW_MAP = 'wdt_overview_map';

    /** Refund ratio. */
    public const REDIS_KEY_WDT_REFUND_RATE = 'wdt_refund_rate';

    /** Line chart of booked amounts. */
    public const REDIS_KEY_WDT_PAY_MAP = 'wdt_pay_map';
}

验证器验证规则

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Request;

use Carbon\Carbon;

/**
 * Validation rules and messages for the filter payload a client sends over
 * the data-screen WebSocket.
 */
class WebSocketRequest
{
    /** Largest allowed distance, in days, between a start and an end date. */
    private const MAX_RANGE_DAYS = 31;

    /**
     * Validation rules.
     *
     * @param array $requestData decoded payload; the range rules read the end dates from it
     * @return array
     */
    public function rules(array $requestData): array
    {
        return [
            'module' => 'required|in:1,2,3,4,5,6,7,8',
            'platform' => 'required|in:0,1,2',
            'date_type'  => 'required|in:0,1',
            'date_value' => 'required_if:date_type,0|in:0,1,2,3,4',
            'date_start' => [
                'required_if:date_type,1',
                'date',
                'date_format:Y-m-d',
                'before_or_equal:date_end',
                // Only enforced for a custom range (date_type = 1).
                $this->maxRangeRule($requestData, 'date_end', true),
            ],
            'date_end' => 'required_if:date_type,1|date|date_format:Y-m-d|after_or_equal:date_start',
            'module_user_id' => [
                'required_if:module,5,6,7',
                'not_in:undefined',
            ],
            'position' => 'required|in:0,1,2',
            'fx_user_date_start' => [
                'required_if:position,1,2',
                'date',
                'date_format:Y-m-d',
                'before_or_equal:fx_user_date_end',
                $this->maxRangeRule($requestData, 'fx_user_date_end', false),
            ],
            'fx_user_date_end' => 'required_if:position,1,2|date|date_format:Y-m-d|after_or_equal:fx_user_date_start',
            'fx_user_page' => 'required_if:position,1,2|numeric|min:1',
        ];
    }

    /**
     * Custom error messages.
     *
     * @return array
     */
    public function messages(): array
    {
        return [
            'module.required' => '模块不能为空',
            'module.in' => '模块参数非法',

            'module_user_id.required_if' => '相关用户ID不能为空',
            'module_user_id.not_in' => '相关用户ID不能为空!',

            'platform.required' => '平台不能为空',
            'platform.in' => '平台参数非法',

            'date_type.required' => '时间类型不能为空',
            'date_type.in' => '时间类型参数非法',

            'date_value.required_if' => '时间筛选不能为空',
            'date_value.in' => '时间筛选参数非法',

            'date_start.required_if' => '开始时间不能为空',
            'date_start.date' => '开始时间参数非法',
            'date_start.date_format' => '开始时间格式必须为YYYY-MM-DD',
            'date_start.before_or_equal' => '开始时间参数必须小于等于结束时间',

            'date_end.required_if' => '结束时间不能为空',
            'date_end.date' => '结束时间参数非法',
            'date_end.date_format' => '结束时间格式必须为YYYY-MM-DD',
            // The rule on date_end is after_or_equal, so the message must use that key.
            'date_end.after_or_equal' => '结束时间参数必须大于等于开始时间',

            'position.required' => '当前停留的位置参数不能为空',
            'position.in' => '当前停留的位置参数非法',

            'fx_user_date_start.required_if' => '开始时间不能为空',
            'fx_user_date_start.date' => '开始时间参数非法',
            'fx_user_date_start.date_format' => '开始时间格式必须为YYYY-MM-DD',
            'fx_user_date_start.before_or_equal' => '开始时间参数必须小于等于结束时间',

            'fx_user_date_end.required_if' => '结束时间不能为空',
            'fx_user_date_end.date' => '结束时间参数非法',
            'fx_user_date_end.date_format' => '结束时间格式必须为YYYY-MM-DD',
            // The rule on fx_user_date_end is after_or_equal as well.
            'fx_user_date_end.after_or_equal' => '结束时间参数必须大于等于开始时间',

            'fx_user_page.required_if' => '页码不能为空',
            'fx_user_page.numeric' => '页码必须为数字',
            'fx_user_page.min' => '页码最小值为1',
        ];
    }

    /**
     * Build the closure rule that rejects ranges longer than MAX_RANGE_DAYS.
     *
     * The closure is attached to a start-date field and compares its value
     * with the end date found in the payload. Values that are not strings or
     * cannot be parsed are skipped here: the `date` / `date_format` rules on
     * the same fields report them, and parsing them must not throw out of
     * the validator.
     *
     * @param array  $requestData     decoded payload
     * @param string $endField        payload key holding the end date
     * @param bool   $customRangeOnly when true, only check if date_type equals 1
     * @return \Closure
     */
    private function maxRangeRule(array $requestData, string $endField, bool $customRangeOnly): \Closure
    {
        return function ($attr, $value, $fail) use ($requestData, $endField, $customRangeOnly): void {
            if ($customRangeOnly && ($requestData['date_type'] ?? 0) != 1) {
                return;
            }

            $endValue = $requestData[$endField] ?? '';
            if (empty($endValue) || !is_string($endValue) || !is_string($value)) {
                return;
            }

            try {
                $days = Carbon::parse($value)->diffInDays(Carbon::parse($endValue));
            } catch (\Exception) {
                // Unparseable date: left to the date rules of the field.
                return;
            }

            if ($days > self::MAX_RANGE_DAYS) {
                $fail('日期范围不能超过31天');
            }
        };
    }
}

其中使用到了查询用户拥有的数据权限(无需关注)

<?php
declare(strict_types=1);
namespace App\Module\DataScreen\Repository;

use App\Common\Utils\BaseRepository;
use App\Module\DataScreen\Model\Admin;
use App\Module\DataScreen\Model\AdminDataRule;
use App\Module\DataScreen\Model\DataRule;

/**
 * Repository for the relation between admins and data rules.
 */
class AdminDataRuleRepository extends BaseRepository
{
    public function __construct()
    {
        $model = new AdminDataRule();
        $this->setModel($model);
    }

    /**
     * Fetch every admin/data-rule relation row of an admin.
     *
     * @param mixed $adminId admin id to filter on
     * @return mixed result of the query's get()
     */
    public function getByAdminId($adminId)
    {
        $query = $this->getModel()::query();

        return $query->where('admin_id', $adminId)->get();
    }


    /**
     * Fetch the data rules of an admin, joined with the rule definitions.
     *
     * Each row carries admin_id, type, step_type, rule_id and source_id.
     *
     * @param int $adminId
     * @return array
     */
    public function getAdminDataRuleByAdminId(int $adminId): array
    {
        $relationTable = $this->getModel()->getTable();
        $ruleTable = (new DataRule())->getTable();

        $columns = [
            "{$relationTable}.admin_id",
            "{$ruleTable}.type",
            "{$ruleTable}.step_type",
            "{$ruleTable}.rule_id",
            "{$ruleTable}.source_id",
        ];

        $rows = $this->getModel()::query()
            ->select($columns)
            ->where("{$relationTable}.admin_id", $adminId)
            ->join($ruleTable, "{$relationTable}.rule_id", '=', "{$ruleTable}.id")
            ->get();

        return $rows->toArray();
    }


    /**
     * Delete the existing data rules of an admin.
     *
     * @param int $adminId
     * @return int|mixed result of the delete query
     */
    public function deleteOldDataRules(int $adminId): mixed
    {
        $query = $this->getModel()::query();

        return $query->where('admin_id', $adminId)->delete();
    }

}

6. 创建服务层

创建Module/DataScreen/Business/WebSocketBusiness.php

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Business;

use App\Common\Trait\RedisKeyTrait;
use App\Module\DataScreen\Common\Trait\DataCenterTrait;
use App\Module\DataScreen\Common\Trait\DataRuleTrait;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use App\Module\DataScreen\Enum\WebSocketEnum;
use Carbon\Carbon;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;

/**
 * Business logic behind the data-screen WebSocket: parameter normalisation,
 * per-client filter storage and cached data retrieval.
 */
class WebSocketBusiness
{
    use RedisKeyTrait;
    use DataCenterTrait;
    use DataRuleTrait;

    /** Lifetime, in seconds, of a cached filter-group result. */
    private const FILTER_GROUP_CACHE_TTL = 7200;

    #[Inject]
    private Redis $redis;

    /**
     * Normalise the client payload into one canonical, key-sorted parameter set.
     *
     * String parameters are coerced explicitly: the payload comes from
     * json_decode(), so a value such as `"module_user_id": 123` arrives as an
     * int, and trim() rejects ints under strict_types.
     *
     * @param array $data      decoded client payload
     * @param array $data_rule data rules of the current user
     * @return array
     */
    public function formatParams(array $data, array $data_rule): array
    {
        $params = [
            // Module, see WebSocketEnum::MODULE_*.
            'module' => intval($data['module'] ?? WebSocketEnum::MODULE_BING_GONG_CHANG),
            // User id related to the module.
            'module_user_id' => $this->trimmedString($data, 'module_user_id'),
            // Platform, see WebSocketEnum::PLATFORM_*.
            'platform' => intval($data['platform'] ?? WebSocketEnum::PLATFORM_KS),
            // Date type: 0 preset selection, 1 custom range.
            'date_type' => intval($data['date_type'] ?? WebSocketEnum::DATE_TYPE_SELECT),
            // Preset: 1 today, 2 yesterday, 3 this month, 4 last month
            // (required when date_type is 0; pass 0 when date_type is 1).
            'date_value' => intval($data['date_value'] ?? 0),
            // Start date (required when date_type is 1; pass '' when it is 0).
            'date_start' => $this->trimmedString($data, 'date_start'),
            // End date (required when date_type is 1; pass '' when it is 0).
            'date_end' => $this->trimmedString($data, 'date_end'),

            // Current page of the screen, see WebSocketEnum::POSITION_*.
            'position' => intval($data['position'] ?? WebSocketEnum::POSITION_INDEX),
            // Distributor detail filter: start date.
            'fx_user_date_start' => $this->trimmedString($data, 'fx_user_date_start'),
            // Distributor detail filter: end date.
            'fx_user_date_end' => $this->trimmedString($data, 'fx_user_date_end'),
            // Distributor user number.
            'fx_user_no' => $this->trimmedString($data, 'fx_user_no'),
            // Distributor detail page number.
            'fx_user_page' => intval($data['fx_user_page'] ?? 1),
        ];

        // Resolve the main time window.
        [$params['start_time'], $params['end_time']] = getTimeByType(
            $params['date_type'],
            $params['date_value'],
            $params['date_start'],
            $params['date_end']
        );
        // Resolve the time window of the distributor detail view (always a custom range).
        [$params['fx_user_start_time'], $params['fx_user_end_time']] = getTimeByType(
            WebSocketEnum::DATE_TYPE_DIY,
            0,
            $params['fx_user_date_start'],
            $params['fx_user_date_end']
        );

        $params = array_merge($params, $this->getDataRuleParams($params, $data_rule));

        // Sorted so that equal filters always serialise to the same cache key.
        ksort($params);
        return $params;
    }

    /**
     * Store the filter of a client in the Redis hash (field = fd).
     *
     * @param int   $fd     connection id
     * @param array $params normalised filter
     * @throws \JsonException when the filter cannot be encoded
     */
    public function storeClientFilters(int $fd, array $params): void
    {
        $json_params = json_encode(
            $params,
            JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
        );

        $filter_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER);
        $this->redis->hSet($filter_key, strval($fd), $json_params);
    }


    /**
     * Return the data for a filter, from cache when a non-empty cached result
     * exists, otherwise by querying and caching the fresh result.
     *
     * An empty cached result is treated as a miss and queried again.
     *
     * @param array $params normalised filter
     * @return array
     * @throws \RedisException
     * @throws \JsonException when the fresh result cannot be encoded
     */
    public function getCurrentData(array $params): array
    {
        $other_key = $this->generateFilterGroupKey($params);
        $filter_group_key = $this->generateKeyPrefix(
            RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER_GROUP,
            RedisKeyEnum::MODULE_DATA_SCREEN,
            ":{$other_key}"
        );

        // A missing or malformed payload counts as a cache miss.
        $cache_data = $this->redis->get($filter_group_key);
        $data_raw = is_string($cache_data) && $cache_data !== '' ? json_decode($cache_data, true) : null;
        if (!is_array($data_raw)) {
            $data_raw = [];
        }

        $data = $data_raw['data'] ?? [];
        if (is_array($data) && $data !== []) {
            return $data;
        }

        // Version of the cached data set; incremented on every refresh.
        $cached_version = $data_raw['version'] ?? 0;
        $last_version = is_numeric($cached_version) ? (int) $cached_version : 0;

        // Query live data.
        $data = $this->dataCenter($params);

        // SETEX writes value and TTL in one command, so the key can never be
        // left behind without an expiry.
        $payload = json_encode(
            ['filter' => $params, 'data' => $data, 'version' => $last_version + 1, 'update_time' => time()],
            JSON_THROW_ON_ERROR
        );
        $this->redis->setex($filter_group_key, self::FILTER_GROUP_CACHE_TTL, $payload);

        return $data;
    }

    /**
     * Read a payload entry as a trimmed string; missing or non-scalar entries yield ''.
     */
    private function trimmedString(array $data, string $key): string
    {
        $value = $data[$key] ?? '';

        return is_scalar($value) ? trim((string) $value) : '';
    }
}

其中使用到了数据查询模块常量

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Enum;

/**
 * Value sets understood by the data-screen WebSocket protocol.
 */
class WebSocketEnum
{
    /** Modules */
    public const MODULE_BING_GONG_CHANG = 1;
    public const MODULE_INSTITUTION = 2;
    public const MODULE_FX_USER = 3;
    public const MODULE_FX_PRODUCT = 4;
    public const MODULE_SINGLE_BING_GONG_CHANG = 5;
    public const MODULE_SINGLE_INSTITUTION = 6;
    public const MODULE_SINGLE_FACTORY_FX_USER = 7;
    // Wangdiantong (WDT)
    public const MODULE_WANG_DAN_TONG = 8;

    /** Display name of each module */
    public const MODULE_DESC = [
        self::MODULE_BING_GONG_CHANG => '兵工场',
        self::MODULE_INSTITUTION => '机构',
        self::MODULE_FX_USER => '分销者',
        self::MODULE_FX_PRODUCT => '产品',
        self::MODULE_SINGLE_BING_GONG_CHANG => '单兵工场',
        self::MODULE_SINGLE_INSTITUTION => '单机构',
        self::MODULE_SINGLE_FACTORY_FX_USER => '分销者实时数据汇总',
        self::MODULE_WANG_DAN_TONG => '旺店通',
    ];

    /** Platforms */
    public const PLATFORM_KS = 1;
    public const PLATFORM_SPH = 2;
    public const PLATFORM_DY = 3;
    public const PLATFORM_WDT = 0;

    /** Display name of each platform */
    public const PLATFORM_DESC = [
        self::PLATFORM_KS => '快手',
        self::PLATFORM_SPH => '视频号',
        self::PLATFORM_DY => '抖音',
        self::PLATFORM_WDT => '旺店通',
    ];


    /** Date types: preset selection or custom range */
    public const DATE_TYPE_SELECT = 0;
    public const DATE_TYPE_DIY = 1;

    /** Display name of each date type */
    public const DATE_TYPE_DESC = [
        self::DATE_TYPE_SELECT => '时间选择',
        self::DATE_TYPE_DIY => '自定义时间',
    ];

    /** Preset date selections */
    public const DATE_SELECT_TODAY = 1;
    public const DATE_SELECT_YESTERDAY = 2;
    public const DATE_SELECT_THIS_MONTH = 3;
    public const DATE_SELECT_LAST_MONTH = 4;

    /** Display name of each preset */
    public const DATE_SELECT_DESC = [
        self::DATE_SELECT_TODAY => '今日',
        self::DATE_SELECT_YESTERDAY => '昨日',
        self::DATE_SELECT_THIS_MONTH => '本月',
        self::DATE_SELECT_LAST_MONTH => '上月',
    ];

    /** Display name of the period preceding each preset */
    public const DATE_PREVIOUS_DESC = [
        self::DATE_SELECT_TODAY => '昨日',
        self::DATE_SELECT_YESTERDAY => '前日',
        self::DATE_SELECT_THIS_MONTH => '上月',
        self::DATE_SELECT_LAST_MONTH => '前月',
    ];

    /** Loop granularity: by hour or by day */
    public const FOR_TYPE_HOUR = 0;
    public const FOR_TYPE_DAY = 1;

    /** Display name of each loop granularity */
    public const FOR_TYPE_DESC = [
        self::FOR_TYPE_HOUR => '小时',
        self::FOR_TYPE_DAY => '天',
    ];


    /** Section of the screen the client is currently on */
    public const POSITION_INDEX = 0;
    public const POSITION_FX_USER_DETAIL = 1;
    public const POSITION_INSTITUTION_PARTNER_DETAIL = 2;

    /** Display name of each section */
    public const POSITION_DESC = [
        self::POSITION_INDEX => '首页',
        self::POSITION_FX_USER_DETAIL => '分销者数据详情',
        self::POSITION_INSTITUTION_PARTNER_DETAIL => '机构招商数据详情',
    ];

    // Default paging value. NOTE(review): the original comment calls this the
    // default page number, but 10 looks like a page size; confirm with callers.
    public const DEFAULT_PAGE = 10;
}

其中使用到了助手函数 主要是获取开始时间、结束时间、以及折线图需要的数据

if (!function_exists('getTimeByType')) {
    /**
     * Resolve a date filter into a time window plus chart loop settings.
     *
     * @param int    $date_type  WebSocketEnum::DATE_TYPE_SELECT (preset) or DATE_TYPE_DIY (custom range)
     * @param int    $date_value preset (WebSocketEnum::DATE_SELECT_*), used when $date_type is a preset
     * @param string $date_start custom range start, used when $date_type is custom
     * @param string $date_end   custom range end, used when $date_type is custom
     * @return array [start timestamp, end timestamp, loop type (WebSocketEnum::FOR_TYPE_*), loop upper bound];
     *               [0, 0, 0, 0] when the type or preset is unknown or both custom dates are empty
     */
    function getTimeByType(int $date_type, int $date_value = 0, string $date_start = '', string $date_end = ''): array
    {
        $unresolved = [0, 0, 0, 0];

        if ($date_type === WebSocketEnum::DATE_TYPE_SELECT) {
            // One reference instant, so a window can never straddle midnight or a
            // month boundary between two "now" calls.
            $today = Carbon::now()->startOfDay();
            // Go to the first day of the month BEFORE subtracting a month:
            // subMonth() overflows by default (Mar 31 -> "Feb 31" -> Mar 3),
            // which would make "last month" resolve to the current month.
            $last_month = $today->copy()->startOfMonth()->subMonth();

            $window = match ($date_value) {
                WebSocketEnum::DATE_SELECT_TODAY => [$today->copy(), $today->copy()->endOfDay()],
                WebSocketEnum::DATE_SELECT_YESTERDAY => [
                    $today->copy()->subDay(),
                    $today->copy()->subDay()->endOfDay(),
                ],
                WebSocketEnum::DATE_SELECT_THIS_MONTH => [
                    $today->copy()->startOfMonth(),
                    $today->copy()->endOfMonth(),
                ],
                WebSocketEnum::DATE_SELECT_LAST_MONTH => [
                    $last_month->copy(),
                    $last_month->copy()->endOfMonth(),
                ],
                default => null,
            };

            if ($window === null) {
                return $unresolved;
            }
        } elseif ($date_type === WebSocketEnum::DATE_TYPE_DIY) {
            if (empty($date_start) && empty($date_end)) {
                return $unresolved;
            }
            // NOTE(review): when only one of the two dates is empty it is still
            // handed to Carbon::parse(), which presumably resolves '' to the
            // current time; callers are expected to validate both dates first.
            $window = [
                Carbon::parse($date_start)->startOfDay(),
                Carbon::parse($date_end)->endOfDay(),
            ];
        } else {
            return $unresolved;
        }

        [$start_time_handler, $end_time_handler] = $window;

        if ($start_time_handler->isSameDay($end_time_handler)) {
            // Single day: loop over hours 0..23.
            $for_type = WebSocketEnum::FOR_TYPE_HOUR;
            $for_i = 23;
        } else {
            // Several days: loop over days. diffInDays() is an absolute int in
            // Carbon 2 but a signed float in Carbon 3; normalise to the whole
            // number of days so both versions yield the same bound.
            $for_type = WebSocketEnum::FOR_TYPE_DAY;
            $for_i = (int) abs($end_time_handler->diffInDays($start_time_handler));
        }

        return [$start_time_handler->timestamp, $end_time_handler->timestamp, $for_type, $for_i];
    }
}

还使用到了DataRuleTrait特征,主要是获取当前用户可查看的数据范围ID列表

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Common\Trait;

use App\Module\DataScreen\Enum\DataRuleEnum;
use App\Module\DataScreen\Enum\WebSocketEnum;

trait DataRuleTrait
{

    /**
     * Resolve the data-permission part of the query parameters.
     *
     * Only the factory and institution modules are restricted; every other
     * module gets an empty `rule_source_ids`. For the restricted modules the
     * result is the comma-separated list of source ids granted to the user, or
     * '-1' when nothing is granted (presumably a value that matches no source;
     * confirm against the queries that consume it).
     *
     * @param array $params    normalised parameters; `module` and `platform` are read
     * @param array $data_rule rule rows, each with `type`, `step_type` and `source_id`
     * @return string[] array with the single key `rule_source_ids`
     */
    public function getDataRuleParams(array $params, array $data_rule): array
    {
        $module = intval($params['module']);

        if ($module === WebSocketEnum::MODULE_BING_GONG_CHANG) {
            $type = DataRuleEnum::TYPE_FACTORY;
            $step_type = DataRuleEnum::STEP_TYPE_FACTORY;
        } elseif ($module === WebSocketEnum::MODULE_INSTITUTION) {
            $type = DataRuleEnum::TYPE_INSTITUTION;
            // A platform without a mapping must not raise an undefined-key
            // error; it simply has no granted sources.
            $step_type = DataRuleEnum::PLATFORM_STEP_TYPE_MAP[intval($params['platform'])] ?? null;
        } else {
            // Module without data-permission restrictions.
            return ['rule_source_ids' => ''];
        }

        if ($step_type === null) {
            return ['rule_source_ids' => '-1'];
        }

        // Group the granted source ids by rule type and step type.
        $data_rule_data = [];
        foreach ($data_rule as $rule) {
            $data_rule_data[$rule['type']][$rule['step_type']][] = $rule['source_id'];
        }

        $rule_source_ids = $data_rule_data[$type][$step_type] ?? [];

        return [
            'rule_source_ids' => !empty($rule_source_ids) ? implode(',', $rule_source_ids) : '-1',
        ];
    }
}

7. 创建数据查询特征

创建数据查询中心特征Module/DataScreen/Common/Trait/DataCenterTrait.php,无需关注业务逻辑 只需要关注中心方法dataCenter()即可,每个Business需要有public WebSocketParam $params;属性

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Common\Trait;

use App\Module\DataScreen\Business\OrderCensusBusiness;
use App\Module\DataScreen\Business\OrderChartBusiness;
use App\Module\DataScreen\Business\OrderRankBusiness;
use App\Module\DataScreen\Business\ProductBusiness;
use App\Module\DataScreen\Business\SendSampleBusiness;
use App\Module\DataScreen\Business\WdtOriginOrderBusiness;
use App\Module\DataScreen\Common\Params\WebSocketParam;
use App\Module\DataScreen\Enum\WebSocketEnum;
use Hyperf\Di\Annotation\Inject;

/**
 * Central dispatcher that assembles the data-screen payload for one filter.
 *
 * dataCenter() is the entry point: it selects a module builder from `$params['module']`,
 * and module builders select a page ("position") builder from `$params['position']`.
 * Every builder creates fresh Business objects and gives all of them the same
 * WebSocketParam instance through their public `$params` property.
 */
trait DataCenterTrait
{
    // NOTE(review): none of the injected members below is referenced by a method of this
    // trait (the builders instantiate their own objects). They are kept because classes
    // using the trait may rely on them — confirm before removing.
    #[Inject]
    private WebSocketParam $webSocketParam;

    #[Inject]
    private OrderCensusBusiness $orderCensusBusiness;

    #[Inject]
    private SendSampleBusiness $sendSampleBusiness;

    #[Inject]
    private OrderRankBusiness $orderRankBusiness;

    #[Inject]
    private OrderChartBusiness $orderChartBusiness;

    #[Inject]
    private ProductBusiness $productBusiness;

    /**
     * Build the payload for the module named in the filter.
     *
     * @param array $params Filter parameters; `module` selects the builder.
     * @return array Block name => block data; empty when the module is unknown or missing.
     */
    public function dataCenter(array $params): array
    {
        // switch compares loosely (==), exactly like the original dispatch.
        switch ($params['module'] ?? null) {
            case WebSocketEnum::MODULE_BING_GONG_CHANG:
                // factory (Bing Gong Chang) module
                return $this->getBingGongChangModule($params);
            case WebSocketEnum::MODULE_INSTITUTION:
                // institution module
                return $this->getInstitutionModule($params);
            case WebSocketEnum::MODULE_FX_USER:
                // distributor (fx user) module
                return $this->getFxUserModule($params);
            case WebSocketEnum::MODULE_FX_PRODUCT:
                // product module
                return $this->getProductModule($params);
            case WebSocketEnum::MODULE_SINGLE_BING_GONG_CHANG:
                // single-factory module
                return $this->getSingleBingGongChangModule($params);
            case WebSocketEnum::MODULE_SINGLE_INSTITUTION:
                // single-institution module
                return $this->getSingleInstitutionModule($params);
            case WebSocketEnum::MODULE_SINGLE_FACTORY_FX_USER:
                // distributor real-time summary module
                return $this->getSingleFactoryFxUserModule($params);
            case WebSocketEnum::MODULE_WANG_DAN_TONG:
                // Wangdiantong (WDT) module
                return $this->getWdtModule($params);
            default:
                return [];
        }
    }

    /**
     * Factory module: distributor detail page or the index page.
     *
     * @param array $params Filter parameters; `position` selects the page.
     * @return array
     */
    public function getBingGongChangModule(array $params): array
    {
        return match ($params['position'] ?? null) {
            WebSocketEnum::POSITION_FX_USER_DETAIL => $this->getBingGongChangFxUserPosition($params),
            default => $this->getBingGongChangIndexPosition($params),
        };
    }

    /**
     * Single-factory module: distributor detail page or the index page.
     *
     * @param array $params Filter parameters; `position` selects the page.
     * @return array
     */
    public function getSingleBingGongChangModule(array $params): array
    {
        return match ($params['position'] ?? null) {
            WebSocketEnum::POSITION_FX_USER_DETAIL => $this->getBingGongChangFxUserPosition($params),
            default => $this->getSingleBingGongChangIndexPosition($params),
        };
    }

    /**
     * Institution module: partner detail page or the index page.
     *
     * @param array $params Filter parameters; `position` selects the page.
     * @return array
     */
    public function getInstitutionModule(array $params): array
    {
        return match ($params['position'] ?? null) {
            WebSocketEnum::POSITION_INSTITUTION_PARTNER_DETAIL => $this->getInstitutionPartnerPosition($params),
            default => $this->getInstitutionIndexPosition($params),
        };
    }

    /**
     * Single-institution module: partner detail page or the index page.
     *
     * @param array $params Filter parameters; `position` selects the page.
     * @return array
     */
    public function getSingleInstitutionModule(array $params): array
    {
        return match ($params['position'] ?? null) {
            WebSocketEnum::POSITION_INSTITUTION_PARTNER_DETAIL => $this->getInstitutionPartnerPosition($params),
            default => $this->getSingleInstitutionIndexPosition($params),
        };
    }

    /**
     * Distributor real-time summary module (index page only).
     *
     * @param array $params
     * @return array
     */
    public function getSingleFactoryFxUserModule(array $params): array
    {
        return $this->getSingleFactoryFxUserIndexPosition($params);
    }

    /**
     * Institution partner detail page.
     *
     * @param array $params
     * @return array
     */
    protected function getInstitutionPartnerPosition(array $params): array
    {
        return [
            'institution_partner_detail_rank_block' => $this->buildRankBusiness($params)->getInstitutionPartnerDetailBlock(),
        ];
    }

    /**
     * Institution index page.
     *
     * @param array $params
     * @return array
     */
    protected function getInstitutionIndexPosition(array $params): array
    {
        [$census, $sample, $rank, $chart] = $this->buildIndexBusinesses($params);
        $send_sample_data = $this->sampleWithExpressData($sample);

        return [
            // GMV centre block
            'gmv_block' => $census->getGmvBlock(),
            // sales data (top left)
            'sales_block' => array_merge($census->getSalesBlock(), $send_sample_data),
            // partner ranking (bottom left)
            'institution_partner_rank_block' => $rank->getInstitutionPartnerRankBlock(),
            // order line chart (top right)
            'sales_chart_block' => $chart->getLineChartBlock(),
            // institution sales ranking (bottom right)
            'institution_sales_rank_block' => $rank->getInstitutionSalesRankBlock(),
            // institution sales overview (centre pie chart)
            'institution_sales_pie_block' => $chart->getInstitutionSalesPieBlock(),
        ];
    }

    /**
     * Distributor real-time summary index page.
     *
     * @param array $params
     * @return array
     */
    protected function getSingleFactoryFxUserIndexPosition(array $params): array
    {
        // This page uses no chart business, so the objects are created individually.
        $param = $this->buildScreenParam($params);

        $census = new OrderCensusBusiness();
        $census->params = $param;

        $sample = new SendSampleBusiness();
        $sample->params = $param;

        $rank = new OrderRankBusiness();
        $rank->params = $param;

        $send_sample_data = $this->sampleWithTalentData($sample);

        // total number of distributors
        $fx_user_count = $census->getFxUserCount();

        return [
            // GMV centre block
            'gmv_block' => array_merge($census->getGmvBlock(), ['fx_user_count' => $fx_user_count]),
            // sales data (top left)
            'sales_block' => array_merge($census->getSalesBlock(), $send_sample_data),
            // group ranking (bottom left)
            'fx_user_group_rank_block' => $rank->getFxUserGroupRankBlock(),
            // distributor sales "red list" (top right)
            'fx_user_red_sales_block' => $rank->getCommonFxUserRankBlock(),
            // distributor sales "black list" (bottom right)
            'fx_user_black_sales_block' => $rank->getCommonFxUserRankBlock('ASC'),
            // distributor detail
            'fx_user_detail_block' => $rank->getFxUserDetailBlock(),
        ];
    }

    /**
     * Single-institution index page.
     *
     * @param array $params
     * @return array
     */
    protected function getSingleInstitutionIndexPosition(array $params): array
    {
        [$census, $sample, $rank, $chart] = $this->buildIndexBusinesses($params);
        $send_sample_data = $this->sampleWithExpressData($sample);

        return [
            // GMV centre block
            'gmv_block' => $census->getGmvBlock(),
            // sales data (top left)
            'sales_block' => array_merge($census->getSalesBlock(), $send_sample_data),
            // partner ranking (bottom left)
            'institution_partner_rank_block' => $rank->getInstitutionPartnerRankBlock(),
            // order line chart (top right)
            'sales_chart_block' => $chart->getLineChartBlock(),
            // product categories (bottom right)
            'category_sales_pie_block' => $chart->getCategorySalesPieBlock(),
            // factory sales share (centre pie chart)
            'bing_factory_sales_pie_block' => $chart->getBingFactorySalesPieBlock(),
        ];
    }

    /**
     * Factory index page.
     *
     * @param array $params
     * @return array
     */
    protected function getBingGongChangIndexPosition(array $params): array
    {
        [$census, $sample, $rank, $chart] = $this->buildIndexBusinesses($params);
        $send_sample_data = $this->sampleWithTalentData($sample);

        return [
            // GMV centre block
            'gmv_block' => $census->getGmvBlock(),
            // sales data (top left)
            'sales_block' => array_merge($census->getSalesBlock(), $send_sample_data),
            // distributor sales ranking (bottom left)
            'fx_user_rank_block' => $rank->getFxUserRankBlock(),
            // order line chart (top right)
            'sales_chart_block' => $chart->getLineChartBlock(),
            // factory ranking (bottom right)
            'bing_factory_rank_block' => $rank->getBingFactoryRankBlock(),
            // factory map
            'bing_factory_map_block' => $chart->getBingFactoryMapBlock(),
        ];
    }

    /**
     * Single-factory index page.
     *
     * @param array $params
     * @return array
     */
    protected function getSingleBingGongChangIndexPosition(array $params): array
    {
        [$census, $sample, $rank, $chart] = $this->buildIndexBusinesses($params);
        $send_sample_data = $this->sampleWithTalentData($sample);

        return [
            // GMV centre block
            'gmv_block' => $census->getGmvBlock(),
            // sales data (top left)
            'sales_block' => array_merge($census->getSalesBlock(), $send_sample_data),
            // distributor sales ranking (bottom left)
            'fx_user_rank_block' => $rank->getFxUserRankBlock(),
            // order line chart (top right)
            'sales_chart_block' => $chart->getLineChartBlock(),
            // best-selling product ranking (bottom right)
            'product_rank_block' => $rank->getProductRankBlock(),
            // institution sales share
            'institution_sales_pie_block' => $chart->getInstitutionSalesPieBlock(),
        ];
    }

    /**
     * Factory module: distributor detail page.
     *
     * @param array $params
     * @return array
     */
    protected function getBingGongChangFxUserPosition(array $params): array
    {
        return [
            'fx_user_detail_rank_block' => $this->buildRankBusiness($params)->getFxUserDetailBlock(),
        ];
    }

    /**
     * Distributor module: distributor detail page.
     *
     * @param array $params
     * @return array
     */
    protected function getFxUserDetailPosition(array $params): array
    {
        return [
            'fx_user_detail_rank_block' => $this->buildRankBusiness($params)->getFxUserDetailBlock(),
        ];
    }

    /**
     * Distributor index page.
     *
     * @param array $params
     * @return array
     */
    protected function getFxUserIndexPosition(array $params): array
    {
        [$census, $sample, $rank, $chart] = $this->buildIndexBusinesses($params);
        $send_sample_data = $this->sampleWithTalentData($sample);

        return [
            // GMV centre block
            'gmv_block' => $census->getGmvBlock(),
            // sales data (top left)
            'sales_block' => array_merge($census->getSalesBlock(), $send_sample_data),
            // distributor sales ranking (bottom left)
            'fx_user_rank_block' => $rank->getFxUserRankBlock(),
            // order line chart (top right)
            'sales_chart_block' => $chart->getLineChartBlock(),
            // talent sales ranking (bottom right)
            'customers_sales_rank_block' => $rank->getCustomersSalesRankBlock(),
            // map
            'fx_user_map_block' => $chart->getFxUserMapBlock(),
        ];
    }

    /**
     * Distributor module: distributor detail page or the index page.
     *
     * @param array $params Filter parameters; `position` selects the page.
     * @return array
     */
    public function getFxUserModule(array $params): array
    {
        return match ($params['position'] ?? null) {
            WebSocketEnum::POSITION_FX_USER_DETAIL => $this->getFxUserDetailPosition($params),
            default => $this->getFxUserIndexPosition($params),
        };
    }

    /**
     * Product module.
     *
     * @param array $params
     * @return array
     */
    public function getProductModule(array $params): array
    {
        $param = $this->buildScreenParam($params);

        $product = new ProductBusiness();
        $product->params = $param;

        $chart = new OrderChartBusiness();
        $chart->params = $param;

        return [
            // GMV centre block
            'gmv_block' => $product->getGmvBlock(),
            // product map data
            'region_block' => $product->getRegionBlock(),
            // product summary (top left)
            'product_census_block' => $product->getProductCensusBlock(),
            // growth of products with orders (bottom left)
            'product_sales_chart_block' => $product->getProductSalesChartBlock(),
            // product sales ranking (top right)
            'product_sales_rank_block' => $product->getProductSalesRandBlock(),
            // product category share (bottom right)
            'product_category_chart_block' => $chart->getCategorySalesPieBlock(),
        ];
    }

    /**
     * Wangdiantong (WDT) module.
     *
     * @param array $params
     * @return array
     */
    public function getWdtModule(array $params): array
    {
        $param = $this->buildScreenParam($params);

        $wdtOrder = new WdtOriginOrderBusiness();
        $wdtOrder->params = $param;

        return [
            // GMV centre block
            'gmv_block' => $wdtOrder->getGmvBlock(),
            // shop fund flow (centre)
            'fund_flow_block' => $wdtOrder->getFundFlowBlock(),
            // deal ranking (top left)
            'deal_rank_block' => $wdtOrder->getDealRankBlock(),
            // order monitor (top right)
            'order_monitor_block' => $wdtOrder->getOrderMonitorBlock(),
            // shop sales share (bottom left)
            'shop_sales_proportion_block' => $wdtOrder->getShopSalesProportionBlock(),
            // warehouse info (bottom right)
            'warehouse_info_block' => $wdtOrder->getWarehouseInfoBlock(),
            // warehouse info detail
//            'warehouse_info_detail_block' => $wdtOrder->getWarehouseInfoDetailBlock(),
        ];
    }

    /**
     * Create a new WebSocketParam and fill it from the raw filter array.
     */
    private function buildScreenParam(array $params): WebSocketParam
    {
        $param = new WebSocketParam();
        fillProperty($params, $param);

        return $param;
    }

    /**
     * Create a ranking business bound to a freshly built parameter object.
     */
    private function buildRankBusiness(array $params): OrderRankBusiness
    {
        $param = $this->buildScreenParam($params);

        $rank = new OrderRankBusiness();
        $rank->params = $param;

        return $rank;
    }

    /**
     * Create the four businesses used by the index pages, all sharing one parameter object.
     *
     * @return array{0: OrderCensusBusiness, 1: SendSampleBusiness, 2: OrderRankBusiness, 3: OrderChartBusiness}
     */
    private function buildIndexBusinesses(array $params): array
    {
        $param = $this->buildScreenParam($params);

        $census = new OrderCensusBusiness();
        $census->params = $param;

        $sample = new SendSampleBusiness();
        $sample->params = $param;

        $rank = new OrderRankBusiness();
        $rank->params = $param;

        $chart = new OrderChartBusiness();
        $chart->params = $param;

        return [$census, $sample, $rank, $chart];
    }

    /**
     * Sample figures for institution pages: claimed samples and shipped samples.
     */
    private function sampleWithExpressData(SendSampleBusiness $sample): array
    {
        return [
            // number of samples claimed
            'current_send_sample_count' => formatWithUnit($sample->getSendSampleCount(), 0),
            // number of requested samples shipped
            'current_express_sample_count' => formatWithUnit($sample->getExpressSampleCount(), 0),
        ];
    }

    /**
     * Sample figures for factory/distributor pages: claimed samples and distinct talents.
     */
    private function sampleWithTalentData(SendSampleBusiness $sample): array
    {
        return [
            // number of samples claimed
            'current_send_sample_count' => formatWithUnit($sample->getSendSampleCount(), 0),
            // number of distinct talents
            'current_distinct_talent_num' => formatWithUnit($sample->getDistinctTalentNum(), 0),
        ];
    }
}

其中使用到了Module/DataScreen/Common/Params/WebSocketParam.php,为了参数统一

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Common\Params;

/**
 * Plain parameter object shared by the data-screen Business classes.
 *
 * Properties without a default are uninitialized until they are assigned (e.g. by
 * fillProperty()); reading one before that raises an Error.
 */
class WebSocketParam
{
    // Platform
    public int $platform;
    // Module
    public int $module;
    // Start time (sample data in this article looks like a Unix timestamp — TODO confirm)
    public int $start_time;
    // End time (same format as $start_time)
    public int $end_time;
    // Start time as a string
    public string $date_start;
    // End time as a string
    public string $date_end;
    // Date type
    public int $date_type;
    // Date value
    public int $date_value;
    // Current position (page within the module)
    public int $position;
    // Distributor detail: start time
    public int $fx_user_start_time;
    // Distributor detail: end time
    public int $fx_user_end_time;
    // Distributor detail: start time as a string
    public string $fx_user_date_start;
    // Distributor detail: end time as a string
    public string $fx_user_date_end;
    // Distributor detail: distributor number
    public string $fx_user_no;
    // Distributor detail: page number
    public int $fx_user_page;

    // User id related to the module
    public string $module_user_id = '';

    // User id (currently disabled)
//    public int $user_id;

    // Data-permission source ids
    public string $rule_source_ids = '';

}

使用到了助手函数 填充类属性

if (!function_exists('fillProperty')) {
    /**
     * Copy matching array entries onto an object's properties via reflection.
     *
     * Keys that do not name a property of $instance are ignored. Non-public properties
     * are written as well.
     *
     * @param array $params   Property name => value pairs.
     * @param object $instance Object to populate (modified in place).
     */
    function fillProperty(array $params, object $instance): void
    {
        $ref = new ReflectionObject($instance);
        foreach ($params as $key => $value) {
            // Array keys may be integers; reflection expects a property-name string.
            $name = (string) $key;
            if (!$ref->hasProperty($name)) {
                continue;
            }

            // No setAccessible(true): since PHP 8.1 (the minimum for Hyperf 3.1) every
            // property is accessible through reflection and the call does nothing.
            $ref->getProperty($name)->setValue($instance, $value);
        }
    }
}

8. 测试

请求ws://127.0.0.1:9503/wsDataScreen/ws测试连接是否成功

由于做了jwt鉴权需要带上token参数,目前我这里token不需要过期

我这里的发送参数类似这样,根据不同模块参数值变化

{
    "module":8,
    "platform":0,
    "date_type":1,
    "date_value":0, 
    "date_start":"2025-08-12",
    "date_end":"2025-09-10",
    "position":0, 
    "fx_user_date_start":"2025-02-12",
    "fx_user_date_end":"2025-03-10",
    "fx_user_page":1,
    "fx_user_no":""
}

9. 创建定时任务

创建Command/DataScreenPushCommand.php文件 定时查询数据是否变化 推送对应客户端

这里注意:手动运行command会推送失败 因为命令行的生命周期不一样,没有启动对应的websocket服务,定时任务可以正常推送。

<?php

declare(strict_types=1);

namespace App\Command;

use App\Service\DataScreenPushService;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Crontab\Annotation\Crontab;
use Hyperf\Di\Annotation\Inject;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Hyperf\WebSocketServer\Sender;

/**
 * Console command `dataScreen:push`, also registered as a crontab entry, that delegates
 * to DataScreenPushService::pushDataScreen() and reports the outcome.
 */
#[Command]
#[Crontab(
    rule: "*\/1 * * * *",
    name: "dataScreenPush",
    type: "command",
    singleton: true,
    callback: [
        'command' => 'dataScreen:push',
        '--disable-event-dispatcher' => true,
    ],
    memo: "数据大屏消息推送",
    enable: false,
)]
class DataScreenPushCommand extends HyperfCommand
{
    #[Inject]
    protected Sender $sender;

    public function __construct(
        protected ContainerInterface $container,
        protected DataScreenPushService $service,
        protected LoggerInterface $logger
    ) {
        parent::__construct('dataScreen:push');
    }

    /**
     * Set the command description.
     */
    public function configure()
    {
        parent::configure();
        $this->setDescription('数据大屏消息推送');
    }

    /**
     * Run one push cycle and print the elapsed time; any failure is logged and printed
     * instead of being rethrown.
     */
    public function handle()
    {
        $startedAt = microtime(true);

        try {
            $this->service->pushDataScreen();
        } catch (\Throwable $throwable) {
            // Keep a short record of the failure.
            $context = [
                'error' => $throwable->getMessage(),
                'file' => $throwable->getFile(),
                'line' => $throwable->getLine()
            ];
            $this->logger->error('数据大屏消息推送系统错误', $context);
            $this->info('数据大屏消息推送系统错误!' . var_export($throwable->getMessage(), true));

            return;
        }

        $duration = round(microtime(true) - $startedAt, 2);
        $this->info("数据大屏消息推送操作完成,耗时:{$duration}秒");
    }
}

创建服务Service/DataScreenPushService.php

<?php

namespace App\Service;

use App\Common\Trait\RedisKeyTrait;
use App\Module\DataScreen\Common\Trait\DataCenterTrait;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use Hyperf\WebSocketServer\Sender;
use Psr\Log\LoggerInterface;

/**
 * Pushes data-screen payloads to connected WebSocket clients when their data changed.
 *
 * For every alive fd the stored filter is loaded, the data for that filter is queried once
 * per run, cached in Redis together with a version number, and pushed to each client using
 * that filter if it differs from the previously cached data.
 */
class DataScreenPushService
{
    use RedisKeyTrait;
    use DataCenterTrait;

    // Lifetime, in seconds, of a cached per-filter snapshot
    private const FILTER_GROUP_TTL = 7200;

    // Redis hash key: client fd => filter parameters (JSON)
    protected string $filter_key;

    // Redis set key holding the fds of alive clients
    protected string $alive_key;

    #[Inject]
    private Redis $redis;

    #[Inject]
    private LoggerInterface $logger;

    #[Inject]
    private Sender $sender;

    /**
     * Run one push cycle over all alive clients.
     *
     * Errors are logged; nothing is thrown to the caller.
     */
    public function pushDataScreen(): void
    {
        $this->filter_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER);
        $this->alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
        // Clean up Redis bookkeeping and get the alive fds.
        $fds = $this->dealRedisData();

        if (empty($fds)) {
            $this->logger->error('没有存活的客户端');
            return;
        }

        try {
            // Version each filter-group key had when it was first seen in this run.
            $filter_group_version = [];
            // Filter-group keys whose data changed in this run.
            $filter_group_map_need_push = [];
            foreach ($fds as $fd) {
                $fd = intval($fd);
                if (!$this->sender->check($fd)) {
                    $this->forgetClient($fd);
                    continue;
                }

                // Load the client's filter.
                $params_raw = $this->redis->hGet($this->filter_key, strval($fd));
                $params = json_decode($params_raw ?: '', true);
                if (empty($params) || !is_array($params)) {
                    $this->logger->error("客户端{$fd}没有筛选条件");
                    continue;
                }

                // Turn the filter into a string key.
                $other_key = $this->generateFilterGroupKey($params);

                // Load the cached snapshot for this filter.
                $filter_group_key = $this->generateKeyPrefix(
                    RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER_GROUP,
                    RedisKeyEnum::MODULE_DATA_SCREEN,
                    ":{$other_key}"
                );
                $last_data_raw = $this->redis->get($filter_group_key);
                $last_data = json_decode($last_data_raw ?: '', true) ?: [];

                // Current version of the snapshot.
                $last_version = ($last_data['version'] ?? 0) ?: 0;
                $filter_group_version[$filter_group_key] ??= $last_version;

                if ($filter_group_version[$filter_group_key] < $last_version) {
                    // The snapshot is newer than when this filter was first seen in this run,
                    // so skip the query and push the cached data if it was marked as changed.
                    if (!empty($filter_group_map_need_push[$filter_group_key])) {
                        $this->pushHandler($fd, ($last_data['data'] ?? []) ?: [], $params);
                    }
                    continue;
                }

                // Query the data.
                try {
                    $data = $this->dataCenter($params) ?: [];
                } catch (\Throwable $e) {
                    $this->logger->error('查询数据异常:参数为:' . json_encode($params) . ',异常原因:' . $e->getMessage());
                    continue;
                }

                // Store value and expiry in one command so the key can never be left
                // without a TTL.
                $this->redis->setex($filter_group_key, self::FILTER_GROUP_TTL, json_encode([
                    'filter' => $params,
                    'data' => $data,
                    'version' => $filter_group_version[$filter_group_key] + 1,
                    'update_time' => time(),
                ]));

                if (($last_data['data'] ?? []) != $data) {
                    $filter_group_map_need_push[$filter_group_key] = 1;
                    $this->pushHandler($fd, $data, $params);
                }
            }
        } catch (\Throwable $e) {
            $this->logger->error('数据大屏消息推送消息异常:' . $e->getMessage());
        }
    }

    /**
     * Push data to one client if it is still online and its filter has not changed.
     *
     * @param int $fd
     * @param array $data
     * @param array $last_filter_param Filter the data was produced for.
     * @return bool false when the client is offline or its filter changed; otherwise the result of the push.
     * @throws \RedisException
     */
    protected function pushHandler(int $fd, array $data, array $last_filter_param): bool
    {
        // Make sure the client is still online.
        if (!$this->sender->check($fd)) {
            $this->forgetClient($fd);
            return false;
        }

        // Re-read the filter: it may have changed since the data was produced.
        $params_raw = $this->redis->hGet($this->filter_key, strval($fd));
        $params = json_decode($params_raw ?: '', true);
        if ($params != $last_filter_param) {
            return false;
        }

        return $this->sender->push($fd, responseSuccess($data));
    }

    /**
     * Return the entries of $data that are new or differ from $last_data.
     *
     * @param array $last_data
     * @param array $data
     * @return array $data itself when either argument is empty.
     */
    protected function compareData(array $last_data, array $data): array
    {
        if (empty($last_data) || empty($data)) {
            return $data;
        }

        $return_data = [];
        foreach ($data as $key => $value) {
            if (!isset($last_data[$key]) || $value != $last_data[$key]) {
                $return_data[$key] = $value;
            }
        }
        return $return_data;
    }

    /**
     * Drop stored filters of fds that are no longer alive and return the alive fds.
     *
     * @return array Alive fds; empty when there are none or Redis failed.
     */
    protected function dealRedisData(): array
    {
        try {
            $fds = $this->getAliveFds();

            // No alive client: the whole filter hash is obsolete.
            if (empty($fds)) {
                $this->redis->del($this->filter_key);
                return [];
            }

            // Remove every filter whose fd is not alive, in a single HDEL.
            $all_filter_fd = $this->redis->hKeys($this->filter_key) ?: [];
            $stale_fds = array_values(array_diff($all_filter_fd, $fds));
            if (!empty($stale_fds)) {
                $this->redis->hDel($this->filter_key, ...$stale_fds);
            }
            return $fds;
        } catch (\Throwable $e) {
            $this->logger->error("操作redis出现错误:" . $e->getMessage());
            return [];
        }
    }

    /**
     * Read the alive fds from Redis.
     *
     * @return array
     * @throws \RedisException
     */
    protected function getAliveFds(): array
    {
        return $this->redis->sMembers($this->alive_key) ?: [];
    }

    /**
     * Remove an offline client from the alive set and the filter hash.
     *
     * @throws \RedisException
     */
    private function forgetClient(int $fd): void
    {
        $this->redis->sRem($this->alive_key, $fd);
        $this->redis->hDel($this->filter_key, $fd);
    }
}

使用到了返回函数

if (!function_exists('responseSuccess')) {
    /**
     * Encode a success envelope (code, message, data, timestamp) as JSON.
     *
     * @return false|string JSON text, or false when encoding fails.
     */
    function responseSuccess(mixed $data = null, string $message = 'success', int $code = 200): false|string
    {
        $envelope = [];
        $envelope['code'] = $code;
        $envelope['message'] = $message;
        $envelope['data'] = $data;
        $envelope['timestamp'] = time();

        return json_encode($envelope);
    }
}

if (!function_exists('responseError')) {
    /**
     * Encode an error envelope (code, message, data, timestamp) as JSON.
     *
     * A falsy $message is replaced by default_error_message($code).
     *
     * @return false|string JSON text, or false when encoding fails.
     */
    function responseError(string $message = '', int $code = 400, mixed $data = null): false|string
    {
        if (!$message) {
            $message = default_error_message($code);
        }

        $envelope = [];
        $envelope['code'] = $code;
        $envelope['message'] = $message;
        $envelope['data'] = $data;
        $envelope['timestamp'] = time();

        return json_encode($envelope);
    }
}

10. 反向代理

配置 nginx 反向代理,顺便把 ws 升级为 wss

# At least one Hyperf node is required; add one "server" line per node.
upstream hyperf_websocket {
    # IP and port of the Hyperf WebSocket server
    server 127.0.0.1:9503;
}

server {
    listen 80;
    listen 443 ssl;
    # Replace with your domain; several names may be listed.
    server_name xxx.com;
    # TLS: replace with the certificate files issued for your domain.
    ssl_certificate     xxx.pem;  # path to the certificate (pem) file
    ssl_certificate_key  xxx.pem; # path to the private key file
    # TLSv1.1 is deprecated (RFC 8996) and 3DES suites are weak, so both are left out.
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers EECDH+CHACHA20:EECDH+CHACHA20-draft:EECDH+AES128:RSA+AES128:EECDH+AES256:RSA+AES256:!MD5;
    ssl_prefer_server_ciphers on;
    ssl_session_tickets on;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    add_header Strict-Transport-Security "max-age=31536000";
    # 497 = plain HTTP request sent to the HTTPS port; redirect it to https.
    error_page 497  https://$host$request_uri;
    
    location / {
        # WebSocket upgrade headers
        proxy_http_version 1.1;
        proxy_set_header Upgrade websocket;
        proxy_set_header Connection "Upgrade";
        
        # Forward the client's Host and IP information to the upstream node.
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
    
        # The connection is closed after 60s without upstream data; adjust to your use case.
        # NOTE(review): pushes only happen when data changes, so clients need a heartbeat
        # shorter than this value or the timeout must be raised.
        proxy_read_timeout 60s ;
        
        # Proxy to the real server.
        proxy_pass http://hyperf_websocket;
    }
}

配置好之后可以使用wss://xxx.com/wsDataScreen/ws连接了

11. 扩展

可以安装websocket客户端测试,非必要

composer require hyperf/websocket-client

创建控制器

<?php
declare(strict_types=1);

namespace App\Controller;

use Hyperf\Di\Annotation\Inject;
use Hyperf\WebSocketClient\ClientFactory;
use Hyperf\WebSocketClient\Frame;

class IndexController
{
    #[Inject]
    protected ClientFactory $clientFactory;

    /**
     * Send one text message to the WebSocket server and return the data
     * carried by the frame received in response.
     */
    public function index()
    {
        // Peer address: when no ws:// or wss:// prefix is given, ws:// is prepended by default.
        // A client created through ClientFactory is a short-lived object.
        $socket = $this->clientFactory->create('127.0.0.1:9503');

        // Deliver a message to the WebSocket server
        $socket->push('HttpServer 中使用 WebSocket Client 发送数据。');

        // The server must push to this client's fd for anything to arrive here.
        // Wait at most 2s; the received value is a Frame object.
        /** @var Frame $reply */
        $reply = $socket->recv(2);

        // The text payload is available as $reply->data
        return $reply->data;
    }
}

12. 总结

该流程会在握手阶段把存活的客户端fd保存在redis的DataScreen_prod_websocket_active_fd中。

数据类似

1161

然后记录客户端的筛选条件到redis的DataScreen_prod_websocket_fd_filter_v1中。

数据类似,例如 fd 3209 对应:

{
    "date_end": "2025-09-26",
    "date_start": "2025-09-25",
    "date_type": 1,
    "date_value": 0,
    "end_time": 1758902399,
    "fx_user_date_end": "",
    "fx_user_date_start": "",
    "fx_user_end_time": 0,
    "fx_user_no": "",
    "fx_user_page": 1,
    "fx_user_start_time": 0,
    "module": 8,
    "module_user_id": "",
    "platform": 1,
    "position": 0,
    "rule_source_ids": "",
    "start_time": 1758729600
}

每次查询都会把数据缓存到 DataScreen_prod_websocket_fd_filter_group:date_end__date_start__date_type_0_date_value_1_end_time_1762790399_fx_user_date_end__fx_user_date_start__fx_user_end_time_0_fx_user_no__fx_user_page_1_fx_user_start_time_0_module_8_module_user_id__platform_1_position_0_rule_source_ids__start_time_1762704000_v1 这个 hash 中,供命令行校验数据变动时使用,同时也提高了查询效率。

数据类似

{
    "filter": {
        "date_end": "",
        "date_start": "",
        "date_type": 0,
        "date_value": 1,
        "end_time": 1762790399,
        "fx_user_date_end": "",
        "fx_user_date_start": "",
        "fx_user_end_time": 0,
        "fx_user_no": "",
        "fx_user_page": 1,
        "fx_user_start_time": 0,
        "module": 8,
        "module_user_id": "",
        "platform": 1,
        "position": 0,
        "rule_source_ids": "",
        "start_time": 1762704000
    },
    "data": {},
    "version": 1,
    "update_time": 1762763257
}
0

评论区