RxJava Backpressure and why you should care

What is Backpressure?

Backpressure is simply the process of handling a producer that emits faster than its consumer can keep up. If an Observable produces 1 million items per second, how can a subscriber that can only process 100 items per second handle them? The Observable class does not support backpressure and buffers without bound, which means everything the subscriber has not processed yet piles up in memory, and that’s where we get the OutOfMemoryError.
By applying backpressure to a stream, we can handle items as needed, discard unnecessary items, or even let the producer know when to create and push new items.
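To put rough numbers on this, here is a minimal sketch in plain Java (no RxJava involved) that simulates a producer outpacing a consumer, once with an unbounded buffer and once with a bounded buffer that drops whatever does not fit. The class name, the scaled-down rates (1,000 vs. 100 items per tick) and the capacity of 128 are assumptions made for the illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class FastProducerSketch {

    /**
     * Simulates `seconds` one-second ticks. In each tick the producer offers
     * `producedPerSecond` items, then the consumer removes up to
     * `consumedPerSecond`. A capacity of Integer.MAX_VALUE models an unbounded
     * buffer; a smaller capacity drops items that do not fit.
     * Returns {itemsStillBuffered, itemsDropped}.
     */
    static long[] simulate(int seconds, int producedPerSecond,
                           int consumedPerSecond, int capacity) {
        Deque<Integer> buffer = new ArrayDeque<>();
        long dropped = 0;
        for (int s = 0; s < seconds; s++) {
            for (int i = 0; i < producedPerSecond; i++) {
                if (buffer.size() < capacity) {
                    buffer.addLast(i);
                } else {
                    dropped++; // no room: the item is discarded
                }
            }
            for (int i = 0; i < consumedPerSecond && !buffer.isEmpty(); i++) {
                buffer.removeFirst();
            }
        }
        return new long[] {buffer.size(), dropped};
    }

    public static void main(String[] args) {
        long[] unbounded = simulate(10, 1_000, 100, Integer.MAX_VALUE);
        long[] bounded = simulate(10, 1_000, 100, 128);
        System.out.println("unbounded: buffered=" + unbounded[0] + ", dropped=" + unbounded[1]);
        System.out.println("bounded:   buffered=" + bounded[0] + ", dropped=" + bounded[1]);
    }
}
```

In the unbounded run the buffer grows by 900 items every tick and never shrinks; in the bounded run memory stays flat, at the price of losing items.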

What is the problem?

bigTable.selectAll() // or a hot observable like mouse movement
  .observeOn(yourScheduler())
  .subscribe(data -> { doSomethingHere(data); });
A Fast Producer

The problem with the code above is that it fetches all the rows from the database and pushes them downstream, which results in high memory usage because all the data is buffered in memory. Do we want all of the data? Yes! But do we need all of it at once? No.

I see this kind of usage in lots of projects, and I’m pretty sure most of us have done something like it before, even while knowing something was wrong: we query a database assuming there won’t be much data, then there is, and we end up with poor performance in production. If we are lucky we’ll get an `OutOfMemoryError`, but most of the time the app just becomes slow and sluggish.

What is the solution?

Backpressure to the rescue! Back in RxJava 1, the Observable class itself was responsible for backpressure. Since RxJava 2 there is a separate class for handling it: Flowable.

How to create a Flowable?

There are multiple ways to create a stream with backpressure:

1. Convert the Observable to a Flowable with the toFlowable() method

Observable.range(1, 1_000_000).toFlowable(BackpressureStrategy.DROP)

With the DROP strategy, the downstream doesn’t get all one million items; it gets new items only as it finishes handling the previous ones.

example output:
1
2
3
...
100
101
drops some of the items here
100,051
100,052
100,053
...
drops again
523,020
523,021
...
and so on

Note: If you subscribe without changing the scheduler, you will get all one million items: everything runs synchronously, so the producer is blocked by the subscriber.

BackpressureStrategy

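There are a few backpressure strategies:

  • DROP: Discards the items the downstream has not requested, i.e. whenever it can’t keep up
  • BUFFER: Buffers all the items from the producer until the downstream consumes them; watch for OOMs
  • LATEST: Keeps only the most recent item, overwriting any older item that has not been delivered yet
  • ERROR: Signals a MissingBackpressureException when the downstream can’t keep up
  • MISSING: No strategy; items are pushed as they come, so sooner or later something downstream will signal a MissingBackpressureException unless you add one of the onBackpressureXXX operators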
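The difference between these strategies is easier to see on a tiny example. The sketch below is a toy model in plain Java, not RxJava's implementation: the `Bridge` class, its `emit`/`request` methods and the `run` scenario are made up for illustration. It models a producer that pushes items regardless of demand and a downstream that asks for items with `request(n)`. MISSING is left out because it simply forwards items and leaves the problem to the downstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class StrategySketch {

    enum Strategy { DROP, LATEST, BUFFER, ERROR }

    /** Toy bridge between a push-style producer and a subscriber that requests items. */
    static final class Bridge {
        final Strategy strategy;
        final List<Integer> delivered = new ArrayList<>();
        final Deque<Integer> pending = new ArrayDeque<>(); // used by BUFFER and LATEST
        long requested;  // outstanding downstream demand
        boolean failed;  // set once ERROR sees an item nobody asked for

        Bridge(Strategy strategy) {
            this.strategy = strategy;
        }

        /** Downstream asks for n more items; pending items are delivered first. */
        void request(long n) {
            requested += n;
            while (requested > 0 && !pending.isEmpty()) {
                delivered.add(pending.removeFirst());
                requested--;
            }
        }

        /** Producer pushes one item, whether or not anyone asked for it. */
        void emit(int item) {
            if (failed) {
                return;
            }
            if (requested > 0 && pending.isEmpty()) {
                delivered.add(item); // demand available: pass straight through
                requested--;
                return;
            }
            switch (strategy) {
                case DROP:   break;                                          // discard
                case LATEST: pending.clear(); pending.addLast(item); break;  // keep newest only
                case BUFFER: pending.addLast(item); break;                   // grows without bound
                case ERROR:  failed = true; break; // RxJava would signal MissingBackpressureException
            }
        }
    }

    /** Scenario: demand for 2, five items pushed, demand for 2 more, two items pushed. */
    static Bridge run(Strategy strategy) {
        Bridge bridge = new Bridge(strategy);
        bridge.request(2);
        for (int i = 1; i <= 5; i++) {
            bridge.emit(i);
        }
        bridge.request(2);
        for (int i = 6; i <= 7; i++) {
            bridge.emit(i);
        }
        return bridge;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            Bridge b = run(s);
            System.out.println(s + " delivered=" + b.delivered
                    + " pending=" + b.pending + " failed=" + b.failed);
        }
    }
}
```

In this scenario DROP delivers 1, 2, 6, 7 and LATEST delivers 1, 2, 5, 6. BUFFER delivers 1, 2, 3, 4 and is still holding three items, while ERROR stops after 1, 2.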

2. Use the Flowable.create() factory method:

We won’t get much more functionality than toFlowable() here, so let’s skip this one.

3. Use the Flowable.generate() factory method:

This is what we were looking for. The generate() method has a few overloads; this one does what we need:

Flowable.generate(
        () -> 0, //initial state
        (state, emitter) -> { //current state
            emitter.onNext(state);
            return state + 1; // next state
        }
)
This code generates a stream of non-negative integers: 0, 1, 2, 3, 4, 5, 6, …
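The important property of generate() is that the generator runs only when the downstream asks for an item. The sketch below imitates that behaviour with the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces that Flowable implements. It is a simplified, single-threaded stand-in written for this post, not RxJava's implementation: `GenerateSketch`, `generate` and `firstItems` are made-up names, and the emitter is reduced to a plain `Consumer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class GenerateSketch {

    /** Calls the generator once per requested item, threading the state through. */
    static <S, T> Flow.Publisher<T> generate(Supplier<S> initialState,
                                             BiFunction<S, Consumer<T>, S> generator) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private S state = initialState.get();
            private long requested;    // outstanding demand
            private boolean draining;  // guards against re-entrant request() from onNext
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (n <= 0 || cancelled) {
                    return; // a compliant publisher would signal onError for n <= 0
                }
                requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n;
                if (draining) {
                    return; // the loop below picks up the new demand
                }
                draining = true;
                while (requested > 0 && !cancelled) {
                    requested--;
                    state = generator.apply(state, subscriber::onNext);
                }
                draining = false;
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /** Requests `count` items one at a time, then cancels; counts generator invocations. */
    static List<Integer> firstItems(int count, AtomicInteger generatorCalls) {
        List<Integer> received = new ArrayList<>();
        Flow.Publisher<Integer> numbers = generate(
                () -> 0,
                (state, emit) -> {
                    generatorCalls.incrementAndGet();
                    emit.accept(state);
                    return state + 1;
                });
        numbers.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() < count) {
                    subscription.request(1);
                } else {
                    subscription.cancel();
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        System.out.println(firstItems(3, calls) + " after " + calls.get() + " generator calls");
    }
}
```

Even though the sequence is infinite, asking for three items runs the generator exactly three times. Nothing is produced ahead of demand.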

The first parameter is a Callable that returns the initial state. The second is a BiFunction that is called on every request to create a new item; its parameters are the current state and an emitter. So let’s apply it to our database code:

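Flowable<List<Data>> select(int page, int pageSize) {
    return Flowable.generate(
            () -> page, // initial page
            (currentPage, emitter) -> {
                // use currentPage (not the initial page) so every request fetches the next page
                List<Data> rows = table.select("SELECT * FROM myTable LIMIT " + pageSize
                        + " OFFSET " + (currentPage * pageSize));
                if (rows.isEmpty()) {
                    emitter.onComplete(); // no more rows: end the stream
                } else {
                    emitter.onNext(rows);
                }
                return currentPage + 1; // next page
            });
}
Why is there no string templating in recent Java releases (9, 10, 11)?! WTH, Java.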
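If you want to check the LIMIT/OFFSET arithmetic without a database, the same page-by-page idea can be sketched in plain Java. Everything here is an assumption made for the illustration: an in-memory list plays the table, `fetch(limit, offset)` plays the query, and an Iterator plays the role of the requests (a page is fetched only when the caller asks for it, and paging stops at the first empty page).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

final class PagingSketch {

    /** Same query shape as in select(), built with plain concatenation. */
    static String pageQuery(int page, int pageSize) {
        return "SELECT * FROM myTable LIMIT " + pageSize + " OFFSET " + (page * pageSize);
    }

    /** Lazily pulls pages via fetch(limit, offset); stops at the first empty page. */
    static <T> Iterator<List<T>> pages(int firstPage, int pageSize,
                                       BiFunction<Integer, Integer, List<T>> fetch) {
        return new Iterator<List<T>>() {
            private int page = firstPage;
            private List<T> lookahead;  // fetched by hasNext(), not yet handed out
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (lookahead == null && !exhausted) {
                    List<T> rows = fetch.apply(pageSize, page * pageSize);
                    if (rows.isEmpty()) {
                        exhausted = true;
                    } else {
                        lookahead = rows;
                        page++;
                    }
                }
                return lookahead != null;
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> result = lookahead;
                lookahead = null;
                return result;
            }
        };
    }

    /** Pages through an in-memory "table" of rowCount rows and returns each page's size. */
    static List<Integer> pageSizes(int rowCount, int pageSize) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) {
            table.add(i);
        }
        Iterator<List<Integer>> it = pages(0, pageSize, (limit, offset) -> {
            int from = Math.min(offset, table.size());
            int to = Math.min(offset + limit, table.size());
            return table.subList(from, to);
        });
        List<Integer> sizes = new ArrayList<>();
        while (it.hasNext()) {
            sizes.add(it.next().size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(pageQuery(2, 10));
        System.out.println(pageSizes(25, 10));
    }
}
```

Note that with OFFSET computed as page * pageSize, pages are zero-based: page 0 starts at the first row.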

Now we can call select() like this:

myTable.select(0, 10) // pages are zero-based: page 0 starts at OFFSET 0
    .map(/** mapping **/)
    .flatMap(items -> { /* return a Publisher here */ }, 1) // <== 1 is the max number of concurrent tasks
    // observeOn's default buffer size is 128, so we override it
    .observeOn(Schedulers.single(), false, 1)
    .subscribe(new DefaultSubscriber<List<Data>>() {
        @Override
        protected void onStart() {
            // super.onStart(); the default implementation requests Long.MAX_VALUE
            request(1);
        }

        @Override
        public void onNext(List<Data> data) {
            doSomethingHere(data);
            request(1); // if you want more data
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();

        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
Select page with BackPressure Flowable
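The request(1) calls in the subscriber above are the whole trick: the subscriber tells the upstream how much it is ready to take. For a version you can run without RxJava on the classpath, here is a rough sketch using the JDK's own Flow API (Java 9+). SubmissionPublisher is a JDK class, not part of RxJava; `OneAtATimeSketch`, the buffer capacity of 2 and the 5 second timeout are choices made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OneAtATimeSketch {

    /** Publishes 1..itemCount to a subscriber that requests a single item at a time. */
    static List<Integer> run(int itemCount) {
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Small per-subscriber buffer: submit() blocks the producer while it is full.
            try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2)) {
                publisher.subscribe(new Flow.Subscriber<Integer>() {
                    private Flow.Subscription subscription;

                    @Override
                    public void onSubscribe(Flow.Subscription s) {
                        subscription = s;
                        s.request(1); // ask for the first item only
                    }

                    @Override
                    public void onNext(Integer item) {
                        received.add(item);
                        subscription.request(1); // ready for one more
                    }

                    @Override
                    public void onError(Throwable t) {
                        done.countDown();
                    }

                    @Override
                    public void onComplete() {
                        done.countDown();
                    }
                });
                for (int i = 1; i <= itemCount; i++) {
                    publisher.submit(i);
                }
            } // leaving the try block closes the publisher, which signals onComplete
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
            return new ArrayList<>(received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

The producer never gets far ahead of the subscriber here, because submit() has to wait whenever the small buffer is full.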

That’s all there is to it. 😃

Conclusion

In this post, we learned how not to flood precious memory with a stream of unneeded items. We learned how to apply backpressure to a stream and how to create a stream with backpressure with the help of the Flowable class. I hope you liked it and will use it to create better apps.

If you have any questions feel free to write a comment below and don’t forget to subscribe to my blog.

The motivation behind this post was this StackOverflow question I answered a while ago.

OutOfMemoryException when sending high volume POSTS with retrofit2 and rx java2

Resources:

Backpressure (2.0) · ReactiveX/RxJava Wiki