Disposing sequence of resources with Reactive Extensions


Recall my previous post, Disposing sequence of resources, where we imperatively solved the following problems:

  • Create a single disposable representation for a sequence of disposable resources
  • Defer resource allocation so that no exception propagates before cleanup can be guaranteed, and so that resources that are never used are never allocated
  • Dispose only consumed resources and preserve the semantics of nested try{}finally{} blocks (resources allocated first in sequence order are disposed last; an exception thrown from a finally block hides the exception already being propagated)
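
To make the last point concrete, here is a minimal sketch of the nested try{}finally{} semantics for just two resources. The Noisy type and the NestedFinallyDemo class are hypothetical names introduced only for this illustration; they are not part of the previous post or of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical resource: records its disposal in a log and optionally throws.
sealed class Noisy : IDisposable
{
    private readonly string _name;
    private readonly bool _throwOnDispose;
    private readonly List<string> _log;

    public Noisy(string name, bool throwOnDispose, List<string> log)
    {
        _name = name;
        _throwOnDispose = throwOnDispose;
        _log = log;
    }

    public void Dispose()
    {
        _log.Add("Disposed " + _name);
        if (_throwOnDispose) throw new InvalidOperationException(_name);
    }
}

static class NestedFinallyDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        try
        {
            var first = new Noisy("first", true, log);
            try
            {
                var second = new Noisy("second", true, log);
                try { log.Add("Using both"); }
                // Allocated last, disposed first
                finally { second.Dispose(); }
            }
            // Still runs, and its exception hides the one thrown by "second"
            finally { first.Dispose(); }
        }
        catch (InvalidOperationException ex)
        {
            log.Add("Caught " + ex.Message);
        }
        return log;
    }
}
```

NestedFinallyDemo.Run() returns "Using both", "Disposed second", "Disposed first", "Caught first": both resources get a chance to be disposed, in reverse allocation order, and only the later exception survives.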

Now that Reactive Extensions for .NET (Rx) is out, we will do it in a more LINQ-ish manner with the help of the interactive features of Reactive Extensions:

  • EnumerableEx.Publish – publishes the values of source to each use of the bound parameter.
  • EnumerableEx.Share – shares a single cursor among all enumerators of the sequence.
  • EnumerableEx.Finally – invokes finallyAction after source sequence terminates normally or by an exception.

A great explanation of how EnumerableEx.Publish works (disregard the naming difference) is given in Taming Your Sequence’s Side-Effects Through IEnumerable.Let. The following example illustrates the point.

static Random m_seeder = new Random();

static IEnumerable<int> GenerateRandomSequence(int count)
{
  var rnd = new Random(m_seeder.Next());
  for(int i = 0; i < count; i++)
  {
    yield return rnd.Next(count);
  }
}

...

const int count = 5;
var xs = GenerateRandomSequence(count);
// Each time we iterate xs we may get a different sequence
var equals = xs.SequenceEqual(xs);
// However, it can be solved through memoization
xs.Publish(seq =>
             {
               // Every time we iterate seq we get the same 
               // sequence
               equals = seq.SequenceEqual(seq);
               return seq;
             })
  // Evaluate the result so that the function above is invoked
  .Run();
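
For intuition about the memoization involved, here is a minimal sketch. MemoizedSequence and MemoizationDemo are hypothetical names made up for this illustration, and this is not how Rx implements Publish; it only shows the idea of pulling each element from the source once and replaying it from a cache afterwards. The sketch is not thread-safe.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only; not the Rx implementation.
sealed class MemoizedSequence<T> : IEnumerable<T>
{
    private readonly IEnumerable<T> _source;
    private readonly List<T> _cache = new List<T>();
    private IEnumerator<T> _cursor;

    public MemoizedSequence(IEnumerable<T> source) { _source = source; }

    public IEnumerator<T> GetEnumerator()
    {
        for (int i = 0; ; i++)
        {
            // Pull from the source only when position i is not cached yet
            if (i == _cache.Count)
            {
                // The source is not touched until the first element is requested
                if (_cursor == null) _cursor = _source.GetEnumerator();
                if (!_cursor.MoveNext()) yield break;
                _cache.Add(_cursor.Current);
            }
            yield return _cache[i];
        }
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}

static class MemoizationDemo
{
    // Same shape as GenerateRandomSequence above
    static IEnumerable<int> RandomSequence(int count)
    {
        var rnd = new Random();
        for (int i = 0; i < count; i++) yield return rnd.Next(count);
    }

    public static bool Run()
    {
        var seq = new MemoizedSequence<int>(RandomSequence(5));
        // Both enumerations observe the same memoized elements
        return seq.SequenceEqual(seq);
    }
}
```

MemoizationDemo.Run() returns true no matter which random values are generated, because the second enumerator reads what the first one has already cached.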

EnumerableEx.Share makes sure that any iteration is made with respect to the same cursor.

var xs = Enumerable.Range(0, count);
// Prints every element of the sequence to the console.
// Without sharing, each of the count iterations would start 
// a new enumeration and print the first element of a 
// potentially different sequence (recall the random 
// sequence example)
var shared = xs.Share(); 
for(int i = 0; i < count; i++)
{
  shared.Take(1).Run(Console.WriteLine);
}
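
The shared cursor idea can be sketched in a few lines. SharedSequence and SharingDemo are hypothetical names for this illustration and do not come from Rx; the sketch assumes single-threaded use and never disposes the underlying cursor.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only; not the Rx implementation.
sealed class SharedSequence<T> : IEnumerable<T>
{
    private readonly IEnumerator<T> _cursor;

    public SharedSequence(IEnumerable<T> source)
    {
        _cursor = source.GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        // Every enumerator advances the same underlying cursor
        while (_cursor.MoveNext()) yield return _cursor.Current;
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}

static class SharingDemo
{
    public static List<int> Run(int count)
    {
        var shared = new SharedSequence<int>(Enumerable.Range(0, count));
        var seen = new List<int>();
        for (int i = 0; i < count; i++)
        {
            // Start a new enumeration each time and consume one element
            foreach (var x in shared)
            {
                seen.Add(x);
                break;
            }
        }
        return seen;
    }
}
```

SharingDemo.Run(5) returns 0, 1, 2, 3, 4: each new enumeration continues where the previous one stopped instead of starting over.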

EnumerableEx.Finally does exactly what its description says (see more details here).

static IEnumerable<int> GenerateThrowingSequence(int count)
{
  for(int i = 0; i < count; i++)
  {
    if (i > 0 && i % 3 == 0)
    {
      throw new Exception();
    }
    yield return i;
  }
}

...

// Prints 0, 1, 2, Finally, Caught
try
{
  GenerateThrowingSequence(count).Finally(() => Console.WriteLine("Finally"))
    .Run(Console.WriteLine);
}
catch (Exception)
{
  Console.WriteLine("Caught");
}

// Prints 0, 1, Finally
GenerateThrowingSequence(count).Finally(() => Console.WriteLine("Finally"))
  .Take(2).Run(Console.WriteLine);
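
The same behavior can be approximated with a plain iterator, which may help to see why the action runs in both cases. WithFinally and FinallyDemo are hypothetical names for this sketch; it is a stand-in for illustration, not the Rx implementation.

```csharp
using System;
using System.Collections.Generic;

static class FinallyDemo
{
    // Hypothetical stand-in for illustration only; not the Rx implementation.
    public static IEnumerable<T> WithFinally<T>(IEnumerable<T> source, Action finallyAction)
    {
        try
        {
            foreach (var item in source) yield return item;
        }
        finally
        {
            // Runs on normal completion, on an exception from source,
            // and when the consumer disposes the enumerator early
            finallyAction();
        }
    }

    // Same shape as GenerateThrowingSequence above
    static IEnumerable<int> Throwing(int count)
    {
        for (int i = 0; i < count; i++)
        {
            if (i > 0 && i % 3 == 0) throw new Exception();
            yield return i;
        }
    }

    // Consumes at most limit elements and records what happens
    public static List<string> Run(int limit)
    {
        var log = new List<string>();
        try
        {
            int taken = 0;
            foreach (var x in WithFinally(Throwing(5), () => log.Add("Finally")))
            {
                log.Add(x.ToString());
                if (++taken == limit) break;
            }
        }
        catch (Exception)
        {
            log.Add("Caught");
        }
        return log;
    }
}
```

FinallyDemo.Run(10) returns "0", "1", "2", "Finally", "Caught", and FinallyDemo.Run(2) returns "0", "1", "Finally", matching the two cases above.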

Now let's put everything together. Publish will help us defer resource allocation and avoid unnecessary allocations. Share and Finally will take care of disposal.

static class Disposables
{
  // Disposes projected resources once they are no longer needed
  public static void Using<TSource, TResource>(
    // Source sequence projected to disposable resources
    this IEnumerable<TSource> source,
    // Resource projection
    Func<TSource, TResource> resourceSelector,
    // Resources usage action
    Action<IEnumerable<TResource>> resourcesUsage)
      where TResource : IDisposable
  {
    var rcount = 0;
    source
      // At this point no resources are created yet; 
      // only the projection is set up
      .Select(
      s =>
        {
          // As we do not want to unnecessarily create 
          // and then immediately dispose potentially expensive
          // resources we will count created resources
          // and later dispose only used ones
          rcount++;
          return resourceSelector(s);
        })
      .Publish(
      rs =>
        {
          // During sequence iteration resources will be created
          // However not all resources may be iterated through or 
          // an exception may be thrown in the middle and thus 
          // not all resources may be created (therefore not 
          // disposed)
          try
          {
            // The supplied resources sequence can be iterated 
            // multiple times: each side effect occurs only 
            // once, and the elements are memoized and reused 
            // during each iteration
            resourcesUsage(rs);
            return Enumerable.Empty<TResource>();
          }
          finally
          {
            // We must dispose only those resources we used
            // (counted and memoized above during first 
            // iteration)
            rs = rs.Take(rcount)
              // Disposing resources must be done in the opposite 
              // order to preserve nested try{}finally{} blocks 
              // semantics
              .Reverse().Do(r =>
                              {
                                rcount--;
                                r.Dispose();
                              })
              // Once a resource is disposed it must not be 
              // iterated again, and this is what Share takes 
              // care of
              .Share();

            Action final = null;
            final = () =>
                      {
                        // Stop once every resource was given 
                        // a chance to dispose as Finally is 
                        // called even on empty sequences and 
                        // otherwise it leads to stack overflow
                        if (rcount > 0)
                        {
                          // Dispose only used resources and 
                          // leave untouched the rest
                          rs.Finally(final).Run();
                        }
                      };
            final();
          }
        })
      // Evaluate the sequence (triggers resources usage)
      .Run();
  }
}

The usage example below illustrates a situation where an exception is thrown during resource disposal. In this case we must still give the preceding resources (in resource sequence order) a chance to be disposed. However, if an exception is thrown while disposing a preceding resource, that exception hides the earlier one.

// Fake resource full of side effects =)
class Resource : IDisposable
{
  private readonly int m_i;

  public Resource(int i)
  {
    m_i = i;
    Console.WriteLine("Created {0}", m_i);
  }

  public void Use()
  {
    Console.WriteLine("Using {0}", m_i);
  }

  public void Dispose()
  {
    Console.WriteLine("Disposed {0}", m_i);
    // Simulate resource disposal that results in exception
    if (m_i % 2 == 1)
    {
      throw new Exception(m_i.ToString());
    }
  }
}

...

try
{
  Enumerable.Range(0, 5)
    .Using(i => new Resource(i),
           rs =>
             {
               // First, resources 0, 1 and 2 are created 
               // and used
               rs.Take(3).Run(r => r.Use());
               // then the already created resources 1 and 2 
               // are reused, and resource 3 is created and used
               rs.Skip(1).Take(3).Run(r => r.Use());
             });
}
catch (Exception ex)
{
  // As resources are disposed in the opposite order
  // the latest exception is propagated
  Console.WriteLine("Exception {0}", ex.Message);
}

This produces the following output:

Created 0 // iterating, if not iterated previously resource is created
Using 0
Created 1
Using 1
Created 2
Using 2
Using 1   // otherwise reused
Using 2   // reused again
Created 3 // wasn’t iterated previously, created
Using 3
Disposed 3 // disposing in the opposite order, throws exception
Disposed 2 // still disposing continues
Disposed 1 // throws exception that hides exception thrown earlier
Disposed 0 // disposing continues
Exception 1 // exception is caught

That’s it! I hope you enjoyed it.

I hope we’ll meet next year. Happy New Year!
