a wandering wolf

Does a wandering wolf dreams of a wondering, sometimes programming sheep?

このエントリーをはてなブックマークに追加

MailboxProcessor に処理をさせよう

はじめに

この記事は F# Advent Calendar 2013 4日目の記事です。(12/4 20時現在、3日目はまだ上がっていないので)2日目は @7shi さんの F#を教えるための環境構築 - 七誌の開発日記 でした。

MailboxProcessor について

MailboxProcessor は F# に標準でライブラリに組み込まれている非同期メッセージ処理用の機能です。以前、私のブログで取り扱ったことがあります。

#102 MailboxProcessorで非同期プログラミング - a wandering wolf

値をメッセージとして MailboxProcessor (以下、「アクター」と呼びます)に渡すと、それを受けて何らかの処理をする。結果が必要な場合もメッセージとして受け取る。そういう感じの動作をするのがこの MailboxProcessor です。

結果は同期的にも非同期的にも受け取ることが出来るため、非同期処理に使われることも多いです。

本記事の目的

この MailboxProcessor に単純な処理を非同期で並列に実行させてみましょう。簡単に処理時間も計測してみます。(あくまで比較するための参考値です)

(* f は計測対象 *)
let timer (f: unit -> unit) =
    let sw = System.Diagnostics.Stopwatch()
    sw.Start()
    f()
    sw.Stop()
    printfn "処理時間: %d[ms]" sw.ElapsedMilliseconds

私のブログでは、「困ったときは処理時間計測ネタ」をモットーにしております。ミヤモト・マサシもそうしろって言ってた。

FizzBuzz してみる

まずは趣味プログラミングで Hello, World の次に書くであろう FizzBuzz を MailboxProcessor に実行させてみましょう。(FizzBuzz の説明はしないので、分からない方はぐぐってください)

(* メッセージの型を定義 *)
type FizzBuzzMessage = int * AsyncReplyChannel<int * string>

(* 簡単に FizzBuzz する *)
let fizzbuzz = function
| x when x % 15 = 0 -> "FizzBuzz"
| x when x % 3 = 0  -> "Fizz"
| x when x % 5 = 0  -> "Buzz"
| x                 -> x.ToString()

(* アクターにしてほしい処理を記述 *)
let fizzbuzzBody (inbox: MailboxProcessor<FizzBuzzMessage>) =
    let rec loop () = async {
        let! x, replyChannel = inbox.Receive()
        let converted = x |> fizzbuzz
        replyChannel.Reply (x, converted)
        return! loop()
    }
    loop()

(* アクターを作る *)
let fizzbuzzActor = new MailboxProcessor<FizzBuzzMessage>(fizzbuzzBody)

(* Ouverture! *)
fizzbuzzActor.Start()

以下は、このアクターに仕事をさせる処理です。

(* 出力処理。min から max までの整数に対して処理をさせる *)
let output<'T> (inbox: MailboxProcessor<int * AsyncReplyChannel<int * 'T>>) min max =
    seq {min..max}
    |> Seq.map(fun i ->
                let f = fun replyChannel -> i, replyChannel
                inbox.PostAndAsyncReply f)
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.iter (fun tpl -> snd tpl |> printfn "%A" )

(* 使い方 *)
fun _ -> output fizzbuzzActor 1 30
|> timer

これを実行すると、FizzBuzz の実行結果のあとに処理時間が表示されます。

1: 150[ms]
2: 126[ms]
3: 145[ms]
4: 127[ms]
5: 149[ms]

5回実施してみましたが、いずれも一瞬ですね。

Fibonacci してみる

FizzBuzz の次は、こちらも頻出ネタの Fibonacci (与えられた番号のフィボナッチ数を求める)に挑戦してみましょう。

(* メッセージの型を定義 *)
type FibonacciMessage = int * AsyncReplyChannel<int * int64>

(* 簡単に Fibonacci する *)
let rec fibonacci = function
| 0 -> 0L
| 1 -> 1L
| x     -> (fibonacci (x-1)) + (fibonacci (x-2))

(* アクターにしてほしい処理を記述 *)
let fibBody (inbox: MailboxProcessor<FibonacciMessage>) =
    let rec loop() = async {
        let! x, replyChannel = inbox.Receive()
        let converted = x |> fibonacci
        replyChannel.Reply (x, converted)
        return! loop()
    }
    loop()

(* アクターを作って開始 *)
let fibActor = new MailboxProcessor<FibonacciMessage>(fibBody)
fibActor.Start()

(* 0 から 50 までの整数を並列に渡して処理時間を計測 *)
fun _ -> output fibActor 0 50
|> timer

さて、皆さんに注意しておかないといけないんですが、この処理は大変に遅いです。実行される際は、くれぐれも時間に余裕を持っていただきたい。

1: 512231[ms]
2: 512068[ms]
3: 512477[ms]
4: 519210[ms]
5: 512220[ms]

これはひどい(真顔

やり方を工夫する

さて、F# は純粋な関数型言語ではありません。副作用をなるだけ避けながら、必要とあらばそれを手許に持ってきます。

先ほどの Fibonacci を書き直しましょう。メモ化してみようと思います。

(* メモ化用のマップ。初期値を与えておく *)
let dict = ref Map.empty
dict := Map.add 0 0L !dict
dict := Map.add 1 1L !dict

(* メモ化対応の Fibonacci。参照セルのマップを破壊的に変更してメモ化する *)
let rec fibonacci2 n =
    if Map.containsKey n !dict then (!dict).[n]
    else
        let result = (fibonacci2 (n-1)) + (fibonacci2 (n-2))
        dict := Map.add n result !dict
        result

(* アクターにしてほしい処理を記述。メッセージの型は先述の Fibonacci と一緒 *)
let fibBody2 (inbox: MailboxProcessor<FibonacciMessage>) =
    let rec loop() = async {
        let! x, replyChannel = inbox.Receive()
        let converted = x |> fibonacci2
        replyChannel.Reply (x, converted)
        return! loop()
    }
    loop()

(* アクターを作って開始 *)
let fibActor2 = new MailboxProcessor<FibonacciMessage>(fibBody2)
fibActor2.Start()

(* 再び 0 から 50 までの整数を並列に投げる *)
fun _ -> output fibActor2 0 50
|> timer

実行してみましょう。

1: 128[ms]
2: 152[ms]
3:  89[ms]
4: 105[ms]
5: 107[ms]

速きこと、島風の如し、です!

「並列に dict にアクセスして、大丈夫?」という疑問はあろうかと思いますが、Map.add で同じキーに対して追加したとしても、値が上書きされるだけですし、同じ計算結果を設定しようとしているので、何ら問題なしです。(い、一応気には留めてるんだからね!)

最後に

何か「メモ化最高!」みたいな結論になってしまった気がしますが、私が伝えたかったのは「async コンピュテーション式(または非同期ワークフロー)と MailboxProcessor を使うことで、並列処理も簡単にかつ見通しよく書ける」ということです。実際、body の関数にさせたいことを逆算していけば、すっきりと書き上げられると思います。

こういう MailboxProcessor を使ってやりとりするのを「Actor モデル」と言ったりもするようですが、それはまた詳しい人に聞いてみましょう。(やらんのか)

Adieu!ヾ(๑╹◡╹)ノ