a wandering wolf

Does a wandering wolf dreams of a wondering, sometimes programming sheep?

このエントリーをはてなブックマークに追加

MailboxProcessor でのエラーハンドリング

以前にもいくつか MailboxProcessor<'Msg> ネタで記事を書いているんですが、なにぶん情報が少ないため、ちょっとやり方を変えると途端に詰まってしまいます。

ということで、今回の話題も F# の非同期メッセージ処理エージェントであるところの MailboxProcessor<'Msg> です。本記事では、 MailboxProcessor<'Msg> が例外処理する方法について説明します。

ふつうの Async

まずは何の変哲もない Async<'T> 型における例外処理を見ていきます。

Async.Catch

Async.Catch<'T> は非同期計算 Async<'T> を受け取り、計算が完了したことを示す Choice1Of2 と、完了前に例外が発生したことを示す Choice2Of2 のペアである Choice<'T, exn> を返します。

// 同期計算を行うヘルパー関数
let runSync asyncomp = asyncomp |> Async.Catch |> Async.RunSynchronously

// 結果を調べるヘルパー関数
let catchAsync = function
| Choice1Of2 _          -> printfn "Succeeded."
| Choice2Of2 (err: exn) -> printfn "Error occured: %s" err.Message

まずは、計算が完了する例を見てみましょう:

> async {
-     printfn "begin..."
-     do! Async.Sleep 1000
-     printfn "end."
- }
- |> runSync
- |> catchAsync;;
begin...
end.
Succeeded.
val it : unit = ()

続いて、計算中に例外が発生する例です:

> async {
-    printfn "begin..."
-    do! Async.Sleep 1000
-    failwith "Something has come!"
-    printfn "end."
- }
- |> runSync
- |> catchAsync;;
begin...
Error occured: Something has come!
val it : unit = ()

シンプルですね。

Async.StartWithContinuations

Async.StartWithContinuations<'T> は、非同期計算と、成功の継続、エラーの継続、キャンセルの継続を受け取り(必要であれば、さらにキャンセルトークンも指定します)、非同期に計算を行います。

// ヘルパー関数
let startWithCont comp =
    Async.StartWithContinuations(
        comp,
        (fun _   -> printfn "Succeeded."),
        (fun err -> printfn "Error occured: %s" err.Message),
        (fun _   -> printfn "Canceled."))

まずは、成功する場合です:

> async {
-    printfn "begin..."
-    do! Async.Sleep 1000
-    printfn "end."
- }
- |> startWithCont;;
begin...
val it : unit = ()
> end.
Succeeded.

非同期に処理が進むため、標準出力に出力される文字がちょっと変な感じになっています。また、この例は F# Interactive 上で実行している想定ですが、Visual Studio などでビルドしたものを実行した場合、何も印字されないかもしれません。

続いて、例外が発生する場合です:

> async {
-     printfn "begin..."
-     do! Async.Sleep 1000
-     failwith "Something has come!"
-     printfn "end."
- }
- |> startWithCont;;
begin...
val it : unit = ()
> Error occured: Something has come!

あまり意味のない例を出しますが、キャンセルした場合も見てみましょう:

> async {
-     printfn "begin..."
-     do! Async.Sleep 1000
-     Async.CancelDefaultToken()
-     printfn "end."
- }
- startWithCont;;
begin...
val it : unit = ()
> end.
Canceled.

全体として、こちらも分かりやすいかなぁと思います。

MailboxProcessor

さて、 MailboxProcessor<'Msg> ですが、まずは標準的な使い方を見てみましょう:

// エージェントの作成と開始
let agent = MailboxProcessor<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message = inbox.Receive()
            printfn "message: %s" message
            do! Async.Sleep 1000
            printfn "end."
        }
    loop 0)

// メッセージの送信
agent.Post("F#!F#!")

これを実行すると、以下のようになります:

begin...: 0
message: F#!F#!
end.

Async.Catch

まずは正常パターンから:

// 記述量を減らしたい…
type MessageAgent<'Msg> = MailboxProcessor<'Msg * AsyncReplyChannel<'Msg>>

// エージェントの作成と開始
let agent = MessageAgent<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message, replyChannel = inbox.Receive()
            printfn "message: %s" message
            do! Async.Sleep 1000
            replyChannel.Reply("Received: " + message)
            printfn "end."
        }
    loop 0)

// メッセージの送受信
agent.PostAndAsyncReply(fun replyChannel -> "F#!F#!", replyChannel)
|> runSync
|> catchAsync

これを実行すると:

begin...: 0
message: F#!F#!
end.
Succeeded.

大丈夫ですね。続いて、例外パターンです:

let agent = MessageAgent<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message, replyChannel = inbox.Receive()
            printfn "message: %s" message
            failwith "Something has come!"
            do! Async.Sleep 1000
            replyChannel.Reply("Received: " + message)
            printfn "end."
        }
    loop 0)

agent.PostAndAsyncReply(fun replyChannel -> "F#!F#!", replyChannel)
|> runSync
|> catchAsync

これを実行すると:

begin...: 0
message: F#!F#!

あ、あれ?処理が返ってこない…。

Async.StartWithContinuations

気を取り直して、 Async.StartWithContinuations の動きを見てみましょう。まずは計算が完了する方から:

let agent = MessageAgent<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message, replyChannel = inbox.Receive()
            printfn "message: %s" message
            do! Async.Sleep 1000
            replyChannel.Reply("Received: " + message)
            printfn "end."
        }
    loop 0)

agent.PostAndAsyncReply(fun replyChannel -> "F#!F#!", replyChannel)
|> startWithcont

実行してみましょう:

val agent : MailboxProcessor<string * AsyncReplyChannelbegin...: 0
<string>>
val it : unit = ()

> ssage: F#!F#!
end.
Succeeded.

だいぶひどい感じに出力されていますが、成功しているようです。

例外を発生させてみましょう:

let agent = MessageAgent<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message, replyChannel = inbox.Receive()
            printfn "message: %s" message
            failwith "Something has come!"
            do! Async.Sleep 1000
            replyChannel.Reply("Received: " + message)
            printfn "end."
        }
    loop 0)
agent.PostAndAsyncReply(fun replyChannel -> "F#!F#!", replyChannel)
|> startWithCont

これを実行すると:

val agent : MailboxProcessor<string * AsyncReplyChannel<begin...: 0
string>>
val it : unit = ()

> mege: F#!F#!

非同期実行しているため制御は返ってきますが、やはり中の処理は途中で止まってしまっているようです。

Error プロパティ

MailboxProcessor<'Msg> でエラー処理をする場合、 Async<'T> の時と同じやり方では上手くいかないようです。ここは、 Error<'Msg> プロパティを使います。

MailboxProcessor.Error<’Msg> プロパティ (F#) - MSDN Library

MailboxProcessor<'Msg> が計算している間に例外が発生すると、エラーイベントが発生します。そのイベントを提供しているのが、この Error<'Msg> プロパティとなります。

Async.Catch

Async.Catch<'T> の例を見てみましょう:

let agent = MessageAgent<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message, replyChannel = inbox.Receive()
            printfn "message: %s" message
            failwith "Something has come!"
            do! Async.Sleep 1000
            replyChannel.Reply("Received: " + message)
            printfn "end."
        }
    loop 0)

// エラー処理を登録
agent.Error.Add(fun err -> printfn "Error event occured: %s" err.Message)

// メッセージの送受信
agent.PostAndAsyncReply(fun replyChannel -> "F#!F#!", replyChannel)
|> runSync
|> catchAsync;;

これを実行します:

begin...: 0
message: F#!F#!
Error event occured: Something has come!

エラー時に処理するように登録した印字処理が行われています。

注意しなければならないのは、エラー処理はされるものの、結局制御は返ってこない、ということです。これを防ぐための1つの方法は、 PostAndAsyncReply<'T> メソッドにタイムアウト用のミリ秒パラメータを渡しておくことです。これによって、指定時間後にタイムアウトエラーとして制御が戻ってきます。(タイムアウトエラーは Async.Catch<'T> が拾ってくれます)

Async.StartWithContinuations

続いて Async.StartWithContinuations<'T> を使う場面です:

let agent = MessageAgent<string>.Start(fun inbox ->
    let rec loop i =
        async {
            printfn "begin...: %d" i
            let! message, replyChannel = inbox.Receive()
            printfn "message: %s" message
            failwith "Something has come!"
            do! Async.Sleep 1000
            replyChannel.Reply("Received: " + message)
            printfn "end."
        }
    loop 0)

agent.Error.Add(fun err -> printfn "Error event occured: %s" err.Message)

agent.PostAndAsyncReply(fun replyChannel -> "F#!F#!", replyChannel)
|> startWithCont

実行してみます:

val agent : MailboxProcessor<string * AsyncReplyChannel<string>>
val it : unit = ()

begin...: 0
> message: F#!F#!
Error event occured: Something has come!

エラーイベント発生がちゃんと拾われています。

まとめ

MailboxProcessor<'Msg> は非同期計算をさせる上で便利なクラスですが、一方で Async<'T> と同じ使い方ができるわけでもありません。特に、同期実行時にエラーが発生すると制御が返ってこない可能性があるので、非同期実行するように変更したり、タイムアウトさせるようにしておきましょう。また、エラー時に何かさせたい場合は Error<'Msg> プロパティにイベントハンドラを貼っておきます。