Dataflow Programming with Maestro

Disclaimer: The Maestro project has been renamed. While referred to in this article as ‘Maestro’, moving forward the project is referred to with codename ‘Axum’.

The way you perform an operation in a typical imperative programming language goes like this: you call a method giving it the data it needs as arguments, wait for that method to finish, then pass the result to other methods, and so on. Sometimes a method you call relies not only on the parameters passed in, but also on some shared state, such as a global variable, so you must be careful not to call the method before the global variable is set. This is hard enough, but it gets hairier still when multiple threads get involved.

The model where you arrange the control flow in your program explicitly to take into account implicit dependencies between data is sometimes contrasted with the dataflow programming model, in which the dependencies between data are explicit, and the flow of operations on that data is determined by these dependencies. The graph of these dependencies is called the dataflow network.

The concept is familiar to anyone who has ever worked with spreadsheets, such as Microsoft Excel or Lotus 1-2-3. When a cell in a spreadsheet depends on another cell, new data arriving in that other cell drives re-calculation of the dependent cell. A group of cells that do not depend on each other can be re-calculated in parallel. This is the essence of the dataflow programming model.
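
To make the spreadsheet analogy concrete, here is a minimal sketch in Python (not Maestro). The Cell class and its method names are made up for illustration; the point is only that new data in one cell pushes re-calculation to the cells that depend on it, with no explicit control flow written by the caller.

```python
class Cell:
    """A spreadsheet-like cell: either a plain value or a formula over other cells."""

    def __init__(self, value=None, formula=None, inputs=()):
        self.value = value
        self.formula = formula
        self.inputs = list(inputs)
        self.dependents = []
        # Register this cell with every cell it depends on.
        for cell in self.inputs:
            cell.dependents.append(self)
        if formula is not None:
            self.recalculate()

    def set(self, value):
        """Store new data, then push the change to every dependent cell."""
        self.value = value
        for cell in self.dependents:
            cell.recalculate()

    def recalculate(self):
        """Re-evaluate the formula from the current values of the inputs."""
        self.set(self.formula(*(c.value for c in self.inputs)))


a = Cell(2)
b = Cell(3)
total = Cell(formula=lambda x, y: x + y, inputs=[a, b])
doubled = Cell(formula=lambda t: 2 * t, inputs=[total])

a.set(10)  # new data in `a` drives re-calculation of `total`, then `doubled`
print(total.value, doubled.value)  # 13 26
```

This sketch re-calculates dependents one after another; a real dataflow runtime is free to evaluate independent cells in parallel.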

Sources and Targets

In Maestro, the basic building blocks of the dataflow networks are called the source and the target. The source is where the data comes from, and the target is where the data goes to. In theory, there can be many ways to pass the data from a source to a target. In Maestro, this is done via message-passing.

This is how we can define source and target variables in Maestro:

ISource<int> source = ...;
ITarget<int> target = ...;

(Here I intentionally omitted the right-hand side of the initialization statements, because it would lead us into a discussion of ports and channels – all the fascinating things that I want to cover in future articles!)

Given these two variables, we have enough to build our first dataflow network. Here it is:

source ==> target;

The immediate effect of the above statement is to take all the messages from the source and forward them to the target. But there is more to it than that. The forwarding operator ==> creates a lasting link between the source and the target, so that whenever a new message comes out of the source it will be automatically propagated to the target.

You can get a hold of the network produced by the forwarding expression, and close it when the forwarding is no longer required:

var myNetwork = source ==> target;
// do something
myNetwork.Dispose();

Or, with a using statement (which in Maestro looks just like in C#):

using(var myNetwork = source ==> target)
{
    // do something
}

This works, but doesn’t do anything interesting. Bear with me; things are going to get more exciting in a minute!
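
For readers who want to experiment outside Maestro, the lasting link and its disposal can be approximated in Python. This is only a sketch under assumptions: Source, Link, forward_to, post and dispose are hypothetical names, targets are plain callables, and unlike the real forwarding operator the sketch does not deliver messages that were produced before the link existed.

```python
class Source:
    """Hypothetical stand-in for ISource<T>: remembers its linked targets."""

    def __init__(self):
        self._targets = []

    def forward_to(self, target):
        """Rough analogue of `source ==> target`; returns a disposable link."""
        self._targets.append(target)
        return Link(self, target)

    def post(self, message):
        """Propagate a new message to every currently linked target."""
        for target in list(self._targets):
            target(message)


class Link:
    """The 'network' produced by forwarding; closing it stops propagation."""

    def __init__(self, source, target):
        self._source, self._target = source, target

    def dispose(self):
        if self._target in self._source._targets:
            self._source._targets.remove(self._target)

    # Context-manager support plays the role of the `using` statement.
    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.dispose()


received = []
source = Source()
with source.forward_to(received.append):
    source.post(1)
    source.post(2)
source.post(3)  # the link is closed, so this message is not forwarded
print(received)  # [1, 2]
```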

Sinks and Transforms

To do anything useful with a dataflow network, we want to be able to perform an operation on the data as it comes out of the source. Here is what you do: you create a target from a method that takes the message payload as a parameter:

// set up dataflow network
source ==> ProcessNumber;
...
void ProcessNumber(int n)
{
    // print the number on the console
    Console.WriteLine("n={0}", n);
}

We can build something even more realistic than this. Imagine an event-driven GUI application that handles mouse clicks and key presses:

ISource<Point> mouseClick;
ISource<Key> keyPress;
...
mouseClick ==> OnMouseClick;
keyPress ==> OnKeyPress;
...

void OnMouseClick (Point coordinates)
{
    // handle mouse click here
}
void OnKeyPress (Key key)
{
    // handle key press here
}

You know where I’m going with this: handling the two events asynchronously, making your application faster and more responsive, and so on. This is indeed the point, but I’m getting a little ahead of myself. Hold on to this thought…

The OnMouseClick and OnKeyPress methods above take a single parameter and return nothing. Appropriately, they are called sinks – the data disappears in them, never to be seen again.

A method that takes a parameter and returns a value (not necessarily of the same type) is called a transform. Here is a transform that takes absolute screen coordinates and translates them to the current window coordinates:

Point ScreenToWindow(Point point)
{
    // subtract the window origin to get window-relative coordinates
    return new Point { X = point.X - currentWindow.Left,
                       Y = point.Y - currentWindow.Top };
}

A transform is a target, because it accepts data. It is also a source, because it produces a result.

Using a transform in a network expression goes like this:

mouseClick ==> ScreenToWindow ==> OnMouseClick;

The dataflow network that we’ve just created is called a pipeline. In such a network, data entering the first element makes its way through the intermediate transforms, finally reaching the last element, which is usually a sink.
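
The shape of such a pipeline can be sketched in Python by chaining plain functions. This is an illustration only: pipeline, WINDOW_LEFT and WINDOW_TOP are hypothetical names, points are simple tuples, and the stages run synchronously on the caller's thread, which is not how Maestro schedules them.

```python
from functools import reduce


def pipeline(*stages):
    """Chain callables left to right, roughly like `a ==> b ==> c`."""
    def run(message):
        # Feed the message through each stage in turn.
        return reduce(lambda value, stage: stage(value), stages, message)
    return run


WINDOW_LEFT, WINDOW_TOP = 100, 50  # hypothetical window origin on the screen


def screen_to_window(point):
    """Transform: takes a value and returns a new one."""
    x, y = point
    return (x - WINDOW_LEFT, y - WINDOW_TOP)


clicks = []


def on_mouse_click(point):
    """Sink: consumes the value and returns nothing."""
    clicks.append(point)


handle_click = pipeline(screen_to_window, on_mouse_click)
handle_click((130, 75))
print(clicks)  # [(30, 25)]
```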

Here is another realistic example. You want to send the pictures from your recent family trip to Maui to your mother-in-law. You need to resize and filter each picture, remove the red-eye effect, save it in a different format and so on. The pipeline will look like this:

imageFromFile ==> Resize ==> Filter ==> RemoveRedEye ==> Save;

Once the pipeline is set up, you would send images to the leftmost element of the pipeline thusly:

foreach(var file in files)
{
    send(imageFromFile, file);
}

The beauty of the pipeline is that we can not only process multiple images at a time; while doing so, we can also run multiple stages of the pipeline in parallel. This wouldn’t be the case if we were to do it in the following manner:

foreach(var file in files)
{
    Save(RemoveRedEye(Filter(Resize(file))));
}
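
To see why the pipeline form allows stages to overlap while the nested calls do not, here is a rough Python sketch that gives each stage its own thread and connects the stages with queues. The helper names (start_stage, run_pipeline) and the string-based stand-ins for the image operations are assumptions for illustration; this is not how the Maestro runtime is implemented.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def start_stage(func, inbox):
    """Run `func` on every item from `inbox` in its own thread; return the outbox."""
    outbox = queue.Queue()

    def worker():
        while True:
            item = inbox.get()
            if item is _DONE:
                outbox.put(_DONE)  # pass the end marker downstream
                return
            outbox.put(func(item))

    threading.Thread(target=worker, daemon=True).start()
    return outbox


def run_pipeline(items, *stages):
    """Send `items` into the first stage and collect what leaves the last one."""
    head = queue.Queue()
    tail = head
    for stage in stages:
        tail = start_stage(stage, tail)
    for item in items:
        head.put(item)
    head.put(_DONE)
    results = []
    while True:
        item = tail.get()
        if item is _DONE:
            return results
        results.append(item)


# Hypothetical stand-ins for Resize, Filter and RemoveRedEye.
def resize(name):
    return name + ":resized"


def smooth(name):
    return name + ":filtered"


def fix_red_eye(name):
    return name + ":no-red-eye"


print(run_pipeline(["a.jpg", "b.jpg"], resize, smooth, fix_red_eye))
```

While the second stage works on the first image, the first stage is already free to pick up the second one. Keep in mind that in CPython, threads only truly overlap when a stage waits on I/O or otherwise releases the interpreter lock, so the sketch shows the structure rather than a speedup.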

This gets us back to the question of the asynchronous invocation of the nodes of the dataflow network. It’s time we tackle this question head on.

Side Effects

Recall the GUI example with methods OnMouseClick and OnKeyPress. It’s tempting to let the two methods execute concurrently. However, if the methods access a shared object in an unsafe way, their concurrent execution would result in a race condition – a nasty, hard-to-debug defect.

To execute the two methods safely, one of the following conditions must hold:

  • The methods don’t read or write shared mutable state;
  • The methods can read a shared object, but no other thread in the system is allowed to modify the object at the same time.

The first would be best: a method that doesn’t rely on shared mutable state is safe to execute at any time, regardless of anything else that goes on in the system. (I used the word mutable here because immutable state doesn’t change, so access to it doesn’t require synchronization. Think of it as a constant in your code.) Repeat invocations of such methods – they are called referentially transparent – with the same arguments produce the same result, so the result can be cached. Referentially transparent methods are easy to compose. Programming models that allow only referentially transparent methods are very good for writing parallel programs. They are not very good for writing useful programs – not in the impure world we live in, anyways.
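
The caching claim is easy to check with a small Python sketch using the standard functools.lru_cache decorator. The function names and numbers below are made up for illustration. The first function depends only on its arguments, so a cached result is always valid; the second reads a mutable global, so its cached result silently goes stale.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def window_x(screen_x, window_left):
    """Referentially transparent: the result depends only on the arguments."""
    return screen_x - window_left


window_x(130, 100)
window_x(130, 100)  # second call is served from the cache
info = window_x.cache_info()
print(info.hits, info.misses)  # 1 1

current_left = 100  # shared mutable state


@lru_cache(maxsize=None)
def window_x_impure(screen_x):
    """Not referentially transparent: the result also depends on a global."""
    return screen_x - current_left


first = window_x_impure(130)  # 30
current_left = 120
stale = window_x_impure(130)  # still 30 from the cache; a fresh call would give 10
print(first, stale)  # 30 30
```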

Methods of the other kind, which read shared state but never modify it, are called side-effect free. To be able to reason about side-effects we need a well-defined concept of isolation, i.e. the boundary of the side-effect. The way I stated it – “shared object” – leaves some ambiguity as to what exactly is the object, and its granularity. If a thread modifies an object’s field, can another thread safely read other fields of the object? What about objects referenced by this object? Objects referring to the original object?

The units of isolation in Maestro are the agent and the domain. The data in the domain is shared among the agents in a “disciplined” way that avoids data races. Multiple domains do not share data with one another. (To read more about it, check out this blog post by Niklas Gustafsson, our architect.)

Since domains don’t share data (guaranteed by the language and enforced by the compiler), methods from different domains can execute in parallel with each other. So if OnMouseClick and OnKeyPress were defined in different domains, we would be done here.

Methods in the same domain can share domain data, and methods in the same agent can share both domain and agent data. If we want to execute such methods in parallel, we need to ensure that the two conditions above hold for them. Namely, either the methods don’t read any domain or agent state, or if they do, no other method can write to it while these methods are executing.

Modern compilers can do amazing things, and our friends at Microsoft Research have come up with a way to analyze a method for its read- and write-effects. You can find some of their results here. While this technology is promising, it’s not yet ready for prime time. Two things make such analysis complex. First, while it’s relatively easy to analyze the objects directly affected by the method, we also have to analyze all the methods it invokes, and that, transitively, means having to analyze the entire program. For the programmer, it means having to wait longer for the compiler to finish.

Second, restricting a method from modifying any object would be overly limiting. We still want to allow modification of objects created by the method itself, or by the methods it calls – provided that these objects don’t “leak” into different threads. Keeping track of the ownership of objects and graphs of objects is incredibly complex. Again, this complexity translates into your time spent staring at the screen waiting for the build to complete.

We also thought about using the Software Transactional Memory technology, developed by our peers in the Parallel Computing Team. Using STM, we would allow the method to modify any object, while keeping the log of the changes. If two methods make a conflicting change, we would undo the changes made by one of the methods, and then re-execute it. STM seems like a good fit for us, but there are some challenges. For a method to be transactable, we must be able to undo everything caused by its execution. This includes sending and receiving messages, as well as everything that happens in agents that receive those messages. Suddenly, you have a transaction that spans multiple agents and threads, and that complicates things tremendously. An obvious tradeoff would be to disallow message-passing and other “questionable” operations inside a transaction – but that would make dataflow networks in Maestro less useful. In other words, we still have some thinking to do here.

We’ll keep looking into these and other alternatives. In the meantime, the solution that we settled on is more pragmatic: it makes the compiler’s job easier, but it requires a little bit of hand-holding by the programmer.

Instead of relying on the compiler to infer the effects of the method, you declare the method to be a side-effect free function:

function Point ScreenToWindow(Point point)
{
    ...
}

A function cannot modify the state of the agent or the domain it is defined in. A function can only call other functions of the same agent – this is how we avoid having to perform full program analysis.

There is actually more to it, and I hope to cover it in detail in some other post, but you’ve already got the gist of the idea: the programmer explicitly describes the read- and write-effects of the method, the compiler makes sure that the code conforms to the declaration, and the runtime schedules the execution of the methods in a way that avoids data races but maximizes parallelism.

Scalars and Vectors

Passing messages between a single source and a single target lets us build forwarders and pipelines, but that’s about it. What if you want to take a message from a source and send a copy to several targets? You can do that, and the operator for that is called broadcast. It looks like this:

source -<< targets;

The right-hand side of the operator, the variable targets, can be defined as an array, list or IEnumerable of ITarget. We will use the umbrella term vector to describe them. In contrast, single sources and targets will be called scalars.

Because vectors are so common in Maestro network expressions, there is a convenient syntax for them, which is similar to inline array initializers in C#:

source -<< { target1, target2, target3 };

This statement sets up a network that propagates all messages coming out of source to target1, target2 and target3.
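
In Python terms, broadcast behaves roughly like the sketch below. The broadcast helper is a hypothetical name, targets are modeled as plain callables, and the same message object is handed to every target rather than a copy.

```python
def broadcast(targets):
    """Rough analogue of `source -<< targets`: every target gets every message."""
    targets = list(targets)

    def post(message):
        for target in targets:
            target(message)

    return post


log1, log2, log3 = [], [], []
post = broadcast([log1.append, log2.append, log3.append])
for n in (1, 2):
    post(n)
print(log1, log2, log3)  # [1, 2] [1, 2] [1, 2]
```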

Another operator that operates on a scalar source and a vector target is called alternate. The alternate, written as “-<:”, is useful in scenarios where we want to load-balance work. As the data comes out of the source, alternate “round-robins” it between the targets. For example, you might want to use alternate to implement a pool of “workers” to handle incoming data. Here is how you do it:

ITarget<int>[] workers;
...
source -<: workers;
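
The round-robin behavior can be sketched in a few lines of Python. As before, alternate is a hypothetical helper and the workers are plain lists standing in for real targets.

```python
from itertools import cycle


def alternate(targets):
    """Rough analogue of `source -<: targets`: each message goes to the next target in turn."""
    next_target = cycle(list(targets))

    def post(message):
        next(next_target)(message)

    return post


workers = [[], [], []]
post = alternate([w.append for w in workers])
for n in range(7):
    post(n)
print(workers)  # [[0, 3, 6], [1, 4], [2, 5]]
```

Strict rotation is the simplest policy; it balances the load well only when the messages take roughly the same time to process.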

Finally, Maestro has two operators for propagating data from a vector source to a scalar target. One of them is the multiplex, written as “>>-”. The multiplex forwards data from each of the sources into the target, as soon as the data is available. Unlike the multiplex, the combine operator “&>-” waits until all sources have produced data, then joins it into an array and propagates it to the target. Example:

{ num1, num2, num3 } &>- PrintManyNumbers;
...
void PrintManyNumbers(int[] nums)
{
    foreach(var i in nums)
        Console.WriteLine(i);
}
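
The difference between the two operators shows up clearly in a Python sketch. With multiplex, every source simply posts straight to the target, so there is nothing to write. Combine has to hold messages back until every source has contributed one. The Combine class below is hypothetical, and the per-source queue (so that a fast source can run ahead of a slow one) is my assumption about reasonable behavior, not a statement about the Maestro runtime.

```python
from collections import deque


class Combine:
    """Rough analogue of `{ s1, s2, ... } &>- target`."""

    def __init__(self, count, target):
        self._pending = [deque() for _ in range(count)]  # one queue per source
        self._target = target

    def input(self, index):
        """Return the callable that source number `index` should post to."""
        def post(message):
            self._pending[index].append(message)
            # Fire only once every source has at least one message waiting.
            if all(self._pending):
                self._target([q.popleft() for q in self._pending])
        return post


combined = []
join = Combine(3, combined.append)
num1, num2, num3 = (join.input(i) for i in range(3))

num1(1)
num1(10)
num2(2)   # nothing is delivered yet: the third source is still silent
num3(3)   # delivers [1, 2, 3]
num2(20)
num3(30)  # delivers [10, 20, 30]
print(combined)  # [[1, 2, 3], [10, 20, 30]]
```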

The beauty of the network expressions in Maestro lies in their composability. The operators make it easy to put several expressions together; you only need to make sure that the types of the operands “line up”.

To illustrate the point, consider a network that calculates the sum of two numbers. After everything I’ve told you about the dataflow networks, you probably expect something more sophisticated than a simple expression a+b. You won’t be disappointed – I’m going to go all out and put as many network operators as possible in a single statement. Enjoy:

ISourceAndTarget<int[]> join;
ISource<int> a;
ISource<int> b;
ITarget<int> sum;
...

{ a ==> TraceNumber, b ==> TraceNumber }
&>- join -<: { GetSum, GetSum } >>- sum;

...
private int TraceNumber(int n)
{
    Console.WriteLine("Got number {0}", n);
    return n;
}

private function int GetSum(int[] nums)
{
    int result=0;
    foreach(var num in nums) result += num;
    return result;
}

Here the data coming out of sources a and b is first forwarded to the TraceNumber method. This is a transform that prints the number on the console and then forwards it on. Next, the numbers are combined into an array in join. From there, the data alternates between the two GetSum transforms. The alternation becomes useful when you send a stream of data through the network, allowing multiple GetSum functions to execute in parallel. Finally, the results are multiplexed into the target sum.

This example is obviously far-fetched. In a real program we need to do something more substantial than adding a few numbers to amortize the overhead of setting up the network and spawning off the transforms. You also probably won’t have to construct such contrived networks when programming in Maestro, but it sure is nice to know what’s possible!

I hope you found this useful. Thanks, and as always, we’re looking for your comments.

Artur Laksberg,
Parallel Computing Team