无序消息处理

你可以使用 ScanTryScan 方法查找队列中的特定消息,并处理它们,而不管它们之前有多少消息。这两种方法按照它们到达的顺序查看队列中的消息,并查找指定的消息(直到可选的超时)。如果没有这样的消息,TryScan 将返回 None,而 Scan 将一直等待,直到此消息到达或操作超时。

让我们在实践中看到它。我们希望处理器尽可能地处理 RegularOperations,但是只要有 PriorityOperation,就应该尽快处理它,无论队列中有多少其他 RegularOperations

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())