Federated Query Execution: Implementation & Routing Workflows
Federated query execution sits at the operational core of Cross-Partition Querying & Aggregation Strategies: it unifies data access across distributed nodes that may run different database engines without centralizing storage. Unlike proxy routing architectures, which handle connection topology and traffic interception, federated execution addresses the harder problem of query planning, predicate pushdown, and result merging across nodes that share no common coordinator catalog.
Problem Framing
Consider a backend serving 800 million order rows spread across eight PostgreSQL shards by tenant_id, with analytics landing in a separate Redshift cluster and a legacy MySQL instance holding product catalog data. A single SELECT joining orders to products and computing per-tenant revenue touches three engines and six network hops. Without a federated layer, application code must fan out the query manually, merge partial result sets in memory, and handle partial node failures. That logic then accumulates in every service that needs cross-node access.
The specific failure modes this architecture is designed to prevent are:
- Coordinator OOM when result sets from multiple nodes are merged without memory caps.
- Silently incomplete reads when node health is not tracked and failed shards are skipped without warning.
- Unbounded query fan-out when broadcast strategies fire against all nodes instead of only the shards that satisfy the predicate.
Architecture Overview
A federated execution layer has three distinct tiers:
Coordinator. Receives the original SQL or API query, parses the AST to extract partition keys and predicates, selects a dispatch strategy (targeted single-shard, scatter-gather, or broadcast), and merges partial results. The coordinator holds no persistent data, only catalog metadata describing which shards own which key ranges.
Dispatch layer. Routes decomposed sub-queries to leaf nodes in parallel. In environments using application-level sharding logic, this tier may live inside the application process rather than as a standalone service.
Leaf nodes. Execute local sub-queries, apply local indexes, and stream partial result sets back to the coordinator. Pushdown optimization lets the leaf nodes evaluate WHERE clauses and aggregations (GROUP BY, COUNT, SUM) locally, minimising the bytes that cross the network.
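The parallel fan-out performed by the dispatch layer can be sketched with a thread pool. This is a minimal illustration, not the dispatcher of any particular engine: `scatter_gather`, `run_subquery`, and `fake_leaf` are hypothetical names, and a failed shard is recorded as `None` so the merge step can tell "no answer" apart from "zero rows".

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional


def scatter_gather(
    shards: list[str],
    run_subquery: Callable[[str], list[tuple]],
    timeout_s: float = 4.0,
) -> dict[str, Optional[list[tuple]]]:
    """Run one sub-query per shard in parallel; None marks a failed shard."""
    results: dict[str, Optional[list[tuple]]] = {}
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        futures = {shard: pool.submit(run_subquery, shard) for shard in shards}
        for shard, future in futures.items():
            try:
                # The timeout bounds the wait on each future; the executor
                # still joins any running worker thread when the block exits.
                results[shard] = future.result(timeout=timeout_s)
            except Exception:
                results[shard] = None
    return results


def fake_leaf(shard: str) -> list[tuple]:
    """Stand-in for a real leaf call; one shard is simulated as down."""
    if shard == "shard-eu-central":
        raise ConnectionError("leaf unreachable")
    return [(shard, 1)]


partials = scatter_gather(
    ["shard-us-east", "shard-us-west", "shard-eu-central"], fake_leaf
)
```

A real dispatcher would also need to cancel in-flight sub-queries on the leaf when the timeout fires; the sketch only stops waiting for them.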
The cross-shard aggregation patterns that govern how partial aggregates are combined at the coordinator are a direct dependency: a misconfigured merge step produces incorrect totals even when every leaf node returns correct partial data.
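A small worked example shows how a merge step can go wrong with correct inputs. Averages cannot be combined by averaging the per-shard averages; each leaf has to return its sum and count instead. The function name `merge_avg` and the figures below are illustrative only.

```python
def merge_avg(partials: list[tuple[float, int]]) -> float:
    """Combine per-shard (sum, count) pairs into one global average."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    if count == 0:
        raise ValueError("no rows returned by any shard")
    return total / count


# Shard A: 2 orders totalling 100. Shard B: 8 orders totalling 1600.
partials = [(100.0, 2), (1600.0, 8)]

correct = merge_avg(partials)                            # 1700 / 10 = 170.0
naive = sum(s / c for s, c in partials) / len(partials)  # (50 + 200) / 2 = 125.0
```

The naive result weights both shards equally even though shard B holds four times as many rows.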
Implementation Walkthrough
Step 1: Define Routing Rules and Shard Mapping
Routing rules are the single source of truth for dispatch. They map query patterns to dispatch strategies and shard targets. Store them in a YAML or JSON file consumed by the coordinator at startup:
routing_rules:
  - name: orders_by_region
    match:
      table: orders
      predicate_columns: [region]
    strategy: scatter_gather
    shards: [shard-us-east, shard-us-west, shard-eu-central]
    timeout_ms: 4000
    pushdown:
      aggregates: true
      filters: true
  - name: orders_by_tenant
    match:
      table: orders
      predicate_columns: [tenant_id]
    strategy: targeted
    shard_map: tenant_shard_index
    timeout_ms: 1500
    pushdown:
      aggregates: false
      filters: true
  - name: product_catalog
    match:
      table: products
    strategy: broadcast
    shards: [mysql-catalog-primary]
    timeout_ms: 2000
    pushdown:
      filters: true
The strategy field drives the dispatch cost. targeted is cheapest: one node, no fan-out. scatter_gather fans out in parallel and merges. broadcast sends the same query to every listed node and is appropriate only for small reference tables.
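How a coordinator might match a query against such rules can be sketched as follows. The rules are mirrored as plain Python dicts to keep the example dependency-free, and both `select_rule` and the first-match ordering (cheapest strategy listed first) are assumptions for illustration, not behaviour of a specific product.

```python
from typing import Optional

# Mirrors the routing rules above; targeted is listed first so it wins ties.
ROUTING_RULES = [
    {"name": "orders_by_tenant", "table": "orders",
     "predicate_columns": ["tenant_id"], "strategy": "targeted"},
    {"name": "orders_by_region", "table": "orders",
     "predicate_columns": ["region"], "strategy": "scatter_gather"},
    {"name": "product_catalog", "table": "products",
     "predicate_columns": [], "strategy": "broadcast"},
]


def select_rule(table: str, predicate_columns: set[str]) -> Optional[dict]:
    """Return the first rule whose table matches and whose key columns
    all appear among the query's predicate columns."""
    for rule in ROUTING_RULES:
        if rule["table"] != table:
            continue
        if set(rule["predicate_columns"]) <= predicate_columns:
            return rule
    return None  # no rule matched: reject instead of broadcasting


by_tenant = select_rule("orders", {"tenant_id", "region"})
by_region = select_rule("orders", {"region"})
unmatched = select_rule("orders", {"status"})
```

Returning `None` for an unmatched query lets the caller reject it explicitly rather than fall through to an unplanned broadcast.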
Step 2: Configure the Execution Engine Connection Pool
The coordinator must multiplex connections across heterogeneous endpoints without exhausting file descriptors on high-concurrency workloads. A production SQLAlchemy configuration using the coordinator as the access point:
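from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool

coordinator_engine = create_engine(
    "postgresql+psycopg2://coordinator.internal:5432/federated_db",
    poolclass=QueuePool,
    pool_size=30,          # baseline connections kept alive
    max_overflow=20,       # burst headroom above pool_size
    pool_timeout=10,       # seconds to wait for a connection before raising
    pool_pre_ping=True,    # test each connection on checkout; replace it if dead
    connect_args={
        "connect_timeout": 5,
        "options": "-c statement_timeout=8000",  # 8 s hard stop per statement
    },
    # shard_routing is not a built-in SQLAlchemy option; it is a custom key
    # that routing code has to read for itself.
    execution_options={"shard_routing": "auto"},
)


@event.listens_for(coordinator_engine, "connect")
def set_search_path(dbapi_conn, _connection_record):
    # Runs once per new DBAPI connection. Autocommit is switched on for the
    # SET so it is not undone by a later rollback.
    previous_autocommit = dbapi_conn.autocommit
    dbapi_conn.autocommit = True
    cursor = dbapi_conn.cursor()
    cursor.execute("SET search_path = federated, public")
    cursor.close()
    dbapi_conn.autocommit = previous_autocommit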
Set pool_pre_ping=True so connections checked out from the pool are validated before use. This is critical when leaf nodes restart independently.
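The pool settings also determine the worst-case connection count the backend must accept. A rough sizing check, assuming every coordinator instance uses the same pool settings; the instance count and the `reserved` headroom for admin sessions are hypothetical inputs:

```python
def max_backend_connections(
    pool_size: int, max_overflow: int, coordinator_instances: int
) -> int:
    """Upper bound on connections opened across all coordinator instances."""
    return (pool_size + max_overflow) * coordinator_instances


def fits_backend(
    pool_size: int,
    max_overflow: int,
    coordinator_instances: int,
    backend_max_connections: int,
    reserved: int = 10,
) -> bool:
    """True if the worst case stays under the backend limit minus headroom."""
    worst_case = max_backend_connections(
        pool_size, max_overflow, coordinator_instances
    )
    return worst_case <= backend_max_connections - reserved


# pool_size=30 and max_overflow=20 allow up to 50 connections per instance.
four_instances = max_backend_connections(30, 20, 4)  # (30 + 20) * 4 = 200
```

With a backend limit of 100 (the PostgreSQL default for max_connections), four such instances would not fit, while a single instance would.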
Step 3: Enable Query Pushdown
Pushdown is the single biggest performance lever in federated execution. Without it, the coordinator pulls entire tables over the network and filters locally. With it, each leaf runs the WHERE clause and aggregations against local indexes before transmitting rows.
For PostgreSQL leaf nodes using postgres_fdw:
-- On the coordinator node, create a foreign server for each leaf
CREATE SERVER shard_us_east
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard-us-east.internal', port '5432', dbname 'orders_db');

CREATE USER MAPPING FOR federated_role
    SERVER shard_us_east
    OPTIONS (user 'shard_reader', password 'REDACTED');

-- Import the remote table definition into the local federated schema
IMPORT FOREIGN SCHEMA public
    LIMIT TO (orders)
    FROM SERVER shard_us_east
    INTO federated;

-- Cost remote plans from the leaf's own statistics and fetch rows in larger batches
ALTER SERVER shard_us_east OPTIONS (ADD use_remote_estimate 'true');
ALTER FOREIGN TABLE federated.orders OPTIONS (ADD fetch_size '5000');
Verify pushdown is active before enabling production traffic:
EXPLAIN (VERBOSE, FORMAT TEXT)
SELECT region, COUNT(*) AS order_count
FROM federated.orders
WHERE region = 'us-east'
  AND created_at > '2025-01-01'
GROUP BY region;
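The output must include a Remote SQL block containing the WHERE clause. If the filter appears only in the local plan, pushdown is not active. Check that the predicate uses only built-in, immutable operators and functions, since postgres_fdw evaluates anything else locally, and that use_remote_estimate is enabled or ANALYZE has been run on the foreign table so the planner has realistic row estimates.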
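This check can be automated in a deployment gate by scanning the EXPLAIN text. The sketch below assumes the text format, where postgres_fdw prints a `Remote SQL:` line per foreign scan; `pushdown_active` is a hypothetical helper, and the two plan strings are hand-written illustrations rather than captured output.

```python
def pushdown_active(explain_output: str, predicate_column: str) -> bool:
    """True if some 'Remote SQL:' line has a WHERE clause naming the column."""
    marker = "Remote SQL:"
    for line in explain_output.splitlines():
        if marker not in line:
            continue
        remote_sql = line.split(marker, 1)[1]
        where_pos = remote_sql.upper().find(" WHERE ")
        # Only text after WHERE counts; the column also appears in the SELECT list.
        if where_pos != -1 and predicate_column in remote_sql[where_pos:]:
            return True
    return False


# Hand-written plan fragments for illustration only.
PUSHED_DOWN = """Foreign Scan on federated.orders
  Output: region
  Remote SQL: SELECT region FROM public.orders WHERE ((region = 'us-east'))"""

FILTERED_LOCALLY = """Foreign Scan on federated.orders
  Output: region
  Filter: (orders.region = 'us-east'::text)
  Remote SQL: SELECT region FROM public.orders"""
```

String matching is a coarse check; parsing `EXPLAIN (FORMAT JSON)` output would be more robust for anything beyond a smoke test.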
Step 4: Implement Result Merging with Memory Guards
Partial results arrive asynchronously. Two merge strategies apply:
Stream merge (preferred for ordered result sets): each leaf streams sorted rows to the coordinator, which performs a k-way merge using a priority queue. Memory consumption is bounded by the merge buffer size, not total result set size.
Batch consolidation (preferred for aggregations): collect all SUM/COUNT partial results from leaves and reduce them in a final coordinator-side pass.
import heapq
from typing import Iterator


def stream_merge(
    leaf_iterators: list[Iterator[tuple]],
    key_index: int = 0
) -> Iterator[tuple]:
    """
    K-way merge of pre-sorted partial result sets from leaf nodes.
    Yields rows in global sort order without loading all results into memory.
    """
    heap: list[tuple] = []
    for i, it in enumerate(leaf_iterators):
        try:
            row = next(it)
            heapq.heappush(heap, (row[key_index], i, row, it))
        except StopIteration:
            pass  # leaf returned zero rows
    while heap:
        _, i, row, it = heapq.heappop(heap)
        yield row
        try:
            next_row = next(it)
            heapq.heappush(heap, (next_row[key_index], i, next_row, it))
        except StopIteration:
            pass
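The batch consolidation path can be sketched in the same spirit. The example assumes each leaf returns rows shaped as (group key, partial sum, partial count); that row shape and the name `consolidate` are illustrative assumptions.

```python
from collections import defaultdict


def consolidate(
    partials: list[list[tuple[str, float, int]]]
) -> dict[str, tuple[float, int]]:
    """Reduce per-leaf (group_key, partial_sum, partial_count) rows
    into one (sum, count) pair per group key."""
    totals: dict[str, list] = defaultdict(lambda: [0.0, 0])
    for leaf_rows in partials:
        for key, partial_sum, partial_count in leaf_rows:
            totals[key][0] += partial_sum
            totals[key][1] += partial_count
    return {key: (acc[0], acc[1]) for key, acc in totals.items()}


leaf_a = [("tenant-1", 10.0, 2), ("tenant-2", 5.0, 1)]
leaf_b = [("tenant-1", 30.0, 3)]
merged = consolidate([leaf_a, leaf_b])
```

Memory here grows with the number of distinct groups rather than the number of rows, which is why this path suits aggregations.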
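Cap coordinator memory with your engine's per-query limits. In PostgreSQL, work_mem sets how much memory a sort or hash operation may use before it spills to temporary files on disk. In Trino, query.max-memory-per-node caps the memory a query may use on a single worker. In Spark-based federators, raising spark.sql.shuffle.partitions makes each shuffle partition smaller and so reduces per-task memory pressure, although it is not a memory limit in itself.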
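Where the merge runs in application code rather than inside an engine, a similar cap can be approximated by wrapping each leaf's row stream. A minimal sketch, assuming a cumulative byte budget per leaf for a batch merge; `guarded` and `MergeBudgetExceeded` are hypothetical names, and `sys.getsizeof` gives only a rough, platform-dependent estimate.

```python
import sys
from typing import Iterable, Iterator


class MergeBudgetExceeded(RuntimeError):
    """Raised when a leaf's partial result exceeds the merge budget."""


def guarded(rows: Iterable[tuple], max_bytes: int) -> Iterator[tuple]:
    """Yield rows while tracking an approximate cumulative size;
    abort as soon as the running total passes max_bytes."""
    used = 0
    for row in rows:
        used += sys.getsizeof(row) + sum(sys.getsizeof(value) for value in row)
        if used > max_bytes:
            raise MergeBudgetExceeded(
                f"partial result exceeded {max_bytes} bytes"
            )
        yield row


small = list(guarded([(1, "a"), (2, "b")], max_bytes=10**6))
```

Failing the query early is usually preferable to letting the coordinator process be killed, because unrelated queries on the same coordinator survive.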
Step 5: Configure Fallback and Retry Paths
Partial node failure should return degraded results, not a full error. Implement tiered retry logic at the dispatch layer:
import time
from dataclasses import dataclass
from typing import Optional


class NodeUnavailableError(Exception):
    """Raised by the dispatch layer when a shard cannot be reached."""


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay_s: float = 0.2
    max_delay_s: float = 4.0
    fallback_to_replica: bool = True
    return_partial_on_timeout: bool = True


def execute_with_retry(shard_id: str, query: str, cfg: RetryConfig) -> Optional[list]:
    """Return the shard's rows, or None when the shard is given up on.

    _execute_on_shard and _execute_on_replica are supplied by the dispatch layer.
    """
    delay = cfg.base_delay_s
    for attempt in range(cfg.max_attempts):
        try:
            return _execute_on_shard(shard_id, query)
        except TimeoutError:
            if attempt == cfg.max_attempts - 1:
                if cfg.return_partial_on_timeout:
                    # None, not []: the coordinator must be able to tell a
                    # degraded shard apart from one that returned zero rows.
                    return None
                raise
            time.sleep(min(delay, cfg.max_delay_s))
            delay *= 2
        except NodeUnavailableError:
            if cfg.fallback_to_replica:
                return _execute_on_replica(shard_id, query)
            raise
    return None
Track degraded shards in coordinator state so subsequent queries can serve cached results or attach a DEGRADED_RESULT_SET warning header to the client response.
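One way to hold that state is a small in-memory tracker. The sketch below is an assumption-laden illustration: the class name, the cooldown, and the `X-Degraded-Shards` header are hypothetical, and the threshold of three consecutive failures is only a default. The clock is injectable so the behaviour can be tested without sleeping.

```python
import time
from typing import Callable, Iterable


class DegradedShardTracker:
    """Marks a shard degraded after N consecutive failures, for a cooldown."""

    def __init__(
        self,
        failure_threshold: int = 3,
        cooldown_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures: dict[str, int] = {}          # consecutive failures
        self._degraded_until: dict[str, float] = {}  # clock time of expiry

    def record_failure(self, shard_id: str) -> None:
        count = self._failures.get(shard_id, 0) + 1
        self._failures[shard_id] = count
        if count >= self.failure_threshold:
            self._degraded_until[shard_id] = self._clock() + self.cooldown_s

    def record_success(self, shard_id: str) -> None:
        self._failures.pop(shard_id, None)
        self._degraded_until.pop(shard_id, None)

    def is_degraded(self, shard_id: str) -> bool:
        until = self._degraded_until.get(shard_id)
        if until is None:
            return False
        if self._clock() >= until:
            # Cooldown elapsed: let the next query probe the shard again.
            del self._degraded_until[shard_id]
            self._failures[shard_id] = 0
            return False
        return True

    def warning_headers(self, shards: Iterable[str]) -> dict[str, str]:
        degraded = sorted(s for s in shards if self.is_degraded(s))
        if not degraded:
            return {}
        return {
            "Warning": "DEGRADED_RESULT_SET",
            "X-Degraded-Shards": ",".join(degraded),
        }
```

With more than one coordinator instance this state would need to live in a shared store, which the sketch does not attempt.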
Configuration Reference
| Parameter | Recommended Value | Rationale |
|---|---|---|
| pool_size | 20–40 per coordinator instance | Keeps persistent connections without exhausting backend max_connections |
| max_overflow | 50 % of pool_size | Absorbs traffic spikes; above this, callers queue |
| pool_timeout | 8–12 s | Matches p99 query latency SLA; fail-fast prevents cascade |
| statement_timeout | 6000–10000 ms | Hard ceiling per sub-query; prevents long-running fan-out from blocking the coordinator |
| fetch_size (FDW) | 5000–10000 rows | Reduces round-trips for large result sets; increase for analytics, decrease for OLTP |
| use_remote_estimate | true | Lets the coordinator use leaf-node statistics for cost-based pushdown decisions |
| max_retry_attempts | 3 | Balances fault tolerance against tail latency inflation |
| base_retry_delay_s | 0.2 | Exponential backoff starts here; prevents thundering-herd on transient node restarts |
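The tail-latency cost of the retry settings can be estimated with simple arithmetic. The sketch assumes every attempt runs to its full timeout and that no sleep follows the final attempt; the function names are illustrative, and the 1.5 s figure is the targeted-rule timeout from the routing rules.

```python
def backoff_delays(
    max_attempts: int, base_delay_s: float, max_delay_s: float
) -> list[float]:
    """Sleeps taken between attempts: doubled each time, capped at max_delay_s."""
    return [
        min(base_delay_s * 2**i, max_delay_s) for i in range(max_attempts - 1)
    ]


def worst_case_latency_s(
    timeout_s: float,
    max_attempts: int = 3,
    base_delay_s: float = 0.2,
    max_delay_s: float = 4.0,
) -> float:
    """Every attempt times out: attempts * timeout plus all backoff sleeps."""
    delays = backoff_delays(max_attempts, base_delay_s, max_delay_s)
    return max_attempts * timeout_s + sum(delays)


delays = backoff_delays(3, 0.2, 4.0)      # sleeps of 0.2 s and 0.4 s
worst_case = worst_case_latency_s(1.5)    # 3 * 1.5 + 0.6, about 5.1 s
```

A sub-query that nominally times out after 1.5 s can therefore hold a coordinator slot for roughly 5 s, which is worth comparing against the overall query timeout.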
Operational Contrast
Federated execution differs from the sibling approaches in this section in one fundamental way: it handles schema heterogeneity. Proxy routing architectures assume all shards are identical in schema β the proxy intercepts and re-routes without understanding query semantics. Application-level sharding logic pushes routing decisions into the application layer, which means every service must independently implement fan-out and merge.
Federated execution centralises that complexity at the coordinator. The tradeoff is added infrastructure (the coordinator is a potential bottleneck and single point of failure) and added operational burden (catalog metadata must be kept in sync with leaf schema changes). For uniform single-engine environments, proxy routing is simpler. For multi-engine or multi-schema environments, federation is the right model.
Failure Modes
Coordinator OOM During Result Merge
Root cause. All leaf nodes return large partial result sets simultaneously. The coordinator loads them into memory before merging. At scale, this exceeds available heap.
Detection.
process_resident_memory_bytes{job="federated-coordinator"} > 6e9
Mitigation. Switch to stream merge with a fixed merge buffer. Set work_mem or equivalent to a per-query cap. Enforce LIMIT clauses on unbounded analytical queries at the gateway layer before dispatch.
Pushdown Silently Disabled After Schema Change
Root cause. A FOREIGN TABLE schema is out of sync with the upstream leaf, for example because a column was added without re-running IMPORT FOREIGN SCHEMA. PostgreSQL falls back to local filtering on the outdated column definition.
Detection. Run EXPLAIN (VERBOSE) and check whether Remote SQL contains your WHERE predicate. If the filter appears only in the coordinator plan, pushdown has regressed.
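Mitigation. Automate the foreign table refresh as part of your schema migration pipeline. IMPORT FOREIGN SCHEMA fails if the foreign table already exists, so drop the table first or apply the change with ALTER FOREIGN TABLE. Treat schema drift between coordinator catalog and leaf as a deployment blocker.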
Shard Key Mismatch Triggers Full Broadcast
Root cause. A routing rule references a predicate column (tenant_id), but the incoming query uses an alias or a computed expression (LOWER(tenant_id)). The coordinator cannot extract the shard key and defaults to broadcasting to all nodes.
Detection.
-- On the coordinator: check for unplanned broadcast activity
SELECT query_text, dispatch_strategy, shards_contacted
FROM federated.query_audit_log
WHERE dispatch_strategy = 'broadcast'
AND expected_strategy = 'targeted'
ORDER BY executed_at DESC
LIMIT 20;
Mitigation. Normalise predicate expressions in the routing rule pattern matcher. Reject queries at the gateway that do not include a resolvable shard key rather than silently broadcasting.
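A gateway-side check along these lines might look like the sketch below. It uses regular expressions purely for brevity; a production matcher would work on the parsed AST, and this version would wrongly reject a query that merely selects COUNT(tenant_id). `extract_shard_key` and `UnroutableQuery` are hypothetical names.

```python
import re


class UnroutableQuery(ValueError):
    """The query carries no shard key the router can resolve."""


def extract_shard_key(sql: str, column: str = "tenant_id") -> str:
    """Return the literal compared to the shard key column, or raise."""
    col = re.escape(column)
    # A function wrapped around the key, e.g. LOWER(tenant_id), hides it.
    if re.search(rf"\b\w+\s*\(\s*{col}\s*\)", sql, re.IGNORECASE):
        raise UnroutableQuery(f"{column} is wrapped in a function call")
    match = re.search(rf"\b{col}\s*=\s*(?:'([^']*)'|(\d+))", sql, re.IGNORECASE)
    if match is None:
        raise UnroutableQuery(f"no equality predicate on {column}")
    quoted, numeric = match.group(1), match.group(2)
    return quoted if quoted is not None else numeric


key = extract_shard_key("SELECT * FROM orders WHERE tenant_id = 'acme'")
```

Raising an error at this point turns a silent full broadcast into a visible rejection that the calling team can fix.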
Partial Node Failure Returns Incomplete Aggregates Silently
Root cause. One leaf node times out. The coordinator returns a merged aggregate using only the responding nodes, with no indication to the client that the result set is incomplete.
Detection. Instrument coordinator responses with an X-Shard-Coverage header or equivalent metadata field:
def build_response(results: dict, total_shards: int) -> dict:
    # A shard that failed or timed out is recorded as None.
    responding = sum(1 for r in results.values() if r is not None)
    return {
        "data": merge_results(results),  # coordinator merge step, defined elsewhere
        "meta": {
            "shard_coverage": f"{responding}/{total_shards}",
            "degraded": responding < total_shards,
        },
    }
Mitigation. Surface degraded coverage to the client. For financial aggregations, fail hard rather than return incomplete totals.
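The fail-hard policy can be expressed as a small guard that runs before the merge. A sketch, assuming failed shards are recorded as None and that the caller decides per query whether strict coverage applies; `enforce_coverage` and `IncompleteCoverageError` are illustrative names.

```python
class IncompleteCoverageError(RuntimeError):
    """Raised when a strict query did not hear back from every shard."""


def enforce_coverage(results: dict, total_shards: int, strict: bool) -> float:
    """Return the fraction of shards that responded; raise if strict
    coverage is required and any shard is missing."""
    responding = sum(1 for r in results.values() if r is not None)
    if strict and responding < total_shards:
        raise IncompleteCoverageError(
            f"{responding}/{total_shards} shards responded"
        )
    return responding / total_shards if total_shards else 0.0


coverage = enforce_coverage({"shard-a": [(1,)], "shard-b": None}, 2, strict=False)
```

A revenue report would pass `strict=True`, while a dashboard sparkline could tolerate the degraded result.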
Common Mistakes
- Omitting pool_pre_ping on coordinator connections. When a leaf node restarts, the coordinator holds stale connections that error on first use. pool_pre_ping=True evicts them before they cause a request failure.
- Not re-importing foreign table schemas after migrations. A column added to a leaf's table is invisible to the coordinator until IMPORT FOREIGN SCHEMA is re-run. Filters on that column fall back to local evaluation, silently disabling pushdown.
- Using broadcast for large tables. Broadcast is appropriate only for small reference data (product catalogs, config tables, lookup lists). Broadcasting against a 500 M-row orders table saturates coordinator memory and degrades unrelated queries sharing the connection pool.
- Skipping distributed trace propagation. Without a consistent trace ID threaded from the client through the coordinator to every leaf, correlating a slow EXPLAIN ANALYZE output on a leaf with a specific client request is nearly impossible. Propagate OpenTelemetry span context in every sub-query header.
Monitoring & Debugging
Distributed trace IDs must propagate from the initial request through every execution node. Capture EXPLAIN ANALYZE at both coordinator and leaf nodes to identify pushdown regressions.
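Where the dispatch layer issues sub-queries itself, one option is to carry the trace context in a leading SQL comment so it shows up in leaf query logs. The sketch builds identifiers in the W3C traceparent layout (version, 32-hex trace ID, 16-hex span ID, flags) using only the standard library; the helper names and the comment format are assumptions, and a real deployment would take the IDs from its tracing SDK rather than generate them.

```python
import re
import secrets

_TRACEPARENT = re.compile(r"[0-9a-f]{2}-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}")


def new_traceparent() -> str:
    """Start a new trace: random 16-byte trace ID and 8-byte span ID."""
    return f"00-{secrets.token_hex(16)}-{secrets.token_hex(8)}-01"


def child_traceparent(parent: str) -> str:
    """Keep the trace ID, mint a fresh span ID for one sub-query."""
    version, trace_id, _parent_span, flags = parent.split("-")
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"


def tag_subquery(sql: str, traceparent: str) -> str:
    """Prefix the sub-query with a comment carrying the trace context."""
    # Validate strictly so nothing can break out of the comment.
    if not _TRACEPARENT.fullmatch(traceparent):
        raise ValueError("malformed traceparent")
    return f"/* traceparent='{traceparent}' */ {sql}"


request_trace = new_traceparent()
leaf_sql = tag_subquery(
    "SELECT COUNT(*) FROM orders", child_traceparent(request_trace)
)
```

Sub-queries generated by postgres_fdw cannot be tagged this way, so for FDW leaves the correlation has to come from other identifiers such as the session's application name.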
P99 sub-query latency by shard:
histogram_quantile(0.99,
sum(rate(federated_subquery_duration_seconds_bucket[5m])) by (le, shard_id)
)
Shard fan-out ratio, the number of shards contacted per query (high values indicate broadcast leakage):
avg(federated_shards_contacted_per_query) by (routing_strategy)
Coordinator merge buffer usage:
federated_merge_buffer_bytes_used
/ federated_merge_buffer_bytes_limit
Alert when buffer utilisation exceeds 80 %; that level signals an imminent OOM event under the next traffic spike.
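For PostgreSQL FDW leaf nodes, verify that the coordinator holds statistics for each foreign table before enabling production traffic. pg_stat_user_tables does not list foreign tables, so read the planner's row estimate from pg_class instead:
-- On the coordinator: foreign tables that have never been analyzed sort first
-- (reltuples is -1, or 0 on older PostgreSQL versions, until ANALYZE has run)
SELECT n.nspname   AS schema_name,
       c.relname   AS foreign_table_name,
       c.reltuples AS estimated_rows
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'f'
ORDER BY c.reltuples;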
FAQ
How does federated query execution differ from standard cross-partition aggregation?
Federated execution handles heterogeneous schemas and disparate engines: leaf nodes may be PostgreSQL, MySQL, a Redshift cluster, or an object store. Standard cross-shard aggregation assumes a uniform schema coordinated by a single database engine with shared catalog metadata.
What is the recommended timeout strategy for cross-node queries?
Implement tiered timeouts: a short per-node statement_timeout (6–10 s) at the leaf level, and a longer overall query timeout at the coordinator (15–30 s). Configure circuit breakers so a node that times out on three consecutive requests is temporarily removed from the routing pool. Return partial results with a degraded flag rather than blocking until the full timeout expires.
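The two tiers interact: a sub-query started late in the request should not be granted the full per-node timeout. A minimal sketch of that budgeting, with a hypothetical `QueryDeadline` class and an injectable clock so it can be tested without waiting:

```python
import time
from typing import Callable


class QueryDeadline:
    """Tracks the coordinator-level budget for one federated query."""

    def __init__(
        self,
        total_budget_s: float = 20.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._clock = clock
        self._expires_at = clock() + total_budget_s

    def remaining_s(self) -> float:
        return max(0.0, self._expires_at - self._clock())

    def node_timeout_s(self, per_node_cap_s: float = 8.0) -> float:
        """Timeout for the next sub-query: the per-node cap, or whatever
        is left of the overall budget if that is smaller."""
        return min(per_node_cap_s, self.remaining_s())
```

When `node_timeout_s` reaches zero the coordinator can stop dispatching and return what it has, flagged as degraded.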
Can federated queries use existing database indexes on leaf nodes?
Yes, through pushdown. The coordinator translates global predicates into local index scans on each leaf. Verify by running EXPLAIN (VERBOSE) on the coordinator and checking that the Remote SQL block includes your WHERE clause. If the filter is absent from Remote SQL, the leaf is not using its index for that predicate. Re-import the foreign schema and re-run ANALYZE on the foreign table.
Related
- Cross-Partition Querying & Aggregation Strategies: parent section covering the full query execution model
- Cross-Shard Aggregation Patterns: how partial aggregates from leaf nodes are combined correctly at the coordinator
- Proxy Routing Architectures: the simpler alternative when all shards share a uniform schema and engine
- Application-Level Sharding Logic: how service-layer routing compares to a centralised federated coordinator
- Executing Federated Queries Across Multiple PostgreSQL Instances: step-by-step postgres_fdw configuration for this architecture