This unified guide shows how to: (1) model and distribute event streams, (2) ingest at scale, (3) run low-latency analytics, (4) roll up and expire data, (5) apply approximate distinct counting (HLL), and (6) incorporate semi-structured JSONB.
Note
This article assumes you have a running Citus cluster (elastic clusters on Azure Database for PostgreSQL, or self-managed) and psql access. See Getting Started if you still need a local setup.
1. Use case and architecture
Real-time dashboards need fast slice and trend queries over recent events while retaining longer-term history efficiently. Typical requirements:
| Requirement | Challenge | Citus Approach |
|---|---|---|
| Low-latency per-minute metrics | High ingest + frequent aggregations | Per-tenant/partition hash sharding & local aggregation |
| Time-window graphs (minutes→years) | Rescan large raw tables | Preaggregated rollup tables (multi granularity) |
| Cost control | Raw events grow unbounded | Retention windows + rollups + partitioning |
| Distinct counts (visitors/IPs) | Exact counting expensive | Probabilistic HLL sketches |
| Semi-structured attributes | Schema churn & sparse columns | JSONB with selective indexing |
This article shows two example domains that map to the same design: HTTP request telemetry and GitHub events. Substitute your own event stream and dimensions.
2. Base event model
Example: GitHub-like event firehose (subset). Two core tables: users and events.
CREATE TABLE github_users (
user_id bigint PRIMARY KEY,
url text,
login text,
avatar_url text,
gravatar_id text,
display_login text
);
CREATE TABLE github_events (
event_id bigint,
event_type text,
event_public boolean,
repo_id bigint,
payload jsonb,
repo jsonb,
user_id bigint,
org jsonb,
created_at timestamp
);
CREATE INDEX event_type_index ON github_events (event_type);
CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);
Choose a distribution column
Pick a column with high cardinality, balanced access, and frequent filter predicates (here user_id). Distribute both tables on it to colocate joins and rollups:
SELECT create_distributed_table('github_users', 'user_id');
SELECT create_distributed_table('github_events', 'user_id');
Alternate: HTTP request stream
For per-site dashboards, use site_id instead:
CREATE TABLE http_request (
site_id INT,
ingest_time timestamptz DEFAULT now(),
url text,
request_region text,
ip_address text,
status_code int,
response_time_msec int
);
SELECT create_distributed_table('http_request', 'site_id');
3. Loading data
Sample CSV ingestion (GitHub example):
\copy github_users from 'users.csv' with csv
\copy github_events from 'events.csv' with csv
4. Immediate querying (raw layer)
-- count users
SELECT count(*) FROM github_users;
-- commits per minute (PushEvent)
SELECT date_trunc('minute', created_at) AS minute,
sum((payload->>'distinct_size')::int) AS num_commits
FROM github_events
WHERE event_type = 'PushEvent'
GROUP BY minute
ORDER BY minute;
-- top repo creators
SELECT login, count(*)
FROM github_events ge
JOIN github_users gu USING (user_id)
WHERE event_type = 'CreateEvent'
AND payload @> '{"ref_type":"repository"}'
GROUP BY login
ORDER BY count(*) DESC
LIMIT 10;
5. Rollup design (performance layer)
Motivation: dashboards repeatedly rescan raw events for time buckets. Introduce 1-minute (and optionally 1-hour or 1-day) rollup tables keyed by the same distribution column and a time bucket.
HTTP example rollup (per minute):
CREATE TABLE http_request_1min (
site_id INT,
ingest_time timestamptz, -- truncated minute
error_count INT,
success_count INT,
request_count INT,
average_response_time_msec INT,
CHECK (request_count = error_count + success_count),
CHECK (ingest_time = date_trunc('minute', ingest_time))
);
SELECT create_distributed_table('http_request_1min', 'site_id');
CREATE INDEX ON http_request_1min (site_id, ingest_time);
Rollup function
CREATE TABLE latest_rollup (
minute timestamptz PRIMARY KEY CHECK (minute = date_trunc('minute', minute))
);
-- sentinel timestamp earlier than any real data
INSERT INTO latest_rollup VALUES ('1901-01-01');
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void LANGUAGE plpgsql AS $$
DECLARE
    -- most recent fully elapsed minute
    curr_rollup_time timestamptz := date_trunc('minute', now() - interval '1 minute');
    -- PL/pgSQL evaluates this default as a scalar subquery against latest_rollup
    last_rollup_time timestamptz := minute FROM latest_rollup;
BEGIN
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
)
SELECT site_id,
date_trunc('minute', ingest_time),
COUNT(*) request_count,
SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 1 ELSE 0 END) success_count,
SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 0 ELSE 1 END) error_count,
SUM(response_time_msec) / COUNT(*) average_response_time_msec
FROM http_request
-- half-open window (last_rollup_time, curr_rollup_time]: no gaps, no double counting
WHERE ingest_time <@ tstzrange(last_rollup_time, curr_rollup_time, '(]')
GROUP BY 1,2;
UPDATE latest_rollup SET minute = curr_rollup_time;
END;$$;
Schedule the function to run every minute, for example with cron or the pg_cron extension (sketch below).
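A minimal pg_cron sketch, assuming the pg_cron extension is installed (named jobs require pg_cron 1.3 or later; the job name is illustrative):
-- run the rollup at the start of every minute
SELECT cron.schedule('rollup-http-request', '* * * * *', 'SELECT rollup_http_request();');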
Dashboard query (rollup layer)
SELECT site_id, ingest_time AS minute, request_count,
success_count, error_count, average_response_time_msec
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes';
6. Data retention and partitioning
Apply retention (TTL) policies so raw and rollup data don't grow without bound:
DELETE FROM http_request WHERE ingest_time < now() - interval '1 day';
DELETE FROM http_request_1min WHERE ingest_time < now() - interval '1 month';
For faster purges, layer native range partitioning on ingest_time on top of Citus hash sharding (see the time-series documentation); a minimal sketch follows.
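A sketch assuming Citus 10.2+ (which ships create_time_partitions and drop_old_time_partitions) and a hypothetical partitioned variant of the raw table:
-- range-partitioned copy of the raw events table (hypothetical name)
CREATE TABLE http_request_partitioned (LIKE http_request INCLUDING DEFAULTS)
PARTITION BY RANGE (ingest_time);
SELECT create_distributed_table('http_request_partitioned', 'site_id');
-- create daily partitions covering the coming week
SELECT create_time_partitions(
  table_name := 'http_request_partitioned',
  partition_interval := '1 day',
  end_at := now() + interval '7 days'
);
-- dropping whole partitions replaces the slow DELETE above
CALL drop_old_time_partitions('http_request_partitioned', now() - interval '1 day');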
7. Approximate distinct counts (HLL)
Install or enable HLL (the extension is already available on the Azure Database for PostgreSQL managed service):
CREATE EXTENSION IF NOT EXISTS hll;
ALTER TABLE http_request_1min ADD COLUMN distinct_ip_addresses hll;
Extend the rollup INSERT's SELECT list with the new aggregate (full sketch below):
, hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses
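In context, the rollup function's INSERT becomes roughly the following (a sketch; only the hll column is new):
INSERT INTO http_request_1min (
  site_id, ingest_time, request_count,
  success_count, error_count, average_response_time_msec,
  distinct_ip_addresses
)
SELECT site_id,
  date_trunc('minute', ingest_time),
  COUNT(*),
  SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 1 ELSE 0 END),
  SUM(CASE WHEN status_code BETWEEN 200 AND 299 THEN 0 ELSE 1 END),
  SUM(response_time_msec) / COUNT(*),
  hll_add_agg(hll_hash_text(ip_address)) -- HLL sketch of distinct client IPs
FROM http_request
WHERE ingest_time <@ tstzrange(last_rollup_time, curr_rollup_time, '(]')
GROUP BY 1, 2;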
Query distinct across interval:
SELECT hll_cardinality(hll_union_agg(distinct_ip_addresses)) AS unique_ips
FROM http_request_1min
WHERE ingest_time > now() - interval '5 minutes';
8. Semi-structured aggregations (JSONB)
Add flexible region counts:
ALTER TABLE http_request_1min ADD COLUMN region_counters jsonb;
Augment the rollup query with a window function and jsonb_object_agg; the new fragment:
, jsonb_object_agg(request_region, region_count) AS region_counters
FROM (
SELECT *, count(*) OVER (
PARTITION BY site_id, date_trunc('minute', ingest_time), request_region
) AS region_count
FROM http_request
) h
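Assembled, the rollup SELECT looks roughly like this (a sketch; the window subquery attaches each region's per-minute count to every row, and jsonb_object_agg folds those into one object per group; duplicate keys carry identical values, so the result is deterministic):
SELECT site_id,
  minute AS ingest_time,
  COUNT(*) AS request_count,
  -- success_count, error_count, average_response_time_msec as in section 5
  jsonb_object_agg(request_region, region_count) AS region_counters
FROM (
  SELECT *,
    date_trunc('minute', ingest_time) AS minute,
    count(*) OVER (
      PARTITION BY site_id, date_trunc('minute', ingest_time), request_region
    ) AS region_count
  FROM http_request
  WHERE ingest_time <@ tstzrange(last_rollup_time, curr_rollup_time, '(]')
) h
GROUP BY site_id, minute;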
Dashboard extract example:
SELECT request_count,
success_count,
error_count,
average_response_time_msec,
COALESCE(region_counters->>'USA','0')::int AS us_visitors
FROM http_request_1min
WHERE ingest_time > now() - interval '5 minutes';
9. Maintenance and operations
| Task | Notes |
|---|---|
| Shard Rebalancing | Run SELECT citus_rebalance_start(); after adding workers. |
| Schema Evolution | Standard DDL on coordinator propagates. Keep rollup code in a versioned function. |
| Monitoring | Track ingest lag (now - latest_rollup.minute); see the example query after this table. |
| Backfill | Temporarily widen rollup window; disable cron overlap. |
| Large Tenants / Hot Keys | Consider secondary sharding or isolating via isolate_tenant_to_new_shard if single key dominates. |
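For the monitoring task above, a one-line lag check (alert if it exceeds a couple of minutes):
SELECT now() - minute AS rollup_lag FROM latest_rollup;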