§反応的なストリーム処理
§Enumeratee の守備範囲
Enumeratee は Iteratee API の中でとても重要なコンポーネントです。Enumeratee の役割は、データのストリームを適合・変換することです。その役割をイメージできるという意味で、 Enumeratee の中でも特に Enumeratee.map というメソッドが親しみやすいかもしれません。
単純な例から始めてみましょう。まずは、次のような Iteratee をつくります。
val sum: Iteratee[Int,Int] = Iteratee.fold[Int,Int](0){ (s,e) => s + e }
この Iteratee は Int を入力にとり、それらの和を計算します。次は以下のような Enumerator をつくりましょう。
val strings: Enumerator[String] = Enumerator("1","2","3","4")
さて、ここで string:Enumerator[String] は明らかに Iteratee[Int,Int] に適用できません。しかし、 Enumerator が生成する String を何らかのルールに基づいて Int へ変換することができれば、ストリームの生成側と消費側がうまく噛み合いそうです。つまり、 Iteratee[Int,Int] を Iteratee[String,Int] に適合させるか、もしくは Enumerator[String] を Enumerator[Int] へ適合させるか、どちらかを行う必要があるということです。Enumeratee はまさにこの用途のためにあります。 Enumeratee[String,Int] を利用すると、 Iteratee[Int,Int] を目的のインタフェースに適合させることができます。
//create am Enumeratee using the map method on Enumeratee
val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val adaptedIteratee: Iteratee[String,Int] = toInt.transform(sum)
//this works!
strings |>> adaptedIteratee
transform メソッドと全く同じ意味をもつ演算子 &>> も利用できます。
strings |>> toInt &>> sum
map メソッドは引数に渡された From => To という関数をつかって、From 型の入力データを To 型の値へ変換する Enumeratee を生成します。Enumeratee は Enumerator を変換することもできます。
val adaptedEnumerator: Enumerator[Int] = strings.through(toInt)
//this works!
adaptedEnumerator |>> sum
through メソッドについても、同じ意味の演算子が用意されています。
strings &> toInt |>> sum
Enumeratee トレイトに定義されている transform メソッドのシグネチャを見てみましょう。
trait Enumeratee[From, To] {
def transform[A](inner: Iteratee[To, A]): Iteratee[From, A] = ...
}
かなり簡単なシグネチャです。 Enumerator に定義されている through メソッドについても同様です。
trait Enumerator[E] {
def through[To](enumeratee: Enumeratee[E, To]): Enumerator[To]
}
Enumeratee と Enumerator における transform と through はどちらも Enumeratee の apply メソッドを利用しています。こちらのシグネチャはもう少し複雑です。
trait Enumeratee[From, To] {
def apply[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] = ...
}
Enumeratee が出来るのは、単に Iteratee の方を変換することだけではありません。 Enumeratee は取り外し可能なアダプターのようなものなので、 Enumeratee を通して異なる種類の入力データを送信し終わった後は、本来の Iteratee に戻すことができます。前述の例でいえば、 本来の Iteratee[Int,Int] に戻してから、今度は Int の入力データを送ることができます。
val sum:Iteratee[Int,Int] = Iteratee.fold[Int,Int](0){ (s,e) => s + e }
//create am Enumeratee using the map method on Enumeratee
val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val adaptedIteratee: Iteratee[String,Iteratee[Int,Int]] = toInt(sum)
// pushing some strings
val afterPushingStrings: Future[Iteratee[String,Iteratee[Int,Int]]] = {
Enumerator("1","2","3","4") |>> adaptedIteratee
}
val flattenAndRun:Future[Iteratee[Int,Int]] = Iteratee.flatten(afterPushingStrings).run
val originalIteratee = Iteratee.flatten(flattenAndRun)
val moreInts: Future[Iteratee[Int,Int]] = Enumerator(5,6,7) |>> originalIteratee
val sumFuture:Future[Int] = Iteratee.flatten(moreInts).run
sumFuture onSuccess {
case s => println(s)// eventually prints 28
}
このようなことが可能であるため、変換前の元々の Iteratee を「内側」、変換後の Iteratee を「外側」と呼びます。
Enumeratee の全体像が見えてきた所で、少し重要な話をします。実は、 transform は内側の Iteratee が Done 状態になったときにに与えられる最後の入力データを取りこぼしてしまいます。つまり、 Enumeratee.map を使って入力データを変換すると、内側の Iteratee が入力データの最後のチャンクとともに Done 状態になった際、 transform メソッドがそれを無視してしまいます。
この場で説明するには少し詳細に入りすぎていると思われるかもしれませんが、モデルを把握する役には立ちます。
Enumeratee.map の例に立ち戻って考えてみると、 実はそれより汎用的な Enumeratee.mapInput というメソッドがあります。これを使うと、任意のタイミングで EOF を返すことができます。
val toIntOrEnd: Enumeratee[String,Int ] = Enumeratee.mapInput[String] {
case Input.El("end") => Input.EOF
case other => other.map(e => e.toInt)
}
Enumeratee.map と Enumeratee.mapInput はかなり直感的です。どちらもチャンクを一つ一つ変換するという機能を持っています。さて、次の便利な Enumeratee は Enumeratee.filter です。
def filter[E](predicate: E => Boolean): Enumeratee[E, E]
シグネチャからも明らかかもしれませんが、 Enumeraee.filter は Enumeratee[E,E] を生成します。その Enumeratee は入力データのチャックを predicate: E => Boolean で一つ一つテストして、predicate が true を返したチャンクだけを内側の Iteratee へ送信します。
val numbers = Enumerator(1,2,3,4,5,6,7,8,9,10)
val onlyOdds = Enumeratee.filter[Int](i => i % 2 != 0)
numbers.through(onlyOdds) |>> sum
その他にも、 Enumeratee.collect や Enumeratee.drop、 Enumeratee.dropWhile、 Enumeratee.take、 Enumeratee.takeWhile など、似たような原理の Enumeratee が用意されています。
試しに、バイトデータのチャンクに対して Enumeratee.take を適用してみましょう。
// computes the size in bytes
val fillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
Iteratee.consume[Array[Byte]]()
}
val limitTo100: Enumeratee[Array[Byte],Array[Byte]] = {
Enumeratee.take[Array[Byte]](100)
}
val limitedFillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
limitTo100 &>> fillInMemory
}
一見問題なさそうにみえますが、実際のところ合計で何バイトのデータが残っているのでしょうか?どうすれば、入力データの最大サイズをうまく制限できるのでしょうか。実は、上の例は入力データのチャンク数を制限しただけで、それぞれのチャンクの大きさは制限できていません。どうやら、Enumeratee.take は入力データの型(ここでは Array[Byte])について何の情報も参照できないので、入力のデータの大きさを測ることもできないようです。
しかし、問題ありません。 Array[Byte] のような TraversableLike 型の入力データ向けの Enumeratee を作成するヘルパーが Traversable オブジェクトにひと通り用意されています。上記の例に戻ると、 TraversableLike.take を使うとうまくいきます。
val fillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
Iteratee.consume[Array[Byte]]()
}
val limitTo100: Enumeratee[Array[Byte],Array[Byte]] = {
Traversable.take[Array[Byte]](100)
}
// We are sure not to get more than 100 bytes loaded into memory
val limitedFillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
limitTo100 &>> fillInMemory
}
Traversable その他のメソッドとしては、 Traversable.takeUpTo や Traversable.drop などがあります。
最後になりましたが、Enumeratee のインスタンスは compose メソッドまたは ><> 演算子により合成することができます。注意しなければならないこととして、合成された Enumeratee のインスタンスは必ず Done と共に与えられた最後の入力データのチャンクを無視します。しかしながら、 composeConcat や >+> 演算子を使うと、最後のチャンクについてもちゃんと読み込まれます。