Skip to content

Commit

Permalink
subscribeOn and observeOn examples
Browse files Browse the repository at this point in the history
  • Loading branch information
tootedom committed Sep 24, 2016
1 parent e7a421e commit 5f38c54
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 0 deletions.
129 changes: 129 additions & 0 deletions README.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,138 @@ action to the subscribe. Example:
assertFalse("should have thrown an exception",success.get());
assertTrue("should have thrown custom exception", failure.get());
}
----

'''

=== subscribeOn and observeOn

If there ever was a confusing subject for RxJava, this is it. `subscribeOn` is where the Observable's code will execute.
`observeOn` is where the Subscribers code will execute.

- http://reactivex.io/documentation/operators/subscribeon.html
- http://reactivex.io/documentation/operators/observeon.html

Lets say we are running on the `main` Thread and we execute this:

[source,java]
----
CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
System.out.println("Supplier Value: " + Thread.currentThread().getName());
return value;
}, Duration.ofSeconds(60));
val.subscribe(calculatedValue -> {
System.out.println("subscription: " + Thread.currentThread().getName());
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
----

The output would be:

[source,text]
----
Supplier Value: main
subscription: main
----

The above means that the execution of the following all occur on the `main` thread:

- memcached lookup (get)
- executing the `Supplier<V>`
- memcached write (set)
- Calling the Subscribers code



If we now set `subscribeOn(Schedulers.io())` on the `Single<CacheItem<String>>` what this does is, excute the following
on the IO scheduler `RxIoScheduler`:

- memcached lookup (get)
- executing the `Supplier<V>`
- memcached write (set)
- Calling the Subscribers code


[source,java]
----
CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
System.out.println("Supplier Value: " + Thread.currentThread().getName());
return value;
}, Duration.ofSeconds(60));
val = val.subscribeOn(Schedulers.io());
val.subscribe(calculatedValue -> {
System.out.println("subscription: " + Thread.currentThread().getName());
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
----

You will be able to see the following in the output.

[source,text]
----
Supplier Value: RxIoScheduler-2
subscription: RxIoScheduler-2
----


If we now set `observeOn(Schedulers.computation())` on the `Single<CacheItem<String>>` what this does is, excute the following
on the IO scheduler `RxIoScheduler`:

- memcached lookup (get)
- executing the `Supplier<V>`
- memcached write (set)

And execute the following on the computation scheduler, `RxComputationScheduler`:

- Calling the Subscribers code

[source,java]
----
CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
System.out.println("Supplier Value: " + Thread.currentThread().getName());
return value;
}, Duration.ofSeconds(60));
val = val.subscribeOn(Schedulers.io());
val = val.observeOn(Schedulers.computation());
val.subscribe(calculatedValue -> {
System.out.println("subscription: " + Thread.currentThread().getName());
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
----

You will be able to see the following in the output.

[source,text]
----
Supplier Value: RxIoScheduler-2
subscription: RxComputationScheduler-1
----


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.junit.Test;
import rx.Single;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -328,6 +330,48 @@ public void testLazyCacheSet() {
assertEquals(0, memcached.getDaemon().getCache().getCurrentItems());
}



@Test
public void testCacheSetSubscribeOn() {

cache = new SpyObservableMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setWaitForRemove(Duration.ofMillis(0))
.setKeyPrefix(Optional.of("elastic"))
.buildMemcachedConfig()

);

CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
System.out.println("Supplier Value: " + Thread.currentThread().getName());
return value;
}, Duration.ofSeconds(60));
val = val.subscribeOn(Schedulers.io());
val = val.observeOn(Schedulers.computation());

val.subscribe(calculatedValue -> {
System.out.println("subscription: " + Thread.currentThread().getName());
latch.countDown();
});

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
assertEquals(1, memcached.getDaemon().getCache().getCurrentItems());


}


@Test
public void testLazyCacheGet() {

Expand Down

0 comments on commit 5f38c54

Please sign in to comment.