The Reactive Extensions (Rx) is a library for programming with asynchronous data streams. It was originally developed at Microsoft for the .NET Framework but is now available as a seperate library. Meanwhile it has been ported to many other languages and platforms as e.g. Java, Scala, C++, Clojure, JavaScript, Python, Groovy, JRuby, and others.
Rx can be used in desktop as well as web-based applications on either server or client side. A few common situations / problems where you might benefit from using Rx are:
- Unresponsive UI
- Long running computations that could potentially block other threads
- Asynchronous programming where multiple sources of data and events have to be combined
- Enabling and managing concurrency on backend independent of the consumers implementation
- Handling errors and cancellation of asynchronous tasks
In this post I will introduce some general ideas behind Rx and show how it can help in some of the situations stated above. If you are new to Rx it will give you a basic understanding of what Rx is and where it can be useful to apply.
The examples are all presented in C#. But the concepts apply to all programming languages.
Definition
A definition of Rx could be:
Rx is a library for programming with asynchronous data streams. It is a combination of the observer and the iterator pattern as described by the Gang of Four combined with ideas and concepts from functional programming.
Querying Data
To get an idea when it is useful to use Rx let us look at ways how to retrieve data. Basically a query result can be either a single object or a collection of objects. The data can be queried either synchronous or asynchronous. This produces the following matrix:
Single | Multiple | |
---|---|---|
Sync | ||
Async |
Sync Single
A function that returns a single object and is executed synchronously is probably one of the most common scenarios that we encounter. In the example the function is called GetData()
and the return value is of the generic type T
as shown in the following table.
Single | Multiple | |
---|---|---|
Sync | T GetData() | |
Async |
Calling the method and processing the result might look something like this:
string s = GetData();
if(s.Equals(x))
// do something
else
// do something else
If this code was implemented behind a user interface the process will be blocked for however long it will take to retrieve the value because this is done synchronously.
Sync Multiple
When we examine the synchronous query of multiple values we will see the same behaviour except that the return value is a collection of objects of type T
.
Single | Multiple | |
---|---|---|
Sync | T GetData() | IEnumerable<T> GetData() |
Async |
The LINQ library provides a rich toolbox of query operators to create, transform, combine and filter collections. In the example however we want to iterate over all the values and do some processing with each of them.
IEnumerable<string> values = GetData();
foreach(string value in values)
{
if(value.Equals(x))
// do something
else
// do something else
}
Again this is done synchronously and the UI will be blocked.
Async Single
To make the UI more responsive values can be queried asynchronously. In case of a single value the function might return a Task<T>
. A Task
in C# can be seen as the equivalent of the Future
in Java. It is the result of an asynchronous computation.
Single | Multiple | |
---|---|---|
Sync | T GetData() | IEnumerable<T> GetData() |
Async | Task<T> GetData() |
Processing the result of a Task can be done synchronously which again will block.
Task<T> task = GetData();
if (task.Result.Equals(x))
// do something
else
// do something else
So that is not really an improvement. A better workflow when creating a responsive UI would be:
- Respond to some user action
- Do work on a background thread
- Pass the result back to the UI thread
- Update the UI
To achieve this we should consider the async features in .NET 4.5 or apply a callback. A callback is a function that is passed as an argument and will be called at a later time e.g. with the result of a long running computation. In this case we can use the ContinueWith
method of Task
that takes a function that is executed asynchronously.
task.ContinueWith(t =>
{
if (t.Result.Equals(x))
// do something
else
// do something else
});
Async Multiple
Asynchronous computations that yield collections of values can also be handled with callbacks. Actually this is what is done in many cases. But we might quite easily run into several problems as the application becomes more and more complex.
E.g. there is no way to start iterating over the result and process the values before the whole computation completes. If loading the whole collection takes a long time it would be very convenient to start working with the first part of the list while the rest is still processing. But when using callbacks this is not possible.
And what if we want to process each element again asynchronously and then each result of that yet again… and so on? The consequence would be a deeply nested composition of callbacks which will become very unwieldy and won’t be easily maintainable any more.
With nested composition callbacks become unwieldy and lead to the need to synchronize.
This is where Rx comes into play with the main type IObservable<T>
– the counter part of IEnumerable<T>
.
Single | Multiple | |
---|---|---|
Sync | T GetData() | IEnumerable<T> GetData() |
Async | Task<T> GetData() | IObservable<T> GetData() |
IObservable<T>
represents an asynchronous data stream of objects of type T
. Consuming the result is quite simple:
IObservable<T> o = GetData();
o.Subscribe(t =>
{
if (t.Equals(x))
// do something
else
// do something else
});
The Subscribe
method has several overloads. In the example above it takes an action as a single argument which will be applied to each value from the Observable.
Composable functions
As well as LINQ Rx also includes a rich library of higher-order functions that allow us to transform, combine, filter etc. observable sequences. Those functions are easily composable. This is one of the really great benefits of Rx because it makes querying, combining and transforming asynchronous data streams as simple as applying LINQ queries.
Roughly the set of functions can be divided into the following categories:
Transform: Select, SelectMany, Aggregate …
Filter: Skip, Take, TakeWhile, Where …
Combine: Concat, Merge, Zip …
Boolean Operators: Any, All, Contains …
Mathematical Operators: Max, Min, Average, Count, Sum …
Concurrency: ObserveOn, SubscribeOn …
Error Handling: OnErrorReturn, OnErrorResume …
Here is a simple example of how some of these functions can be combined:
IObservable<int> observable = GetData();
observable
.Skip(1)
.Take(3)
.Where(x => x < 10)
.Select(x => x + 1)
.Subscribe(Console.WriteLine);
IEnumerable vs. IObservable
To get an even better understanding of what an IObservable<T>
is we will compare it to the IEnumerable<T>
.
IEnumerable
The method IEnumerable.GetEnumerator()
returns an object of type IEnumerator
which iterates over collections and has the following methods:
bool MoveNext()
– Advances the enumerator to the next element of the collection. Returns true if it succeeds and false if it fails (when the enumerator points to the last element of the collection or empty list).T Current{ get; }
– Returns the current element in the list.throws Exception
– This is actually not a method of the interface. But an exception might be thrown when callingCurrent
so an exception is a potential implicit return value ofCurrent
. In other words the return value ofCurrent
could be described asEither<T, Exception>
orTuple<T, Exception>
or similar types.void Dispose()
– Disposes of all resources used by the enumerator.
The general task of an enumerator is to pull data from a collection.
Below we can see the enumerator in action (Note that this is just for demonstration and is not good code. Usually we would use a foreach
statement to do the iteration over a collection.):
var enumerator = new List<int> { 1, 2, 3 }.GetEnumerator();
while (enumerator.MoveNext())
{
Console.WriteLine(enumerator.Current);
}
enumerator.Dispose();
IObservable
Each method of the IEnumerator
interface has its corresponding method in the IObserver
interface:
OnCompleted()
– Notifies the observer that the provider has finished sending data.OnNext(T)
– Pushes the next value to the observer.OnError(Exception)
– Notifies the observer that an error occurred.
The general task of an observer is to push data into a stream.
Here is an example of how the methods described above can be called in order to create an observable sequence:
public static IObservable<int> GetData()
{
return Observable.Create<int>(o =>
{
o.OnNext(1);
o.OnNext(2);
o.OnNext(3);
o.OnCompleted();
return Disposable.Empty;
});
}
For detailed information on how this works and on all the query operators and more check out the amazing online resource about Rx .NET by Lee Campbell.
The methods of IEnumerator
and IObservable
relate to each other in a special way as shown in the following diagram. Actually they are dual to each other.
IEnumerable | IObservable |
---|---|
pull | push |
bool MoveNext() | void OnCompleted() |
T Current { get; } | void OnNext() |
throws Exception | void OnError(Exception) |
On an observable sequence and on an enumerable sequence the same or at least very similar functions can be applied:
// IObservable<string>
// that emits 75 Strings
GetData()
.Skip(10)
.Take(5)
.Select(x => x + "_transformed")
.Subscribe(x => Console.WriteLine("next => " + x));
// IEnumerablee<string>
// that contains 75 Strings
GetData()
.Skip(10)
.Take(5)
.Select(x => x + "_transformed").ToList()
.ForEach(x => Console.WriteLine("next => " + x));
Summary
We have seen how Rx can help to make the UI responsive when dealing with computationally expensive tasks.
Also we could get an idea of how easy and comfortable it is to handle asynchronous data from multiple sources with the use of the LINQ-style query operators.
Enabling and managing concurrency on the backend as well as handling exceptions, cancellation etc. are other areas where Rx really shines. But these details go beyond the scope of this post. For more information on that please check out the resources below.
Things to keep in mind:
- Rx is a library for programming with asynchronous data streams
- Rx provides a rich library of composable functions to transform, combine, filter etc.
- The key types are
IObserver<T>
andIObservable<T>
- The pair of types
IObservable<T>
andIObserver<T>
is dual to the pairIEnumerable<T>
andIEnumerator<T>
- Rx provides rich functionalities for a scenario where a producer asynchronously pushes data to a consumer
Resources
- ReactiveX
- Getting started with the Reactive Extensions to .NET
- Java
- JavaScript
- C#
- Scala
- Clojure
- C++
- Ruby
- Python
- Groovy
The header image “Long exposure head lamps & tail lights” by JC+A is licensed under a Creative Commons Attribution 2.0. The picture was modified to fit this article.