What is Backpressure?
Backpressure is simply the process of handling a fast producer. If an `Observable` produces 1 million items per second, how can a subscriber that can only process 100 items per second handle them? The `Observable` class has no backpressure support: its operators buffer without bound, so everything the producer emits piles up in memory until the subscriber gets to it, and that is where the `OutOfMemoryError` comes from.
By applying backpressure to a stream, we can process items as they are needed, discard unnecessary items, or even let the producer know when to create and push new items.
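To make the last option concrete, here is a minimal sketch of a subscriber that tells the producer when it is ready for the next item. It assumes RxJava 2 (the `io.reactivex` packages); the class name `PullOneAtATime` is made up for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.DefaultSubscriber;

import java.util.ArrayList;
import java.util.List;

// Illustrative only: PullOneAtATime is a hypothetical name, not part of RxJava.
final class PullOneAtATime {

    // Consumes `count` items, asking the producer for exactly one item at a time.
    static List<Integer> consume(int count) {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, count).subscribe(new DefaultSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(1); // ask for the first item only (the default asks for Long.MAX_VALUE)
            }

            @Override
            public void onNext(Integer item) {
                received.add(item); // "process" the item
                request(1);         // signal that we are ready for the next one
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                // nothing to clean up in this sketch
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consume(5));
    }
}
```

The producer never gets ahead of the subscriber here, because it only emits what has been requested.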
What is the problem?
The problem with the code above is that it fetches all the rows from the database and pushes them downstream, which results in high memory usage because all the data is buffered in memory. Do we want all of the data? Yes! But do we need all of it at once? No.
I see this kind of usage in lots of projects, and I'm pretty sure most of us have done something like this before, even while knowing something was wrong: we query a database assuming there won't be much data, then there is, and we end up with poor performance in production. If we are lucky, we get an `OutOfMemoryError`, but most of the time the app just becomes slow and sluggish.
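The pattern described here can be sketched roughly as follows. This is not the post's own listing: `EagerFetch`, `fetchAllRows` and the `rowsLoaded` counter are hypothetical stand-ins for a real database query, and the example assumes RxJava 2.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the eager pattern; no real database is involved.
final class EagerFetch {

    // Counts how many rows were materialized in memory.
    static int rowsLoaded = 0;

    // Stand-in for "SELECT * FROM some_table": builds the full result list up front.
    static List<String> fetchAllRows(int total) {
        List<String> rows = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            rows.add("row-" + i);
            rowsLoaded++;
        }
        return rows;
    }

    // Every row is already in memory before the first subscriber shows up.
    static Observable<String> allRows(int total) {
        return Observable.fromIterable(fetchAllRows(total));
    }

    public static void main(String[] args) {
        List<String> firstThree = allRows(10_000).take(3).toList().blockingGet();
        System.out.println(firstThree.size() + " rows used, " + rowsLoaded + " rows loaded");
    }
}
```

Even though the subscriber only takes three rows, the whole table has been loaded by the time the stream is created.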
What is the solution?
Backpressure to the rescue! Back in RxJava 1, the `Observable` class was responsible for the backpressure of streams; since RxJava 2, there is a separate class for handling backpressure: `Flowable`.
How to create a Flowable?
There are multiple ways to create a backpressured stream:
1. Convert the `Observable` to a `Flowable` with the `toFlowable()` method:
Observable.range(1, 1_000_000).toFlowable(BackpressureStrategy.DROP)
With the `DROP` strategy, the downstream does not receive all one million items. Items emitted while it is still busy with earlier ones are discarded, and it only receives new items once it is ready for more.
Example output:
1
2
3
...
100
101
drops some of the items here
100,051
100,052
100,053
...
drops again
523,020
523,021
...
and so on
Note: If you subscribe without changing the scheduler, you will get all one million items. Because the stream is synchronous, the producer is blocked by the subscriber.
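The difference between the synchronous and the asynchronous case can be sketched like this. The example assumes RxJava 2; `DropDemo` and its method names are hypothetical, and the exact number of items that survive in the slow case depends on timing, so no specific output is shown.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Illustrative only: DropDemo is a hypothetical name, not part of RxJava.
final class DropDemo {

    // Same thread: the downstream requests everything up front, so nothing is dropped.
    static List<Integer> synchronous(int count) {
        return Observable.range(1, count)
                .toFlowable(BackpressureStrategy.DROP)
                .toList()
                .blockingGet();
    }

    // Slow consumer on another thread: observeOn requests items in batches
    // (128 by default), and whatever the producer emits beyond the
    // outstanding request is dropped.
    static List<Integer> slowConsumer(int count) {
        return Observable.range(1, count)
                .toFlowable(BackpressureStrategy.DROP)
                .observeOn(Schedulers.computation())
                .doOnNext(item -> Thread.sleep(1)) // simulate slow processing
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("synchronous:   " + synchronous(100_000).size() + " items");
        System.out.println("slow consumer: " + slowConsumer(100_000).size() + " items");
    }
}
```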
BackpressureStrategy
There are a few backpressure strategies:
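- `DROP`: Discards the most recent item if the downstream can't keep up
- `BUFFER`: Buffers all the items from the producer until the downstream consumes them; watch out for OOMs
- `LATEST`: Keeps only the most recent item, overwriting any previous item the downstream has not consumed yet
- `ERROR`: Signals a `MissingBackpressureException` if the downstream can't keep up
- `MISSING`: No strategy; items are passed on without buffering or dropping, so sooner or later an operator somewhere downstream will signal a `MissingBackpressureException` unless it deals with the overflow itself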
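A small experiment can make the differences visible. In this sketch (RxJava 2 assumed; `StrategyDemo` is a hypothetical name), ten items are emitted into a downstream that has not requested anything yet, and the downstream only asks for items after the producer has finished.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.reactivex.subscribers.TestSubscriber;

// Illustrative only: StrategyDemo is a hypothetical name, not part of RxJava.
final class StrategyDemo {

    // Emits 1..10 into a downstream that has requested nothing (a "busy" consumer),
    // then lets the downstream request everything once the producer is done.
    static TestSubscriber<Integer> run(BackpressureStrategy strategy) {
        TestSubscriber<Integer> subscriber =
                Observable.range(1, 10).toFlowable(strategy).test(0); // initial request: 0
        subscriber.request(Long.MAX_VALUE);
        return subscriber;
    }

    public static void main(String[] args) {
        System.out.println("DROP:   " + run(BackpressureStrategy.DROP).values());
        System.out.println("LATEST: " + run(BackpressureStrategy.LATEST).values());
        System.out.println("BUFFER: " + run(BackpressureStrategy.BUFFER).values());
        System.out.println("ERROR:  " + run(BackpressureStrategy.ERROR).errors());
    }
}
```

With `DROP` nothing arrives, with `LATEST` only the last item arrives, with `BUFFER` all ten items arrive late, and with `ERROR` the stream fails with a `MissingBackpressureException`.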
2. Use the `Flowable.create()` factory method:
We won’t get much more functionality than `toFlowable()` here, so let’s skip this one.
3. Use the `Flowable.generate()` factory method:
This is what we were looking for. The `generate()` method has a few overloads; the one that takes an initial state and a generator function does what we need.
The first parameter is a `Callable` that returns the initial state. The second is a `BiFunction` that is called once for each requested item to create it; its parameters are the current state and an emitter, and it returns the state for the next call. So let’s apply it to our database code:
Now we can call it like this:
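As a rough sketch of what such a generator and its call site might look like (RxJava 2 assumed): `RowStream` and `rows` are hypothetical names, and an in-memory `Iterable<String>` stands in for a real database cursor.

```java
import io.reactivex.Emitter;
import io.reactivex.Flowable;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: RowStream and rows() are hypothetical, and `table`
// is an in-memory stand-in for a database cursor.
final class RowStream {

    // Produces one row per downstream request instead of loading everything up front.
    static Flowable<String> rows(Iterable<String> table) {
        return Flowable.generate(
                () -> table.iterator(), // initial state: a fresh "cursor" per subscriber
                (Iterator<String> cursor, Emitter<String> emitter) -> {
                    if (cursor.hasNext()) {
                        emitter.onNext(cursor.next()); // at most one onNext per call
                    } else {
                        emitter.onComplete();
                    }
                    return cursor; // state passed to the next call
                });
    }

    public static void main(String[] args) {
        List<String> table = Arrays.asList("row-1", "row-2", "row-3");
        rows(table).subscribe(System.out::println);
    }
}
```

With a real cursor, the three-argument `generate()` overload, which additionally takes a `Consumer` for disposing of the state, would be the natural place to close it.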
That’s all there is to it. 😃
Conclusion
In this post, we learned how not to flood precious memory with a stream of unneeded items. We learned how to apply backpressure to a stream and how to create a backpressured stream with the help of the `Flowable` class. I hope you liked it and will use it to build better apps.
If you have any questions, feel free to write a comment below, and don’t forget to subscribe to my blog.
The motivation behind this post was this StackOverflow question I answered a while ago.