Skip to content

Commit

Permalink
feat(chunked): add Flow.chunked as an alias to Flow.bufferCount.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoc081098 committed Oct 8, 2023
1 parent c626d2c commit 3758c62
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Change Log

## [Unreleased] - TODO

### Changed

- Update dependencies
- `Kotlin` to `1.9.10`.
- `Gradle` to `8.4`.

### Added

- Add `Flow.chunked` operator, it is an alias to `Flow.bufferCount` operator.

## [0.7.2] - Oct 7, 2023

### Changed
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,12 @@ dependencies {
- [`timer`](#timer)

- Intermediate operators
- [`bufferCount`](#buffercount)
- [`bufferCount`](#buffercount--chunked)
- [`combine`](#combine)
- [`cast`](#cast--castnotnull--castnullable--safeCast)
- [`castNotNull`](#cast--castnotnull--castnullable--safeCast)
- [`castNullable`](#cast--castnotnull--castnullable--safeCast)
- [`chunked`](#buffercount--chunked)
- [`safeCast`](#cast--castnotnull--castnullable--safeCast)
- [`concatWith`](#concatwith)
- [`startWith`](#startwith)
Expand Down Expand Up @@ -157,13 +158,14 @@ dependencies {
- [`throttleTime`](#throttletime)
- [`withLatestFrom`](#withlatestfrom)

#### bufferCount
#### bufferCount / chunked

- Similar to [RxJS bufferCount](https://rxjs.dev/api/operators/bufferCount)
- Similar
to [RxJava buffer](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-int-int-)

Buffers the source `Flow` values until the size hits the maximum `bufferSize` given.
Note, `chunked` is an alias to `bufferCount`.

```kotlin
range(start = 0, count = 10)
Expand Down
1 change: 1 addition & 0 deletions api/FlowExt.api
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
public final class com/hoc081098/flowext/BufferCountKt {
public static final fun bufferCount (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun bufferCount (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
}

public final class com/hoc081098/flowext/CastKt {
Expand Down
26 changes: 23 additions & 3 deletions src/commonMain/kotlin/com/hoc081098/flowext/bufferCount.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,38 @@ package com.hoc081098.flowext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

/**
* This function is an alias to [bufferCount] operator.
*
* @see bufferCount
*/
public fun <T> Flow<T>.chunked(bufferSize: Int): Flow<List<T>> = bufferCount(bufferSize)

/**
* Buffers the source [Flow] values until the size hits the maximum [bufferSize] given.
*
* Returns a [Flow] that emits buffers of items it collects from the current [Flow].
* It emits connected, non-overlapping buffers, each containing [bufferSize] items.
* When the current [Flow] completes, the resulting [Flow] emits the current buffer
* and propagates the complete event from the current [Flow].
* Note that if the current [Flow] throws an exception,
* that exception is passed on immediately without first emitting the buffer it is in the process of assembling.
*
* @param bufferSize The maximum size of the buffer emitted.
*/
public fun <T> Flow<T>.bufferCount(bufferSize: Int): Flow<List<T>> = bufferExact(bufferSize)

/**
* Buffers the source [Flow] values until the size hits the maximum [bufferSize] given.
* Buffers a number of values from the source [Flow] by [bufferSize]
* then emits the buffer and clears it, and starts a new buffer each [startBufferEvery] values.
*
* When the current [Flow] completes, the resulting [Flow] emits active buffers
* and propagates the complete event from the current [Flow].
* Note that if the current [Flow] throws an exception,
* that exception is passed on immediately without first emitting the buffer it is in the process of assembling.
*
* @param bufferSize The maximum size of the buffer emitted.
* @param startBufferEvery Optional. Default is null.
* Interval at which to start a new buffer.
* @param startBufferEvery Interval at which to start a new buffer.
* For example if [startBufferEvery] is 2, then a new buffer will be started on every other value from the source.
* A new buffer is started at the beginning of the source by default.
*/
Expand Down

0 comments on commit 3758c62

Please sign in to comment.