可變狀態管理

郵箱處理器可用於以透明和執行緒安全的方式管理可變狀態。讓我們構建一個簡單的計數器。

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

現在讓我們生成一些操作

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

你將看到以下日誌

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

併發

由於郵箱處理器逐個處理訊息並且沒有交錯,因此你還可以從多個執行緒生成訊息,並且你將看不到丟失或重複操作的典型問題。除非你專門實現處理器,否則訊息無法使用其他訊息的舊狀態。

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

所有訊息都是從不同的執行緒釋出的。訊息釋出到郵箱的順序不確定,因此處理它們的順序不是確定性的,但由於增量和減量的總數是平衡的,因此你將看到最終狀態為 0,無論順序如何以及從哪些執行緒傳送訊息。

真正的可變狀態

在前面的示例中,我們僅通過傳遞遞迴迴圈引數來模擬可變狀態,但郵箱處理器即使對於真正可變的狀態也具有所有這些屬性。當你保持較大的狀態時,這很重要,並且由於效能原因,不可變性是不切實際的。

我們可以將我們的計數器重寫為以下實現

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

即使直接從多個執行緒修改計數器狀態,這肯定不是執行緒安全的,你可以通過使用上一節中的並行訊息 Posts 看到郵箱處理器一個接一個地處理訊息而沒有交錯,因此每條訊息都使用最新的價值。