Emitting values from an IObservable at random intervals

Published on Monday, July 28, 2014

Photo by Jan Loyde Cabrera on Unsplash

This one is mostly for my own reference; I've had to re-learn how to do this twice now, and I don't want to have to figure it out a third time.

Once in a while I need an Observable sequence to emit its values at random intervals. The most recent example is a simulation app which copies files into a directory at random intervals between a minimum and maximum number of seconds. Because it doesn't come up often, I always end up screwing around with Interval() and Throttle() and all the other time-based extensions before I finally remember that Observable.Generate() has an overload which handles this requirement: it takes a time selector function that decides how long to wait before each value is emitted.

As with everything else in software, there's more than one way to do it. One option is to generate a sequence of values (doesn't matter what they are) emitted at random intervals and then use the Zip() extension with your actual sequence:

var files = Directory.GetFiles(@"C:\Users\hartez\Pictures", "*.jpg").ToList();

var zeroes = Observable.Generate(0, i => true, i => 0, i => 0,
    i => TimeSpan.FromSeconds(new Random().Next(1, 5)));

zeroes.Zip(files, (i, filename) => filename).Subscribe(Console.WriteLine);

This gets a list of .jpg files from a folder. It then creates a sequence called zeroes, which will continue forever because the condition i => true never becomes false. The last argument to Generate() is the time selector; it tells the sequence to wait a random whole number of seconds before emitting each zero. Note that the upper bound of Random.Next() is exclusive, so Next(1, 5) gives delays of 1 to 4 seconds.
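To make the five arguments of that Generate() call easier to keep straight, here is a minimal sketch with each one labelled. TimedDemo and Build are hypothetical names, not part of the original code, and the sketch assumes the System.Reactive NuGet package is referenced. Unlike zeroes, this sequence counts upward and stops after count values.

```csharp
using System;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

public static class TimedDemo
{
    // Hypothetical helper: emits 0, 1, ..., count - 1, each after a random delay.
    public static IObservable<int> Build(Random rng, int count)
    {
        return Observable.Generate(
            0,               // initial state
            i => i < count,  // condition: keep generating while this is true
            i => i + 1,      // iterate: produce the next state
            i => i,          // result selector: the value emitted for this state
            i => TimeSpan.FromSeconds(rng.Next(1, 5))); // time selector: delay before emitting
    }
}
```

Calling TimedDemo.Build(new Random(), 3).Subscribe(Console.WriteLine) should print 0, 1 and 2, each after its own random delay.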

We then take zeroes and use Zip() to combine it with the list of files. Zip() combines them 1-to-1, so a result is only emitted when there's a value from each sequence. The first value will be emitted as soon as zeroes emits its first 0 (somewhere from 1 to 4 seconds after Subscribe() is called). The second value won't be emitted until the second 0 is emitted by zeroes, and so on. Once all of the values from files have been consumed, no more values will be emitted.

Zip() pairs each 0 with a filename; the second parameter of Zip() is the function that turns each pair into a result, and here it just returns the filename.

(By the way, this is just example code. In a real application, I would suggest creating an instance of Random to re-use instead of creating a new one in each iteration of your Observable.)
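Here is one way that suggestion might look. RandomDelay is a hypothetical helper, not something from the original code. The lock is an assumption on my part: Random isn't thread-safe, and timer callbacks from several subscriptions could overlap.

```csharp
using System;

public static class RandomDelay
{
    // One shared generator, created once and re-used for every delay.
    private static readonly Random Rng = new Random();
    private static readonly object Gate = new object();

    // Returns a whole number of seconds in [minSeconds, maxSeconds).
    // Random.Next(min, max) excludes the upper bound.
    public static TimeSpan Next(int minSeconds, int maxSeconds)
    {
        lock (Gate) // guard the shared Random against concurrent callers
        {
            return TimeSpan.FromSeconds(Rng.Next(minSeconds, maxSeconds));
        }
    }
}
```

With this in place, the last argument to Generate() becomes i => RandomDelay.Next(1, 5).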

Using Zip() for this is a little awkward; we can actually handle this more elegantly by doing the iteration over the list directly in the generated observable:

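var files = Directory.GetFiles(@"C:\Users\hartez\Pictures", "*.jpg").ToList().GetEnumerator();

// Move to the first file; bail out if the list is empty.
if (!files.MoveNext())
{
    Console.WriteLine("No matching files.");
    return;
}

var seq = Observable.Generate(files,
    enumerator => enumerator.Current != null,   // Current is null once we run past the end
    enumerator =>
    {
        enumerator.MoveNext();                  // advance to the next file
        return enumerator;
    },
    enumerator => enumerator.Current,           // emit the current filename
    interval => TimeSpan.FromSeconds(new Random().Next(1, 5)));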


In this version we're generating the randomly-emitting observable directly from the original file list. It's kind of nice in that we don't need the second sequence; the enumerator semantics make it a little weird, but they do allow us to avoid having any sort of index variable. So that's an option.

However, the Zip() version gives us one other cool option, which is appealing if you have to use this pattern often: we can create an extension method that takes any IEnumerable<T> and emits its values at random intervals:

public static class RandomSequenceExtensions
{
    public static IObservable<T> RandomInterval<T>(this IEnumerable<T> source, int minSeconds,
        int maxSeconds)
    {
        var ints = Observable.Generate(0, i => true, i => 0, i => 0,
            i => TimeSpan.FromSeconds(new Random().Next(minSeconds, maxSeconds)));

        return ints.Zip(source, (i, value) => value);
    }
}

Which we can use like this:

var files = Directory.GetFiles(@"C:\Users\hartez\Pictures", "*.jpg").ToList();
files.RandomInterval(1, 5).Subscribe(Console.WriteLine);

I hope this helps someone out (or at least helps me out next time I forget how to do this).