読者です 読者をやめる 読者になる 読者になる

かたちづくり

つれづれに、だらだらと、おきらくに

継続は力なり #FsAdvent

この記事は F# Advent Calendar 2014 - connpass の18日目の記事です。昨日は teramonagi さんの FsLab JournalでReproducible Research(レポート)を簡単に作りたい - My Life as a Mock Quant でした。

分をわきまえずに継続モナドとかいうヤツについて書きます。
ちょっと前のことですが、継続(continuation)の概念がストンと腑に落ちる瞬間を味わいまして割と気持ち良かったので、その感動を忘れないうちに書きます。まあ、経験上最初の「分かった!」はぬか喜びであることが多いので今回も誤解している可能性はありそうですが、多少の間違いはあっても感動の熱さが残っているうちに書かないと書く気が失せてしまうこともやはり経験から分かっているので、勇気を出してこの機会に書いてみようと思うのです。

なお、私はこの程度のオッサンですので過度な期待は禁物ですよ。

  • Haskell 分かりません。
  • 圏論知りません。
  • モナド則は見たことあるけど「ふーん?」という感じ。
  • 継続モナドで調べてると call/cc というのが出てくるのですが、これはまだ理解できてません。
  • 限定継続という言葉を見た気がしますが、何のことやらさっぱり。

書いたコードは gist にアップしてあります。
https://gist.github.com/u1roh/5e12b8819a5aa0e5eca5

課題として直線作図機能を考えてみよう

私はCAD系ツールの開発がお仕事なので、作図ツールの設計には関心があります。ですのでここでは、画面上の2点をクリックするとその2点を結ぶ直線を作図する機能を課題として考えていくことにします。

アプリケーション外観

具体的なイメージを持ってもらうため、サンプルとして作ったアプリケーションの外観を見せておきます。何やら怪しげなメニューが見えていますが今は気にしない気にしない。
まず直線作図メニューを選んで、次に画面上を2点クリックすると直線が作図されます。

準備

簡単な Forms アプリケーションを作ることにしましょう。準備として次のようなコードを用意しておきます。

[<EntryPoint>]
let main argv = 
  let form =
    let form = new Form ()

    let drawObjs = List<Graphics -> unit> ()
    form.Paint |> Event.add (fun e -> for draw in drawObjs do draw e.Graphics)

  Application.Run form
  0

drawObjs に描画処理を登録しておくと Paint イベントで描画が実行されるようになっています。drawObjs に直線を作図する関数を Add すれば直線が作図できるというわけです。

コンソールプログラムだったら…

突然ですが、コンソールプログラムでユーザー入力を受け付けるのは簡単でした。次のプログラムはユーザー入力を2行受け付けて、入力文字列を連結して出力します。

let line1 = Console.ReadLine ()
let line2 = Console.ReadLine ()
Console.WriteLine (line1 + line2)

これと似たような感じでマウスクリック入力を2回受け付けることができたらいいですね!

しかしマウスイベントの場合は…

マウスイベントはコンソールの入力のようには扱えません。イベントハンドラを登録しておいてクリックイベントが飛んでくるのを待ち構えるしかありません。ですので直線作図機能を作るためには例えば次のようなコードを書くことになります。

let firstPos = ref None
let subscription = ref Option<IDisposable>.None
subscription :=
  form.MouseClick |> Observable.subscribe (fun e ->
    match !firstPos with
    | None -> firstPos := Some e.Location
    | Some p ->
      drawObjs.Add (fun g -> g.DrawLine (Pens.Blue, p, e.Location))
      form.Invalidate ()
      !subscription |> Option.iter (fun d -> d.Dispose ())
      subscription := None
      firstPos := None
  ) |> Some)

飛んできたクリックイベントが1点目のクリックなのか2点目のクリックなのかを判定する必要があります。また、1点目のクリック位置は一旦どこかに保持しておかなくてはなりません。結果として出来上がったプログラムは、「2回クリックを受け付けて直線を作図する」というシンプルな脳内モデルとは似ても似つかない構造のプログラムとなってしまいます。

継続モナドを使うと!

結論から書きましょう。継続モナドを使うとこう書けるんです!

cont {
  let! click1 = form.MouseClick |> Cont.ofObservable
  let! click2 = form.MouseClick |> Cont.ofObservable
  drawObjs.Add (fun g -> g.DrawLine (Pens.Black, click1.Location, click2.Location))
  form.Invalidate ()
} |> Cont.run
  • コンソールプログラムと同じような流れでクリックイベントを取得しているところにご注目!
  • cont { ... } は継続モナドのためのコンピュテーション式です
  • Cont は継続モナドのための関数が定義されているモジュールです
  • コードには見えていませんが、Cont<'a> という継続モナドの型を定義して利用しています

少し不思議な印象を受けるかもしれません。このコードを実行すると、クリック待ち状態でいったんイベントループに処理が戻り、クリック入力を受け取ると再びコードに戻ってきてさっきの続きから処理が再開(つまり処理が継続!)されます。この不思議さは seq { yield ... } に似ていると思いますし、裏を返せば yield と同程度の不思議さでしかないとも言えます。
このコードを実現している仕組みを以下でひも解いていきましょう。

継続って何?

さて、いったん上記の直線作図の課題は忘れて、基礎的なところからコツコツと継続について説明していくことにします。例として次のような単純な処理を考えます。
f:id:u_1roh:20141217230441p:plain

  • 関数 f1 は a0 を受け取って a1 を返します
  • 関数 f2 は a1 を受け取って a2 を返します
  • 関数 f3 は a2 を受け取って a3 を返します

「普通」に関数の戻り値を使って実装すると…

let a1 = f1 a0
let a2 = f2 a1
let a3 = f3 a2

(* パイプライン演算子を使って次のように書くこともできる *)
a0 |> f1 |> f2 |> f3

何の変哲もない、普通の処理ですね。このコードは明らかに一気に最後まで実行されますから、上で見たような「処理がいったん中断して途中から再開する」ようなトリックが入り込む余地はどこにもありません。一応これらの関数の型を見ておきましょう。

val f1 : A0 -> A1
val f2 : A1 -> A2
val f3 : A2 -> A3
※ a0, a1, a2, a3 の型をそれぞれ A0, A1, A2, A3 としました

継続渡し(CPS; Continuation Passing Style) について

上記コードを「継続渡し」というスタイル(=CPS)に書き換えて行きたいのですが、その前にここでCPSとは何か説明しておきたいと思います。
CPSでは関数の結果を戻り値として受け取るのではなく、コールバック関数に結果を入れるスタイルとなります。…と言われても分からないと思いますから、まずは f1 だけを例にとってCPSに書き換えてみましょう。
まず f1 を次のような形式に書き換えます。

(* CPS で呼び出せる f1。型は A0 -> (A1 -> unit) -> unit *)
let f1 (a0 : A0) (continuation : A1 -> unit) =
  ...

そして次のようにCPSで関数呼び出しをすることになります。

f1 a0 (fun a1 -> ...)

要点を箇条書きしてみます。

  • f1 の戻り値は unit になっています。つまり結果を戻り値として返していません。
  • f1 の引数が1つ増えていて、2つ目の引数にコールバック関数を渡すようになっています。
  • コールバック関数のシグネチャが A1 -> unit であることから分かるように、f1 の結果をコールバック関数で受け取るようになっています。

このコールバック関数のことを「継続」と呼び、このような継続を引数に渡すスタイルを「継続渡し」と呼ぶのです。
私見になりますが、「継続」などといういかめしい用語に惑わされている人って結構いるのではないでしょうかね。というか私は惑わされていましたね、とほほ。代わりに「続き」とか「続き渡し」と呼んでみれば一気に理解のハードルが下がる気がします。単に計算の「続き」をコールバック関数として渡しているだけです。

ではCPSは戻り値を使う方法と比べて何が優れているのでしょうか。何が本質的な違いなのでしょうか。
戻り値を使う方法は、f1 を呼び出すことで結果 a1 を pull するスタイルとなります。しかし CPS では f1 に「続き」を渡して結果を push してもらうスタイルとなります。つまり、CPSでは主導権を呼び出し先に委ねてしまっているのですね。ですから「続き」を呼び出すタイミングは呼び出し先である f1 が自由に決められるのです。いったんメッセージループに戻ってクリックイベントを検知してから「続き」を呼び出してもいいし、非同期計算の終了後に「続き」を呼び出してもいいのです。ここに継続モナドのトリックの種明かしがあるのです。

余談ですが、「続きはウェブで」っていうTVCMありますよね。あれも一種の継続渡しと言えるかもしれません。CMの最後に表示される検索キーワードが「継続」なんですね。CMの作り手側の視点に立つと分かります。TVCMは時間の制約が厳しく、TV放送という「スレッド」を長時間占有することは許されませんが、視聴者にはより長いCMを見てほしいので最後に継続(=続き)を渡してくるわけです。もちろん、継続を受け取った視聴者のその後の行動の主導権は視聴者自身にあります。すぐにスマホで検索してもよし、後でPCでググってもよし、もちろん無視してもよし(大半は無視でしょうね)。

CPSに書き換えてみる

さて、f1, f2, f3 をすべてCPS用に書き換えるとこうなります。

let f1 (a0 : A0) (continuation : A1 -> unit) = ...
let f2 (a1 : A1) (continuation : A2 -> unit) = ...
let f3 (a2 : A2) (continuation : A3 -> unit) = ...

関数の型は下記の通りです。

val f1 : A0 -> (A1 -> unit) -> unit
val f2 : A1 -> (A2 -> unit) -> unit
val f3 : A2 -> (A3 -> unit) -> unit

これら f1, f2, f3 をCPSで呼び出すコードは下記のようになります。

f1 a0 (fun a1 ->
f2 a1 (fun a2 ->
f3 a2 (fun a3 -> ...)))

「継続(=続き)」として渡しているコールバック関数が入れ子になって呼び出されている構造に注目してください。なお、入れ子になっていることをはっきりさせるためにはインデントを入れるべきですが、ここでは敢えてインデントを入れませんでした。その理由は f1, f2, f3 があたかも対等な立場でフラットに連続して処理されているように見せかけるためですが、好みに合わせて適宜インデントを入れて読んでください。
さて、次はこの入れ子構造を少し考察してみましょう。

ラッキョウとリストと継続

少し話が飛びますが、ここでリストのデータ構造を復習してみます。リスト型は次のように自己再帰的に定義されています。

type List<'a> =
  | Empty
  | List of Head:'a * Tail:List<'a>

Head が先頭要素で、Tail が「残り」です。Head を剥がしても剥がしても中からリストが出てくる構造です。つまり

1 :: 2 :: 3 :: 4 :: []

というリストは、各要素がフラットに並んでいるのではなく次のような自己再帰的な入れ子構造になっているわけです。

1 :: (2 :: (3 :: (4 :: [])))

まるでラッキョウのような構造ですね。ラッキョウは皮を剥いても剥いても中からラッキョウが出てくる構造をしています(実際に確かめたことはありませんが)。ラッキョウ型は次のように定義できるでしょうか。

type ラッキョウ =
  || ラッキョウ of* ラッキョウ

そして「継続」にも、このようなラッキョウ型の自己再帰的な構造が見られるように思うのです。割と一般的な見方かと思いますが、プログラムというものを何らかの処理命令を連続して次々に処理していくものと捉えてることができます。つまり処理命令がフラットにシーケンシャルに並んでいる構造です(ループとか条件分岐は考えないとして)。しかし「継続」を理解するためには、プログラムを次のような自己再帰的な構造に捉える視点が必要なのではないか、と思うに至りました。

type プログラム =
  || プログラム of 処理命令 * 継続(=続き) : プログラム

つまり命令を処理しても処理しても中から続きのプログラムが出てくるという自己再帰的な構造です。ラッキョウのようにプログラムが入れ子になっていると捉えるわけです。前節に示した継続渡しのコードには、そういう入れ子構造が現れているのです。
次はいよいよ継続モナドの導出です。CPSのコードを蒸留して純粋な継続渡しの構造だけを抽出し、モナドという構造に昇華する作業です。

継続モナドの導出

さて、前節で書いた継続渡しのコードを書き換えて継続モナドを導出してみたいと思います。まず関数の型を再掲します。

val f1 : A0 -> (A1 -> unit) -> unit
val f2 : A1 -> (A2 -> unit) -> unit
val f3 : A2 -> (A3 -> unit) -> unit

型名にエイリアスをつけて分かりやすく

継続(=続き)として渡すコールバック関数の型に別名をつけます。

type Callback<'a> = 'a -> unit

するとf1,f2,f3の型は次のように読み替えることができます。

val f1 : A0 -> Callback<A1> -> unit
val f2 : A1 -> Callback<A2> -> unit
val f3 : A2 -> Callback<A3> -> unit

さらに次の型を定義します。(これが継続モナド型!)

type Cont<'a> = Callback<'a> -> unit

するとf1,f2,f3の型は更に次のように読み替えることが出来ます。

val f1 : A0 -> Cont<A1>
val f2 : A1 -> Cont<A2>
val f3 : A2 -> Cont<A3>

ずいぶんスッキリして構造が分かりやすくなりました。重要なポイントは次の2つです。

  • Cont<'a> 型として「継続を受け取る構造」だけが抽出されていること
  • f1,f2,f3 が、元は戻り値なしの関数(unitを返す関数)であったにも関わらず、Cont<'a> という「値」を返す関数であるかのように書き換わっていること

合わせて f1, f2, f3 の定義も次のように書き換えておくと、「Cont<'a> という値を返している」ことがハッキリしますね。

let f1 (a0 : A0) : Cont<A1> = fun (continuation : Callback<A1>) -> ...
let f2 (a1 : A1) : Cont<A2> = fun (continuation : Callback<A2>) -> ...
let f3 (a2 : A2) : Cont<A3> = fun (continuation : Callback<A3>) -> ...

bind関数の導出

f1, f2, f3 を呼び出しているコードを再掲します。

f1 a0 (fun a1 ->
f2 a1 (fun a2 ->
f3 a2 (fun a3 -> ...)))

f1 の処理を f2 の処理を「くっつける」ことを考えていきます。つまり、f1 と f2 の処理をまとめて行う関数 f12 : A0 -> Cont を定義したいのです。

(* f1 と f2 をくっつけた関数 *)
let f12 (a0 : A0) : Cont<A2> =
  fun (continuation : Callback<A2>) ->
    f1 a0 (fun a1 -> f2 a1 continuation)

(* f12 を使うと呼び出し側のコードはこうなる *)
f12 a0 (fun a2 ->
f3  a2 (fun a3 -> ...))

さらに f12 と f3 もくっつけることができます。

(* f12 と f3 をくっつけた関数 *)
let f123 (a0 : A0) : Cont<A3> =
  fun (continuation : Callback<A3>) ->
    f12 a0 (fun a2 -> f3 a2 continuation)

(* f123 を使うと呼び出し側のコードはこうなる *)
f123 a0 (fun a3 -> ...)

このように次々と関数呼び出しをくっつけることができます。この「くっつける」操作だけを bind 関数として定義してみましょう。

(* 「くっつける」操作だけを取り出した関数 *)
let bind (f : 'a1 -> Cont<'a2>) (x : Cont<'a1>) : Cont<'a2> =
  fun (continuation : Callback<'a2>) ->
    x (fun a1 -> f a1 continuation) 

(* bind を使うと呼び出し側のコードはこうなる *)
let a1 = f1 a0
let a2 = bind f2 a1
let a3 = bind f3 a2
a3 (fun a3 -> ...)

(* パイプライン演算子を使うとこうなる *)
(f1 a0 |> bind f2 |> bind f3) (fun a3 -> ...)

おおお!?なんだか「らしく」なってきたと思いませんか?
ここで「継続って何?」の冒頭に戻って、普通の戻り値による関数の呼び出しとソックリであることを確認してください。

コードの整理

ここで一旦、コードを整理しなおしておきます。

(* ↓単純な型エイリアスはやめて、要素が1つだけの判別共用体として定義しなおした *)
type Callback<'a> = Callback  of ('a -> unit)
type Cont<'a>     = Cont      of (Callback<'a> -> unit)

(* Cont モナド用の関数をモジュールに整理 *)
module Cont =

  let inline bind binder (Cont invoke) =
    Cont <| fun callback ->
      invoke <| Callback (fun a -> let (Cont invoke) = binder a in invoke callback)

  (* Cont モナドを実行する *)
  let inline run (Cont invoke) = invoke (Callback (fun () -> ()))

  (* 単位元 *)
  let inline ofValue x =
    Cont <| fun (Callback callback) -> callback x

継続モナドを使って直線作図機能を作ろう

ようやく私たちは継続モナドを手に入れました(← ホントはモナド則を満たしているべきかきちんと確認するべきなのでしょうが私自身の理解があやふやなのでパス m(_ _)m)。これを使って、いよいよ本記事の冒頭の課題「画面上の2点をクリックするとその2点を結ぶ直線を作図する機能」に挑戦していきます。

ofObservable 関数

課題を解くには、クリックイベントを継続モナドとして扱えるようにしなくてはなりません。F# ではイベントは IObservable 型として扱うことができます。ですので、IObservable 型を「イベントを検知すると継続を呼び出す継続モナド」に変換する関数が定義できれば汎用性がありそうです。私は次のように定義してみました。

module Cont =
  let ofObservable o =
    Cont <| fun (Callback callback) ->
      let subscr  = ref Option<IDisposable>.None
      subscr := o |> Observable.subscribe (fun a ->
        callback a
        !subscr |> Option.iter (fun d -> d.Dispose (); subscr := None)) |> Some

ちょっと分かりにくいので簡単に解説します。
IObservable はイベントがストリームとして次々と流れてくるイメージで、そのイベントの「流れ」を subscribe することができるというものです。これに対して今回必要なのは、クリックイベントを1回だけ検知して継続に渡して終了するというモノですから、イベントを検知した時点でイベントの購読を終了させなくてはなりません。そのため subscribe に渡しているイベントハンドラの中で自身をイベントから外す処理を行っています。
これを使うことで、次のようにして MouseClick イベントを Cont に変換することができます。

let clickCont = Cont.ofObservable form.MouseClick

直線作図機能

準備が整いましたので直線作図機能を書いてみます。

Cont.ofObservable form.MouseClick |> Cont.bind (fun click1 ->
Cont.ofObservable form.MouseClick |> Cont.bind (fun click2 ->
  drawObjs.Add (fun g -> g.DrawLine (Pens.Black, click1.Location, click2.Location))
  form.Invalidate ()))
|> Cont.run

素晴らしい!

コンピュテーション式

モナドができたらコンピュテーション式も定義しておくと素敵になります。モナドがまるで言語自体に組み込まれたような感じで利用することができます。(コンピュテーション式については既に様々な方が素晴らしい説明を書いてくださっているので説明は割愛します。)
次のようなビルダーを定義しておくと、

type ContBuilder () =
  member __.Bind (x, f)  = x |> Cont.bind f
  member __.Return x  = Cont.ofValue x
  member __.Zero ()   = Cont.ofValue ()

let cont = ContBuilder ()

直線作図機能は次のようになります。(本記事の冒頭のコードの再掲)

cont {
  let! click1 = form.MouseClick |> Cont.ofObservable
  let! click2 = form.MouseClick |> Cont.ofObservable
  drawObjs.Add (fun g -> g.DrawLine (Pens.Black, click1.Location, click2.Location))
  form.Invalidate ()
} |> Cont.run

ますますもって素晴らしい!
これにてようやく冒頭のコードの種明かしが終了したわけですが、ご理解いただけましたでしょうか。とかく複雑で訳が分からなくなりがちなモナドですが、私なりにできる限り分かりやすい説明を試みたつもりです。私が感じた「腑に落ちた時の気持ちよさ」を少しでもお裾分け出来たなら嬉しいです。

(ここで記事としては一旦終わり。あとはゆるゆるダラダラと、応用とか、拡張とか)

応用

直線作図機能以外にも応用してみましょう。

モードレスダイアログ

モーダルダイアログはとても簡単ですが、モードレスダイアログになると途端に面倒になる気がしませんか。モーダルダイアログのような気軽さでモードレスダイアログが利用できたら良いと思いませんか。
下記はダイアログをモードレスで表示して、OKボタンが押されるとダイアログを閉じて MessageBox を表示するプログラムです。

cont {
  let dialog = new Form (Text = "dialog")
  let! _ =
    let btnOK = new Button (Text = "OK")
    dialog.Controls.Add btnOK
    dialog.Show ()
    btnOK.Click |> Cont.ofObservable
  dialog.Close ()
  MessageBox.Show "OK" |> ignore
  return ()
} |> Cont.run)

非同期計算

何らかの時間のかかる処理をバックグラウンドで実行し、終了したらUIスレッドに戻って続きを実行する、というのはよくあるパターンですね。もちろん async だけで十分簡単に出来るのですが、ここではあえて継続モナド化してみます。
まず準備として下記の ofAsync という関数を定義します。これは Async<'a> 型の計算オブジェクトを Cont<'a> に変換する関数です。

module Cont =
  ...
  let ofAsync (cts : CancellationTokenSource) computation =
    Cont <| fun (Callback callback) ->
      let con = SynchronizationContext.Current
      let cts = if cts = null then new CancellationTokenSource () else cts
      Async.StartWithContinuations (
        async {
          do! Async.SwitchToThreadPool ()
          return! computation cts.Token
        },
        (fun a -> con.Post ((fun _ -> callback a), null)), ignore, ignore, cts.Token)
  • 非同期計算をキャンセルすることも考えて CancellationToken が登場させましたが、結局このサンプルではキャンセル処理は扱っていないので例として少し良くなかったかもしれません。CancellationToken 周りは無視して読んでください。
  • 大事な点として、継続の callback がUIスレッドで実行されるように SynchronizationContext に Post しています。

これを使って、1秒間のダミー処理をバックグラウンドで動かしてみます。プログレスバーの更新も行っています。

cont {
  let ui = SynchronizationContext.Current
  let! result = Cont.ofAsync null (fun token ->
    async {
      for i = 1 to 10 do
        Thread.Sleep 100
        printf "."
        ui.Post ((fun _ -> progressBar.Value <- progressBar.Maximum * i / 10), null)
      return "Async calculation result"
    })
  MessageBox.Show result |> ignore
  progressBar.Value <- 0
} |> Cont.run)

全部ちゃんぽん

全然意味ない処理ですが、全部ちゃんぽん出来ちゃいます。

cont {
  // クリックイベントを取得
  let! click = form.MouseClick |> Cont.ofObservable
  printfn "clicked: Location = %A" click.Location

  // モードレスダイアログを表示
  let dialog = new Form (Text = "dialog")
  let! _ =
    let btnOK = new Button (Text = "OK")
    dialog.Controls.Add btnOK
    dialog.Show ()
    btnOK.Click |> Cont.ofObservable
  dialog.Close ()

  // 非同期計算
  let ui = SynchronizationContext.Current
  do! Cont.ofAsync null (fun _ ->
    async {
      for i = 1 to 10 do
        Thread.Sleep 100
        ui.Post ((fun _ -> progressBar.Value <- progressBar.Maximum * i / 10), null)
    })

  // 最後にメッセージを表示
  MessageBox.Show "done" |> ignore
  progressBar.Value <- 0
} |> Cont.run)

コンピュテーション式の拡張

ここからは全くもって理解不足で、今後の私の課題になるのですが…。たぶん、コンピュテーション式のビルダーに Combine とか Using とか For とかも定義してあげたほうが色々と便利そうです。しかしまだコンピュテーション式の変換ルールが理解し切れておらず…という状態です。トライしてみたところまで書きます。

評価の遅延

例えば次のように書いたとき、

cont { printfn "hoge" }

これがすぐ評価されて "hoge" がプリントされてしまうよりは、Cont.run の呼び出しまで遅延されるほうが使いやすそうです。そのためにビルダーに Delay と Run を定義してみました。

    // DelayRun はコンピュテーション式の評価を遅延させるための仕掛け
    member __.Delay f = f
    member __.Run   f = ofValue () |> bind f

Combine

これでいいのかな?

    // Combine は例えば if による条件分岐の後に続けて処理を書きたいときに必要になる
    // - x の後に「継続」して f を実行する。その際、x の結果の値は無視する。
    // - 第二引数の f が関数なのは Delay によって処理が遅延されているから(たぶん…)
    member __.Combine (x, f) = x |> bind (fun _ -> f ())

Using

概ねこんな感じだと思うのですが、例外が起きた場合に対応できてないです。

    // use 構文が使えるようにする。
    // f x で得られる計算処理の終了時に x.Dispose () を呼べば良いはず。
    // しかし例外が起きた時には対応できていない。
    member __.Using (x: #IDisposable, f : _ -> Cont<_>) =
      Cont <| fun (Callback callback) ->
        let (Cont invoke) = f x
        invoke <| Callback (fun a -> x.Dispose(); callback a)

というか、Using に限らず今回のサンプルコード全体にわたって例外のことは考慮できていないですね。例外に対応するには継続のコールバック関数が単純な関数ではダメで、 IObserver みたいに OnError メソッドがないとダメかな?