The curiously-named Reactive Extensions is a .NET library that lets you work with streams of events as first-class objects.
The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
For example, once you have an Observable, that is, a stream of events, you can query it with LINQ extension methods (these return new streams):
stream.Where(condition) stream.GroupBy(selector) stream.Count()
Reactive Extensions with other upcoming parallel and asynchronous features in the spuriously-versioned .NET framework 4.5. Alas, Reactive Extensions hasn’t been adopted widely yet, perhaps only because it isn’t understood. The official documentation explains in academic language:
The IObservable interface is a dual of the familiar IEnumerable interface.
Regardless, it’s great stuff. Here’s how I’ve used Reactive Extensions. I have a tool that watches a folder for new files and emails me about them. It subscribes to filesystem events with a .NET FileSystemWatcher. To save my inbox, I want less than one email per new file, so I’ll batch events together. How do I batch them? I could batch events until I reach a limit in number, say into groups of ten, but then it’s possible I might not hear about a new file for days. Instead I’ll do this - batch events until there’s a half hour period of calm, and then send the email. This means the events might even be grouped logically. But I’d best add a numerical limit in case there’s no respite in events.
Before Reactive Extensions I solved this with a complicated system of .NET Timers.
With Reactive Extensions, the solution is much simpler. There isn’t a function in the standard library that does what I want, but the functions
Delay are enough to define it.
I’m sharing this because there a few posts on Stack Overflow looking to solve the same problem. Names differ - rolling buffers, buffer with inactivity, sliding buffers. I call my method buffer until calm. Help yourself to it.