Skip to content

Transforming Observables

DavidMGross edited this page Nov 16, 2014 · 83 revisions

This section explains operators with which you can transform items that are emitted by an Observable.

  • map( ) — transform the items emitted by an Observable by applying a function to each of them
  • flatMap( ), concatMap( ), and flatMapIterable( ) — transform the items emitted by an Observable into Observables (or Iterables), then flatten this into a single Observable
  • switchMap( ) — transform the items emitted by an Observable into Observables, and mirror those items emitted by the most-recently transformed Observable
  • scan( ) — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • groupBy( ) — divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key
  • buffer( ) — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • window( ) — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
  • cast( ) — cast all items from the source Observable into a particular type before reemitting them

map( )

transform the items emitted by an Observable by applying a function to each of them

The map( ) method applies a function of your choosing to every item emitted by an Observable, and returns this transformation as a new Observable. For example, the following code maps a function that squares the incoming value onto the values in numbers:

numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.map({it * it}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
4
9
16
25
Sequence complete

scheduler

map( ) does not by default operate on any particular scheduler.

see also:


flatMap( ), concatMap( ) and flatMapIterable( )

Transform the items emitted by an Observable into Observables or Iterables, then flatten this into a single Observable

The flatMap( ) method creates a new Observable by applying a function that you supply to each item emitted by the original Observable, where that function is itself an Observable that emits items, and then merges the results of that function applied to every item emitted by the original Observable, emitting these merged results.

This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.

// this closure is an Observable that emits three numbers
numbers   = Observable.from([1, 2, 3]);
// this closure is an Observable that emits two numbers based on what number it is passed
multiples = { n -> Observable.from([ n*2, n*3 ]) };   

numbers.flatMap(multiples).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
2
3
4
6
6
9
Sequence complete

If any of the individual Observables mapped to the items from the source Observable in flatMap( ) aborts by invoking onError, the flatMap( ) call itself will immediately abort and invoke onError.

Another version of flatMap (illustrated in the following marble diagram) creates (and flattens) a new Observable for each item and notification from the source Observable.

Another version combines items from the source Observable with the items emitted by an Observable triggered by those source items, and emits these combinations:

The flatMapIterable variants pair up source items and generated Iterables rather than source items and generated Observables, but otherwise work in much the same way.

Note that flatMap( ) may interleave the items emitted by the Observables that result from transforming the items emitted by the source Observable. If it is important that these items not be interleaved, you can instead use the similar concatMap( ) method:

scheduler

flatMap( ), concatMap( ), and flatMapIterable( ) do not by default operate on any particular scheduler.

see also:


switchMap( )

transform the items emitted by an Observable into Observables, and mirror those items emitted by the most-recently transformed Observable

The switchMap( ) operator is similar to the flatMap( ) and concatMap( ) methods described above, however, rather than emitting all of the items emitted by all of the Observables that the operator generates by transforming items from the source Observable, switchMap( ) instead emits items from each such transformed Observable only until the next such Observable is emitted, then it ignores the previous one and begins emitting items emitted by the new one.

scheduler

switchMap( ) does not by default operate on any particular scheduler.

see also:


scan( )

Apply a function to each item emitted by an Observable and emit each successive value

The scan( ) method returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by the source Observable into the same function, then feeds the result of that function along with the third item into the same function, and so on until all items have been emitted by the source Observable. It emits the result of each of these iterations from the returned Observable. This sort of function is sometimes called an “accumulator.”

For example, the following code takes an Observable that emits a consecutive sequence of n integers starting with 1 and converts it into an Observable that emits the first n triangular numbers:

numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.scan({ a, b -> a+b }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
3
6
10
15
Sequence complete

There is also a version of scan( ) to which you can pass a seed item in addition to an accumulator function:

my_observable.scan(initial_seed, accumulator_closure)

Note: if you pass a seed item to scan( ), it will emit the seed itself as its first item.

Note also that passing a null seed is not the same as not passing a seed. The behavior will be different. If you pass a seed of null, you will be seeding your scan with null, and scan( ) will emit null as its first item.

It is a bad idea to use scan to collect emitted items into a mutable data structure. Instead, use collect for that purpose.

scheduler

scan( ) does not by default operate on any particular scheduler.

see also:


groupBy( )

divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key

The groupBy( ) method creates or extracts a key from all of the items emitted by a source Observable. For each unique key created in this way, groupBy( ) creates an Observable of the subclass GroupedObservable that emits all of the items emitted by the source Observable that match that key. groupBy( ) then emits each of these GroupedObservable items. Each Observable of this subclass has a method, getKey( ) with which you can retrieve the key that defined the GroupedObservable.

The following sample code uses groupBy( ) to transform a list of numbers into two lists, grouped by whether or not the numbers are even:

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };

numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete

There is also a version of this operator that adds another parameter: an Observable that emits duration markers. When a duration marker is emitted by this Observable, any grouped Observables that have been opened are closed, and groupBy( ) will create new grouped Observables for any subsequent emissions by the source Observable.

Another variety of groupBy( ) limits the number of groups that can be active at any particular time. If an item is emitted by the source Observable that would cause the number of groups to exceed this maximum, before the new group is emitted, one of the existing groups is closed (that is, the Observable it represents terminates by calling its Subscribers' onCompleted methods and then expires).

Note that when groupBy( ) splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. For this reason, if you ignore any of these GroupedObservables (you neither subscribe to it or apply an operator to it that subscribes to it), this buffer will present a potential memory leak. For this reason, rather than ignoring a GroupedObservable that you have no interest in observing, you should instead apply an operator like take(0) to it as a way of signalling to it that it may discard its buffer.

If you unsubscribe from one of the GroupedObservables, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy( ) will create and emit a new GroupedObservable to match the key.

scheduler

groupBy( ) does not by default operate on any particular scheduler.

see also:


buffer( )

periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time

The buffer( ) method periodically gathers items emitted by a source Observable into bundles, and emits these bundles as its own emissions.

Note that if the source Observable issues an onError notification, buffer( ) will pass on this notification immediately without first emitting the buffer it is in the process of assembling, even if that buffer contains items that were emitted by the source Observable before it issued the error notification.

There are a number of ways with which you can regulate how buffer( ) gathers items from the source Observable into bundles:

  • buffer(bufferOpenings, closingSelector)

This version of buffer( ) monitors an Observable, bufferOpenings, that emits BufferOpening objects. Each time it observes such an emitted object, it creates a new bundle to begin collecting items emitted by the source Observable and it passes the bufferOpenings Observable into the closingSelector function. That function returns an Observable. buffer( ) monitors that Observable and when it detects an emitted object, it closes its bundle and emits it as its own emission.

  • buffer(count)

This version of buffer( ) emits a new bundle of items for every count items emitted by the source Observable.

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]);

numbers.buffer(3).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2, 3]
[4, 5, 6]
[7, 8]
  • buffer(count, skip)

This version of buffer( ) create a new bundle of items for every skip item(s) emitted by the source Observable, each containing count elements. If skip is less than count this means that the bundles will overlap and contain duplicate items. For example, compare the following two uses of buffer( ) on the same sequence:

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]);

numbers.buffer(2,3).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
numbers.buffer(3,2).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2]
[4, 5]
[7, 8]
Sequence complete
[1, 2, 3]
[3, 4, 5]
[5, 6, 7]
[7, 8]
Sequence complete
  • buffer(timespan) and buffer(timespan, scheduler)

This version of buffer( ) emits a new bundle of items periodically, every timespan amount of time, containing all items emitted by the source Observable since the previous bundle emission.

  • buffer(timespan, count) and buffer(timespan, count, scheduler)

This version of buffer( ) emits a new bundle of items for every count items emitted by the source Observable, or, if timespan has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is less than count.

  • buffer(timespan, timeshift) and buffer(timespan, timeshift, scheduler)

This version of buffer( ) creates a new bundle of items every timeshift, and fills this bundle with every item emitted by the source Observable from that time until timespan time has passed since the bundle's creation, before emitting the bundle as its own emission. If timespan is longer than timeshift, the emitted bundles will represent time periods that overlap and so they may contain duplicate items.

scheduler

Those variants of buffer( ) that use timespans by default operate on the computation scheduler and also have variants that allow you to select which scheduler to use by passing it in as a parameter. The other variants of buffer( ) do not operate by default on any particular scheduler.

see also:


window( )

periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

Window is similar to buffer( ), but rather than emitting packets of items from the original Observable, it emits Observables, each one of which emits a subset of items from the original Observable and then terminates with an onCompleted( ) call.

Like buffer( ), window( ) has many varieties, each with its own way of subdividing the original Observable into the resulting Observable emissions, each one of which contains a "window" onto the original emitted items. In the terminology of the window( ) method, when a window "opens," this means that a new Observable is emitted and that Observable will begin emitting items emitted by the source Observable. When a window "closes," this means that the emitted Observable stops emitting items from the source Observable and calls its Subscribers' onCompleted( ) method and terminates.

  • window(source, closingSelector)

This version of window( ) opens its first window immediately. It closes the currently open window and immediately opens a new one each time it observes an object emitted by the Observable that is returned from closingSelector. In this way, this version of window( ) emits a series of non-overlapping windows whose collective onNext( ) emissions correspond one-to-one with those of the source Observable.

  • window(source, windowOpenings, closingSelector)

This version of window( ) opens a window whenever it observes the windowOpenings Observable emit an Opening object and at the same time calls closingSelector to generate a closing Observable associated with that window. When that closing Observable emits an object, window( ) closes that window. Since the closing of currently open windows and the opening of new windows are activities that are regulated by independent Observables, this version of window( ) may create windows that overlap (duplicating items from the source Observable) or that leave gaps (discarding items from the source Observable).

  • window(source, count)

This version of window( ) opens its first window immediately. It closes the currently open window and immediately opens a new one whenever the current window has emitted count items. It will also close the currently open window if it receives an onCompleted( ) or onError( ) call from the source Observable. This version of window( ) emits a series of non-overlapping windows whose collective onNext( ) emissions correspond one-to-one with those of the source Observable.

  • window(source, count, skip)

This version of window( ) opens its first window immediately. It opens a new window beginning with every skip item from the source Observable (e.g. if skip is 3, then it opens a new window starting with every third item). It closes each window when that window has emitted count items or if it receives an onCompleted( ) or onError( ) call from the source Observable. If skip = count then this behaves the same as window(source, count); if skip < count this will emit windows that overlap by count - skip items; if skip > count this will emit windows that drop skip - count items from the source Observable between every window.

  • window(source, timespan, unit) and window(source, timespan, unit, scheduler)

This version of window( ) opens its first window immediately. It closes the currently open window and opens another one every timespan period of time (measured in unit, and optionally on a particular scheduler). It will also close the currently open window if it receives an onCompleted( ) or onError( ) call from the source Observable. This version of window( ) emits a series of non-overlapping windows whose collective onNext( ) emissions correspond one-to-one with those of the source Observable.

  • window(source, timespan, unit, count) and window(source, timespan, unit, count, scheduler)

This version of window( ) opens its first window immediately. It closes the currently open window and opens another one every timespan period of time (measured in unit, and optionally on a particular scheduler) or whenever the currently open window has emitted count items. It will also close the currently open window if it receives an onCompleted( ) or onError( ) call from the source Observable. This version of window( ) emits a series of non-overlapping windows whose collective onNext( ) emissions correspond one-to-one with those of the source Observable.

  • window(source, timespan, timeshift, unit) and window(source, timespan, timeshift, unit, scheduler)

This version of window( ) opens its first window immediately, and thereafter opens a new window every timeshift period of time (measured in unit, and optionally on a particular scheduler). It closes a currently open window after timespan period of time has passed since that window was opened. It will also close any currently open window if it receives an onCompleted( ) or onError( ) call from the source Observable. Depending on how you set timespan and timeshift the windows that result from this operation may overlap or have gaps.

scheduler

Those variants of window( ) that use timespans by default operate on the computation scheduler and also have variants that allow you to select which scheduler to use by passing it in as a parameter. The other variants of window( ) do not operate by default on any particular scheduler.

see also:


cast( )

cast all items from the source Observable into a particular type before reemitting them

scheduler

cast( ) does not by default operate on any particular scheduler.

see also:

Clone this wiki locally