Documentation

§反応的なストリーム処理

§Enumerator

Iteratee がストリームの消費者やシンク、入力だとすると、 Enumerator は入力データを特定の Iteratee へ渡す送信元であるといえます。その名前が示すとおり、Enumerator は入力データを列挙 (Enumerate) して、 Iteratee に渡していきます。そして、最終的に新しい状態の Iteratee を返します。この挙動は、 Enumerator のシグネチャを見ると想像しやすいでしょう。

trait Enumerator[E] {

  /**
   * Apply this Enumerator to an Iteratee
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

}

Enumerator[E]Iteratee[E,A] を引数に取ります。この Iterate は Input[E] を消費して、 Future[Iteratee[E,A]] を返します。最終的には、この Promise から次の状態の Iteratee を取り出すことができます。

このまま単に Iteratee.fold メソッドを呼び出すことで Enumerator を実装してもよいのですが、 Enumerator 作成用のヘルパーを使うこともできます。例えば、 文字列のリストを Iteratee へ送る Enumerator[String] は次のように作成することができます。

val enumerateUsers: Enumerator[String] = {
  Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}

この Enumerator を使って、先程作成した Iteratee にデータを消費させるには次のように書きます。

val consume = Iteratee.consume[String]()
val newIteratee: Future[Iteratee[String,String]] = enumerateUsers(consume) 

Iteratee を終了させて、計算結果を取り出すためには、 Input.EOF を渡します。 Iteratee にはそのための run メソッドが用意されています。このメソッドを呼び出すと、 Input.EOF が送信されて、 Future[A] が返ります。残りの入力データは無視されます。

// We use flatMap since newIteratee is a promise, 
// and run itself return a promise
val eventuallyResult: Future[String] = newIteratee.flatMap(i => i.run)

//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }

// Prints "GuillaumeSadekPeterErwan"

もしかしたら、 Iteratee が最終的に結果を生成し (fold に適切なコールバック関数を渡した場合は Promise が返ります) 、一方で Future も最終的に結果を生成することに気づかれた方がいるかもしれません。このとき、 Future[Iteratee[E,A]]Iteratee[E,A] と見なすことができます。 Iteratee.flatten はまさしくこの Promise と Iteratee の変換を行うヘルパーです。先程の例でこのヘルパーを使ってみましょう。

//Apply the enumerator and flatten then run the resulting iteratee
val newIteratee = Iteratee.flatten(enumerateUsers(consume))

val eventuallyResult: Future[String] = newIteratee.run
   
//Eventually print the result 
eventuallyResult.onSuccess { case x => println(x) }

// Prints "GuillaumeSadekPeterErwan"

Enumerator には演算子のように振る舞う記号的なメソッドがいくつか用意されています。いずれも、文脈によっては括弧の節約という意味で役に立つことがあるかもしれません。例えば、 |>> メソッドは apply メソッドと全く同じ結果になります。

val eventuallyResult: Future[String] = {
  Iteratee.flatten(enumerateUsers |>> consume).run
}

Enumerator は入力データを Iteratee へ送信して、最終的には新しい状態の Iteratee を返します。この新しい Iteratee に、別の Enumerator を使ってさらに入力データを渡すことができます。これは、 FutureflatMap を適用するか、もしくは Enumerator のインスタンスを andThen メソッドによって組み合わせることで実現できます。

val colors = Enumerator("Red","Blue","Green")

val moreColors = Enumerator("Grey","Orange","Yellow")

val combinedEnumerator = colors.andThen(moreColors)

val eventuallyIteratee = combinedEnumerator(consume)

apply メソッドと同様に、 andThen にも >>> という演算子版が用意されています。これも、状況によっては括弧の節約に役立つでしょう。

val eventuallyIteratee = {
  Enumerator("Red","Blue","Green") >>>
  Enumerator("Grey","Orange","Yellow") |>>
  consume    
}

ファイルの内容を列挙するための Enumerator を作成することもできます。

val fileEnumerator: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("path/to/some/file"))
}

より汎用的には、 Enumerator.fromStream を利用して java.io.InputStream 内のデータを列挙することができます。この場合、 Enumerator に割り当てられている Iteratee が次の入力データを読み込めるような状態になるまで、Enumerator 側でも新しいデータが読み込まれないことに注意してください。

実際に両方のメソッドは、下記のシグネチャを持つ、より汎用的な Enumerator.generateM をベースにしています。

def generateM[E](e: => Future[Option[E]]) = {
  ... 
}

Enumerator オブジェクトに定義されたこのメソッドは、手続き的なロジックから Enumerator を生成するためのもっとも重要なメソッドのひとつです。シグネチャをよく見てみると、このメソッドは、Enumerator が割り当てられている iteratee が入力を受け付けられるようになるたびに呼び出されるコールバック関数 e: => Future[Option[E]] を取ります。

例えば、このメソッドを利用すると、 Promise を返すタイミングにおいて、100 ミリ秒おきに日時データを生成するストリームを生成することができます。

Enumerator.generateM {
  Promise.timeout(Some(new Date), 100 milliseconds)
}

同じような考え方で、WS API を使って特定の URL の内容を一定時間おきに取得して、 Future を返す Enumerator も次のようにつくることができます。

このコールバック Enumerator と手続き的な Iteratee.foreach を組み合わせることで、一定時間おきに Stream から取得した日時データを println することができます。

val timeStream = Enumerator.generateM {
  Promise.timeout(Some(new Date), 100 milliseconds)
}

val printlnSink = Iteratee.foreach[Date](date => println(date))

timeStream |>> printlnSink

Enumerator を生成する、より命令的なもうひとつの方法は、準備が整い次第、push および end メソッドが定義されている Channel インターフェースを提供する Concurrent.unicast を使うことです。

val enumerator = Concurrent.unicast[String](onStart = channel => {
  channel.push("Hello")
  channel.push("World")
})

enumerator |>> Iteratee.foreach(println)

onStart 関数は、EnumeratorIteratee に割り当てられる度に呼ばれます。例えばチャットルームのような、いくつかのアプリーケーションにおいて、(例えばSTMを使用して)同期された、リスナーのリストに含むグローバル値に enumerator を割り当てることは理にかなっています。Concurrent.unicast は他に onCompleteonError という 2 つの関数を受け付けます。

最後に一つ興味深いメソッドを紹介します。 interleave または演算子の >- というメソッドです。これはその名の通り、二つの Enumerator を並べて、それぞれから入力データが与えられたら、それに反応して即座に Iteratee へ渡すという、新しい Enumerator を生成します。

§Enumerator アラカルト

さて、これまでの説明で様々な Enumerator の作り方を知ることができました。これらの EnumeratorandThen / >>>interleave / >- で組み合わせて、任意の Enumerator を合成することができます。

もうお気づきかもしれませんが、多数のストリームを要するアプリケーションをうまく構築するためには、基本となる Enumerator を生成して、それらを組み合わせるとよいでしょう。例えば、監視システムを作る場合、次のようなコードになるでしょう。

object AvailableStreams {

  val cpu: Enumerator[JsValue] = Enumerator.generateM(/* code here */)

  val memory: Enumerator[JsValue] = Enumerator.generateM(/* code here */)

  val threads: Enumerator[JsValue] = Enumerator.generateM(/* code here */)

  val heap: Enumerator[JsValue] = Enumerator.generateM(/* code here */)

}

val physicalMachine = AvailableStreams.cpu >- AvailableStreams.memory
val jvm = AvailableStreams.threads >- AvailableStreams.heap

def usersWidgetsComposition(prefs: Preferences) = {
  // do the composition dynamically
}

次ページでは、 EnumeratorIteratee を変換したり、アダプターをかませる方法… Enumeratee について説明します!

Next: Enumeratees


このドキュメントの翻訳は Play チームによってメンテナンスされているものではありません。 間違いを見つけた場合、このページのソースコードを ここ で確認することができます。 ドキュメントガイドライン を読んで、お気軽にプルリクエストを送ってください。