
Federated Query Execution: Implementation & Routing Workflows

Federated query execution sits at the operational core of Cross-Partition Querying & Aggregation Strategies: it unifies data access across distributed nodes that may run different database engines without centralizing storage. Unlike proxy routing architectures, which handle connection topology and traffic interception, federated execution addresses the harder problem of query planning, predicate pushdown, and result merging across nodes that share no common coordinator catalog.

Problem Framing

Consider a backend serving 800 million order rows spread across eight PostgreSQL shards by tenant_id, with analytics landing in a separate Redshift cluster and a legacy MySQL instance holding product catalog data. A single SELECT joining orders to products and computing per-tenant revenue touches three engines and six network hops. Without a federated layer, application code must fan out the query manually, merge partial result sets in memory, and handle partial node failures. That logic accumulates in every service that needs cross-node access.

The specific failure modes this architecture is designed to prevent are:

  • Coordinator OOM when result sets from multiple nodes are merged without memory caps.
  • Silently incomplete or stale reads when node health is not tracked and failed shards are skipped without notice.
  • Unbounded query fan-out when broadcast strategies fire against all nodes instead of only the shards that satisfy the predicate.

Figure: Federated Query Execution Architecture. A client issues a SQL or API query to a federated coordinator. The coordinator parses the AST, extracts the shard key, selects a scatter-gather or targeted dispatch strategy, and fans out sub-queries to leaf nodes (PostgreSQL shards, MySQL, Redshift). Partial rows stream back and are merged at the coordinator before the merged result is returned to the client.

Architecture Overview

A federated execution layer has three distinct tiers:

Coordinator. Receives the original SQL or API query, parses the AST to extract partition keys and predicates, selects a dispatch strategy (targeted single-shard, scatter-gather, or broadcast), and merges partial results. The coordinator holds no persistent data, only catalog metadata describing which shards own which key ranges.

Dispatch layer. Routes decomposed sub-queries to leaf nodes in parallel. In environments using application-level sharding logic, this tier may live inside the application process rather than as a standalone service.

Leaf nodes. Execute local sub-queries, apply local indexes, and stream partial result sets back to the coordinator. Pushdown optimization lets the leaf nodes evaluate WHERE clauses and aggregations (GROUP BY, COUNT, SUM) locally, minimising the bytes that cross the network.

The cross-shard aggregation patterns that govern how partial aggregates are combined at the coordinator are a direct dependency: a misconfigured merge step produces incorrect totals even when every leaf node returns correct partial data.
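
The fan-out performed by the dispatch layer can be sketched in a few lines. This is a minimal illustration, not a production dispatcher: run_subquery is a hypothetical callable standing in for whatever leaf client is in use, and a failed shard is recorded as None so the merge step can tell "no rows" apart from "no answer".

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Optional


def scatter_gather(
    shards: list[str],
    sub_query: str,
    run_subquery: Callable[[str, str], list],  # hypothetical leaf client
) -> dict[str, Optional[list]]:
    """Send the same sub-query to every shard in parallel.

    Returns shard_id -> rows, or None for a shard that raised.
    """
    results: dict[str, Optional[list]] = {shard: None for shard in shards}
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        futures = {pool.submit(run_subquery, s, sub_query): s for s in shards}
        for future in as_completed(futures):
            shard = futures[future]
            try:
                results[shard] = future.result()
            except Exception:
                results[shard] = None  # keep the shard marked as missing
    return results
```

Keeping the missing shards visible in the result, rather than dropping them, is what lets later stages report coverage honestly.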


Implementation Walkthrough

Step 1: Define Routing Rules and Shard Mapping

Routing rules are the single source of truth for dispatch. They map query patterns to dispatch strategies and shard targets. Store them in a YAML or JSON file consumed by the coordinator at startup:

routing_rules:
  - name: orders_by_region
    match:
      table: orders
      predicate_columns: [region]
    strategy: scatter_gather
    shards: [shard-us-east, shard-us-west, shard-eu-central]
    timeout_ms: 4000
    pushdown:
      aggregates: true
      filters: true

  - name: orders_by_tenant
    match:
      table: orders
      predicate_columns: [tenant_id]
    strategy: targeted
    shard_map: tenant_shard_index
    timeout_ms: 1500
    pushdown:
      aggregates: false
      filters: true

  - name: product_catalog
    match:
      table: products
    strategy: broadcast
    shards: [mysql-catalog-primary]
    timeout_ms: 2000
    pushdown:
      filters: true

The strategy field drives the dispatch cost. targeted is cheapest: one node, no fan-out. scatter_gather fans out in parallel and merges. broadcast sends the same query to every listed node and is appropriate only for small reference tables.
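
How a coordinator might pick among these rules can be sketched as follows. The rules are inlined as Python dicts to keep the example self-contained, and both the select_rule helper and the cost ranking are assumptions made for illustration; the ranking simply encodes the ordering described above (targeted, then scatter_gather, then broadcast).

```python
from typing import Optional

# Simplified, inlined copy of the routing rules (illustrative only).
ROUTING_RULES = [
    {"name": "orders_by_region", "table": "orders",
     "predicate_columns": ["region"], "strategy": "scatter_gather"},
    {"name": "orders_by_tenant", "table": "orders",
     "predicate_columns": ["tenant_id"], "strategy": "targeted"},
    {"name": "product_catalog", "table": "products",
     "predicate_columns": [], "strategy": "broadcast"},
]

# Assumed relative dispatch cost: lower is cheaper.
STRATEGY_COST = {"targeted": 0, "scatter_gather": 1, "broadcast": 2}


def select_rule(table: str, query_columns: set,
                rules: list = ROUTING_RULES) -> Optional[dict]:
    """Return the cheapest rule whose predicate columns all appear in the query."""
    candidates = [
        rule for rule in rules
        if rule["table"] == table
        and set(rule["predicate_columns"]) <= set(query_columns)
    ]
    if not candidates:
        return None  # no rule matches: the gateway decides whether to reject
    return min(candidates, key=lambda rule: STRATEGY_COST[rule["strategy"]])
```

A query that filters on both tenant_id and region matches two rules here, and the targeted one wins because it contacts a single shard.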

Step 2: Configure the Execution Engine Connection Pool

The coordinator must multiplex connections across heterogeneous endpoints without exhausting file descriptors on high-concurrency workloads. A production SQLAlchemy configuration using the coordinator as the access point:

from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool

coordinator_engine = create_engine(
    "postgresql+psycopg2://coordinator.internal:5432/federated_db",
    poolclass=QueuePool,
    pool_size=30,          # baseline connections kept alive
    max_overflow=20,       # burst headroom above pool_size
    pool_timeout=10,       # seconds to wait for a connection before raising
    pool_pre_ping=True,    # discard stale connections proactively
    connect_args={
        "connect_timeout": 5,
        "options": "-c statement_timeout=8000"  # 8 s hard stop per statement
    },
    execution_options={"shard_routing": "auto"}
)

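@event.listens_for(coordinator_engine, "connect")
def set_search_path(dbapi_conn, _):
    # Run SET in autocommit mode so that a later rollback (for example when
    # the pool resets the connection) does not revert the search_path.
    previous_autocommit = dbapi_conn.autocommit
    dbapi_conn.autocommit = True
    cursor = dbapi_conn.cursor()
    cursor.execute("SET search_path = federated, public")
    cursor.close()
    dbapi_conn.autocommit = previous_autocommit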

Set pool_pre_ping=True so connections checked out from the pool are validated before use. This is critical when leaf nodes restart independently.

Step 3: Enable Query Pushdown

Pushdown is the single biggest performance lever in federated execution. Without it, the coordinator pulls entire tables over the network and filters locally. With it, each leaf runs the WHERE clause and aggregations against local indexes before transmitting rows.

For PostgreSQL leaf nodes using postgres_fdw:

-- On the coordinator node, create a foreign server for each leaf
CREATE SERVER shard_us_east
  FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host 'shard-us-east.internal', port '5432', dbname 'orders_db');

CREATE USER MAPPING FOR federated_role
  SERVER shard_us_east
  OPTIONS (user 'shard_reader', password 'REDACTED');

-- Import the remote table schema
IMPORT FOREIGN SCHEMA public
  LIMIT TO (orders)
  FROM SERVER shard_us_east
  INTO federated;

-- Enable cost-based pushdown
ALTER SERVER shard_us_east OPTIONS (ADD use_remote_estimate 'true');
ALTER FOREIGN TABLE federated.orders OPTIONS (ADD fetch_size '5000');

Verify pushdown is active before enabling production traffic:

EXPLAIN (VERBOSE, FORMAT TEXT)
SELECT region, COUNT(*) AS order_count
FROM federated.orders
WHERE region = 'us-east'
  AND created_at > '2025-01-01'
GROUP BY region;

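The output must include a Remote SQL line containing the WHERE clause. If the filter appears only in the local plan, pushdown is not active. First check the predicate itself: postgres_fdw ships a WHERE clause only when it uses built-in, immutable data types, operators, and functions (or ones from extensions listed in the foreign server's extensions option). Then confirm that use_remote_estimate is set and that statistics have been gathered on the foreign table, since those drive the planner's cost-based choices.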
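
This check is easy to automate in a pre-deployment test. The sketch below scans EXPLAIN (VERBOSE) text output for a Remote SQL line whose WHERE clause mentions the column of interest. The pushdown_active helper and both sample plans are illustrative assumptions, not output captured from a real server, and the substring match is deliberately simple.

```python
def pushdown_active(explain_text: str, predicate_column: str) -> bool:
    """True if a 'Remote SQL:' line has a WHERE clause naming the column."""
    marker = "Remote SQL:"
    for line in explain_text.splitlines():
        stripped = line.strip()
        if not stripped.startswith(marker):
            continue
        remote_sql = stripped[len(marker):]
        _, where_kw, where_clause = remote_sql.partition(" WHERE ")
        if where_kw and predicate_column in where_clause:
            return True
    return False


# Illustrative plans (hand-written, not captured from a server).
PLAN_PUSHED = """Foreign Scan on federated.orders
  Output: region
  Remote SQL: SELECT region FROM public.orders WHERE ((region = 'us-east'::text))"""

PLAN_LOCAL = """Foreign Scan on federated.orders
  Output: region
  Filter: (lower(orders.region) = 'us-east'::text)
  Remote SQL: SELECT region FROM public.orders"""
```

In the second plan the filter sits in a local Filter line and the Remote SQL has no WHERE clause, which is the regression the check is meant to catch.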

Step 4: Implement Result Merging with Memory Guards

Partial results arrive asynchronously. Two merge strategies apply:

Stream merge (preferred for ordered result sets): each leaf streams sorted rows to the coordinator, which performs a k-way merge using a priority queue. Memory consumption is bounded by the merge buffer size, not total result set size.

Batch consolidation (preferred for aggregations): collect all SUM/COUNT partial results from leaves and reduce them in a final coordinator-side pass.

import heapq
from typing import Iterator

def stream_merge(
    leaf_iterators: list[Iterator[tuple]],
    key_index: int = 0
) -> Iterator[tuple]:
    """
    K-way merge of pre-sorted partial result sets from leaf nodes.
    Yields rows in global sort order without loading all results into memory.
    """
    heap: list[tuple] = []
    for i, it in enumerate(leaf_iterators):
        try:
            row = next(it)
            heapq.heappush(heap, (row[key_index], i, row, it))
        except StopIteration:
            pass  # leaf returned zero rows

    while heap:
        _, i, row, it = heapq.heappop(heap)
        yield row
        try:
            next_row = next(it)
            heapq.heappush(heap, (next_row[key_index], i, next_row, it))
        except StopIteration:
            pass

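Cap coordinator memory with the limits your execution engine provides: work_mem in PostgreSQL sets how much memory a sort or hash operation may use before it spills to disk, and query.max-memory-per-node in Trino caps the memory a query may use on each node. In Spark-based federators, spark.sql.shuffle.partitions controls how many partitions a shuffle is split into; raising it shrinks each partition but is not itself a memory cap.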
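
Batch consolidation, the second merge strategy in this step, can be sketched as a reduce over partial aggregates. The example assumes each leaf returns (group_key, partial_sum, partial_count) rows; the consolidate helper and that row shape are illustrative. The average is recomputed from the combined sum and count, because averaging per-leaf averages gives the wrong answer whenever leaves hold different row counts.

```python
from collections import defaultdict
from typing import Iterable


def consolidate(partials: Iterable[list]) -> dict:
    """Combine per-leaf (group_key, sum, count) rows into global aggregates."""
    totals = defaultdict(lambda: [0, 0])  # group_key -> [sum, count]
    for leaf_rows in partials:
        for group_key, partial_sum, partial_count in leaf_rows:
            totals[group_key][0] += partial_sum
            totals[group_key][1] += partial_count
    return {
        key: {
            "sum": total,
            "count": count,
            # AVG must come from the global sum and count, never from
            # an average of the per-leaf averages.
            "avg": total / count if count else None,
        }
        for key, (total, count) in totals.items()
    }


leaf_a = [("tenant-1", 100, 2), ("tenant-2", 30, 1)]
leaf_b = [("tenant-1", 50, 3)]
merged = consolidate([leaf_a, leaf_b])
```

For tenant-1 the per-leaf averages are 50 and roughly 16.7, whose mean is about 33.3, while the correct global average is 150 / 5 = 30.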

Step 5: Configure Fallback and Retry Paths

Partial node failure should return degraded results, not a full error. Implement tiered retry logic at the dispatch layer:

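import time
from dataclasses import dataclass
from typing import Optional


class NodeUnavailableError(Exception):
    """Raised by the dispatch layer when a shard cannot be reached."""


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay_s: float = 0.2
    max_delay_s: float = 4.0
    fallback_to_replica: bool = True
    return_partial_on_timeout: bool = True


def execute_with_retry(shard_id: str, query: str, cfg: RetryConfig) -> Optional[list]:
    # _execute_on_shard and _execute_on_replica are supplied by the dispatch layer.
    delay = cfg.base_delay_s
    for attempt in range(cfg.max_attempts):
        try:
            return _execute_on_shard(shard_id, query)
        except TimeoutError:
            if attempt == cfg.max_attempts - 1:
                if cfg.return_partial_on_timeout:
                    # None (not []) means "no answer", so the coordinator can
                    # mark the shard degraded instead of treating it as empty.
                    return None
                raise  # partial results not allowed: surface the timeout
            time.sleep(min(delay, cfg.max_delay_s))  # exponential backoff
            delay *= 2
        except NodeUnavailableError:
            if cfg.fallback_to_replica:
                return _execute_on_replica(shard_id, query)
            raise
    return None  # only reached when max_attempts is 0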

Track degraded shards in coordinator state so subsequent queries can serve cached results or raise a DEGRADED_RESULT_SET warning header to the client.
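
One way to hold that coordinator state is a small in-memory registry, sketched below. The DegradedShardRegistry class, the threshold of three consecutive failures, and the 30-second cooldown are all assumptions chosen for illustration; the clock is injectable so the behaviour can be tested without sleeping.

```python
import time
from typing import Callable


class DegradedShardRegistry:
    """Tracks consecutive failures and marks shards degraded for a cooldown."""

    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures: dict = {}        # shard_id -> consecutive failures
        self._degraded_until: dict = {}  # shard_id -> clock value

    def record_failure(self, shard_id: str) -> None:
        self._failures[shard_id] = self._failures.get(shard_id, 0) + 1
        if self._failures[shard_id] >= self.failure_threshold:
            self._degraded_until[shard_id] = self._clock() + self.cooldown_s

    def record_success(self, shard_id: str) -> None:
        # Any success clears both the failure streak and the degraded mark.
        self._failures.pop(shard_id, None)
        self._degraded_until.pop(shard_id, None)

    def is_degraded(self, shard_id: str) -> bool:
        until = self._degraded_until.get(shard_id)
        return until is not None and self._clock() < until
```

After the cooldown expires the shard becomes eligible again; a further failure re-marks it immediately because the failure streak is only reset by a success.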


Configuration Reference

Parameter | Recommended Value | Rationale
pool_size | 20–40 per coordinator instance | Keeps persistent connections without exhausting backend max_connections
max_overflow | 50% of pool_size | Absorbs traffic spikes; above this, callers queue
pool_timeout | 8–12 s | Matches p99 query latency SLA; fail-fast prevents cascade
statement_timeout | 6000–10000 ms | Hard ceiling per sub-query; prevents long-running fan-out from blocking the coordinator
fetch_size (FDW) | 5000–10000 rows | Reduces round-trips for large result sets; increase for analytics, decrease for OLTP
use_remote_estimate | true | Lets the coordinator use leaf-node statistics for cost-based pushdown decisions
max_retry_attempts | 3 | Balances fault tolerance against tail latency inflation
base_retry_delay_s | 0.2 | Exponential backoff starts here; prevents thundering-herd on transient node restarts
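
The pool_size rationale is worth checking with arithmetic before deployment: each coordinator instance can open up to pool_size + max_overflow connections, so the worst case scales with the number of instances. The helper names below, the four-instance deployment, and the reserved-connection figure are assumptions for illustration.

```python
def peak_backend_connections(coordinators: int, pool_size: int,
                             max_overflow: int) -> int:
    """Worst-case connections opened against one backend by all coordinators."""
    return coordinators * (pool_size + max_overflow)


def fits_backend(coordinators: int, pool_size: int, max_overflow: int,
                 backend_max_connections: int, reserved: int = 0) -> bool:
    """True if the worst case stays within the backend's connection limit.

    reserved covers connections kept back for admin and replication use.
    """
    peak = peak_backend_connections(coordinators, pool_size, max_overflow)
    return peak <= backend_max_connections - reserved


# Four coordinators using the pool settings from Step 2.
peak = peak_backend_connections(coordinators=4, pool_size=30, max_overflow=20)
```

With these numbers the worst case is 4 × (30 + 20) = 200 connections, which would not fit a backend left at a max_connections of 100.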

Operational Contrast

Federated execution differs from the sibling approaches in this section in one fundamental way: it handles schema heterogeneity. Proxy routing architectures assume all shards are identical in schema: the proxy intercepts and re-routes without understanding query semantics. Application-level sharding logic pushes routing decisions into the application layer, which means every service must independently implement fan-out and merge.

Federated execution centralises that complexity at the coordinator. The tradeoff is added infrastructure (the coordinator is a potential bottleneck and single point of failure) and added operational burden (catalog metadata must be kept in sync with leaf schema changes). For uniform single-engine environments, proxy routing is simpler. For multi-engine or multi-schema environments, federation is the right model.


Failure Modes

Coordinator OOM During Result Merge

Root cause. All leaf nodes return large partial result sets simultaneously. The coordinator loads them into memory before merging. At scale, this exceeds available heap.

Detection.

process_resident_memory_bytes{job="federated-coordinator"} > 6e9

Mitigation. Switch to stream merge with a fixed merge buffer. Set work_mem or equivalent to a per-query cap. Enforce LIMIT clauses on unbounded analytical queries at the gateway layer before dispatch.


Pushdown Silently Disabled After Schema Change

Root cause. A FOREIGN TABLE definition is out of sync with the upstream leaf, for example because a column was added or altered without refreshing the foreign table. The coordinator plans against the outdated definition, and any predicate it cannot safely ship is evaluated locally.

Detection. Run EXPLAIN (VERBOSE) and check whether Remote SQL contains your WHERE predicate. If the filter appears only in the coordinator plan, pushdown has regressed.

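Mitigation. Automate the foreign table refresh as part of your schema migration pipeline. IMPORT FOREIGN SCHEMA fails if the foreign table already exists, so either drop and re-import the table or apply the matching ALTER FOREIGN TABLE in the same migration. Treat schema drift between coordinator catalog and leaf as a deployment blocker.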


Shard Key Mismatch Triggers Full Broadcast

Root cause. A routing rule references a predicate column (tenant_id), but the incoming query uses an alias or a computed expression (LOWER(tenant_id)). The coordinator cannot extract the shard key and defaults to broadcasting to all nodes.

Detection.

-- On the coordinator: check for unplanned broadcast activity
SELECT query_text, dispatch_strategy, shards_contacted
FROM federated.query_audit_log
WHERE dispatch_strategy = 'broadcast'
  AND expected_strategy = 'targeted'
ORDER BY executed_at DESC
LIMIT 20;

Mitigation. Normalise predicate expressions in the routing rule pattern matcher. Reject queries at the gateway that do not include a resolvable shard key rather than silently broadcasting.
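
A minimal sketch of that normalisation step follows. It strips a few simple single-argument wrappers and any table qualifier to recover the underlying column name; the normalise_column and has_shard_key helpers and the wrapper list are assumptions, and a real matcher would work on the parsed AST rather than on strings. Recovering the column only tells the gateway that a shard key is present: whether the literal on the other side of a wrapped comparison can be used for routing depends on how the key was hashed.

```python
import re

# Simple single-argument wrappers the matcher sees through (assumed list).
_WRAPPER = re.compile(r"^\s*(?:lower|upper|trim)\s*\(\s*(.+?)\s*\)\s*$",
                      re.IGNORECASE)


def normalise_column(expr: str) -> str:
    """Reduce an expression such as LOWER(o.tenant_id) to 'tenant_id'."""
    previous = None
    while previous != expr:  # peel nested wrappers one layer at a time
        previous = expr
        match = _WRAPPER.match(expr)
        if match:
            expr = match.group(1)
    return expr.strip().split(".")[-1].lower()


def has_shard_key(predicate_exprs: list, shard_key: str = "tenant_id") -> bool:
    """True if any predicate expression resolves to the shard key column."""
    return any(normalise_column(e) == shard_key for e in predicate_exprs)
```

A gateway could call has_shard_key before dispatch and reject the query when it returns False, instead of letting it fall through to a broadcast.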


Partial Node Failure Returns Incomplete Aggregates Silently

Root cause. One leaf node times out. The coordinator returns a merged aggregate using only the responding nodes, with no indication to the client that the result set is incomplete.

Detection. Instrument coordinator responses with an X-Shard-Coverage header or equivalent metadata field:

def build_response(results: dict, total_shards: int) -> dict:
    responding = len([r for r in results.values() if r is not None])
    return {
        "data": merge_results(results),
        "meta": {
            "shard_coverage": f"{responding}/{total_shards}",
            "degraded": responding < total_shards
        }
    }

Mitigation. Surface degraded coverage to the client. For financial aggregations, fail hard rather than return incomplete totals.
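
The fail-hard policy can be expressed as a small guard applied to a response shaped like the one build_response returns. The enforce_coverage helper and the IncompleteResultError exception are hypothetical names introduced for this sketch.

```python
class IncompleteResultError(RuntimeError):
    """Raised when a caller requires full shard coverage and did not get it."""


def enforce_coverage(response: dict, require_full: bool) -> dict:
    """Return the response unchanged, or raise if coverage is insufficient.

    Expects the 'meta' block with 'degraded' and 'shard_coverage' keys.
    """
    meta = response["meta"]
    if require_full and meta["degraded"]:
        raise IncompleteResultError(
            f"only {meta['shard_coverage']} shards responded"
        )
    return response
```

Callers that can tolerate partial data pass require_full=False and inspect the degraded flag themselves; financial aggregations pass require_full=True.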


Common Mistakes

  • Omitting pool_pre_ping on coordinator connections. When a leaf node restarts, the coordinator holds stale connections that error on first use. pool_pre_ping=True evicts them before they cause a request failure.

  • Not refreshing foreign table definitions after migrations. A column added to a leaf’s table is invisible to the coordinator until the foreign table is re-imported or altered to match, and definitions that have drifted from the leaf can keep filters from being pushed down.

  • Using broadcast for large tables. Broadcast is appropriate only for small reference data (product catalogs, config tables, lookup lists). Broadcasting against a 500 M-row orders table saturates coordinator memory and degrades unrelated queries sharing the connection pool.

  • Skipping distributed trace propagation. Without a consistent trace ID threaded from the client through the coordinator to every leaf, correlating a slow EXPLAIN ANALYZE output on a leaf with a specific client request is nearly impossible. Propagate OpenTelemetry span context with every sub-query, for example as a SQL comment that leaf-side query logs capture.
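
One way to carry trace context into a sub-query is to append it as a SQL comment, so it shows up in leaf-side query logs. The sketch below assumes the caller already holds a W3C traceparent string; the with_trace_comment helper and the exact comment layout are illustrative rather than a fixed standard. The value is validated against the version-00 traceparent shape before it is embedded, which also keeps arbitrary text out of the statement.

```python
import re

# Version-00 W3C traceparent: version, 32-hex trace id, 16-hex parent id, flags.
_TRACEPARENT = re.compile(r"^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$")


def with_trace_comment(sql: str, traceparent: str) -> str:
    """Append the trace context to a sub-query as a trailing SQL comment."""
    if not _TRACEPARENT.match(traceparent):
        raise ValueError("not a version-00 W3C traceparent value")
    statement = sql.rstrip().rstrip(";")
    return f"{statement} /* traceparent='{traceparent}' */"


tagged = with_trace_comment(
    "SELECT COUNT(*) FROM orders WHERE region = 'us-east';",
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
)
```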


Monitoring & Debugging

Distributed trace IDs must propagate from the initial request through every execution node. Capture EXPLAIN ANALYZE at both coordinator and leaf nodes to identify pushdown regressions.

P99 sub-query latency by shard:

histogram_quantile(0.99,
  sum(rate(federated_subquery_duration_seconds_bucket[5m])) by (le, shard_id)
)

Shard fan-out ratio: how many shards are contacted per query (high values indicate broadcast leakage):

avg(federated_shards_contacted_per_query) by (routing_strategy)

Coordinator merge buffer usage:

federated_merge_buffer_bytes_used
  / federated_merge_buffer_bytes_limit

Alert when buffer utilisation exceeds 80%, which signals an imminent OOM event under the next traffic spike.

For PostgreSQL FDW leaf nodes, verify remote statistics are current before enabling production traffic:

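-- On the coordinator: list foreign tables with their locally stored row estimates.
-- pg_stat_user_tables does not list foreign tables, so read pg_class directly.
-- reltuples of -1 (0 before PostgreSQL 14) means the table was never analyzed.
SELECT n.nspname  AS schema_name,
       c.relname  AS foreign_table_name,
       c.reltuples AS estimated_rows
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'f'
ORDER BY c.reltuples;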

FAQ

How does federated query execution differ from standard cross-partition aggregation?

Federated execution handles heterogeneous schemas and disparate engines: leaf nodes may be PostgreSQL, MySQL, a Redshift cluster, or an object store. Standard cross-shard aggregation assumes a uniform schema coordinated by a single database engine with shared catalog metadata.

What is the recommended timeout strategy for cross-node queries?

Implement tiered timeouts: a short per-node statement_timeout (6–10 s) at the leaf level, and a longer overall query timeout at the coordinator (15–30 s). Configure circuit breakers so a node that times out on three consecutive requests is temporarily removed from the routing pool. Return partial results with a degraded flag rather than blocking until the full timeout expires.
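
The two timeout tiers can be combined by giving each sub-query the smaller of the per-node cap and whatever remains of the overall budget. This is a sketch under stated assumptions: node_timeout_ms is a hypothetical helper, the 8000 ms cap is one value from the range above, and the deadline is expressed on the same clock as time.monotonic().

```python
import time
from typing import Optional


def node_timeout_ms(query_deadline: float, per_node_cap_ms: int = 8000,
                    now: Optional[float] = None) -> int:
    """Timeout for the next sub-query, in milliseconds.

    Returns 0 when the overall budget is spent. The caller should skip the
    dispatch in that case rather than pass 0 to statement_timeout, which
    PostgreSQL interprets as "no timeout".
    """
    current = time.monotonic() if now is None else now
    remaining_ms = int((query_deadline - current) * 1000)
    return max(0, min(per_node_cap_ms, remaining_ms))
```

Early in a 20-second budget each node gets the full 8000 ms cap; a retry issued with 5 seconds left gets 5000 ms, so a slow node cannot push the query past the coordinator's overall deadline.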

Can federated queries use existing database indexes on leaf nodes?

Yes, through pushdown. The coordinator ships global predicates to each leaf, where the local planner can satisfy them with index scans. Verify by running EXPLAIN (VERBOSE) on the coordinator and checking that the Remote SQL line includes your WHERE clause. If the filter is absent from Remote SQL, the leaf never sees that predicate and cannot use its index for it: refresh the foreign table definition and re-run ANALYZE on the foreign table.

