1. 安装扩展
composer require smi2/phpclickhouse
2. 创建连接服务类
<?php
declare (strict_types=1);
namespace App\Service;
use ClickHouseDB\Client;
use Hyperf\Contract\ConfigInterface;
class ClickHouseService
{
protected Client $client;
public function __construct(ConfigInterface $config)
{
$clickhouseConfig = $config->get('clickhouse.default');
$this->client = new Client($clickhouseConfig);
$this->client->database($clickhouseConfig['database'] ?? 'default');
$this->client->setTimeout($clickhouseConfig['options']['timeout'] ?? 10);
$this->client->setConnectTimeOut(floatval($clickhouseConfig['options']['connect_timeout'] ?? 5));
}
public function getClient(): Client
{
return $this->client;
}
public function select(string $sql, array $bindings = []): array
{
$statement = $this->client->select($sql, $bindings);
return $statement->rows();
}
public function fetchOne(string $sql, array $bindings = []): ?array
{
$statement = $this->client->select($sql, $bindings);
return $statement->fetchOne();
}
public function execute(string $sql, array $bindings = []): bool
{
$this->client->write($sql, $bindings);
return true;
}
public function batchInsert(string $table, array $columns, array $rows): bool
{
$this->client->insert($table, $rows, $columns);
return true;
}
}
3. 封装抽象类
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Repository;
use App\Service\ClickHouseService;
use Hyperf\Di\Annotation\Inject;
use function Hyperf\Support\make;
abstract class AbstractClickHouseRepository
{
/**
* @var string 表名
*/
protected string $table;
#[Inject]
protected ClickHouseService $clickHouseService;
protected array $query = [
'select' => '*',
'from' => null,
'alias' => '',
'joins' => [],
'wheres' => [],
'groups' => [],
'havings' => [],
'orders' => [],
'limit' => null,
'offset' => null,
];
/**
* 获取表名
* @return string
*/
public function getTable(): string
{
if (empty($this->table)) {
throw new \RuntimeException('Table name is not defined in repository');
}
return $this->table;
}
/**
* 设置表别名
* @param string $alias
* @return $this
*/
public function alias(string $alias): self
{
$this->query['alias'] = $alias;
// 同时更新 FROM 子句以包含别名
if ($this->query['from'] === $this->getTable()) {
$this->query['from'] = "{$this->getTable()} AS {$alias}";
}
return $this;
}
/**
* 获取全新的查询条件数组
*/
protected function freshQuery(): array
{
return [
'select' => '*',
'from' => $this->getTable(), // 默认使用当前表
'alias' => '',
'joins' => [],
'wheres' => [],
'groups' => [],
'havings' => [],
'orders' => [],
'limit' => null,
'offset' => null,
];
}
/**
* 开始一个新的查询构建
*/
public function newQuery(): self
{
// 克隆当前实例以保持其他属性
$clone = clone $this;
// 重置查询条件
$clone->query = $this->freshQuery();
return $clone;
}
/**
* 设置FROM表
*/
public function from(string $table, string $alias = ''): self
{
$this->query['from'] = $alias ? "{$table} AS {$alias}" : $table;
return $this;
}
/**
* 设置SELECT字段
*/
public function select($columns = ['*']): self
{
$this->query['select'] = is_array($columns) ? implode(', ', $columns) : $columns;
return $this;
}
/**
* 添加SELECT字段
*/
public function addSelect($columns): self
{
$current = $this->query['select'] === '*' ? [] : [$this->query['select']];
$newColumns = is_array($columns) ? $columns : [$columns];
$this->query['select'] = implode(', ', array_merge($current, $newColumns));
return $this;
}
/**
* LEFT JOIN
*/
public function leftJoin(string $table, string $first, string $operator, string $second, string $alias = ''): self
{
$this->query['joins'][] = [
'type' => 'LEFT',
'table' => $alias ? "{$table} AS {$alias}" : $table,
'on' => compact('first', 'operator', 'second')
];
return $this;
}
/**
* LEFT JOIN 支持多条件ON子句
* @param string $table 要关联的表名
* @param array $onConditions ON条件数组
* 格式: [
* ['left_field', 'operator', 'right_field'],
* ['left_field', 'operator', 'right_field'],
* ...
* ]
* @param string $alias 表别名
*/
public function leftJoinMultiOn(string $table, array $onConditions, string $alias = ''): self
{
// 验证条件格式
$validatedConditions = array_map(function($condition) {
if (count($condition) !== 3) {
throw new \InvalidArgumentException('ON condition must be [left, operator, right]');
}
return [
'first' => $condition[0],
'operator' => $condition[1],
'second' => $condition[2]
];
}, $onConditions);
$this->query['joins'][] = [
'type' => 'LEFT',
'table' => $alias ? "{$table} AS {$alias}" : $table,
'on' => $validatedConditions,
'multi_on' => true // 标记是多条件JOIN
];
return $this;
}
/**
* 关联Repository进行LEFT JOIN
*/
public function leftJoinRelated(string $relatedRepo, string $foreignKey, string $localKey = 'id', string $alias = '', array $select = []): self
{
$related = make($relatedRepo);
$table = $related->getTable();
$alias = $alias ?: $table;
$this->leftJoin($table, $this->getQualifiedColumn($localKey), '=', "{$alias}.{$foreignKey}", $alias);
if ($select) {
$columns = array_map(fn($col) => "{$alias}.{$col} as {$alias}_{$col}", $select);
$this->addSelect($columns);
}
return $this;
}
/**
* 获取带表名前缀的列名
* @param string $column
* @return string
*/
protected function getQualifiedColumn(string $column): string
{
$prefix = $this->query['alias'] ?: $this->getTable();
return "{$prefix}.{$column}";
}
/**
* INNER JOIN
* @param string $table 要关联的表
* @param string $first 主表字段
* @param string $operator 操作符
* @param string $second 关联表字段
* @param string $alias 表别名
* @return $this
*/
public function join(string $table, string $first, string $operator, string $second, string $alias = ''): self
{
$this->query['joins'][] = [
'type' => 'INNER',
'table' => $alias ? "{$table} AS {$alias}" : $table,
'on' => compact('first', 'operator', 'second')
];
return $this;
}
/**
* INNER JOIN关联Repository
* @param string $relatedRepo 关联的Repository类名
* @param string $foreignKey 关联表的外键
* @param string $localKey 主表的键
* @param string $alias 表别名
* @param array $select 选择字段
* @return $this
*/
public function joinRelated(string $relatedRepo, string $foreignKey, string $localKey = 'id', string $alias = '', array $select = []): self
{
$related = make($relatedRepo);
$table = $related->getTable();
$alias = $alias ?: $table;
$this->join($table, $this->getQualifiedColumn($localKey), '=', "{$alias}.{$foreignKey}", $alias);
if ($select) {
$columns = array_map(fn($col) => "{$alias}.{$col} as {$alias}_{$col}", $select);
$this->addSelect($columns);
}
return $this;
}
/**
* INNER JOIN 支持多条件ON子句
* @param string $table 要关联的表名
* @param array $onConditions ON条件数组
* 格式: [
* ['left_field', 'operator', 'right_field'],
* ['left_field', 'operator', 'right_field'],
* ...
* ]
* @param string $alias 表别名
*/
public function joinMultiOn(string $table, array $onConditions, string $alias = ''): self
{
// 验证条件格式
$validatedConditions = array_map(function($condition) {
if (count($condition) !== 3) {
throw new \InvalidArgumentException('ON condition must be [left, operator, right]');
}
return [
'first' => $condition[0],
'operator' => $condition[1],
'second' => $condition[2]
];
}, $onConditions);
$this->query['joins'][] = [
'type' => 'INNER',
'table' => $alias ? "{$table} AS {$alias}" : $table,
'on' => $validatedConditions,
'multi_on' => true // 标记是多条件JOIN
];
return $this;
}
/**
* RIGHT JOIN
* @param string $table 要关联的表
* @param string $first 主表字段
* @param string $operator 操作符
* @param string $second 关联表字段
* @param string $alias 表别名
* @return $this
*/
public function rightJoin(string $table, string $first, string $operator, string $second, string $alias = ''): self
{
$this->query['joins'][] = [
'type' => 'RIGHT',
'table' => $alias ? "{$table} AS {$alias}" : $table,
'on' => compact('first', 'operator', 'second')
];
return $this;
}
/**
* RIGHT JOIN关联Repository
* @param string $relatedRepo 关联的Repository类名
* @param string $foreignKey 关联表的外键
* @param string $localKey 主表的键
* @param string $alias 表别名
* @param array $select 选择字段
* @return $this
*/
public function rightJoinRelated(string $relatedRepo, string $foreignKey, string $localKey = 'id', string $alias = '', array $select = []): self
{
$related = make($relatedRepo);
$table = $related->getTable();
$alias = $alias ?: $table;
$this->rightJoin($table, $this->getQualifiedColumn($localKey), '=', "{$alias}.{$foreignKey}", $alias);
if ($select) {
$columns = array_map(fn($col) => "{$alias}.{$col} as {$alias}_{$col}", $select);
$this->addSelect($columns);
}
return $this;
}
/**
* RIGHT JOIN 支持多条件ON子句
* @param string $table 要关联的表名
* @param array $onConditions ON条件数组
* 格式: [
* ['left_field', 'operator', 'right_field'],
* ['left_field', 'operator', 'right_field'],
* ...
* ]
* @param string $alias 表别名
*/
public function rightJoinMultiOn(string $table, array $onConditions, string $alias = ''): self
{
// 验证条件格式
$validatedConditions = array_map(function($condition) {
if (count($condition) !== 3) {
throw new \InvalidArgumentException('ON condition must be [left, operator, right]');
}
return [
'first' => $condition[0],
'operator' => $condition[1],
'second' => $condition[2]
];
}, $onConditions);
$this->query['joins'][] = [
'type' => 'RIGHT',
'table' => $alias ? "{$table} AS {$alias}" : $table,
'on' => $validatedConditions,
'multi_on' => true // 标记是多条件JOIN
];
return $this;
}
/**
* 添加JOIN提示
* @param string $hint 提示类型(GLOBAL/LOCAL等)
* @return $this
*/
public function withJoinHint(string $hint): self
{
if (!empty($this->query['joins'])) {
end($this->query['joins']);
$lastKey = key($this->query['joins']);
$this->query['joins'][$lastKey]['hint'] = $hint;
}
return $this;
}
/**
* WHERE条件
*/
public function where($column, $operator = null, $value = null, $boolean = 'AND'): self
{
if (is_array($column)) {
foreach ($column as $key => $value) {
$this->where($key, '=', $value, $boolean);
}
return $this;
}
if (func_num_args() === 2) {
$value = $operator;
$operator = '=';
}
$this->query['wheres'][] = compact('column', 'operator', 'value', 'boolean');
return $this;
}
/**
* OR WHERE条件
*/
public function orWhere($column, $operator = null, $value = null): self
{
return $this->where($column, $operator, $value, 'OR');
}
/**
* WHERE IN条件
*/
public function whereIn(string $column, array $values, string $boolean = 'AND'): self
{
$this->query['wheres'][] = [
'column' => $column,
'operator' => 'IN',
'value' => $values,
'boolean' => $boolean
];
return $this;
}
/**
* WHERE NOT IN条件
*/
public function whereNotIn(string $column, array $values, string $boolean = 'AND'): self
{
$this->query['wheres'][] = [
'column' => $column,
'operator' => 'NOT IN',
'value' => $values,
'boolean' => $boolean
];
return $this;
}
/**
* GROUP BY
*/
public function groupBy(...$groups): self
{
$this->query['groups'] = array_merge($this->query['groups'], $groups);
return $this;
}
/**
* HAVING条件
*/
public function having($column, $operator, $value = null): self
{
if (func_num_args() === 2) {
$value = $operator;
$operator = '=';
}
$this->query['havings'][] = compact('column', 'operator', 'value');
return $this;
}
/**
* ORDER BY
*/
public function orderBy($column, $direction = 'ASC'): self
{
$this->query['orders'][$column] = strtoupper($direction) === 'ASC' ? 'ASC' : 'DESC';
return $this;
}
/**
* LIMIT
*/
public function limit(int $limit): self
{
$this->query['limit'] = $limit;
return $this;
}
/**
* OFFSET
*/
public function offset(int $offset): self
{
$this->query['offset'] = $offset;
return $this;
}
/**
* 执行查询获取结果
*/
public function get(): array
{
$sql = $this->toSql();
// 执行后自动重置查询条件
// $this->query = $this->freshQuery();
return $this->clickHouseService->select($sql);
}
/**
* 执行原生 SQL 查询(返回结果集)
* @param string $sql
* @return array
*/
public function query(string $sql): array
{
return $this->clickHouseService->select($sql);
}
/**
* 获取第一条记录
*/
public function first(): ?array
{
$results = $this->limit(1)->get();
return $results[0] ?? null;
}
/**
* 安全转义值
* @param mixed $value
* @return string
*/
protected function quoteValue(mixed $value): string
{
if (is_null($value)) {
return 'NULL';
}
if ($value == "''") {
return "''";
}
if (is_bool($value)) {
return $value ? '1' : '0';
}
if (is_int($value) || is_float($value)) {
return (string)$value;
}
// 如果是字段引用(包含点号),不要加引号
if (is_string($value) && str_contains($value, '.')) {
return $value;
}
// 字符串转义
return "'" . $value . "'";
}
/**
* 构建SQL语句
*/
public function toSql(): string
{
// 确保 FROM 子句包含别名
if ($this->query['alias'] && !str_contains($this->query['from'], ' AS ')) {
$this->query['from'] = "{$this->getTable()} AS {$this->query['alias']}";
}
$from = $this->query['from'];
$sql = "SELECT {$this->query['select']} FROM {$from}";
// 处理所有JOIN类型
foreach ($this->query['joins'] as $join) {
$hint = $join['hint'] ?? '';
$sql .= " {$hint} {$join['type']} JOIN {$join['table']}";
if (!empty($join['multi_on'])) {
// 多条件ON处理
$onClauses = [];
foreach ($join['on'] as $on) {
$value = $on['second'];
$operator = strtoupper($on['operator']);
// 处理IN条件
if (in_array($operator, ['IN', 'NOT IN'])) {
// 空数组处理
if (is_array($value) && empty($value)) {
// IN () 改为 1=0 (永远为假)
// NOT IN () 改为 1=1 (永远为真)
$onClauses[] = $operator === 'IN' ? '1=0' : '1=1';
continue;
}
$quotedValues = array_map([$this, 'quoteValue'], $value);
$valueString = implode(', ', $quotedValues);
$onClauses[] = "{$on['first']} {$operator} ({$valueString})";
} else {
$quotedValue = $this->quoteValue($value);
$onClauses[] = "{$on['first']} {$operator} {$quotedValue}";
}
}
$sql .= " ON " . implode(' AND ', $onClauses);
} else {
// 单条件ON处理
$value = $join['on']['second'];
$operator = strtoupper($join['on']['operator']);
if (in_array($operator, ['IN', 'NOT IN'])) {
// 空数组处理
if (is_array($value) && empty($value)) {
// IN () 改为 1=0 (永远为假)
// NOT IN () 改为 1=1 (永远为真)
$condition = $operator === 'IN' ? '1=0' : '1=1';
$sql .= " ON {$condition}";
} else {
$quotedValues = array_map([$this, 'quoteValue'], $value);
$valueString = implode(', ', $quotedValues);
$sql .= " ON {$join['on']['first']} {$operator} ({$valueString})";
}
} else {
$quotedValue = $this->quoteValue($value);
$sql .= " ON {$join['on']['first']} {$operator} {$quotedValue}";
}
}
}
// WHERE
if ($this->query['wheres']) {
$whereClauses = [];
$firstWhere = true;
foreach ($this->query['wheres'] as $where) {
$boolean = $firstWhere ? '' : ($where['boolean'] ?? 'AND');
$firstWhere = false;
$value = $where['value'];
$operator = strtoupper($where['operator']);
// 处理IN条件
if (in_array($operator, ['IN', 'NOT IN'])) {
// 空数组处理
if (is_array($value) && empty($value)) {
// IN () 改为 1=0 (永远为假)
// NOT IN () 改为 1=1 (永远为真)
$whereClauses[] = $boolean . ' ' . ($operator === 'IN' ? '1=0' : '1=1');
continue;
}
$quotedValues = array_map([$this, 'quoteValue'], $value);
$valueString = implode(', ', $quotedValues);
$whereClauses[] = "{$boolean} {$where['column']} {$operator} ({$valueString})";
} else {
$quotedValue = $this->quoteValue($value);
$whereClauses[] = "{$boolean} {$where['column']} {$operator} {$quotedValue}";
}
}
$sql .= ' WHERE ' . ltrim(implode(' ', $whereClauses));
}
// GROUP BY
if ($this->query['groups']) {
$sql .= ' GROUP BY ' . implode(', ', $this->query['groups']);
}
// HAVING
if ($this->query['havings']) {
$havingClauses = [];
foreach ($this->query['havings'] as $having) {
$quotedValue = $this->quoteValue($having['value']);
$havingClauses[] = "{$having['column']} {$having['operator']} {$quotedValue}";
}
$sql .= ' HAVING ' . implode(' AND ', $havingClauses);
}
// ORDER BY
if ($this->query['orders']) {
$orderClauses = [];
foreach ($this->query['orders'] as $column => $direction) {
$orderClauses[] = "{$column} {$direction}";
}
$sql .= ' ORDER BY ' . implode(', ', $orderClauses);
}
// LIMIT
if ($this->query['limit']) {
$sql .= ' LIMIT ' . $this->query['limit'];
if ($this->query['offset']) {
$sql .= ' OFFSET ' . $this->query['offset'];
}
}
return $sql;
}
/**
* 聚合方法
*/
public function count(string $column = ''): int
{
$result = $this->select("COUNT({$column}) as aggregate")->first();
return (int)($result['aggregate'] ?? 0);
}
public function avg(string $column): float
{
$result = $this->select("AVG({$column}) as aggregate")->first();
return (float)($result['aggregate'] ?? 0);
}
public function sum(string $column): float
{
$result = $this->select("SUM({$column}) as aggregate")->first();
return (float)($result['aggregate'] ?? 0);
}
public function max(string $column)
{
$result = $this->select("MAX({$column}) as aggregate")->first();
return $result['aggregate'] ?? null;
}
public function min(string $column)
{
$result = $this->select("MIN({$column}) as aggregate")->first();
return $result['aggregate'] ?? null;
}
/**
* 插入数据
*/
public function insert(array $data): bool
{
if (empty($data)) return false;
$columns = array_keys($data);
return $this->clickHouseService->batchInsert(
$this->getTable(),
$columns,
[array_values($data)]
);
}
/**
* 批量插入
*/
public function batchInsert(array $columns, array $rows): bool
{
return $this->clickHouseService->batchInsert(
$this->getTable(),
$columns,
$rows
);
}
}
4. 创建查询类
<?php
declare (strict_types=1);
namespace App\Module\DataScreen\Repository;
class AdminCkRepository extends AbstractClickHouseRepository
{
protected string $table = 'admin';
/**
* 排除已删除的查询范围
* @param string $alias
* @return self
*/
public function scopeNormal(string $alias = ''): self
{
$alias = $alias ? "{$alias}.": '';
return $this->where("{$alias}delete_time", 'IS', null);
}
}
5. 使用方式
需要注意的是 每次查询都需要调用newQuery()方法初始化,否则内存共享会出现意料之外的结果。
$this->adminCkRepository
->newQuery()
->alias('ad')
->select([
'ad.id AS institution_partner_id',
'MAX(ad.avatar) AS institution_partner_avatar',
'MAX(ad.nickname) AS institution_partner_name',
'countIf(sl.status = ' . KsSendSamplesLogEnum::STATUS_REVIEW_PASS. ') AS count_audited_success',
'countIf(sl.status = ' . KsSendSamplesLogEnum::STATUS_DELIVERED. ') AS count_express_success',
])
->leftJoinMultiOn($sample_log_table, [
['ad.id', '=', 'sl.admin_user_id'],
['sl.create_time', '>=', $this->params->fx_user_start_time],
['sl.create_time', '<=', $this->params->fx_user_end_time],
['sl.source', '=', KsSendSamplesLogEnum::SOURCE_ADMIN]
], 'sl')
->where('ad.role_id', $role_id)
->scopeNormal('ad')
->groupBy('institution_partner_id')
->orderBy('count_audited_success', 'DESC')
->orderBy('institution_partner_id', 'DESC')
->limit($pageSize)->offset($offset)->get();
评论区