<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"
	xmlns:content="http://purl.org/rss/1.0/modules/content/"
	xmlns:wfw="http://wellformedweb.org/CommentAPI/"
	xmlns:dc="http://purl.org/dc/elements/1.1/"
	xmlns:atom="http://www.w3.org/2005/Atom"
	xmlns:sy="http://purl.org/rss/1.0/modules/syndication/"
	xmlns:slash="http://purl.org/rss/1.0/modules/slash/"
	>

<channel>
	<title>有道技术沙龙博客 &#187; java</title>
	<atom:link href="https://techblog.youdao.com/?feed=rss2&#038;tag=java" rel="self" type="application/rss+xml" />
	<link>https://techblog.youdao.com</link>
	<description>分享有道人的技术思考</description>
	<lastBuildDate>Thu, 05 Mar 2026 10:02:57 +0000</lastBuildDate>
	<language>zh-CN</language>
		<sy:updatePeriod>hourly</sy:updatePeriod>
		<sy:updateFrequency>1</sy:updateFrequency>
	<generator>https://wordpress.org/?v=3.9.40</generator>
	<item>
		<title>JAVA语言异步非阻塞设计模式（原理篇）</title>
		<link>https://techblog.youdao.com/?p=2307</link>
		<comments>https://techblog.youdao.com/?p=2307#comments</comments>
		<pubDate>Wed, 14 Jul 2021 08:17:49 +0000</pubDate>
		<dc:creator><![CDATA[youdao]]></dc:creator>
				<category><![CDATA[未分类]]></category>
		<category><![CDATA[java]]></category>
		<category><![CDATA[有道]]></category>

		<guid isPermaLink="false">http://techblog.youdao.com/?p=2307</guid>
		<description><![CDATA[本系列文章共2篇，对 Java 语言的异步非阻塞模式进行科普。《原理篇》讲解异步非阻塞模型的原理，以及核心设计 [&#8230;]]]></description>
				<content:encoded><![CDATA[<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/封面3.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/封面3.png" alt="封面3" width="700" height="298" class="aligncenter size-full wp-image-2311" /></a></p>
<p>本系列文章共2篇，对 Java 语言的异步非阻塞模式进行科普。《原理篇》讲解异步非阻塞模型的原理，以及核心设计模式“Promise”的基本特性。《应用篇》会展示更加丰富的应用场景，介绍 Promise 的变体，如异常处理、调度策略等，并将 Promise 和现有工具进行对比。</p>
<p>限于个人水平和篇幅，本系列以科普为主，内容更偏重于原理、API 设计、应用实践，但是不会深入讲解并发优化的具体细节。</p>
<p><span id="more-2307"></span></p>
<p>1&#46;概述</p>
<p>异步非阻塞[A]是一种高性能的线程模型，在 IO 密集型系统中得到广泛应用。在该模型下，系统发起耗时请求后不需要等待响应，期间可以执行其他操作；当收到响应后，系统收到通知并执行后续处理。由于消除了不必要的等待，这种模型能够充分利用 cpu、线程等资源，提高资源利用率。</p>
<p>然而，异步非阻塞模式在提升性能的同时，也带来了编码实现上的复杂性。请求和响应可能分离到不同线程中，需要编写额外代码完成响应结果的传递。Promise 设计模式可以降低这种复杂性，封装数据传递、时序控制、线程安全等实现细节，从而提供简洁的 API 形式。</p>
<p>本文首先介绍异步非阻塞模式，从线程模型的角度分析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的应用场景及工作流程。最后，提供一种简易的 Java 实 现，能够实现基本的功能需求，并做到线程安全。</p>
<p>在正式探索技术问题之前，我们先来看看什么是<strong>异步非阻塞模型</strong>。如图1-1所示，展示了两个小人通信的场景：</p>
<ol>
<li>两个小人代表互相通信的两个<strong>线程</strong>，如数据库的客户端和服务端；他们可以部署在不同的机器上。</li>
<li>小人之间互相投递苹果，代表要<strong>传递的消息</strong>。根据具体业务场景，这些消息可能会称为 request、response、packet、document、record 等。</li>
<li>小人之间需要<strong>建立信道</strong>，消息才得以传递。根据场景，信道称为 channel、connection 等。</li>
</ol>
<p>假设左侧小人发起请求，而右侧小人处理请求并发送响应：左侧小人先投出一个苹果request，被右侧小人接收到；右侧小人进行处理后，再投出苹果 response，被左侧小人接收到。我们考察左侧小人在等待响应期间的行为，根据他在等待 response 期间是否能处理其他工作，将其归纳为“同步阻塞”和“异步非阻塞”两种模式。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/1-1.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/1-1.png" alt="1-1" width="600" height="326" class="aligncenter size-full wp-image-2313" /></a></p>
<p><em>图1-1 两个小人通信</em></p>
<p>首先我们看看同步阻塞式通信的流程，如图1-2a所示。</p>
<ol>
<li><strong>投递</strong>。左侧小人投递 request，并等待接收 response。</li>
<li><strong>等待</strong>。在等待接收 response 期间，左侧小人休息。不论是否还有其他 request需要投递、是否还有其他工作需要处理，他都视若无睹，绝对不会因此打断休息。</li>
<li><strong>响应</strong>。在收到 response 后，小人从休息中唤醒并处理 response。</li>
</ol>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/1-2.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/1-2.png" alt="1-2" width="700" height="360" class="aligncenter size-full wp-image-2315" /></a></p>
<p><em>图1-2a 同步阻塞式通信</em></p>
<p>接下来我们看看异步非阻塞式通信的流程，如图1-2b所示。</p>
<blockquote>
<ol>
<li><strong>缓存</strong>。左侧小人投递 request，并等待接收 response。和同步阻塞模式不同，小人并不需要亲手接住苹果 response，而是在地上放置一个盘子称为“buffer”；如果小人暂时不在场，那么所收到的苹果可以先存在盘子里，稍后再处理。</li>
<li><strong>暂离</strong>。由于有盘子 buffer 的存在，小人投递 request 后就可以暂时离开，去处理其他工作，当然也可以去投递下一个 request；如果需要向不同的channel投递request，那么小人可以多摆放几个盘子，和 channel 一一对应。</li>
<li><strong>响应</strong>。小人离开后，一旦某个盘子收到了 response，一只“大喇叭”就会响起，发出“channelRead”通知，呼唤小人回来处理 response。如果要处理多个response 或多个 channel，那么 channelRead 通知还需要携带参数，以说明从哪个 channel 上收到了哪个 response。</li>
</ol>
</blockquote>
<p>这里的大喇叭可以用 NIO 或 AIO 来实现。简单来说，NIO 是指不停地轮询每个盘子，一旦看到苹果就发出通知；AIO 是指在收到苹果时直接触发通知，而没有轮询的过程。当然，本系列文章的读者并不需要了解更多实现细节，只需知道异步非阻塞模式依赖于“大喇叭”来实现，它替代小人等待接收 response，从而解放小人去处理其他工作。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/1-2b.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/1-2b.png" alt="1-2b" width="700" height="507" class="aligncenter size-full wp-image-2345" /></a></p>
<p><em>图1-2b 异步非阻塞式通信</em></p>
<p>根据上面的分析，同步模式具有下列严重<strong>缺点</strong>：</p>
<ol>
<li><strong>同步阻塞模式的工作效率十分低下</strong>。小人大部分时间都在休息，仅当投递请求、处理响应时，才偶尔醒来工作一小会；而在异步非阻塞模式下，小人从不休息，马不停蹄地投递请求、处理响应，或处理其他工作。</li>
<li><strong>同步阻塞模式会带来延迟</strong>。</li>
</ol>
<p>我们考虑下面两种情况，如图1-3所示。</p>
<ul>
<li><strong>channel 复用</strong>，即左侧小人在一个 channel 上连续发送多条消息。在同步阻塞模式下，一轮（即请求+响应）只能投递一个请求（苹果1），而后续请求（苹果2-4）都只能排队等待，右侧小人需要等待很多轮才能收到所期望的全部消息。此外，左侧小人在等待接收某个 response 期间，没有机会处理收到的其他消息，造成了数据处理的延迟。不得不感慨，左侧小人太懒惰了！</li>
<li><strong>线程复用</strong>，即一个线程（小人）向多个 channel 发送消息（苹果1-3，分别发向不同 channel）。左侧小人同一时刻只能做一件事，要么在工作，要么在休息；他投递了苹果1后就躺下休息，等待响应，全然不顾右侧小人2、3还在等待他们想要的苹果2、3。</li>
</ul>
<p><img src="https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9e39c192e62240698bb8648ff473fcee~tplv-k3u1fbpfcp-watermark.image" alt="1-3a.png" /></p>
<p><em>图1-3a channel复用</em></p>
<p><img src="https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/630407a091ad4a9f99fd653ebc576e84~tplv-k3u1fbpfcp-watermark.image" alt="1-3b.png" /></p>
<p><em>图1-3b 线程复用</em></p>
<p>在这一章里我们用漫画的形式，初步体验了同步阻塞模式与异步非阻塞模式，并分析了两种模式的区别。接下来我们从Java线程入手，对两种模式进行更加正式、更加贴近实际的分析。</p>
<h2>2&#46;异步非阻塞模型</h2>
<h3>2&#46;1 Java 线程状态</h3>
<p>在 Java 程序中，线程是调度执行的单元。线程可以获得 CPU 使用权来执行代码，从而完成有意义的工作。工作进行期间，有时会因为等待获取锁、等待网络 IO 等原因而暂停，通称“同步”或“阻塞”；如果多项工作能够同时进行，之间不存在约束、不需要互相等待，这种情况就称为“异步”或“非阻塞”。 受限于内存、系统线程数、上下文切换开销，Java 程序并不能无限创建线程；因此，我们只能创建有限个线程，并尽量提高线程的利用率，即增加其工作时长、降低阻塞时长。异步非阻塞模型是减少阻塞、提高线程利用率的有效手段。当然，这种模型并不能消除所有的阻塞。我们首先来看看 Java 线程有哪些状态，其中哪些阻塞是必要的，哪些阻塞可以避免。</p>
<p>Java 线程状态包括：</p>
<ul>
<li><strong>RUNNABLE</strong>：线程在执行有意义的工作 如图2-1a，线程如果在执行纯内存运算，那么处于RUNNABLE状态 根据是否获得cpu使用权，又分为两个子状态：READY、RUNNING</li>
<li><strong>BLOCKED/WAITING/TIMED_WAITING</strong>：线程正在阻塞 如图2-1b、2-1c、2-1d，根据阻塞原因，线程处于下列状态之一 BLOCKED：synchronized等待获取锁 WAITING/TIMED_WAITING：Lock等待获取锁。两种状态的区别为是否设置超时时长</li>
</ul>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-1.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-1.png" alt="图2-1" width="800" height="340" class="aligncenter size-full wp-image-2317" /></a> <em>图2-1 Java 线程状态</em></p>
<p>此外，如果 Java 线程正在进行网络 IO，则线程状态为 RUNNABLE，但是实际上也发生了阻塞。以 socket 编程为例，如图2-2所示，在收到数据之前InputStream.read() 会阻塞，此时线程状态为RUNNABLE。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-2.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-2.png" alt="图2-2" width="650" height="85" class="aligncenter size-full wp-image-2319" /></a></p>
<p><em>图2-2 网络IO</em></p>
<p>综上，Java 线程状态包括：RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中，RUNNABLE 状态又分为内存计算（非阻塞）、网络IO（阻塞）两种情况，而其余状态都是阻塞的。 根据阻塞原因，本文将 Java 线程状态归纳为以下3类：RUNNABLE、IO、BLOCKED</p>
<blockquote>
<ol>
<li><strong>RUNNABLE</strong>：Java 线程状态为 RUNNABLE，并且在执行有用的内存计算，无阻塞</li>
<li><strong>IO</strong>：Java线程状态为RUNNABLE，但是正在进行网络IO，发生阻塞</li>
<li><strong>BLOCKED</strong>：Java线程状态为BLOCKED/WAITING/TIMED_WAITING，在并发工具的控制下，线程等待获取某一种锁，发生阻塞</li>
</ol>
</blockquote>
<p>要提高线程利用率，就要增加线程处于 RUNNABLE 状态的时长，降低处于 IO 和BLOCKED状态的时长。BLOCKED 状态一般是不可避免的，因为线程间需要通信，需要对临界区进行并发控制；但是，如果采用适当的线程模型，那么 IO 状态的时长就可以得到降低，而这就是异步非阻塞模型。</p>
<h3>2&#46;2 线程模型：阻塞 vs 非阻塞</h3>
<p>异步非阻塞模型能够降低 IO 阻塞时长，提高线程利用率。下面以数据库访问为例，分析同步和异步 API 的线程模型。如图3所示，过程中涉及3个函数：</p>
<ol>
<li>writeSync()或writeAsync()：数据库访问，发送请求</li>
<li>process(result)：处理服务器响应（以result表示）</li>
<li>doOtherThings()：任意其他操作，逻辑上不依赖服务器响应</li>
</ol>
<p><strong>同步 API</strong> 如图3-a 所示：调用者首先发送请求，然后在网络连接上等待来自服务器的响应数据。API 会一直阻塞，直至收到响应才返回；期间调用者线程无法执行其他操作，即使该操作并不依赖服务器响应。实际的<strong>执行顺序</strong>为： 1. writeSync() 2. process(result) 3. doOtherThings() // 直至收到结果，当前线程才能执行其他操作</p>
<p><strong>异步 API</strong> 如图2-3b所示：调用者发送请求并注册回调，然后API立刻返回，接下来调用者可以执行任意操作。稍后底层网络连接收到响应数据，触发调用者所注册的回调。实际的<strong>执行顺序</strong>为：</p>
<ol>
<li>writeAsync()</li>
<li>doOtherThings() // 已经可以执行其他操作，并不需要等待响应</li>
<li>process(result)</li>
</ol>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-3.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-3.png" alt="图2-3" width="600" height="106" class="aligncenter size-full wp-image-2321" /></a></p>
<p><em>图2-3 同步API &amp; 异步API</em></p>
<p>在上述过程中，函数 doOtherThings() 并不依赖服务器响应，原则上可以和数据库访问同时执行。然而对于同步 API，调用者被迫等待服务器响应，然后才可以执行 doOtherThings()；即数据库访问期间线程阻塞于 IO 状态，无法执行其他有用的操作，利用率十分低下。而异步 API 就没有这个限制，显得更加紧凑、高效。</p>
<p>在 IO 密集型系统中，适当使用异步非阻塞模型，可以提升数据库访问吞吐量。考虑这样一个场景：需要执行多条数据库访问请求，且请求之间互相独立，无依赖关系。使用同步 API 和异步 API，线程状态随时间变化的过程如图2-4所示。 线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下，线程执行内存计算，如提交请求、处理响应。在 IO 状态下，线程在网络连接上等待响应数据。在实际系统中，内存计算的速度非常快，RUNNABLE 状态的时长基本可忽略；而网络传输的耗时会相对更长（几十到几百毫秒），IO 状态的时长更加可观。</p>
<p>a.<strong>同步 API</strong>：调用者线程一次只能提交一个请求；直到请求返回后，才能再提交下一个请求。线程利用率很低，大部分时间消耗在 IO 状态上。</p>
<p>b.<strong>异步 API</strong>：调用者线程可以连续提交多个请求，而之前提交的请求都还没有收到响应。调用者线程会注册一些回调，这些回调存储在内存中；稍后网络连接上收到响应数据，某个接收线程被通知处理响应数据，从内存中取出所注册的回调，并触发回调。这种模型下，请求可以连续地提交、连续的响应，从而节约IO状态的耗时。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-4.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图2-4.png" alt="图2-4" width="700" height="273" class="aligncenter size-full wp-image-2323" /></a></p>
<p><em>图2-4 线程时间线：数据库访问</em></p>
<p>异步非阻塞模式在IO密集型系统中应用非常广泛。常用的中间件，如http请求[D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H]，都支持异步 API。各位读者可以在参考文献中，查阅这些异步 API的样例代码。关于中间件的异步API，下面有几个注意事项：</p>
<ol>
<li>
<p>redis 的常见客户端有 jedis和lettuce [E]。其中lettuce提供了异步API，而jedis只能提供同步 API；二者对比参见文章[I]。</p>
</li>
<li>
<p>kafka producer[J]的 send() 方法也支持异步 API，但是该 API 实际上不是纯异步的[K]：当底层缓存满，或者无法获取服务器（broker）信息时，send() 方法会发生阻塞。个人认为这是一个非常严重的设计缺陷。kafka 常用于低延迟日志采集场景，系统会将日志通过网络写入到 kafka 服务器，以减少线程内的阻塞，提升线程吞吐量；稍后其他进程会从 kafka 消费所写入的日志，进行持久存储。设想一个实时通信系统，单条线程每秒需要处理几万到几十万条消息，响应时间一般为几毫秒到几十毫秒。系统在处理期间需要经常调用 send() 来上报日志，如果每次调用都发生哪怕1秒的延迟（实际有可能达几十秒），延迟积累起来也会严重劣化吞吐量和延迟。</p>
</li>
</ol>
<p>最后，异步 API 有多种实现，包括线程池、select（如netty 4.x[L]）、epoll等。其共同点是调用者不需要在某一条网络连接上阻塞，以等待接收数据；相反，API底层常驻有限数目的线程，当收到数据后，某一线程得到通知并触发回调。这种模型也称为“响应式”模型，非常贴切。限于篇幅原因，本文主要关注<strong>异步 API 设计</strong>，而不深入讲解异步 API 的实现原理。</p>
<h2>3&#46;Promise设计模式</h2>
<h3>3&#46;1 API 形式：同步、异步 listener、异步 Promise</h3>
<p>上一章介绍了异步非阻塞模式和异步 API 的函数形式。异步 API 具有以下特征：</p>
<blockquote>
<ol>
<li>在提交请求时注册回调；</li>
<li>提交请求后，函数立刻返回，不需要等待收到响应；</li>
<li>收到响应后，触发所注册的回调；根据底层实现，可以利用有限数目的线程来接收响应数据，并在这些线程中执行回调。</li>
</ol>
</blockquote>
<p>在保留异步特性的基础上，异步 API 的形式可以进一步优化。上一章图2-3b展示了异步 API 的 listener 版本，特点是在提交请求时必须注册恰好一个回调；因而在下列场景下，listener API 会难以满足功能需求，需要调用者做进一步处理：</p>
<ol>
<li>多个对象都关注响应数据，即需要注册多个回调；但是 listener 只支持注册一个回调。</li>
<li>需要将异步调用转为同步调用。例如某些框架（如spring）需要同步返回，或者我们希望主线程阻塞直至操作完成，然后主线程结束、进程退出；但是 listener 只支持纯异步，调用者需要重复编写异步转同步的代码。</li>
</ol>
<p>为了应对上述场景，我们可以使用 Promise 设计模式来重构异步 API，以支持多个回调和同步调用。下面对同步 API、异步 listener API、异步 Promise API 的函数形式进行对比，如图3-1所示：</p>
<ul>
<li>
<p><strong>a.同步</strong>：调用 writeSync() 方法并阻塞；收到响应后函数停止阻塞，并返回响应数据</p>
</li>
<li>
<p><strong>b.异步listener</strong>：调用 writeAsync() 方法并注册 listener，函数立刻返回；收到响应后，在其他线程触发所注册的 listener；</p>
</li>
<li>
<p><strong>c.异步Promise</strong>：调用 writeAsync()，但不需要在函数中注册 listener，函数立刻返回 Promise 对象。调用者可以调用异步的Promise.await(listener)，注册任意数目的 listener，收到响应后会按顺序触发；此外，也可以调用同步的 Promise.await()，阻塞直至收到响应。</p>
</li>
</ul>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-1.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-1.png" alt="图3-1" width="700" height="289" class="aligncenter size-full wp-image-2325" /></a></p>
<p><em>图3-1 API形式：同步、异步listener、异步Promise</em></p>
<p>综上，Promise API 在保持异步特性的前提下，提供了更高的灵活性。调用者可以自由选择函数是否阻塞，以及注册任意数目的回调。</p>
<h3>3&#46;2 Promise的特性与实现</h3>
<p>上一节介绍了 Promise API 的使用样例，其核心是一个 Promise 对象，支持注册 listener，以及同步获取响应 result；而本节将对 Promise 的功能进行更加详细的定义。注意，本节并不限定 Promise 的某一具体实现（例：jdk CompletableFuture、netty DefaultPromise），只展示共有的、必须具备的特性；缺少这些特性，Promise 将无法完成异步传递响应数据的工作。</p>
<h4>3&#46;2.1 功能特性</h4>
<ul>
<li><strong>Promise的基本方法</strong></li>
</ul>
<p>Promise 的基本功能是传递响应数据，需要支持下列方法，如表 3-1 所示：</p>
<p><img src="https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7709217def6d4a7eb936661c2deee08e~tplv-k3u1fbpfcp-watermark.image" alt="表格.jpg" /></p>
<p>下面以上一小节的数据库访问 API 为例，演示 Promise 的工作流程，如图 3-2 所示：</p>
<ul>
<li>
<p>a.调用者调用 writeAsync() API，提交数据库访问请求并获取 Promise 对象；然后调用 Promise.await(listener)，注册对响应数据的 listener。Promise 对象也可以传递给程序中其他地方，使得关心响应数据的其他代码，各自注册更多listener。</p>
</li>
<li>
<p>b.writeAsync() 内部，创建 Promise 对象并和这次请求关联起来，假设以requestId 标识。</p>
</li>
<li>
<p>c.writeAsync()底层常驻有限数目的线程，用于发送请求和接收响应。以 netty为例，当从网络上收到响应据后，其中一个线程得到通知，执行 channelRead() 函数进行处理；函数取出响应数据和对应的 Promise 对象，并调用Promise.signalAll() 进行通知。注意这里是伪代码，和 netty 中回调函数的实际签名略有区别。</p>
</li>
</ul>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-2a.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-2a.png" alt="图3-2a" width="835" height="60" class="aligncenter size-full wp-image-2327" /></a></p>
<p><em>图3-2a 提交数据库访问请求</em></p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-2b.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-2b.png" alt="图3-2b" width="600" height="219" class="aligncenter size-full wp-image-2329" /></a></p>
<p><em>图3-2b 创建 Promise 对象</em></p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-2c.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-2c.png" alt="图3-2c" width="700" height="181" class="aligncenter size-full wp-image-2331" /></a></p>
<p><em>图3-2c 通知 Promise 对象</em></p>
<p><strong>· Promise 的时序</strong></p>
<p>Promise 的方法需要保证以下时序。此处以“A对B可见”来描述时序，即：如果先执行操作A（注册listener）就会产生某种永久效应（永久记录这个listener），之后再执行操作B（通知result）就必须考虑到这种效应，执行相应的处理（触发之前记录的listener）。</p>
<blockquote>
<ol>
<li>await(listener) 对 signalAll(result) 可见：注册若干 listener 后，通知result时必须触发每一个listener，不允许遗漏。</li>
<li>signalAll(result) 对 await(listener) 可见：通知 result 后，再注册listener就会立刻触发。</li>
<li>首次 signalAll(result) 对后续 signalAll(result) 可见。首次通知result后，result即唯一确定，永不改变。之后再通知 result 就会忽略，不产生任何副作用。请求超时是该特性一种典型应用：在提交请求的同时创建一个定时任务；如果能在超时时长内正确收到响应数据，则通知 Promise 正常结束；否则定时任务超时，通知Promise 异常结束。不论上述事件哪个先发生，都保证只采纳首次通知，使得请求结果唯一确定。</li>
</ol>
</blockquote>
<p>此外，某次 await(listener) 最好对后续 await(listener) 可见，以保证 listener 严格按照注册顺序来触发。</p>
<p><strong>· Promise 的非线程安全实现</strong></p>
<p>如不考虑线程安全，那么下列代码清单可以实现Promise的基本特性；线程安全的实现见下一小节。代码清单依次展示了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个<strong>注意事项</strong>：</p>
<blockquote>
<ol>
<li>
<p><strong>字段 listeners 存储 await(listener) 所注册的 listener</strong>。字段类型为LinkedList，以存储任意数目的 listener，同时维护 listener 的触发顺序。</p>
</li>
<li>
<p><strong>字段 isSignaled 记录是否通知过 result</strong>。如果 isSignaled=true，则后续调用 await(listener) 时立刻触发 listener，且后续调用 signalAll(result) 时直接忽略。此外，我们以 isSignaled=true 而不是 result=null 来判断是否通知过 result，因为某些情况下 null 本身也可以作为响应数据。例如，我们以Promise<exception>表示数据库写入的结果，通知 null 表示写入成功，通知Exception 对象（或某一子类）表示失败原因。</exception></p>
</li>
<li>
<p><strong>signalAll(T result)在末尾处调用 listeners.clear() 以释放内存</strong>，因为listeners 已经触发过，不再需要在内存中存储。</p>
</li>
</ol>
</blockquote>
<pre><code class="js">public class Promise&lt;T&gt; {

    private boolean isSignaled = false;
    private T result;

    private final List&lt;Consumer&lt;T&gt;&gt; listeners = new LinkedList&lt;&gt;();

    public void await(Consumer&lt;T&gt; listener) {
        if (isSignaled) {
            listener.accept(result);
            return;
        }

        listeners.add(listener);
    }

    public void signalAll(T result) {
        if (isSignaled) {
            return;
        }

        this.result = result;
        isSignaled = true;
        for (Consumer&lt;T&gt; listener : listeners) {
            listener.accept(result);
        }
        listeners.clear();
    }

    public T await() {
        // 适当阻塞，直至signalAll()被调用；实际实现见3.3节
        return result;
    }
}
</code></pre>
<h4>3&#46;2.2 线程安全特性</h4>
<p>上一章3.2.1节讲解了 Promise 的功能，并提供了非线程安全的实现。本节展示如何使用并发工具，实现线程安全的 Promise ，如下所示。有下列几个注意事项：</p>
<ol>
<li>线程安全。各个字段均被多个线程访问，因此都属于临界区，需要使用适当的线程安全工具进行上锁，如 synchronized、Lock。一种最简单的实现，是将全部代码纳入临界区内，进入方法时上锁，离开方法时放锁。注意在使用 return 进行提前返回时，不要忘记放锁。</li>
<li>在临界区外触发 listener，以减少在临界区内停留的时长，并减少潜在的死锁风险。</li>
<li>同步 await() 。可以使用任何一种同步等待的工具来实现，如 CountDownLatch、Condition。此处使用 Condition 实现，注意根据 java 语法，操作 Condition 时必须先获取 Condition 所关联的锁。</li>
</ol>
<pre><code class="js">public class Promise&lt;T&gt; {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resultCondition = lock.newCondition();

    private boolean isSignaled = false;
    private T result;

    private final List&lt;Consumer&lt;T&gt;&gt; listeners = new LinkedList&lt;&gt;();

    public void await(Consumer&lt;T&gt; listener) {
        lock.lock();
        if (isSignaled) {
            lock.unlock(); // 不要忘记放锁
            listener.accept(result); // 在临界区外触发listener
            return;
        }

        listeners.add(listener);
        lock.unlock();
    }

    public void signalAll(T result) {
        lock.lock();
        if (isSignaled) {
            lock.unlock(); // 不要忘记放锁
            return;
        }

        this.result = result;
        isSignaled = true;

        // this.listeners的副本
        List&lt;Consumer&lt;T&gt;&gt; listeners = new ArrayList&lt;&gt;(this.listeners);
        this.listeners.clear();
        lock.unlock();

        for (Consumer&lt;T&gt; listener : listeners) {
            listener.accept(result); // 在临界区外触发listener
        }

/* 操作Condition时须上锁*/
        lock.lock();
        resultCondition.signalAll();
        lock.unlock();
    }

    public T await() {
        lock.lock();
        if (isSignaled) {
            lock.unlock(); // 不要忘记放锁
            return result;
        }

        while (!isSignaled) {
            resultCondition.awaitUninterruptibly();
        }
        lock.unlock();

        return result;
    }
}
</code></pre>
<p>上述实现仅做演示使用，仍有较大的改进空间。生产环境的实现原理，读者可以参考jdk CompletableFutre、netty DefaultPromise。可以改进的地方包括：</p>
<ol>
<li><strong>使用 CAS 设置响应数据</strong>。字段 isSignaled、result 可以合并为一个数据对象，然后使用 CAS 进行设值，从而进一步降低阻塞时长。</li>
<li>
<p><strong>触发 listener 的时序</strong>。在上述代码中，Promise.signalAll() 会依次触发listener；在此期间，如果其他线程调用了异步 await(listener)，由于 Promise的响应数据已通知，该线程也会触发 listener。上述过程中，两个线程同时触发listener，因此没有严格保证触发顺序。作为改进，类似于 netty DefaultPromise，Promise.signalAll() 内部可以设置一个循环，不断触发 listener 直至 listeners 排空，以防期间注册了新的 listener ；在此期间，新注册的 listener 可以直接加入到 listeners 中，而不是立刻触发。</p>
</li>
<li>
<p><strong>listener 的移除</strong>。在通知响应数据之前，Promise 长期持有 listener 的引用，导致 listener 对象无法被 gc 。可以添加 remove(listener) 方法，或者允许仅持有 listener 的弱引用。</p>
</li>
</ol>
<h4>3&#46;2.3 须避免的特性</h4>
<p>前面的小节展示了 Promise 的特性与实现原理。纯正的 Promise 是异步传递响应数据的工具，其应当只实现必要的数据传递特性，而不应当夹杂请求提交、数据处理等逻辑。接下来我们来看一看，Promise 在实现时应避免哪些特性，以防限制调用者所能做出的决策。</p>
<p>1&#46;异步 await() 发生阻塞；该规则不仅适用于 Promise，也适用于任何异步 API ·。异步API常用于实时通信等延时敏感的场景，作用是减少线程阻塞，避免推迟后续其他操作。一旦发生阻塞，系统的响应速度和吞吐量就会受到严重冲击。</p>
<p>以连续提交数据库请求为例。如图3-3a 所示，调用者调用了一个异步 API，连续提交3次写入请求，并在所返回的 Promise 上注册回调。</p>
<p>我们考察 writeAsync()与await() 如发生阻塞阻塞，将会对调用者造成什么影响，如图3-3b所示。提交请求是纯内存操作，线程处于 RUNNABLE 状态；writeAsync() 或 await() 如果发生阻塞，则线程处于 BLOCKED 状态，暂停工作而无法执行后续操作。当发生阻塞时，调用者每提交一个请求就不得不等待一段时间，从而降低了提交请求的频率，进而推迟了服务器对这些请求的响应，使得系统的吞吐量降低、延迟上升。特别地，如果系统采用了多路复用机制，即一个线程可以处理多个网络连接或多个请求，那么线程阻塞将会严重拖慢后续请求的处理，造成比较难排查的故障。</p>
<p>常见的<strong>阻塞原因</strong>包括：</p>
<ul>
<li>Thread.sleep()</li>
<li>向队列提交任务，调用了BlockingQueue.put()和take()；应改为非阻塞的offer()和poll()</li>
<li>向线程池提交任务，ExecutorService.submit()，如果线程池拒绝策略为CallerRunsPolicy，而任务本身又是耗时的。</li>
<li>调用了阻塞的函数，包括：InputStream.read()、同步的 Promise.await()、KafkaProducer.send()。注意 KafkaProducer.send() 虽然形式上是异步 API，但是在底层缓存满或者无法获取服务器（broker）信息时，send()方法仍会发生阻塞。</li>
</ul>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/3-3a.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/3-3a.png" alt="3-3a" width="650" height="75" class="aligncenter size-full wp-image-2333" /></a></p>
<p><em>图3-3a 连续提交请求</em></p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-3b.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-3b.png" alt="图3-3b" width="650" height="257" class="aligncenter size-full wp-image-2335" /></a></p>
<p><em>图3-3b 请求处理时间线</em></p>
<p>2&#46;绑定线程池（ExecutorService），用于执行请求。如图3-4所示，线程池是异步API的一种可选模型，但并不是唯一实现。</p>
<blockquote>
<ul>
<li><strong>线程池模型</strong>。为了不阻塞调用者，API 内置了线程池来提交请求、处理响应；调用者可以向线程池连续提交多个请求，但是不需要等待响应。调用者提交一条请求后，线程池中的某条线程就会被独占，等待接收响应并进行处理，但在此之前无法再处理其他请求；完成处理后，该条线程重新变为空闲，可以继续处理后续请求。</li>
<li><strong>响应式模型</strong>。类似地，API 内置了发送和接收线程来提交请求、处理响应，调用者也不需要同步等待。调用者提交一条请求后，发送线程向网络发送请求；完成发送后，线程立刻变为空闲，可以发送后续请求。当收到响应数据时，接收线程得到通知以处理响应；完成处理后，线程立刻变为空闲，可以处理后续响应数据。上述过程中，任何一条线程都不会被某一请求独占，即线程随时都可以处理请求，而不需要等待之前的请求被响应。</li>
</ul>
</blockquote>
<p>综上，如果绑定了线程池，Promise 就实现了对其他模型（如响应式模型）的兼容性。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-4.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/图3-4.png" alt="图3-4" width="550" height="273" class="aligncenter size-full wp-image-2337" /></a></p>
<p><em>图3-4 线程时间线：线程池 vs select</em></p>
<p>3&#46;在构造方法创建 Promise 对象时，定义如何提交请求。这种方式只能定义如何处理单条请求，而无法实现请求的批量处理。</p>
<p>以数据库访问为例，现代数据库一般支持批量读写，以略微提升单次访问的延迟为代价，换来吞吐量显著提升；如果吞吐量得到提升，那么平均延迟反而会下降。下面的代码片段展示了一个批量请求 API ：数据对象 BulkRequest 可以携带多条普通请求 Request，从而实现批量提交。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/代码31.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/代码31.png" alt="代码3" width="714" height="358" class="aligncenter size-full wp-image-2355" /></a></p>
<p>为了充分利用“批量请求”的特性，调用者需要进行跨越多条请求的“宏观调控”。请求产生后可以先缓存起来；等待一段时间后，取出所缓存的多条请求，组装一个批量请求来一并提交。因此，如下面的代码片段所示，在构造Promise时指定如何提交单条请求是没有意义的，这部分代码（client.submit(new Request(&#8230;))）并不会被执行；而实际希望执行的代码，其实是提交批量请求（client.submit(new BulkRequest(&#8230;))）。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/代码41.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/代码41.png" alt="代码4" width="710" height="169" class="aligncenter size-full wp-image-2353" /></a></p>
<p>4&#46;在构造方法创建 Promise 对象时，定义如何处理响应数据，而不允许后续对响应数据注册回调。如下面的代码片段所示，在构造 Promise 对象时，注册了对响应数据的处理 process(result)；但是除此以外，其他代码也有可能关心响应数据，需要注册回调 process1(result)、process2(result)。如果 Promise 只能在构造时注册唯一回调，那么其他关注者就无法注册所需回调函数，即 Promise API 退化回listener API。</p>
<p><a href="https://techblog.youdao.com/wp-content/uploads/2021/07/代码51.png"><img src="https://techblog.youdao.com/wp-content/uploads/2021/07/代码51.png" alt="代码5" width="717" height="231" class="aligncenter size-full wp-image-2351" /></a></p>
<p>综上，Promise 应该是一个纯粹的数据对象，其职责是存储回调函数、存储响应数据；同时做好时序控制，保证触发回调函数无遗漏、保证触发顺序。除此以外，Promise 不应该和任何实现策略相耦合，不应该杂糅提交请求、处理响应的逻辑。</p>
<h2>4&#46;总结</h2>
<p>本文讲解了异步非阻塞设计模式，并对同步 API、异步 listener API、异步Promise API 进行了对比。相比于其他两种API，Promise API 具有无可比拟的灵活性，调用者可以自由决定同步返回还是异步返回，并允许对响应数据注册多个回调函数。最后，本文讲解了Promise基本功能的实现，并初步实现了线程安全特性。</p>
<p>本系列共2篇文章，本文为第1篇《原理篇》。在下一篇《应用篇》中，我们将看到Promise 设计模式丰富的应用场景，将其和现有工具进行结合或对比，以及对Promise API 进行进一步变形和封装，提供异常处理、调度策略等特性。</p>
<h2>参考文献</h2>
<blockquote>
<p>[A] 异步非阻塞IO https://en.wikipedia.org/wiki/Asynchronous_I/O</p>
<p>[B] Promise https://en.wikipedia.org/wiki/Futures_and_promises</p>
<p>[C] java线程状态 https://segmentfault.com/a/1190000038392244</p>
<p>[D] http异步API样例：apache HttpAsyncClient https://hc.apache.org/httpcomponents-asyncclient-4.1.x/quickstart.html</p>
<p>[E] redis异步API样例：lettuce https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API</p>
<p>[F] mongo DB异步API样例：AsyncMongoClient https://mongodb.github.io/mongo-java-driver/3.0/driver-async/getting-started/quick-tour/</p>
<p>[G] elasticsearch异步API样例：RestHighLevelClient https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-index.html</p>
<p>[H] influx DB异步API样例：influxdb-java https://github.com/influxdata/influxdb-java/blob/master/MANUAL.md</p>
<p>[I] jedis vs lettuce https://redislabs.com/blog/jedis-vs-lettuce-an-exploration/</p>
<p>[J] kafka http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html</p>
<p>[K] KafkaProducer.send()阻塞 https://stackoverflow.com/questions/57140680/kafka-asynchronous-send-not-really-asynchronous</p>
<p>[L] netty https://netty.io/wiki/user-guide-for-4.x.html</p>
</blockquote>
]]></content:encoded>
			<wfw:commentRss>https://techblog.youdao.com/?feed=rss2&#038;p=2307</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
	</channel>
</rss>
