异步代理: 使用异步代理库进行基于角色的编程


 随着多核处理器在市场上的日益普及,它已广泛用于服务器、台式机以及便携式计算机,代码并行化的重要性也前所未有地凸显出来。 为了满足这一关键需求,Visual Studio 2010 引入了若干新的方法,帮助 C++ 开发人员利用新的并行运行时和新的并行编程模型带来的这些功能。 然而,开发人员面临的一个主要障碍是确定哪种编程模型适合于他们的应用程序。 正确的模型可以充分利用底层并行性,不过也需要重新考虑程序结构和实际的执行方式。

  目前,最常见的并行编程模型涉及到通用的并发感知容器以及并行循环迭代等算法。 虽然这些传统技术功能强大,可扩展应用程序来配合多核计算机使用,但它们并未解决影响并行性能的其他主要因素之一,那就是不断加深的延迟影响。 由于并行技术加快计算速度并将计算分布在多个内核之间,因此,Amdahl 定律 (wikipedia.org/wiki/Amdahl's_law) 告诉我们性能改进受到执行速度最慢的那一部分制约。 在许多情况下,等待来自 I/O(例如磁盘或网络)的数据所花的时间比例越来越大。

  基于角色的编程模型能够很好地处理延迟等问题,这些模型最初是在二十世纪七十年代初引入的,目的是利用具有成百上千个独立处理器的高度并行计算机资源。 角色模型背后的基本概念是将应用程序的各个组件视为单独的角色,这些角色可以通过发送、接收和处理消息与外界交互。

  最近,随着大量多核处理器的运用,角色模型已作为一种减少延迟、实现高效并行执行的有效方法重新露面。 Visual Studio 2010 引入了异步代理库 (AAL),这是一个令人激动的基于角色的新模型,它具有消息传递接口,在该模型中代理就是角色。 AAL 使开发人员可以通过更加以数据流为中心的方式设计自己的应用程序。 这样的设计通常有利于在等待数据时有效使用延迟。

  在本文中,我们将概述 AAL 并介绍如何在应用程序中使用它。
并发运行时

  Visual Studio 2010 和 AAL 中并发支持的基础是新的并发运行时,该运行时作为 Visual Studio 2010 中 C 运行时 (CRT) 的一部分提供。 并发运行时提供协调任务计划程序和资源管理器,后者对计算机的底层资源有深入了解。 这就允许运行时以负载平衡的方式在整个多核计算机中执行任务。

  图 1 简要地展示了 Visual Studio 2010 中对本机代码并发的支持。 计划程序是确定何时何地执行任务的主要组件。 它借助资源管理器收集的信息来充分地利用执行资源。 尽管应用程序和库也可以直接与运行时交互,但它们本身主要还是通过两个位于计划程序之上的编程模型(即 AAL 和并行模式库 (PPL))与并发运行时交互。

  图 1 并发运行时

  PPL 提供更为传统的并行技术(例如 parallel_for 和 parallel_for_each constructs)、可识别运行时的锁和并发数据结构(例如队列和向量)。 虽然 PPL 不是本文介绍的重点,但它也是一种功能强大的工具,开发人员可以将其与 AAL 中引入的所有新方法配合使用。 有关 PPL 的详细信息,请参阅 2009 年 2 月刊载的《使用 C++ 的 Windows》专栏 (msdn.microsoft.com/magazine/dd434652)。

  相比之下,AAL 能够在更高级别以不同于传统技术的角度来并行化应用程序。 开发人员需要从待处理数据的角度思考应用程序,并思考如何将数据处理分隔到可并行执行的组件或阶段中。

  AAL 提供两个主要组件:消息传递框架和异步代理。

  消息传递框架包括一组消息块,用于接收、处理和传播消息。 通过将消息块串连起来,可创建能够同时执行的工作管道。

  异步代理是通过接收消息、在自己维护的状态下执行本地工作和发送消息,以此与外界交互的角色。

  这两个组件结合在一起,使开发人员能够在数据流而不是控制流方面利用并行性,并通过更高效地使用并行资源来改善对延迟的容忍度。
消息传递框架

  AAL 的第一个重要组件是消息传递框架,该框架是协助开发数据流网络以便将工作管道化的一组构造。 将工作管道化是数据流模型的基本部分,因为它允许将工作分解为多个独立的阶段,只要数据就绪便可对流数据进行并行处理。 当一个阶段的数据处理结束时,该阶段可将数据传递到下一阶段,同时第一个阶段寻找要处理的新数据。

  我们以设置传出消息格式并审查消息中是否存在不当内容的电子邮件应用程序为例。 这种类型操作的代码显示如下:

          std::foreach(reader.begin(); reader.end();
  [](const string& word) {
    auto w1 = censor(word);
    auto w2 = format(w1);
    writer.write_word(w2);
  });
       

  对于电子邮件中的每个词,该应用程序都需要检查它是否存在于审查词的字典中,如果存在则予以替换。 然后,代码根据一组指导原则设置每个词的格式。

  这种方案中存在大量固有的并行性。 但是,传统并行技术还不能满足要求。 例如,一种简单的方法是对文本中的字符串使用 parallel_for_each 算法,审查这些字符串并设置格式。

  这种解决方案的第一个主要阻碍是必须读取整个文件,以便迭代器能够正确地划分工作。
强制读取整个文件会导致进程受到 I/O 的限制,并且会降低并行效率。 当然,您可以使用智能迭代器将词的处理与读取输入的操作重叠进行。

  传统并行方法的第二个主要问题是排序。 显然,对于电子邮件来说,对文本的并行处理必须保持文本顺序,否则会完全无法理解邮件的含义。 为了保持文本顺序,parallel_for_each 技术会产生同步和缓冲方面的大量开销,而这一过程可由 AAL 自动处理。

  通过采用管道技术处理邮件,您可以避免上述两个问题,同时还能利用并行能力。 请看一下图 2,其中创建了一个简单管道。 在此示例中,应用程序的主要任务(审查和设置格式)被分为两个阶段。 第一个阶段接收字符串并在审查词的字典中查找该字符串。 如果找到匹配项,审查块会使用字典中的另一个词替换该字符串。 否则,它会输出已输入的同一封邮件。 同样,在第二个阶段中,格式设置块接收每个词并将其恰当地设置为特定样式。

  图 2 电子邮件处理管道

  此示例可在以下几个方面从数据流方法获益。 首先,由于它不需要在处理前读取整封邮件,邮件中的字符串可以通过审查和设置格式阶段立即开始流处理。 其次,管道处理允许一个字符串由设置格式块进行处理,同时下一个字符串由审查块进行处理。 最后,由于字符串的处理顺序是它们在原文中出现的顺序,因此不需要执行额外的同步。
消息块

  消息块接收、处理、存储和传播消息。 消息块有三种形式:源、目标和传播器。 源只能传播消息,而目标能够接收、存储和处理消息。 大多数块都是传播器,既是源又是目标。 换句话说,它们能够接收、存储和处理消息,也可以转而将这些消息发送出去。

  AAL 包含一组消息块基元,能够满足开发人员的大部分使用需求。 图 3 简要概述了 AAL 中包括的所有消息块。 不过该模型仍然是开放式的,因此,如果您的应用程序需要具有特定行为的消息块,可以自己编写可与所有预定义块交互的自定义块。 每个块都有各自处理、存储和传播消息的独有特征。

  图 3 AAL 消息块

消息块 用途
unbounded_buffer<Type> 存储不限数量的消息并将其传播到目标。
overwrite_buffer<Type> 存储一条消息,每次有新消息传播进来时都会覆盖该消息,然后将其广播到目标。
single_assignment<Type> 存储一条一次写入的消息,然后将其广播到目标。
transformer<Input,Output> 接收一条类型为 Input 的消息,然后运行用户提供的函数将其转换为类型为 Output 的消息。

将这条转换后的消息传播到目标。

call<Type> 接收一条消息,然后使用该消息的负载作为参数来运行用户提供的函数。

这种块是纯粹的消息目标。

timer<Type> 在用户定义的时间量之后将消息传播到目标。

可以是重复或非重复的块。

这种块是纯粹的消息源。

choice<Type1,Type2,...> 接收来自多种类型的多个源的消息,但只接受来自传播到所选类型的第一个块的消息。
join<Type> 接收来自多个源的消息,将它们组合起来输出单条消息。

异步等待从各个源输入的消息准备就绪。

multitype_join<Type1,Type2,...> 接收来自多种类型的多个源的消息,将它们组合起来。

异步等待从各个源输入的消息准备就绪。

  AAL 提供的消息块基元的一个主要优势是它们的可组合性。 因此,您可以根据所需行为进行组合。 例如,您可以轻松创建将多个输入添加到一起的块,方法是将转换器块附加到联接块的末尾。 当联接块成功检索到来自它的各个源的消息时,可将消息传递给转换器,而转换器将汇总消息负载。

  您也可以将重复的计时器块连接为联接块的源。 这会形成一个限制消息的块,只在计时器块触发其消息时允许消息通过。 图 4 中说明了这两种可组合块。

  图 4 组合来自基元的加法器块与消息限制块
创建消息传递管道

  现在,我们来看看创建上文所示的消息块管道的代码。 我们可以用两个转换器消息块替换此管道,如图 5 所示。 转换器块的用途是接收特定类型的消息并对消息执行用户定义的函数,这一操作可修改消息负载甚至彻底更改消息类型。 例如,审查块将包含字符串的消息作为输入接收,然后需要对其进行处理。

  图 5 消息块管道

  图 6 中显示了创建和连接消息块的代码。 此代码从实例化两个转换器消息块开始。 审查块构造函数中的 C++0x lambda 参数定义转换函数,该转换函数在字典内查找消息的存储输入字符串,看看是否应更改为其他字符串。 系统返回结果字符串,然后在审查块内将其封装成单条消息并从该块传播出去。 除非转换器块的输出是格式设置函数更改过的字符串,否则对于格式设置转换器块会采用类似途径。

  图 6 简单消息管道

          dictionary dict;

transformer<string, string>
  censor([&dict](const string& s) -> string {

  string result = s;
  auto iter = dict.find(s);

  if (iter != dict.end()) {
    result =  iter->second;
  }

  return result;
});

transformer<string, string>
  format([](const string& s) -> string {

  string result = s;
  for (string::size_type i = 0; i < s.size(); i++) {
    result[i] = (char)Format(s[i]);
  }

  return result;
});

censor.link_target(&format);

asend(&censor, "foo");
string newStr = receive(format);
printf("%s\n", newStr);
       

  两个块实例化以后,下一行代码通过对审查块调用 link_target 方法,将两个块链接到一起。 每个源块和传播器块都有 link_target 方法,用于确定源应该将它的消息传播到哪些消息块。

  审查块和格式设置块链接到一起后,转换函数会处理传播到审查块的任何消息,生成的消息将隐式传递到格式设置块进行处理。 如果消息块是没有连接目标的源或传播器,它可以按特定于块的方式存储消息,直到链接了目标或消息被检索。

  示例代码的最后三行显示将消息初始化到块中以及从块中检索消息的过程。 AAL 中有两种消息初始化 API:send 和 asend。 它们分别将消息同步或异步输入块中。

  主要区别是,当 send 调用返回时,保证已将消息推送到块,并且已通过块将消息发送到所需目标。 asend 调用可以立即返回,并且允许并发运行时计划传播。 同样地,AAL 中有两种消息检索 API:receive 和 try_receive。 receive 方法在消息到达前始终处于阻止状态,而 try_receive 则会在无法检索消息时立即返回。

  如图 6 所示,字符串“foo”会异步发送到审查块。 审查块将接收该消息,检查其字符串是否存在于审查词的字典中,然后将结果字符串传播到消息中。 接着,结果字符串被传递到格式设置块,后者接收该字符串,将每个字母变成大写,然后由于没有目标而保留到消息中。 当调用 receive 时,将从格式设置块中获取该消息。 这样,假定字典中没有“foo”,此示例的输出将是“FOO”。虽然此示例只是通过网络推送单个字符串,但可以看到输入字符串流是如何形成执行管道的。

  请看一下此消息示例,注意消息本身明显缺少引用。 消息只是一个信封,其中封装要在数据流网络中传递的数据。 消息传递本身是通过提供和接受过程来处理的。 当消息块收到消息时,能够以任何想要的方式存储该消息。 如果稍后要将消息发送出去,它会将该消息提供给每个连接的目标。 若要真正将消息送出,接收方必须接受提供的消息,以完成该事务。 消息在块间传递的整个过程是由并发运行时计划和执行的任务来计划和处理的。
消息块传播

  现在,您已了解消息块是如何创建和关联在一起的,以及如何将消息初始化到每个块中并从中检索消息。接下来让我们简单了解一下消息如何在块间传递,以及并发运行时如何成为 AAL 的核心。

  使用消息块或 AAL 不一定需要了解此信息,但它有助于加深对消息传递协议工作方式及其使用方式的理解。 在本节的其余部分,我将介绍传播器块,因为它们既是源又是目标。 显然,纯粹的源块或纯粹的目标块只是传播器块实现的子集。

  在内部,每个传播器块都有一个消息输入队列和另一个特定于块的消息存储容器。 链接到此传播器块的其他块会发送存储在输入队列中的消息。

  例如,在图 7 中,审查转换器块有一个输入队列,该队列当前存储包含字符串 str6 的消息。 实际转换器本身包含两个消息:str4 和 str5。 因为这是转换器,所以它的特定于块的存储是另一队列。 不同的块类型可以有不同的存储容器。 例如,overwrite_buffer 块只存储始终会被覆盖的单条消息。

  图 7 消息传递协议

  从某个链接的源(或 send/asend API)向块提供消息时,此块首先会检查筛选器函数,以决定是否接受消息。 如果决定接受消息,则将消息放入输入队列。 筛选器是一个可选函数,可传递到返回布尔值的每个目标的构造函数或传播器块中,该布尔值决定是否应接受某个源提供的消息。 如果消息被拒绝,该源会继续向下一个目标提供消息。

  一旦消息放入输入队列,它的源块就不再保留此消息。 不过,接受块尚未准备好传播消息。 因此在等待处理时,消息可以缓冲到输入队列中。

  当消息到达某个消息块的输入队列时,并发运行时计划程序会计划一个轻型任务 (LWT)。 此 LWT 有双重目的。 首先,它必须将消息从输入队列移到块的内部存储中(我们称之为消息处理)。 其次,它还必须尝试将消息传播到任意目标(我们称之为消息传播)。

  例如,在图 7 中,输入队列中存在提示系统计划 LWT 的消息。 接下来 LWT 会处理消息,方法是先对消息执行用户提供的转换器函数,在审查字符串字典中检查该消息,然后将它移到块的存储缓冲区。

  将消息转移到存储缓冲区之后,LWT 开始执行传播步骤,将消息发送到目标设置格式块。 在这种情况下,由于消息 str4 位于转换器的前端,它会先传播到格式设置块,然后再传播下一条消息 str5。 同样的整个过程会在格式设置块中发生。

  根据消息块的类型,消息处理方式会有所不同。 例如,unbounded_buffer 只有将消息移到存储缓冲区的简单处理步骤。 转换器处理消息的方式是先对消息调用用户定义的函数,然后再将其移到存储缓冲区。 其他块的处理方式甚至更复杂,例如联接,它必须组合来自不同源的多条消息,然后将它们存储到缓冲区中以备传播。

  就性能效率而言,AAL 在创建 LWT 方面是智能化的,因此每次只会为每个消息块计划一个 LWT。 如果处理 LWT 处于活动状态时有更多消息到达输入队列,LWT 会继续选取并处理这些消息。 因此,如图 7 所示,如果消息 str7 进入输入队列时转换器的 LWT 仍在处理,它将选取并处理此消息,而不是启动新的处理和传播任务。

  每个消息块都有各自用于控制处理和传播的 LWT,这是此设计的核心,它允许消息传递框架按数据流的方式将工作管道化。 因为每个消息块在自己的 LWT 中处理和传播消息,所以 AAL 可以将块彼此分离,并允许跨多个块执行并行工作。 每个 LWT 必须只将自己的消息传播到目标块的输入队列,而每个目标仅计划一个 LWT 来处理自己的输入。 使用单个 LWT 处理和传播消息可确保为消息块保持消息次序。
异步代理

  AAL 的第二个主要组件是异步代理。 异步代理是粗粒度应用程序组件,专门用于异步处理较大型的计算任务和 I/O。 代理应该可以与其他代理通信,并启动较低级别的并行处理。 这些代理是隔离的,因为它们对于外界的理解完全包含在类中,它们可以通过消息传递与其他应用程序组件通信。 代理本身被计划为并发运行时内部的任务。 这允许它们配合同时执行的其他工作来阻止和运行。

  异步代理有固定生命周期,如图 8 所示。 可以监视和等待此生命周期。 绿色状态表示运行状态,而红色状态表示终止状态。 开发人员可通过从代理基类派生的方式创建自己的代理。

  图 8 异步代理生命周期

  三种基类函数(start、cancel 和 done)可转换代理的不同状态。 一旦完成构造,代理即处于已创建状态。 启动代理和启动线程类似。 除非对代理调用 start 方法,否则代理不会执行任何操作。 此时,代理将根据计划执行,并进入可运行状态。

  当并发运行时选取此代理时,它会进入已启动状态并继续运行,直到用户调用 done 方法,该方法指示它的工作已经完成。 已计划但尚未启动代理时,调用 cancel 会将代理转换成已取消状态,代理将不再执行。

  让我们回顾一下电子邮件筛选示例。在此示例中,管道式消息块将数据流引入应用程序,并提高自己并行处理词语的能力。 但是,此示例没有显示如何控制处理电子邮件本身的 I/O,以及如何将它们分解成字符串流,以便管道进行处理。 此外,一旦字符串通过管道,必须进行收集,以便以新的已审查和已设置格式的状态重新编写文本。 这就是代理可以发挥作用的地方,目的是帮助容忍 I/O 延迟差异。

  例如,请看一下电子邮件管道的末尾。 此时,字符串正由格式设置块输出,并需要写入邮箱的文件中。 图 9 显示输出代理如何捕获字符串和创建输出电子邮件。 WriterAgent 的 run 函数接收来自循环中的格式设置块的消息。

  图 9 代理捕获格式设置块的输出

  此应用程序中的大部分处理工作是使用数据流完成的,而 WriterAgent 则显示了如何在程序中引入某些控制流。 例如,当文件结尾消息到达时,根据接收的输入字符串,WriterAgent 必须有不同的行为;它必须知道停止操作。 图 10 中显示了 WriterAgent 的代码。

  图 10 WriterAgent

          class WriterAgent : public agent {
public:
  WriterAgent(ISource<string> * src) : m_source(src) {
  }

  ~WriterAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;
    fopen_s( &stream, ...
          );

    string s;
    string eof("EOF");

    while (!feof(stream) && ((s=receive(m_source)) != eof)) {
      write_string(stream, s);
    }

    fclose(stream);
    done();
  }

private:

  ISource<string> * m_source;
};
       

  此代码有几个值得关注的地方。 首先是析构函数中对静态函数 agent::wait 的调用。 可以使用指向任何代理的指针调用此函数,并且此函数会一直处于阻止状态,直到代理进入一种终止状态(done 或 canceled)。 虽然并不是所有代理都需要在析构函数中调用 wait,但多数情况下应让它完成,这样可确保析构时代理不再执行任何代码。

  其次,此代码的有趣部分是 run 方法本身。 此方法定义代理的主执行过程。 在此代码中,代理正在处理从源(在本例中是格式设置块)读取的字符串的写出操作。

  最后,请注意 run 方法的最后一行,此行是对代理函数 done 的调用。 对 done 方法的调用可将代理从运行状态转变成完成状态。 在大多数情况下,需在 run 方法末尾调用此方法。 不过,在某些情况下,应用程序可能希望使用代理来设置状态。例如在数据流网络中,该网络在 run 方法生存期后仍应保持活动状态。
将所有内容整合在一起

  现在,我们已经创建了消息传递管道对字符串进行筛选和设置格式,创建了输出代理对字符串进行处理,我们可以将具有相似行为的输入代理附加到输出代理。 图 11 举例说明了此应用程序如何组合到一起。

  图 11 用于处理电子邮件的代理

  代理处理的一个优势是能够在应用程序中使用异步角色。 这样,当数据到达并等待处理时,输入代理将异步开始通过管道发送字符串,输出代理同样可以读取和输出文件。 这些角色可以完全独立地开始和停止处理,并且完全由数据驱动。 此类行为在许多情况下非常有用,特别是在延迟驱动和异步 I/O 情况下,例如电子邮件处理示例。

  在此示例中,我添加了另一个代理 ReaderAgent,它与 WriterAgent 工作方式类似,不同的是,它处理 I/O 以读取电子邮件并向网络发送字符串。 图 12 中显示了 ReaderAgent 的代码。

  图 12 ReaderAgent

          class ReaderAgent : public agent {
public:
  ReaderAgent(ITarget<string> * target) : m_target(target) {
  }

  ~ReaderAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;      
    fopen_s( &stream, ...);

    while (!feof(stream)) {
      asend(m_target, read_word(stream));
    }

    fclose( stream );

    asend(m_target, string("eof"));
    done();
  }

private:

  ITarget<string> * m_target;
};
       

  现在,我们已经有 ReaderAgent 和 WriterAgent 对程序 I/O 进行异步处理,只需将它们链接至网络中的转换器块,便可开始处理。 将两个块链接在一起之后就可轻松完成此任务:

          censor.link_target(&format);

ReaderAgent r(&censor);
r.start();

WriterAgent w(&format);
w.start();
       

  ReaderAgent 是通过对审查的引用创建的,因此可以正确地将消息发送到该代理,而 WriterAgent 是通过对格式设置的引用创建的,因此可以检索消息。 每个代理都使用启动 API 进行启动,该 API 安排代理在并发运行时中执行。 每个代理都在自己的析构函数中调用 agent::wait(this),因此要等到两个代理都到达完成状态才会开始执行。
同步

  本文旨在让读者初步了解内置于 Visual Studio 2010 中的基于角色的编程和数据流管道的一些新功能。 我们鼓励您试用一下。

  如果深入探索的话,可以了解到许多本文无法一一详述的其他功能:自定义消息块创建和筛选消息等等。 MSDN 上的并行计算开发中心 (msdn.microsoft.com/concurrency) 包含更多有关这一令人兴奋的新编程模型如何帮助您以全新方式并行化程序的详细信息和使用步骤。


« 
» 
快速导航

Copyright © 2016 phpStudy | 豫ICP备2021030365号-3