Introduction to Akka Streams

  • Source: This is the entry point to your stream. There must be at least one in every stream. Source takes two type parameters: the first is the type of the elements it emits, and the second is the type of the auxiliary (materialized) value it can produce when it is run. If it produces no such value, we use the NotUsed type provided by Akka. There are various ways of creating a Source:
val source = Source(1 to 10) 
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
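A few more ways of building a Source can be sketched as follows. This is a minimal illustration, not part of the original article: the object name SourceSketch and the helper collect are hypothetical, and the sketch assumes an Akka version in which ActorMaterializer is available.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object SourceSketch {
  // None of these sources produces an auxiliary value, hence NotUsed.
  val fromList: Source[String, NotUsed] = Source(List("a", "b", "c"))
  // An infinite iterator, cut down to its first three elements.
  val fromIterator: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(1)).take(3)
  // The same element repeated forever, limited to two emissions.
  val repeated: Source[String, NotUsed] = Source.repeat("tick").take(2)

  // Hypothetical helper: runs a finite source and collects everything it emits.
  // It starts a throwaway ActorSystem per call, which is fine for a demo only.
  def collect[T](source: Source[T, NotUsed]): immutable.Seq[T] = {
    implicit val system: ActorSystem = ActorSystem("source-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(source.runWith(Sink.seq[T]), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(collect(fromList))
    println(collect(fromIterator))
    println(collect(repeated))
  }
}
```

Defining the sources does nothing by itself; elements are only emitted once collect runs them against a Sink.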
  • Sink: This is the exit point of your stream. There must be at least one in every stream. A Sink is essentially a subscriber to the data emitted by a Source and processed by any Flows in between, and it usually writes its input to some external system (I/O). A Sink has a single input channel and no output channel. Defining a Sink as a separate value is especially useful when we want to specify the behavior of the data collector in a reusable way, without running the stream.
val sink = Sink.fold[Int, Int](0)(_ + _) // Creating a sink that sums its input
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[Int]] = ...
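To illustrate the reusability point, the sketch below attaches one fold sink to two different sources and reads the sum from the materialized Future[Int]. The names SinkSketch, sumSink and runSums are made up for this example, and the sketch assumes an Akka version that still offers ActorMaterializer.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkSketch {
  // A reusable description of "sum everything you receive".
  // Its materialized value is a Future holding the final sum.
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Hypothetical helper: runs two streams that share the same sink definition.
  def runSums(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("sink-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val small = Await.result(Source(1 to 10).runWith(sumSink), 5.seconds)
      val large = Await.result(Source(1 to 100).runWith(sumSink), 5.seconds)
      (small, large)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runSums()) // prints (55,5050)
}
```

Each runWith call materializes the sink anew, so the two sums are independent of each other.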
  • Flow: A Flow is a processing step within the stream. It has one incoming channel and one outgoing channel and applies some transformation to the elements passing through it. If a Flow is connected to a Source, the result is a new Source. Likewise, a Flow connected to a Sink creates a new Sink, and a Flow connected to both a Source and a Sink results in a RunnableGraph. A Flow therefore sits between the input and the output and, on its own, is neither a Source nor a Sink until it is connected to one.
val source = Source(1 to 3)
val sink = Sink.foreach[Int](println)
val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
// Type parameters: input type (what the Source emits), output type (what the Sink receives), materialized value type
val runnable = source via doubler to sink
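The composition rules described for Flow show up directly in the types. The following is a rough sketch under the same assumptions as the snippet above (ActorMaterializer is available); FlowSketch and runDoubled are hypothetical names introduced only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlowSketch {
  val source: Source[Int, NotUsed] = Source(1 to 3)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val sink: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int]

  // Source + Flow is still a Source.
  val doubledSource: Source[Int, NotUsed] = source.via(doubler)
  // Flow + Sink is a Sink; `to` keeps the Flow's materialized value (NotUsed).
  val doublingSink: Sink[Int, NotUsed] = doubler.to(sink)
  // Source + Flow + Sink is a RunnableGraph; Keep.right keeps the Sink's Future.
  val runnable: RunnableGraph[Future[immutable.Seq[Int]]] =
    source.via(doubler).toMat(sink)(Keep.right)

  // Hypothetical helper: materializes the graph and waits for the collected output.
  def runDoubled(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("flow-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(runnable.run(), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runDoubled())
}
```

Sink.seq is used here instead of Sink.foreach so that the doubled elements (2, 4, 6) can be inspected as a value rather than only printed.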
  • ActorMaterializer: This is required to run a stream. It is responsible for creating the underlying actors that carry out the processing you define in your stream, and it allocates all the resources needed to run it. Since ActorMaterializer creates actors, it needs an ActorSystem. It is important to remember that even after the stream has been constructed by connecting the source, sink and operators, no data flows through it until it is materialized. (In Akka 2.6 and later, ActorMaterializer is deprecated: an implicit ActorSystem in scope provides a default materializer.)
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future
object StreamExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Sys")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val numbers = 1 to 100

    // Create a Source that iterates over the number sequence
    val numberSource: Source[Int, NotUsed] = Source.fromIterator(() => numbers.iterator)
    // Only let even numbers pass through the Flow
    val isEvenFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(num => num % 2 == 0)
    // Create a Source of even numbers by combining the number Source with the even-number filter Flow
    val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)

    // A Sink that writes its input to the console
    val consoleSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
    // Connect the Source with the Sink and run it using the materializer
    val done: Future[Done] = evenNumbersSource.runWith(consoleSink)
    // Shut the ActorSystem down once the stream completes; otherwise the JVM keeps running
    done.onComplete(_ => system.terminate())(system.dispatcher)
  }
}
  1. ActorSystem and ActorMaterializer instances that we use to run the stream. They are needed because Akka Streams is backed by Akka’s actor model.
  2. A Source based on the iterator of the static number sequence.
  3. A Flow that filters its input and only lets even numbers through.
  4. A Sink that prints its input to the console using println.
  5. The complete stream, built by connecting evenNumbersSource with consoleSink and run with runWith.
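The earlier remark that no data flows until the stream is materialized can be checked with a small experiment. The sketch below counts the elements that pass through a map stage before and after running the graph, and runs the same blueprint twice. The names MaterializationSketch and demo are hypothetical, and the sketch assumes ActorMaterializer is available in the Akka version used.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializationSketch {
  // Returns the element count observed before running, after one run and after two runs.
  def demo(): (Int, Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("materialization-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val seen = new AtomicInteger(0)
      // Only a description of the stream; nothing is executed here.
      val blueprint: RunnableGraph[Future[Done]] =
        Source(1 to 5)
          .map { n => seen.incrementAndGet(); n }
          .toMat(Sink.ignore)(Keep.right)

      val beforeRun = seen.get() // still 0: the graph has not been materialized
      Await.result(blueprint.run(), 5.seconds)
      val afterFirstRun = seen.get() // 5 elements passed through
      Await.result(blueprint.run(), 5.seconds)
      val afterSecondRun = seen.get() // the same blueprint ran again: 10 in total
      (beforeRun, afterFirstRun, afterSecondRun)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints (0,5,10)
}
```

Because the RunnableGraph is only a blueprint, each call to run() creates a fresh, independent execution of the stream.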

SDE-3 at PayPal

Archit Agarwal
