Tasks and awaits and Rx! (And Drones!) Oh My!

A few people I work with are tinkering with an off-the-shelf drone in our spare time and so we are writing a C# library to control it.

The way it works is you send UDP commands to the drone and you receive a stream of status & navigation UDP packets from it. So everything is asynchronous by default. You don’t send a command and get back an “I got it!” response. You have to send a command and then monitor the status for a change reflecting your desired state,

For example, to start flying, you must repeatedly send the “take off” packet every few milliseconds until you see the “is flying” flag set in the status packets. Lets see what that would look like.

We want the SendCommand method to be asynchronous and totally decoupled from the UI. So the send process looks like this.

image

When we call SendCommandAsync() the command gets queued and the method returns immediately. Then there is a Task whose sole job is to take commands out of a queue and send them to the drone. This keeps them in order but prevents the caller from waiting behind anything that had previously been queued.

Now that SendCommandAsync() is async, the trick is “how do we know when the command has actually been sent?” Well we can use a TaskCompletionSource and queue it with the command. Then the worker Task sets the completion when it actually sends the command.

  1: public Task SendCommandAsync(DroneCommandBase command, CancellationToken token)
  2: {
  3:     TaskCompletionSource<object> tsc = new TaskCompletionSource<object>();
  4:     this._cmdQueue.Add(new CommandInfo()
  5:     {
  6:         TSC = tsc,
  7:         Cancel = token,
  8:         Command = command,
  9:     });
  10:     return tsc.Task;
  11: }
  2: if (success)
  3: {
  4:     // send the command data
  5:     this._cmdWriter.WriteString(commandInfo.Command.ToString());
  6:     await this._cmdWriter.StoreAsync();
  7:     // signal completion of the send Task
  8:     commandInfo.TSC.SetResult(new object());
  9: }

(As an aside, this is an excellent example of when Tasks are not Threads. We can dump 10,000 commands into the queue and get 10,000 Tasks but will never generate a new thread. The Task represents the completion of the operation “Send a command to the drone”.)

Now that we have an async send command we can start to build on top it in a very natural way. The Take Off procedure requires us to send the command multiple times until it takes off. So we can add a method that sends multiple commands until a cancellation is requested

  1: public async Task SendRepeatedCommandAsync(DroneCommandBase command, TimeSpan delay, CancellationToken token)
  2: {
  3:     while (!token.IsCancellationRequested)
  4:     {
  5:         await SendCommandAsync(command, token);
  6:         await Task.Delay(delay);
  7:     }
  8: }

then we can add a method to provide a condition upon which the send will stop repeating

  2: {
  3:     var localCTS = new CancellationTokenSource();
  4:  
  5:     var navDataStream = from data in NavigationDataStream
  6:                         where condition(data)
  7:                         select data;
  8:  
  9:     // start the repeated command sequence
  10:     SendRepeatedCommandAsync(command, delay, localCTS.Token);
  11:  
  12:     try
  13:     {
  14:         // Take(1) so that the sequence ends after the first item
  15:         await navDataStream.Take(1).ToTask(token);
  16:     }
  17:     catch (TaskCanceledException cancelEx)
  18:     { 
  19:         // the caller cancelled the operation
  20:     }
  21:     finally
  22:     {
  23:         // stop the commands from repeating further
  24:         localCTS.Cancel();
  25:     }
  26: }

Notice we are not ‘await’ing the SendRepeatedCommandAsync call. We just want to initiate that process and will cancel it when we are ready. Also in this case I’m using Reactive Extensions to model the navigation data stream. It is an asynchronous stream so its feels natural and provides a nice LINQ experience on top of it. NavigationDataStream is defined elsewhere as IObservable<NavData>.

Now from a higher level we can write this code to perform the take off.

  1: public async Task TakeOff()
  2: {
  3:     Command takeOffCommand = new Command(Argument.TakeOff);
  4:     await _connection.SendRepeatedCommandUntilAsync(
  5:         takeOffCommand,
  6:         TimeSpan.FromMilliseconds(20),
  7:         data => (data.drone_state & (uint)Constants.FLY_MASK) == (uint)Constants.FLY_MASK
  8:         );
  9: }

This describes exactly what I want, send the command every 20 milliseconds until the drone state shows that its flying. Following that pattern of thought, I can later start composing other items and scripting things

  1: public async Task SampleScript()
  2: {
  3:     await _droneController.TakeOff();
  4:     await _droneController.FlyForward();
  5:     await Task.Delay(5000);
  6:     await _droneController.FlyBack();
  7:     await Task.Delay(5000);
  8:     await _droneController.Land();
  9: }

Broken down, each function is simple enough to understand and debug. This simplicity only comes from the power of Rx, the TPL and the async/await functionality. Imagine what the code would look like before when all the timers and .NET events and state would have to be managed directly.

 

Small disclaimer: all this code was written *without* the drone to test it (Its in Seattle and I'm in Dallas). So while the logic is sound and it compiles I haven't been able to test it outside of a simple console app harness. Any glaring issues that arise due to eagle eyed commenters will be corrected. Even so, the larger point is that async/await is much more than just "do an async call here." The concept of creating your own Tasks to represent your own operations becomes a powerful mechanism that allows easier higher level composition logic.