ClickHouse 中其它常见的表引擎(八)

​ 本文来源: ( https://www.cnblogs.com/traditional/tag/ClickHouse:一款速度快到让人发指的列式存储数据库/ )


楔子

Everything is table(万物皆为表)是 ClickHouse 的一个非常有意思的设计思路,正因为 ClickHouse 是一款数据库,所以自然而然数据表就是它的武器,是它与外部进行交互的接口层。在数据表背后无论连接的是本地文件、HDFS、zookeeper,还是其它服务,终端用户只需要面对数据表,只需要使用 SQL 查询语言。

下面就来介绍一下其它类型的表引擎,它们以表为接口,极大地丰富了 ClickHouse 的查询能力。这些表引擎各自特点突出,或是独立地应用于特定场景,或是能够与 MergeTree 搭配使用。例如外部存储系列的表引擎,能够直接读取其它系统的数据,ClickHouse 自身只负责元数据的管理,类似使用外部表的形式;内存系列的表引擎,能够充当数据分发的临时存储载体或消息通道;日志文件系列的表引擎,拥有简单易用的特点;接口系列的表引擎,能够串联已有数据表,起到粘合剂的作用。那么下面我们就来分门别类的介绍一下,这些表引擎各自的使用特点。

外部存储类型

顾名思义,外部存储表引擎能够直接从其它的存储系统读取数据,例如直接读取 HDFS 的文件或者 MySQL 数据库的表,这些表引擎只负责元数据管理和数据查询,而它们自身通常不负责数据的写入,数据文件直接由外部系统提供。

HDFS

HDFS 是一款分布式文件存储系统,可以说是 Hadoop 生态的基石,而 ClickHouse 提供的 HDFS 表引擎则可以与之对接,读取 HDFS 内的文件。关于 HDFS 的安装这里不赘述了,这里假设已经安装完毕。但是注意,我们需要关闭 HDFS 的 Kerberos 认证,因为 HDFS 表引擎还不支持 Kerberos,然后在 HDFS 上创建用于存放文件的目录。

1
hdfs dfs -mkdir /clickhouse

最后在 HDFS 上给 clickhouse 用户授权:

1
hdfs dfs -chown -R clickhouse:clickhouse /clickhouse

然后我们创建 HDFS 数据表,而 ClickHouse 的一张 HDFS 数据表,对应 HDFS 文件系统上的一个文件:

1
2
3
4
5
6
7
CREATE TABLE hdfs_table1 (
id UInt32,
code String,
name String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table1', 'CSV')
-- HDFS('HDFS 的文件存储路径', '文件格式,如 CSV、TSV、JSON 等等')
-- 注:数据表的名字和 HDFS 文件系统上的文件名可以不一致

注意:我们这里虽然创建了一张表 hdfs_table1,但 HDFS 文件系统上还并没有 hdfs_table1 这个文件,而当我们往表中插入数据时,表对应的文件就会在 HDFS 文件系统上创建、同时将数据写进去。因此我们写入数据虽然表面上是通过 HDFS 数据表,但实际上数据是存储在 HDFS 文件系统上的,而 ClickHouse 在这里只负责元数据的管理。可能有人发现了,这不就是 Hive 嘛,是的,ClickHouse 在这里所干的事情和 Hive 是一样的。下面写入一批数据:

1
2
3
INSERT INTO hdfs_table1 
SELECT number, concat('code', toString(number)), concat('n', toString(number))
FROM numbers(5)

此时在 HDFS 文件系统的 /clickhouse 下面会创建一个文件,也叫 hdfs_table1,同时将数据写进去。然后我们就可以通过数据表查询,注意:因为数据存在 HDFS 文件系统上,所以查询实际上就是 ClickHouse 读取 HDFS 文件系统的一个过程。

img

然后我们再来看看 HDFS 上文件:

img

可以发现通过 HDFS 表引擎,ClickHouse 在 HDFS 的指定目录下创建了一个名为 hdfs_table1 的文件,并且按照 CSV 格式写入了数据。注意:这里创建的数据表类似于 hive 中的外部表,也就是说将 HDFS 数据表删除(删除元数据),并不会影响 HDFS 文件系统上的文件。

以上就是 ClickHouse 和 HDFS 之间的交互,不过我们知道 ClickHouse 具有分片功能(后面说),所以它完全不需要借助于 HDFS 存储系统来存储数据,而且使用 HDFS 的话,那么 ClickHouse 的列式存储、数据压缩、索引等一系列高级特性就都用不上了,反而会严重拖慢 ClickHouse 的效率。但 ClickHouse 之所以还提供和 HDFS 的交互,主要是考虑到 Hadoop 生态圈已经存在多年了,在 HDFS 之上已经存储了大量的数据,所以提供了和 HDFS 交互的接口。通过 HDFS 数据表将 HDFS 文件系统上的数据读取出来之后,导入到 MergeTree 数据表中,然后进行数据分析。

所以 HDFS 数据表虽然既负责写又负责读,就像我们上面演示的那样,但很明显我们基本不会用 HDFS 数据表写数据。因此当涉及 ClickHouse 和 HDFS 的交互时,都是数据已经存在于 HDFS 文件系统之上,我们只是创建一个 HDFS 数据表将数据从 HDFS 文件系统上读取出来罢了。所以此时创建 HDFS 数据表就需要根据文件内容来创建了。

比如 HDFS 上存在一个 CSV 文件,这个文件里面有 4 列,那么我们创建的数据表就应该有 4 个字段。举个栗子:

img

此时 HDFS 上有一个 TSV 格式的文件(CSV 文件的分隔符为逗号,TSV 文件的分隔符为 \t),这个时候我们需要使用 ClickHouse 将其读取出来。具体做法显然是创建一张 HDFS 数据表,然后指定数据文件在 HDFS 上存储路径即可,但问题是表字段要如何设计呢?没错,显然要根据文件的存储内容来进行设计,比如这里有 4 个列,那么 HDFS 数据表就应该要有 4 个字段,然后再根据存储的内容指定字段的类型,那么这个 HDFS 数据表就可以这么定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE hdfs_table2 (
a UInt32,
b String,
c UInt32,
d String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table2', 'TSV');
-- 文件类型要指定 TSV,因为分隔符是 \t
-- 注:这里字段名叫什么完全由我们自己定义,我们也可以起一个有意义的名字
CREATE TABLE hdfs_table2_new (
id UInt32,
name String,
age UInt32,
place String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table2', 'TSV');

这里我们的两张 HDFS 数据表都指向 HDFS 文件系统上的同一个文件:

img

还是比较简单的,这里的 ClickHouse 完全就充当了 Hive 的角色,甚至比 Hive 还要好用不少。不过 ClickHouse 支持的还不止这些,在指定 HDFS 文件路径的时候 ClickHouse 支持多种方式:

  • 绝对路径:会读取指定路径的单个文件,比如HDFS('hdfs://localhost:6666/clickhouse/hdfs_table2', 'TSV'),会读取 clickhouse 目录下的 hdfs_table2 文件
  • * 通配符:匹配任意数量的任意字符,比如 HDFS('hdfs://localhost:6666/clickhouse/*', 'TSV'),会读取 clickhouse 目录下的所有文件
  • ? 通配符:匹配单个任意字符,比如 ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table?', 'TSV'),会读取 clickhouse 目录下所有匹配 hdfs_table? 的文件
  • {M..N} 数字区间:匹配指定数字的文件,例如 HDFS('hdfs://localhost:6666/clickhouse/hdfs_table{1..3}', 'TSV'),会读取 clickhouse 目录下的 hdfs_table1、hdfs_table2、hdfs_table3

我们来测试一下,我们上面的文件都没有后缀名,但有后缀名也是可以的。

img

这里我们将之前的 hdfs_table2 拷贝 3 份,并上传至 HDFS,然后创建数据表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE girls (
id UInt32,
name String,
age UInt32,
place String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/girls_{1..3}.tsv', 'TSV');
-- 这里写成 girls_?.tsv 也是可以的

CREATE TABLE girls_new (
id UInt32,
name String,
age UInt32,
place String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/girls_?.tsv', 'TSV');

然后进行查询:

img

显然使用 girls 和 girls_new 都是可以查询到数据的,由于是 3 个文件,因此会以 3 个分区的形式合并返回。

以上就是 HDFS 数据表的相关内容,可以看到使用起来还是非常方便的,但还是像我们之前说的那样,ClickHouse 完全独立于 Hadoop 生态圈,并不需要借助 HDFS 存储数据。但之所以还提供 HDFS 数据表,主要是为了读取 HDFS 文件系统上已存在的数据,不然的话我们需要先手动将数据从 HDFS 上下载下来,然后再导入到 ClickHouse 中,会比较麻烦,因此 ClickHouse 通过表引擎的形式直接支持我们访问 HDFS 文件系统。

当然不光是 HDFS,ClickHouse 还支持很多其它常见的外部存储系统,当然支持的目的都是为了读取这些存储系统中已存在的数据。

MySQL

MySQL 表引擎可以和 MySQL 数据库中的数据表建立映射,并通过 SQL 向其发起远程查询,包括 SELECT 和 INSERT,声明方式如下:

1
ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, on_duplicate_clause])

假设我们要访问 MySQL 的 default 库下的 trade_info 表,那么可以这么做:

1
2
3
4
5
6
7
8
9
CREATE TABLE clickhouse_trade_info (
id UInt32,
column1 type,
column2 type,
......
) ENGINE = MySQL('localhost:3306', 'default', 'trade_info', 'root', '123456')
-- 显然这几个参数的含义不需要多说,但还有两个可选参数 replace_query 和 on_duplicate_clause
-- replace_query 默认为 0,如果设置为 1,会用 REPLACE INTO 代替 INSERT INTO
-- on_duplicate_clause 默认为 0,对应 MySQL 的 ON DUPLICATE KEY 语法,如果想启用该设置,那么需要设置为 1

创建成功之后,我们就可以通过 ClickHouse 的数据表来读取 MySQL 数据表的数据了,当然插入数据也是可以的。由于 MySQL 还是比较简单的,这里就不实际演示了,可以自己测试一下。

当然重点是,我们可以搭配物化视图一起使用:

1
2
3
4
5
6
CREATE MATERIALIZED VIEW trade_info_view
ENGINE = MergeTree()
ORDER BY id
AS SELECT * FROM clickhouse_trade_info
-- 这里指定数据表的时候一定要指定 ClickHouse 的数据表,不是 MySQL 的
-- 所以这里我们刻意将数据表其名为 clickhouse_trade_info

不过遗憾的是,目前 MySQL 表引擎不支持 UPDATE 和 DELETE 操作,如果需要数据更新的话,可以考虑使用 CollapsingMergeTree 作为视图的表引擎。不过还是之前所说,使用外部存储系统基本上都是为了读数据,很少会有插入、更新和删除之类的场景出现。

JDBC

相比 MySQL 表引擎,JDBC 表引擎不仅可以读取 MySQL 数据库,还能读取 PostgreSQL、SQLite 和 H2 数据库。但是光有 JDBC 表引擎还不够,它还需要依赖一个基于 Java 语言实现的 SQL 代理服务,名为 clickhouse-jdbc-bridge,它可以为 ClickHouse 代理访问数据库。

但 clickhouse-jdbc-bridge 需要使用 Maven 进行构建,而我本人不是 Java 方向的,只知道 Java 如何安装,甚至不知道如何用 Java 写一个 Hello World,所以更别提使用 Maven 构建项目了,因此这部分内容有兴趣可以自己了解一下。总之创建 JDBC 表引擎和 MySQL 表引擎是类似的:

1
ENGINE = JDBC('jdbc:url', 'database', 'table')

不同的数据库使用不同的 url,可以自己测试一下。

Kafka

Kafka 是大数据领域非常流行的一款分布式消息系统,而 ClickHouse 也提供了 Kafka 表引擎与之对接,进而订阅 Kafka 中的主题并实时接收消息数据。而总所周知,在消息系统中存在三层语义:

  • 最多一次(At Most Once):可能出现消息丢失的情况,因为在这种情形下,一条消息在消费端最多被接收一次
  • 最少一次(At Least Once):可能出现消息重复的情况,因为在这种情形下,一条消息在消费端允许被接收多次
  • 精确一次(Exactly Once):数据不多不少,一条消息在消费端恰好被消费一次,这也是最理想的情况,因为消息不可能百分之百不丢失

虽然 Kafka 本身能够支持上述三种语义,但是目前 ClickHouse 还不支持精确一次语义,因为这需要应用端和 Kafka 深度配合才可以实现。kafka 使用 Offset 标志位来记录主题数据被消费的位置信息,当应用端接收到消息之后,通过自动提交或手动提交当前的位移信息,以保障消息的语义,但 ClickHouse 在这方面还有进步的空间。

Kafka 表引擎的声明方式如下:

1
2
3
4
5
6
7
8
9
10
ENGINE = Kafka()
SETTINGS kafka_broker_list = 'host:port,...',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group_name',
kafka_format = 'data_format[,]',
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_skip_broken_message = N,]
[kafka_commit_every_batch = N]

其中带有方括号的表示选填项,下面依次介绍这些参数的作用:

  • kafka_broker_list:表示 Broker 服务的地址列表,多个地址之间使用逗号分割
  • kafka_topic_list:表示订阅的消息主题的名称列表,多个主题之间使用逗号分割,多个主题中的数据均被消费
  • kafka_group_name:表示消费者组的名称,表引擎会依据此名称创建消费者组
  • kafka_format:表示用于解析消息的数据格式,在消息的发送端,必须按照此格式发送消息。而数据格式也必须是 ClickHouse 提供的格式之一,例如 TSV、JSONEachRow 和 CSV 等
  • kafka_row_delimiter:表示判定一行数据的结束符,默认为 ‘\0’
  • kafka_schema:对应 Kafka 的 schema 参数
  • kafka_num_consumers:表示消费者的数据量,默认值为 1,表引擎会依据此参数在消费者组中开启相应数量的消费者线程,当然线程数不要超过分区数,否则没有意义。因为在 kafka 的主题中,一个分区只能被某个消费者组里面的一个消费者消费(如果想被多个消费者消费,那么这些消费者一定要隶属于不同的消费者组)
  • kafka_skip_broken_message:当表引擎按照预定格式解析数据出现错误时,允许跳过失败的数据的行数,默认值为 0,即不允许任何格式错误的情形发生。在此种情形下,只要 kafka 主题中存在无法解析的数据,数据表都将不会接收任何数据。如果将其设置成非 0 的正整数,例如设置为 10,则表示只要 kafka 主题中存在无法解析的数据的总数小于 10,数据表就能正常接收消息数据,而解析错误的数据会被自动跳过
  • kafka_commit_every_batch:表示执行 kafka commit 的频率,因此这里提交偏移量的方式是手动提交,默认值为 0,即当一整个 Block 块完全写入数据表后才执行一次 commit。如果设置为 1,则每写完一个 Batch 批次的数据就会执行一次 kakfa commit(一次 Block 写入操作,由多次 Batch 写入操作而成)

因此 ClickHouse 在对接 Kakfa 的时候是会将消息写入到数据表中的,所以还有一些配置参数可以调整表引擎的行为,比如 stream_poll_timeout_ms,它表示拉取数据的间隔时间。默认值为 500 毫秒,所以 Kafka 表引擎每隔 500 毫秒拉取一次数据,而拉取的数据会先被放入缓存当中,在时机成熟的时候,会被刷新到数据表。

而触发 Kakfa 表引擎刷新缓存的条件有两个,当满足其中任何一个时,便会触发刷新动作:

  • 当一个数据块写入完成的时候,一个数据块的大小由 kafka_max_block_size 参数控制,默认情况下大小为 65536
  • 等待间隔超过 7500 毫秒,由 stream_fush_interval_ms 控制

Kafka 表引擎底层负责和 Kafka 通信的部分是基于 librdkafka 实现的,这是一个由 C++ 实现的 Kafka 库,项目地址为 https://github.com/edenhill/librdkafka 。librdkafka 提供了许多自定义的配置参数,例如在默认情况下,每次只会读取 kafka 中最新的数据,如果将 auto.offset.reset 改成 earliest(默认是 latest),数据将从会从最近一次提交的偏移位置开始读取。当然里面还支持很多其它的参数,可以通过项目中的 CONFIGURATION.md 进行查看。

ClickHouse 对 librdkafka 的自定义参数也提供了良好的扩展支持,在 ClickHouse 的全局设置中,提供了一组 Kafka 标签,专门用于定义 librdkafka 的自定义参数。不过需要注意的是,librdkafka 的原生参数中使用了点连接符,而在 ClickHouse 中需要改成下划线的形式,例如:

1
2
3
4
<kafka>
<!-- librdkafka 中,参数名是 auto.offset.reset,在这里需要使用下划线进行分割 -->
<auto_offset_reset>earliest</auto_offset_reset>
</kafka>

下面我们就来测试一下,首先使用 Go 来连接 kafka,创建一个主题,并写入几条数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"encoding/json"
"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
cluster, _ := sarama.NewClusterAdmin([]string{"47.94.174.89:9092"}, config)
// 创建主题,该主题有三个分区
_ = cluster.CreateTopic("heroes", &sarama.TopicDetail{NumPartitions: 3, ReplicationFactor: 1}, false)
// 写入消息,每个分区写入两条
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Partitioner = sarama.NewManualPartitioner
producer, _ := sarama.NewAsyncProducer([]string{"47.94.174.89:9092"}, config)
messages := []map[string]interface{}{
{"id": 1, "name": "麦克雷", "age": 37, "weapon": "维和者", "ultimate": "午时已到"},
{"id": 2, "name": "源氏", "age": 35, "weapon": "镖", "ultimate": "斩"},
{"id": 3, "name": "半藏", "age": 38, "weapon": "弓", "ultimate": "龙"},
{"id": 4, "name": "士兵76", "age": 55, "weapon": "脉冲步枪", "ultimate": "战术目镜"},
{"id": 5, "name": "死神(谐星)", "age": 57, "weapon": "*弹枪", "ultimate": "死亡绽放"},
{"id": 6, "name": "路霸", "age": 48, "weapon": "爆裂枪", "ultimate": "鸡飞狗跳"},
}
for i, message := range messages {
value, _ := json.Marshal(message)
// 将 map 转成 json
if i < 2 {
producer.Input() <- &sarama.ProducerMessage{Topic: "heroes", Partition: 0,
Value: sarama.StringEncoder(value)}
} else if i < 4 {
producer.Input() <- &sarama.ProducerMessage{Topic: "heroes", Partition: 1,
Value: sarama.StringEncoder(value)}
} else {
producer.Input() <- &sarama.ProducerMessage{Topic: "heroes", Partition: 2,
Value: sarama.StringEncoder(value)}
}
select {
case <-producer.Successes():
case <-producer.Errors():
}
}
}

以上我们就创建一个主题叫 heroes,该主题有三个分区,每个分区写入了两条数据。当然你也可以使用其它语言提供的 API 实现,下面我们通过 kafka 控制台查看一下数据有没有写入成功。

img

显然写入成功了,上面的 172.24.60.6 是我的内网 IP,然后我们就来创建 kafka 数据表获取数据。由于数据已经写入了,所以在读取的时候必须指定 auto.offset.reset 为 earliest。

1
2
3
<kafka>    
<auto_offset_reset>earliest</auto_offset_reset>
</kafka>

我们修改 config.xml,然后 clickhouse restart 重启服务。下面开始创建 Kakfa 数据表:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE kafka_test (
id UInt32,
name String,
age UInt8,
weapon String,
ultimate String)
ENGINE = Kafka() -- 由于当前 ClickHouse 和 Kakfa 在同一个节点上,所以这里用内网 IP 也是可以的
SETTINGS kafka_broker_list = '47.94.174.89:9092',
kafka_topic_list = 'heroes',
kafka_group_name = 'my_group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 3

创建成功之后,我们来查询数据,看看能不能读取:

img

整体都很顺利,但问题是第二次查询的时候发现数据没了,原因就是 kafka 表引擎在执行完查询之后就会删除表内的数据。注意这里删除的数据是 Kakfa 表引擎从 kafka 中拖下来写入表中的数据,至于 kafka 上面的数据还在。不过很明显这不是我们期望的,因为不能每次查询都临时从 kafka 上拖吧。

所以真正的使用方式如下:

  • 首先创建 Kafka 数据表 A,它充当的是数据管道,负责从 kafka 上拖数据
  • 然后是另外一张任意引擎的数据表 B,它充当的角色是面向终端用户的查询表,在生产环境中通常是 MergeTree 系列
  • 最后是一张物化视图 C,它负责将表 A 的数据实时同步到表 B

下面具体操作一波:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CREATE TABLE kafka_queue (
id UInt32,
name String,
age UInt8,
weapon String,
ultimate String
) ENGINE = Kafka()
SETTINGS kafka_broker_list = '47.94.174.89:9092',
kafka_topic_list = 'heroes',
kafka_group_name = 'my_group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 3;

-- 然后是面向终端用户的查询表,这里使用 MergeTree 引擎
CREATE TABLE kafka_table (
id UInt32,
name String,
age UInt8,
weapon String,
ultimate String
) ENGINE = MergeTree()
ORDER BY id;

-- 最后是一张物化视图,用于将数据从 kafka_queue 同步到 kafka_table
CREATE MATERIALIZED VIEW queue_to_table_view TO kafka_table
AS SELECT id, name, age, weapon, ultimate FROM kafka_queue

至此全部的工作就完成了,当数据进入 kafka_queue 的时候,物化视图 queue_to_table_view 会将数据从 kafka_queue 同步到 kafka_table,即使 kafka_queue 中的数据被删掉也不影响,因为数据已经进入了 kafka_table,而 kafka_table 才是负责面向数据查询的表。

说白了数据删除的问题并没有得到本质上的解决,只是换了一种曲线救国的方式,通过物化视图将数据放在了另一张表中。至于 kafka 数据表在查询之后数据为什么被删除,我们就不深究了。

img

注意:从 kafka 上面拖数据是有一定过程的,如果往 kafka 写完数据之后就立刻查询 kafka_table,不一定能查询得到数据,这之间会有一定的延迟。

如果想停止数据同步,可以删除视图:DROP TABEL queue_to_table_view,或者卸载视图:DETACH TABLE queue_to_table_view。这里我们将视图卸载掉,然后再将之前的数据重新写入一次,并进行查询:

img

我们发现数据并没有被同步过来,这是理所当然的,因为视图被卸载了。如果想继续同步,那么将卸载之后的视图重新装载进来即可:

1
2
ATTACH MATERIALIZED VIEW queue_to_table_view TO kafka_table
AS SELECT id, name, age, weapon, ultimate FROM kafka_queue

和创建视图类似,只需要将 CREATE 换成 ATTACH 即可,然后再进行查询:

img

此时数据就又同步过来了(如果没有同步过来就等一小会儿),但是切记:在重新装载物化视图之前一定不要查询 kakfa_queue,因为一旦查询数据就没了,物化视图就没法同步了。

File

File 表引擎能够直接读取本地文件的数据,通常被作为一种扩展手段来使用,例如它可以读取由其它系统生成的数据文件,如果外部系统直接修改了文件,则变相达到了数据更新的目的;还可以将 ClickHouse 数据导出为本地文件;以及用于数据格式转换等场景。除此之外,File 表引擎也被应用于 clickhouse-local 工具,之前介绍过。

File 表引擎的声明方式如下:

1
ENGINE = File(format)

其中 format 表示文件的数据格式,同样必须是 ClickHouse 支持的格式,例如 TSV、CSV、JSONEachRow 等。可以发现在 File 表引擎的定义参数中,并没有包含文件路径这一选项,因此 File 表引擎的数据文件只能保存在 config.xml 配置中由 path 指定的路径下,也就是和其它的数据表在同一个路径下。

每张 File 数据表均由目录和文件组成,其中目录以表的名称命名,而数据文件则以 data. 命名,比如 data.CSV、data.TSV 等等。而创建 File 表的方式有自动和手动两种,首先介绍自动创建的方式,即由 File 表引擎全权负责表目录和数据文件的创建:

1
2
3
4
CREATE TABLE file_table (  
name String,
value UInt32
) ENGINE = File('CSV')

和其它表引擎一样,当执行完上面的语句后,会在 /var/lib/clickhouse/data/default 中创建相应的目录,但是里面还没有数据文件,我们接着写入数据:

1
INSERT INTO file_table VALUES ('one', 1), ('two', 2), ('three', 3)

在数据写入之后,file_table 下面便会生成一个 data.CSV 数据文件:

1
2
[root@satori ~]# cat /var/lib/clickhouse/data/default/file_table/data.CSV "one",1"two",2"three",3
[root@satori ~]#

可以看到数据被写入了文件之中,但这种情况比较少见,因为写入数据我们基本上都不会使用外部存储系列的表引擎,它们存在的目的更多是为了读取现有的数据。所以接下来介绍手动创建的方式,也就是目录和里面的文件都已经存在了,它们是由 ClickHouse 之外的其它系统创建的,而我们需要使用 ClickHouse 读取它。

img

以上是一个文本文件,如果我们想要读取它该怎么做呢?首先要根据内部数据创建和合适表结构,这里我们应该选择 JSONEachRow:

1
2
3
4
5
6
CREATE TABLE file_table_new (
id UInt32,
name String,
place String
)
ENGINE = File('JSONEachRow')

创建完之后,可以查询一下试试,不出意外是会报错的,原因就是 file_table_new 下面没有 data.JSONEachRow 文件。不同于 MergeTree,MergeTree 数据表创建完之后如果不写入数据,那么查询结果是空,并不会报错。但 File 表引擎,它要求目录下必须有相应的 data.format 文件,所以我们将刚才的 girls.txt 拷贝过去:

1
[root@satori ~]# cp girls.txt /var/lib/clickhouse/data/default/file_table_new/data.JSONEachRow

拷贝的时候记得重命名,文件必须叫 data.,拷贝之后再执行一下查询:

img

当然我们也可以继续向表中追加数据,都是没有问题的。这里可能会有人好奇,如果我们不建表,而是手动创建一个 file_table_new 目录,然后将文件拷贝过去可不可以呢。答案是不可以,因为一张表除了对应一个物理目录之外,还有部分的元信息,这些元信息是在创建表的时候产生的。所以一定要先建表,然后自动生成对应的目录之后,再将文件拷贝过去。

以上就是 File 表引擎的基础用法,可以看到 ClickHouse 想的还是比较周全的,为了已经存在的数据存储也提供了相应的接口。

内存类型

之前介绍的表引擎,它们都有一个共同的特点:数据是在磁盘中被访问的,而接下来我们会介绍几种内存类型的表引擎,数据会从内存中被直接访问。当然,虽然它们是内存表引擎,但并不意味着不支持物理存储(落盘),事实上除了 Memory 表引擎之外,其余的几款内存表引擎都会将数据写入磁盘,因为为了防止数据丢失,所以也提供了这种故障恢复手段。而在数据表被加载时,它们会将数据全部加载至内存,以供查询。而将数据全量放在内存中,显然是一把双刃剑,因为在提升查询性能的同时增大了内存消耗。

Memory

Memory 表引擎直接将数据保存在内存中,数据既不会被压缩也不会被格式转换,数据在内存中保存的形态与查询时看到的如出一辙。正因为如此,当 ClickHouse 服务重启的时候,Memory 表内的数据会全部丢失。所以在一些场合,会将 Memory 作为测试表使用。由于不需要磁盘读取、序列化以及反序列化操作,所以 Memory 表引擎支持并行查询,并且在简单的查询场景中能够达到与 MergeTree 旗鼓相当的查询性能(一亿行数据以内)。Memory 表创建方法如下:

1
CREATE TABLE sweet_memory_1 (id UInt64) ENGINE = Memory()

当数据被写入之后,磁盘上不会创建任何数据文件,如果服务重启,那么这张表就没了。比较简单,这里就不测试了,但最后需要说明的是,Memory 数据表不单单被用作测试,它还被广泛应用在 ClickHouse 的内部,它会作为集群间分发数据的存储载体来使用。例如在分布式 IN 查询的场合中,会利用 Memory 临时表保存 IN 字句的查询结果,并通过网络将它传输到远端节点,关于这部分内容后续介绍。

Set

Set 表引擎是拥有物理存储的,数据首先会被写入内存,然后被同步到磁盘文件中。所以服务重启之后它的数据不会丢失,当数据表被重新状态时,文件数据会再次被全量加载到内存。而 Set 我们知道它内部的数据是唯一的,对于有 Python 经验的人应该再熟悉不过了,也就是说 Set 表引擎具有数据去重的功能。在数据写入的过程中,重复的数据会被自动忽略。然而 Set 表引擎的使用场景即特殊又有限,它虽然支持正常的 INSERT 写入,但并不能直接使用 SELECT 进行查询,Set 表只能间接作为 IN 查询的右侧条件被查询使用。

Set 表引擎的存储结构由两部分组成,它们分别是:

  • [num].bin 数据文件:保存了所有列字段的数据,其中 num 是一个自增 id,从 1 开始。伴随着每一批数据的写入(每一次 INSERT),都会生成一个新的 .bin 文件,num 也会随之加 1
  • tmp 临时目录:数据文件首先会被写到这个目录,当一批数据写入完毕之后,数据文件会被移出此目录

下面就来创建一个 Set 数据表测试一下:

img

正确的做法是将 Set 数据表作为 IN 查询的右侧条件,例如:

img

再来查询一下 set_table 的物理目录结构:

1
2
3
[root@satori set_table]# ls
1.bin tmp
[root@satori set_table]#

结果和我们分析的是一样的。

Join

Join 表引擎显然是为 JOIN 查询而生的,它等同于将 JOIN 查询进行了一层简单的封装。在 Join 表引擎的底层实现中,它与 Set 表引擎共用了大部分的处理逻辑,所以 Join 和 Set 表引擎拥有众多相似之处。例如 Join 表引擎的物理存储也由 [num].bin 数据文件和 tmp 临时目录两部分组成;数据首先会被写入内存,然后被同步到磁盘文件,但相比 Set 表引擎,Join 表引擎有着更加广泛的使用场景,它既能够作为 JOIN 查询的连接表,也能够被直接查询使用。

Join 表引擎的声明方式如下所示:

1
ENGINE = Join(join_strictness, join_type, key1[, key2, ...])

其中各参数的含义如下:

  • join_strictness:连接精度,它决定了 JOIN 查询在连接数据时所使用的策略,目前支持 ALL、ANY、SEMI、ANTI 四种类型
  • join_type:连接类型,它决定了 JOIN 查询在组合左右两个数据集合的策略,目前支持 INNER、LEFT、RIGHT 和 FULL 四种类型
  • join_key:连接键,它决定了使用哪个列字段进行关联

上面这些参数,每一条都对应了 JOIN 查询字句的语法规则,关于 JOIN 查询后续展开,我们首先创建相关数据表测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
-- 首先创建主表,引擎为 Log,关于 Log 数据表一会说
CREATE TABLE join_table (
id UInt8,
name String,
time DateTime
) ENGINE = Log();
-- 写入数据
INSERT INTO join_table
VALUES (1, '古明地觉', '2020-05-01 12:00:00'),
(2, '雾雨魔理沙', '2020-05-01 12:30:00'),
(3, '琪露诺', '2020-05-01 13:00:00');

-- 接着创建 Join 表
CREATE TABLE id_join_table (
id UInt8,
score UInt8,
time DateTime
) ENGINE = Join(ANY, LEFT, id);
-- 如果 join_strictness 为 ANY,那么 join_key 重复的数据会自动被忽略
-- 所以下面虽然写了两条 id 为 1 数据,但只有第一条会保留,因为写入第二条的时候发现 id 为 1 的数据已经存在了,所以会停止写入
INSERT INTO TABLE id_join_table
VALUES (1, 100, '2020-05-01 11:55:00'),
(1, 105, '2020-05-01 11:10:00'),
(2, 90, '2020-05-01 12:01:00'),
(3, 80, '2020-05-01 13:10:00'),
(5, 70, '2020-05-01 14:00:00'),
(6, 60, '2020-05-01 13:50:00');

我们查询一下:

img

我们看到是可以查询成功的,Join 数据表支持查询,但这种查询方式并不是 Join 数据表的主战场,它的主战场应该是 Join 查询,例如:

img

当然 Join 数据表除了可以直接使用 SELECT 和 JOIN 之外,还可以通过 join 函数访问:

img

目前还没有涉及到 JOIN 查询,所以一些细节我们还没有解释,目前只需要知道有这么个引擎即可,具体内容在后面介绍查询的时候再详细说。

Buffer

Buffer 表引擎完全使用内存装载数据,不支持文件的持久化机制,所以当服务重启之后,表内的数据会被清空。Buffer 表引擎不是为了面向查询场景而设计的,它的作用是充当缓冲区的角色。假设有这样一种场景,我们需要将数据写入目标 MergeTree 表 A,由于写入的并发数很高,这可能导致表 A 的合并速度慢于写入速度,因为每次 INSERT 都会生成一个新的分区目录。此时便可引入 Buffer 数据表来缓解这类问题,将 Buffer 表作为数据写入的缓冲区,数据首先会被写入 Buffer 表,当满足预设条件时,Buffer 表会自动将数据刷新到目标表。

img

Buffer 表引擎的声明方式如下:

1
ENGINE = Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)

里面参数的作用如下:

  • database:目标表的数据库
  • table:目标表,Buffer 表内的数据会自动刷新到目标表
  • num_layers:可以理解为线程数,Buffer 表会按照 num_layers 的数量开启线程,以并行的方式将数据刷新到目标表,官方建议设置为 16

Buffer 表并不是实时刷新数据的,只有在阈值条件满足时才会刷新,阈值条件由三个最小和最大值组成,含义如下:

  • min_time 和 max_time:时间条件的最小值和最大值,单位为秒,从第一次向表内写入数据时开始计算
  • min_rows 和 max_rows:数据行数条件的最小值和最大值
  • min_bytes 和 max_bytes:数据大小的最小值和最大值,单位为字节

针对以上条件,Buffer 表刷新数据的判断依据有三个,满足其中任意一个就会刷新数据:

  • 三组条件中所有最小阈值都已满足,则触发刷新动作
  • 三组条件中有一个最大阈值满足(这里是超过最大值),则触发刷新动作
  • 如果写入一批数据的行数大运 max_rows 或者数据大小大于 max_bytes,则数据直接写入目标表

还有一点需要注意,上述三组条件在每一个 layer 中都是单独的计算的,假设 num_layers 为 16,则 Buffer 表最多开启 16 个线程来响应数据的写入,它们以轮训的方式接收请求,在每个线程内会独立进行上述判断的过程。也就是说,假设一张 Buffer 表的 max_bytes 为 100 MB,num_layers 为 16,那么这张 Buffer 表能够同时处理的最大数据量约为 1600 MB。

下面来测试一下它的用法,首先创建一个 Memory 数据表,再创建一张 Buffer 数数据表:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE memory_table (id UInt64) ENGINE = Memory();
-- 创建 Buffer 表,用于往 Memory 表里面写入数据
-- 注意这里 Buffer 表的创建语法,后面必须要有 AS,并且这里的 AS 还不是别名的意思
-- 因为创建 Buffer 数据表的时候我们没有指定字段,那么这张表的结构长什么样呢?不用想肯定和 Memory 数据表一样
-- 因为数据就是要往它里面导,所以 AS memory_table 表示将表 memory_table 的结构作为表 buffer_to_memory_table 的结构
-- 当然除了 memory_table 还可以是其它的数据表,不过一般和目标表保持一致,因为数据就是要刷到目标表里面的
CREATE TABLE buffer_to_memory_table AS memory_table
ENGINE = Buffer(default, memory_table, 16, 10, 100, 10000, 1000000, 10000000, 100000000);
-- 接下来向 Buffer 表里面写入 100 万行数据
INSERT INTO buffer_to_memory_table SELECT number FROM numbers(1000000);

此时 buffer_to_memory_table 内部有数据,但 memory_table 里面没有,因为三个条件,没有一个达到最大阈值(准确的说是超过)。而在 100 秒之后才会有数据,可以验证一下:

img

然后我们再写入一批数据,此时数据量改为 100 万零 1 条:

img

可以看到此时不会等待,Buffer 会立即将数据写入目标表。

日志类型

如果使用的数据量很小(例如 100 万以下),面对的数据查询场景也比较简单,并且是一次写入多次查询的模式,那么使用日志家族系列的表引擎将会是一种不错的选择。与合并树家族表引擎类似,日志家族系列的表引擎也有一些共性特征:比如均不支持索引、分区等高级特性,不支持并发读写等等。当针对一张日志表写入数据时,针对这张表的查询会被阻塞,直至写入动作结束。但它们同时也拥有切实的物理存储,数据会被保存到本地文件中,当然除了这些共同的特征职位啊啊,日志家族系列的表引擎也有这各自的特点。接下来就从性能由低到高的顺序,一依次介绍这些表引擎的使用方法。

TinyLog

TinyLog 是日志家族中性能最低的表引擎,它的存储结构由数据文件和元数据两部分组成。其中数据文件是按列存储的,也就是说每个字段都有与之对应的 .bin 文件,这种结构和 MergeTree 有些相似,但 TinyLog 既不支持分区,也没有 .mrk 标记文件。由于没有标记文件,它自然无法支持 .bin 文件的并行读取操作,所以它只适合在非常简单的场景下使用。下面就来创建一张 TinyLog 数据表:

1
2
3
4
5
6
CREATE TABLE tiny_log_table (
id UInt64,
code UInt64
) ENGINE = TinyLog();
-- 接着写入数据
INSERT INTO tiny_log_table SELECT number, number + 1 FROM numbers(100)

数据写入之后就能通过 SELECT 语句对它进行查询了,这里就不展示查询结果了,都能想到是什么,我们来看一下物理存储:

img

可以看到 id 和 code 各自生成了对应的 .bin 数据文件,然后还有一个 sizes.json,里面通过 JSON 格式记录了每个 .bin 文件内对应的数据大小信息。

StripeLog

StripeLog 表引擎的存储结构由固定的三个文件组成,分别是:

  • data.bin:数据文件,所有的列字段使用同一个文件保存,所有数据均会写入 data.bin,类似于数据量没超过阈值的 MergeTree 表
  • index.mrk:数据标记,保存了数据在 data.bin 文件中的位置信息,利用数据标记能够使用多个线程以并行的方式读取 data.bin 内的压缩数据块,从而提升数据查询的性能
  • sizes.json:元数据文件,记录了 data.bin 和 index.mrk 大小的信息

从上述信息能够得知,相比 TinyLog 而言,StripeLog 拥有更高的查询性能(因为具有 .mrk 文件,支持并行查询),同时其使用了更少的文件描述符(所有列都使用同一个文件保存)。下面来创建 StripeLog 数据表:

1
2
3
4
5
6
CREATE TABLE stripe_log_table ( 
id UInt64,
code UInt64
) ENGINE = StripeLog();
-- 然后写入数据
INSERT INTO stripe_log_table SELECT number, number + 100 FROM numbers(1000)

数据写入之后即可进行查询,这里我们还是直接查看一下物理存储目录:

img

里面只有三个文件,其代表的含义显然无需解释了。

Log

Log 表引擎结合了 TinyLog 和 StripeLog 两个表引擎的长处,是日志家族系列中性能最高的表引擎,Log 表引擎的存储结构由 3 个部分组成:

  • [column].bin:数据文件,数据文件按列独立存储,每一个列字段都拥有一个与之对应的 .bin 文件
  • __marks.mrk:数据标记,统一保存了数据在各个 [column].bin 文件中的位置信息,利用数据标记能够使用多个线程以并行的方式读取 [column].bin 内的压缩数据块,从而提升数据查询的性能
  • sizes.json:元数据文件,记录了 [column].bin 和 __marks.mrk 大小的信息

下面创建 Log 数据表:

1
2
3
4
5
6
CREATE TABLE log_table (  
id UInt64,
code UInt64
) ENGINE = Log();
-- 然后写入数据
INSERT INTO log_table SELECT number, number + 100 FROM numbers(1000)

数据写入之后即可进行查询,相信都能看出 TinyLog、StripeLog、Log 之间是高度相似的,我们还是只看一下目录结构:

img

以上就是日志类型的表引擎,个人觉得算是最简单的了,甚至比内存类型的表引擎还要简单。

接口类型

有这么一类表引擎,它们自身并不存储任何数据,而是像粘合剂一样可以整合其它的数据表。在使用这类表引擎的时候,我们不用担心底层的复杂性,它们就像接口一样,为用户提供了统一的访问界面,所以将它们归为接口类表引擎。

Merge

假设有这样一种场景:在数据仓库的设计中,数据按年分表存储,例如 test_table_2018、test_table_2019 和 test_table_2020,但是现在需要跨年度查询这些数据,应该如何实现呢?在这种情形下,使用 Merge 表引擎就是一种很合适的选择了。

Merge 表引擎就如同一层使用了门面模式的代理,它本身不存储任何数据,也不支持数据写入,它的作用就如同它的名字,即负责合并多个查询结果集。Merge 表引擎可以代理查询任意数量的数据表,这些查询会异步且并行执行,并最终合并成一个结果集返回。被代理查询的数据表被要求处于同一个数据库内,且拥有相同的表结构,但它们可以使用不同的表引擎以及不同的分区定义(对于 MergeTree 而言)。

Merge 表引擎的声明方式如下:

1
ENGINE = Merge(database, table_name)

其中 database 为数据库的名称,table_name 为数据表的名称,它支持使用正式则表达式,比如 ^ 表示合并所有以 test 为前缀的数据表。下面我们来简单说明一下 Merge 的使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- test_table_2018 保存了 2018 年的数据
CREATE TABLE test_table_2018 (
id String,
create_time DateTime,
code String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

-- 然后是 test_table_2019,它的结构和 test_table_2018 相同,但使用了不同的表引擎
CREATE TABLE test_table_2019 (
id String,
create_time DateTime,
code String
) ENGINE = Log();

-- 然后创建 Merge 表将上面两张表结合,这里的 AS 相当于为 merge_test_table_2018_and_2019 指定表结构
-- 显然这里会复制 test_table_2018 的表结构
CREATE TABLE merge_test_table_2018_and_2019 AS test_table_2018
ENGINE = Merge(currentDatabase(), '^test_table_201')
-- currentDatabase() 表示获取当前的数据库

然后我们来写入一些数据,然后进行查询:

img

通过返回的结果集可以印证,所有以 test_table_201 开头的数据表都被分别查询,然后合并返回了。值得一提的是,对于 Merge 数据表而言,会有一个虚拟字段 _table,它表示某行数据的来源表。所以通过 _table 我们可以实现表的过滤,比如我们新创建了 test_table_2017 表示 2017 的数据,但当前我们并且不需要 2017 的数据,那么就可以将其作为查询条件给过滤掉,比如:

1
SELECT * FROM merge_test_table_2018_and_2019 WHERE _table != 'test_table_2017'

还是蛮方便的,此时 _table 将等同于索引,Merge 表忽略那些被排除在外的表,不会向他们发起查询请求。

Dictionary

这里涉及到了数据字典,关于数据字典我们会专门放在后面说,目前可以先了解一下。Dictionary 表引擎是数据字典的一层代理封装,它可以取代字典函数,让用户通过数据表查询字典。字典内的数据被加载后,会全部保存到内存中,所以使用 Dictionary 对字典性能没有任何影响。声明 Dictionary 数据表的方式如下:

1
ENGINE = Dictionary(dict_name)

其中 dict_name 对应一个已被加载的字典名称,举个栗子:

1
2
3
4
5
CREATE TABLE tb_test_flat_dict ( 
id UInt64,
code String,
name String
) ENGINE = Dictionary(test_flat_dict)

tb_test_flat_dict 等同于数据字典 test_flat_dict 的代理表,对它进行 SELECT 查询即可获取内部的数据。

Distributed

在数据库领域,当面对海量业务数据的时候,一种主流的做法是实施 Sharding 方案,即将一张数据表横向扩展到多个数据库实例。其中每个数据库实例称为一个 Shard 分片,数据在写入时,需要按照预定的业务规则均匀地写至各个 Shard 分片;而在数据查询时,则需要在每个 Shard 分片上分别查询,最后归并结果集。所以为了实现 Sharding 方案,一款支持分布式数据库的中间件是必不可少的,例如 Apache ShardingSphere。

ClickHouse 作为一款性能卓越的分布式数据库,自然也是支持 Sharding 方案的,而 Distributed 表引擎就等同于 Sharding 方案中的数据库中间件。Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入分发以及查询路由工作。关于 Distributed 表引擎的详细介绍,将会在后续展开。

其它类型

接下来将要介绍的几款表引擎,由于各自用途迥异,所以只好把它们归为其它类型。最然这些表引擎的使用场景并不广泛,但仍建议了解它们的特性和使用方法,因为这些表引擎扩充了 ClickHouse 的能力边界。在一些特殊的场合,它们也能够发挥重要作用。

Live View

虽然 ClickHouse 已经提供了准实时的数据处理手段,例如 Kafka 表引擎和物化视图,但是在应用层面,一直缺乏开放给用户的事件监听机制。所以从 19.14 版本开始,ClickHouse 提供了一种全新的视图:Live View。

Live View 是一种特殊的视图,虽然它并不属于表引擎,但是因为它与数据表息息相关,所以还是把 LiveView 归类到了这里。Live View 的作用类似事件监听器,它能够将一条 SQL 查询结果作为监控目标,当目标数据增加时,LiveView 可以及时发出响应。若要使用 Live View,首先需要将 allow_experimental_live_view 参数设置为 1,可以执行如下语句确认参数是否设置正确:

img

现在来举例说明,首先创建一张数据表,它将作为 Live View 的监听目标:

1
2
3
4
5
6
7
8
CREATE TABLE origin_table (
id UInt64
) ENGINE = Log();
-- 紧接着创建一个 Live View
CREATE LIVE VIEW lv_origin AS SELECT COUNT(*) FROM origin_table;
-- 然后执行 watch 命令开启监听模式
-- 以后每当 origin_table 中的数据发生变化,就会执行 SELECT COUNT(*)
WATCH lv_origin;

如此一来 Live View 就进入监听模式了,首先 origin_table 里面是没有数据的,所以显示结果为 0:

img

然后再开启一个客户端,向 origin_table 里面写入数据,假设写入 10 条。数据写入之后会发现 Live View 做出了实时响应,查询的值变成了 10,并且虚拟字段 _version 会伴随着每一次的响应增加 1。

Null

Null 表引擎的功能与作用,与 Unix 系统的空设备 /dev/null 很相似,如果用户向 Null 表写入数据,系统会正确返回,但数据会被 Null 表自动忽略,永远不会将它们保存。如果用户向 Null 表发起查询,那么它将返回空。在使用物化视图的时候,如果不希望保留源表的数据,那么将源表设置成 Null 引擎将会是非常好的选择。下面就来举个栗子:

1
2
3
4
5
-- 首先创建一张 Null 表
CREATE TABLE null_table (id UInt8) ENGINE = Null();
-- 接着以 null_table 为源表,建立一张物化视图
CREATE MATERIALIZED VIEW view_table
ENGINE = TinyLog() AS SELECT * FROM null_table

如果往 null_table 里面写数据,那么数据会被顺利同步到 view_table 中,但是 null_table 中是查询不到数据的。

img

URL

URL 表引擎的作用等价于 HTTP 客户端,它可以通过 HTTP/HTTPS 协议,直接访问远端的 REST 服务。当执行 SELECT 查询的时候,底层会将其转换为 GET 请求的远程调用;而执行 INSERT 查询的时候,会将其转成 POST 请求的远程调用,并将数据以字节流的形式传递。URL 表引擎的声明方式如下所示:

1
ENGINE = URL('url', format)

其中 url 表示远端的服务地址,而 format 则是 ClickHouse 支持的数据格式,如 TSV、CSV 和 JSON 等。

这里我们用 Python 的 FastAPI 编写一个 web 服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import re
from fastapi import FastAPI, Request, Response
import orjson
import uvicorn

app = FastAPI()
table = []

@app.get("/girls")
async def get():
response = Response(orjson.dumps(table),
media_type="application/json")
return response

@app.post("/girls")
async def post(request: Request):
# 如果插入多行数据,那么这些数据之间会以 \n 进行分割
data = re.split(rb"(?<=})\n(?={)", await request.body())
rows = [orjson.loads(_) for _ in data]
if isinstance(rows, dict):
table.append(rows)
else:
table.extend(rows)
return True

if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=5555)

启动之后我们来创建表:

1
2
3
4
5
CREATE TABLE url_table (
id UInt32,
name String,
place String
) ENGINE = URL('http://localhost:5555/girls', JSONEachRow)

以后每执行一次 SELECT 都相当于发起了一次 GET 请求,执行一次 INSERT 相当于发起了一次 POST 请求,我们来测试一下:

img

可以看出 ClickHouse 想的真是无比周到,考虑了大量的数据源,以上就是其它表引擎的全部内容。至于 Dictionary 和 Distributed 两个表引擎我们后面再说,因为涉及了还没介绍的内容。

到目前为止我们知道了除了 MergeTree 家族表引擎之外还有另外 5 种表引擎,这些表引擎丰富了 ClickHouse 的使用场景,扩充了 ClickHouse 的使用界限。下面再总结一下:

  • 外部存储类型的表引擎和 Hive 的外部表很类似,它们只负责元数据的管理和数据查询,自身并不负责数据的生成,数据文件直接由外部系统维护。它们可以直接读取 HDFS、本地文件、常见关系型数据库以及 Kafka 的数据。
  • 内存类型的表引擎中的数据是常驻内存的,所以它们拥有堪比 MergeTree 的查询性能(1 亿数据量以内),其中 Set 和 Join 表引擎拥有物理存储,数据在写入内存的同时也会被刷到磁盘;而 Memory 和 Buffer 表引擎在服务重启之后,数据便会被清空。内存类表引擎是一把双刃剑,在数据大于 1 亿的场景下不建议使用内存类型的表引擎。
  • 接口类型的表引擎自身并不存储任何数据,而是像粘合剂一样可以整合其它的数据表,其中 Merge 表引擎能够合并查询任意两张表结构相同的数据表;Dictionary 表引擎能够代理查询数据字典;而 Distributed 表引擎的作用类似分布式数据库的分表中间件,能够帮助用户简化数据的分发和路由工作。
  • 其它类型的表引擎用途各不相同,其中 Live View 是一种特殊的视图,能够对 SQL 查询进行实时监听;Null 表引擎类似于 Linux 系统的空设备 /dev/null,通常和物化视图一起搭配使用;而 URL 表引擎类似于 HTTP 客户端,能够代理调用远端的 REST 服务。