Windows Workflow 4.0 – CopyDirectory (Part 3 of 3)

So far we have written ParallelItemScheduler<T>, a generic activity that goes dormant and schedules actions based on input via bookmark resumption. We then used this activity to design ProducerConsumer<T>, which takes the idea a little further and provides a generic mechanism for running a producer thread that can pump data into a consumer thread. In this last part of the series we finally have all the required building blocks to design our highly efficient and parallel copy directory activity!

As with any activity, the first thing we need to determine is what the inputs and outputs should be. Since this is simply a copy operation, there will be no outputs. The inputs I have chosen are as follows:

  • InArgument<String> Source
    • The source directory for the copy operation.
  • InArgument<String> Target
    • The target directory for the copy operation.
  • InArgument<Int32> FileCopyBufferSize
    • The buffer size which should be used for individual copy operations.
  • InArgument<Int32> FileCopyThreadCount
    • The number of concurrent copies that should be allowed.

Now that we have our inputs, we can move on to the producer and the consumer for our directory copy activity.

Implementing CopyDirectory

The producer of our copy directory activity is what actually walks the source hierarchy and accumulates files for consumption. I will not be posting the code in-line here, but I will describe the implementation so the rest of this makes sense without having to download an attachment. Basically the producer maintains a queue or stack (depending on whether the copy operation should work breadth-first or depth-first, respectively) and begins by pushing the root directory onto the list. It then begins a loop that retrieves the next directory from the list, finds the files and directories that are its immediate children, pushes the directories onto the aforementioned list, and then queues the files for consumption by our consumer (which will be discussed next). To help illustrate this concept, I have rewritten the loop in outline form.

  • Get the next directory
  • Calculate the target directory by removing Source and replacing it with Target
  • Create the target directory
  • Asynchronously walk the current directory to find all immediate child objects
  • For-each directory
    • Push directory into our list
  • For-each file
    • Queue a Tuple<String, String> with Item1 set to the source file and Item2 set to the target file
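
The outline above can also be read as ordinary C#. The following is only a sketch of the same loop outside of workflow: the names DirectoryWalkSketch, MapToTarget and EnumerateCopies are made up for illustration, the walk is synchronous rather than asynchronous, and it assumes every path handed to MapToTarget starts with the source root.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of the attached project.
public static class DirectoryWalkSketch
{
    // Maps a path under sourceRoot to the same relative location under targetRoot.
    // Assumes path begins with sourceRoot.
    public static string MapToTarget(string sourceRoot, string targetRoot, string path)
    {
        string relative = path.Remove(0, sourceRoot.Length).TrimStart('/', '\\');
        return Path.Combine(targetRoot, relative);
    }

    // Walks sourceRoot breadth-first, creating target directories as it goes,
    // and yields one (source file, target file) pair per file found.
    public static IEnumerable<Tuple<string, string>> EnumerateCopies(string sourceRoot, string targetRoot)
    {
        // A Queue gives breadth-first order; a Stack here would give depth-first.
        var pending = new Queue<string>();
        pending.Enqueue(sourceRoot);

        while (pending.Count > 0)
        {
            // Get the next directory and calculate its target.
            string sourceDirectory = pending.Dequeue();
            string targetDirectory = MapToTarget(sourceRoot, targetRoot, sourceDirectory);

            // Does nothing if the directory already exists.
            Directory.CreateDirectory(targetDirectory);

            // Remember child directories for a later iteration.
            foreach (string childDirectory in Directory.GetDirectories(sourceDirectory))
            {
                pending.Enqueue(childDirectory);
            }

            // Hand each file to the caller as a (source, target) pair.
            foreach (string childFile in Directory.GetFiles(sourceDirectory))
            {
                string targetFile = Path.Combine(targetDirectory, Path.GetFileName(childFile));
                yield return Tuple.Create(childFile, targetFile);
            }
        }
    }
}
```

In the workflow version, each pass through the while loop corresponds to one invocation of the producer, and the yielded pairs correspond to the items queued for the consumer.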

Although it’s not strictly necessary, the directory/file discovery activity is asynchronous to allow for maximum parallelism between the producer and the consumer. Just as our activity needs a source and a target for the operation, so does our consumer, which will be the CopyFile activity. For this reason each queued item is a Tuple<String, String>, and we will use ProducerConsumer<Tuple<String, String>> in our implementation below.
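
For readers who think more readily in threads than in activities, here is a rough plain-.NET analogy of the shape ProducerConsumer<T> gives us. It is not how the activity from Part 2 is implemented (that one schedules work through the workflow runtime); the class name ProducerConsumerSketch and its parameter names are invented here to mirror the ContinueProducing, Producer, Consumer and ConsumerCount properties used below.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical analogy, not part of the attached project.
public static class ProducerConsumerSketch
{
    // Runs one producer loop on the calling thread and consumerCount
    // consumer tasks that drain a shared queue.
    public static void Run<T>(
        Func<bool> continueProducing,
        Func<ICollection<T>> produce,
        Action<T> consume,
        int consumerCount)
    {
        var items = new BlockingCollection<T>();

        // Start the consumers; each blocks until an item is available.
        var consumers = new Task[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            consumers[i] = Task.Factory.StartNew(() =>
            {
                foreach (T item in items.GetConsumingEnumerable())
                {
                    consume(item);
                }
            }, TaskCreationOptions.LongRunning);
        }

        try
        {
            // Each call to produce() returns one batch of work items.
            while (continueProducing())
            {
                foreach (T item in produce())
                {
                    items.Add(item);
                }
            }
        }
        finally
        {
            // Lets the consumers finish once the queue is empty.
            items.CompleteAdding();
        }

        Task.WaitAll(consumers);

        // Dispose only after every consumer has drained the queue.
        items.Dispose();
    }
}
```

With T set to Tuple<String, String>, produce would return the files of one directory and consume would copy a single file.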

Note: The activities Enqueue, Dequeue, CreateDirectory and GetFilesAndDirectories have been omitted for length. I have attached a full project that includes all code and supporting classes that you may download.
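
To give a feel for how small these supporting activities can be, here is one plausible shape for Enqueue<T> and Dequeue<T>. This is a guess based only on the property names used in the listing below (Queue, Item and Result); the versions in the attached project may well differ.

```csharp
using System;
using System.Activities;
using System.Collections.Generic;

// Assumed shape: adds Item to the end of Queue.
public sealed class Enqueue<T> : CodeActivity
{
    [RequiredArgument]
    public InArgument<Queue<T>> Queue { get; set; }

    [RequiredArgument]
    public InArgument<T> Item { get; set; }

    protected override void Execute(CodeActivityContext context)
    {
        Queue.Get(context).Enqueue(Item.Get(context));
    }
}

// Assumed shape: removes the head of Queue and returns it through the
// Result argument inherited from CodeActivity<T>.
public sealed class Dequeue<T> : CodeActivity<T>
{
    [RequiredArgument]
    public InArgument<Queue<T>> Queue { get; set; }

    protected override T Execute(CodeActivityContext context)
    {
        return Queue.Get(context).Dequeue();
    }
}
```

Both run synchronously and finish almost immediately, which is why they fit comfortably as CodeActivity implementations.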

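using System;
using System.Activities;
using System.Activities.Expressions;
using System.Activities.Statements;
using System.Collections.Generic;
using System.ComponentModel;
using Microsoft.VisualBasic.Activities;

public sealed class CopyDirectory : Activity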
{
    public CopyDirectory()
    {
        base.Implementation = () => CreateBody();

        // Defaults: an 8 * 1024 * 1024 copy buffer and two concurrent copies per processor.
        m_fileCopyBufferSize = new InArgument<Int32>(new VisualBasicValue<Int32>("1024 * 1024 * 8"));
        m_fileCopyThreadCount = new InArgument<Int32>(new VisualBasicValue<Int32>("System.Environment.ProcessorCount * 2"));
    }

    [RequiredArgument]
    [DefaultValue(null)]
    public InArgument<String> Source
    {
        get;
        set;
    }

    [RequiredArgument]
    [DefaultValue(null)]
    public InArgument<String> Target
    {
        get;
        set;
    }


    public InArgument<Int32> FileCopyBufferSize
    {
        get
        {
            return m_fileCopyBufferSize;
        }
        set
        {
            m_fileCopyBufferSize = value;
            m_fileCopyBufferSizeSet = true;
        }
    }

    public InArgument<Int32> FileCopyThreadCount
    {
        get
        {
            return m_fileCopyThreadCount;
        }
        set
        {
            m_fileCopyThreadCount = value;
            m_fileCopyThreadCountSet = true;
        }
    }

    [EditorBrowsable(EditorBrowsableState.Never)]
    public Boolean ShouldSerializeFileCopyBufferSize()
    {
        return m_fileCopyBufferSizeSet;
    }

    [EditorBrowsable(EditorBrowsableState.Never)]
    public Boolean ShouldSerializeFileCopyThreadCount()
    {
        return m_fileCopyThreadCountSet;
    }

    Activity CreateBody()
    {
        Variable<String> sourceDirectory = new Variable<String>("sourceDirectory");
        Variable<String> targetDirectory = new Variable<String>("targetDirectory");
        Variable<Queue<String>> pendingDirectories = new Variable<Queue<String>>("pendingDirectories");
        Variable<IEnumerable<String>> childDirs = new Variable<IEnumerable<String>>("childDirs");
        Variable<IEnumerable<String>> childFiles = new Variable<IEnumerable<String>>("childFiles");

        DelegateInArgument<String> childDirArgument = new DelegateInArgument<String>("childDirArgument");
        DelegateInArgument<String> childFileArgument = new DelegateInArgument<String>("childFileArgument");
        DelegateInArgument<Tuple<String, String>> fileCopyArgument = new DelegateInArgument<Tuple<String, String>>("fileCopyArgument");
        DelegateOutArgument<ICollection<Tuple<String, String>>> childFileCollection = 
            new DelegateOutArgument<ICollection<Tuple<String, String>>>("childFileCollection");

        return new Sequence
        {
            Variables = 
            {
                pendingDirectories,
            },
            Activities =
            {
                new Assign<Queue<String>>
                {
                    Value = new InArgument<Queue<String>>(env => new Queue<String>(new String[] { Source.Get(env) })),
                    To = new OutArgument<Queue<String>>(pendingDirectories),
                },
                new ProducerConsumer<Tuple<String, String>>
                {
                    ConsumerCount = new InArgument<Int32>(env => FileCopyThreadCount.Get(env)),
                    ContinueProducing = ExpressionServices.Convert<Boolean>(env => pendingDirectories.Get(env).Count > 0),
                    Consumer = new ActivityAction<Tuple<String, String>>
                    {
                        Argument = fileCopyArgument,
                        Handler = new CopyFile
                        {
                            Source = new InArgument<String>(env => fileCopyArgument.Get(env).Item1),
                            Target = new InArgument<String>(env => fileCopyArgument.Get(env).Item2),
                            BufferSize = new InArgument<Int32>(env => FileCopyBufferSize.Get(env)),
                        },
                    },
                    Producer = new ActivityFunc<ICollection<Tuple<String, String>>>
                    {
                        Result = childFileCollection,
                        Handler = new Sequence
                        {
                            Variables = 
                            {
                                childDirs,
                                childFiles,
                                sourceDirectory,
                                targetDirectory,
                            },
                            Activities = 
                            {
                                new Dequeue<String>
                                {
                                    Queue = new InArgument<Queue<String>>(pendingDirectories),
                                    Result = new OutArgument<String>(sourceDirectory),
                                },
                                new Assign<String>
                                {
                                    Value = new InArgument<String>(env => System.IO.Path.Combine(Target.Get(env),
                                                                 sourceDirectory.Get(env).Remove(0, Source.Get(env).Length).TrimStart('/', '\\'))),
                                    To = new OutArgument<String>(targetDirectory),
                                },
                                new If
                                {
                                    Condition = new InArgument<Boolean>(env => !System.IO.Directory.Exists(targetDirectory.Get(env))),
                                    Then = new CreateDirectory
                                    {
                                        Directory = new InArgument<String>(targetDirectory),
                                    },
                                },
                                new GetFilesAndDirectories
                                {
                                    Directory = new InArgument<String>(sourceDirectory),
                                    Files = new OutArgument<IEnumerable<String>>(childFiles),
                                    Directories = new OutArgument<IEnumerable<String>>(childDirs),
                                },
                                new ForEach<String>
                                {
                                    Values = new InArgument<IEnumerable<String>>(childDirs),
                                    Body = new ActivityAction<String>
                                    {
                                        Argument = childDirArgument,
                                        Handler = new Enqueue<String>
                                        {
                                            Queue = new InArgument<Queue<String>>(pendingDirectories),
                                            Item = new InArgument<String>(childDirArgument),
                                        },
                                    },
                                },
                                new Assign<ICollection<Tuple<String, String>>>
                                {
                                    To = new OutArgument<ICollection<Tuple<String, String>>>(childFileCollection),
                                    Value = new InArgument<ICollection<Tuple<String, String>>>(env => new List<Tuple<String, String>>()),
                                },
                                new ForEach<String>
                                {
                                    Values = new InArgument<IEnumerable<String>>(childFiles),
                                    Body = new ActivityAction<String>
                                    {
                                        Argument = childFileArgument,
                                        Handler = new AddToCollection<Tuple<String, String>>
                                        {
                                            Collection = new InArgument<ICollection<Tuple<String, String>>>(childFileCollection),
                                            Item = new InArgument<Tuple<String, String>>(env => 
                                                new Tuple<String, String>(
                                                    childFileArgument.Get(env), 
                                                    System.IO.Path.Combine(targetDirectory.Get(env), System.IO.Path.GetFileName(childFileArgument.Get(env))))),
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        };
    }

    private Boolean m_fileCopyBufferSizeSet;
    private Boolean m_fileCopyThreadCountSet;
    private InArgument<Int32> m_fileCopyBufferSize;
    private InArgument<Int32> m_fileCopyThreadCount;
}

As you can see, the implementation follows the bulleted outline almost exactly. When this activity executes, it walks the source directory hierarchy to discover files and directories. While it’s performing this discovery, up to FileCopyThreadCount files can be copying at any given time. Since this implementation uses a Queue<String> for the pending directories, the files will be copied in breadth-first fashion.

Another thing I would like to point out about this activity is its inherent ability to be canceled. Since everything is modeled in workflow, the framework will handle cancelation of our composite activity for us in between any of the workflow “statements” above. We also modeled our CopyFile activity to support cancelation between reads from and writes to the file (in blocks of size FileCopyBufferSize), so if the activity is canceled we can respond to the request quickly and gracefully.
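
The block-by-block cancelation idea is easy to picture in plain .NET. The sketch below is not the CopyFile activity from the earlier post; it is a hypothetical stand-alone helper (ChunkedCopySketch) in which a CancellationToken stands in for the workflow runtime's cancel request.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical helper, not part of the attached project.
public static class ChunkedCopySketch
{
    // Copies source to target in blocks of bufferSize bytes, checking for
    // cancelation before each block. Returns true if the whole file was
    // copied and false if the copy stopped early because of cancelation.
    public static bool Copy(string source, string target, int bufferSize, CancellationToken cancellation)
    {
        byte[] buffer = new byte[bufferSize];

        using (var input = new FileStream(source, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
        using (var output = new FileStream(target, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize))
        {
            while (true)
            {
                // The only point at which a cancel request is observed.
                if (cancellation.IsCancellationRequested)
                {
                    return false;
                }

                int read = input.Read(buffer, 0, buffer.Length);
                if (read == 0)
                {
                    return true; // end of the source file
                }

                output.Write(buffer, 0, read);
            }
        }
    }
}
```

The buffer size sets the trade-off: larger blocks mean fewer reads and writes, but a cancel request can wait for up to one full block before it is noticed. Note that this sketch leaves a partially written target file behind when canceled.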

There are quite a few more improvements we could make to this activity to make it more robust, such as the ability to specify the behavior when the target file exists, or the ability to retry a failed file copy operation up to some configurable number of times. I will leave these as an exercise for the reader, as they do little to further your knowledge of workflow.
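
As a starting point for the retry idea, here is a minimal sketch in plain C#. RetrySketch is a made-up helper, and it assumes that only IOException is worth retrying; in a real activity library you would more likely compose the same logic from workflow activities such as TryCatch and a loop.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of the attached project.
public static class RetrySketch
{
    // Runs action, retrying on IOException until it succeeds or maxAttempts
    // attempts have been made. Returns the number of attempts used.
    public static int Run(Action action, int maxAttempts)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException("maxAttempts");
        }

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return attempt;
            }
            catch (IOException)
            {
                // Out of attempts: let the last failure propagate.
                if (attempt >= maxAttempts)
                {
                    throw;
                }
            }
        }
    }
}
```

For example, RetrySketch.Run(() => File.Copy(source, target, true), 3) would try the copy up to three times; the third argument to File.Copy chooses to overwrite an existing target, which is one possible answer to the "target file exists" question.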

Conclusion

Over the past three posts we have developed a much more efficient directory copy activity that may be reused in other workflows. Along the way we also developed CopyFile, ParallelItemScheduler<T>, ProducerConsumer<T>, and various other supporting activities. In the beginning we started out very low level, as we required scheduling functionality that was not built into the framework. After that initial hump, however, you should notice that we very rarely dropped back to code and typically composed all of our logic using standard workflow activities. There are numerous benefits to building activities this way, some of which I have tried to emphasize throughout:

  1. Automatic support for cancelation by the framework (if a cancel request comes in during a short-running code activity, the parent will automatically clean up once that activity finishes)
  2. We verified our lower level activities by reusing them in our own activity library
  3. We have small, modular pieces of workflow logic that are now reusable in other, similar activities (this will most likely not be the only producer-consumer model you will need to implement)

Hopefully, as you read through this three-part series, you got a better feel for how to design activities as well as a taste of the more advanced behaviors you can model with Windows Workflow. I have attached a zip file that includes this activity library along with a sample executable that links to and uses the CopyDirectory activity we just developed. Enjoy!

BuildExtensions.zip