概述
最近编写了大量需要借助长连接通讯的程序,经常编写客户端的自动重连代码。其模式具备相似之处,遂撰文总结。
本文中涉及的代码可见于 chuanwise/reconnect-supported-client。
问题定义
在启动客户端后,客户端将尝试连接服务端。当连接失败或意外断开时,其将等待一段时间并尝试再次链接服务端,直到符合停止自动重连条件时,客户端放弃重连服务端。
客户端收发消息和调用生命周期函数(即那些
start()
之类的会影响状态的)很可能在不同的线程,因此客户端对象应当是线程安全的。
设计约定
客户端的主要接口如下:
interface ReconnectSupportedClient : CoroutineScope {
val isStarted: Boolean
val isConnected: Boolean
val isWaiting: Boolean
val isClosed: Boolean
val isPaused: Boolean
fun start()
fun pause()
fun resume()
suspend fun close()
fun await()
fun await(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS)
val channel: Channel<String>
suspend fun send(string: String)
}
start()
、pause()
和 resume()
是分别用于启动连接、暂停自动重连和恢复自动重连的生命周期函数。await(...)
系列函数是在客户端被关闭前等待其状态变化的函数。channel
是一个用于接收消息的管道,可以用于读取服务端发送的消息。send(String)
是用于向客户端发送消息的函数,二者都只能在连接正常(即 isConnected
)时调用。
开发者应当通过符合下面规范的代码调用客户端的函数:
// 客户端创建好后并不会立刻分配资源开始连接。
val client = createClient(...)
// 直到某一时刻调用 #start() 时才会开始连接。
// 这一函数在整个过程中只需要调用一次。
client.start()
// 不断等待直到客户端连接到服务端。
while (!client.isConnected) {
// 等待客户端状态发生变化。
// 如果其他线程调用了 #close() 并退出,
// 调用 #await() 将抛出异常(因为客户端已经不可能再有其他新的状态)
client.await()
}
// 此时服务端已经连接到了客户端,可以发送消息。
// 如果连接在 client.isConnected 返回 true 到调用 #send(String) 的间隙断开了,
// 此时发送消息时会抛出异常。
client.send("我们的同志在困难的时候,要看到成绩,要看到光明,要提升我们的勇气。")
// 使用 #close() 关闭客户端。
// 若客户端已经被关闭,则将抛出异常。
client.close()
状态模型
这是一个非常典型的状态机,显然客户端的稳定状态如下所示:
- 已创建
CREATED
:客户端刚刚创建,但尚未调用start()
开始连接的状态。 - 已连接
CONNECTED
:客户端连接到服务端。 - 已关闭
CLOSED
:客户端的close()
方法被调用。 - 已暂停
PAUSED
:客户端多次重连服务器失败,重连被停止。可以用resume()
再次启动。
只设计这些状态是无法满足需求的,因为它们之间的状态切换并不会瞬间完成。例如在连接过程中,我们需要调用系统底层功能,它们返回之前,客户端实际上处于一种转换中间态中,即不稳定状态。它们如下所示:
- 连接中
CONNECTING
:客户端正在连接到服务端,但底层函数尚未返回。 - 关闭中
CLOSING
:客户端的close()
方法被调用但尚未返回。 - 等待中
WAITING
:客户端决定自动重连,但尚未到重连时机。
上述状态之间的切换是瞬间的,因此符合设计要求,其状态机如下所示:
stateDiagram
[*] --> CREATED
CREATED --> CONNECTING: start()
CONNECTING --> CONNECTED: connected
CONNECTED --> WAITING: connection lost
CONNECTED --> PAUSED: connection lost
CONNECTING --> WAITING: waiting for reconnecting
WAITING --> CONNECTING: reconnect
CONNECTING --> PAUSED: fail to reconnect or pause()
PAUSED --> CONNECTING: resume()
CREATED --> CLOSING: close()
CONNECTING --> CLOSING: close()
CONNECTED --> CLOSING: close()
WAITING --> CLOSING: close()
PAUSED --> CLOSING: close()
CLOSING --> CLOSED: closed
CLOSED --> [*]
实现方法
使用读写锁保证线程安全
由于客户端对象内需要同时维护:底层表示连接的对象、连接的状态和心跳任务等,涉及多个属性的读写,故需要使用读写锁保证其线程安全:
private val lock = ReentrantReadWriteLock()
为什么我们不用 AtomicReference<State>
维护状态呢?因为我们还有很多其他属性(例如用于收发消息的 channelNoLock
和 sessionNoLock
)需要和状态一同修改。具体地,只有在状态为 CONNECTED
时,这两个属性都必须同时非空,否则必须同时为空。如果用原子类型,尽管状态可以做到安全修改,但无法保证状态和这两个属性同时变化。
这么说并不严谨,因为锁也不会使二者同时变化,甚至没有任何办法可以让两个属性严格同时变化。但锁的存在可以让我们观测状态和这俩属性时,保证其状态一致而不会处于某种中间态。
那有没有使用无锁并发实现安全的办法呢?有的,我们为每个状态编写一个类型。而 CONNECTED
对应的状态才有这两个属性即可,就像下面这样:
private inner class ConnectedState(
val sessionNoLock: ClientWebSocketSession
) : State {
val channelNoLock: Channel<String> = Channel(Channel.UNLIMITED)
}
然后把状态用原子类型存储即可:
private interface State
private object CreatedState
private val state = AtomicReference<State>(CreatedState)
但这样会引入一些其他类型,个人感觉代价可能不如用锁,因为生命周期函数的并发程度并不高(应该不会同时有 114514 个线程调用生命周期函数)。
为了方便,我们增加两个属性:stateNoLock
和 state
,后者读时会自动加锁:
private var stateNoLock = State.CREATED
private val state: State get() = lock.read { stateNoLock }
以便实现各种状态的检查函数,例如:
override val isClosing: Boolean get() = state == State.CLOSING
override val isClosed: Boolean get() = state == State.CLOSED
override val isConnected: Boolean get() = state == State.CONNECTED
override val isWaiting: Boolean get() = state == State.WAITING
override val isPaused: Boolean get() = state == State.PAUSED
看起来代码没加锁,实际上都加了读锁,非常好。
值得一提的是有些状态的检查函数较为特殊,例如 isStarted
,连接中也是一种已启动状态。实际上任何非 CREATED
、CLOSING
和 CLOSED
状态都是 isStarted
,所以它的实现如下:
override val isStarted: Boolean
get() = lock.read { stateNoLock != State.CREATED && stateNoLock != State.CLOSED && stateNoLock != State.CLOSING }
关键操作前后检查系统状态
根据个人经验,在关键耗时操作开始和结束时应当检查和修改此刻的状态,以免出现非预期行为。
例 1:外界调用生命周期函数时
除非有特别的约定,否则对外暴露的函数在开始时都需要检查状态。
例如,在用户调用 start()
时,检查状态是否是 CREATED
(即刚开始的状态),就像下面这样:
override fun start() {
lock.write {
stateNoLock = when (stateNoLock) {
State.CREATED -> State.CONNECTING
else -> error("Cannot start when the state is $stateNoLock.")
}
stateUpdateCondition.signalAll()
}
setConnectingJob()
}
因为我们需要检查 stateNoLock
的状态并修改它,stateNoLock = when (stateNoLock) { ... }
的结构正好符合我们的需求。在后文中这种写法将经常出现。
setConnectingJobNoLock()
的实现背后是 createConnectingJob()
:
private var connectingJobNoLock: Job? = null
private fun setConnectingJobNoLock() {
connectingJobNoLock?.cancel()
connectingJobNoLock = createConnectingJob()
}
它可能在 start()
和 resume()
时调用一次,否则创建多个连接任务时将导致状态错乱。
编写代码时,我们规定创建连接任务时其状态为 CONNECTING
。由于我们在 start()
和 resume()
内都检查并修改状态为 CONNECTING
并继续执行后面的操作,这会阻止同时调用这些函数的其他线程进入 setConnectingJobNoLock()
,因此这一操作是安全的。
在用户调用 pause()
时,也有类似的操作。代码先检查状态是否是 CONNECTING
、CONNECTED
或 WAITING
,然后将其修改为 PAUSED
并进行操作。
override fun pause() {
lock.write {
stateNoLock = when (stateNoLock) {
State.CREATED -> error("Client not started.")
State.CLOSING, State.CLOSED -> error("Client is closing or closed.")
// CONNECTING, CONNECTED: 此次连接断开后暂停重连等待。
// WAITING: 此次重连等待结束后暂停重连等待(需要 stateUpdateCondition.signalAll() 提醒连接任务)
State.CONNECTING, State.CONNECTED, State.WAITING -> State.PAUSED
// PAUSED: 无需进行任何操作。
State.PAUSED -> return
}
stateUpdateCondition.signalAll()
}
}
值得一提的是,我们希望 pause()
可以中断正在等待重连的连接任务,而不是在重连等待时间到后检查状态时它才退出。因此在连接任务中我们并不是使用 delay
等待,而是在 stateUpdateCondition
上等待,以便在状态更新后连接任务可以立刻感知到这种变化。
又如在 close()
函数内,我们也先检查状态并修改为 CLOSING
。执行关闭操作后客户端直接将状态修改为 CLOSED
,因为不可能有其他情况(就算有这个客户端对象也不能再使用了)。
override suspend fun close() {
lock.write {
stateNoLock = when (stateNoLock) {
State.CLOSED -> error("Client already closed.")
// CREATED: 还没 start() 就 close() 了,真的是太逊了。
// CONNECTING: 连接中就关闭了。此处只需要修改状态,等待连接建立成功或失败时会检查状态的。
// CONNECTED: 此时连接已经建立,需要关闭连接(后面会关闭)。
// WAITING: 正在等待是否需要重连,只需要更新状态,连接任务就会醒来并立刻检查状态。
// PAUSED: 连接任务暂停,不需要做什么操作。
State.CREATED, State.CONNECTING, State.CONNECTED, State.WAITING, State.PAUSED -> State.CLOSING
else -> error("Unexpected state: $stateNoLock.")
}
stateUpdateCondition.signalAll()
}
sessionNoLock?.close()
connectingJobNoLock?.cancel()
job.cancelAndJoin()
if (manageHttpClient) {
httpClient.close()
}
lock.write {
stateNoLock = State.CLOSED
}
}
例 2:连接任务耗时操作前后
createConnectingJob()
创建的任务内部是一个循环,每次循环代表一次连接尝试,执行如下动作:
- 检查状态,并在状态是
CONNECTING
和WAITING
时修改为CONNECTING
。 - 显示连接日志。
- 使用底层 API(本例是 Ktor)启动连接。
- 连接失败时,检查并修改状态为
WAITING
并等待下一步判断。 - 连接成功后,首先检查并修改状态为
CONNECTED
,设置对应的底层 API 属性(如用于收发消息的channelNoLock
和sessionNoLock
),并重置重连计数器。在连接断开时,检查并修改状态为WAITING
。 - 连接断开后,状态可能是
WAITING
,也可能中途被其他线程修改。若是WAITING
,检查尝试次数是否超过限制。若是则放弃重连进入PAUSED
状态,否则进行一次等待,随后来到循环开头。
其源代码可以在这里看到。
总结
编写线程安全的需要维护内部状态的代码时,应当:
- 在任何耗时操作前后,以及对外暴露的 API 里,必须检查当前状态。应当经常问自己:
- 正在编写的这一块代码有没有可能同时被多个线程进入,且此时此刻其对应的状态(如果只能是一个的话)应当是哪个。
- 正在编写的这一块代码是否有可能在执行中途,对象状态被另一个线程修改?如果可能,这块代码必须能接受这种修改,否则只能上锁。
- 获取和释放锁也可能是耗时操作,有时需要在加锁前后分别检查一次条件。
- 阅读系统中的可变值(如
sessionNoLock
)时应当获取读锁,修改其时应当获取写锁。对系统中的不可变值,如maxConnectAttempts
,可以不上锁。 - 最小化锁粒度,避免可挂起点中间用锁。否则协程在另一个线程恢复时,会导致锁状态异常。
- 修改系统中的可变值时最好秉持着“谁设置谁置空”原则。例如对于
sessionNoLock
和channelNoLock
,都是在连接成功建立时才为非null
值的属性。连接任务有义务维护其值。其他地方在发现其为null
时便能快速判断连接尚未建立,例如在send
函数中。
有几个好的习惯可能有助于避免犯错:
- 为那些需要锁保护的变量名称添加
NoLock
后缀,以便时时提醒自己读写时加锁。 - 管理好由本类创建的对象,如
HttpClient
,我们使用额外的manageHttpClient
变量控制客户端在close()
时是否连带关闭它:
class WebSocketReconnectSupportedClient(
// ...
httpClient: HttpClient? = null,
manageHttpClient: Boolean? = null,
// ...
) : ReconnectSupportedClient {
// ...
private val manageHttpClient = manageHttpClient ?: (httpClient == null)
private val httpClient = httpClient ?: HttpClient { install(WebSockets) }
override suspend fun close() {
// ...
if (manageHttpClient) {
httpClient.close()
}
// ...
}
}
- 在
init
块内检查参数是否符合要求,就像下面这样:
init {
val maxConnectAttemptsLocal = maxConnectAttempts
require(maxConnectAttemptsLocal == null || maxConnectAttemptsLocal > 0) {
"maxConnectAttempts must be positive, but got $maxConnectAttemptsLocal."
}
}
- 对于只会在当前上下文中被修改的可空属性,可以将其值存在一个局部变量免去繁琐的非空断言。个人的习惯是在名称后加
Local
后缀。就像3
里给出的代码里,我们用maxConnectAttemptsLocal
缓存maxConnectAttempts
的值,使得在require
参数里的第二个maxConnectAttemptsLocal
得以被智能转换为非空类型。 - 如果模型内涉及回调外界代码,应当添加对应的
~ING
和~ING_ERRORED
状态表示回调尚未返回或出现异常。
最后,一个好的实践是让系统里的许多对象不可变,这样它自然是线程安全的,也就没有这些心智负担了。