Skip to content
Carmel Eve By Carmel Eve Software Engineer I
Rx operators deep dive Part 4: A window into scheduling in Rx

So, this week we are looking at the Buffer and Window Rx operators. (If you have no idea what I'm on about, I suggest you start at the beginning!)

There are a few different implementations of these operators, and we are going to focus on the time-based versions. In order to do this, we need to talk about Schedulers.

Schedulers

In Rx there are several different schedulers which can be used to invoke actions (or queue up work). Depending on the scheduler, that work will be run using the context defined by that scheduler.

When you invoke work via a scheduler, you can either tell it to "run this immediately" or to run it at a specific time.

This is done via the Schedule method, of which there are a few overloads. The Schedule(Action) method schedules work to be done immediately (or at the earliest time possible).

The Schedule(DateTimeOffset, Action) method schedules work to be done at a specified time, and the Schedule(TimeSpan, Action) method schedules work to be done after a certain amount of time.

When work is run immediately, or when it reaches its due time, the scheduler will run that work using the context defined by the scheduler type.

There are several concrete implementations of the IScheduler interface in Rx:

Scheduler.Immediate is the immediate scheduler. When you "schedule" work through this scheduler, it is in fact not scheduled, just executed immediately.

Scheduler.CurrentThread will queue up any work to be executed on the thread that makes the original call.

Even if you don't supply a due time, the work will not happen immediately but will be queued up to execute after the current action has completed.

This is because this work must be executed on the current thread, which is currently in use by the caller action.

Scheduler.NewThread will create a thread to use for the task. For example, if you have a long running operation, e.g. a timer that will continuously produce ticks.

It would make sense for this timer to run on a different thread to your main one, which may be otherwise occupied (with user input, for example).

This scheduler is ideal for long-running work, especially that which needs a dedicated thread, i.e. it will completely occupy whichever thread it is using.

The Introduction to Rx.NET 2nd Edition (2024) Book, by Ian Griffiths & Lee Campbell, is now available to download for FREE.

Scheduler.TaskPool schedules actions using the default Task Factory from the Task Programming Library (TPL). Creating threads is an expensive process, so re-use is often the best strategy.

This is especially true in the case of short running operations, where creating a new thread is unnecessary and extremely expensive.

The task pool scheduler is also optimised for multicore systems.

Finally, Scheduler.Threadpool also uses the threadpool. This works in a very similar way to the task pool scheduler, but it is less optimised.

This should only be used in situations where Scheduler.TaskPool is unavailable, which is the case in Silverlight 4 and .NET 3.5.

When you are subscribing to an observable, you can use the SubscribeOn and ObserverOn methods on an IObservable.

These methods have an overload which takes an IScheduler. When the SubscribeOn method is used, when anything subscribes, the subscribe method will be run on the supplied scheduler.

Likewise, using ObserveOn, the OnNext invocations will be run using the corresponding scheduler.

In order to get the default scheduler that the platform is running on, you can use the concrete Scheduler.Default implementation.

This retrieves the scheduler on which your application is based. In Rx this will use the thread pool scheduler by default (due to backwards compatibility with Rx v1).

So, another operator we need to touch on at this point is the Timer operator. We will use this to demonstrate the timed grouping using the Buffer/Window operators.

The Timer operator

The Timer operator has a few overloads. There is one which just produces an event after a certain amount of time.

There is also an overload which starts after a certain amount of time, and then periodically produces an event at a regular interval. It does this via scheduling a new "tick" to happen after a specified TimeSpan.

So if you did:

IObservable timer = System.Reactive.Linq.Observable.Timer(TimeSpan.FromSeconds(0.5), TimeSpan.FromSeconds(1))

(The inclusion of the namespace here is due to the fact that I do not want to bring in the built in LINQ operators, would make this whole 3 month project somewhat redundant!) and subscribed to the output, then after 0.5 seconds you would be passed the value 0, and then every second after than you would be passed 1,2,3 etc.

Okay, so now we have all the information we need…

OnNext(The Buffer operator)

You use the buffer operator as follows:

IObservable<IList<long>> bufferedTimes = timer.Buffer(TimeSpan.FromSeconds(4));

This will split the items produced by the original source into four-minute windows.

Programming C# 10 Book, by Ian Griffiths, published by O'Reilly Media, is now available to buy.

So, when you pass a source through a buffer operator, it will return an IObservable<IList<TSource>> where TSource is the type of the underlying source (here TSource is long because the timer returns each "tick" as a long). Each of these lists will contain all of the values produced by the original source within each time window.

This means that you are passed a new IList (which will be fully populated) at the end of each timed period.

So, following our usual pattern…

static IObservable<IList<TSource>>(this IObservable<TSource> source, TimeSpan timeSpan)
{
    return new BufferOperator(source, timeSpan);
}

Taking in an IObservable of type TSource, we are returned an IObservable<IList<TSource>>.

The BufferOperator then looks like this:

internal class BufferOperator<TSource> : IObservable<IList<TSource>>
{
    private IObservable<TSource> source;
    private TimeSpan timeSpan;

    public BufferOperator(IObservable<TSource> source, TimeSpan timeSpan)
    {
        this.source = source;
        this.timeSpan = timeSpan;
    }

    public IDisposable Subscribe(IObserver<IList<TSource>> observer)
    {
        var bufferer = new Bufferer(observer, this.timeSpan);
        return this.source.Subscribe(bufferer);
    }
    
    ...

With the Bufferer as follows:

private class Bufferer : IObserver<TSource>
{
    private readonly IObserver<IList<TSource>> observer;
    private TimeSpan timeSpan;
    private IList<TSource> currentList;
    private IDisposable endOfWindowSchedulerWorkItem;

    public Bufferer(IObserver<IList<TSource>> observer, TimeSpan timeSpan)
    {
        this.observer = observer;
        this.timeSpan = timeSpan;

        StartNewWindow();
    }
    ...

When a new Bufferer is created, the first window starts straight away.

This has an internal list which is added to whenever a new value is produced by the underlying source:

public void OnNext(TSource value)
{
    this.currentList.Add(value);
}

The StartNewWindow method looks like this:

private void StartNewWindow()
{
    var scheduler = Scheduler.Default;
    this.currentList = new List<TSource>();

    this.endOfWindowSchedulerWorkItem = scheduler.Schedule(this.timeSpan, OnWindowCompleteDueTime);
}

This creates a new list for the current window (removing the old values in the process), which any new items will be added to.

It also gets the default scheduler that the application is using, and schedules the OnWindowCompleteDueTime method to run, setting it to run after your specified time period.

This will mean that after an amount of time equal to the specified time span, the following method will be called:

private void OnWindowCompleteDueTime()
{
    this.observer.OnNext(currentList);

    this.StartNewWindow();
}

When a window finishes, the completed list for that window is passed to the listening observer, and a new window is started.

Notice that the endOfWindowSchedulerWorkItem is saved. This is so that the following clean up can be done when the underlying source is completed (or errored):

public void OnCompleted()
{
    CloseLastWindow();

    this.observer.OnCompleted();
}

public void OnError(Exception error)
{
    CloseLastWindow();

    this.observer.OnError(error);
}

private void CloseLastWindow()
{
    this.endOfWindowSchedulerWorkItem.Dispose();
    this.observer.OnNext(currentList);
}

This disposes the work item left in the schedulers queue, so that it does not attempt to do more work once the source has already finished, and then passes the list for the last window onto the observer, before notifying it that it has finished.

So, when we subscribe (using an anonymous observer):

DateTime start = DateTime.Now;
int bufferNumber = 1;

bufferedTimes.Subscribe(ticks =&gt;
{
    int thisBuffer = bufferNumber++;

    Console.WriteLine($"TIME PASSED: {(int)(DateTime.Now - start).TotalSeconds} seconds");
    Console.WriteLine($"New window: {thisBuffer}");

    foreach (var tick in ticks)
    { 
        Console.WriteLine($"tick {tick}");
    }
);

We will be passed a IList<long> (ticks) which is passed through out lambda, and each tick is written out to the console. The output will be as follows:

Output showing buffered values.

With each window appearing all at once, at the end of each 4 second time period.

OnNext(The Window operator)

The window operator is very similar, but instead of lists, we are working with IObservable<IObservable<TSource>>.

static IObservable<IObservable<TSource>> Window(this IObservable<TSource> source, TimeSpan timeSpan)
{
    return new WindowOperator(source, timeSpan);
}

Then, in the same way to Buffer, we have:

internal class WindowOperator<TSource> : IObservable<IObservable<TSource>>
{
    private IObservable<TSource> source;
    private TimeSpan timeSpan;

    public WindowOperator(IObservable<TSource> source, TimeSpan timeSpan)
    {
        this.source = source;
        this.timeSpan = timeSpan;
    }

    public IDisposable Subscribe(IObserver<IObservable<TSource>> observer)
    {
        var bufferer = new Windower(observer, this.timeSpan);
        return this.source.Subscribe(bufferer);
    }

    private class Windower : IObserver<TSource>
    {
        private readonly IObserver<IObservable<TSource>> observer;
        private TimeSpan timeSpan;
        private Subject<TSource> currentWindow;
        private IDisposable endOfWindowSchedulerWorkItem;

        public Windower(IObserver<IObservable<TSource>> observer, TimeSpan timeSpan)
        {
            this.observer = observer;
            this.timeSpan = timeSpan;

            StartNewWindow();
        }

        ...

But the StartNewWindow method is as follows:

private void StartNewWindow()
{
    var scheduler = Scheduler.Default;
    this.currentWindow = new Subject<TSource>();
    this.observer.OnNext(currentWindow);
    this.endOfWindowSchedulerWorkItem = scheduler.Schedule(this.timeSpan, OnWindowCompleteDueTime);
}

Instead of a new list, this starts a new Subject<TSource>. This is a new observable. This could be replaced by your own implementation of IObservable<TSource>, but here we've just used the predefined Subject class, defined as part of the System.Reactive NuGet package. This also has the advantage that it will manage any observers subscribed to each window for us (see my blog on the basics of Rx for details).

This is then passed straight the the OnNext of the listening observer. This is the main difference between buffer and window. With buffer, you are passed fully complete lists at the end of each time period, which you can then just read the values from. With window you are passed a new observable at the start of each window, which you then subscribe to in order to be passed the values.

Finally, a work item is scheduled to close the window after the given time period. The OnWindowCompletedDueTime method then looks like this:

private void OnWindowCompleteDueTime()
{
    this.currentWindow.OnCompleted();
    this.StartNewWindow();
}

This calls OnCompleted on the subject, which will notify any observers subscribed to the window that the window has completed. It then starts a new window.

Finally, we do our clean up:


public void OnCompleted()
{
    CloseLastWindow();

    this.observer.OnCompleted();
}

public void OnError(Exception error)
{
    CloseLastWindow();

    this.observer.OnError(error);
}

private void CloseLastWindow()
{
    this.currentWindow.OnCompleted();
    this.endOfWindowSchedulerWorkItem.Dispose();
}

When the underlying source completes, or errors, it will complete the current window (here the last window is completed not errored because, as discussed last time, we only want the error to appear once). The observer subscribed to the overall operator is then notified that the source has finished.

Then, If we do the same as before:

IObservable<IObservable<long>> windowedTimes = timer.Window(TimeSpan.FromSeconds(4));

DateTime start = DateTime.Now;
int windowNumber = 1;

windowedTimes.Subscribe(w =>
{
    int thisWindow = windowNumber++;
    Console.WriteLine($"TIME PASSED: {(int)(DateTime.Now - start).TotalSeconds} seconds");
    Console.WriteLine($"New window: {thisWindow}");
    w.Subscribe(tick =>
    {
        Console.WriteLine($"{(int)(DateTime.Now - start).TotalSeconds} seconds: tick {tick}");
    }
    );
});

Here we are subscribing a first observer to the IObservable<IObservable<TSource>> that is returned by the window operator. This observer then subscribes an inner observer to each individual window in order to read the values out of each window.

Then the output will be:

Output showing windows with time passed.

With the first window being passed to our overall window observer straight way, and each value being passed into our inner observer as they are raised by the underlying timer source. This can be useful if you have large windows and would like to get started on processing as soon as the events are available.

However, if you need to window to complete before analysis (e.g. if you want to count the number of events) there is no advantage of using window over buffer, and it adds unnecessary complexity.

And there we have it, the timed versions of the Window and Buffer operators!

Here's me, not entirely prepared for the windows flying in my general direction... It wouldn't take a very small TimeSpan to defeat my catching skills... Especially if half my brain is focused on avoiding the unfortunate typos that were almost inevitable in this post!

Doodle of author catching windows.

Now, I shouldn't really tell you this, as the source of these blog posts I really have no responsibility to give you any more information... But I think we may be swiftly approaching the end of this deep dive!

But for now, until the next OnNext call!

Carmel Eve

Software Engineer I

Carmel Eve

Carmel is a software engineer, LinkedIn Learning instructor and STEM ambassador.

Over the past four years she has been focused on delivering cloud-first solutions to a variety of problems. These have ranged from highly-performant serverless architectures, to web applications, to reporting and insight pipelines and data analytics engines.

In her time at endjin, she has written many blog posts covering a huge range of topics, including deconstructing Rx operators and mental well-being and managing remote working.

Carmel's first LinkedIn Learning course on how to prepare for the Az-204 exam - developing solutions for Microsoft Azure - was released in April 2021. Over the last couple of years she has also spoken at NDC, APISpecs and SQLBits. These talks covered a range of topics, from reactive big-data processing to secure Azure architectures.

She is also passionate about diversity and inclusivity in tech. She is a STEM ambassador in her local community and is taking part in a local mentorship scheme. Through this work she hopes to be a part of positive change in the industry.

Carmel won "Apprentice Engineer of the Year" at the Computing Rising Star Awards 2019.

Carmel worked at endjin from 2016 to 2021.