Front-End Web & Mobile
Using RxJava with AWS’ Amplify Android Library
This article was written by Jameson Williams, Senior Software Engineer, AWS.
Today I’m going to talk about Reactive Extensions (RxJava), and how you can use them with AWS’ Amplify Framework. This blog is mostly geared towards readers who are new to “Rx.”
The Amplify Android library is AWS’ preferred means for interacting with AWS from an Android device. Most methods in Amplify are asynchronous in that they return immediately, rendering results at a later time via callbacks. We designed Amplify this way to be consistent with Android’s event-driven programming model. Even the very first Android platform hook you ever learned, Activity
’s onCreate(), is an example of an async callback.
It has been almost 12 years since Android 1.0 was released. In that time, Android developers have built increasingly complex applications. As Android developers, we have encountered conceptual challenges with the platform’s async programming model. Concurrency, thread-safety, and sequencing of dependent operations are difficult problems. Many of us have encountered a phenomenon known as “callback hell,” resulting from deeply nested, dependent, async calls.
So, Why RxJava?
Android’s original platform language, Java, is almost 25 years old. While Java continues to evolve and adapt to modern programming paradigms in its most recent versions, Android’s flavor of Java largely has not. Even today, Android supports only a subset of Java 8’s functionality. To contextualize, Java 8 has been fully supported by the Oracle JVM since 2014, and the most recent version is Java 14.
The last quarter-century has seen significant evolution in the design of programming languages. In the Java ecosystem, we now have entire books dedicated to avoiding the pitfalls of Java’s early incantations. Meanwhile, today’s contemporary generation of programming languages — Dart, Kotlin, Swift, Rust, Golang and others — have been built with concurrency and/or functional programming at front of mind.
RxJava effectively back-ports some of that thinking into Android’s enfeebled Java ecosystem. For that reason, RxJava has become one of the most popular Android libraries — something of a de-facto standard for Android codebases written in Java. Unlike Android’s Java 8 support, RxJava is broadly backwards-compatible with older versions of the Android RunTime (and even its predecessor, Dalvik).
Rx Primitives & Platform Comparison
It isn’t only that RxJava fills a void in the Android-Java ecosystem. Functional and reactive programming concepts have been finding their way into all front-end platforms in recent years. Here, I’ll introduce some of the key Rx primitives and discuss analogies to other platforms, noting constructs you may have encountered before.
Singles
Futures and Promises will be familiar to many of you.
JavaScript began including Promise
s as of ECMAScript6. A Promise
models an asynchronous behavior which terminates one of two ways:
- by rendering a result value, or
- by emitting an error.
Facebook’s React Native framework, based on JavaScript (well, TypeScript) makes prominent use of Promises
.
A Future
is a very similar construct that is found in Dart (language of Flutter), Java 8, and iOS Combine. Android does include a CompletableFuture implementation, but it only runs on fairly recent versions of the operating system (Android API levels >= 24).
In Rx, the basic equivalent of this construct is called a Single
. In addition to modeling asynchronous behavior as a blocking API like Future
s and Promise
s do, Single
s offer a bunch of functional programming utilities to map, filter, and otherwise manipulate a result, once it becomes available.
Observables
The Observable is really the flagship of Rx.
Whereas a Single models an operation that completes by yielding a single value, an Observable models an operation that emits an arbitrary number of values. An observable can emit 0..n values, after which it either completes (with no value), or terminates with an error. This behavior is codified in the Observable Contract.
Observables have fewer analogs. But, iOS Combine’s Publisher
is pretty close. The Swift Signals
library, which creates a pub/sub channel for events, is vaguely related, though it does not strongly model termination of a stream like an Observable
does.
Completables
The third and final primitive I’ll mention is the Completable.
So far, we can model behaviors that produce a single result, and behaviors that produce an arbitrary number. But what if we just want to know if some work is done? That is to say, we want some signal that says “I’m done”, even though there’s no explicit result data rendered. That’s called a Completable.
The Completable is probably my favorite, since it’s the simplest conceptually, and is pretty unique to Rx. A Promise
or a Future
that emits a void/sentinel value might be similar, though less elegant due to the extra, purposeless value.
Let’s Use it with Amplify, Already ?♀️
Let’s suppose we’re using the Amplify DataStore (see Getting Started guide.) And, let’s suppose our GraphQL schema includes a data @model
for a BlogPost
:
type BlogPost @model {
id: ID!
title: String!
linksTo: [String!]
}
Using Amplify’s default interface, you could save a BlogPost to the DataStore, and inspect the results of the operation:
BlogPost blogPost = BlogPost.builder()
.title("Draft of Rx Blog")
.linksTo(Collections.singletonList("www.reactive-streams.org/"))
.build();
// Time t1
Amplify.DataStore.save(blogPost,
// Time t3
saveInfo -> Log.i(TAG, "Saved a blog post.", saveInfo),
failure -> Log.e(TAG, "Failed to save.", failure)
);
// Time t2
At time (1), the save(...)
method is executed. At time (2), the next statements in your program begin executing. Some time in the future, one of the callbacks are invoked, with the results of the save(...)
operation, at time (3).
The Rx version of this looks remarkably similar:
// Time t1
RxAmplify.DataStore.save(blogPost).subscribe(
// Time t3
saveInfo -> Log.i(TAG, "Saved a blog post.", saveInfo),
failure -> Log.e(TAG, "Failed to save.", failure)
);
// Time t2
Note the use of RxAmplify
, instead of Amplify
, and the addition of the .subscribe(...)
method, to consume the results.
Even though the two call patterns look remarkably similar, the Rx example is quite different under the hood. Let’s tear both apart a bit, to understand what’s going on. Only for the purpose of demonstrating these differences, we’ll drop the lambda expressions, for a moment. (If you’re new to lambda expressions, see my Gentle Introduction to Lambda Expressions in Java.)
Under the hood, the first example expands to:
// The Amplify facade returns an interface
DataStoreCategoryBehavior datastore = Amplify.DataStore;
// The save() method accepts anonymous callback functions
// as its arguments. These may be expressed as lambdas.
datastore.save(blogPost, new Consumer<DataStoreItemChange<T>> {
@Override
public void accept(DataStoreItemChange<T> saveInfo) {
Log.i(TAG, "Saved a blog post.", saveInfo);
}
}, new Consumer<DataStoreException>() {
@Override
public void accept(DataStoreException failure) {
Log.e(TAG, "Failed to save.", failure);
}
});
Whereas, in the Rx case, the equivalent is:
// The Rx Amplify facade returns *a different* interface.
RxDataStoreCategoryBehavior datastore = RxAmplify.DataStore;
// The save operation is represented as an Rx Completable
Completable saveOperation = datastore.save(blogPost);
// We can subscribe to the operation, to check its ongoing status.
Disposable disposable = saveOperation.subscribe(new Action() {
@Override
public void call() {
Log.i(TAG, "Saved a blog post.", saveInfo);
}
}, new Consumer<DataStoreException> {
@Override
public void accept(DataStoreException failure) {
Log.e(TAG, "Failed to save.", failure);
}
});
// To stop observing the status, we dispose the disposable.
disposable.dispose();
Indeed, the Amplify library modeled its callback types (Consumer
, Action
) directly from the similarly named constructs in Rx. In the vanilla Amplify example, the callback types come from us. In the Rx example, those are Rx types.
Amplify’s vanilla signatures generally require both callbacks to be provided. Conveniently in Rx, that is not the case. There are a bunch of overloads for .subscribe(...)
and you can provide only the callback hooks you care about. There’s event a .subscribe()
which doesn’t need any callbacks — it will just perform the operation’s work, but not act on its output.
A More Complex Example
If all you ever had to do was make a single call to Amplify.DataStore.save(...)
, you probably wouldn’t bother with any of this. So, let’s try something a little more intricate.
Let’s try to sign in a user, then get a list of all their blog posts. We’ll sort the list by title, join the titles, and display all of the titles to the UI.
With the Rx bindings, this is relatively straight-forward:
RxAmplify.Auth.signIn("joe", "koolpass")
.ignoreElement()
.andThen(RxAmplify.API.query(ModelQuery.list(BlogPost.class)))
.map(GraphQLResponse::getData)
.flatMapObservable(Observable::fromIterable)
.map(BlogPost::getTitle)
.toSortedList()
.map(list -> TextUtils.join(",", list))
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(text ->
Toast.makeText(context, text, Toast.LENGTH_SHORT)
.show()
);
For comparison, here’s the equivalent without Rx:
Handler handler = new Handler(Looper.getMainLooper());
Amplify.Auth.signIn("joe", "koolpass", signInResult -> {
Amplify.API.query(ModelQuery.list(BlogPost.class), queryResults -> {
List<String> titles = new ArrayList<>();
for (BlogPost post : queryResults.getData()) {
titles.add(post.getTitle());
}
Collections.sort(titles);
String titleString = TextUtils.join(",", titles);
handler.post(() ->
Toast.makeText(context, titleString, Toast.LENGTH_SHORT)
.show()
);
}, failure -> {});
}, failure -> {});
Rx Equivalents of Amplify APIs
The Rx Bindings have been made available for Auth, Storage, Predictions, DataStore, API and Hub. The mapping between Amplify methods and their Rx equivalents are governed by a few simple rules:
- For Amplify methods that have a
void
return type (or return an operation that isn’t cancelable) the Rx equivalent returns a Rx primitive (Completable
,Single
, orObservable
) directly. - For Amplify methods that return a cancelable operation, the Rx equivalent will return a simple operation structure. The structure exposes a
cancel()
method (to stop the operation), and anobserveResult()
method, to access the result of the operation.
There are a few exceptions to these rules, however:
- Storage upload and download work a little differently. They’ll return a cancelable operation structure that includes
cancel()
,observeResult()
, andobserveProgress()
. The result method emits the download result via aSingle
, whereas the progress method emits a stream of progress updates on anObservable
. - API subscriptions are another special case. The operation structure returned by
RxAmplify.API.subscribe(...)
provides anobserveConnectionState()
, which can be used to determine successful connection establishment, as well as anobserveData()
, used to observe data arriving on the subscription.
Next Steps
There are only a few steps needed to start using the Rx bindings in your projects.
Add a dependency in your module-level build.gradle
:
dependencies {
// Add this line.
implementation 'com.amplifyframework:rxbindings:1.3.0'
}
Change every use of Amplify
into RxAmplify
. For example, when you initialize your project:
RxAmplify.addPlugin(new AWSDataStorePlugin());
RxAmplify.configure(getApplicationContext());
Be sure to checkout the Amplify documentation to reference Rx binding APIs for each of Amplify’s various methods. You can also checkout this Simple DataStore sample application, which uses the Rx Bindings from a Kotlin code base.
If you’re building iOS apps, be sure to checkout Amplify’s support for iOS Combine!