Akka Streams Hello World

Akka Streams 允許你輕鬆建立利用 Akka 框架功能的流,而無需明確定義 actor 行為和訊息。每個流將至少有一個 Source(資料的來源)和至少一個 Sink(資料的目的地)。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File

val stream = Source(Seq("test1.txt", "test2.txt", "test3.txt"))
  .map(new File(_))
  .filter(_.exists())
  .filter(_.length() != 0)
  .to(Sink.foreach(f => println(s"Absolute path: ${f.getAbsolutePath}")))

在這個快速示例中,我們有一個我們輸入到流中的檔名。首先我們將它們對映到 File,然後我們過濾掉不存在的檔案,然後是長度為 0 的檔案。如果檔案通過過濾器,它將列印到 stdout

Akka 流還允許你以模組化方式執行流。你可以使用流的部分模組建立 Flows。如果我們採用相同的例子,我們也可以這樣做:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File

implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()

val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val mapper = Flow[String].map(new File(_))
val existsFilter = Flow[File].filter(_.exists())
val lengthZeroFilter = Flow[File].filter(_.length() != 0)
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))

val stream = source
  .via(mapper)
  .via(existsFilter)
  .via(lengthZeroFilter)
  .to(sink)

stream.run()

在第二個版本中,我們可以看到 mapperexistsFilterlengthZeroFilterFlows。你可以使用 via 方法在流中組合它們。此功能允許你重用你的程式碼片段。值得一提的是,Flows 可以是無國籍的或有狀態的。在有狀態的情況下,重用它們時需要小心。

你也可以將流視為 Graphs。Akka Streams 還提供了一個強大的功能,以簡單的方式定義複雜的流。我們可以做同樣的例子:

import java.io.File
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
  val mapper = Flow[String].map(new File(_))
  val existsFilter = Flow[File].filter(_.exists())
  val lengthZeroFilter = Flow[File].filter(_.length() != 0)
  val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))

  source ~> mapper ~> existsFilter ~> lengthZeroFilter ~> sink

  ClosedShape
})

graph.run()

也可以使用 GraphDSL 建立聚合流。例如,如果我們想將 mapper 和兩個過濾器組合在一起,我們可以做到:

val combinedFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mapper = builder.add(Flow[String].map(new File(_)))
  val existsFilter = builder.add(Flow[File].filter(_.exists()))
  val lengthZeroFilter = builder.add(Flow[File].filter(_.length() != 0))

  mapper ~> existsFilter ~> lengthZeroFilter

  FlowShape(mapper.in, lengthZeroFilter.out)
})

然後將其用作單個塊。combinedFlow 將是 FlowShapePartialGraph。我們可以用 via 為例:

val stream = source
  .via(combinedFlow)
  .to(sink)

stream.run()

或者使用 GraphDSL

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
  val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))

  source ~> combinedFlow ~> sink

  ClosedShape
})

graph.run()