無序訊息處理

你可以使用 ScanTryScan 方法查詢佇列中的特定訊息,並處理它們,而不管它們之前有多少訊息。這兩種方法按照它們到達的順序檢視佇列中的訊息,並查詢指定的訊息(直到可選的超時)。如果沒有這樣的訊息,TryScan 將返回 None,而 Scan 將一直等待,直到此訊息到達或操作超時。

讓我們在實踐中看到它。我們希望處理器儘可能地處理 RegularOperations,但是只要有 PriorityOperation,就應該儘快處理它,無論佇列中有多少其他 RegularOperations

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())