Возвращаясь к стилю передачи продолжений. Часть 5: CPS и асинхронность

Сегодня повествование будет очень долгим и сложным. Но мы как-нибудь справимся.

Давайте рассмотрим следующую задачу: у нас есть список адресов URL и мы хотим получить документы, связанные с каждым адресом (давайте пока предположим, что эта операция всегда завершается успешно). Затем, мы хотим скопировать документ в сетевую папку на ленточный накопитель, поскольку мы являемся представителями старой школы. Все очень просто. Пара строк кода, даже метод создавать не нужно:

void ArchiveDocuments(List<Url> urls)
{
for(int i = 0; i < urls.Count; ++i)
Archive(Fetch(urls[i]));
}

Отлично. Однако проблема в том, что при большом списке адресов и медленном интернет-соединении поток управления будет следующим:

- Получить первый документ.

- Ждем, ждем, ждем, ждем, ждем…

- Ок. Получили первый документ. Архивируем на ленту.

- Ждем, ждем, ждем, ждем…

- Ок. Он сохранен. Получаем второй документ.

- Ждем, ждем, ждем, ждем…

- Ок. получили второй документ. Архивируем на ленту.

- Ждем, ждем, ждем, ждем…

Большую часть времени программа будет находиться в состоянии ожидания, в то время, как процессор не будет занят вовсе. Вы можете разумно предложить, что пока мы ожидаем сохранения первого документа на ленту, мы уже можем ожидать получение второго документа из сети. Но как, черт возьми, мы этого можем добиться?

Ну, мы знаем, что любая логика управления может быть построена с помощью продолжений. И мы знаем, что продолжение, по сути, является делегатом, который говорит нам «что будет дальше». Давайте предположим, что случилось чудо, и какой-то добрый человек предоставил нам метод FetchAsync, реализующий метод Fetch с помощью CPS.

void FetchAsync(Url url, Action<Document> continuation)

Метод FetchAsync каким-то образом, никто не знает каким именно, это его дело, выполняет свою задачу асинхронно. Возможно, он берет поток из пула потоков или использует поток асинхронного ввода-вывода, или использует магию операционной системы для получения сообщения о завершении операции через цикл сообщений. Я не знаю как, но на данный момент это и не важно. Все что мне нужно знать, что этот метод заключает следующий контракт: (1) он каким-то образом выполняет свою задачу асинхронно и (2) после завершения асинхронной задачи он вызывает указанное продолжение.

Обратите внимание, что продолжение, переданное методу получения документа, принимает документ в качестве параметра. Помните, что вызов продолжения логически эквивалентен возвращению значения при вызове подпрограммы, это именно то, что мы делаем при вызове синхронной версии метода.

Просто здорово. Давайте начнем переписывать этот код в CPS и посмотрим, что у нас выйдет.

И как только мы попытаемся переписать этот код в виде продолжений на языке C#, мы сразу же сталкиваемся с проблемой: как, черт возьми, мы собираемся продолжать выполнять итерацию цикла с корректным значением переменной “i”?

Нам явно нужно решить две проблемы:

- Получить значения всех локальных переменных на тот момент, когда мы выполняли этот фрагмент кода.

- Каким-то образом восстановить логику управления.

Первую проблему можно решить с помощью делегатов, захватывающих локальные переменные; мы уже знаем, что это преобразует локальные переменные в замыкания.

Со второй проблемой мы уже встречались в блоках итераторов (iterator block). В конце концов, блоки итераторов решают ту же проблему: восстанавливают управление после оператора yield return. В прошлом сообщении я уже приводил ссылку на статьи Реймонда о том, как компилятор с этим справляется: он генерирует конечный автомат с кучей “goto”, для передачи управления в нужные места. Очевидно, мы не хотим переписывать этот метод полностью в стиле передачи продолжений. Интеллектуальная сложность и потеря производительности слишком высоки. Мы, скорее, можем воспользоваться той же хитростью, что и в блоке итераторов.

Мы решим эту проблему, объединив два варианта решения: давайте создадим лямбда-выражение, в результате чего мы получим замыкание и создадим внутренний конечный автомат, реализующий продолжение, не переписывая все в CPS:

enum State { Start, AfterFetch }
void ArchiveDocuments(List<Url> urls)
{
State state = State.Start;
int i;
Document document;
Action archiveDocuments = () =>
{
switch(state)
{
case State.Start: goto Start;
case State.AfterFetch: goto AfterFetch;
}
Start: ;
for(i = 0; i < urls.Count; ++i)
{
state = State.AfterFetch;
FetchAsync(urls[i],
resultingDocument=>
{
document = resultingDocument;
archiveDocuments();
});
return;
AfterFetch: ;
Archive(document);
}
};
archiveDocuments();
}

Внимательный читатель заметит, что этот код содержит несколько проблем. Во-первых, это проблема определенного присваивания (definite assignment problem) из-за рекурсивного лямбда-выражения.

Давайте проанализируем этот код.

Кто-то вызывает метод ArchiveDocuments, которому передается список URL-адресов в качестве параметра. Этот метод создает действие, выполняющее всю работу, и вызывает его. «Настоящий» метод передает управление метке Start. Метод FetchAsync начинает получение первого документа асинхронно. Поскольку он выполняется асинхронно, то этот метод сразу же возвращает управление и нам больше ничего не нужно делать. Все, что нам нужно, так это дождаться получения документа. Метод возвращает управление и будет вызван уже тогда, когда документ будет получен.

После получения документа будет вызвано продолжение. Оно изменяет замыкание на последнее значение документа, и, поскольку мы уже изменили состояние, мы будем знать, откуда продолжить выполнение. Затем продолжение вызовет этот метод.

Метод переходит к метке AfterFetch. (Давайте опять забудем о том, что этот код некорректен). Мы переходим в середину метода и восстанавливаем предыдущее состояние. Затем мы синхронно копируем документ на ленту. После этого, мы переходим в начало цикла и начинаем асинхронное получение следующего документа и повторяем все заново.

Это явно шаг в правильном направлении, но мы явно еще не добились выполнения нашей цели. Ведь нам нужно сохранять документ на ленту тоже асинхронно.

А не можем ли мы поступить аналогичным образом? Т.е. выдумать такой метод:

void ArchiveAsync(Document document, Action continuation)

и использовать его в нашей программе? (Исходная версия метода Archive(document) возвращает void, поэтому продолжение асинхронной версии не принимает дополнительных аргументов.)

Давайте реализуем это и посмотрим, что у нас получится.

Action archiveDocuments = () =>
{
switch(state)
{
case State.Start: goto Start;
case State.AfterFetch: goto AfterFetch;
case State.AfterArchive: goto AfterArchive;
}
Start: ;
for(i = 0; i < urls.Count; ++i)
{
state = State.AfterFetch;
FetchAsync(urls[i], resultingDocument=>
{
document = resultingDocument;
archiveDocuments();
});
return;
AfterFetch: ;
state = State.AfterArchive;
ArchiveAsync(document, archiveDocuments);
return;
AfterArchive: ;
}
};

Хорошо, а что происходит теперь? Мы получаем документ асинхронно и немедленно возвращаем управление. Когда получение документа завершается, управление переходит к сохранению документа на ленту. Начинается процесс асинхронного сохранения документа, и управление сразу же возвращается. После завершения сохранения документа на ленту тело цикла завершается, и мы начинаем новую итерацию.

Вот, черт. Мы забыли реализовать необходимую возможность: время получения второго документа должно перекрывать время ожидания сохранения первого документа на ленту. Мы немедленно возвращаем управление после начала операции сохранения на ленту. Мы что-то упустили. Мы должны начать сохранение документа на ленту, начать загрузку следующего документа, дождаться окончания обоих операций перед началом сохранения второго документа на ленту.

Проблема здесь заключается в том, что мы все еще относимся к методу ArchiveAsync так, будто он выполняется синхронно. Весь этот код очень сильно усложняет реальную логику управления метода, не добавляя никаких новых логических потоков управления. На самом деле нам нужно начать асинхронную операцию записи на ленту, но не возвращать управление, а уже после этого продолжить выполнение других операций, таких как получение нового документа. Но если мы не должны возвращать управление сразу же, это означает, что метод, запускающий асинхронную операцию не должен принимать продолжение, поскольку «то, что должно произойти потом», должно произойти прямо в этом методе! Без возвращения значения.

Так куда мы должны передать продолжение?

Очевидно, что методов FetchAsync и ArchiveAsync недостаточно для реализации нашей задачи. Давайте предположим, что вместо возвращения void, метод FetchAsync будет возвращать, например, AsyncThingy<Documеnt> – тип, который волшебным образом может следующее: «Я отслеживаю состояние асинхронной операции и по ее завершении смогу вернуть документ». Именно эта штука принимает продолжение. Аналогично, метод ArchiveAsync может возвращать AsyncThingy, без параметра типа, поскольку этот метод возвращает void. Давайте перепишем наши методы с его помощью:

AsyncThingy<Document> fetchThingy = null;
AsyncThingy archiveThingy = null;

Action archiveDocuments = () =>
{
switch(state) { blah blah blah }
Start: ;
for(i = 0; i < urls.Count; ++i)
{
fetchThingy = FetchAsync(urls[i]);
state = State.AfterFetch;
fetchThingy.SetContinuation(archiveDocuments);
return;
AfterFetch: ;
document = fetchThingy.GetResult();
archiveThingy = ArchiveAsync(document);
state = State.AfterArchive;
archiveThingy.SetContinuation(archiveDocuments);
return;
AfterArchive: ;
}
};

Лучше, но это еще не все. Теперь мы получили практически такую же логику управления, что и раньше. Мы избавились от одного вложенного лямбда-выражения, поскольку теперь можем получить документ из этой штуки после завершения продолжения. Нам не нужен механизм изменения продолжением состояния, что очень хорошо.

Но как насчет нашей последней возможности: общего времени ожидания сохранения предыдущего документа на ленту и ожидания получения следующего документа? Все что нам нужно, так это не устанавливать продолжение сохранения документа на ленту сразу же:

Start: ;
for(i = 0; i < urls.Count; ++i)
{
fetchThingy = FetchAsync(urls[i]);
state = State.AfterFetch;
fetchThingy.SetContinuation(archiveDocuments);
return;
AfterFetch: ;
document = fetchThingy.GetResult();
if (archiveThingy != null)
{
state = State.AfterArchive;
archiveThingy.SetContinuation(archiveDocuments);
return;
AfterArchive: ;
}
archiveThingy = ArchiveAsync(document);
}

Это именно то, что мы хотели? Давайте разберем этот код. При первом вызове мы начнем асинхронное получение документа, после чего сразу же завершаем выполнение функции. После завершения получения документа, будет вызвано продолжение и управление перейдет к метке AfterFetch. В этот момент переменная archiveThingy все еще равна null, поэтому мы пропускаем сохранение документа на ленту и идем дальше. Затем мы получаем документ и начинаем его асинхронно сохранять на ленту, но пока не возвращаем управление. Вместо этого мы переходим к началу следующей итерации и начинаем асинхронную операцию получения следующего документа. Теперь мы ожидаем получение документа и завершения сохранения на ленту одновременно. Выполнение метода немедленно завершается после установки продолжения получения документа.

После завершения получения второго документа, будет опять вызвано продолжение. Управление перейдет метке AfterFetch, но в этот раз переменная archiveThingy не равна null, поэтому мы устанавливаем продолжение и завершаем выполнение. После завершения операции сохранения на ленту мы точно будем знать, что обе операции, и получения нового документа и сохранения на ленту предыдущего документа, завершены, поэтому мы можем перейти к сохранению на ленту текущего документа. Поэтому мы начинаем асинхронную операцию сохранения документа на ленту и переходим к следующей итерации цикла и начинаем получение следующего документа. И этот процесс продолжается столько, сколько нужно.

Ух, ты!

Мы можем здесь что-нибудь улучшить? Здесь кажется не все в порядке.

А что если сохранение первого документа завершится до получения второго документа? В этом случае нам не нужно проходить через все эти присваивания продолжений с немедленным завершением выполнения, ведь нам понадобится всего лишь вызвать продолжение сразу же.

Точно также, давайте предположим, что процесс получения документов FetchAsync содержит собственный локальный кэш на текущем компьютере и если другой процесс уже получал этот документ недавно, то процесс получения документа может завершиться немедленно, без всех этих асинхронных операций.

Для обработки этого варианта, давайте предположим, что метод SetContinuation возвращает bool, показывая тем самым завершилась уже операция или нет. Этот метод может возвращать true, если операция все еще выполняется асинхронно и false в противном случае. (Да, так и есть, будет возвращаться true, если позднее будет вызвано продолжение и false в противном случае). Давайте получим окончательный вариант:

void ArchiveDocuments(List<Url> urls)
{
State state = State.Start;
int i;
Document document;
AsyncThingy<Document> fetchThingy = null;
AsyncThingy archiveThing = null;
Action archiveDocuments = () =>
{
switch(state)
{
case State.Start: goto Start;
case State.AfterFetch: goto AfterFetch;
case State.AfterArchive: goto AfterArchive;
}
Start: ;
for(i = 0; i < urls.Count; ++i)
{
fetchThingy = FetchAsync(urls[i]);
state = State.AfterFetch;
if (fetchThingy.SetContinuation(archiveDocuments))
return;
AfterFetch: ;
document = fetchThingy.GetResult();
if (archiveThingy != null)
{
state = State.AfterArchive;
if (archiveThingy.SetContinuation(archiveDocuments))
return;
AfterArchive: ;
}
archiveThingy = ArchiveAsync(document);
}
};
archiveDocuments();
}

Вот и все. Посмотрите внимательно на код и сравните его с тем, с чего мы начали:

void ArchiveDocuments(List<Url> urls)
{
for(int i = 0; i < urls.Count; ++i)
Archive(Fetch(urls[i]));
}

Бог ты мой, что за мешанину мы здесь сотворили. Из двух простых строк кода мы получили больше двух десятков строк кода, и я уверен, что это худший спагетти-код, который вам приходилось видеть. И самое главное, этот код даже не компилируется, поскольку метки не находятся в нужной области видимости и из-за проблемы с определенным присваиванием (definite assignment error). И нам все еще нужно переписать этот код, для исправления этих проблем.

Помните, что я говорил о плюсах и минусах CPS?

Плюсы:

- Из простых частей можно создать логику управления любой сложности. – Есть

Минусы:

- Реификация потока управления с помощью продолжений, делает код сложным для чтения и понимания. – Есть.

- Код, обеспечивающий логику управления, полностью скрывает значение самого кода. – Есть.

- Компиляторы отлично справляются с задачей преобразования обычной логики управления в CPS, но больше это никому не под силу. – Есть.

Это не интеллектуальное упражнение. Обычные люди постоянно приходят к такому коду при работе с асинхронностью. И самое забавное, что в то время, как процессоры становятся все дешевле и быстрее, мы проводим все больше и больше времени ожидая завершения не процессорных операций. Во многих программах большую часть времени загрузка процессора равна нулю, а все время тратится на ожидание сетевых пакетов, путешествующих из Европы в Японию и назад, завершения дисковых операций или чего-то еще.

И я даже не говорю о том, что будет, когда мы начнем использовать большее количество асинхронных операций. Предположим, вы хотите, чтобы асинхронная операция ArchiveDocument принимала делегат. Тогда весь код, который использует эту функцию, придется переписать в CPS.

Правильная логика в асинхронных операциях очень важна, и в будущем это станет еще важнее. И инструменты, которые мы собираемся вам предоставить, и, как говорил профессор Дагган, заставят вас «стоять на голове и вывернуть себя наизнанку».

Да, стиль передачи продолжений является мощной техникой, но должен быть лучший путь ее использования, нежели приведенный ранее код.

В следующий раз: Невероятные приключения!

Оригинал статьи