Abstract
This RFC proposes an enhancement to OpenSearch's aggregation framework that integrates the newly introduced streaming transport capabilities. The enhancement moves the existing aggregation model to a streaming paradigm, in which partial aggregation results are transmitted continuously to the coordinator. This approach shifts memory load from data nodes to coordinator nodes, which improves cluster stability and resource utilization. It also lays the groundwork for future horizontal scaling of aggregation computations through intermediate processing workers.
Challenge
The existing aggregation framework sends requests to the data nodes that hold the relevant shards. Each data node must keep its partial aggregation results in memory until shard-level processing completes and the results are returned to the coordinator. This creates several operational challenges:
Memory Constraints: Data nodes must maintain substantial in-memory structures for partial aggregation results, particularly when performing terms aggregations on high-cardinality fields. This forces users to either restrict query scope or over-provision hardware resources on data nodes.
Garbage Collection Overhead: Large in-memory data structures trigger frequent garbage collection cycles, consuming CPU resources and degrading performance of other operations on data nodes. This can create a cascading effect: GC pauses extend response times, causing request queuing, which further increases memory demands.
Resource Contention: Memory-intensive aggregations compete with other critical operations for resources on data nodes, leading to unpredictable cluster performance and potential service degradation.
Opportunity
The proposed streaming model eliminates the need for data nodes to accumulate results by implementing controlled streaming of partial results. This transformation offers several benefits:
Liberation of Data Nodes: The streaming model caps the memory used for partial aggregation results on data nodes at the configured streaming buffer size, providing predictable resource allocation and better isolation between operations.
Enhanced Stability: Stream processing prevents data nodes from being overwhelmed by sudden spikes in aggregation workloads with unpredictable memory patterns.
Flexible and Cost-Effective Scaling: This change enables independent scaling of the coordinator fleet, which typically comprises less than 10% of the cluster and is often under-utilized in terms of heap usage. For example, you can vertically scale coordinators for aggregations using memory-optimized hardware with a large heap, while reducing the heap on data nodes, where the page cache matters more. This makes resource utilization more balanced across the cluster and improves overall efficiency.
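The memory cap described under Liberation of Data Nodes can be illustrated with a minimal sketch. It assumes a hypothetical `BoundedPartialBuffer` that holds term counts for at most `maxBuckets` distinct keys and hands the current batch to a sink (standing in for the stream to the coordinator) whenever a new key would exceed that cap. The class, the flush-on-full policy, and the sink are assumptions for illustration, not OpenSearch code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BoundedBufferSketch {

    // Hypothetical data-node buffer: holds counts for at most maxBuckets distinct terms,
    // then flushes them downstream as one partial batch.
    static final class BoundedPartialBuffer {
        private final int maxBuckets;
        private final Consumer<Map<String, Long>> sink; // stands in for the stream to the coordinator
        private Map<String, Long> buckets = new HashMap<>();
        private int peakBuckets = 0;

        BoundedPartialBuffer(int maxBuckets, Consumer<Map<String, Long>> sink) {
            if (maxBuckets <= 0) throw new IllegalArgumentException("maxBuckets must be positive");
            this.maxBuckets = maxBuckets;
            this.sink = sink;
        }

        void collect(String term) {
            // A new key that would exceed the cap forces a flush first.
            if (!buckets.containsKey(term) && buckets.size() == maxBuckets) {
                flush();
            }
            buckets.merge(term, 1L, Long::sum);
            peakBuckets = Math.max(peakBuckets, buckets.size());
        }

        void flush() {
            if (buckets.isEmpty()) return;
            sink.accept(buckets);
            buckets = new HashMap<>(); // the sent batch is no longer referenced by the buffer
        }

        int peakBuckets() { return peakBuckets; }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> sent = new ArrayList<>();
        BoundedPartialBuffer buffer = new BoundedPartialBuffer(2, sent::add);
        for (String term : new String[] {"a", "b", "a", "c", "d", "c"}) {
            buffer.collect(term);
        }
        buffer.flush(); // end of shard: send whatever remains
        System.out.println("batches=" + sent.size() + " peak=" + buffer.peakBuckets()); // prints batches=2 peak=2
    }
}
```

Because a key can be flushed and then seen again, the same term may appear in several batches; for doc counts the coordinator can simply sum them.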
Proposed Solution
Stream Producer (Data Node)
Implement an Arrow-based in-memory representation for partial aggregation buckets
Replace bulk accumulation of partial results with incremental streaming to the coordinator
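The Arrow-based representation is not specified further here. The sketch below does not use the Apache Arrow Java library; it only mimics Arrow's columnar idea (one contiguous vector per field plus a row count) with plain arrays, to show how a batch of terms buckets could be laid out as columns rather than as one object per bucket. `TermsBucketBatch` and its fields are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ColumnarBatchSketch {

    // Hypothetical columnar batch of terms buckets: one "vector" per field,
    // similar in spirit to an Arrow record batch with a keys column and a doc_count column.
    static final class TermsBucketBatch {
        final String[] keys;
        final long[] docCounts;
        final int rowCount;

        private TermsBucketBatch(String[] keys, long[] docCounts) {
            this.keys = keys;
            this.docCounts = docCounts;
            this.rowCount = keys.length;
        }

        // Converts row-oriented buckets (key -> count) into column vectors.
        static TermsBucketBatch fromBuckets(Map<String, Long> buckets) {
            String[] keys = new String[buckets.size()];
            long[] counts = new long[buckets.size()];
            int row = 0;
            for (Map.Entry<String, Long> e : buckets.entrySet()) {
                keys[row] = e.getKey();
                counts[row] = e.getValue();
                row++;
            }
            return new TermsBucketBatch(keys, counts);
        }

        // Column-wise scan: summing doc counts touches only the counts vector.
        long totalDocCount() {
            long total = 0;
            for (long c : docCounts) total += c;
            return total;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> buckets = new LinkedHashMap<>();
        buckets.put("yellow", 120L);
        buckets.put("green", 30L);
        TermsBucketBatch batch = TermsBucketBatch.fromBuckets(buckets);
        System.out.println(batch.rowCount + " rows, " + batch.totalDocCount() + " docs"); // prints 2 rows, 150 docs
    }
}
```

Primitive columns avoid per-bucket object headers, which is one reason a columnar layout can be cheaper to hold and serialize than a list of bucket objects.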
Stream Consumer (Coordinator Node)
Implement a streaming merger that efficiently buffers and merges streamed partial results
Add back-pressure mechanisms to prevent overwhelming the coordinator
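A minimal sketch of the consumer side, assuming batches arrive as `Map<String, Long>` term counts. A bounded `ArrayBlockingQueue` stands in for back-pressure: producers calling `put` block while the queue is full, so at most `capacity` batches wait to be merged. The merger sums counts per key, which is valid for terms doc counts because the same key may arrive in several batches from several shards. `StreamingMergerSketch`, `mergeStreams`, and the end-of-stream sentinel are assumptions, not the proposed transport API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class StreamingMergerSketch {

    // Sentinel batch marking the end of one producer's stream (hypothetical protocol detail).
    static final Map<String, Long> END_OF_STREAM = new HashMap<>();

    // Merges term-count batches from several producers as they arrive.
    // The bounded queue applies back-pressure: put() blocks producers while it is full.
    static Map<String, Long> mergeStreams(List<List<Map<String, Long>>> producers, int capacity) {
        BlockingQueue<Map<String, Long>> queue = new ArrayBlockingQueue<>(capacity);
        for (List<Map<String, Long>> batches : producers) {
            new Thread(() -> {
                try {
                    for (Map<String, Long> batch : batches) queue.put(batch);
                    queue.put(END_OF_STREAM);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        Map<String, Long> merged = new HashMap<>();
        int finished = 0;
        try {
            while (finished < producers.size()) {
                Map<String, Long> batch = queue.take();
                if (batch == END_OF_STREAM) {
                    finished++;
                } else {
                    // The same key can arrive in several batches/shards: doc counts add up.
                    batch.forEach((key, count) -> merged.merge(key, count, Long::sum));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while merging", e);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Map<String, Long>>> producers = List.of(
                List.of(Map.of("a", 2L, "b", 1L), Map.of("c", 2L)),
                List.of(Map.of("a", 5L), Map.of("b", 4L, "d", 1L)));
        Map<String, Long> result = mergeStreams(producers, 2);
        System.out.println(result.get("a") + " " + result.get("b")); // prints 7 5
    }
}
```

In a real transport, back-pressure would more likely be expressed through flow-control credits on the stream than through a shared in-process queue; the queue only models the blocking behavior.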
Execution Planner (Coordinator Node)
Refactor the existing routing and transport code to support streaming communication
Size buffers adaptively per request, based on statistics such as system load, throughput, and latency
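The sizing policy itself is left open above. One possible policy, shown only as an assumption, is additive-increase/multiplicative-decrease (AIMD), borrowed from congestion control: grow the per-request buffer by a fixed step while observed load and latency are within targets, and halve it when either is exceeded, clamped to a configured range. `AdaptiveBufferSizer`, its thresholds, and the choice of signals are hypothetical; throughput is omitted for brevity but could be added as a further condition.

```java
public class AdaptiveBufferSketch {

    // Hypothetical AIMD sizer for the per-request streaming buffer (measured in buckets).
    static final class AdaptiveBufferSizer {
        private final int minSize;
        private final int maxSize;
        private final int step;              // additive increase per healthy observation
        private final double maxLoad;        // e.g. fraction of heap in use on the node
        private final long latencyTargetMs;
        private int size;

        AdaptiveBufferSizer(int minSize, int maxSize, int step, double maxLoad, long latencyTargetMs) {
            this.minSize = minSize;
            this.maxSize = maxSize;
            this.step = step;
            this.maxLoad = maxLoad;
            this.latencyTargetMs = latencyTargetMs;
            this.size = minSize;
        }

        // Called after each streamed batch with the latest observations; returns the new size.
        int onObservation(double load, long latencyMs) {
            if (load > maxLoad || latencyMs > latencyTargetMs) {
                size = Math.max(minSize, size / 2);      // multiplicative decrease
            } else {
                size = Math.min(maxSize, size + step);   // additive increase
            }
            return size;
        }
    }

    public static void main(String[] args) {
        AdaptiveBufferSizer sizer = new AdaptiveBufferSizer(256, 4096, 256, 0.75, 200);
        System.out.println(sizer.onObservation(0.40, 120)); // healthy: 512
        System.out.println(sizer.onObservation(0.40, 120)); // healthy: 768
        System.out.println(sizer.onObservation(0.90, 120)); // load above target: 384
    }
}
```

AIMD backs off quickly under pressure and recovers gradually, which suits a coordinator that should shed load fast; other feedback controllers would work as well.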
---
title: Data Flow Diagram
---
flowchart TB
%% Legend
subgraph Legend["Legend"]
direction LR
NewComponent["New"] ~~~ OldComponent["Existing"]
style NewComponent fill:#bbf,stroke-width:0px,font-size:9pt,width:30px,height:30px
style OldComponent fill:#bfb,stroke-width:0px,font-size:9pt,width:30px,height:30px
end
subgraph Coordinator["Coordinator Node"]
QueryPlanner["Execution Planner"]
StreamConsumer["Stream Consumer"]
StreamMerger["Stream Merger"]
end
subgraph DataNode1["Data Node 1"]
Scanner1["Searcher"]
LocalAgg1["Aggregator"]
Stream1["Stream Producer"]
end
subgraph DataNode2["Data Node 2"]
Scanner2["Searcher"]
LocalAgg2["Aggregator"]
Stream2["Stream Producer"]
end
%% Query Flow
Query["Aggregation Query"] --> QueryPlanner
QueryPlanner -->|shard request| Scanner1
QueryPlanner -->|shard request| Scanner2
%% Data Node 1 Flow
Scanner1 --> LocalAgg1
LocalAgg1 -->|results in arrow format| Stream1
Stream1 --> StreamConsumer
%% Data Node 2 Flow
Scanner2 --> LocalAgg2
LocalAgg2 -->|results in arrow format| Stream2
Stream2 --> StreamConsumer
%% Coordinator Processing
StreamConsumer --> StreamMerger
StreamMerger --> Result["Final Result"]
%% Styling
classDef new fill:#bbf,stroke:#333,stroke-width:2px
classDef old fill:#bfb,stroke:#333,stroke-width:2px
class QueryPlanner,StreamMerger,StreamConsumer new
class Stream1,Stream2 new
class Scanner1,LocalAgg1,Scanner2,LocalAgg2 old
class Query,Result old
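To make the diagram concrete, the sketch below simulates the flow in a single process: the planner sends one shard request per shard, each data node's aggregator counts terms over every `docsPerBatch` documents and streams that partial result, and the coordinator merges batches as they arrive and returns the top `size` buckets. The in-memory "shards", the sequential fan-out, and the names `shardStream` and `execute` are simplifying assumptions; networking, Arrow encoding, and back-pressure are left out.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class DataFlowSketch {

    // Data node: aggregate every `docsPerBatch` documents and stream the partial buckets to `sink`.
    static void shardStream(List<String> shardDocs, int docsPerBatch, Consumer<Map<String, Long>> sink) {
        for (int start = 0; start < shardDocs.size(); start += docsPerBatch) {
            Map<String, Long> partial = new HashMap<>();
            int end = Math.min(start + docsPerBatch, shardDocs.size());
            for (String term : shardDocs.subList(start, end)) {
                partial.merge(term, 1L, Long::sum);
            }
            sink.accept(partial); // at most docsPerBatch buckets are held per batch
        }
    }

    // Coordinator: one shard request per shard, merge each batch on arrival, keep the top `size` buckets.
    static List<Map.Entry<String, Long>> execute(List<List<String>> shards, int docsPerBatch, int size) {
        Map<String, Long> merged = new HashMap<>();
        for (List<String> shardDocs : shards) {
            shardStream(shardDocs, docsPerBatch,
                    batch -> batch.forEach((term, count) -> merged.merge(term, count, Long::sum)));
        }
        return merged.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> shards = List.of(
                List.of("yellow", "green", "yellow", "fhv"),
                List.of("yellow", "fhv", "fhv", "green", "yellow"));
        System.out.println(execute(shards, 2, 2)); // prints [yellow=4, fhv=3]
    }
}
```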
Compatibility Considerations
We plan to start with terms bucket aggregation and stats metric aggregation, and evaluate the approach before extending this to more aggregation types.
Streaming aggregation operates within the cluster, between nodes, and should remain compatible with existing aggregation APIs
Configuration options will let users enable or disable streaming per request, so they can compare it with the existing aggregation implementation
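For the stats metric aggregation, streaming is straightforward because its partial state (count, sum, min, max) merges associatively, so a result built from many small partials should match the non-streaming path. The sketch below assumes a hypothetical per-request `streaming` boolean that selects between a single pass and merging per-chunk partials, and checks that both give the same answer, the kind of comparison the per-request option is meant to support. The flag and the `Stats` record (Java 16+) are illustrative, not the proposed setting.

```java
import java.util.List;

public class StatsMergeSketch {

    // Partial state of a stats aggregation; merge is associative and commutative.
    record Stats(long count, long sum, long min, long max) {
        static final Stats EMPTY = new Stats(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);

        Stats add(long value) {
            return new Stats(count + 1, sum + value, Math.min(min, value), Math.max(max, value));
        }

        Stats merge(Stats other) {
            return new Stats(count + other.count, sum + other.sum,
                    Math.min(min, other.min), Math.max(max, other.max));
        }

        double avg() { return count == 0 ? Double.NaN : (double) sum / count; }
    }

    // Hypothetical per-request switch between the existing path and the streamed path.
    static Stats compute(List<Long> values, boolean streaming, int chunkSize) {
        if (!streaming) {
            Stats all = Stats.EMPTY;
            for (long v : values) all = all.add(v);
            return all;
        }
        Stats merged = Stats.EMPTY;
        for (int start = 0; start < values.size(); start += chunkSize) {
            Stats partial = Stats.EMPTY;                 // one streamed partial result
            for (long v : values.subList(start, Math.min(start + chunkSize, values.size()))) {
                partial = partial.add(v);
            }
            merged = merged.merge(partial);              // coordinator-side merge
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Long> fares = List.of(12L, 7L, 30L, 5L, 18L);
        System.out.println(compute(fares, false, 2).equals(compute(fares, true, 2))); // prints true
    }
}
```

The sketch uses integer values so the equality check is exact; with floating-point sums, a different merge order can change the last bits, so a real comparison would need a tolerance.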
Success Criteria
We plan to work on terms bucket aggregation and stats metric aggregation first.
We plan to benchmark using the nyc_taxis and big5 datasets, as well as a simulated real-world workload of queries with aggregations
Reduced memory usage and GC pause time on data nodes under heavy aggregation load (target: 30-50% reduction)
Lower latency for queries with streamed aggregations (target: 30-50% improvement)
Enhanced cluster stability under heavy aggregation load (target: 100% improvement in red-line QPS, i.e. twice the baseline)
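The targets above are relative changes against the existing (non-streaming) implementation. A small helper, assuming metrics are collected as baseline/streamed pairs, shows how each criterion could be checked: a reduction percentage for memory, GC pause time, and latency, and an improvement percentage for red-line QPS, where 100% means the streamed QPS is twice the baseline. The method names are hypothetical and the numbers in `main` are placeholders to exercise the arithmetic, not measurements.

```java
public class SuccessCriteriaSketch {

    // Percentage reduction relative to the baseline (for lower-is-better metrics such as heap or latency).
    static double reductionPercent(double baseline, double streamed) {
        if (baseline <= 0) throw new IllegalArgumentException("baseline must be positive");
        return (baseline - streamed) / baseline * 100.0;
    }

    // Percentage increase relative to the baseline (for higher-is-better metrics such as red-line QPS).
    static double improvementPercent(double baseline, double streamed) {
        if (baseline <= 0) throw new IllegalArgumentException("baseline must be positive");
        return (streamed - baseline) / baseline * 100.0;
    }

    static boolean meetsTarget(double achievedPercent, double targetPercent) {
        return achievedPercent >= targetPercent;
    }

    public static void main(String[] args) {
        // Placeholder inputs only; real values would come from the nyc_taxis / big5 benchmarks.
        double heapReduction = reductionPercent(8.0, 5.0);   // 37.5% lower heap usage
        double qpsGain = improvementPercent(40.0, 80.0);     // 100% higher red-line QPS
        System.out.println(meetsTarget(heapReduction, 30.0) + " " + meetsTarget(qpsGain, 100.0)); // prints true true
    }
}
```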
Call for Feedback
We welcome community feedback on: