
Cross-Shard Aggregation Patterns

Cross-shard aggregation is one of the most operationally demanding corners of the broader Cross-Partition Querying & Aggregation Strategies space. Single-shard queries are fast and local. Aggregations that span every partition are different: they need a coordinator to orchestrate parallel dispatch, collect partial results, and merge them without exhausting memory or blocking on a slow shard. This page covers the architecture, implementation, and failure modes of that full pipeline, from scatter-gather wiring through pre-aggregation pushdown to streaming analytics. For the routing layer that sits upstream of aggregation, see Proxy Routing Architectures.

Problem Framing

Consider a 2 TB events table sharded across 16 PostgreSQL nodes by tenant_id. A billing job needs SUM(usage_bytes) GROUP BY tenant_id, DATE_TRUNC('day', created_at) for the past 30 days. Suppose the coordinator naively fans out a full-table query to every shard and buffers all rows before grouping. It then transfers hundreds of gigabytes across the network and risks running out of memory. With a shard timeout of 2 s, a single slow node stalls the entire merge until the timeout fires. At that point the result is either dropped or returned as approximate. The patterns below address each of these failure vectors in turn.


Architecture Overview

The diagram below shows the full scatter-gather pipeline: a coordinator decomposes the query, dispatches in parallel to each shard, collects partial results (already pre-aggregated on the shard), merges them, and returns a final result with an accuracy flag.

[Figure: Scatter-Gather Aggregation Pipeline. A coordinator decomposes the query and fans out to four shards. Shards 1 to 3 each run a local GROUP BY and return a partial SUM. Shard 4 times out and is excluded from the merge. The coordinator merges the partials as a SUM of SUMs and flags the result accuracy=approximate.]

Implementation Walkthrough

Step 1: Decompose the Query and Prune Shards

Before dispatching, identify which shards hold relevant data. If the query filters on tenant_id (the shard key), send it only to the shard(s) that own those tenants. A full scatter is only necessary when the predicate does not constrain the shard key. In every other case it wastes work on shards that cannot contribute rows.
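The pruning decision can be sketched as a small routing function. This is a minimal sketch that assumes tenants are placed by a stable CRC32 hash of tenant_id modulo the shard count. The placement rule, NUM_SHARDS, shard_for_tenant and prune_shards are all hypothetical. Real systems such as Citus keep their own placement metadata, and a router should consult that metadata instead.

```python
import zlib
from typing import Iterable, Optional

NUM_SHARDS = 16  # matches the 16-node example above (assumption)


def shard_for_tenant(tenant_id: str, num_shards: int = NUM_SHARDS) -> int:
    """Hypothetical placement rule: stable CRC32 of tenant_id modulo shard count.

    Python's built-in hash() is randomized per process for strings, so a stable
    hash keeps routing consistent across coordinator processes.
    """
    return zlib.crc32(tenant_id.encode("utf-8")) % num_shards


def prune_shards(tenant_filter: Optional[Iterable[str]],
                 num_shards: int = NUM_SHARDS) -> list[int]:
    """Return the shard indexes a query must be dispatched to."""
    if tenant_filter is None:
        # No tenant_id predicate: any shard may hold matching rows.
        return list(range(num_shards))
    # tenant_id predicate present: only the owning shards are needed.
    return sorted({shard_for_tenant(t, num_shards) for t in tenant_filter})


print(prune_shards(["acme", "globex"]))  # at most two shards
print(len(prune_shards(None)))           # 16: full scatter
```

A billing query scoped to two tenants touches at most two shards instead of sixteen.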

-- On each shard: push the GROUP BY down before shipping rows
SELECT
  tenant_id,
  DATE_TRUNC('day', created_at) AS day,
  SUM(usage_bytes)              AS partial_sum,
  COUNT(*)                      AS partial_count
FROM   events
WHERE  created_at >= NOW() - INTERVAL '30 days'
GROUP  BY 1, 2;

Returning partial_sum and partial_count separately lets the coordinator compute a correct weighted average (SUM(partial_sum) / SUM(partial_count)) without raw rows crossing the network.
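A small worked example shows why the count-weighted form matters. The numbers below are made up for illustration: two shards report partials for the same (tenant_id, day) group. Averaging the per-shard averages gives equal weight to a shard that saw 10 rows and one that saw 30. Dividing summed sums by summed counts does not have that problem.

```python
# Made-up partials for one (tenant_id, day) group from two shards.
partials = [
    {"partial_sum": 1_000, "partial_count": 10},  # shard A: mean 100
    {"partial_sum": 300,   "partial_count": 30},  # shard B: mean 10
]

# Correct: SUM(partial_sum) / SUM(partial_count)
weighted_avg = (sum(p["partial_sum"] for p in partials)
                / sum(p["partial_count"] for p in partials))

# Incorrect: the average of per-shard averages ignores each shard's row count.
avg_of_avgs = sum(p["partial_sum"] / p["partial_count"] for p in partials) / len(partials)

print(weighted_avg)  # 32.5
print(avg_of_avgs)   # 55.0
```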

Step 2: Dispatch in Parallel with Per-Shard Timeouts

Use Promise.allSettled (or its equivalent in your language) so a single slow shard cannot block the merge indefinitely.

async function executeCrossShardAggregation(query, shards) {
  const partials = await Promise.allSettled(
    shards.map(s => dispatchToShard(s, query, { timeoutMs: 2000 }))
  );

  const successful = partials
    .filter(p => p.status === 'fulfilled')
    .map(p => p.value);

  const failedCount = partials.filter(p => p.status === 'rejected').length;

  return {
    result:       mergePartials(successful),
    accuracy:     failedCount > 0 ? 'approximate' : 'exact',
    failedShards: failedCount
  };
}

failedShards flows directly into your observability pipeline. Wire it to an alert so on-call knows when results are approximate before a downstream consumer flags a billing discrepancy.
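In Python, a rough equivalent of executeCrossShardAggregation wraps each dispatch in asyncio.wait_for and collects the results with asyncio.gather(..., return_exceptions=True). This is a sketch only. dispatch_to_shard is a hypothetical coroutine that is simulated here with asyncio.sleep. The merge is inlined as plain SUM and COUNT. Unlike the JavaScript version, the sketch records which shards failed and not just how many, which is what the per-shard dashboards described under Monitoring need.

```python
import asyncio


async def dispatch_to_shard(shard: str, query: str) -> dict:
    """Hypothetical stand-in for a shard client; 'slow' shards never answer in time."""
    await asyncio.sleep(5 if shard.startswith("slow") else 0.01)
    return {"shard": shard, "partial_sum": 100, "partial_count": 4}


async def execute_cross_shard_aggregation(query: str, shards: list[str],
                                          timeout_s: float = 2.0) -> dict:
    # wait_for cancels a dispatch that exceeds timeout_s and raises TimeoutError;
    # return_exceptions=True stops one failure from aborting the whole gather.
    results = await asyncio.gather(
        *(asyncio.wait_for(dispatch_to_shard(s, query), timeout_s) for s in shards),
        return_exceptions=True,
    )
    successful = [r for r in results if not isinstance(r, BaseException)]
    failed = [s for s, r in zip(shards, results) if isinstance(r, BaseException)]
    return {
        "result": {
            "total_sum": sum(p["partial_sum"] for p in successful),
            "total_count": sum(p["partial_count"] for p in successful),
        },
        "accuracy": "approximate" if failed else "exact",
        "failed_shards": failed,  # names, so on-call can see which shard timed out
    }


if __name__ == "__main__":
    out = asyncio.run(execute_cross_shard_aggregation(
        "billing_query", ["shard1", "shard2", "slow-shard3"], timeout_s=0.2))
    print(out["accuracy"], out["failed_shards"])
```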

Step 3: Merge Partial Results on the Coordinator

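The merge logic must be associative and commutative, because shard responses can arrive in any order. SUM and COUNT are straightforward. AVG requires the count-weighted form. DISTINCT COUNT requires one of two approaches. The first is exact distinct counts per shard, which is only correct when the counted keys cannot appear on more than one shard. The second is a HyperLogLog sketch.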

from functools import reduce

def merge_partials(partials: list[dict]) -> dict:
    """Merge pre-aggregated shard results for SUM and COUNT."""
    return reduce(
        lambda acc, p: {
            'total_sum':   acc['total_sum']   + p['partial_sum'],
            'total_count': acc['total_count'] + p['partial_count'],
        },
        partials,
        {'total_sum': 0, 'total_count': 0}
    )

Step 4: Stream Results When Cardinality Is High

For high-cardinality GROUP BY (millions of groups), buffering all partial rows in coordinator memory before merging triggers OOM. Use a streaming merge instead: open a cursor on each shard, read rows in sorted order, and merge-sort across shards.

-- On each shard: open a named cursor so rows stream on FETCH
BEGIN;
DECLARE partial_cur CURSOR FOR
  SELECT tenant_id, DATE_TRUNC('day', created_at) AS day, SUM(usage_bytes) AS partial_sum
  FROM   events
  WHERE  created_at >= NOW() - INTERVAL '30 days'
  GROUP  BY 1, 2
  ORDER  BY tenant_id, day;

FETCH 500 FROM partial_cur;  -- repeat until no rows are returned, then CLOSE partial_cur; COMMIT;

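The coordinator holds one 500-row batch per shard and performs a k-way merge on (tenant_id, day). It fetches the next batch from a shard only when that shard's buffer is exhausted. Each merged group is emitted as soon as no shard can still contribute to it. Peak coordinator memory therefore stays proportional to the number of shards × batch size, not to total result cardinality.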
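The coordinator side of the streaming merge is sketched below. It uses heapq.merge from the Python standard library for the k-way merge and itertools.groupby to combine adjacent rows that share a key. Each shard stream is modeled as an iterator of (tenant_id, day, partial_sum) tuples that is already sorted by (tenant_id, day), which the cursor's ORDER BY guarantees. The shard_rows adapter is hypothetical. In a real deployment it would wrap repeated FETCH 500 calls against one shard's cursor.

```python
import heapq
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator


def shard_rows(batches: Iterable[list[tuple]]) -> Iterator[tuple]:
    """Hypothetical adapter: flatten successive FETCH batches from one shard cursor."""
    for batch in batches:
        yield from batch


def streaming_merge(shard_streams: list[Iterator[tuple]]) -> Iterator[tuple]:
    """k-way merge of sorted shard streams, summing partial_sum per (tenant_id, day).

    heapq.merge keeps one pending row per stream in its heap, so memory grows
    with the number of shards (plus each adapter's current batch), not with
    the number of groups.
    """
    key = itemgetter(0, 1)  # (tenant_id, day)
    merged = heapq.merge(*shard_streams, key=key)
    # Equal keys are adjacent after the merge, so each group is emitted
    # as soon as a different key appears.
    for (tenant_id, day), rows in groupby(merged, key=key):
        yield tenant_id, day, sum(r[2] for r in rows)


s1 = shard_rows([[("t1", "2024-05-01", 10), ("t2", "2024-05-01", 7)]])
s2 = shard_rows([[("t1", "2024-05-01", 5)], [("t3", "2024-05-02", 1)]])
for row in streaming_merge([s1, s2]):
    print(row)
```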

Step 5: Persist Expensive Aggregations as Materialized Views

For aggregations that run on a schedule (daily billing, hourly dashboards), avoid scatter-gather on every request. Pre-compute and cache the result. See Optimizing Cross-Partition Aggregations with Materialized Views for the full refresh workflow.

-- Citus: parallel materialized view across distributed shards
SET citus.max_parallel_tasks_per_job = 4;

CREATE MATERIALIZED VIEW mv_daily_usage AS
SELECT
  tenant_id,
  DATE_TRUNC('day', created_at) AS day,
  SUM(usage_bytes)              AS total_bytes
FROM   events
GROUP  BY 1, 2;

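-- CONCURRENTLY still recomputes the whole view, but readers can keep querying it
-- during the refresh. It requires a unique index on the view.
CREATE UNIQUE INDEX ON mv_daily_usage (tenant_id, day);
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_daily_usage;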

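Set enable_partition_pruning = on (the PostgreSQL default since version 11) and jit = on on each shard node. If events is range-partitioned by created_at, the shard planner then skips partitions outside the query window. JIT compilation of the local aggregation only applies to queries whose estimated cost exceeds jit_above_cost.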


Configuration Reference

Parameter | Recommended value | Rationale
statement_timeout (shard) | 2000ms | Prevents a slow shard from blocking the coordinator beyond the SLA budget
citus.max_parallel_tasks_per_job | 4 | Limits concurrent shard tasks; raise only if shard CPU/IO headroom allows
enable_partition_pruning | on | Skips partitions outside the query’s date range at the shard planner level
jit | on | JIT-compiles expression evaluation in aggregation loops; pays off for long-running, CPU-bound GROUP BY queries above jit_above_cost
work_mem (coordinator) | 256MB | Caps memory per sort or hash operation (a single query can use several multiples); combine with streaming to stay within limits
max.poll.records (Kafka consumer) | 500 | Controls micro-batch size for streaming aggregation; tune against lag target
fetch.max.bytes (Kafka consumer) | 10485760 | 10 MiB per fetch request across partitions; balances throughput against consumer memory
enable.idempotence (Kafka producer) | true | Prevents duplicate writes from producer retries when flushing aggregation results; end-to-end exactly-once also requires transactions

Operational Contrast

This scatter-gather model differs from the three adjacent approaches in the same section:

  • Application-Level Sharding Logic embeds routing directly in the application process, eliminating a network hop but requiring every service to carry its own shard topology awareness. Aggregation logic then also lives in the application, making cross-team reuse harder.
  • Proxy Routing Architectures centralise routing in a dedicated proxy (e.g. Pgpool, ProxySQL, or a custom HAProxy config) that can cache routing tables and connection pools. The proxy is the right place to colocate a coordinator when multiple services need the same aggregation paths, because the routing config lives in one place. For how to build that proxy layer, see Building a Custom Query Router with HAProxy.
  • Federated Query Execution goes a step further: engines such as Trino and DuckDB treat heterogeneous data sources (Parquet, S3, JDBC) as first-class shards, handling the scatter-gather and merge internally. Choose federated execution when your data spans multiple storage systems; use the native scatter-gather patterns on this page when all shards share the same engine (PostgreSQL, MySQL, Citus).

Monitoring and Observability

Track these signals to detect aggregation problems before they affect downstream consumers.

# Coordinator latency: alert when P95 exceeds 1.5 s
histogram_quantile(0.95,
  sum by (le) (rate(scatter_gather_duration_seconds_bucket[5m]))
) > 1.5

# Approximate-result rate β€” alert when more than 5 % of aggregations are partial
rate(scatter_gather_approximate_total[5m])
  / rate(scatter_gather_requests_total[5m]) > 0.05

# Partial-result payload: alert when more than 100 MiB/s is shipped to the coordinator
rate(coordinator_partial_result_bytes_total[5m]) > 104857600

Also export failedShards as a gauge, and record timeouts with a per-shard label, so your on-call dashboard shows exactly which shard is timing out and for how long.


Failure Modes

Failure | Root cause | Detection | Mitigation
Coordinator OOM during large GROUP BY | All shard partial results buffered in coordinator memory simultaneously | coordinator_memory_used_bytes exceeds the JVM/process limit; OOM kill in logs | Switch to streaming merge via shard-side cursors; reduce work_mem; add spill-to-disk
Stale approximate result served as exact | Timed-out shard later recovers and rejoins without the accuracy flag being reset | Missing accuracy=approximate on responses where failedShards > 0 | Always propagate the accuracy flag from executeCrossShardAggregation; add an integration test for the timeout path
Shard routing breaks during rebalance | Hardcoded shard-to-host mappings in application config become stale when topology shifts | Query errors spike; partial_result_bytes drops to zero for the affected shard | Replace static maps with dynamic service discovery; add fallback routing to replica shards during rebalancing
Pre-aggregation pushdown skipped | Query planner on the shard does not recognise the GROUP BY as pushdown-eligible (missing index or stats) | Full-table scan in EXPLAIN output; seq_scan metric rising on shard nodes | Run ANALYZE after bulk loads; ensure enable_partition_pruning = on; verify the shard-side index covers the GROUP BY columns

Common Mistakes

  • Full table scans on every shard for every aggregation. Not applying shard-key predicates means the coordinator fans out to all shards even when only a small tenant cohort is needed. Add tenant_id to the WHERE clause whenever the caller knows the scope, and let the routing layer prune shards before dispatch.

  • Treating DISTINCT COUNT like SUM. Summing per-shard distinct counts double-counts keys that appear on multiple shards. Use HyperLogLog sketches (e.g. pg_hll) for approximate distinct counts. If exact counts are required, ship each shard's set of distinct keys to the coordinator and count their union. That approach is only practical when cardinality is modest.

  • No accuracy flag on the response. When a shard times out and is excluded, the merge result is incorrect but looks exact. Downstream services (billing, reporting) must know the result is approximate so they can hold it for re-computation rather than committing it.

  • Shared coordinator for OLTP and OLAP traffic. Running scatter-gather aggregations on the same coordinator that handles OLTP writes creates CPU contention and unpredictable latency. Route aggregation queries to a dedicated read-replica pool and set statement_timeout independently on each pool.
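The DISTINCT COUNT mistake is easy to reproduce with plain Python sets. In this made-up example, each shard reports the distinct user IDs it saw on the same day. Summing the per-shard counts overcounts users who appear on both shards. Counting the union of the key sets gives the exact answer, and that union is what a HyperLogLog sketch approximates in bounded memory.

```python
# Made-up distinct user IDs observed on two shards for the same day.
shard_users = [
    {"u1", "u2", "u3"},  # shard 1
    {"u3", "u4"},        # shard 2 (u3 also appears on shard 1)
]

# Wrong: treats COUNT(DISTINCT) like SUM.
summed = sum(len(users) for users in shard_users)

# Exact: union the key sets on the coordinator, then count.
exact = len(set().union(*shard_users))

print(summed, exact)  # 5 4
```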


FAQ

How do I handle partial aggregation results when a shard times out?

Implement circuit breakers that exclude degraded shards from the merge set, and expose an accuracy flag on the response so downstream consumers know the result is incomplete. For SUM and COUNT, you can apply statistical interpolation based on the shard’s historical share of total data volume. Always emit a failedShards metric so your alerting pipeline can page when approximation exceeds an acceptable threshold, typically more than 5 % of shards.
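The interpolation step can be sketched as follows. The sketch assumes that each shard's historical share of the total is known (for example, its fraction of usage_bytes over the last 30 days) and that the shares sum to 1. The observed SUM is scaled up by the combined share of the shards that responded, and the same scaling applies to COUNT. interpolate_sum and the share table are hypothetical. The output is an estimate and must still carry accuracy=approximate.

```python
def interpolate_sum(observed_sum: float, responding: set[str],
                    historical_share: dict[str, float]) -> float:
    """Scale a partial SUM by the historical share of the shards that answered.

    Assumes historical_share maps every shard to its fraction of the total
    and that the fractions sum to 1. The result is an estimate, not an exact value.
    """
    covered = sum(historical_share[s] for s in responding)
    if covered <= 0:
        raise ValueError("no responding shard has a positive historical share")
    return observed_sum / covered


shares = {"shard1": 0.25, "shard2": 0.25, "shard3": 0.25, "shard4": 0.25}
# shard4 timed out; the other three returned a combined SUM of 750.
print(interpolate_sum(750, {"shard1", "shard2", "shard3"}, shares))  # 1000.0
```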

Should aggregation coordinators run in the application layer or as a dedicated proxy?

Dedicated proxies centralise routing tables, connection pooling, and partial-result caching, which makes them the right choice when multiple services run the same aggregation queries. Application-level routing eliminates a network hop and is preferable for high-frequency, low-cardinality aggregations where latency dominates. See Proxy Routing Architectures for a full comparison of the tradeoffs.

How can I prevent coordinator memory exhaustion during large cross-shard GROUP BY operations?

Push partial GROUP BY down to each shard before shipping rows to the coordinator so the merge set is already reduced. Stream partial results via named cursors rather than buffering all shard responses in memory simultaneously. Set an explicit work_mem limit on the coordinator process and configure automatic spill-to-disk for intermediate hash aggregation states when that limit is reached.

