thumbnail
可自动重连客户端的设计与实现
由 ChatGPT 生成的文章摘要
本文讨论了一个支持自动重连的客户端设计与实现,主要用于长连接通讯。客户端在启动后会尝试连接服务端,若连接失败则会等待并重试,直到满足停止重连的条件。设计中强调了客户端的线程安全性,并定义了主要接口,包括启动、暂停、恢复和关闭连接的功能。客户端的状态机包括多个状态,如已创建、已连接、已关闭、已暂停等,并且在连接和关闭过程中存在中间状态。通过这些设计,开发者可以有效地管理客户端的连接状态和消息传递。

概述

最近编写了大量需要借助长连接通讯的程序,经常编写客户端的自动重连代码。其模式具备相似之处,遂撰文总结。

本文中涉及的代码可见于 chuanwise/reconnect-supported-client

问题定义

在启动客户端后,客户端将尝试连接服务端。当连接失败或意外断开时,其将等待一段时间并尝试再次链接服务端,直到符合停止自动重连条件时,客户端放弃重连服务端。

客户端收发消息和调用生命周期函数(即那些 start() 之类的会影响状态的)很可能在不同的线程,因此客户端对象应当是线程安全的。

设计约定

客户端的主要接口如下:

interface ReconnectSupportedClient : CoroutineScope {
    val isStarted: Boolean
    val isConnected: Boolean
    val isWaiting: Boolean
    val isClosed: Boolean
    val isPaused: Boolean

    fun start()
    fun pause()
    fun resume()

    suspend fun close()

    fun await()
    fun await(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS)

    val channel: Channel<String>
    suspend fun send(string: String)
}

start()pause()resume() 是分别用于启动连接、暂停自动重连和恢复自动重连的生命周期函数。await(...) 系列函数是在客户端被关闭前等待其状态变化的函数。channel 是一个用于接收消息的管道,可以用于读取服务端发送的消息。send(String) 是用于向客户端发送消息的函数,二者都只能在连接正常(即 isConnected)时调用。

开发者应当通过符合下面规范的代码调用客户端的函数:

// 客户端创建好后并不会立刻分配资源开始连接。
val client = createClient(...)

// 直到某一时刻调用 #start() 时才会开始连接。
// 这一函数在整个过程中只需要调用一次。
client.start()

// 不断等待直到客户端连接到服务端。
while (!client.isConnected) {
    // 等待客户端状态发生变化。
    // 如果其他线程调用了 #close() 并退出,
    // 调用 #await() 将抛出异常(因为客户端已经不可能再有其他新的状态)
    client.await()
}

// 此时服务端已经连接到了客户端,可以发送消息。
// 如果连接在 client.isConnected 返回 true 到调用 #send(String) 的间隙断开了,
// 此时发送消息时会抛出异常。
client.send("我们的同志在困难的时候,要看到成绩,要看到光明,要提升我们的勇气。")

// 使用 #close() 关闭客户端。
// 若客户端已经被关闭,则将抛出异常。
client.close()

状态模型

这是一个非常典型的状态机,显然客户端的稳定状态如下所示:

  1. 已创建 CREATED:客户端刚刚创建,但尚未调用 start() 开始连接的状态。
  2. 已连接 CONNECTED:客户端连接到服务端。
  3. 已关闭 CLOSED:客户端的 close() 方法被调用。
  4. 已暂停 PAUSED:客户端多次重连服务器失败,重连被停止。可以用 resume() 再次启动。

只设计这些状态是无法满足需求的,因为它们之间的状态切换并不会瞬间完成。例如在连接过程中,我们需要调用系统底层功能,它们返回之前,客户端实际上处于一种转换中间态中,即不稳定状态。它们如下所示:

  1. 连接中 CONNECTING:客户端正在连接到服务端,但底层函数尚未返回。
  2. 关闭中 CLOSING:客户端的 close() 方法被调用但尚未返回。
  3. 等待中 WAITING:客户端决定自动重连,但尚未到重连时机。

上述状态之间的切换是瞬间的,因此符合设计要求,其状态机如下所示:

stateDiagram

[*] --> CREATED
CREATED --> CONNECTING: start()
CONNECTING --> CONNECTED: connected
CONNECTED --> WAITING: connection lost
CONNECTED --> PAUSED: connection lost
CONNECTING --> WAITING: waiting for reconnecting
WAITING --> CONNECTING: reconnect
CONNECTING --> PAUSED: fail to reconnect or pause()
PAUSED --> CONNECTING: resume()

CREATED --> CLOSING: close()
CONNECTING --> CLOSING: close()
CONNECTED --> CLOSING: close()
WAITING --> CLOSING: close()
PAUSED --> CLOSING: close()

CLOSING --> CLOSED: closed
CLOSED --> [*]

实现方法

使用读写锁保证线程安全

由于客户端对象内需要同时维护:底层表示连接的对象、连接的状态和心跳任务等,涉及多个属性的读写,故需要使用读写锁保证其线程安全:

private val lock = ReentrantReadWriteLock()

为什么我们不用 AtomicReference<State> 维护状态呢?因为我们还有很多其他属性(例如用于收发消息的 channelNoLocksessionNoLock)需要和状态一同修改。具体地,只有在状态为 CONNECTED 时,这两个属性都必须同时非空,否则必须同时为空。如果用原子类型,尽管状态可以做到安全修改,但无法保证状态和这两个属性同时变化。

这么说并不严谨,因为锁也不会使二者同时变化,甚至没有任何办法可以让两个属性严格同时变化。但锁的存在可以让我们观测状态和这俩属性时,保证其状态一致而不会处于某种中间态。

那有没有使用无锁并发实现安全的办法呢?有的,我们为每个状态编写一个类型。而 CONNECTED 对应的状态才有这两个属性即可,就像下面这样:

private inner class ConnectedState(
    val sessionNoLock: ClientWebSocketSession
) : State {
    val channelNoLock: Channel<String> = Channel(Channel.UNLIMITED)
}

然后把状态用原子类型存储即可:

private interface State
private object CreatedState

private val state = AtomicReference<State>(CreatedState)

但这样会引入一些其他类型,个人感觉代价可能不如用锁,因为生命周期函数的并发程度并不高(应该不会同时有 114514 个线程调用生命周期函数)。

为了方便,我们增加两个属性:stateNoLockstate,后者读时会自动加锁:

private var stateNoLock = State.CREATED
private val state: State get() = lock.read { stateNoLock }

以便实现各种状态的检查函数,例如:

override val isClosing: Boolean get() = state == State.CLOSING
override val isClosed: Boolean get() = state == State.CLOSED
override val isConnected: Boolean get() = state == State.CONNECTED
override val isWaiting: Boolean get() = state == State.WAITING
override val isPaused: Boolean get() = state == State.PAUSED

看起来代码没加锁,实际上都加了读锁,非常好。

值得一提的是有些状态的检查函数较为特殊,例如 isStarted,连接中也是一种已启动状态。实际上任何非 CREATEDCLOSINGCLOSED 状态都是 isStarted,所以它的实现如下:

override val isStarted: Boolean
    get() = lock.read { stateNoLock != State.CREATED && stateNoLock != State.CLOSED && stateNoLock != State.CLOSING }

关键操作前后检查系统状态

根据个人经验,在关键耗时操作开始和结束时应当检查和修改此刻的状态,以免出现非预期行为。

例 1:外界调用生命周期函数时

除非有特别的约定,否则对外暴露的函数在开始时都需要检查状态。

例如,在用户调用 start() 时,检查状态是否是 CREATED(即刚开始的状态),就像下面这样:

override fun start() {
    lock.write {
        stateNoLock = when (stateNoLock) {
            State.CREATED -> State.CONNECTING
            else -> error("Cannot start when the state is $stateNoLock.")
        }
        stateUpdateCondition.signalAll()
    }
    setConnectingJob()
}

因为我们需要检查 stateNoLock 的状态并修改它,stateNoLock = when (stateNoLock) { ... } 的结构正好符合我们的需求。在后文中这种写法将经常出现。

setConnectingJobNoLock() 的实现背后是 createConnectingJob()

private var connectingJobNoLock: Job? = null

private fun setConnectingJobNoLock() {
    connectingJobNoLock?.cancel()
    connectingJobNoLock = createConnectingJob()
}

它可能在 start()resume() 时调用一次,否则创建多个连接任务时将导致状态错乱。

编写代码时,我们规定创建连接任务时其状态为 CONNECTING。由于我们在 start()resume() 内都检查并修改状态为 CONNECTING 并继续执行后面的操作,这会阻止同时调用这些函数的其他线程进入 setConnectingJobNoLock(),因此这一操作是安全的。

在用户调用 pause() 时,也有类似的操作。代码先检查状态是否是 CONNECTINGCONNECTEDWAITING,然后将其修改为 PAUSED 并进行操作。

override fun pause() {
    lock.write {
        stateNoLock = when (stateNoLock) {
            State.CREATED -> error("Client not started.")
            State.CLOSING, State.CLOSED -> error("Client is closing or closed.")

            // CONNECTING, CONNECTED: 此次连接断开后暂停重连等待。
            // WAITING: 此次重连等待结束后暂停重连等待(需要 stateUpdateCondition.signalAll() 提醒连接任务)
            State.CONNECTING, State.CONNECTED, State.WAITING -> State.PAUSED

            // PAUSED: 无需进行任何操作。
            State.PAUSED -> return
        }
        stateUpdateCondition.signalAll()
    }
}

值得一提的是,我们希望 pause() 可以中断正在等待重连的连接任务,而不是在重连等待时间到后检查状态时它才退出。因此在连接任务中我们并不是使用 delay 等待,而是在 stateUpdateCondition 上等待,以便在状态更新后连接任务可以立刻感知到这种变化。

又如在 close() 函数内,我们也先检查状态并修改为 CLOSING。执行关闭操作后客户端直接将状态修改为 CLOSED,因为不可能有其他情况(就算有这个客户端对象也不能再使用了)。

override suspend fun close() {
    lock.write {
        stateNoLock = when (stateNoLock) {
            State.CLOSED -> error("Client already closed.")

            // CREATED: 还没 start() 就 close() 了,真的是太逊了。
            // CONNECTING: 连接中就关闭了。此处只需要修改状态,等待连接建立成功或失败时会检查状态的。
            // CONNECTED: 此时连接已经建立,需要关闭连接(后面会关闭)。
            // WAITING: 正在等待是否需要重连,只需要更新状态,连接任务就会醒来并立刻检查状态。
            // PAUSED: 连接任务暂停,不需要做什么操作。
            State.CREATED, State.CONNECTING, State.CONNECTED, State.WAITING, State.PAUSED -> State.CLOSING

            else -> error("Unexpected state: $stateNoLock.")
        }

        stateUpdateCondition.signalAll()
    }

    sessionNoLock?.close()
    connectingJobNoLock?.cancel()
    job.cancelAndJoin()

    if (manageHttpClient) {
        httpClient.close()
    }

    lock.write {
        stateNoLock = State.CLOSED
    }
}

例 2:连接任务耗时操作前后

createConnectingJob() 创建的任务内部是一个循环,每次循环代表一次连接尝试,执行如下动作:

  1. 检查状态,并在状态是 CONNECTINGWAITING 时修改为 CONNECTING
  2. 显示连接日志。
  3. 使用底层 API(本例是 Ktor)启动连接。
  4. 连接失败时,检查并修改状态为 WAITING 并等待下一步判断。
  5. 连接成功后,首先检查并修改状态为 CONNECTED,设置对应的底层 API 属性(如用于收发消息的 channelNoLocksessionNoLock),并重置重连计数器。在连接断开时,检查并修改状态为 WAITING
  6. 连接断开后,状态可能是 WAITING,也可能中途被其他线程修改。若是 WAITING,检查尝试次数是否超过限制。若是则放弃重连进入 PAUSED 状态,否则进行一次等待,随后来到循环开头。

其源代码可以在这里看到。

总结

编写线程安全的需要维护内部状态的代码时,应当:

  1. 在任何耗时操作前后,以及对外暴露的 API 里,必须检查当前状态。应当经常问自己:
    1. 正在编写的这一块代码有没有可能同时被多个线程进入,且此时此刻其对应的状态(如果只能是一个的话)应当是哪个。
    2. 正在编写的这一块代码是否有可能在执行中途,对象状态被另一个线程修改?如果可能,这块代码必须能接受这种修改,否则只能上锁。
  2. 获取和释放锁也可能是耗时操作,有时需要在加锁前后分别检查一次条件。
  3. 阅读系统中的可变值(如 sessionNoLock)时应当获取读锁,修改其时应当获取写锁。对系统中的不可变值,如 maxConnectAttempts,可以不上锁。
  4. 最小化锁粒度,避免可挂起点中间用锁。否则协程在另一个线程恢复时,会导致锁状态异常。
  5. 修改系统中的可变值时最好秉持着“谁设置谁置空”原则。例如对于 sessionNoLockchannelNoLock,都是在连接成功建立时才为非 null 值的属性。连接任务有义务维护其值。其他地方在发现其为 null 时便能快速判断连接尚未建立,例如在 send 函数中。

有几个好的习惯可能有助于避免犯错:

  1. 为那些需要锁保护的变量名称添加 NoLock 后缀,以便时时提醒自己读写时加锁。
  2. 管理好由本类创建的对象,如 HttpClient,我们使用额外的 manageHttpClient 变量控制客户端在 close() 时是否连带关闭它:
class WebSocketReconnectSupportedClient(
    // ...
    httpClient: HttpClient? = null,
    manageHttpClient: Boolean? = null,
    // ...
) : ReconnectSupportedClient {
    // ...
    private val manageHttpClient = manageHttpClient ?: (httpClient == null)
    private val httpClient = httpClient ?: HttpClient { install(WebSockets) }

    override suspend fun close() {
        // ...
        if (manageHttpClient) {
            httpClient.close()
        }
        // ...
    }
}
  1. init 块内检查参数是否符合要求,就像下面这样:
init {
    val maxConnectAttemptsLocal = maxConnectAttempts
    require(maxConnectAttemptsLocal == null || maxConnectAttemptsLocal > 0) {
        "maxConnectAttempts must be positive, but got $maxConnectAttemptsLocal."
    }
}
  1. 对于只会在当前上下文中被修改的可空属性,可以将其值存在一个局部变量免去繁琐的非空断言。个人的习惯是在名称后加 Local 后缀。就像 3 里给出的代码里,我们用 maxConnectAttemptsLocal 缓存 maxConnectAttempts 的值,使得在 require 参数里的第二个 maxConnectAttemptsLocal 得以被智能转换为非空类型。
  2. 如果模型内涉及回调外界代码,应当添加对应的 ~ING~ING_ERRORED 状态表示回调尚未返回或出现异常。

最后,一个好的实践是让系统里的许多对象不可变,这样它自然是线程安全的,也就没有这些心智负担了。

Not by AI

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇