基于Hyperf+ WebSocket+redis实现数据大屏的实时推送
1. 安装扩展
安装 websocket 服务端
composer require hyperf/websocket-server
2. 配置server
在config/autoload/server.php增加配置
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
use Hyperf\Server\Event;
use Hyperf\Server\Server;
use Swoole\Constant;
return [
'mode' => SWOOLE_PROCESS,
'servers' => [
[
'name' => 'http',
'type' => Server::SERVER_HTTP,
'host' => '0.0.0.0',
'port' => 9501,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
Event::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
],
'options' => [
// Whether to enable request lifecycle event
'enable_request_lifecycle' => false,
],
],
// 增加部分
[
'name' => 'wsDataScreen',
'type' => Server::SERVER_WEBSOCKET,
'host' => '0.0.0.0',
'port' => 9503,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
],
'settings' => [
// 心跳检测配置
'heartbeat_idle_time' => 600, // 连接最大空闲时间(秒)
'heartbeat_check_interval' => 60, // 心跳检测间隔(秒)
'open_websocket_ping_frame' => false, // 开启PING帧自动回复
// 'open_websocket_pong_frame' => true, // 开启PONG帧处理
],
],
],
'settings' => [
Constant::OPTION_ENABLE_COROUTINE => true, // 开启内置协程
Constant::OPTION_WORKER_NUM => swoole_cpu_num(), // 设置启动的 Worker 进程数
Constant::OPTION_PID_FILE => BASE_PATH . '/runtime/hyperf.pid', // master 进程的 PID
Constant::OPTION_OPEN_TCP_NODELAY => true, // TCP 连接发送数据时会关闭 Nagle 合并算法,立即发往客户端连接
Constant::OPTION_MAX_COROUTINE => 100000, // 设置当前工作进程最大协程数量
Constant::OPTION_OPEN_HTTP2_PROTOCOL => true, // 启用 HTTP2 协议解析
Constant::OPTION_MAX_REQUEST => 100000, // 设置 worker 进程的最大任务数
Constant::OPTION_SOCKET_BUFFER_SIZE => 2 * 1024 * 1024, // 配置客户端连接的缓存区长度
Constant::OPTION_BUFFER_OUTPUT_SIZE => 2 * 1024 * 1024,
// 添加任务工作进程配置
'task_worker_num' => swoole_cpu_num(), // 根据服务器CPU核心数设置
'task_enable_coroutine' => true, // 启用协程支持
],
'callbacks' => [
Event::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'],
Event::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'],
Event::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'],
Event::ON_TASK => [Hyperf\Framework\Bootstrap\TaskCallback::class, 'onTask'],
Event::ON_FINISH => [Hyperf\Framework\Bootstrap\FinishCallback::class, 'onFinish'],
],
];
3. 配置路由
在config/routes.php下新增
Router::addServer('wsDataScreen', function () {
Router::get('/wsDataScreen/ws', \App\Module\DataScreen\Controller\WebSocketController::class);
});
4. 中间件配置
可以在config/autoload/middlewares.php配置中间件,但是只有握手阶段会走该中间件,所以我这里不配置,因为我需要发送消息阶段也需要鉴权
'wsDataScreen' => [
// \App\Middleware\DataScreenAuthMiddleware::class
]
5. 创建控制器
在Module/DataScreen/Controller/WebSocketController.php下创建控制器
<?php
//declare(strict_types=1);
namespace App\Module\DataScreen\Controller;
use App\Common\Enum\AuthGuardEnum;
use App\Common\Trait\RedisKeyTrait;
use App\Middleware\DataScreenAuthMiddleware;
use App\Module\DataScreen\Business\WebSocketBusiness;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use App\Module\DataScreen\Repository\AdminDataRuleRepository;
use App\Module\DataScreen\Request\WebSocketRequest;
use Hyperf\WebSocketServer\Context;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Engine\WebSocket\Frame;
use Hyperf\Engine\WebSocket\Response;
use Hyperf\Redis\Redis;
use Hyperf\Validation\Contract\ValidatorFactoryInterface;
use Hyperf\WebSocketServer\Constant\Opcode;
use Qbhy\HyperfAuth\AuthManager;
class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
use RedisKeyTrait;
#[Inject]
protected AuthManager $auth;
#[Inject]
private Redis $redis;
#[Inject]
private ValidatorFactoryInterface $validationFactory;
#[Inject]
private WebSocketRequest $webSocketRequest;
#[Inject]
private AdminDataRuleRepository $adminDataRuleRepository;
#[Inject]
private WebSocketBusiness $webSocketBusiness;
public function onMessage($server, $frame): void
{
$alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
$response = (new Response($server))->init($frame);
// 客户端ID
$fd = $frame->fd;
// 获取用户信息
$user = Context::get("user:{$fd}");
if (!$user) {
$server->disconnect($fd, 401, '请先登录');
return;
}
// 获取用户数据权限
$data_rule = Context::get("user_data_rule:{$user->id}");
if($frame->opcode == Opcode::TEXT && $frame->data == 'ping') {
// 手动处理心跳帧
$response->push(new Frame(opcode: Opcode::TEXT, payloadData: 'pong'));
// 添加到存活列表
$this->redis->sAdd($alive_key, $fd);
// 续期
$this->redis->expire($alive_key, RedisKeyEnum::ALIVE_TTL);
return;
}
try {
$data = json_decode($frame->data, true, 512, JSON_THROW_ON_ERROR);
// 使用验证器验证数据
$validator = $this->validationFactory->make(
$data,
$this->webSocketRequest->rules($data),
$this->webSocketRequest->messages()
);
if ($validator->fails()) {
$error = $validator->errors()->first();
throw new \InvalidArgumentException($error);
}
// 统一参数
$params = $this->webSocketBusiness->formatParams($data, $data_rule);
// redis存储 客户端=>筛选条件
$this->webSocketBusiness->storeClientFilters($fd, $params);
// 立即返回当前数据
$census_data = $this->webSocketBusiness->getCurrentData($params);
$response->push(new Frame(payloadData: responseSuccess($census_data)));
} catch (\Throwable $e) {
$response->push(new Frame(payloadData: responseError($e->getMessage())));
}
}
public function onClose($server, int $fd, int $reactorId): void
{
// 移除存活列表集合中指定的fd
$alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
$this->redis->sRem($alive_key, $fd);
// 移除fd的筛选
$filter_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER);
$this->redis->hDel($filter_key, $fd);
}
public function onOpen($server, $request): void
{
$response = (new Response($server))->init($request);
try {
if (!$this->auth->guard(AuthGuardEnum::DATA_SCREEN_JWT)->check()) {
throw new \RuntimeException('请先登录', 401);
}
// 将用户信息存储在上下文中,以便后续使用
$user = $this->auth->guard(AuthGuardEnum::DATA_SCREEN_JWT)->user();
if (empty($user)) {
throw new \RuntimeException('请先登录', 401);
}
Context::set("user:{$request->fd}", $user);
// 查询用户的数据权限
$data_rule = $this->adminDataRuleRepository->getAdminDataRuleByAdminId($user->id);
Context::set("user_data_rule:{$user->id}", $data_rule);
$alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
// 添加到存活列表
$this->redis->sAdd($alive_key, $request->fd);
// 续期
$this->redis->expire($alive_key, RedisKeyEnum::ALIVE_TTL);
$response->push(new Frame(payloadData: responseSuccess('连接成功')));
} catch (\Throwable $e) {
if ($e->getCode() == 401) {
$server->disconnect($request->fd, 401, $e->getMessage());
return;
}
$response->push(new Frame(payloadData: responseError($e->getMessage())));
}
}
}
其中使用到了RedisKeyTrait特征,主要是为了生成用户筛选唯一的ke y
<?php
declare (strict_types=1);
namespace App\Common\Trait;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use function Hyperf\Config\config;
trait RedisKeyTrait
{
/**
* 生成前缀
* @param string $key
* @param string $module
* @param string $other_key
* @return string
*/
public function generateKeyPrefix(string $key, string $module = RedisKeyEnum::MODULE_DATA_SCREEN, string $other_key = '', int $version = 1): string
{
$env = config('app_env');
$version_str = !empty($version) ? "_v{$version}" : '';
return "{$module}_{$env}_{$key}{$other_key}{$version_str}";
}
/**
* 生成根据筛选条件定义的key
* @param array $params
* @return string
*/
public function generateFilterGroupKey(array $params): string
{
return implode('_', array_merge(...array_map(fn($k, $v) => [$k, $v], array_keys($params), $params)));
}
}
使用到常量
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Enum;
class RedisKeyEnum
{
// 存活时间
const ALIVE_TTL = 7200;
// 应用名称-数据大屏
const MODULE_DATA_SCREEN = 'DataScreen';
const MODULE_DATA_CENTER = 'DataCenter';
// 存活的客户端key
const REDIS_KEY_WEBSOCKET_FD = 'websocket_active_fd';
// 客户端的筛选
const REDIS_KEY_WEBSOCKET_FD_FILTER = 'websocket_fd_filter';
// 筛选分组下的数据
const REDIS_KEY_WEBSOCKET_FD_FILTER_GROUP = 'websocket_fd_filter_group';
// 平台商品关联货品缓存
const REDIS_KEY_WDT_GOODS_LINK = 'wdt_goods_link';
// 交易漏斗缓存
const REDIS_KEY_WDT_ORDER_TRANSFORM = 'wdt_order_transform';
// 核心交易
const REDIS_KEY_WDT_ORDER_CORE_API = 'wdt_order_core_api';
// 订单结构
const REDIS_KEY_WDT_ORDER_STRUCT = 'wdt_order_struct';
// 售后
const REDIS_KEY_WDT_ORDER_AFTER_SALE = 'wdt_order_after_sale';
// 退款top
const REDIS_KEY_WDT_ORDER_REFUND_TOP = 'wdt_order_refund_top';
// 库存
const REDIS_KEY_WDT_CENSUS_STOCK = 'wdt_census_stock';
// 各地库存
const REDIS_KEY_WDT_WAREHOUSE_STOCK = 'wdt_warehouse_stock';
// 地图
const REDIS_KEY_WDT_OVERVIEW_MAP = 'wdt_overview_map';
// 退款占比
const REDIS_KEY_WDT_REFUND_RATE = 'wdt_refund_rate';
// 入账金额折线图
const REDIS_KEY_WDT_PAY_MAP = 'wdt_pay_map';
}
验证器验证规则
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Request;
use Carbon\Carbon;
class WebSocketRequest
{
/**
* 验证规则
*/
public function rules(array $requestData): array
{
return [
'module' => 'required|in:1,2,3,4,5,6,7,8',
'platform' => 'required|in:0,1,2',
'date_type' => 'required|in:0,1',
'date_value' => 'required_if:date_type,0|in:0,1,2,3,4',
'date_start' => [
'required_if:date_type,1',
'date',
'date_format:Y-m-d',
'before_or_equal:date_end',
function ($attr, $value, $fail) use ($requestData) {
if (($requestData['date_type'] ?? 0) == 1 && !empty($requestData['date_end'] ?? '')) {
$start = Carbon::parse($value);
$end = Carbon::parse($requestData['date_end']);
if ($start->diffInDays($end) > 31) {
$fail('日期范围不能超过31天');
}
}
}
],
'date_end' => 'required_if:date_type,1|date|date_format:Y-m-d|after_or_equal:date_start',
'module_user_id' => [
'required_if:module,5,6,7',
'not_in:undefined',
],
'position' => 'required|in:0,1,2',
'fx_user_date_start' => [
'required_if:position,1,2',
'date',
'date_format:Y-m-d',
'before_or_equal:fx_user_date_end',
function ($attr, $value, $fail) use ($requestData) {
if (!empty($requestData['fx_user_date_end'] ?? '')) {
$start = Carbon::parse($value);
$end = Carbon::parse($requestData['fx_user_date_end']);
if ($start->diffInDays($end) > 31) {
$fail('日期范围不能超过31天');
}
}
}
],
'fx_user_date_end' => 'required_if:position,1,2|date|date_format:Y-m-d|after_or_equal:fx_user_date_start',
'fx_user_page' => 'required_if:position,1,2|numeric|min:1',
];
}
/**
* 自定义错误消息
*/
public function messages(): array
{
return [
'module.required' => '模块不能为空',
'module.in' => '模块参数非法',
'module_user_id.required_if' => '相关用户ID不能为空',
'module_user_id.not_in' => '相关用户ID不能为空!',
'platform.required' => '平台不能为空',
'platform.in' => '平台参数非法',
'date_type.required' => '时间类型不能为空',
'date_type.in' => '时间类型参数非法',
'date_value.required_if' => '时间筛选不能为空',
'date_value.in' => '时间筛选参数非法',
'date_start.required_if' => '开始时间不能为空',
'date_start.date' => '开始时间参数非法',
'date_start.date_format' => '开始时间格式必须为YYYY-MM-DD',
'date_start.before_or_equal' => '开始时间参数必须小于等于结束时间',
'date_end.required_if' => '结束时间不能为空',
'date_end.date' => '结束时间参数非法',
'date_end.date_format' => '结束时间格式必须为YYYY-MM-DD',
'date_end.before_or_equal' => '结束时间参数必须小于等于开始时间',
'position.required' => '当前停留的位置参数不能为空',
'fx_user_date_start.required_if' => '开始时间不能为空',
'fx_user_date_start.date' => '开始时间参数非法',
'fx_user_date_start.date_format' => '开始时间格式必须为YYYY-MM-DD',
'fx_user_date_start.before_or_equal' => '开始时间参数必须小于等于结束时间',
'fx_user_date_end.required_if' => '结束时间不能为空',
'fx_user_date_end.date' => '结束时间参数非法',
'fx_user_date_end.date_format' => '结束时间格式必须为YYYY-MM-DD',
'fx_user_date_end.before_or_equal' => '结束时间参数必须小于等于开始时间',
'fx_user_page.required_if' => '页码不能为空',
'fx_user_page.numeric' => '页码必须为数字',
'fx_user_page.min' => '页码最小值为1',
];
}
}
其中使用到了查询用户拥有的数据权限(无需关注)
<?php
declare(strict_types=1);
namespace App\Module\DataScreen\Repository;
use App\Common\Utils\BaseRepository;
use App\Module\DataScreen\Model\Admin;
use App\Module\DataScreen\Model\AdminDataRule;
use App\Module\DataScreen\Model\DataRule;
class AdminDataRuleRepository extends BaseRepository
{
public function __construct()
{
$this->setModel(new AdminDataRule());
}
public function getByAdminId($adminId)
{
return $this->getModel()::query()->where('admin_id', $adminId)->get();
}
/**
* 获取用户的数据权限
* @param int $adminId
* @return array
*/
public function getAdminDataRuleByAdminId(int $adminId): array
{
$table_name = $this->getModel()->getTable();
$data_rule_table_name = (new DataRule())->getTable();
return $this->getModel()::query()
->select([
"{$table_name}.admin_id",
"{$data_rule_table_name}.type",
"{$data_rule_table_name}.step_type",
"{$data_rule_table_name}.rule_id",
"{$data_rule_table_name}.source_id",
])
->where("{$table_name}.admin_id", $adminId)
->join($data_rule_table_name, "{$table_name}.rule_id", '=', "{$data_rule_table_name}.id")
->get()->toArray();
}
/**
* 删除旧的数据权限
* @param int $adminId
* @return int|mixed
*/
public function deleteOldDataRules(int $adminId): mixed
{
return $this->getModel()::query()->where('admin_id', $adminId)->delete();
}
}
6. 创建服务层
创建Module/DataScreen/Business/WebSocketBusiness.php
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Business;
use App\Common\Trait\RedisKeyTrait;
use App\Module\DataScreen\Common\Trait\DataCenterTrait;
use App\Module\DataScreen\Common\Trait\DataRuleTrait;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use App\Module\DataScreen\Enum\WebSocketEnum;
use Carbon\Carbon;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
class WebSocketBusiness
{
use RedisKeyTrait;
use DataCenterTrait;
use DataRuleTrait;
#[Inject]
private Redis $redis;
/**
* 统一参数格式
* @param array $data 传递参数
* @param array $data_rule 数据权限
* @return array
*/
public function formatParams(array $data, array $data_rule): array
{
$params = [
// 模块 1兵工场 2机构 3分销者 4产品 5单兵工场页面
'module' => intval($data['module'] ?? WebSocketEnum::MODULE_BING_GONG_CHANG),
// 模块相关用户ID
'module_user_id' => trim($data['module_user_id'] ?? ''),
// 平台 1快手 2视频号 3抖音
'platform' => intval($data['platform'] ?? WebSocketEnum::PLATFORM_KS),
// 时间类型:0时间选择 1自定义时间
'date_type' => intval($data['date_type'] ?? WebSocketEnum::DATE_TYPE_SELECT),
// 时间选择 1今日 2昨日 3本月 4上月(当时间类型为0时必传 为1的时候传0)
'date_value' => intval($data['date_value'] ?? 0),
// 开始时间(当时间类型为1时必传 为0的时候传'')
'date_start' => trim($data['date_start'] ?? ''),
// 结束时间(当时间类型为1时必传 为0的时候传'')
'date_end' => trim($data['date_end'] ?? ''),
// 当前页面:0首页 1分销者数据详情
'position' => intval($data['position'] ?? WebSocketEnum::POSITION_INDEX),
// 分销者数据详情时间筛选-开始时间
'fx_user_date_start' => trim($data['fx_user_date_start'] ?? ''),
// 分销者数据详情时间筛选-结束时间
'fx_user_date_end' => trim($data['fx_user_date_end'] ?? ''),
// 分销者用户编号
'fx_user_no' => trim($data['fx_user_no'] ?? ''),
// 分销者数据详情页码
'fx_user_page' => intval($data['fx_user_page'] ?? 1),
];
// 时间筛选
[$params['start_time'], $params['end_time']] = getTimeByType($params['date_type'], $params['date_value'], $params['date_start'], $params['date_end']);
// 分销者详情数据时间
[$params['fx_user_start_time'], $params['fx_user_end_time']] = getTimeByType(
WebSocketEnum::DATE_TYPE_DIY,
0,
$params['fx_user_date_start'],
$params['fx_user_date_end']
);
$params = array_merge($params, $this->getDataRuleParams($params, $data_rule));
ksort($params);
return $params;
}
public function storeClientFilters(int $fd, array $params): void
{
$json_params = json_encode($params, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
$filter_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER);
// 存储客户端筛选条件
$this->redis->hSet($filter_key, strval($fd), $json_params);
}
/**
* 获取当前数据
* @param array $params
* @return array
* @throws \RedisException
*/
public function getCurrentData(array $params): array
{
// 查询是否有缓存
$other_key = $this->generateFilterGroupKey($params);
$filter_group_key = $this->generateKeyPrefix(
RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER_GROUP,
RedisKeyEnum::MODULE_DATA_SCREEN,
":{$other_key}");
$cache_data = $this->redis->get($filter_group_key) ?: '';
$data_raw = !empty($cache_data) ? json_decode($cache_data, true) : [];
$data = ($data_raw['data'] ?? []) ?: [];
// 数据版本号
$last_version = ($data_raw['version'] ?? 0) ?: 0;
if (!empty($data)) {
return $data;
}
// 实时查询
$data = $this->dataCenter($params);
// 设置缓存数据
$this->redis->set($filter_group_key, json_encode(['filter' => $params, 'data' => $data, 'version' => $last_version + 1, 'update_time' => time()]));
$this->redis->expire($filter_group_key, 7200);
return $data;
}
}
其中使用到了数据查询模块常量
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Enum;
class WebSocketEnum
{
/** 模块 */
const MODULE_BING_GONG_CHANG = 1;
const MODULE_INSTITUTION = 2;
const MODULE_FX_USER = 3;
const MODULE_FX_PRODUCT = 4;
const MODULE_SINGLE_BING_GONG_CHANG = 5;
const MODULE_SINGLE_INSTITUTION = 6;
const MODULE_SINGLE_FACTORY_FX_USER = 7;
// 旺店通
const MODULE_WANG_DAN_TONG = 8;
const MODULE_DESC = [
self::MODULE_BING_GONG_CHANG => '兵工场',
self::MODULE_INSTITUTION => '机构',
self::MODULE_FX_USER => '分销者',
self::MODULE_FX_PRODUCT => '产品',
self::MODULE_SINGLE_BING_GONG_CHANG => '单兵工场',
self::MODULE_SINGLE_INSTITUTION => '单机构',
self::MODULE_SINGLE_FACTORY_FX_USER => '分销者实时数据汇总',
self::MODULE_WANG_DAN_TONG => '旺店通',
];
/** 平台 */
const PLATFORM_KS = 1;
const PLATFORM_SPH = 2;
const PLATFORM_DY = 3;
const PLATFORM_WDT = 0;
const PLATFORM_DESC = [
self::PLATFORM_KS => '快手',
self::PLATFORM_SPH => '视频号',
self::PLATFORM_DY => '抖音',
self::PLATFORM_WDT => '旺店通',
];
/** 时间类型 */
const DATE_TYPE_SELECT = 0;
const DATE_TYPE_DIY = 1;
const DATE_TYPE_DESC = [
self::DATE_TYPE_SELECT => '时间选择',
self::DATE_TYPE_DIY => '自定义时间',
];
/** 时间选择 */
const DATE_SELECT_TODAY = 1;
const DATE_SELECT_YESTERDAY = 2;
const DATE_SELECT_THIS_MONTH = 3;
const DATE_SELECT_LAST_MONTH = 4;
// 时间选择描述
const DATE_SELECT_DESC = [
self::DATE_SELECT_TODAY => '今日',
self::DATE_SELECT_YESTERDAY => '昨日',
self::DATE_SELECT_THIS_MONTH => '本月',
self::DATE_SELECT_LAST_MONTH => '上月',
];
// 上期的描述
const DATE_PREVIOUS_DESC = [
self::DATE_SELECT_TODAY => '昨日',
self::DATE_SELECT_YESTERDAY => '前日',
self::DATE_SELECT_THIS_MONTH => '上月',
self::DATE_SELECT_LAST_MONTH => '前月',
];
// 循环类型
const FOR_TYPE_HOUR = 0;
const FOR_TYPE_DAY = 1;
const FOR_TYPE_DESC = [
self::FOR_TYPE_HOUR => '小时',
self::FOR_TYPE_DAY => '天',
];
/** 当前停留的块 */
const POSITION_INDEX = 0;
const POSITION_FX_USER_DETAIL = 1;
const POSITION_INSTITUTION_PARTNER_DETAIL = 2;
const POSITION_DESC = [
self::POSITION_INDEX => '首页',
self::POSITION_FX_USER_DETAIL => '分销者数据详情',
self::POSITION_INSTITUTION_PARTNER_DETAIL => '机构招商数据详情',
];
// 默认页码
const DEFAULT_PAGE = 10;
}
其中使用到了助手函数 主要是获取开始时间、结束时间、以及折线图需要的数据
if (!function_exists('getTimeByType')) {
/**
* 根据时间类型获取时间
*/
function getTimeByType(int $date_type, int $date_value = 0, string $date_start = '', string $date_end = ''): array
{
$start_time = $end_time = $for_i = $for_type = 0;
if ($date_type == WebSocketEnum::DATE_TYPE_SELECT) {
switch ($date_value) {
case WebSocketEnum::DATE_SELECT_TODAY:
$start_time = Carbon::today()->startOfDay()->timestamp;
$end_time = Carbon::today()->endOfDay()->timestamp;
$for_type = WebSocketEnum::FOR_TYPE_HOUR;
$for_i = 23;
break;
case WebSocketEnum::DATE_SELECT_YESTERDAY:
$start_time = Carbon::yesterday()->startOfDay()->timestamp;
$end_time = Carbon::yesterday()->endOfDay()->timestamp;
$for_type = WebSocketEnum::FOR_TYPE_HOUR;
$for_i = 23;
break;
case WebSocketEnum::DATE_SELECT_THIS_MONTH:
$start_time_handler = Carbon::now()->startOfMonth();
$end_time_handler = Carbon::now()->endOfMonth();
$for_i = $end_time_handler->diffInDays($start_time_handler);
$start_time = $start_time_handler->timestamp;
$end_time = $end_time_handler->timestamp;
$for_type = WebSocketEnum::FOR_TYPE_DAY;
break;
case WebSocketEnum::DATE_SELECT_LAST_MONTH:
$start_time_handler = Carbon::now()->subMonth()->startOfMonth();
$end_time_handler = Carbon::now()->subMonth()->endOfMonth();
$for_i = $end_time_handler->diffInDays($start_time_handler);
$start_time = $start_time_handler->timestamp;
$end_time = $end_time_handler->timestamp;
$for_type = WebSocketEnum::FOR_TYPE_DAY;
break;
}
} elseif ($date_type == WebSocketEnum::DATE_TYPE_DIY) {
if (empty($date_start) && empty($date_end)) {
return [$start_time, $end_time, $for_type, $for_i];
}
$start_time_handler = Carbon::parse($date_start)->startOfDay();
$end_time_handler = Carbon::parse($date_end)->endOfDay();
if ($start_time_handler->isSameDay($end_time_handler)) {
$for_type = WebSocketEnum::FOR_TYPE_HOUR;
$for_i = 23;
} else {
$for_type = WebSocketEnum::FOR_TYPE_DAY;
$for_i = $end_time_handler->diffInDays($start_time_handler);
}
$start_time = $start_time_handler->timestamp;
$end_time = $end_time_handler->timestamp;
}
return [$start_time, $end_time, $for_type, $for_i];
}
}
还使用到了DataRuleTrait特征,主要是获取当前用户可查看的数据范围ID列表
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Common\Trait;
use App\Module\DataScreen\Enum\DataRuleEnum;
use App\Module\DataScreen\Enum\WebSocketEnum;
trait DataRuleTrait
{
/**
* 数据权限
* @param array $params
* @param array $data_rule
* @return string[]
*/
public function getDataRuleParams(array $params, array $data_rule): array
{
$return_data = [
'rule_source_ids' => ''
];
$data_rule_data = [];
foreach ($data_rule as $rule) {
$data_rule_data[$rule['type']][$rule['step_type']][] = $rule['source_id'];
}
$module = intval($params['module']);
if (!in_array($module, [WebSocketEnum::MODULE_BING_GONG_CHANG, WebSocketEnum::MODULE_INSTITUTION])) {
return $return_data;
}
$return_data['rule_source_ids'] = '-1';
if ($module == WebSocketEnum::MODULE_BING_GONG_CHANG) {
$rule_source_ids = ($data_rule_data[DataRuleEnum::TYPE_FACTORY][DataRuleEnum::STEP_TYPE_FACTORY] ?? []) ?: [];
$return_data['rule_source_ids'] = !empty($rule_source_ids) ? implode(',', $rule_source_ids) : '-1';
return $return_data;
}
if ($module == WebSocketEnum::MODULE_INSTITUTION) {
$step_type = DataRuleEnum::PLATFORM_STEP_TYPE_MAP[intval($params['platform'])];
$rule_source_ids = ($data_rule_data[DataRuleEnum::TYPE_INSTITUTION][$step_type] ?? []) ?: [];
$return_data['rule_source_ids'] = !empty($rule_source_ids) ? implode(',', $rule_source_ids) : '-1';
return $return_data;
}
return $return_data;
}
}
7. 创建数据查询特征
创建数据查询中心特征Module/DataScreen/Common/Trait/DataCenterTrait.php,无需关注业务逻辑 只需要关注中心方法dataCenter()即可,每个Business需要有public WebSocketParam $params;属性
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Common\Trait;
use App\Module\DataScreen\Business\OrderCensusBusiness;
use App\Module\DataScreen\Business\OrderChartBusiness;
use App\Module\DataScreen\Business\OrderRankBusiness;
use App\Module\DataScreen\Business\ProductBusiness;
use App\Module\DataScreen\Business\SendSampleBusiness;
use App\Module\DataScreen\Business\WdtOriginOrderBusiness;
use App\Module\DataScreen\Common\Params\WebSocketParam;
use App\Module\DataScreen\Enum\WebSocketEnum;
use Hyperf\Di\Annotation\Inject;
trait DataCenterTrait
{
#[Inject]
private WebSocketParam $webSocketParam;
#[Inject]
private OrderCensusBusiness $orderCensusBusiness;
#[Inject]
private SendSampleBusiness $sendSampleBusiness;
#[Inject]
private OrderRankBusiness $orderRankBusiness;
#[Inject]
private OrderChartBusiness $orderChartBusiness;
#[Inject]
private ProductBusiness $productBusiness;
/**
* 数据统一处理
* @param array $params
* @return array
*/
public function dataCenter(array $params): array
{
$return_data = [];
switch ($params['module']) {
case WebSocketEnum::MODULE_BING_GONG_CHANG:
// 兵工场模块
$return_data = $this->getBingGongChangModule($params);
break;
case WebSocketEnum::MODULE_INSTITUTION:
// 机构模块
$return_data = $this->getInstitutionModule($params);
break;
case WebSocketEnum::MODULE_FX_USER:
// 分销者模块
$return_data = $this->getFxUserModule($params);
break;
case WebSocketEnum::MODULE_FX_PRODUCT:
// 产品模块
$return_data = $this->getProductModule($params);
break;
case WebSocketEnum::MODULE_SINGLE_BING_GONG_CHANG:
// 单兵工场模块
$return_data = $this->getSingleBingGongChangModule($params);
break;
case WebSocketEnum::MODULE_SINGLE_INSTITUTION:
// 单机构模块
$return_data = $this->getSingleInstitutionModule($params);
break;
case WebSocketEnum::MODULE_SINGLE_FACTORY_FX_USER:
// 分销者实时数据汇总模块
$return_data = $this->getSingleFactoryFxUserModule($params);
break;
case WebSocketEnum::MODULE_WANG_DAN_TONG:
// 产品模块
$return_data = $this->getWdtModule($params);
break;
}
return $return_data;
}
/**
* 获取兵工场模块数据
* @param array $params
* @return array
*/
public function getBingGongChangModule(array $params): array
{
return match ($params['position']) {
WebSocketEnum::POSITION_FX_USER_DETAIL => $this->getBingGongChangFxUserPosition($params),
default => $this->getBingGongChangIndexPosition($params),
};
}
/**
* 获取单兵工场模块数据
* @param array $params
* @return array
*/
public function getSingleBingGongChangModule(array $params): array
{
return match ($params['position']) {
WebSocketEnum::POSITION_FX_USER_DETAIL => $this->getBingGongChangFxUserPosition($params),
default => $this->getSingleBingGongChangIndexPosition($params),
};
}
/**
* 获取机构模块数据
* @param array $params
* @return array
*/
public function getInstitutionModule(array $params): array
{
return match ($params['position']) {
WebSocketEnum::POSITION_INSTITUTION_PARTNER_DETAIL => $this->getInstitutionPartnerPosition($params),
default => $this->getInstitutionIndexPosition($params),
};
}
/**
* 获取单机构模块数据
* @param array $params
* @return array
*/
public function getSingleInstitutionModule(array $params): array
{
return match ($params['position']) {
WebSocketEnum::POSITION_INSTITUTION_PARTNER_DETAIL => $this->getInstitutionPartnerPosition($params),
default => $this->getSingleInstitutionIndexPosition($params),
};
}
/**
* 获取分销者实时数据汇总模块数据
* @param array $params
* @return array
*/
public function getSingleFactoryFxUserModule(array $params): array
{
return $this->getSingleFactoryFxUserIndexPosition($params);
}
/**
* 获取机构招商数据详情
* @param array $params
* @return array
*/
protected function getInstitutionPartnerPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
return [
'institution_partner_detail_rank_block' => $orderRankService->getInstitutionPartnerDetailBlock(),
];
}
/**
* 机构首页数据
* @param array $params
* @return array
*/
protected function getInstitutionIndexPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 汇总
$orderCensusService = new OrderCensusBusiness();
$orderCensusService->params = $paramService;
// 申样
$sendSampleService = new SendSampleBusiness();
$sendSampleService->params = $paramService;
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
// 图表
$orderChartService = new OrderChartBusiness();
$orderChartService->params = $paramService;
// 领样数据
$send_sample_data = [
// 领样数
'current_send_sample_count' => formatWithUnit($sendSampleService->getSendSampleCount(), 0),
// 申样发货数
'current_express_sample_count' => formatWithUnit($sendSampleService->getExpressSampleCount(), 0),
];
return [
// gmv中心块
'gmv_block' => $orderCensusService->getGmvBlock(),
// 销售数据(左上)
'sales_block' => array_merge($orderCensusService->getSalesBlock(), $send_sample_data),
// 招商排行(左下)
'institution_partner_rank_block' => $orderRankService->getInstitutionPartnerRankBlock(),
// 订单折线图(右上)
'sales_chart_block' => $orderChartService->getLineChartBlock(),
// 机构销售榜单(右下)
'institution_sales_rank_block' => $orderRankService->getInstitutionSalesRankBlock(),
// 机构销售概况(中心饼图)
'institution_sales_pie_block' => $orderChartService->getInstitutionSalesPieBlock(),
];
}
/**
* 获取分销者实时数据汇总首页数据
* @param array $params
* @return array
*/
protected function getSingleFactoryFxUserIndexPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 汇总
$orderCensusService = new OrderCensusBusiness();
$orderCensusService->params = $paramService;
// 申样
$sendSampleService = new SendSampleBusiness();
$sendSampleService->params = $paramService;
// 排行
$orderRankService = new OrderRankBusiness();;
$orderRankService->params = $paramService;
// 领样数据
$send_sample_data = [
// 领样数
'current_send_sample_count' => formatWithUnit($sendSampleService->getSendSampleCount(), 0),
// 去重达人数
'current_distinct_talent_num' => formatWithUnit($sendSampleService->getDistinctTalentNum(), 0),
];
// 分销者总数
$fx_user_count = $orderCensusService->getFxUserCount();
return [
// gmv中心块
'gmv_block' => array_merge($orderCensusService->getGmvBlock(), compact('fx_user_count')),
// 销售数据(左上)
'sales_block' => array_merge($orderCensusService->getSalesBlock(), $send_sample_data),
// 小组排行(左下)
'fx_user_group_rank_block' => $orderRankService->getFxUserGroupRankBlock(),
// 分销者销售红榜(右上)
'fx_user_red_sales_block' => $orderRankService->getCommonFxUserRankBlock(),
// 分销者销售黑榜(右下)
'fx_user_black_sales_block' => $orderRankService->getCommonFxUserRankBlock('ASC'),
// 分销者详情
'fx_user_detail_block' => $orderRankService->getFxUserDetailBlock(),
];
}
/**
* 单机构首页数据
* @param array $params
* @return array
*/
protected function getSingleInstitutionIndexPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 汇总
$orderCensusService = new OrderCensusBusiness();
$orderCensusService->params = $paramService;
// 申样
$sendSampleService = new SendSampleBusiness();
$sendSampleService->params = $paramService;
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
// 图表
$orderChartService = new OrderChartBusiness();
$orderChartService->params = $paramService;
// 领样数据
$send_sample_data = [
// 领样数
'current_send_sample_count' => formatWithUnit($sendSampleService->getSendSampleCount(), 0),
// 申样发货数
'current_express_sample_count' => formatWithUnit($sendSampleService->getExpressSampleCount(), 0),
];
return [
// gmv中心块
'gmv_block' => $orderCensusService->getGmvBlock(),
// 销售数据(左上)
'sales_block' => array_merge($orderCensusService->getSalesBlock(), $send_sample_data),
// 招商排行(左下)
'institution_partner_rank_block' => $orderRankService->getInstitutionPartnerRankBlock(),
// 订单折线图(右上)
'sales_chart_block' => $orderChartService->getLineChartBlock(),
// 产品分类(右下)
'category_sales_pie_block' => $orderChartService->getCategorySalesPieBlock(),
// 兵工场销售占比(中心饼图)
'bing_factory_sales_pie_block' => $orderChartService->getBingFactorySalesPieBlock(),
];
}
/**
* 兵工场首页数据
* @param array $params
* @return array
*/
protected function getBingGongChangIndexPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 汇总
$orderCensusService = new OrderCensusBusiness();
$orderCensusService->params = $paramService;
// 申样
$sendSampleService = new SendSampleBusiness();
$sendSampleService->params = $paramService;
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
// 图表
$orderChartService = new OrderChartBusiness();
$orderChartService->params = $paramService;
// 领样数据
$send_sample_data = [
// 领样数
'current_send_sample_count' => formatWithUnit($sendSampleService->getSendSampleCount(), 0),
// 去重达人数
'current_distinct_talent_num' => formatWithUnit($sendSampleService->getDistinctTalentNum(), 0),
];
return [
// gmv中心块
'gmv_block' => $orderCensusService->getGmvBlock(),
// 销售数据(左上)
'sales_block' => array_merge($orderCensusService->getSalesBlock(), $send_sample_data),
// 分销者销售排行(左下)
'fx_user_rank_block' => $orderRankService->getFxUserRankBlock(),
// 订单折线图(右上)
'sales_chart_block' => $orderChartService->getLineChartBlock(),
// 兵工场排行(右下)
'bing_factory_rank_block' => $orderRankService->getBingFactoryRankBlock(),
// 兵工场地图
'bing_factory_map_block' => $orderChartService->getBingFactoryMapBlock(),
];
}
/**
* 单兵工场首页数据
* @param array $params
* @return array
*/
protected function getSingleBingGongChangIndexPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 汇总
$orderCensusService = new OrderCensusBusiness();
$orderCensusService->params = $paramService;
// 申样
$sendSampleService = new SendSampleBusiness();
$sendSampleService->params = $paramService;
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
// 图表
$orderChartService = new OrderChartBusiness();
$orderChartService->params = $paramService;
// 领样数据
$send_sample_data = [
// 领样数
'current_send_sample_count' => formatWithUnit($sendSampleService->getSendSampleCount(), 0),
// 去重达人数
'current_distinct_talent_num' => formatWithUnit($sendSampleService->getDistinctTalentNum(), 0),
];
return [
// gmv中心块
'gmv_block' => $orderCensusService->getGmvBlock(),
// 销售数据(左上)
'sales_block' => array_merge($orderCensusService->getSalesBlock(), $send_sample_data),
// 分销者销售排行(左下)
'fx_user_rank_block' => $orderRankService->getFxUserRankBlock(),
// 订单折线图(右上)
'sales_chart_block' => $orderChartService->getLineChartBlock(),
// 热销产品排行(右下)
'product_rank_block' => $orderRankService->getProductRankBlock(),
// 机构销售占比
'institution_sales_pie_block' => $orderChartService->getInstitutionSalesPieBlock(),
];
}
/**
* 兵工场-分销者详情数据
* @param array $params
* @return array
*/
protected function getBingGongChangFxUserPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
return [
'fx_user_detail_rank_block' => $orderRankService->getFxUserDetailBlock(),
];
}
/**
* 分销者-分销者详情数据
* @param array $params
* @return array
*/
protected function getFxUserDetailPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
return [
'fx_user_detail_rank_block' => $orderRankService->getFxUserDetailBlock(),
];
}
/**
* 分销者首页数据
* @param array $params
* @return array
*/
protected function getFxUserIndexPosition(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 汇总
$orderCensusService = new OrderCensusBusiness();
$orderCensusService->params = $paramService;
// 申样
$sendSampleService = new SendSampleBusiness();
$sendSampleService->params = $paramService;
// 排行
$orderRankService = new OrderRankBusiness();
$orderRankService->params = $paramService;
// 图表
$orderChartService = new OrderChartBusiness();
$orderChartService->params = $paramService;
// 领样数据
$send_sample_data = [
// 领样数
'current_send_sample_count' => formatWithUnit($sendSampleService->getSendSampleCount(), 0),
// 去重达人数
'current_distinct_talent_num' => formatWithUnit($sendSampleService->getDistinctTalentNum(), 0),
];
return [
// gmv中心块
'gmv_block' => $orderCensusService->getGmvBlock(),
// 销售数据(左上)
'sales_block' => array_merge($orderCensusService->getSalesBlock(), $send_sample_data),
// 分销者销售排行(左下)
'fx_user_rank_block' => $orderRankService->getFxUserRankBlock(),
// 订单折线图(右上)
'sales_chart_block' => $orderChartService->getLineChartBlock(),
// 达人销售排行(右下)
'customers_sales_rank_block' => $orderRankService->getCustomersSalesRankBlock(),
// 地图
'fx_user_map_block' => $orderChartService->getFxUserMapBlock(),
];
}
/**
* 分销者模块
* @param array $params
* @return array
*/
public function getFxUserModule(array $params): array
{
return match ($params['position']) {
WebSocketEnum::POSITION_FX_USER_DETAIL => $this->getFxUserDetailPosition($params),
default => $this->getFxUserIndexPosition($params),
};
}
/**
* 产品模块
* @param array $params
* @return array
*/
public function getProductModule(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 产品
$productService = new ProductBusiness();
$productService->params = $paramService;
$orderChartService = new OrderChartBusiness();
$orderChartService->params = $paramService;
return [
// gmv中心块
'gmv_block' => $productService->getGmvBlock(),
// 产品地图数据
'region_block' => $productService->getRegionBlock(),
// 产品数据汇总(左上)
'product_census_block' => $productService->getProductCensusBlock(),
// 出单产品增长情况(左下)
'product_sales_chart_block' => $productService->getProductSalesChartBlock(),
// 产品销售排行榜(右上)
'product_sales_rank_block' => $productService->getProductSalesRandBlock(),
// 产品分类占比(右下)
'product_category_chart_block' => $orderChartService->getCategorySalesPieBlock(),
];
}
/**
* 旺店通模块
* @param array $params
* @return array
*/
public function getWdtModule(array $params): array
{
// 参数服务
$paramService = new WebSocketParam();
fillProperty($params, $paramService);
// 旺店通订单
$wdtOrderService = new WdtOriginOrderBusiness();
$wdtOrderService->params = $paramService;
return [
// gmv中心块
'gmv_block' => $wdtOrderService->getGmvBlock(),
// 店铺资金流向 中
'fund_flow_block' => $wdtOrderService->getFundFlowBlock(),
// 成交排行榜(左上)
'deal_rank_block' => $wdtOrderService->getDealRankBlock(),
// 订单监控 右上
'order_monitor_block' => $wdtOrderService->getOrderMonitorBlock(),
// 店铺销售占比 左下
'shop_sales_proportion_block' => $wdtOrderService->getShopSalesProportionBlock(),
// 仓库信息,右下
'warehouse_info_block' => $wdtOrderService->getWarehouseInfoBlock(),
// 仓库信息详情
// 'warehouse_info_detail_block' => $wdtOrderService->getWarehouseInfoDetailBlock(),
];
}
}
其中使用到了Module/DataScreen/Common/Params/WebSocketParam.php,为了参数统一
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Common\Params;
class WebSocketParam
{
// 平台
public int $platform;
// 模块
public int $module;
// 开始时间
public int $start_time;
// 结束时间
public int $end_time;
// 开始时间 字符
public string $date_start;
// 结束时间 字符
public string $date_end;
// 时间类型
public int $date_type;
// 时间值
public int $date_value;
// 当前位置
public int $position;
// 分销者详情开始时间
public int $fx_user_start_time;
// 分销者详情结束时间
public int $fx_user_end_time;
// 分销者详情开始时间 字符
public string $fx_user_date_start;
// 分销者详情结束时间 字符
public string $fx_user_date_end;
// 分销者详情编号
public string $fx_user_no;
// 分销者详情页码
public int $fx_user_page;
// 模块相关用户ID
public string $module_user_id = '';
// 用户ID
// public int $user_id;
// 数据权限ID
public string $rule_source_ids = '';
}
使用到了助手函数 填充类属性
if (!function_exists('fillProperty')) {
/**
* 使用反射填充属性
* @param array $params
* @param object $instance
*/
function fillProperty(array $params, object $instance): void
{
$ref = new ReflectionObject($instance);
foreach ($params as $key => $value) {
if ($ref->hasProperty($key)) {
$prop = $ref->getProperty($key);
$prop->setAccessible(true); // 允许访问 protected/private
$prop->setValue($instance, $value);
}
}
}
}
8. 测试
请求ws://127.0.0.1:9503/wsDataScreen/ws测试连接是否成功
由于做了jwt鉴权需要带上token参数,目前我这里token不需要过期
我这里的发送参数类似这样,根据不同模块参数值变化
{
"module":8,
"platform":0,
"date_type":1,
"date_value":0,
"date_start":"2025-08-12",
"date_end":"2025-09-10",
"position":0,
"fx_user_date_start":"2025-02-12",
"fx_user_date_end":"2025-03-10",
"fx_user_page":1,
"fx_user_no":""
}
9. 创建定时任务
创建Command/DataScreenPushCommand.php文件 定时查询数据是否变化 推送对应客户端
这里注意:手动运行command会推送失败 因为命令行的生命周期不一样,没有启动对应的websocket服务,定时任务可以正常推送。
<?php
declare(strict_types=1);
namespace App\Command;
use App\Service\DataScreenPushService;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Crontab\Annotation\Crontab;
use Hyperf\Di\Annotation\Inject;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Hyperf\WebSocketServer\Sender;
#[Command]
#[Crontab(
rule: "*\/1 * * * *",
name: "dataScreenPush",
type: "command",
singleton: true,
callback: [
'command' => 'dataScreen:push',
'--disable-event-dispatcher' => true,
],
memo: "数据大屏消息推送",
enable: false,
)]
class DataScreenPushCommand extends HyperfCommand
{
#[Inject]
protected Sender $sender;
public function __construct(
protected ContainerInterface $container,
protected DataScreenPushService $service,
protected LoggerInterface $logger
)
{
parent::__construct('dataScreen:push');
}
public function configure()
{
parent::configure();
$this->setDescription('数据大屏消息推送');
}
public function handle()
{
try {
$start = microtime(true);
$this->service->pushDataScreen();
$duration = round(microtime(true) - $start, 2);
$this->info("数据大屏消息推送操作完成,耗时:{$duration}秒");
} catch (\Throwable $e) {
// 简单记录错误
$this->logger->error('数据大屏消息推送系统错误', [
'error' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine()
]);
$this->info('数据大屏消息推送系统错误!' . var_export($e->getMessage(), true));
}
}
}
创建服务Service/DataScreenPushService.php
<?php
namespace App\Service;
use App\Common\Trait\RedisKeyTrait;
use App\Module\DataScreen\Common\Trait\DataCenterTrait;
use App\Module\DataScreen\Enum\RedisKeyEnum;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use Hyperf\WebSocketServer\Sender;
use Psr\Log\LoggerInterface;
class DataScreenPushService
{
use RedisKeyTrait;
use DataCenterTrait;
// 客户端=>筛选的key
protected string $filter_key;
// 存活客户端的key
protected string $alive_key;
#[Inject]
private Redis $redis;
#[Inject]
private LoggerInterface $logger;
#[Inject]
private Sender $sender;
public function pushDataScreen(): void
{
$this->filter_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER);
$this->alive_key = $this->generateKeyPrefix(RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD, version: 0);
// 处理redis数据
$fds = $this->dealRedisData();
if (empty($fds)) {
$this->logger->error('没有存活的客户端');
return;
}
try {
$filter_group_version = [];
$filter_group_map_need_push = [];
foreach ($fds as $fd) {
$fd = intval($fd);
if (!$this->sender->check($fd)) {
$this->redis->sRem($this->alive_key, $fd);
$this->redis->hDel($this->filter_key, $fd);
continue;
}
// 获取筛选条件
$params_raw = $this->redis->hGet($this->filter_key, strval($fd));
$params = json_decode($params_raw ?: '', true);
if (empty($params)) {
$this->logger->error("客户端{$fd}没有筛选条件");
continue;
}
// 参数转换字符串
$other_key = $this->generateFilterGroupKey($params);
// 查询该筛选下的数据
$filter_group_key = $this->generateKeyPrefix(
RedisKeyEnum::REDIS_KEY_WEBSOCKET_FD_FILTER_GROUP,
RedisKeyEnum::MODULE_DATA_SCREEN,
":{$other_key}");
$last_data_raw = $this->redis->get($filter_group_key);
$last_data = json_decode($last_data_raw ?: '', true) ?: [];
// 当前版本号
$last_version = ($last_data['version'] ?? 0) ?: 0;
!isset($filter_group_version[$filter_group_key]) && $filter_group_version[$filter_group_key] = $last_version;
if ($filter_group_version[$filter_group_key] < $last_version) {
// 该筛选查询过了且数据有变化 则直接推送
if (!empty($filter_group_map_need_push[$filter_group_key] ?? 0)) {
$this->pushHandler($fd, $last_data['data'] ?: [], $params);
}
continue;
}
// 查询数据
try {
$data = $this->dataCenter($params) ?: [];
} catch (\Throwable $e) {
$this->logger->error('查询数据异常:参数为:' . json_encode($params) . ',异常原因:' . $e->getMessage());
continue;
}
$this->redis->set($filter_group_key, json_encode(['filter' => $params, 'data' => $data, 'version' => $filter_group_version[$filter_group_key] + 1, 'update_time' => time()]));
$this->redis->expire($filter_group_key, 7200);
if (($last_data['data'] ?? []) != $data) {
$filter_group_map_need_push[$filter_group_key] = 1;
$this->pushHandler($fd, $data, $params);
}
}
} catch (\Throwable $e) {
$this->logger->error('数据大屏消息推送消息异常:' . $e->getMessage());
}
}
/**
* 推送消息
* @param int $fd
* @param array $data
* @param array $last_filter_param
* @return bool
* @throws \RedisException
*/
protected function pushHandler(int $fd, array $data, array $last_filter_param): bool
{
// 检查客户端是否在线
if (!$this->sender->check($fd)) {
$this->redis->sRem($this->alive_key, $fd);
$this->redis->hDel($this->filter_key, $fd);
return false;
}
// 再次检查筛选条件是否变化
$params_raw = $this->redis->hGet($this->filter_key, strval($fd));
$params = json_decode($params_raw ?: '', true);
if ($params != $last_filter_param) {
return false;
}
// 推送消息
return $this->sender->push($fd, responseSuccess($data));
}
/**
* 对比数据不同处
* @param array $last_data
* @param array $data
* @return array
*/
protected function compareData(array $last_data, array $data): array
{
if (empty($last_data) || empty($data)) {
return $data;
}
$return_data = [];
foreach ($data as $key => $value) {
if (!isset($last_data[$key])) {
$return_data[$key] = $value;
continue;
}
if ($value != $last_data[$key]) {
$return_data[$key] = $value;
}
}
return $return_data;
}
/**
* 处理redis数据
* @return array
*/
protected function dealRedisData(): array
{
try {
// 存活的fd
$fds = $this->getAliveFds();
// 客户端对应的筛选
if (empty($fds)) {
$this->redis->del($this->filter_key);
return [];
}
// 获取所有key
$all_filter_fd = $this->redis->hKeys($this->filter_key);
foreach ($all_filter_fd as $fd_item) {
if (!in_array($fd_item, $fds)) {
// 删除不存在的fd
$this->redis->hDel($this->filter_key, $fd_item);
}
}
return $fds;
} catch (\Throwable $e) {
$this->logger->error("操作redis出现错误:" . $e->getMessage());
return [];
}
}
/**
* 获取存活的客户端
* @return array
* @throws \RedisException
*/
protected function getAliveFds(): array
{
return $this->redis->sMembers($this->alive_key) ?: [];
}
}
使用到了返回函数
if (!function_exists('responseSuccess')) {
/**
* 成功响应
*/
function responseSuccess(mixed $data = null, string $message = 'success', int $code = 200): false|string
{
return json_encode([
'code' => $code,
'message' => $message,
'data' => $data,
'timestamp' => time()
]);
}
}
if (!function_exists('responseError')) {
/**
* 失败响应
*/
function responseError(string $message = '', int $code = 400, mixed $data = null): false|string
{
return json_encode([
'code' => $code,
'message' => $message ?: default_error_message($code),
'data' => $data,
'timestamp' => time()
]);
}
}
10. 反向代理
配置nginx反向代理 顺便升级w s为w s s
# 至少需要一个 Hyperf 节点,多个配置多行
upstream hyperf_websocket {
# Hyperf WebSocket Server 的 IP 及 端口
server 127.0.0.1:9503;
}
server {
listen 80;
listen 443 ssl;
# 改成你的域名 可以匹配多个
server_name xxx.com;
# 配置 ssl 替换成自己的已经获得的域名签名文件名
ssl_certificate xxx.pem; # pem文件的路径
ssl_certificate_key xxx.pem; # key文件的路径
ssl_protocols TLSv1.1 TLSv1.2 TLSv1.3;
ssl_ciphers EECDH+CHACHA20:EECDH+CHACHA20-draft:EECDH+AES128:RSA+AES128:EECDH+AES256:RSA+AES256:EECDH+3DES:RSA+3DES:!MD5;
ssl_prefer_server_ciphers on;
ssl_session_tickets on;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
add_header Strict-Transport-Security "max-age=31536000";
error_page 497 https://$host$request_uri;
location / {
# WebSocket Header
proxy_http_version 1.1;
proxy_set_header Upgrade websocket;
proxy_set_header Connection "Upgrade";
# 将客户端的 Host 和 IP 信息一并转发到对应节点
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
# 客户端与服务端无交互 60s 后自动断开连接,请根据实际业务场景设置
proxy_read_timeout 60s ;
# 执行代理访问真实服务器
proxy_pass http://hyperf_websocket;
}
}
配置好之后可以使用wss://xxx.com/wsDataScreen/ws连接了
11. 扩展
可以安装websocket客户端测试,非必要
composer require hyperf/websocket-client
创建控制器
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\WebSocketClient\ClientFactory;
use Hyperf\WebSocketClient\Frame;
class IndexController
{
#[Inject]
protected ClientFactory $clientFactory;
public function index()
{
// 对端服务的地址,如没有提供 ws:// 或 wss:// 前缀,则默认补充 ws://
$host = '127.0.0.1:9503';
// 通过 ClientFactory 创建 Client 对象,创建出来的对象为短生命周期对象
$client = $this->clientFactory->create($host);
// 向 WebSocket 服务端发送消息
$client->push('HttpServer 中使用 WebSocket Client 发送数据。');
// 获取服务端响应的消息,服务端需要通过 push 向本客户端的 fd 投递消息,才能获取;以下设置超时时间 2s,接收到的数据类型为 Frame 对象。
/** @var Frame $msg */
$msg = $client->recv(2);
// 获取文本数据:$res_msg->data
return $msg->data;
}
}
12. 总结
该流程会在握手阶段把存活的客户端fd保存在redis的DataScreen_prod_websocket_active_fd中。
数据类似
1161
然后记录客户端的筛选条件到redis的DataScreen_prod_websocket_fd_filter_v1中。
数据类似 3209对应
{
"date_end": "2025-09-26",
"date_start": "2025-09-25",
"date_type": 1,
"date_value": 0,
"end_time": 1758902399,
"fx_user_date_end": "",
"fx_user_date_start": "",
"fx_user_end_time": 0,
"fx_user_no": "",
"fx_user_page": 1,
"fx_user_start_time": 0,
"module": 8,
"module_user_id": "",
"platform": 1,
"position": 0,
"rule_source_ids": "",
"start_time": 1758729600
}
每次查询都会记录数据缓存到DataScreen_prod_websocket_fd_filter_group:date_end__date_start__date_type_0_date_value_1_end_time_1762790399_fx_user_date_end__fx_user_date_start__fx_user_end_time_0_fx_user_no__fx_user_page_1_fx_user_start_time_0_module_8_module_user_id__platform_1_position_0_rule_source_ids__start_time_1762704000_v1hash中,为命令行校验数据变动使用,同时也加快了查询效率。
数据类似
{
"filter": {
"date_end": "",
"date_start": "",
"date_type": 0,
"date_value": 1,
"end_time": 1762790399,
"fx_user_date_end": "",
"fx_user_date_start": "",
"fx_user_end_time": 0,
"fx_user_no": "",
"fx_user_page": 1,
"fx_user_start_time": 0,
"module": 8,
"module_user_id": "",
"platform": 1,
"position": 0,
"rule_source_ids": "",
"start_time": 1762704000
},
"data": {},
"version": 1,
"update_time": 1762763257
}
评论区