Citus 群集由协调器实例和多个辅助角色实例组成。 数据在工作器上分片,而协调器存储有关这些分片的元数据。 通过协调器对群集执行所有查询。 协调器将查询分区为较小的查询片段,每个查询片段可以在分片上独立运行。 协调器将查询片段分配给辅助角色,监督其执行,合并结果,并将最终结果返回给用户。 下图简要介绍了查询处理体系结构。
Citus 的查询处理管道涉及两个组件:
- 分布式查询规划器和执行程序
- PostgreSQL Planner 和 Executor
以下部分更详细地讨论了这些组件。
分布式查询规划器
Citus 的分布式查询规划器采用 SQL 查询,并计划它进行分布式执行。
对于 SELECT 查询,规划器首先创建输入查询的计划树,并将其转换为其通勤和关联形式,以便可以并行化。 它还应用了多项优化,以确保以可缩放的方式执行查询,并将网络 I/O 最小化。
接下来,规划器将查询分为两个部分:协调器查询,该查询在协调器上运行,辅助角色查询片段在辅助角色上的单个分片上运行。 然后,规划器将查询片段分配给辅助角色,以便有效地使用其所有资源。 完成此步骤后,分布式查询计划将传递给分布式执行程序以供执行。
分布列或修改查询上的键值查找的规划过程略有不同,因为它们恰好命中了一个分片。 当规划器收到传入查询时,它会确定应将查询路由到的正确分片。 它通过提取传入行中的分布列并查找元数据来确定查询的正确分片。 然后,Planner 重写该命令的 SQL 以引用分片表而不是原始表。 然后,此重写计划将传递给分布式执行程序。
延迟快速路径规划 (Citus 13.2)
在 Citus 13.2 中,规划器会延迟生成快速路径占位符计划,直到它标识分片。 如果分片放置是处理客户端查询的节点(MX 模式)的本地位置,Citus 可以避免对分片查询进行解分析、分析和计划步骤,并重复使用缓存的计划。 此方法可提高吞吐量。
资格:
- 查询是
SELECT分布式UPDATE表(架构或列分片)或 Citus 托管的本地表。 - 无可变函数。
- 分片可以在计划时确定,并且是节点(MX 模式)的本地区域。
- 当前不支持引用表。
行为:
- 如果分片是本地且安全的,执行程序会将分布式表 OID 替换为分片 OID、调用
standard_planner和缓存任务中的计划。 - 否则,执行程序将回退到快速路径占位符计划。
GUC:
-
citus.enable_local_fast_path_query_optimization(默认值on)。
分布式查询执行程序
Citus 的分布式执行程序运行分布式查询计划并处理失败。 执行程序非常适合用于快速响应涉及筛选器、聚合和共置联接的查询。 它还适用于运行具有完整 SQL 覆盖范围的单租户查询。 执行程序根据需要为每个分片打开一个连接,并向其发送所有片段查询。 然后,它会从每个片段查询中提取结果,将其合并,并将最终结果返回给用户。
子查询和 CTE 推送拉取执行
如有必要,Citus 可以将子查询和公用表表达式(CTE)的结果收集到协调器节点,然后将其推送回辅助角色,供外部查询使用。 此体系结构允许 Citus 支持多种 SQL 构造。
例如,在 WHERE 子句中具有子查询不能始终与主查询同时内联执行,但必须单独执行。 假设 Web 分析应用程序维护 page_views 按其分区的 page_id表。 若要查询前 20 个访问次数页中的访问者主机数,请使用子查询查找页面列表,然后使用外部查询对主机进行计数。
SELECT page_id, count(distinct host_ip)
FROM page_views
WHERE page_id IN (
SELECT page_id
FROM page_views
GROUP BY page_id
ORDER BY count(*) DESC
LIMIT 20
)
GROUP BY page_id;
执行程序针对每个分片运行此查询的片段,对不同的host_ip分片page_id进行计数,并将结果合并到协调器上。 但是,子 LIMIT 查询中的表示子查询不能作为片段的一部分执行。 通过递归规划查询 Citus 可以单独运行子查询,将结果推送到所有辅助角色,运行主片段查询,并将结果拉回协调器。
推送拉取设计支持子查询,如上一示例中的子查询。
可以通过查看此查询的 EXPLAIN 输出来查看此推送拉取执行。
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan 6_1
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=54.70..56.70 rows=200 width=12)
Group Key: page_id
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=84.50..86.75 rows=225 width=36)
Group Key: page_views.page_id, page_views.host_ip
-> Hash Join (cost=17.00..78.88 rows=1124 width=36)
Hash Cond: (page_views.page_id = intermediate_result.page_id)
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=36)
-> Hash (cost=14.50..14.50 rows=200 width=4)
-> HashAggregate (cost=12.50..14.50 rows=200 width=4)
Group Key: intermediate_result.page_id
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)
该过程涉及相当多,因此让我们将其分开并检查每个部分。
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
树的根是协调器节点对辅助角色的结果执行的作。 在这种情况下,它会对它们进行分组,并 GroupAggregate 要求先对其进行排序。
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
-> Distributed Subplan 6_1
自定义扫描有两个大型子树,从 分布式子计划开始。
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=54.70..56.70 rows=200 width=12)
Group Key: page_id
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
工作器节点为每个 32 个分片运行此子计划(Citus 正在为显示选择一个代表)。 可以识别子查询的所有部分 IN (...) :排序、分组和限制。 当所有辅助角色完成此查询时,它们会将输出发送回协调器,后者将其作为 中间结果组合在一起。
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=localhost port=9701 dbname=postgres
-> HashAggregate (cost=84.50..86.75 rows=225 width=36)
Group Key: page_views.page_id, page_views.host_ip
-> Hash Join (cost=17.00..78.88 rows=1124 width=36)
Hash Cond: (page_views.page_id = intermediate_result.page_id)
Citus 在此第二个子树中启动另一个执行程序作业。 它将计算不同的主机。page_views 它使用 JOIN 连接到中间结果。 中间结果有助于将其限制为前 20 页。
-> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=36)
-> Hash (cost=14.50..14.50 rows=200 width=4)
-> HashAggregate (cost=12.50..14.50 rows=200 width=4)
Group Key: intermediate_result.page_id
-> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)
辅助角色使用函数在内部检索中间结果 read_intermediate_result ,该函数从协调器节点复制的文件加载数据。
此示例演示了 Citus 如何使用分布式子计划在多个步骤中执行查询,以及如何使用 EXPLAIN 了解分布式查询执行。
PostgreSQL 规划器和执行程序
分布式执行程序将查询片段发送到辅助角色后,辅助角色会像常规 PostgreSQL 查询一样处理片段。 每个辅助角色上的 PostgreSQL 规划器选择在相应的分片表本地执行查询的最佳计划。 PostgreSQL 执行程序运行查询,并将查询结果返回给分布式执行程序。 有关 PostgreSQL 规划器和执行程序的详细信息,请参阅 PostgreSQL 手册。 最后,分布式执行程序将结果传递给协调器进行最终聚合。