I've been building Android apps for a few years now. You learn to cope. Background thread for the database. Another one for the API. Callbacks everywhere. AsyncTasks that weren't actually async. Handlers and Looper. Loaders that were too rigid and died with the activity.
Every project ended up with its own ad-hoc solution. It worked, mostly, but it never felt right.
The Problem
Here's what a typical data fetch looked like:
new AsyncTask<Void, Void, User>() {
@Override
protected User doInBackground(Void... params) {
return userApi.getUser(userId);
}
@Override
protected void onPostExecute(User user) {
nameText.setText(user.getName());
emailText.setText(user.getEmail());
}
}.execute();
Seems fine. Except:
- No way to chain operations cleanly
- Error handling is manual and easy to forget
- Good luck canceling mid-flight
- Rotate the device and hope the Activity is still alive
- Need database first, then API? Enjoy the callback hell
Enter Observables
RxJava gives you Observable. A stream of data that you can transform, combine, and subscribe to:
Observable<User> userObservable = api.getUser(userId);
The chaining is where it gets useful.
Database → Network → UI
Say you need to check local cache, fetch from API if stale, save to DB, then show in UI:
Observable<User> cachedUser = database.getUser(userId);
Observable<User> freshUser = api.getUser(userId)
.doOnNext(user -> database.saveUser(user))
.onErrorResumeNext(throwable -> cachedUser);
Observable.concat(cachedUser, freshUser)
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::displayUser, this::showError);
That chain does four things — cache, network, save, UI — in one readable stream. No callbacks inside callbacks.
Here's what each part does:
cachedUseremits from local databasefreshUserhits the API and saves to DB on successonErrorResumeNextfalls back to cache if network failsconcatruns them in order — cache first, then networkfirst()takes only the first successful emission
Operators
map — transform
api.getUser(userId)
.map(user -> user.getName())
.subscribe(name -> textView.setText(name));
flatMap — chain operations
api.getUser(userId)
.flatMap(user -> api.getPosts(user.getId()))
.subscribe(posts -> adapter.setPosts(posts));
filter — skip items
api.getAllUsers()
.filter(user -> user.getAge() >= 18)
.subscribe(this::displayUser);
zip — combine sources
Observable<User> user = api.getUser(userId);
Observable<List<Post>> posts = api.getPosts(userId);
Observable.zip(user, posts, (u, p) -> new UserWithPosts(u, p))
.subscribe(this::displayUserWithPosts);
Android Lifecycle
RxAndroid gives you AndroidSchedulers.mainThread(). That's the main thing it adds — a scheduler that runs on the UI thread.
The real problem is memory leaks. If your Observable is still emitting after the Activity is destroyed, boom. CompositeDisposable fixes this:
public class UserActivity extends AppCompatActivity {
private CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Disposable d = api.getUser(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::displayUser, this::showError);
disposables.add(d);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
}
Clear the disposables in onDestroy and you're safe.
Thoughts
Is it verbose? Sometimes. Does it beat the alternative? For anything beyond a single API call, yes. Once you start thinking in streams, it clicks.
There's also RxBinding from Jake Wharton if you want to treat UI events as streams:
RxView.clicks(button)
.flatMap(v -> api.getData())
.subscribe(this::updateUI);
It's a different way to think about UI. Not better or worse — just different. RxJava 2 is in the works with Flow, which might change things further. But these ideas — streams, operators, threading — they're worth understanding regardless of which reactive library you end up using.