目 录CONTENT

文章目录

🍣Hyperf3.1中封装clickhouse查询

柯基
2025-11-10 / 0 评论 / 0 点赞 / 16 阅读 / 3,367 字

1. 安装扩展

composer require smi2/phpclickhouse

2. 创建连接服务类

<?php
declare (strict_types=1);

namespace App\Service;

use ClickHouseDB\Client;
use Hyperf\Contract\ConfigInterface;

class ClickHouseService
{
    protected Client $client;

    public function __construct(ConfigInterface $config)
    {
        $clickhouseConfig = $config->get('clickhouse.default');

        $this->client = new Client($clickhouseConfig);
        $this->client->database($clickhouseConfig['database'] ?? 'default');
        $this->client->setTimeout($clickhouseConfig['options']['timeout'] ?? 10);
        $this->client->setConnectTimeOut(floatval($clickhouseConfig['options']['connect_timeout'] ?? 5));

    }

    public function getClient(): Client
    {
        return $this->client;
    }

    public function select(string $sql, array $bindings = []): array
    {

        $statement = $this->client->select($sql, $bindings);
        return $statement->rows();
    }

    public function fetchOne(string $sql, array $bindings = []): ?array
    {
        $statement = $this->client->select($sql, $bindings);
        return $statement->fetchOne();
    }

    public function execute(string $sql, array $bindings = []): bool
    {
        $this->client->write($sql, $bindings);
        return true;
    }

    public function batchInsert(string $table, array $columns, array $rows): bool
    {
        $this->client->insert($table, $rows, $columns);
        return true;
    }

}

3. 封装抽象类

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Repository;

use App\Service\ClickHouseService;
use Hyperf\Di\Annotation\Inject;
use function Hyperf\Support\make;

abstract class AbstractClickHouseRepository
{
    /**
     * @var string 表名
     */
    protected string $table;

    #[Inject]
    protected ClickHouseService $clickHouseService;

    protected array $query = [
        'select' => '*',
        'from' => null,
        'alias' => '',
        'joins' => [],
        'wheres' => [],
        'groups' => [],
        'havings' => [],
        'orders' => [],
        'limit' => null,
        'offset' => null,
    ];


    /**
     * 获取表名
     * @return string
     */
    public function getTable(): string
    {
        if (empty($this->table)) {
            throw new \RuntimeException('Table name is not defined in repository');
        }
        return $this->table;
    }

    /**
     * 设置表别名
     * @param string $alias
     * @return $this
     */
    public function alias(string $alias): self
    {
        $this->query['alias'] = $alias;
        // 同时更新 FROM 子句以包含别名
        if ($this->query['from'] === $this->getTable()) {
            $this->query['from'] = "{$this->getTable()} AS {$alias}";
        }
        return $this;
    }


    /**
     * 获取全新的查询条件数组
     */
    protected function freshQuery(): array
    {
        return [
            'select' => '*',
            'from' => $this->getTable(),  // 默认使用当前表
            'alias' => '',
            'joins' => [],
            'wheres' => [],
            'groups' => [],
            'havings' => [],
            'orders' => [],
            'limit' => null,
            'offset' => null,
        ];
    }

    /**
     * 开始一个新的查询构建
     */
    public function newQuery(): self
    {
        // 克隆当前实例以保持其他属性
        $clone = clone $this;
        // 重置查询条件
        $clone->query = $this->freshQuery();
        return $clone;
    }

    /**
     * 设置FROM表
     */
    public function from(string $table, string $alias = ''): self
    {
        $this->query['from'] = $alias ? "{$table} AS {$alias}" : $table;
        return $this;
    }

    /**
     * 设置SELECT字段
     */
    public function select($columns = ['*']): self
    {
        $this->query['select'] = is_array($columns) ? implode(', ', $columns) : $columns;
        return $this;
    }

    /**
     * 添加SELECT字段
     */
    public function addSelect($columns): self
    {
        $current = $this->query['select'] === '*' ? [] : [$this->query['select']];
        $newColumns = is_array($columns) ? $columns : [$columns];
        $this->query['select'] = implode(', ', array_merge($current, $newColumns));
        return $this;
    }

    /**
     * LEFT JOIN
     */
    public function leftJoin(string $table, string $first, string $operator, string $second, string $alias = ''): self
    {
        $this->query['joins'][] = [
            'type' => 'LEFT',
            'table' => $alias ? "{$table} AS {$alias}" : $table,
            'on' => compact('first', 'operator', 'second')
        ];
        return $this;
    }


    /**
     * LEFT JOIN 支持多条件ON子句
     * @param string $table 要关联的表名
     * @param array $onConditions ON条件数组
     *  格式: [
     *     ['left_field', 'operator', 'right_field'],
     *     ['left_field', 'operator', 'right_field'],
     *     ...
     *  ]
     * @param string $alias 表别名
     */
    public function leftJoinMultiOn(string $table, array $onConditions, string $alias = ''): self
    {
        // 验证条件格式
        $validatedConditions = array_map(function($condition) {
            if (count($condition) !== 3) {
                throw new \InvalidArgumentException('ON condition must be [left, operator, right]');
            }
            return [
                'first' => $condition[0],
                'operator' => $condition[1],
                'second' => $condition[2]
            ];
        }, $onConditions);

        $this->query['joins'][] = [
            'type' => 'LEFT',
            'table' => $alias ? "{$table} AS {$alias}" : $table,
            'on' => $validatedConditions,
            'multi_on' => true  // 标记是多条件JOIN
        ];

        return $this;
    }

    /**
     * 关联Repository进行LEFT JOIN
     */
    public function leftJoinRelated(string $relatedRepo, string $foreignKey, string $localKey = 'id', string $alias = '', array $select = []): self
    {
        $related = make($relatedRepo);
        $table = $related->getTable();
        $alias = $alias ?: $table;

        $this->leftJoin($table, $this->getQualifiedColumn($localKey), '=', "{$alias}.{$foreignKey}", $alias);

        if ($select) {
            $columns = array_map(fn($col) => "{$alias}.{$col} as {$alias}_{$col}", $select);
            $this->addSelect($columns);
        }

        return $this;
    }

    /**
     * 获取带表名前缀的列名
     * @param string $column
     * @return string
     */
    protected function getQualifiedColumn(string $column): string
    {
        $prefix = $this->query['alias'] ?: $this->getTable();
        return "{$prefix}.{$column}";
    }


    /**
     * INNER JOIN
     * @param string $table 要关联的表
     * @param string $first 主表字段
     * @param string $operator 操作符
     * @param string $second 关联表字段
     * @param string $alias 表别名
     * @return $this
     */
    public function join(string $table, string $first, string $operator, string $second, string $alias = ''): self
    {
        $this->query['joins'][] = [
            'type' => 'INNER',
            'table' => $alias ? "{$table} AS {$alias}" : $table,
            'on' => compact('first', 'operator', 'second')
        ];
        return $this;
    }

    /**
     * INNER JOIN关联Repository
     * @param string $relatedRepo 关联的Repository类名
     * @param string $foreignKey 关联表的外键
     * @param string $localKey 主表的键
     * @param string $alias 表别名
     * @param array $select 选择字段
     * @return $this
     */
    public function joinRelated(string $relatedRepo, string $foreignKey, string $localKey = 'id', string $alias = '', array $select = []): self
    {
        $related = make($relatedRepo);
        $table = $related->getTable();
        $alias = $alias ?: $table;

        $this->join($table, $this->getQualifiedColumn($localKey), '=', "{$alias}.{$foreignKey}", $alias);

        if ($select) {
            $columns = array_map(fn($col) => "{$alias}.{$col} as {$alias}_{$col}", $select);
            $this->addSelect($columns);
        }

        return $this;
    }

    /**
     * INNER JOIN 支持多条件ON子句
     * @param string $table 要关联的表名
     * @param array $onConditions ON条件数组
     *  格式: [
     *     ['left_field', 'operator', 'right_field'],
     *     ['left_field', 'operator', 'right_field'],
     *     ...
     *  ]
     * @param string $alias 表别名
     */
    public function joinMultiOn(string $table, array $onConditions, string $alias = ''): self
    {
        // 验证条件格式
        $validatedConditions = array_map(function($condition) {
            if (count($condition) !== 3) {
                throw new \InvalidArgumentException('ON condition must be [left, operator, right]');
            }
            return [
                'first' => $condition[0],
                'operator' => $condition[1],
                'second' => $condition[2]
            ];
        }, $onConditions);

        $this->query['joins'][] = [
            'type' => 'INNER',
            'table' => $alias ? "{$table} AS {$alias}" : $table,
            'on' => $validatedConditions,
            'multi_on' => true  // 标记是多条件JOIN
        ];

        return $this;
    }

    /**
     * RIGHT JOIN
     * @param string $table 要关联的表
     * @param string $first 主表字段
     * @param string $operator 操作符
     * @param string $second 关联表字段
     * @param string $alias 表别名
     * @return $this
     */
    public function rightJoin(string $table, string $first, string $operator, string $second, string $alias = ''): self
    {
        $this->query['joins'][] = [
            'type' => 'RIGHT',
            'table' => $alias ? "{$table} AS {$alias}" : $table,
            'on' => compact('first', 'operator', 'second')
        ];
        return $this;
    }

    /**
     * RIGHT JOIN关联Repository
     * @param string $relatedRepo 关联的Repository类名
     * @param string $foreignKey 关联表的外键
     * @param string $localKey 主表的键
     * @param string $alias 表别名
     * @param array $select 选择字段
     * @return $this
     */
    public function rightJoinRelated(string $relatedRepo, string $foreignKey, string $localKey = 'id', string $alias = '', array $select = []): self
    {
        $related = make($relatedRepo);
        $table = $related->getTable();
        $alias = $alias ?: $table;

        $this->rightJoin($table, $this->getQualifiedColumn($localKey), '=', "{$alias}.{$foreignKey}", $alias);

        if ($select) {
            $columns = array_map(fn($col) => "{$alias}.{$col} as {$alias}_{$col}", $select);
            $this->addSelect($columns);
        }

        return $this;
    }


    /**
     * RIGHT JOIN 支持多条件ON子句
     * @param string $table 要关联的表名
     * @param array $onConditions ON条件数组
     *  格式: [
     *     ['left_field', 'operator', 'right_field'],
     *     ['left_field', 'operator', 'right_field'],
     *     ...
     *  ]
     * @param string $alias 表别名
     */
    public function rightJoinMultiOn(string $table, array $onConditions, string $alias = ''): self
    {
        // 验证条件格式
        $validatedConditions = array_map(function($condition) {
            if (count($condition) !== 3) {
                throw new \InvalidArgumentException('ON condition must be [left, operator, right]');
            }
            return [
                'first' => $condition[0],
                'operator' => $condition[1],
                'second' => $condition[2]
            ];
        }, $onConditions);

        $this->query['joins'][] = [
            'type' => 'RIGHT',
            'table' => $alias ? "{$table} AS {$alias}" : $table,
            'on' => $validatedConditions,
            'multi_on' => true  // 标记是多条件JOIN
        ];

        return $this;
    }


    /**
     * 添加JOIN提示
     * @param string $hint 提示类型(GLOBAL/LOCAL等)
     * @return $this
     */
    public function withJoinHint(string $hint): self
    {
        if (!empty($this->query['joins'])) {
            end($this->query['joins']);
            $lastKey = key($this->query['joins']);
            $this->query['joins'][$lastKey]['hint'] = $hint;
        }
        return $this;
    }

    /**
     * WHERE条件
     */
    public function where($column, $operator = null, $value = null, $boolean = 'AND'): self
    {
        if (is_array($column)) {
            foreach ($column as $key => $value) {
                $this->where($key, '=', $value, $boolean);
            }
            return $this;
        }

        if (func_num_args() === 2) {
            $value = $operator;
            $operator = '=';
        }

        $this->query['wheres'][] = compact('column', 'operator', 'value', 'boolean');

        return $this;
    }

    /**
     * OR WHERE条件
     */
    public function orWhere($column, $operator = null, $value = null): self
    {
        return $this->where($column, $operator, $value, 'OR');
    }

    /**
     * WHERE IN条件
     */
    public function whereIn(string $column, array $values, string $boolean = 'AND'): self
    {
        $this->query['wheres'][] = [
            'column' => $column,
            'operator' => 'IN',
            'value' => $values,
            'boolean' => $boolean
        ];
        return $this;
    }

    /**
     * WHERE NOT IN条件
     */
    public function whereNotIn(string $column, array $values, string $boolean = 'AND'): self
    {
        $this->query['wheres'][] = [
            'column' => $column,
            'operator' => 'NOT IN',
            'value' => $values,
            'boolean' => $boolean
        ];
        return $this;
    }

    /**
     * GROUP BY
     */
    public function groupBy(...$groups): self
    {
        $this->query['groups'] = array_merge($this->query['groups'], $groups);
        return $this;
    }

    /**
     * HAVING条件
     */
    public function having($column, $operator, $value = null): self
    {
        if (func_num_args() === 2) {
            $value = $operator;
            $operator = '=';
        }

        $this->query['havings'][] = compact('column', 'operator', 'value');

        return $this;
    }

    /**
     * ORDER BY
     */
    public function orderBy($column, $direction = 'ASC'): self
    {
        $this->query['orders'][$column] = strtoupper($direction) === 'ASC' ? 'ASC' : 'DESC';
        return $this;
    }

    /**
     * LIMIT
     */
    public function limit(int $limit): self
    {
        $this->query['limit'] = $limit;
        return $this;
    }

    /**
     * OFFSET
     */
    public function offset(int $offset): self
    {
        $this->query['offset'] = $offset;
        return $this;
    }

    /**
     * 执行查询获取结果
     */
    public function get(): array
    {
        $sql = $this->toSql();
        // 执行后自动重置查询条件
//        $this->query = $this->freshQuery();
        return $this->clickHouseService->select($sql);
    }

    /**
     * 执行原生 SQL 查询(返回结果集)
     * @param string $sql
     * @return array
     */
    public function query(string $sql): array
    {
        return $this->clickHouseService->select($sql);
    }

    /**
     * 获取第一条记录
     */
    public function first(): ?array
    {
        $results = $this->limit(1)->get();
        return $results[0] ?? null;
    }

    /**
     * 安全转义值
     * @param mixed $value
     * @return string
     */
    protected function quoteValue(mixed $value): string
    {
        if (is_null($value)) {
            return 'NULL';
        }
        if ($value == "''") {
            return "''";
        }
        if (is_bool($value)) {
            return $value ? '1' : '0';
        }
        if (is_int($value) || is_float($value)) {
            return (string)$value;
        }
        // 如果是字段引用(包含点号),不要加引号
        if (is_string($value) && str_contains($value, '.')) {
            return $value;
        }
        // 字符串转义
        return "'" . $value . "'";
    }

    /**
     * 构建SQL语句
     */
    public function toSql(): string
    {
        // 确保 FROM 子句包含别名
        if ($this->query['alias'] && !str_contains($this->query['from'], ' AS ')) {
            $this->query['from'] = "{$this->getTable()} AS {$this->query['alias']}";
        }
        $from = $this->query['from'];
        $sql = "SELECT {$this->query['select']} FROM {$from}";

        // 处理所有JOIN类型
        foreach ($this->query['joins'] as $join) {
            $hint = $join['hint'] ?? '';
            $sql .= " {$hint} {$join['type']} JOIN {$join['table']}";

            if (!empty($join['multi_on'])) {
                // 多条件ON处理
                $onClauses = [];
                foreach ($join['on'] as $on) {
                    $value = $on['second'];
                    $operator = strtoupper($on['operator']);

                    // 处理IN条件
                    if (in_array($operator, ['IN', 'NOT IN'])) {
                        // 空数组处理
                        if (is_array($value) && empty($value)) {
                            // IN () 改为 1=0 (永远为假)
                            // NOT IN () 改为 1=1 (永远为真)
                            $onClauses[] = $operator === 'IN' ? '1=0' : '1=1';
                            continue;
                        }

                        $quotedValues = array_map([$this, 'quoteValue'], $value);
                        $valueString = implode(', ', $quotedValues);
                        $onClauses[] = "{$on['first']} {$operator} ({$valueString})";
                    } else {
                        $quotedValue = $this->quoteValue($value);
                        $onClauses[] = "{$on['first']} {$operator} {$quotedValue}";
                    }
                }
                $sql .= " ON " . implode(' AND ', $onClauses);
            } else {
                // 单条件ON处理
                $value = $join['on']['second'];
                $operator = strtoupper($join['on']['operator']);

                if (in_array($operator, ['IN', 'NOT IN'])) {
                    // 空数组处理
                    if (is_array($value) && empty($value)) {
                        // IN () 改为 1=0 (永远为假)
                        // NOT IN () 改为 1=1 (永远为真)
                        $condition = $operator === 'IN' ? '1=0' : '1=1';
                        $sql .= " ON {$condition}";
                    } else {
                        $quotedValues = array_map([$this, 'quoteValue'], $value);
                        $valueString = implode(', ', $quotedValues);
                        $sql .= " ON {$join['on']['first']} {$operator} ({$valueString})";
                    }
                } else {
                    $quotedValue = $this->quoteValue($value);
                    $sql .= " ON {$join['on']['first']} {$operator} {$quotedValue}";
                }
            }
        }

        // WHERE
        if ($this->query['wheres']) {
            $whereClauses = [];
            $firstWhere = true;
            foreach ($this->query['wheres'] as $where) {
                $boolean = $firstWhere ? '' : ($where['boolean'] ?? 'AND');
                $firstWhere = false;

                $value = $where['value'];
                $operator = strtoupper($where['operator']);

                // 处理IN条件
                if (in_array($operator, ['IN', 'NOT IN'])) {
                    // 空数组处理
                    if (is_array($value) && empty($value)) {
                        // IN () 改为 1=0 (永远为假)
                        // NOT IN () 改为 1=1 (永远为真)
                        $whereClauses[] = $boolean . ' ' . ($operator === 'IN' ? '1=0' : '1=1');
                        continue;
                    }

                    $quotedValues = array_map([$this, 'quoteValue'], $value);
                    $valueString = implode(', ', $quotedValues);
                    $whereClauses[] = "{$boolean} {$where['column']} {$operator} ({$valueString})";
                } else {
                    $quotedValue = $this->quoteValue($value);
                    $whereClauses[] = "{$boolean} {$where['column']} {$operator} {$quotedValue}";
                }
            }
            $sql .= ' WHERE ' . ltrim(implode(' ', $whereClauses));
        }

        // GROUP BY
        if ($this->query['groups']) {
            $sql .= ' GROUP BY ' . implode(', ', $this->query['groups']);
        }

        // HAVING
        if ($this->query['havings']) {
            $havingClauses = [];
            foreach ($this->query['havings'] as $having) {
                $quotedValue = $this->quoteValue($having['value']);
                $havingClauses[] = "{$having['column']} {$having['operator']} {$quotedValue}";
            }
            $sql .= ' HAVING ' . implode(' AND ', $havingClauses);
        }

        // ORDER BY
        if ($this->query['orders']) {
            $orderClauses = [];
            foreach ($this->query['orders'] as $column => $direction) {
                $orderClauses[] = "{$column} {$direction}";
            }
            $sql .= ' ORDER BY ' . implode(', ', $orderClauses);
        }

        // LIMIT
        if ($this->query['limit']) {
            $sql .= ' LIMIT ' . $this->query['limit'];
            if ($this->query['offset']) {
                $sql .= ' OFFSET ' . $this->query['offset'];
            }
        }

        return $sql;
    }



    /**
     * 聚合方法
     */
    public function count(string $column = ''): int
    {
        $result = $this->select("COUNT({$column}) as aggregate")->first();
        return (int)($result['aggregate'] ?? 0);
    }

    public function avg(string $column): float
    {
        $result = $this->select("AVG({$column}) as aggregate")->first();
        return (float)($result['aggregate'] ?? 0);
    }

    public function sum(string $column): float
    {
        $result = $this->select("SUM({$column}) as aggregate")->first();
        return (float)($result['aggregate'] ?? 0);
    }

    public function max(string $column)
    {
        $result = $this->select("MAX({$column}) as aggregate")->first();
        return $result['aggregate'] ?? null;
    }

    public function min(string $column)
    {
        $result = $this->select("MIN({$column}) as aggregate")->first();
        return $result['aggregate'] ?? null;
    }

    /**
     * 插入数据
     */
    public function insert(array $data): bool
    {
        if (empty($data)) return false;

        $columns = array_keys($data);
        return $this->clickHouseService->batchInsert(
            $this->getTable(),
            $columns,
            [array_values($data)]
        );
    }

    /**
     * 批量插入
     */
    public function batchInsert(array $columns, array $rows): bool
    {
        return $this->clickHouseService->batchInsert(
            $this->getTable(),
            $columns,
            $rows
        );
    }
}

4. 创建查询类

<?php
declare (strict_types=1);

namespace App\Module\DataScreen\Repository;

class AdminCkRepository extends AbstractClickHouseRepository
{
    protected string $table = 'admin';

    /**
     * 排除已删除的查询范围
     * @param string $alias
     * @return self
     */
    public function scopeNormal(string $alias = ''): self
    {
        $alias = $alias ? "{$alias}.": '';
        return $this->where("{$alias}delete_time", 'IS', null);
    }
}

5. 使用方式

需要注意的是 每次查询都需要调用newQuery()方法初始化,否则内存共享会出现意料之外的结果。

$this->adminCkRepository
            ->newQuery()
            ->alias('ad')
            ->select([
                'ad.id AS institution_partner_id',
                'MAX(ad.avatar) AS institution_partner_avatar',
                'MAX(ad.nickname) AS institution_partner_name',
                'countIf(sl.status = ' . KsSendSamplesLogEnum::STATUS_REVIEW_PASS. ') AS count_audited_success',
                'countIf(sl.status = ' . KsSendSamplesLogEnum::STATUS_DELIVERED. ') AS count_express_success',
            ])
            ->leftJoinMultiOn($sample_log_table, [
                ['ad.id', '=', 'sl.admin_user_id'],
                ['sl.create_time', '>=', $this->params->fx_user_start_time],
                ['sl.create_time', '<=', $this->params->fx_user_end_time],
                ['sl.source', '=', KsSendSamplesLogEnum::SOURCE_ADMIN]
            ], 'sl')
            ->where('ad.role_id', $role_id)
            ->scopeNormal('ad')
            ->groupBy('institution_partner_id')
            ->orderBy('count_audited_success', 'DESC')
            ->orderBy('institution_partner_id', 'DESC')
					  ->limit($pageSize)->offset($offset)->get();
0

评论区