Partager via


Control.MailboxProcessor<'Msg>, classe (F#)

Agent de traitement des messages qui exécute un calcul asynchrone.

Espace de noms/Chemin du module : Microsoft.FSharp.Control

Assembly : FSharp.Core (in FSharp.Core.dll)

[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
 class
  interface IDisposable
  new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.Post : 'Msg -> unit
  member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
  member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
  member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  member this.Receive : ?int -> Async<'Msg>
  member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
  member this.Start : unit -> unit
  static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  member this.TryReceive : ?int -> Async<'Msg option>
  member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
  member this.add_Error : Handler<Exception> -> unit
  member this.CurrentQueueLength :  int
  member this.DefaultTimeout :  int with get, set
  member this.Error :  IEvent<Exception>
  member this.remove_Error : Handler<Exception> -> unit
 end

Notes

L'agent encapsule une file d'attente de messages qui prend en charge plusieurs writers et un agent de lecteur unique. Les writers envoient des messages à l'agent à l'aide de la méthode Post et de ses variantes. L'agent peut attendre les messages à l'aide des méthodes Receive ou TryReceive ou analyser tous les messages disponibles à l'aide de la méthode Scan ou TryScan.

Ce type se nomme FSharpMailboxProcessor dans l'assembly .NET. Si vous accédez au type à partir d'un langage .NET autre que F#, ou par réflexion, utilisez ce nom.

Constructeurs

Membre

Description

new

Crée un agent. La fonction body est utilisée pour générer le calcul asynchrone exécuté par l'agent. Cette fonction n'est exécutée qu'une fois Start appelé.

Membres d'instance

Membre

Description

add_Error

Se produit lorsque l'exécution de l'agent génère une exception.

CurrentQueueLength

Retourne le nombre de messages non traités dans la file d'attente des messages de l'agent.

DefaultTimeout

Lève une exception de délai d'attente si aucun message n'est reçu dans cet intervalle. Par défaut, aucun délai d'attente n'est utilisé.

Erreur

Se produit lorsque l'exécution de l'agent génère une exception.

Post

Publie un message dans la file d'attente de messages de MailboxProcessor, de façon asynchrone.

PostAndAsyncReply

Publie un message à un agent et attend une réponse sur le canal, de façon asynchrone.

PostAndReply

Publie un message à un agent et attend une réponse sur le canal, de façon synchrone.

PostAndTryAsyncReply

Semblable à AsyncPostAndReply, mais retourne None en l'absence de réponse pendant le délai d'attente.

Receive

Attend un message. Le premier message dans l'ordre d'arrivée sera utilisé.

remove_Error

Se produit lorsque l'exécution de l'agent génère une exception.

Scan

Recherche un message en consultant les messages dans l'ordre d'arrivée jusqu'à ce que le scanner retourne une valeur Some. Les autres messages demeurent dans la file d'attente.

Démarrer

Démarre l'agent.

TryPostAndReply

Semblable à PostAndReply, mais retourne None en l'absence de réponse pendant le délai d'attente.

TryReceive

Attend un message. Le premier message dans l'ordre d'arrivée sera utilisé.

TryScan

Recherche un message en consultant les messages dans l'ordre d'arrivée jusqu'à ce que le scanner retourne une valeur Some. Les autres messages demeurent dans la file d'attente.

Membres statiques

Membre

Description

Démarrer

Crée et démarre un agent. La fonction body est utilisée pour générer le calcul asynchrone exécuté par l'agent.

Exemple

L'exemple suivant illustre l'utilisation de base de la classe MailboxProcessor.

open System
open Microsoft.FSharp.Control

type Message(id, contents) =
    static let mutable count = 0
    member this.ID = id
    member this.Contents = contents
    static member CreateMessage(contents) =
        count <- count + 1
        Message(count, contents)

let mailbox = new MailboxProcessor<Message>(fun inbox ->
    let rec loop count =
        async { printfn "Message count = %d. Waiting for next message." count
                let! msg = inbox.Receive()
                printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
                return! loop( count + 1) }
    loop 0)

mailbox.Start()

mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))


Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Résultat de l'exemple

  
  
  
  
  
  
  
  
  

L'exemple suivant montre comment utiliser MailboxProcessor pour créer un agent simple qui accepte différents types de messages et retourne les réponses appropriées. Cet agent serveur représente un opérateur en bourse, qui est un agent d'achat et de vente d'une bourse qui définit des enchères et demande les prix des ressources. Les clients peuvent demander des prix ou acheter et vendre des actions.

open System

type AssetCode = string

type Asset(code, bid, ask, initialQuantity) =
    let mutable quantity = initialQuantity
    member this.AssetCode = code
    member this.Bid = bid
    member this.Ask = ask
    member this.Quantity with get() = quantity and set(value) = quantity <- value


type OrderType =
    | Buy of AssetCode * int
    | Sell of AssetCode * int

type Message =
    | Query of AssetCode * AsyncReplyChannel<Reply>
    | Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
    | Failure of string
    | Info of Asset
    | Notify of OrderType

let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
                new Asset("BBB", 20.0, 20.10, 1000000);
                new Asset("CCC", 30.0, 30.15, 1000000) |]

let codeAssetMap = assets
                   |> Array.map (fun asset -> (asset.AssetCode, asset))
                   |> Map.ofArray

let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0

let marketMaker = new MailboxProcessor<Message>(fun inbox ->
    let rec Loop() =
        async {
            let! message = inbox.Receive()
            match message with
            | Query(assetCode, replyChannel) ->
                match (Map.tryFind assetCode codeAssetMap) with
                | Some asset ->
                    printfn "Replying with Info for %s" (asset.AssetCode)
                    replyChannel.Reply(Info(asset))
                | None -> replyChannel.Reply(Failure("Asset code not found."))
            | Order(order, replyChannel) ->
                match order with
                | Buy(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (quantity < asset.Quantity) then
                            asset.Quantity <- asset.Quantity - quantity
                            totalCash <- totalCash + float quantity * asset.Ask
                            printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
                                    quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient shares to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
                | Sell(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
                            asset.Quantity <- asset.Quantity + quantity
                            totalCash <- totalCash - float quantity * asset.Bid
                            printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
                                    quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient cash to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient cash to cover order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
            do! Loop()
        }
    Loop())

marketMaker.Start()

// Query price. 
let reply1 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for AAA"
    Query("AAA", replyChannel))

// Test Buy Order. 
let reply2 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for BBB"
    Order(Buy("BBB", 100), replyChannel))

// Test Sell Order. 
let reply3 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for CCC"
    Order(Sell("CCC", 100), replyChannel))

// Test incorrect code. 
let reply4 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for WrongCode"
    Order(Buy("WrongCode", 100), replyChannel))

// Test too large a number of shares. 

let reply5 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with large number of shares of AAA."
    Order(Buy("AAA", 1000000000), replyChannel))

// Too large an amount of money for one transaction. 

let reply6 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with too large of a monetary amount."
    Order(Sell("AAA", 100000000), replyChannel))

let random = new Random()
let nextTransaction() =
    let buyOrSell = random.Next(2)
    let asset = assets.[random.Next(3)]
    let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
    match buyOrSell with
    | n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
    | _ -> Sell(asset.AssetCode, quantity)

let simulateOne() =
   async {
       let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
           let transaction = nextTransaction()
           match transaction with
           | Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
           | Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
           Order(transaction, replyChannel))
       printfn "%s" (reply.ToString())
    }

let simulate =
    async {
        while (true) do 
            do! simulateOne()
            // Insert a delay so that you can see the results more easily. 
            do! Async.Sleep(1000)
    }

Async.Start(simulate)

Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Résultat de l'exemple

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  

Plateformes

Windows 8, Windows 7, Windows Server 2012, Windows Server 2008 R2

Informations de version

Versions de bibliothèque principale F#

Prise en charge dans : 2,0, 4,0, portables

Voir aussi

Référence

Microsoft.FSharp.Control, espace de noms (F#)