Akka Streams Hello World

Akka Streams 允许你轻松创建利用 Akka 框架功能的流,而无需明确定义 actor 行为和消息。每个流将至少有一个 Source(数据的来源)和至少一个 Sink(数据的目的地)。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File

val stream = Source(Seq("test1.txt", "test2.txt", "test3.txt"))
  .map(new File(_))
  .filter(_.exists())
  .filter(_.length() != 0)
  .to(Sink.foreach(f => println(s"Absolute path: ${f.getAbsolutePath}")))

在这个快速示例中,我们有一个我们输入到流中的文件名。首先我们将它们映射到 File,然后我们过滤掉不存在的文件,然后是长度为 0 的文件。如果文件通过过滤器,它将打印到 stdout

Akka 流还允许你以模块化方式执行流。你可以使用流的部分模块创建 Flows。如果我们采用相同的例子,我们也可以这样做:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.File

implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()

val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
val mapper = Flow[String].map(new File(_))
val existsFilter = Flow[File].filter(_.exists())
val lengthZeroFilter = Flow[File].filter(_.length() != 0)
val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))

val stream = source
  .via(mapper)
  .via(existsFilter)
  .via(lengthZeroFilter)
  .to(sink)

stream.run()

在第二个版本中,我们可以看到 mapperexistsFilterlengthZeroFilterFlows。你可以使用 via 方法在流中组合它们。此功能允许你重用你的代码片段。值得一提的是,Flows 可以是无国籍的或有状态的。在有状态的情况下,重用它们时需要小心。

你也可以将流视为 Graphs。Akka Streams 还提供了一个强大的功能,以简单的方式定义复杂的流。我们可以做同样的例子:

import java.io.File
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
  val mapper = Flow[String].map(new File(_))
  val existsFilter = Flow[File].filter(_.exists())
  val lengthZeroFilter = Flow[File].filter(_.length() != 0)
  val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))

  source ~> mapper ~> existsFilter ~> lengthZeroFilter ~> sink

  ClosedShape
})

graph.run()

也可以使用 GraphDSL 创建聚合流。例如,如果我们想将 mapper 和两个过滤器组合在一起,我们可以做到:

val combinedFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mapper = builder.add(Flow[String].map(new File(_)))
  val existsFilter = builder.add(Flow[File].filter(_.exists()))
  val lengthZeroFilter = builder.add(Flow[File].filter(_.length() != 0))

  mapper ~> existsFilter ~> lengthZeroFilter

  FlowShape(mapper.in, lengthZeroFilter.out)
})

然后将其用作单个块。combinedFlow 将是 FlowShapePartialGraph。我们可以用 via 为例:

val stream = source
  .via(combinedFlow)
  .to(sink)

stream.run()

或者使用 GraphDSL

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = Source(List("test1.txt", "test2.txt", "test3.txt"))
  val sink = Sink.foreach[File](f => println(s"Absolute path: ${f.getAbsolutePath}"))

  source ~> combinedFlow ~> sink

  ClosedShape
})

graph.run()