Deep Dive into Windowing concepts in Apache Flink

Balachandar Paulraj
4 min readJun 13, 2022

Windows plays a major role and also defined as a core part in processing infinite streams. Windows split the incoming stream into buckets of finite size, over which required transformations can be applied.

Though there are many components in windows that includes trigger, evictor, allowedLateness, sideOutputLateData…etc, this post focusses on data-driven & time-driven window operations.

Data-Driven Window

Data-Driven can also be known as Count Window, where window will be triggered once it reaches the point, where required amount (count) of data has been received from the stream. In Flink, this can be achieved through countWindow and countWindowAllfunction.

Examples for both countWindow and countWindowAll are as follows:

countWindowAll

Below code sums (using sum function) whenever number of elements entered the socket reaches 3 (implemented using countWindowAll(3)). Input Stream can be anything, however here Socket TextStream has been created using nc -lk 9999. To have it as an interactive environment, I have used flink-scala-shell (by running start-scala-shell.sh in local mode) for executing the code. However, this has been supported only in few versions of flink (refer flink-scala-shell). If your flink doesn’t support flink-scala-shell, please create a jar for the same piece of code and upload it to Flink UI.

//senv - Streaming Execution Environment in flink-scala-shellval dataStream = senv.socketTextStream("localhost",9999,' ')
val outputSink = dataStream.flatMap(_.split(" "))
.map(_.toInt).countWindowAll(3)
outputSink.print()
senv.execute("Program to implement countWindowAll")

countWindowAll can also be referred as Non-Keyed Window, due to the fact that windows are not grouped together using keyBy operation. In other words, there is no need to group the elements for this requirement.

countWindow

Also referred as Keyed Windows, where the elements are grouped together based on given condition. Below example shows the code for countWindow where elements are grouped using the condition % 2 . In contrast to countWindowAll , window will be triggered whenever respective window’s count reaches 3. Here, in this case, either odd or even number count reaches 3, it will be summed and displayed.

val dataStream = senv.socketTextStream("localhost",9999,' ')
val keyedDataStream = dataStream.flatMap(_.split(" "))
.map(_.toInt).keyBy(a => a%2)
val outputSink = keyedDataStream.countWindow(3).sum(0)
outputSink.print()
senv.execute("Program to implement countWindow through Even/Odd No")

Time-Driven Window

Flink supports different types of time windows like Tumbling time windows, Sliding time windows, Session time windows..etc. Out of which, Tumbling time and Sliding time windows are discussed below:

Tumbling Time Window

Tumbling Window (10 minutes)

Tumbling time defines a window of given time duration, that tumbles. It means that elements are grouped according to their arrival time, and every element belongs to exactly one window. This type of Window is non-overlapping, i.e the events in one window will not overlap in other windows. Above pictures shows a tumbling time window of 10 minutes where all elements that falls within a window are grouped together.

Below code creates a tumbling window of 5seconds. Within 5 seconds, due to keyBy operation, odd and event numbers are placed into separate windows and their repsective sums are computed.

val dataStream = senv.socketTextStream("localhost",9999,' ')
val keyedDataStream = dataStream.flatMap(_.split(" ")).map(_.toInt)
.keyBy(a => a%2)
//Creates tumbling window of 5 seconds
val tumblingWindow = keyedDataStream.timeWindow(Time.seconds(5))
val outputSink = tumblingWindow.sum(0)
outputSink.print()
senv.execute("Program to implement tumbling window")

Sliding Time Window

Sliding Window (20 minutes, slide 10 minutes)

Unlike Tumbling time window, sliding window slides over the incoming stream of data. Sliding window can be overlapping. Picture shows how window operations are carried out for sliding window of 20 minutes with slide of 10 minutes.

The same program discussed previously has been changed to develop sliding window of 20 seconds with 10 seconds slide.

val dataStream = senv.socketTextStream("localhost",9999,' ')
val keyedDataStream = dataStream.flatMap(_.split(" "))
.map(_.toInt).keyBy(a => a%2)
//Creates sliding window of 20 seconds with 10 seconds slide
val slidingWindow = keyedDataStream.timeWindow(Time.seconds(20)
,Time.seconds(10))
val outputSink = slidingWindow.sum(0)
outputSink.print()
senv.execute("Program to implement sliding window")

Conclusion

Though there are other concepts and other types of windows, I think this covers the basic and initial part of windowing in Flink. I’m planning to cover other concepts in a separate post. Meanwhile, please try the code provided here to understand the output for each window.

Happy Learning and please go through my other posts too!!!

--

--