Control.Observable モジュール (F#)
ファースト クラスのイベントおよび他の観測可能なオブジェクトに対する基本操作。
名前空間/モジュール パス: Microsoft.FSharp.Control
アセンブリ: FSharp.Core (FSharp.Core.dll 内)
module Observable
Values
値 |
説明 |
---|---|
add : ('T -> unit) -> IObservable<'T> -> unit |
オブザーバーを作成します。このオブザーバーは、指定された観測可能なオブジェクトを永続的にサブスクライブし、指定された関数を各観測で呼び出します。 |
choose : ('T -> 'U option) -> IObservable<'T> -> IObservable<'U> |
指定された関数を使用してソースから観測のプロジェクションを選択する、観測可能なオブジェクトを返します。返されたオブジェクトは、分割関数が Some 値を返す観測をトリガーします。また、返されたオブジェクトは、ソースから発生したすべてのエラーも反映し、ソースが完了すると完了します。 |
filter : ('T -> bool) -> IObservable<'T> -> IObservable<'T> |
指定された関数によってソースの観測をフィルター処理する、観測可能なオブジェクトを返します。この観測可能なオブジェクトは、述語が true を返す観測のみを対象とします。述語は、サブスクライブされたオブザーバーごとに 1 回実行されます。返されたオブジェクトは、ソースから発生したエラーの観測も反映し、ソースが完了すると完了します。 |
map : ('T -> 'U) -> IObservable<'T> -> IObservable<'U> |
指定された関数によってソースの観測を変換する、観測可能なオブジェクトを返します。変換関数は、サブスクライブされたオブザーバーごとに 1 回のみ実行されます。返されたオブジェクトは、ソースから発生したエラーの観測も反映し、ソースが完了すると完了します。 |
merge : IObservable<'T> -> IObservable<'T> -> IObservable<'T> |
ソースからマージされた観測の観測可能なオブジェクトを返します。返されたオブジェクトは、いずれかのソースから発生した成功とエラーの値を反映し、両方のソースが完了すると完了します。 |
pairwise : IObservable<'T> -> IObservable<'T * 'T> |
観測可能な入力オブジェクトの 2 番目のトリガーとそれ以降のトリガーによってトリガーされる、新しい観測可能なオブジェクトを返します。観測可能な入力オブジェクトの N 番目のトリガーは、N-1 番目と N 番目のトリガーの引数をペアにして渡します。N-1 番目のトリガーに渡される引数は、N 番目のトリガーが発生するまで、隠された内部状態で保持されます。 |
partition : ('T -> bool) -> IObservable<'T> -> IObservable<'T> * IObservable<'T> |
指定された関数によってソースの観測を分割する、2 つの観測可能なオブジェクトを返します。最初の観測可能なオブジェクトは、述語が true を返す値の観測をトリガーします。2 番目の観測可能なオブジェクトは、述語が false を返す値の観測をトリガーします。述語は、サブスクライブされたオブザーバーごとに 1 回実行されます。どちらもソースから発生したすべてのエラーの観測を反映し、それぞれソースが完了すると完了します。 |
scan : ('U -> 'T -> 'U) -> 'U -> IObservable<'T> -> IObservable<'T> |
各オブザーバーに対して状態の項目を割り当て、指定された累積関数を入力から生成された一連の値に適用する、観測可能なオブジェクトを返します。返されたオブジェクトは、初期値を除外して、計算された各状態値の観測をトリガーします。返されたオブジェクトは、ソースから発生したすべてのエラーを反映し、ソースが完了すると完了します。 |
split : ('T -> Choice<'U1,'U2>) -> IObservable<'T> -> IObservable<'U1> * IObservable<'U2> |
指定された関数によってソースの観測を分割する、2 つの観測可能なオブジェクトを返します。最初のオブジェクトは、分割関数が Choice1Of2 を返す観察をトリガーします。2 つ目のオブジェクトは、分割関数が Choice2Of2 を返す観察 y をトリガーします。分割関数は、サブスクライブされているオブザーバーごとに 1 回実行されます。どちらもソースから発生したエラーの観測を反映し、それぞれソースが完了すると完了します。 |
subscribe : ('T -> unit) -> IObservable<'T> -> IDisposable |
指定された観測可能なオブジェクトをサブスクライブし、指定された関数を各観測で呼び出すオブザーバーを作成します。 |
使用例
観測可能なオブジェクトを使用する方法を次のコード例に示します。この例で定義される ObserverSource クラスは、観察可能なイベントのソースとして使用できる汎用的な再利用可能クラスです。ここでは、このモジュールの一部の関数を使用する例を示します。その他の関数については、「Control.Event モジュール (F#)」のコード例を参照してください。
open System
open System.Diagnostics
// Represents a stream of IObserver events.
type ObservableSource<'T>() =
let protect function1 =
let mutable ok = false
try
function1()
ok <- true
finally
Debug.Assert(ok, "IObserver method threw an exception.")
let mutable key = 0
// Use a Map, not a Dictionary, because callers might unsubscribe in the OnNext
// method, so thread-safe snapshots of subscribers to iterate over are needed.
let mutable subscriptions = Map.empty : Map<int, IObserver<'T>>
let next(obs) =
subscriptions |> Seq.iter (fun (KeyValue(_, value)) ->
protect (fun () -> value.OnNext(obs)))
let completed() =
subscriptions |> Seq.iter (fun (KeyValue(_, value)) ->
protect (fun () -> value.OnCompleted()))
let error(err) =
subscriptions |> Seq.iter (fun (KeyValue(_, value)) ->
protect (fun () -> value.OnError(err)))
let thisLock = new obj()
let obs =
{ new IObservable<'T> with
member this.Subscribe(obs) =
let key1 =
lock thisLock (fun () ->
let key1 = key
key <- key + 1
subscriptions <- subscriptions.Add(key1, obs)
key1)
{ new IDisposable with
member this.Dispose() =
lock thisLock (fun () ->
subscriptions <- subscriptions.Remove(key1)) } }
let mutable finished = false
// The source ought to call these methods in serialized fashion (from
// any thread, but serialized and non-reentrant).
member this.Next(obs) =
Debug.Assert(not finished, "IObserver is already finished")
next obs
member this.Completed() =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
completed()
member this.Error(err) =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
error err
// The IObservable object returned is thread-safe; you can subscribe
// and unsubscribe (Dispose) concurrently.
member this.AsObservable = obs
// Create a source.
let source = new ObservableSource<int>()
// Get an IObservable from the source.
let obs = source.AsObservable
// Add a simple subscriber.
let unsubA = obs |> Observable.subscribe (fun x -> printfn "A: %d" x)
// Send some messages from the source.
// Output: A: 1
source.Next(1)
// Output: A: 2
source.Next(2)
// Add another subscriber. This subscriber has a filter.
let unsubB =
obs
|> Observable.filter (fun num -> num % 2 = 0)
|> Observable.subscribe (fun num -> printfn "B: %d" num)
// Send more messages from the source.
// Output: A: 3
source.Next(3)
// Output: A: 4
// B: 4
source.Next(4)
// Have subscriber A unsubscribe.
unsubA.Dispose()
// Send more messages from the source.
// No output
source.Next(5)
// Output: B: 6
source.Next(6)
// If you use add, there is no way to unsubscribe from the event.
obs |> Observable.add(fun x -> printfn "C: %d" x)
// Now add a subscriber that only does positive numbers and transforms
// the numbers into another type, here a string.
let unsubD =
obs |> Observable.choose (fun int1 ->
if int1 >= 0 then None else Some(int1.ToString()))
|> Observable.subscribe(fun string1 -> printfn "D: %s" string1)
let unsubE =
obs |> Observable.filter (fun int1 -> int1 >= 0)
|> Observable.subscribe(fun int1 -> printfn "E: %d" int1)
let unsubF =
obs |> Observable.map (fun int1 -> int1.ToString())
|> Observable.subscribe (fun string1 -> printfn "F: %s" string1)
プラットフォーム
Windows 8、Windows 7、Windows Server 2012 で Windows Server 2008 R2
バージョン情報
F# コア ライブラリのバージョン
サポート: ポータブル 2.0、4.0