Streaming analytics in banking: How to start with Apache Flink and Kafka in 7 steps

In the previous post, we covered the basics of streaming: what it means in theory, its advantages and disadvantages, and some of the available streaming tools.

This text is more technical. We will talk about Flink in general as well as the basics of streaming in Flink, covering the whole process from start (reading data) to end (writing streaming results) using the Python API, with a little help from Kafka. The use case is very simple, with only seven parameters per event, but once you understand how everything below works, you will be able to expand it easily on your own. Although the title mentions banking, the approach can of course be applied in any industry.

What is Apache Flink?

Apache Flink is an open-source framework for distributed data processing at scale. Flink is primarily used as a streaming engine, but it can also work as a batch processing engine. The initial release was nine years ago, and it is developed in Java and Scala. Alongside those two languages, a Python API is available as well, and that's the one we are going to use in this example. Flink integrates with all common cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes, but it can also run as a stand-alone cluster.

Flink provides a simple high-level API in the form of Map/Reduce, Filters, Window, GroupBy, Sort and Joins. Talking about advantages, we should not forget the main advantage of streaming over batch processing: minimal resource usage, and Flink delivers exactly that. In terms of development, Flink also supports SQL queries, which can be a great benefit for people with less technical knowledge.

Flink can read from and write to a variety of systems:

  • Filesystem (source/sink)
  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

Basic PyFlink use-case

After this small introduction to Apache Flink, let's get hands-on with a real example and code. The example consists of a Python script that generates dummy data and loads it into a Kafka topic. A Flink source is connected to that Kafka topic, reads the data in micro-batches, aggregates it in a streaming fashion, and writes the records that satisfy our conditions to the filesystem (CSV files).

Step 1 – Setup Apache Kafka

Requirements for the Flink job:

  • Kafka 2.13-2.6.0
  • Python 2.7+ or 3.4+
  • Docker (let's assume you are familiar with Docker basics)

Our test project has this structure:

streaming_with_kafka_and_pyflink
  - docker-pyflink
    - image
       Dockerfile
    - examples
      - data
        - output
          - output_file.csv
      - app
        app.py
    - docker-compose.yml
  - data-generators
    - dummy_data_generator.py

So, let’s start with this mini use case flow.

First of all, get Kafka from here.
Then unpack the archive with:

tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0

Add the lines below to the end of config/server.properties to ensure that Flink running in Docker can see the Kafka topics running locally:

listeners=EXTERNAL://0.0.0.0:19092,INTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://127.0.0.1:9092,EXTERNAL://docker.for.mac.localhost:19092
inter.broker.listener.name=INTERNAL

Then run ZooKeeper and Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Now your Kafka is ready. The only thing left is to create the Kafka topic that will be used in this case; let's call it "transactions-data".

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic transactions-data
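
If you want to double-check that the topic was created, the same tool can list all existing topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181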

Step 2 – Create dummy data

Of course, having data is the biggest requirement of all. If you already have data prepared for the test, feel free to skip this step. If you don't, follow the instructions for the Python script we use to create dummy data. Among other libraries, we use Faker to create fake user names, so no real person appears in this example.

Let's take a look at the Python script for generating dummy data:

At the top are the imports of the necessary libraries; some of them come with the Python installation, the others you should install manually.

import time
import datetime
import random
import pandas as pd 
from faker import Faker 
from kafka import KafkaProducer 
from json import dumps 
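
time, datetime, random and json ship with Python; the rest can be installed with pip. A minimal sketch (the package names are the usual ones on PyPI):

pip install pandas faker kafka-python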

In the next code block you can see the functions used for generating random users and, for each event, the spent amount, the type of transaction and the datetime of the event.

fake = Faker()
start_time = time.time()

list_of_transactions = ['highway_toll', 'petrol_station', 'supermarket', 'shopping',
                        'mortgage', 'city_service', 'restaurant', 'healthcare']

def generate_random_users():
    list_of_users = []
    for number in range(100):
        list_of_users.append(fake.name())
    return list_of_users

def get_random_user(users):
    return random.choice(users)

def get_usage_amount():
    a = [0, round(random.randint(1, 100) * random.random(), 2)]
    random.shuffle(a)
    return a

def get_usage_date():
    return fake.date_time_between(start_date='-1m', end_date='now').isoformat()

def create_usage_record(list_of_users):
    amount = get_usage_amount()
    return {
        'customer': get_random_user(list_of_users),
        'transaction_type': random.choice(list_of_transactions),
        'online_payment_amount': amount[0],
        'in_store_payment_amount': amount[1],
        'transaction_datetime': get_usage_date(),
    }

The third code block defines the time range during which the script will keep generating data until we stop it (we chose a date almost 50 years ahead). Within that range, the script generates 1000 records per iteration, with a pause of 5 seconds between batches. All generated data is sent to the Kafka topic we created earlier, transactions-data.

users = generate_random_users()
end_datetime = datetime.datetime.combine(datetime.date(2070, 1, 1), datetime.time(1, 0, 0))
current_datetime = datetime.datetime.combine(datetime.date.today(), datetime.datetime.now().time())
while current_datetime <= end_datetime:
    data_usage = pd.DataFrame([create_usage_record(users) for _ in range(1000)])
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'))

    for row in data_usage.itertuples():
        data_customer = row.customer
        data_transaction_type = row.transaction_type
        data_online_payment_amount = row.online_payment_amount
        data_in_store_payment_amount = row.in_store_payment_amount
        data_lat = random.uniform(44.57, 44.91)
        data_lon = random.uniform(20.20, 20.63)
        data_transaction_datetime = row.transaction_datetime + "Z"
        data = {
                'customer': data_customer,
                'transaction_type': data_transaction_type,
                'online_payment_amount': data_online_payment_amount,
                'in_store_payment_amount': data_in_store_payment_amount,
                'lat': data_lat,
                'lon': data_lon,
                'transaction_datetime': data_transaction_datetime
                }
        future = producer.send('transactions-data', value=data)
        result = future.get(timeout=5)
    print("--- {} seconds ---".format(time.time() - start_time))
    time.sleep(5)
    current_datetime = datetime.datetime.combine(datetime.date.today(), datetime.datetime.now().time())

With Kafka installed, we can start the Python script that generates the dummy data and loads it into the topic. The last step is to create another Python script, this time with Flink functions. To avoid differences between operating systems, we packed the script with Flink functions into Docker. That way we can be sure it will work on every operating system without any, or at least without major, changes.

Step 3 – Load data to Flink

The script below, called app.py, has three important parts: the definition of the data source, the definition of the data output (sink), and the aggregation.

Let's go step by step. The first part connects to a Kafka topic and defines the source data model.

import os
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, CsvTableSink, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka, Elasticsearch
from pyflink.table.window import Tumble

def register_transactions_source(st_env):
    st_env.connect(Kafka()
                   .version("universal")
                   .topic("transactions-data")
                   .start_from_latest()
                   .property("zookeeper.connect", "host.docker.internal:2181")
                   .property("bootstrap.servers", "host.docker.internal:19092")) \
        .with_format(Json()
        .fail_on_missing_field(True)
        .schema(DataTypes.ROW([
        DataTypes.FIELD("customer", DataTypes.STRING()),
        DataTypes.FIELD("transaction_type", DataTypes.STRING()),
        DataTypes.FIELD("online_payment_amount", DataTypes.DOUBLE()),
        DataTypes.FIELD("in_store_payment_amount", DataTypes.DOUBLE()),
        DataTypes.FIELD("lat", DataTypes.DOUBLE()),
        DataTypes.FIELD("lon", DataTypes.DOUBLE()),
        DataTypes.FIELD("transaction_datetime", DataTypes.TIMESTAMP())]))) \
        .with_schema(Schema()
        .field("customer", DataTypes.STRING())
        .field("transaction_type", DataTypes.STRING())
        .field("online_payment_amount", DataTypes.DOUBLE())
        .field("in_store_payment_amount", DataTypes.DOUBLE())
        .field("lat", DataTypes.DOUBLE())
        .field("lon", DataTypes.DOUBLE())
        .field("rowtime", DataTypes.TIMESTAMP())
        .rowtime(
        Rowtime()
            .timestamps_from_field("transaction_datetime")
            .watermarks_periodic_bounded(60000))) \
        .in_append_mode() \
        .register_table_source("source")

At the top are the imports of the libraries we are going to use further on. In the function register_transactions_source we define the data source, the column names and the data type of each column. Since we work with event time, we also have to bound the watermark; in this case it is 60000 milliseconds, so records arriving more than one minute late are dropped from the stream. In one sentence, the watermark is used to differentiate between the late and the "too-late" events and treat them accordingly.
The last two lines define the append mode and the name of the table that will be referenced later when we build the job.

 

Step 4 – Create a sink function and data model

In this step, we will create a function which defines our final output data model:

def register_transactions_sink_into_csv(st_env):
    result_file = "/opt/examples/data/output/output_file.csv"
    if os.path.exists(result_file):
        os.remove(result_file)
    st_env.register_table_sink("sink_into_csv",
                               CsvTableSink(["customer",
                                             "count_transactions",
                                             "total_online_payment_amount",
                                             "total_in_store_payment_amount",
                                             "lat",
                                             "lon",
                                             "last_transaction_time"],
                                            [DataTypes.STRING(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.DOUBLE(),
                                             DataTypes.TIMESTAMP()],
                                            result_file))

In register_transactions_sink_into_csv we define the output (a CSV file on the filesystem is our choice this time). The call to st_env.register_table_sink registers the sink under a name and defines the columns to export and their data types. The last parameter is the path to the file (defined in the first line) where we are going to store the records that satisfy our conditions.

Step 5 – Trigger requirements definition

The last, but most important, part of the script is the function transactions_job. It combines both previously mentioned functions into a flow and adds the query that fulfills the use-case requirements.

def transactions_job():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_parallelism(1)
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    register_transactions_source(st_env)
    register_transactions_sink_into_csv(st_env)

    st_env.from_path("source") \
        .window(Tumble.over("10.hours").on("rowtime").alias("w")) \
        .group_by("customer, w") \
        .select("""customer as customer, 
                   count(transaction_type) as count_transactions,
                   sum(online_payment_amount) as total_online_payment_amount, 
                   sum(in_store_payment_amount) as total_in_store_payment_amount,
                   last(lat) as lat,
                   last(lon) as lon,
                   w.end as last_transaction_time
                   """) \
        .filter("total_online_payment_amount<total_in_store_payment_amount") \
        .filter("count_transactions>=3") \
        .filter("lon < 20.62") \
        .filter("lon > 20.20") \
        .filter("lat < 44.91") \
        .filter("lat > 44.57") \
        .insert_into("sink_into_csv")

    st_env.execute("app")


if __name__ == '__main__':
    transactions_job()

The first line in the function defines the environment; the streaming execution environment is definitely the one we are looking for. The set_stream_time_characteristic call chooses how we treat records in terms of time. We are using a predefined timestamp that comes from the source; the other possibility is to attach a timestamp to each record while processing it. Event time allows a table program to produce results based on the time contained in every record, so the stream stays consistent even if some records arrive late, as long as they fall within the watermark bound. It also ensures replayable results when reading records from persistent storage.

The next two lines simply register the previously defined source and sink in the streaming flow. The last part of the function defines the flow order and the aggregation. In this case we use a tumbling window of 10 hours and group by customer. The aggregation covers the amount spent (sum), the number of transactions (count), the coordinates (last) and the window end time. There is also a set of filters that records have to satisfy to end up in output_file.csv. This simple use case triggers customers who, within the window, made at least three transactions inside the filtered coordinates (some predefined area) and whose online spending is lower than their in-store spending, which is what the first filter expresses. Of course, a real use case will probably be more complicated, which is not a problem since Apache Flink offers a wide range of functions and APIs.
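
Since Flink also supports SQL (as mentioned at the beginning), roughly the same windowed aggregation could be expressed as a query over the registered source table. This is only an illustrative sketch of the idea, not part of the job above, and it leaves out the filters:

# Rough SQL equivalent of the Table API pipeline above (illustrative sketch only)
result_table = st_env.sql_query("""
    SELECT customer,
           COUNT(transaction_type) AS count_transactions,
           SUM(online_payment_amount) AS total_online_payment_amount,
           SUM(in_store_payment_amount) AS total_in_store_payment_amount,
           TUMBLE_END(rowtime, INTERVAL '10' HOUR) AS last_transaction_time
    FROM source
    GROUP BY customer, TUMBLE(rowtime, INTERVAL '10' HOUR)
""")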

Step 6 – Put everything into Docker

As mentioned earlier, the Flink job is packed into docker-compose in the following way: we use the pyflink/playgrounds:1.10.0 image, the official image for the PyFlink playground. You need to be careful with your volumes; in our case it is ./examples:/opt/examples. On localhost:8088 you will get the Flink Dashboard with data and stats about all your Flink jobs.

version: '2.1'
services:
  jobmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8088:8088"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: pyflink/playgrounds:1.10.0
    volumes:
      - ./examples:/opt/examples
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Dockerfile:

FROM flink:1.10.0-scala_2.11

RUN set -ex; \
  apt-get update; \
  apt-get -y install python3; \
  apt-get -y install python3-pip; \
  apt-get -y install python3-dev; \
  ln -s /usr/bin/python3 /usr/bin/python; \
  ln -s /usr/bin/pip3 /usr/bin/pip


RUN set -ex; \
  apt-get update; \
  python -m pip install --upgrade pip; \
  pip install apache-flink; \
  pip install kafka-python

ARG FLINK_VERSION=1.10.0

RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/${FLINK_VERSION}/flink-sql-connector-elasticsearch6_2.11-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
# Create data folders
    mkdir -p /opt/data; \
    echo "taskmanager.memory.jvm-metaspace.size: 512m" >> /opt/flink/conf/flink-conf.yaml; \
    echo "rest.port: 8088" >> /opt/flink/conf/flink-conf.yaml;


WORKDIR /opt/flink/

Step 7 – Run everything

To run everything described above, we need three additional commands.

Run python script to generate dummy data:

python dummy_data_generator.py
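
(Optional) To verify that records are actually landing in the topic, Kafka's console consumer can tail it from another terminal:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions-data --from-beginning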

Build and run docker:

docker-compose up --build

After a while, you'll be able to go to localhost:8088 and see the Flink Dashboard.

Finally, submit the PyFlink job to the job manager:

docker-compose exec jobmanager ./bin/flink run -py /opt/examples/queries/app.py

You'll see a "job submitted" message, which means you have a running job that is visible on the Flink Dashboard. Now you need to wait a bit, depending on the window size you chose. After that, you can take a look at output_file.csv and see your results.

Step 8 – Extra step

In case you want to write the output to Elasticsearch (running in Docker too), you can just add the code block below to your docker-compose:

elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.1
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536

And in app.py replace the sink function with the code block below:

def register_transactions_es_sink(st_env):
    st_env.connect(Elasticsearch()
                   .version("6")
                   .host("0.0.0.0", 9200, "http")
                   .index("transactions-supermarket-case")
                   .document_type("usage")) \
        .with_schema(Schema()
                     .field("customer", DataTypes.STRING())
                     .field("count_transactions", DataTypes.STRING())
                     .field("total_online_payment_amount", DataTypes.DOUBLE())
                     .field('total_in_store_payment_amount', DataTypes.DOUBLE())
                     .field("lon", DataTypes.FLOAT())
                     .field("lat", DataTypes.FLOAT())
                     .field('last_transaction_time', DataTypes.STRING())
                     ) \
        .with_format(Json().derive_schema()).in_upsert_mode().register_table_sink("sink_elasticsearch")
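
If you switch to this sink, transactions_job still has to register it and point the query at it. A rough sketch of the two lines that change:

# in transactions_job, instead of the CSV sink:
register_transactions_es_sink(st_env)
# ...and at the end of the query:
#     .insert_into("sink_elasticsearch")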

 

I hope this can be a good starting point for building your own streaming service and expanding on top of it. Flink arrived on the scene a little later than Apache Storm and Apache Spark, but lately its community has been growing really fast, which ensures fast-paced development of new features; all you need to do is use it. It will also give you a true streaming experience.

In today's industries it is not enough to stream data with basic filters and joins; you also want to include machine learning models that help you recognize real patterns, build recommendation systems, forecast spending and profit, cluster your customers, and so on. With that in mind, the next blog post on Apache Flink will combine one of the mentioned machine learning methods with streaming.

Meanwhile, take a look at our blog post about recommendations in Covid times and stay tuned, because there will be more of this on our blog.


Featured Photo by Marta Wave from Pexels

Data Exploration with Pandas (Part 2)

In the previous article, I wrote about some introductory stuff and basic Pandas capabilities. In this part, the main focus will be on DateTime values. I am also going to introduce you to some grouping and merging possibilities in Pandas. For this purpose, here is another dataset downloaded from the UCI Repository, which contains date and time columns.

This time my data comes from an Excel file, so the way I read it is slightly different than in the previous post. After importing the pandas library (see the previous blog post), I use the read_excel() function and display a data sample with head().
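
As a sketch, reading the file looks like this (the file name is just a placeholder for wherever you saved the dataset):

import pandas as pd

data = pd.read_excel('online_retail.xlsx')
data.head()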


As you can see, we have "PERIOD_START_TIME" as the first column, and this column is going to be the focus of this post. The dataset is also suitable for grouping and making aggregations on top of those groups.

To check the column types, use the info() function. This lets you see whether "PERIOD_START_TIME" is a datetime object, so that datetime functions can be applied to it. If not, the column should be cast to datetime in the following way (after importing the datetime library):

import datetime as dt
data['PERIOD_START_TIME'] = pd.to_datetime(data.PERIOD_START_TIME)

Now, let’s see how to aggregate “PERIOD_START_TIME” to hourly values.

import datetime as dt
data = data.groupby([data['PERIOD_START_TIME'].dt.floor('H'),'CustomerID', 'InvoiceNo', 'StockCode', 'Country']).agg(['mean','sum', 'max'])
data.columns = data.columns.map('_'.join)

What we have done here is aggregate "PERIOD_START_TIME" to the hourly level and compute the mean, max and sum of the selected columns while grouping by the others. In this case, I then dropped the other aggregated columns and kept just the sum.
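
A possible sketch of that cleanup (the column names follow the '_'.join mapping above; adjust them to your own data):

# bring the group keys back as columns and keep only the *_sum aggregates
data = data.reset_index()
sum_columns = [col for col in data.columns if col.endswith('_sum')]
data = data[['PERIOD_START_TIME', 'CustomerID', 'InvoiceNo', 'StockCode', 'Country'] + sum_columns]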


Now let's find the busy hour for each country, i.e. the hours when people mostly do their shopping.

First, let’s find “Quantity_sum” for every hour grouped by country:

data['Quantity_sum_by_hour'] = data.groupby(['PERIOD_START_TIME' ,'Country'])['Quantity_sum'].transform('sum')

For this case we need to separate date and time columns.

We can achieve this with:

data['date'] = data['PERIOD_START_TIME'].dt.date
data['time'] = data['PERIOD_START_TIME'].dt.time

After the next line of code it will be clearer why we need separate date and time columns:

busy_hour = data.sort_values(['Quantity_sum_by_hour'], ascending=False).groupby(['date', 'Country']).head(1)

Here we group by date and country, because our goal is to find the sum of the "Quantity" column for every country, for each day.

Now let’s sort by country to see who are the biggest shoppers:

busy_hour.sort_values(['date', 'Country'],ascending=False).groupby(['date', 'Country']).head(1)


Using the head() function with value 1 returns the maximum value for each day and country.

As you can see, I’ve selected just a few columns that I need.

Next step could be plotting some results of our previous steps.

Let’s take a look at how values for a busy hour in the United Kingdom grew in a few consecutive days.

united_kingdom_data = data[data['Country']=='United Kingdom']

The simplest way for plotting is using matplotlib library:

import matplotlib.pyplot as plt
plt.figure(figsize=(20, 5))
plt.plot(busy_hour['date'], busy_hour['Quantity_sum_by_hour'])
plt.show()


The first line imports the library, the second assigns a figure size, and in the third we choose the x and y coordinates. Without the last line, plt.show(), nothing would be displayed.

That would be a straightforward example of grouping and plotting data. A bit deeper plotting functionalities will be the subject of my future posts.

Now let's import one more dataset, this time a CSV (downloaded from here), to show the joining and merging capabilities.

In this data file called ‘continents_data’ we also have a ‘Country’ column, which is going to be used for joining, and new columns: ‘Continent’ and ‘Region’.
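
Reading it works like any other CSV (the file name is assumed):

continents_data = pd.read_csv('continents_data.csv')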

data_merged = data.merge(continents_data, how='inner', on='Country')

or, equivalently:

data_merged_2 = pd.merge(data, continents_data, how='inner', on='Country')

Also, you can append one dataframe to another with the append() and concat() functions, but I'm leaving that to you to explore.
Merging methods compared to SQL:
  • left – left outer join
  • right – right outer join
  • outer – full outer join
  • inner – inner join

When performing an outer join, it is good practice to include an indicator column, which shows which dataset each value comes from.

pd.merge(data, continents_data, how='outer', on='Country', indicator='indicator_column')


We can also add suffixes to the column names so we know which dataframe each column comes from. As we can see, the "continents_data" dataset has no data for the United Kingdom, so the 'Continent' and 'Region' columns are empty, and we can clearly conclude that from the 'indicator_column' created above. Suffixes are added like this:

pd.merge(data, continents_data, on='Country', how='left', suffixes=('_left', '_right'))

We can also map a set of values into new values:

data['Continent'] = data.Continent.map({'Europe': 'Old Continent', 'Asia' : 'the East', 'Americas' : 'West'})

Now let’s calculate a rolling mean across our data:

data['Rolling_mean'] = data.groupby('Country')['UnitPrice'].apply(lambda x:x.rolling(center=False,window=2).mean())

The first element of the moving average is obtained by taking the average of the initial fixed subset of the number series. Then the subset is modified by “shifting forward”; that is, excluding the first number of the series and including the next value in the subset.

This post covered the basics of data manipulation using the merging, joining and mapping functionalities, as well as working with time-stamped data. In the following posts I'm going to write in more detail about data plotting and visualization, but make sure you have understood the basic pandas functionalities presented in this and the previous post in order to follow along. 🙂

Data Exploration with Pandas (part 1)

If you ever decide to get into big data, you can surely do it without having a clue about pandas. But that's not the brightest approach, because why would you leave aside something that's going to make you a lot better at it? Pandas is a well-known library for manipulating numerical and tabular datasets, which makes it a pretty good library to know for data engineers and data scientists. In part 1 we're going to go through some of the basics to introduce you to Pandas' capabilities. As an example dataset for this article I've used a free dataset of GDP statistics for world countries (you can download it from here). You can also find a lot of other free datasets online, for example in the UCI Repository. For presentation I'm using Jupyter Notebook.

Let’s start with basic stuff – importing data to our script, after importing pandas library.

import pandas as pd
data = pd.read_csv('gdp_country.csv', skiprows=2, delimiter=',')

The variable I'm going to use from now on is called "data", which should be clear to everyone with at least a little programming experience. Next we read the data with pd.read_csv(); the path to the data is the first parameter in the parentheses. In this case, the data is in the same folder as my Python (notebook) script. The "skiprows" parameter depends on the quality of your data. I chose this file on purpose, to make it more interesting, but to be honest this is one in fifty times that I actually need it. As the name says, it lets you choose how many top rows to skip. The last parameter is really important: CSV stands for comma-separated values, so it's obvious that the delimiter, the thing that separates the values, is a comma in this dataset.

When it comes to displaying a dataset after reading it, you don't always need to see the full set; it's enough to look at a few top rows, and that is what the head() function is for. As a parameter you pass the number of top rows you want to see.

data.head(2)


Big Data means you are often going to work with huge amounts of data, which usually include some unwanted or unnecessary columns and rows, so the best way to optimize your memory usage is to drop them. If there is just one column, you can use a lighter command:

data.drop("column_to_drop", axis=1, inplace=True)

For more columns, the best approach is to create a list of column names and drop them all at once:
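
A minimal sketch (the year columns here are just example names from this GDP dataset):

columns_to_drop = ['1960', '1961', '1962']
data.drop(columns_to_drop, axis=1, inplace=True)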

Or you can drop all columns whose names match some string:

data[data.columns.drop(list(data.filter(regex='string_we_need')))]

The drop() function takes a column name or a list of column names as its first parameter; the second one is axis, where 0 and 1 stand for rows and columns respectively.

The inplace parameter, with values True or False, lets you persist the change in the existing variable; if you assign the result to a new variable instead, as in the second example, "inplace" should not be used.

In case you need to drop many columns and keep just a few, there is an easier way: simply select the necessary columns.

data = data[['Country Name', 'Country Code', 'Indicator name', 'Indicator Code', '1965', '2009',  '2011']]


If you want to make a new dataframe with exactly the same shape and values, you can just do the following:

new_data = data

However, this does not let you work with the two independently: changing one will also change the other, because both names point to the same object.

The following line provides such a capability:

new_data = data.copy()

The data.shape attribute gives a quick insight into the number of rows and columns in the dataframe.

Filtering rows is not that distant from filtering columns.

data = data[data['Country Name']=='Angola']


Filtering can be applied to multiple criteria by chaining conditions:

data = data[(data['Country Name']=='Angola') | (data['Country Name']=='Uruguay')]


Another way for achieving this:

my_list_of_values = ['Angola', 'Uruguay']
data.loc[data['Country Name'].isin(my_list_of_values)]

If you need the top rows you can use the head function mentioned earlier, but suppose you need rows somewhere from the middle of the dataset; then you can select by index, starting from 0 as the first row.

data = data[134:142]

Also, a set of random rows can be retrieved from a Pandas DataFrame using the sample() function:

data.sample(4)


You may have noticed that we have a NaN value for Monaco in 2011, and that is classified as missing data. For now, we are going to skip that.

Now let’s play with column names and keys.

Something that is usually required is changing column names.

data.rename(columns={'Country Name' : 'Name of country'}, inplace=True)


Adding a prefix or a suffix is a frequent but simple routine.

data = data.add_suffix('_added-suffix')


Since data comes from various sources, spaces might show up in column names after reading it in. The best practice is to remove them, since they might cause trouble:

data.columns = [x.strip().replace(' ','_') for x in data.columns]


To split all values of a column at one point, we can use the next line of code (but keep in mind that no values should be missing):

data['Indicator Name'].str.split(',').str[1]

We pass the name of the DataFrame ("data"), then the column name ("Indicator Name"), and then the character we split by and the ordinal number of the piece we want to keep (starting from zero).

Splitting in Pandas works pretty much the same as in Python:

string = '2_under_score'
splitted = string.split('_')[1]
Out:
'under'

To set a desired value in a single cell, we can pass the row index and the column name:

data.set_value(7, 'Indicator Name', 'Changed value in cell')
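
Note that set_value has been deprecated and removed in newer pandas versions; the .at indexer does the same job:

data.at[7, 'Indicator Name'] = 'Changed value in cell'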


Handling missing values and duplicates with basic grouping and sorting data

There is a new column, "Continent", which I'm going to use for grouping.

data.sort_values(['2011'],ascending=False).groupby('Continent').head(5)


Column "2011" is sorted in descending order, but only within each group, i.e. within each continent. That's all you need to know about grouping before we get to missing (NaN) values and duplicates.

In this case it's obvious that there are some missing values, but in large datasets they will often be somewhere in the middle, so we use the .isnull() function; wrapped inside len() it counts the rows with NaN values in a column. That's pretty easy and we do it this way:

len(data[data.Column.isnull()==True])

Also, we can check unique values with the next code.

len(data['name_of_column'].unique())

The way to handle missing data depends on the requirements, whether to fill it with some constant value or drop rows or columns.

Dropping row with at least one NaN value:

data.dropna(axis=0, inplace=True)


From the previous dataframe, only one row is now left.

Dropping columns:

data.dropna(axis=1, inplace=True)


Impute value instead of NaN value:

data.fillna(1)


For example, if you plan to do some multiplication, the best way to fill missing values is with 1, but you can fill them with a string or anything else you need.

Dropping duplicates is pretty simple with data.drop_duplicates(), but that won't cover every situation, so sometimes we need something a little more complex.

I had a case with a huge number of duplicate rows, or rows with approximately the same values, so I used grouping to take the maximum value for each group. The columns you want to keep are the ones you specify inside the groupby parentheses.

There is a neat way to do this with Pandas:

data.groupby(['Continent'])[['1965', '2009', '2011']].max()


As you can notice, columns that are not mentioned in the group by, and that don't have an aggregate function applied to them, don't appear in the output.

"Continent" is now the index column, and to turn it back from an index into a regular column, we need to reset the index.

data.reset_index(drop=False)

With drop=False the old index is kept as a regular column (in this case "Continent"), while drop=True discards it.

And finally, saving the dataframe into a CSV file:

data.to_csv('name_of_csv_file.csv', index=False)

That's enough for Part 1. We have covered basic data import from CSV into a Pandas DataFrame, selecting, dropping and renaming columns, and finished with handling missing data. These operations are needed for almost every dataset before further calculations. In the following posts, I'm going to write about joining, merging, advanced column calculations, applying map functions and more. The focus will be on challenges from my real projects, which I find more interesting. Any question is welcome, so do not hesitate. 🙂

Interactive log analysis with Apache Spark

The Internet is becoming the largest global marketplace, and anyone offering products or services prefers web shops to become the primary outlet for supplying customers. This reduces the number of employees and traditional brick-and-mortar branches, and reduces costs, so it is clear that analyzing customer behavior on digital and online channels is of great importance. For this reason it should not be surprising that many companies treat this kind of analysis as a basic need.

In this post I will not focus that much on Spark itself, since the Apache community has an excellent documentation. It is enough to mention that Apache Spark is the most common Big Data tool for processing large amounts of data, with rich APIs for machine learning, streaming data, graph analysis,  etc. The  main feature of Spark is that the processing is performed in-memory which makes it extremely fast – even  up to 100 times faster than traditional MapReduce, and that is why it is recognized as the  most valuable tool in cases like this, when dealing with hundreds of gigabytes of data.

Spark provides APIs in Scala, Java, R, SQL and Python. In this post we are going to use the last one, called PySpark. In terms of data structures, Spark supports three types: RDDs, Datasets and DataFrames. Datasets and DataFrames are built on top of the Spark SQL engine, which is why they handle data more efficiently than RDDs.

Now we can move to the subject of this post. The main focus will be some basic methods for parsing website logs in PySpark, with a little help from SQL for making life easier, and several pretty neat Python libraries specifically designed for this purpose.

The first step after the library imports is creating a SparkSession and determining the path to the input file. As input I will be using synthetically generated logs from an Apache web server, and Jupyter Notebook for interactive analysis.

Let’s start building our Spark application. The first step is to build a SparkSession object, which is the entry point for a Spark application…

[code language="python"]
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

spark = SparkSession.builder.appName("Logs Streaming Parser").getOrCreate()
mydata = spark.sparkContext.textFile('/Users/sinisajovic/Downloads/logs.log')
[/code]

Data is rarely 100% well formatted, so I would suggest applying a function that filters out missing or incorrectly exported log lines. In our dataset an incorrect log line starts with '#' or '-', and the only thing we need to do is skip those lines.

Here is a sample Apache server log line:

[code language="python"]

127.0.0.1.800.00, 127.0.0.1.800.00 - - [08/Feb/2017:16:33:27 +0100] "GET /api/house/get_for_compare?idn=33&code=99992&type= HTTP/1.1" 404 19636 "mywebsitelocation.com" "Mozilla/5.0 (Linux; Android 5.0.1; GT-I9505 Build/LRX22C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Mobile Safari/537.36"

[/code]

To split log lines into useful columns we are going to write a regex. Pythex.org is a good place to test whether the regex suits our needs.

In my case the regex looks like this:

[code language="python"]

LOG_PATTERN = r'(.+?)\ - - \[(.+?)\] \"(.+?)\ (.+?)\ (.+?)\/(.+?)\" (.+?) (.+?) \"(.+?)\" \"(.+?)\"'

[/code]

We are going to use the user_agents library, since it is pretty good at identifying primary device characteristics such as brand name, model, type, the operating system running on the device, the browser used to surf, and so on. We can also check whether it is a touch device, or even whether it's a bot…

[code language="python"]
from user_agents import parse
from urlparse import urlparse
import requests
import datetime

def device_type(user_agent):
    options = {1: 'mobile', 2: 'tablet', 3: 'pc', 4: 'touch', 5: 'bot'}
    if user_agent.is_mobile:
        ty = 1
    elif user_agent.is_tablet:
        ty = 2
    elif user_agent.is_pc:
        ty = 3
    elif user_agent.is_touch_capable:
        ty = 4
    elif user_agent.is_bot:
        ty = 5
    else:
        ty = 6
    return options.get(ty, 'unknown')
[/code]

The main part of our script is the parse_line function, which produces the defined structure of fields by parsing each line with our regex pattern (LOG_PATTERN) and the user_agents and tldextract libraries. As output we get columns with the matched groups and the parsed user-agent attributes.

[code language="python"]
import re
import tldextract

def parse_line(logline):
    if logline is None or len(logline) < 1:
        return 'Other'
    try:
        match = re.search(LOG_PATTERN, logline)
        user_agent = parse(match.group(10))
        domen = tldextract.extract(match.group(9)).domain

        return Row(
            IP_protocol=match.group(1),
            Timestemp=match.group(2),
            Request_type=match.group(3),
            Request=match.group(4),
            Type=match.group(5),
            Version=match.group(6),
            Status=match.group(7),
            Size_of_response=match.group(8),
            Referer=match.group(9),
            Browser=user_agent.browser.family,
            Device=user_agent.device.family,
            OS=user_agent.os.family,
            Mobile=user_agent.is_mobile,
            Tablet=user_agent.is_tablet,
            Touch=user_agent.is_touch_capable,
            PC=user_agent.is_pc,
            Bot=user_agent.is_bot,
            Domain=domen
        )
    except:
        pass
[/code]

The last matched group in this case is the domain, parsed with the tldextract library, which takes a URL and extracts the domain information from it. After we're done with the libraries I've introduced you to, we need to create a schema. This is nothing more than creating a table-like structure, matching fields with the previous functions and assigning column names. Here are just a few lines of code showing how to create the schema for our dataset:

[code language="python"]
schema = StructType([
    StructField('Browser', StringType(), True),
    StructField('Device', StringType(), True),
    StructField('OS', StringType(), True),

    StructField('PC', StringType(), True),
    StructField('Bot', StringType(), True)
])

logs = spark.createDataFrame(data=log_parsed, schema=schema, samplingRatio=None)
[/code]
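
The createDataFrame call above takes a variable called log_parsed, which we have not built yet; as a sketch of that glue (using the cleanup rule described earlier), the raw lines can be filtered and mapped with parse_line like this:

[code language="python"]
# a sketch: skip comment/malformed lines, parse the rest, drop lines that failed to parse
log_parsed = mydata \
    .filter(lambda line: line and not line.startswith(('#', '-'))) \
    .map(parse_line) \
    .filter(lambda row: row is not None and row != 'Other')
[/code]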

This may be a lot of code samples, but I find it much more efficient and helpful for understanding the basic logic of our approach. Since we want to use the SQL capabilities, let's create a temporary view.

[code language="python"]
from pyspark.sql import SparkSession
from pyspark.sql.types import *

log_df = logs  # the DataFrame created in the previous step
log_df.createOrReplaceTempView('logsdata')
devicedf = spark.sql('select Device, count(Device) from logsdata group by Device')
browserdf = spark.sql('select Browser, count(Browser) from logsdata group by Browser')
[/code]

With this in place we can write SQL queries, which is a huge benefit for filtering and sorting data. A very important thing to do is to filter out poorly processed lines. For further processing, a good approach is to create a DataFrame for each specific category by selecting the category and doing a count with group by. We could do this in pandas too, but SQL gives us much better performance here. This is the last step in our tutorial before switching to one of the most popular Python libraries for working with structured datasets, pandas. To simplify working with the matched groups, we create a pandas dataframe for each of them.
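
The plotting code below assumes pandas DataFrames named device_df and operatingsystems_df; a minimal sketch of that conversion (the OS query simply follows the same pattern as the ones above):

[code language="python"]
device_df = devicedf.toPandas()
browser_df = browserdf.toPandas()
operatingsystems_df = spark.sql('select OS, count(OS) from logsdata group by OS').toPandas()
[/code]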

For plotting we’re going to use matplotlib library.

[code language="python"]
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib.patches as mpatches

devicetype = device_df.plot.bar(x=['Mobile'], color=('firebrick', 'steelblue', 'darkgrey'), figsize=(8, 8), title='Devices', width=0.5, fontsize=15);
devicetype.set_xlabel('Device type')
plt.figure();

operatingsystems_bar = operatingsystems_df.plot.bar('OS', color='Red', figsize=(8, 8), yticks=operatingsystems_df['count(OS)'], title='Operating Systems', width=0.5, fontsize=15);
plt.show()
[/code]

On the first plot we can display the device information – whether it is a tablet, touch, mobile, PC or some other type.
We can also dig into the operating systems most commonly used on devices. There are many plot types supported, and they are pretty well documented in the official library documentation.
Let's see which are the most represented mobile phones and browsers among our users.

On this plot we can see that the iPhone is in first place. The reason lies in the library itself – it does not recognize the exact iPhone model, unlike the models of other brands.
We can also see a pretty tremendous domination of Samsung in the budget phone market and of Chrome among browsers (at least in Serbia, based on our sample).
It is also evident how essential Facebook marketing is, since the Facebook in-app browser is in third place.

This is just a brief introduction to exploratory log analysis with Python, pandas and matplotlib. Pandas provides the capabilities for much deeper analysis using DataFrames, which will be the main focus of my future posts. We will also move on to more advanced analysis, such as mapping user behavior on a website, user segmentation and recommender systems.