Streaming analytics in banking: How to start with Apache Flink and Kafka in 7 steps

In the previous post, we covered the basics of streaming: what it means in theory, its advantages and disadvantages, and some of the available streaming tools.

This post is more technical. We will talk about Flink in general as well as the basics of streaming in Flink, walking through the whole process from start (reading data) to end (writing the streaming results) using the Python API, with a little help from Kafka. The use case is very simple, with only seven parameters per event, but once you understand how everything below works, you will be able to expand it on your own. And although the title mentions banking, the same approach can of course be applied in any industry.

What is Apache Flink?

Apache Flink is an open-source framework for distributed data processing at scale. Flink is primarily used as a streaming engine but can be used as a batch processing engine as well. It was first released about nine years ago and is developed in Java and Scala. Alongside those two languages, a Python API is also available, and that is the one we are going to use in this example. Flink integrates with all common cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes, but it can also be set up to run as a stand-alone cluster.

Flink provides a simple high-level API with operations such as map/reduce, filters, windows, group-by, sorts, and joins. Talking about Flink's advantages, we should not forget the main advantage of streaming compared to batch processing: minimal resource usage, and Flink delivers exactly that. In terms of development, Flink also supports SQL queries, which can be of great benefit to people with less technical knowledge.
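
To get a feeling for that API, here is a minimal sketch of the Table API and its SQL support in PyFlink. The table name transactions and the columns customer and amount are made up for illustration; the real source table used in this post is registered later on.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(
    s_env,
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())

# Table API style: filter, group by, aggregate (assumes a registered table "transactions")
high_value = st_env.from_path("transactions") \
    .filter("amount > 100") \
    .group_by("customer") \
    .select("customer, sum(amount) as total_amount")

# The same logic expressed with SQL
high_value_sql = st_env.sql_query(
    "SELECT customer, SUM(amount) AS total_amount "
    "FROM transactions WHERE amount > 100 GROUP BY customer")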

Flink can read from and write to a variety of systems; commonly used sources and sinks include:

  • Filesystem (source/sink)
  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

Basic PyFlink use-case

After this small introduction to Apache Flink, let's get hands-on with a real example and code. The example consists of a Python script that generates dummy data and loads it into a Kafka topic. A Flink source is connected to that Kafka topic, consumes the data in micro-batches, aggregates it in a streaming fashion, and writes the records that satisfy our conditions to the filesystem (CSV files).

Step 1 – Set up Apache Kafka

Requirements for the Flink job:

  • Kafka 2.13-2.6.0
  • Python 2.7+ or 3.4+
  • Docker (let's assume you are familiar with Docker basics)

Our test project has this structure:

streaming_with_kafka_and_pyflink
  - docker-pyflink
    - image
       Dockerfile
    - examples
      - data
        - output
          - output_file.csv
      - app
        app.py
    - docker-compose.yml
  - data-generators
    - dummy_data_generator.py

So, let's walk through this mini use-case flow.

First of all, download Kafka (version 2.13-2.6.0) from the official Apache Kafka downloads page, then extract the archive and enter the directory:

tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0

Add the lines below to the end of config/server.properties to ensure that Flink running in Docker can see the Kafka topics running on the host:

listeners=EXTERNAL://0.0.0.0:19092,INTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://127.0.0.1:9092,EXTERNAL://docker.for.mac.localhost:19092
inter.broker.listener.name=INTERNAL

Then run ZooKeeper and Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Now Kafka is ready. The only thing left is to create the Kafka topic that will be used in this example; let's call it “transactions-data”.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic transactions-data
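
If you want to double-check that the topic exists, one quick and optional way is to list the topics with the kafka-python client, the same library the data generator below uses; this assumes the broker is reachable on localhost:9092.

from kafka import KafkaConsumer

# List the topics known to the local broker; "transactions-data" should be among them
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print("transactions-data" in consumer.topics())  # expected: True
consumer.close()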

Step 2 – Create dummy data

Of course, having data is the biggest requirement of all. If you already have data prepared for the test, feel free to skip this step. If you don't, follow the instructions for the Python script we use to create dummy data. In addition to other libraries, we use the Faker library to create fake user names, so no real person appears in this example.

Let's take a look at the Python script for generating dummy data.

At the top are the imports of the necessary libraries; some of them come with the Python installation, the others you have to install manually (for example with pip).

import time
import datetime
import random
import pandas as pd 
from faker import Faker 
from kafka import KafkaProducer 
from json import dumps 

In the next code block, you can see the functions used for generating random users, the amount each user spends, the transaction type, the location, and the datetime of the event.

fake = Faker()
start_time = time.time()

list_of_transactions = ['highway_toll', 'petrol_station', 'supermarket', 'shopping',
                        'mortgage', 'city_service', 'restaurant', 'healthcare']

def generate_random_users():
    # Generate 100 fake customer names
    list_of_users = []
    for number in range(100):
        list_of_users.append(fake.name())
    return list_of_users

def get_random_user(users):
    # Pick one customer at random
    return random.choice(users)

def get_usage_amount():
    # One of the two amounts (online / in-store) is 0, the other is a random value
    a = [0, round(random.randint(1, 100) * random.random(), 2)]
    random.shuffle(a)
    return a

def get_usage_date():
    # Random event time in the recent past, in ISO format
    return fake.date_time_between(start_date='-1m', end_date='now').isoformat()

def create_usage_record(list_of_users):
    # Build one transaction event
    amount = get_usage_amount()
    return {
        'customer': get_random_user(list_of_users),
        'transaction_type': random.choice(list_of_transactions),
        'online_payment_amount': amount[0],
        'in_store_payment_amount': amount[1],
        'transaction_datetime': get_usage_date(),
    }

The third code block defines the time range during which the script keeps generating data until we stop it (we chose a date almost 50 years ahead). Within that range, the script generates batches of 1,000 records with a 5-second pause between batches. All generated data is sent to the Kafka topic transactions-data.

users = generate_random_users()
# Keep generating data until this far-future date is reached (i.e. until we stop the script)
end_datetime = datetime.datetime.combine(datetime.date(2070, 1, 1), datetime.time(1, 0, 0))
current_datetime = datetime.datetime.combine(datetime.date.today(), datetime.datetime.now().time())
while current_datetime <= end_datetime:
    # Generate a batch of 1,000 dummy transactions
    data_usage = pd.DataFrame([create_usage_record(users) for _ in range(1000)])
    # Producer that JSON-serializes every record before sending it to Kafka
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'))

    for row in data_usage.itertuples():
        data_customer = row.customer
        data_transaction_type = row.transaction_type
        data_online_payment_amount = row.online_payment_amount
        data_in_store_payment_amount = row.in_store_payment_amount
        # Random coordinates inside a predefined area (used later for filtering)
        data_lat = random.uniform(44.57, 44.91)
        data_lon = random.uniform(20.20, 20.63)
        data_transaction_datetime = row.transaction_datetime + "Z"
        data = {
                'customer': data_customer,
                'transaction_type': data_transaction_type,
                'online_payment_amount': data_online_payment_amount,
                'in_store_payment_amount': data_in_store_payment_amount,
                'lat': data_lat,
                'lon': data_lon,
                'transaction_datetime': data_transaction_datetime
                }
        # Send the record to the "transactions-data" topic and wait for the broker's acknowledgement
        future = producer.send('transactions-data', value=data)
        result = future.get(timeout=5)
    print("--- {} seconds ---".format(time.time() - start_time))
    time.sleep(5)
    current_datetime = datetime.datetime.combine(datetime.date.today(), datetime.datetime.now().time())

With Kafka up and running, we can start the Python script that generates dummy data and loads it into the topic. The last step is to create another Python script, this time with Flink functions. To avoid differences between operating systems, we packed the script with the Flink functions into Docker; that way we can be reasonably sure it will run on every operating system without any, or at least without major, changes.
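
Before moving on to Flink, it can be useful to confirm that messages are really landing in the topic. A small, optional sanity check with kafka-python could look like the sketch below; it assumes the generator is running and the broker listens on localhost:9092.

from json import loads
from kafka import KafkaConsumer

# Read a few records from the beginning of the topic and print them
consumer = KafkaConsumer(
    "transactions-data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    value_deserializer=lambda x: loads(x.decode("utf-8")),
)
for i, message in enumerate(consumer):
    print(message.value)
    if i >= 4:  # stop after five records
        break
consumer.close()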

Step 3 – Load data to Flink

The script below, called app.py, has three important parts: the definition of the data source, the definition of the data output (sink), and the aggregation function.

Let's go step by step. The first part connects to the Kafka topic and defines the source data model.

import os
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, CsvTableSink, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka, Elasticsearch
from pyflink.table.window import Tumble


def register_transactions_source(st_env):
    # Kafka connector: read the "transactions-data" topic, starting from the latest offset
    st_env.connect(Kafka()
                   .version("universal")
                   .topic("transactions-data")
                   .start_from_latest()
                   .property("zookeeper.connect", "host.docker.internal:2181")
                   .property("bootstrap.servers", "host.docker.internal:19092")) \
        .with_format(Json()
                     .fail_on_missing_field(True)
                     .schema(DataTypes.ROW([
                         DataTypes.FIELD("customer", DataTypes.STRING()),
                         DataTypes.FIELD("transaction_type", DataTypes.STRING()),
                         DataTypes.FIELD("online_payment_amount", DataTypes.DOUBLE()),
                         DataTypes.FIELD("in_store_payment_amount", DataTypes.DOUBLE()),
                         DataTypes.FIELD("lat", DataTypes.DOUBLE()),
                         DataTypes.FIELD("lon", DataTypes.DOUBLE()),
                         DataTypes.FIELD("transaction_datetime", DataTypes.TIMESTAMP())]))) \
        .with_schema(Schema()
                     .field("customer", DataTypes.STRING())
                     .field("transaction_type", DataTypes.STRING())
                     .field("online_payment_amount", DataTypes.DOUBLE())
                     .field("in_store_payment_amount", DataTypes.DOUBLE())
                     .field("lat", DataTypes.DOUBLE())
                     .field("lon", DataTypes.DOUBLE())
                     .field("rowtime", DataTypes.TIMESTAMP())
                     .rowtime(
                         Rowtime()
                         # Event time comes from the data itself; allow events to arrive
                         # up to 60,000 ms (one minute) out of order
                         .timestamps_from_field("transaction_datetime")
                         .watermarks_periodic_bounded(60000))) \
        .in_append_mode() \
        .register_table_source("source")

At the top are the imports of the libraries we will use further down. In the function register_transactions_source we define the data source, the column names, and the data type of each column. Since we work with event time, we also have to define a watermark: here we use a bounded delay of 60,000 milliseconds, so records arriving more than one minute late are dropped from the stream. For example, an event timestamped 12:00:00 is still accepted as long as no event with a timestamp of 12:01:00 or later has been seen. In one sentence, the watermark is used to differentiate between the late and the “too-late” events and treat them accordingly.
The last two lines define the append mode and the name of the table (“source”) that will be referenced later in the job.

Step 4 – Create a sink function and data model

In this step, we will create a function which defines our final output data model:

def register_transactions_sink_into_csv(st_env):
    # Remove the output file left over from a previous run, if any
    result_file = "/opt/examples/data/output/output_file.csv"
    if os.path.exists(result_file):
        os.remove(result_file)
    # CSV sink: column names, column data types and the target file path
    st_env.register_table_sink("sink_into_csv",
                               CsvTableSink(["customer",
                                             "count_transactions",
                                             "total_online_payment_amount",
                                             "total_in_store_payment_amount",
                                             "lat",
                                             "lon",
                                             "last_transaction_time"],
                                            [DataTypes.STRING(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.TIMESTAMP()],
                                            result_file))

In register_transactions_sink_into_csv we define the output; a CSV file on the filesystem is our choice this time. The call to st_env.register_table_sink names the sink and specifies the columns to export and their data types. The last parameter is the path to the file (defined in the first line of the function) where we store the records that satisfy our conditions.

Step 5 – Trigger requirements definition

The last, but most important, part of the script is a function called transactions_job. It combines both previously defined functions into a flow and adds the queries that fulfill the use-case requirements.

def transactions_job():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_parallelism(1)
    # Use the event time contained in each record instead of the processing time
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    register_transactions_source(st_env)
    register_transactions_sink_into_csv(st_env)

    # Tumbling window of 10 hours per customer, aggregated and filtered,
    # with the qualifying results written into the CSV sink
    st_env.from_path("source") \
        .window(Tumble.over("10.hours").on("rowtime").alias("w")) \
        .group_by("customer, w") \
        .select("""customer as customer,
                   count(transaction_type) as count_transactions,
                   sum(online_payment_amount) as total_online_payment_amount,
                   sum(in_store_payment_amount) as total_in_store_payment_amount,
                   last(lat) as lat,
                   last(lon) as lon,
                   w.end as last_transaction_time
                   """) \
        .filter("total_online_payment_amount < total_in_store_payment_amount") \
        .filter("count_transactions >= 3") \
        .filter("lon < 20.62") \
        .filter("lon > 20.20") \
        .filter("lat < 44.91") \
        .filter("lat > 44.57") \
        .insert_into("sink_into_csv")

    st_env.execute("app")


if __name__ == '__main__':
    transactions_job()

The first line in the function defines the environment; the streaming execution environment is the one we are looking for. The call to set_stream_time_characteristic chooses how we treat records in terms of time. We use the predefined timestamp that comes from the source; another possibility is to attach a timestamp to each record while processing it. Event time allows a table program to produce results based on the time contained in every record, so we get consistent results even if some records arrive late, as long as the watermark period has not expired. It also ensures replayable results when reading records from persistent storage.
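
For comparison, switching the same job to processing time would only require changing the time characteristic. This is just a sketch, since in this post we stay with event time and the rowtime column defined on the source:

# Processing time instead of event time (sketch): timestamps are assigned while the
# records are being processed, so no rowtime/watermark definition is needed on the source
s_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)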

The next two lines simply register the previously defined source and sink in the streaming flow. The last part of the function defines the flow order and the aggregation. In this case we use a tumbling window of 10 hours and group by customer. The aggregation is performed on the spent amounts (sum), the number of transactions (count), the coordinates (last), and the window end time, which serves as the time of the last transaction. There is also a set of filters that must be fulfilled for an event to be written to output_file.csv. This simple use case triggers customers who, within the window, made at least three transactions inside the filtered coordinates (some predefined area) and spent more in-store than online, as the filter on the two amounts shows. Of course, a real use case will probably be more complicated, which is not a problem since Apache Flink offers a wide range of functions and APIs.
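
For readers who prefer SQL, roughly the same windowed aggregation can be written as a query and turned into a table with st_env.sql_query. The sketch below is untested and leaves out the coordinate columns and their filters for brevity; it assumes the “source” table registered earlier.

# Flink SQL sketch of the windowed aggregation (coordinates omitted for brevity)
result_table = st_env.sql_query("""
    SELECT
        customer,
        COUNT(transaction_type) AS count_transactions,
        SUM(online_payment_amount) AS total_online_payment_amount,
        SUM(in_store_payment_amount) AS total_in_store_payment_amount,
        TUMBLE_END(rowtime, INTERVAL '10' HOUR) AS last_transaction_time
    FROM source
    GROUP BY customer, TUMBLE(rowtime, INTERVAL '10' HOUR)
    HAVING SUM(online_payment_amount) < SUM(in_store_payment_amount)
       AND COUNT(transaction_type) >= 3
""")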

Step 6 – Put everything into Docker

As mentioned earlier, the Flink job is packed into docker-compose in the following way: we use the pyflink/playgrounds:1.10.0 image, the Docker image used for the official PyFlink playgrounds. You need to be careful with the volumes; in our case it is ./examples:/opt/examples. On localhost:8088 you will find the Flink Dashboard, with data and stats about all your Flink jobs.

version: '2.1'
services:
  jobmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8088:8088"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Dockerfile:

FROM flink:1.10.0-scala_2.11

RUN set -ex; \
  apt-get update; \
  apt-get -y install python3; \
  apt-get -y install python3-pip; \
  apt-get -y install python3-dev; \
  ln -s /usr/bin/python3 /usr/bin/python; \
  ln -s /usr/bin/pip3 /usr/bin/pip


RUN set -ex; \
  apt-get update; \
  python -m pip install --upgrade pip; \
  pip install apache-flink; \
  pip install kafka-python

ARG FLINK_VERSION=1.10.0

RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/${FLINK_VERSION}/flink-sql-connector-elasticsearch6_2.11-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
# Create data folders and adjust the Flink configuration
    mkdir -p /opt/data; \
    echo "taskmanager.memory.jvm-metaspace.size: 512m" >> /opt/flink/conf/flink-conf.yaml; \
    echo "rest.port: 8088" >> /opt/flink/conf/flink-conf.yaml;


WORKDIR /opt/flink/

Step 7 – Run everything

To run everything, we need three more commands.

Run the Python script that generates the dummy data:

python dummy_data_generator.py

Build and run the Docker containers:

docker-compose up --build

After a while, you will be able to go to localhost:8088 and see the Flink Dashboard.

Finally, submit the PyFlink job to the job manager:

docker-compose exec jobmanager ./bin/flink run -py /opt/examples/app/app.py

You will see a “job submitted” message, which means the job is running and visible on the Flink Dashboard. Now you need to wait a little, depending on the window size you chose. Once the first window has fired, you can take a look at output_file.csv and see the results.
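
If you prefer to look at the results programmatically, a short pandas snippet like the one below can read the sink file once the first window has fired. The path follows the project structure and volume mapping from this post; adjust it if your layout differs.

import pandas as pd

# The CSV sink does not write a header row, so pass the column names explicitly
columns = ["customer", "count_transactions", "total_online_payment_amount",
           "total_in_store_payment_amount", "lat", "lon", "last_transaction_time"]
results = pd.read_csv("docker-pyflink/examples/data/output/output_file.csv",
                      names=columns, header=None)
print(results.head())
print("customers triggered:", results["customer"].nunique())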

Step 8 – Extra step

In case you want to send the output to Elasticsearch (running in Docker too), you can simply add the code block below to your docker-compose:

elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.1
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536

And in app.py, replace the CSV sink function with the code block below, register it in transactions_job, and point insert_into at “sink_elasticsearch” instead of “sink_into_csv”:

def register_transactions_es_sink(st_env):
    st_env.connect(Elasticsearch()
                   .version("6")
                   .host("0.0.0.0", 9200, "http")
                   .index("transactions-supermarket-case")
                   .document_type("usage")) \
        .with_schema(Schema()
                     .field("customer", DataTypes.STRING())
                     .field("count_transactions", DataTypes.STRING())
                     .field("total_online_payment_amount", DataTypes.DOUBLE())
                     .field("total_in_store_payment_amount", DataTypes.DOUBLE())
                     .field("lon", DataTypes.FLOAT())
                     .field("lat", DataTypes.FLOAT())
                     .field("last_transaction_time", DataTypes.STRING())
                     ) \
        .with_format(Json().derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink_elasticsearch")

I hope this can be a good starting point for building your own streaming service and expanding on top of it. Flink arrived on the scene a little later than Apache Storm and Apache Spark, but lately the Flink community has been growing really fast, which ensures fast-paced development of new features; all you need to do is use them. It will also give you a true streaming experience.

In today's industries, it is not enough to just stream data with basic filters and joins; we also want to include machine learning models that help us better recognize real patterns, create recommendation systems, forecast spend and profit, cluster customers, and so on. With that in mind, the next blog post on Apache Flink will combine one of the mentioned machine learning methods with streaming.

Meanwhile, take a look at our blog post about recommendations in Covid times and stay tuned, because there will be more of this on our blog.

Featured Photo by Marta Wave from Pexels

Once you start with streaming, you go with the flow!

Data streaming has become very popular in the big data industry. It is used to process, in real time, large amounts of continuously generated data from different sources. When we say “real time”, we need to understand that it can mean anything from a few milliseconds to a few minutes.

Besides enabling processing and enrichment in real time, streaming also improves performance significantly. In batch processing, a set of data from a certain period of time is grouped into a batch, and processing such a set can run into memory and storage problems.

In stream processing, micro-batches are generated and arrive within milliseconds; we then evaluate certain conditions on them, based on which we can take the foreseen steps. Once the micro-batches have passed through a stream consisting of loading, transformation, and eventual aggregations and conditions, the data disappears from memory.

The conclusion is that we can process a much larger amount of data on cheaper hardware, provided that the appropriate tool is selected and the processing is set up properly.

batch vs stream processing

Streaming essentials

Latency

Latency is the time between an activity at the source (a transaction, a weblog entry, activity at a base station or in the network) and the moment the data is stored in the appropriate form.

The latency level, or delay, that we can accept depends on the type of application we are working on and the use case. Therefore, when choosing a real-time data processing tool, the first precondition is that the latency of that tool is no longer than the delay we can tolerate. For example, security and fraud detection are use cases that do not tolerate long latency.

Event time & Processing time

Event time is the time when a transaction happens. Processing time is the time when it is processed inside our job. In an ideal world without any latency or delay, those two timestamps would be the same. Unfortunately, that is hardly ever possible, but we tend to keep the gap between them small.
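
A tiny, purely illustrative Python snippet shows the difference; the record and its timestamp are made up:

import datetime

# A hypothetical event that happened at the source (event time)
event = {"transaction_datetime": "2020-10-01T12:00:00", "amount": 42.0}

event_time = datetime.datetime.fromisoformat(event["transaction_datetime"])
processing_time = datetime.datetime.now()  # the moment our job handles the record
print("gap between event time and processing time:", processing_time - event_time)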

Joins

Joining in batch processing is mostly a trivial thing, but in streaming it is not that straightforward. If you want to join the streaming part of your flow with a static table, it is pretty much the same as a batch join; however, if you want to join two streams, things get complicated. Because of this, it is really important to pay special attention to the datetime of the records to avoid misleading calculations and possible resource overuse.

Stateless & Stateful 

Each block or window should be defined in detail. One of the unavoidable parameters is the state of the window; the words themselves tell us what this is about. Stateless is a type of window in which records are not kept, meaning that no history is collected within a time frame; it is mainly used for filtering, for joins with a static table for flow enrichment, or for calculations at the level of a single record.

Stateful is the exact opposite. If we define a window as stateful, we are obliged to define the time frame in which the data will be stored. It is used as a step before aggregations over certain time intervals.
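
As a sketch, the difference can be illustrated with PyFlink's Table API, assuming a table environment st_env with a registered table “source” that has a rowtime column, and the Tumble class imported from pyflink.table.window (illustrative only):

# Stateless: a plain filter, evaluated record by record, no history is kept
stateless = st_env.from_path("source") \
    .filter("in_store_payment_amount > 100")

# Stateful: a one-hour tumbling window keeps state per customer until the window fires
stateful = st_env.from_path("source") \
    .window(Tumble.over("1.hours").on("rowtime").alias("w")) \
    .group_by("customer, w") \
    .select("customer, count(transaction_type) as cnt, w.end as window_end")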

Streaming tools

Mostly thanks to the development of tools from the Apache Foundation and the development of cloud solutions, today there is no shortage of streaming software. As you may have already concluded, developing a streaming application requires programming knowledge, and the required level varies depending on the complexity of the use case and the tools you choose. In one of the following posts, we will write in more detail about one of these tools.

Streaming use-cases

Telco. Processing and analysis of event data (CDRs). With a streaming approach, you can follow data or voice usage per customer in real time and trigger additional campaigns based on it. Using network performance data, you can detect anomalies on your base stations in near real time; this can be used alongside your alarm system to enrich failure information.

Banking. The streaming approach will drastically reduce your time to respond in case of any fraudulent activity. Banking is probably the most promising industry when it comes to real-time use cases. One of them is customer personalization: with this approach, you can send an optimal offer depending on location, transactions, context, etc.

Retail. The retail industry also gives us a huge spectrum of use cases to be analyzed in real time. We can use it for better recommender systems, location-based triggers, preparing for market trends, and personalized offers. In this industry, smart and on-time supply monitoring is crucial, so real-time inventory management can certainly save you a lot of money and effort.

Automotive. Everyone knows about the Uber use case: the purpose of the streaming approach is to find the nearest free car, which optimizes expenses and increases customer satisfaction. Traffic prediction and route optimization also need to happen in a timely manner, which makes this industry a perfect ground for streaming applications.

In an environment and industry as competitive as today's, it is necessary to take the right steps in the shortest possible time to stay ahead of the competition. Thanks to today's network throughput, the cloud, cheap hardware, and the development of tooling, we can and should take advantage of the large amounts of data arriving in real time from various systems, data we were not able to process until now. Our suggestion is to start with the simplest use case that fits your business and then build more advanced solutions on top of that, without rushing into complexity.

In the next post, we will write more specifically about Flink and its usage in a streaming environment – stay tuned.

The cover photo is taken from https://pexels.com/@munkee-panic-272941