当前位置 : 主页 > 手机开发 > 其它 >

斯卡拉 – 阿卡流.分组,聚合一段时间并发出结果

来源:互联网 收集:自由互联 发布时间:2021-06-22
我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西: Source .tick(0 second, 50 millis, () = if (Ra
我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:

Source
    .tick(0 second, 50 millis, () => if (Random.nextBoolean) (1, s"A") else (2, s"B"))
    .map { f => f() }
    .groupBy(10, _._1)
    // how to aggregate grouped elements here for two seconds?
    .scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) }
    .to(Sink.foreach(println))

期望的输出应该如下所示:

Seq(A, A, A, A, A)
Seq(B, B, B)
Seq(A, A)
Seq(B, B, B, B, B)
// and so on

如何使用流实现此类功能?

你需要在你的流程中分组:

http://doc.akka.io/docs/akka/2.4.17/scala/stream/stages-overview.html#groupedwithin

网友评论