通过libuv使用可重入函数


[原文发表地址]通过libuv使用可重入函数

[原文发表时间] 2017/2/7 9:20AM

在这篇文章之前我们曾经就已经讨论过可重入函数了,甚至最近关于Visual Studio 2017 版本在我们的实现中已经就“yield”关键字在VS2017中被“co_yield”替换进行了更深入的讨论。这种有意义的C++ 标准模块让我非常兴奋, 因此在这篇博客中我很高兴和您分享一些在具体实践中是如何通过libuv 库来使用它。你可以用微软编译器或者其他支持可重入函数的编译器来使用这些代码,在进入这些代码之前,让我们首先认识下问题空间以及为什么你应该考虑它。

问题空间

等待磁盘的响应或者从网络上下载数据这本身就比较漫长,而且我们通常被告知写入块崩溃了, 对吗? 对于客户端编程而言,进行I/O 操作或者阻塞UI线程都将会引起诸如像app瘫痪或者挂起的这样糟糕的用户体验。对于服务器端编程而言,新的请求在其他线程都被阻塞的情况下,通常仅仅只需要创建一个新的线程, 然而这样会引起一个线程的资源利用率较低的问题,这些资源却往往并不便宜。

 

然而, 写一个高效的真正意义上的异步代码的确仍然存在一个普遍的困难。不同的平台提供了不同的机制以及实现异步的API。 许多的API也将不再支持异步的功能。通常,解决方案是通知辅助线程,这将会调用阻塞API, 并且返回结果到主线程。 这同样也是有一定难度的而且需要使用异步机制去避免并发问题。有一些库提供了基于这些不同机制的抽象, 然而,许多例子都包含了Boost ASIO,以及C++ Rest SDK 以及libuv Boost ASIO Rest SDK C++ 库, 而libuv 是一个C 库。 在他们之间有一些重叠的地方, 但是他们都有他们各自的优势。

 

Libuv 是一个C库,它在Node.js 中提供了支持异步的I/O, 尽管Node.js 显式地为用户设计的这一功能, 但是它也可以独立使用并且提供了一个公共的跨平台API, 不需要顾及到多种特定平台异步的API。 另外,API 即使在Windows 上仍然是UTF8-only 格式, 这也是非常方便的。每一个会阻塞的API 可以接受一个指向回调函数的指针,当所有的请求操作完成后,这些回调函数将被调用。 一个事件循环启动等待多种请求完成以及调用特定的回调函数。对于我而言, 写libuv 代码是简单直观的,但是遵循程序的逻辑却并不是那么容易的。 使用C++ lambdas表达式来调用回调函数或多或少可以有点帮助, 但是在随着一系列回调请求, 传输数据需要许多的样板代码, 关于libuv 你可以在他的官方网站上获取更多信息。

 

最近对协同程序产生了浓厚兴趣,发现它已经开始支持多种语言了, 而且有一些协同建议已经被提交到了C++ 委员会。尽管目前还没有人来着手处理, 但是有一部分将在某种程度上被协同支持。关于C++ 标准化中就协同建议而言, 有一条就是可重入函数,尽管仍然存在一些新的改变,但是已经被写入了当前版本N4402 中。 他建议用一种新的语言句法来支持不涉及堆栈的协同, 不需要定义实际上的实现 ,但是可以替代一些关于如何绑定到库具体实现句法的详细说明。 这样可以增加灵活性并且可以支持不同的运行时机制。

 

Libuv 支持可重入函数

当我开始调查这个的之前从未使用过libuv, 因此一开始我仅仅写了一些简单的libuv 调用, 并且开始思考我希望如何去写这些代码。 随着可重入函数的引入, 你可以写一些看起来非常连续,却是异步执行的代码。 无论在什么时候你在一个可重入函数中看到co_await 这个关键字, 如果await 表达式的结果无效的, 这个函数将被“return”。

在创建这个库的时候我有以下几个目标。

  1. 非常好的性能
  2. 没有冗余的C++
  3. 被现有用户熟识的模块
  4. 支持简单libuv 和可重入函数的混合

我在这里展示的所有代码和实际库代码以及下面的例子都在github 是有效的, 可以使用Visual Studio 2015 来编译, 或者在Clang LLVM 分支上,这刚好实现了那些建议, 你也可以用CMake libuv 来替代。 我使用的版本是在基于Linux libuv 1.8 和基于Windows 1.10.1 版本。 如果你想使用Clang/LLVM , 按照这些标准指令来构建它。

 

我尝试几种不同的方法把libuv 绑定到可重入函数上,在我的库里我考虑到了这两个方面。 首先(例1)使用和std::promise std::future 类似的函数: awaituv::promise_t and awaituv::future_t, 这两个函数指向一个共享状态对象,这个共享状态对象携带从libuv 调用得到的返回值。 我将这个“ return value”放在引用中是因为通过在libuv 中的一次回调异步提供这个“return value. 这种机制要求一个堆分配来保存这个共享状态。 第二个机制是让开发者将这个共享状态放在调用函数的堆栈上, 这也将免去了一个独立的堆分配并且可以关联shared_ptr 机器上, 尽管它没有第一种机制那么明晰,但是在性能方面还是有意义的。

 

例如:

让我们看一下这个简单例子: 1000次异步输出“hello world”

 

future_t<void> start_hello_world()
{
  for (int i = 0; i < 1000; ++i)
  {
    string_buf_t buf("\nhello world\n");
    fs_t req;
    (void) co_await fs_write(uv_default_loop(), &req, 1 /*stdout*/, &buf, 1, -1);
  }
}

调用co_await的函数需要返回一个awaitable类型,因此这个返回了一个future_t<void>类型, 对一个可重入函数编译器生成代码而言,这种方法将是有效的。这个函数将循环执行1000次并且异步输出“hello world”。 在awaituv 命名空间的函数“fs_write”是libuv uv_fs_write 函数的一个封装,同样它也将返回类型是future_t<int>, 这也是一个awaitable类型。在这种情况下我忽略了实际值但是仍然等待它的完成。 如果await 表达式的结果没有立即生效,那么函数start_hello_world 返回并保存指向重入函数的指针, 也就是说写操作完成,函数被回调。string_buf_t uv_buf_t 类型的一个封装, 尽管uv_buf_t 类型也可以被直接使用。 同样fs_t uv_fs_t类型的一个封装,并且fs_t类型还有一个允许调用uv_fs_cleanup的一个析构函数。虽然这个不是必须要的,但是却可以提升代码质量。

 

注意: 与std::future 不同,future_t 没有提供get 方法, 因为这种方法总是被阻塞。就libuv 而言, 除非事件循环启动,否则没有一个回调可以被正常执行,程序往往因此而挂起。关于这个的后续工作,敬请期待。

 

 

现在让我们看一个有点复杂的例子,读取一个文件并且转储到标准输出。

future_t<void> start_dump_file(const std::string& str)
{
  // We can use the same request object for all file operations as they don't overlap.
  static_buf_t<1024> buffer;

  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(), &openreq, str.c_str(), O_RDONLY, 0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(), &readreq, file, &buffer, 1, -1);
      if (result <= 0)
        break;
      buffer.len = result;
      fs_t req;
      (void) co_await fs_write(uv_default_loop(), &req, 1 /*stdout*/, &buffer, 1, -1);
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(), &closereq, file);
  }
}

这个函数浅显易懂,类似于异步写入。static_buf_t uv_buf_t类型的 另一种C++ 封装,并且提供了一个固定大小缓存区。 这个函数打开一个文件, 读取到缓存区, 写入到标准输出,反复执行直到数据读取完成, 最后关闭文件。 在这种情况下,当我们打开文件、读取数据的时候,你可以看到我们使用的await 表达式的结果。

接下来,让我们来看一个函数, 可以在自动定时改变标准输出中字体的颜色。

bool run_timer = true;
uv_timer_t color_timer;
future_t<void> start_color_changer()
{
  static string_buf_t normal = "\033[40;37m";
  static string_buf_t red = "\033[41;37m";

  uv_timer_init(uv_default_loop(), &color_timer);

  uv_write_t writereq;
  uv_tty_t tty;
  uv_tty_init(uv_default_loop(), &tty, 1, 0);
  uv_tty_set_mode(&tty, UV_TTY_MODE_NORMAL);

  int cnt = 0;
  unref(&color_timer);

  auto timer = timer_start(&color_timer, 1, 1);

  while (run_timer)
  {
    (void) co_await timer.next_future();

    if (++cnt % 2 == 0)
      (void) co_await write(&writereq, reinterpret_cast<uv_stream_t*>(&tty), &normal, 1);
    else
      (void) co_await write(&writereq, reinterpret_cast<uv_stream_t*>(&tty), &red, 1);
  }

  //reset back to normal
  (void) co_await write(&writereq, reinterpret_cast<uv_stream_t*>(&tty), &normal, 1);

  uv_tty_reset_mode();
  co_await close(&tty);
  co_await close(&color_timer); // close handle
}

这个函数的绝大部分都是简单明了的libuv 代码,它可以支持ANSI 转义序列去设置颜色。 在这个函数中体现出的一个新概念是一个定时器可以循环使用,而不是一次简单的执行。 timer_start 函数(包装于 uv_timer_start) 返回了一个promise_t 类型而不是future_t. 为了获取一个允许等待的对象,在需要定时器的地方调用next_future。 这可以重置内部状态,比如说它可以再等待一次。color_timer是全局变量,所以stop_color_changer 函数可以停止定时器。

 

最后,这里有一个函数,它可以打开一个socket,并且发送一个http 请求到google.com

future_t<void> start_http_google()
{
  uv_tcp_t socket;
  if (uv_tcp_init(uv_default_loop(), &socket) == 0)
  {
    // Use HTTP/1.0 rather than 1.1 so that socket is closed by server when done sending data.
    // Makes it easier than figuring it out on our end...
    const char* httpget =
      "GET / HTTP/1.0\r\n"
      "Host: www.google.com\r\n"
      "Cache-Control: max-age=0\r\n"
      "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n"
      "\r\n";
    const char* host = "www.google.com";

    uv_getaddrinfo_t req;
    addrinfo_state addrstate;
    if (co_await getaddrinfo(addrstate, uv_default_loop(), &req, host, "http", nullptr) == 0)
    {
      uv_connect_t connectreq;
      awaitable_state<int> connectstate;
      if (co_await tcp_connect(connectstate, &connectreq, &socket, addrstate._addrinfo->ai_addr) == 0)
      {
        string_buf_t buffer{ httpget };
        ::uv_write_t writereq;
        awaitable_state<int> writestate;
        if (co_await write(writestate, &writereq, connectreq.handle, &buffer, 1) == 0)
        {
          read_request_t reader;
          if (read_start(connectreq.handle, &reader) == 0)
          {
            while (1)
            {
              auto state = co_await reader.read_next();
              if (state->_nread <= 0)
                break;
              uv_buf_t buf = uv_buf_init(state->_buf.base, state->_nread);
              fs_t writereq;
              awaitable_state<int> writestate;
              (void) co_await fs_write(writestate, uv_default_loop(), &writereq, 1 /*stdout*/, &buf, 1, -1);
            }
          }
        }
      }
    }
    awaitable_state<void> closestate;
    co_await close(closestate, &socket);
  }
}

另外, 在这个例子中提出了两个新概念。第一,我们不需要完全等待它的携带内容。

Getaddrinfo 函数返回了一个future_t<addrinfo_state>它包含了两块信息。

future_t<addrinfo_state>结果包含了一个整型,用来表明结果是成功的还是失败。但是也有一个指向携带信息的指针,在tcp_connect 调用的时候会被使用。第二, 在socket 上读取数据将会在数据到达的时候,潜在地引发多次回调。 这就需要一个不同的机制而不仅仅是单纯地等待读取。 就这点而言, 有read_request_t 类型。在socket 上随着数据到达, 如果有一个非常明显的等待,它将会直接绕过数据。 否则,在下一次等待启动之前它将一直处于等待状态。

 

最后,让我们看一个结合使用这些函数的例子

int main(int argc, char* argv[])
{
  // Process command line
  if (argc == 1)
  {
    printf("testuv [--sequential] <file1> <file2> ...");
    return -1;
  }

  bool fRunSequentially = false;
  vector<string> files;
  for (int i = 1; i < argc; ++i)
  {
    string str = argv[i];
    if (str == "--sequential")
      fRunSequentially = true;
    else
      files.push_back(str);
  }

  // start async color changer
  start_color_changer();

  start_hello_world();
  if (fRunSequentially)
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

  for (auto& file : files)
  {
    start_dump_file(file.c_str());
    if (fRunSequentially)
      uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  }

  start_http_google();
  if (fRunSequentially)
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

  if (!fRunSequentially)
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

  // stop the color changer and let it get cleaned up
  stop_color_changer();
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);

  uv_loop_close(uv_default_loop());

  return 0;
}

这个函数包含两种模式: 并发模式和顺序模式。在顺序模式下,在一个任务启动之后,我们开始运行libuv 事件循环, 在下一个任务启动之前确保该事件执行完成。 在并发模式下,所有的任务(可重入函数)启动,然后等待所有重入函数执行完成。

实现:

这个库当下仅仅还只是头文件,让我们来看其中一个封装函数。

auto fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, int mode)
{
  promise_t<uv_file> awaitable;
  auto state = awaitable._state->lock();
  req->data = state;

  auto ret = uv_fs_open(loop, req, path, flags, mode,
    [](uv_fs_t* req) -> void
  {
    auto state = static_cast<promise_t<uv_file>::state_type*>(req->data);
    state->set_value(req->result);
    state->unlock();
  });

  if (ret != 0)
  {
    state->set_value(ret);
    state->unlock();
  }
  return awaitable.get_future();
}

这个函数封装了uv_fs_open 函数,并且基本可以被识别。 它没有启动回调,只是返回了future<int> 而不是int 。 就自身而言promise_t<int> 保存了一个用来统计状态对象的引用,而且它包含了一个int 还有其他的附加信息。Libuv提供了一个数据成员去保存这些特定的数据信息,这对我们而言是一个指向状态对象的一个裸指针。真正的转到uv_fs_open 函数的回调是一个lambda 表达式,它会强制转化数据为状态对象并且调用他的set_value 方法。 如果uv_fs_open 返回失败(这也将意味着不再引入回调),我们直接设置promise 的值, 最后。我们返回一个失败,当然这个失败也将有一个指向状态的引用计数指针。 返回失败将进一步说明对于co_await 而言需要有一些必要的方法和它一起工作。

 

当前我们已经封装了下面的libuv 函数:

  • uv_ref/uv_unref
  • uv_fs_open
  • uv_fs_close
  • uv_fs_read
  • uv_fs_write
  • uv_write
  • uv_close
  • uv_timer_start
  • uv_tcp_connect
  • uv_getaddrinfo
  • uv_read_start

当下的库还是远远不够的,封装其他的libuv 函数仍然需要进一步完成。 目前我还没有探索到撤销操作或者错误信息的传送。 我坚信一定有一个更好的方法来处理这种多次回调uv_read_start uv_timer_start 函数, 但是我还没有发现任何让我可以很愉快完成的东西。 或许就循环而言仍然需要基于回调来实现。

 

总结

就我而言, 协同程序提供了一个较为简单的方法来使用libuv 实现异步编程的模式,你可以下载库和在Github可重现的样例。 请让我了解到你对这种方法的想法以及它给你带来的方便。


Comments (0)

Skip to main content