Skip to content

Commit

Permalink
impl first_value with custom state
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 11, 2024
1 parent ba161b7 commit 7e65cb2
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions src/expr/impl/src/aggregate/first_last_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Datum, DatumRef, ScalarRefImpl};
use risingwave_common::types::{Datum, ScalarRefImpl};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::aggregate;
use risingwave_expr::aggregate::AggStateDyn;

/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values.
///
Expand All @@ -32,9 +34,32 @@ use risingwave_expr::aggregate;
/// statement ok
/// drop table t;
/// ```
#[aggregate("first_value(*) -> auto", state = "ref")]
fn first_value<T>(state: Option<T>, _: Option<T>) -> Option<T> {
state
#[aggregate("first_value(any) -> any")]
fn first_value(state: &mut FirstValueState, input: Option<ScalarRefImpl<'_>>) {
if state.0.is_none() {
state.0 = Some(input.map(|x| x.into_scalar_impl()));
}
}

#[derive(Debug, Clone, Default)]
struct FirstValueState(Option<Datum>);

impl EstimateSize for FirstValueState {
fn estimated_heap_size(&self) -> usize {
self.0.estimated_heap_size()
}
}

impl AggStateDyn for FirstValueState {}

impl From<&FirstValueState> for Datum {
fn from(state: &FirstValueState) -> Self {
if let Some(state) = &state.0 {
state.clone()
} else {
None
}
}
}

/// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values.
Expand All @@ -54,7 +79,7 @@ fn first_value<T>(state: Option<T>, _: Option<T>) -> Option<T> {
/// statement ok
/// drop table t;
/// ```
#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(): `last_value(any) -> any`
#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(rc): `last_value(any) -> any`
fn last_value<T>(_: Option<T>, input: Option<T>) -> Option<T> {
input
}
Expand Down

0 comments on commit 7e65cb2

Please sign in to comment.