Once you start with streaming, you go with the flow!

Data streaming has become very popular in the big data industry. It is used to process large amounts of continuously generated data from different sources in real time. When we say “real-time”, keep in mind that it can mean anything from a few milliseconds to a few minutes.

Besides enabling processing and enrichment in real time, streaming can also improve performance significantly. In batch processing, data from a certain period of time is grouped into a batch, and processing such a large set can run into memory and storage problems.

In stream processing, micro-batches are generated and arrive within milliseconds; conditions are then evaluated on them, and based on those conditions we can take the planned actions. Once a micro-batch has passed through the stream, which consists of loading, transformation, and possibly aggregations and conditions, the data disappears from memory.
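To make this more concrete, here is a minimal sketch of such a micro-batch flow written in PySpark Structured Streaming (just one of several tools that follow this model). The Kafka broker address, topic name and checkpoint path are hypothetical placeholders, and the Kafka connector package is assumed to be available.

[code language="python"]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("MicroBatchFlow").getOrCreate()

# Load: micro-batches arrive continuously from a (hypothetical) Kafka topic
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
       .option("subscribe", "events")                        # placeholder topic
       .load())

# Transform: cast the payload and keep only the records that meet our condition
messages = (raw.selectExpr("CAST(value AS STRING) AS payload", "timestamp")
            .filter(col("payload").isNotNull()))

# Sink: once a micro-batch has been written out, it is released from memory
query = (messages.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/checkpoints/micro_batch_flow")  # placeholder path
         .start())
[/code]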

The conclusion is that we can process a much larger amount of data on cheaper hardware, provided we select the appropriate tool and set up the processing properly.

Batch vs. stream processing

Streaming essentials

Latency

Latency is the period between an activity at the source (a transaction, a web log entry, an event at a base station or in the network) and the moment when the data is stored in the appropriate form.

The level of latency, or delay, that we can accept depends on the type of application we are working on and the use case. Therefore, when choosing a real-time data processing tool, the first precondition is that the latency it introduces is no longer than the delay we can tolerate. For example, security and fraud detection are use cases that do not tolerate high latency.

Event time & Processing time

Event time is the time when a transaction actually happens. Processing time is the time when it is processed inside our job. In an ideal world without any latency or delay, those two timestamps would be the same. Unfortunately, that is hardly ever the case, so we try to keep the gap between them as small as possible.
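As a small illustration, the snippet below uses PySpark Structured Streaming and its built-in rate test source (which generates rows with a timestamp) just to show where the two timestamps come from; in a real job the event time would be a field of the incoming record.

[code language="python"]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder.appName("EventVsProcessingTime").getOrCreate()

# The rate source emits rows with a `timestamp` column at generation time;
# here we treat that as the event time
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

with_times = events.select(
    col("timestamp").alias("event_time"),          # when the event happened
    current_timestamp().alias("processing_time")   # when the micro-batch is processed
)

query = with_times.writeStream.outputMode("append").format("console").start()
[/code]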

Joins

Joining in batch processing is mostly trivial, but in streaming it is not that straightforward. If you want to join the streaming part of your flow with a static table, it is pretty much the same as a batch join; however, if you need to join two streams, things get complicated. Because of this, it is really important to pay special attention to the datetime fields, to avoid misleading calculations and possible resource overuse.
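To sketch the difference, here is what the two kinds of joins might look like in PySpark Structured Streaming; the streams, column names and time bounds are made up purely for illustration.

[code language="python"]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.appName("StreamJoins").getOrCreate()

# Two hypothetical event streams; the built-in rate source stands in for real feeds
clicks = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
          .select((col("value") % 10).alias("click_user"),
                  col("timestamp").alias("click_time")))
purchases = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
             .select((col("value") % 10).alias("purchase_user"),
                     col("timestamp").alias("purchase_time")))

# 1) Stream-static join: essentially the same as a batch join
users = spark.createDataFrame([(0, "free"), (1, "premium")], ["user_id", "plan"])
enriched = clicks.join(users, clicks.click_user == users.user_id, "left")

# 2) Stream-stream join: both sides need a watermark and a time-range condition,
#    otherwise the engine would have to buffer both streams forever
joined = (clicks.withWatermark("click_time", "10 minutes")
          .join(purchases.withWatermark("purchase_time", "10 minutes"),
                expr("""
                    click_user = purchase_user AND
                    purchase_time >= click_time AND
                    purchase_time <= click_time + interval 10 minutes
                """)))
[/code]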

Stateless & Stateful 

Each block or window in the stream should be defined in detail, and one parameter that cannot be avoided is its state. The names speak for themselves. A stateless window does not keep records, which means no history is collected over a time frame; it is mainly used for filtering, joins with a static table for flow enrichment, or calculations at the level of a single record.

Stateful is the exact opposite. If we define a window as stateful, we are obliged to define the time frame for which its data will be stored. It is typically used as a step before aggregations over certain time intervals.
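A minimal sketch of both kinds of windows, again in PySpark Structured Streaming with a made-up stream and intervals:

[code language="python"]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("StatelessVsStateful").getOrCreate()

events = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
          .select(col("timestamp").alias("event_time"), col("value")))

# Stateless: a plain filter handles each record on its own, no history is kept
evens = events.filter(col("value") % 2 == 0)

# Stateful: a windowed aggregation keeps state for the declared time frame;
# the watermark tells the engine when that state may be dropped
counts = (events
          .withWatermark("event_time", "5 minutes")
          .groupBy(window(col("event_time"), "1 minute"))
          .count())

stateless_query = evens.writeStream.outputMode("append").format("console").start()
stateful_query = counts.writeStream.outputMode("update").format("console").start()
[/code]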

Streaming tools

Mostly thanks to the development of tools from the Apache Software Foundation and of cloud solutions, today there is no shortage of streaming software. As you may have already concluded, developing a streaming application requires programming knowledge, and the required level varies with the complexity of the use case and the tools you choose. In one of the following posts, we will write in more detail about one of these tools.

Streaming use-cases

Telco. Processing and analysis of event data (CDRs). With a streaming approach, you can track data or voice usage per customer in real time and trigger additional campaigns based on it. Using network performance data, you can detect anomalies on your base stations in near real time; this can be used alongside your alarm system to enrich failure information.

Banking. The streaming approach will cut your time to respond in case of any fraudulent activity. Banking is probably the most promising industry when it comes to real-time use cases. One of them is customer personalization: with this approach, you can send an optimal offer depending on location, transaction, context, etc.

Retail. The retail industry also gives us a huge spectrum of use cases to analyze in real time: better recommender systems, location-based triggers, reacting to market trends, and personalized offers. In this industry smart and on-time supply monitoring is crucial, so real-time inventory management can save you a lot of money and effort.

Automotive. Everyone knows about the Uber use case: the streaming approach is used to find the nearest free car, optimizing expenses and increasing customer satisfaction. Traffic prediction and route optimization also need to happen in a timely manner, which makes this industry a perfect ground for streaming applications.

In an environment as competitive as today’s, it is essential to take the right steps in the shortest possible time to stay ahead of the competition. Thanks to today’s network throughput, the cloud, cheap hardware, and the maturity of the available tools, we can and should take advantage of the large amounts of data arriving in real time from various systems, data we were not able to process until now. Our suggestion is to start with the simplest use case that fits your business and then build more advanced solutions on top of that, without rushing into complexity.

In the next post, we will write more specifically about Flink and its usage in a streaming environment – stay tuned.

The cover photo is taken from https://pexels.com/@munkee-panic-272941

Interactive log analysis with Apache Spark

The Internet is becoming the largest global shop across markets, and anyone offering products or services prefers web shops to become the primary outlet for supplying customers. This reduces the number of employees and traditional brick-and-mortar branches and lowers costs, so it is clear that analyzing customer behavior on digital and online channels is of great importance. For this reason, it should not be surprising that many companies treat this kind of analysis as a basic need.

In this post I will not focus that much on Spark itself, since the Apache community has excellent documentation. It is enough to mention that Apache Spark is the most common big data tool for processing large amounts of data, with rich APIs for machine learning, streaming data, graph analysis, etc. The main feature of Spark is that processing is performed in memory, which makes it extremely fast – up to 100 times faster than traditional MapReduce – and that is why it is so valuable in cases like this, when dealing with hundreds of gigabytes of data.

Spark provides APIs in Scala, Java, R, SQL and Python; in this post we are going to use the last one, called PySpark. In terms of data structures, Spark supports three types – RDDs, Datasets and DataFrames. Datasets and DataFrames are built on top of the Spark SQL engine, which is why they handle data more efficiently than RDDs.

Now we can move to the subject of this post. The main focus will be some basic methods for parsing website logs in PySpark, with a little help from SQL for making life easier, and several pretty neat Python libraries specifically designed for this purpose.

The first step after the library imports is creating a SparkSession and determining the path to the input file. As input I will be using synthetically generated Apache web server logs, and Jupyter Notebook for interactive analysis.

Let’s start building our Spark application. The first step is to build a SparkSession object, which is the entry point for a Spark application…

[code language="python"]
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

# Entry point for the application and the raw log file as an RDD of lines
spark = SparkSession.builder.appName("Logs Streaming Parser").getOrCreate()
mydata = spark.sparkContext.textFile('/Users/sinisajovic/Downloads/logs.log')
[/code]

Data is rarely 100% well formatted, so I would suggest applying a function that filters out missing or incorrectly exported log lines. In our dataset, an incorrect log line starts with ‘#’ or ‘-’, so the only thing we need to do is skip those lines.
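For example, with the RDD we created above, skipping those lines comes down to a single filter:

[code language="python"]
# Keep only non-empty lines that do not start with '#' or '-'
cleaned = mydata.filter(lambda line: line and not line.startswith(('#', '-')))
[/code]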

Here is a sample Apache server log line:

[code language="python"]
127.0.0.1.800.00, 127.0.0.1.800.00 - - [08/Feb/2017:16:33:27 +0100] "GET /api/house/get_for_compare?idn=33&code=99992&type= HTTP/1.1" 404 19636 "mywebsitelocation.com" "Mozilla/5.0 (Linux; Android 5.0.1; GT-I9505 Build/LRX22C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Mobile Safari/537.36"
[/code]

To split log lines into useful columns we are going to write a regex. Pythex.org is a good place to test whether the regex suits our needs.

In my case the regex looks like this:

[code language="python"]
(.+?)\ - - \[(.+?)\] \"(.+?)\ (.+?)\ (.+?)\/(.+?)\" (.+?) (.+?) \"(.+?)\" \"(.+?)\"
[/code]
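In the rest of the code this pattern is stored in a variable named LOG_PATTERN, which the parse_line function uses later on. A quick way to sanity-check it against the sample line above might look like this:

[code language="python"]
import re

# The same pattern as above, kept for reuse in parse_line
LOG_PATTERN = r'(.+?)\ - - \[(.+?)\] \"(.+?)\ (.+?)\ (.+?)\/(.+?)\" (.+?) (.+?) \"(.+?)\" \"(.+?)\"'

sample = ('127.0.0.1.800.00, 127.0.0.1.800.00 - - [08/Feb/2017:16:33:27 +0100] '
          '"GET /api/house/get_for_compare?idn=33&code=99992&type= HTTP/1.1" 404 19636 '
          '"mywebsitelocation.com" "Mozilla/5.0 (Linux; Android 5.0.1; GT-I9505 Build/LRX22C) '
          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Mobile Safari/537.36"')

match = re.search(LOG_PATTERN, sample)
print(match.groups() if match else 'no match')
[/code]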

We are going to use the user_agents library, since it is pretty good at identifying primary device characteristics such as brand name, model, type, the operating system running on the device, the browser used to surf, and so on. We can also check whether it is a touch-capable device, or even whether it is a bot…

[code language="python"]
from user_agents import parse
from urllib.parse import urlparse
import requests
import datetime

def device_type(user_agent):
    # Map the user_agents flags to a readable device category
    options = {1: 'mobile', 2: 'tablet', 3: 'pc', 4: 'touch', 5: 'bot'}
    if user_agent.is_mobile:
        ty = 1
    elif user_agent.is_tablet:
        ty = 2
    elif user_agent.is_pc:
        ty = 3
    elif user_agent.is_touch_capable:
        ty = 4
    elif user_agent.is_bot:
        ty = 5
    else:
        ty = 6
    return options.get(ty, 'unknown')
[/code]
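A quick usage sketch, with the user agent string taken from the sample log line above (the device should be classified as mobile):

[code language="python"]
ua = parse('Mozilla/5.0 (Linux; Android 5.0.1; GT-I9505 Build/LRX22C) AppleWebKit/537.36 '
           '(KHTML, like Gecko) Chrome/56.0.2924.87 Mobile Safari/537.36')
print(device_type(ua))  # expected: 'mobile'
[/code]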

The main part of our script is the parse_line function, which turns a raw log line into a structure of defined fields by parsing it with our regex pattern (LOG_PATTERN) and the user_agents and tldextract libraries. As output we get a Row whose columns hold the matched groups and the parsed user agent attributes.

[code language="python"]
import re
import tldextract

def parse_line(logline):
    if logline is None or len(logline) < 1:
        return 'Other'
    try:
        match = re.search(LOG_PATTERN, logline)
        user_agent = parse(match.group(10))
        domen = tldextract.extract(match.group(9)).domain

        return Row(
            IP_protocol=match.group(1),
            Timestamp=match.group(2),
            Request_type=match.group(3),
            Request=match.group(4),
            Type=match.group(5),
            Version=match.group(6),
            Status=match.group(7),
            Size_of_response=match.group(8),
            Referer=match.group(9),
            Browser=user_agent.browser.family,
            Device=user_agent.device.family,
            OS=user_agent.os.family,
            Mobile=user_agent.is_mobile,
            Tablet=user_agent.is_tablet,
            Touch=user_agent.is_touch_capable,
            PC=user_agent.is_pc,
            Bot=user_agent.is_bot,
            Domain=domen
        )
    except Exception:
        # Lines that do not match the pattern are silently skipped
        return None
[/code]

The last match group in this case is the domain, parsed with the tldextract library, which takes a URL and extracts the domain information from it. Once we are done with the processing using the libraries I have introduced, we need to create the schema. This is nothing more than creating a table-like structure, matching the fields produced by the previous functions and assigning the column names. Here are a few lines of code showing how to create a schema for our dataset (only a subset of the fields is shown):

[code language="python"]
schema = StructType([
    StructField('Browser', StringType(), True),
    StructField('Device', StringType(), True),
    StructField('OS', StringType(), True),
    StructField('PC', StringType(), True),
    StructField('Bot', StringType(), True)
])

# log_parsed is assumed to be the RDD of Row objects produced by mapping
# parse_line over the cleaned log lines
logs = spark.createDataFrame(data=log_parsed, schema=schema, samplingRatio=None)
[/code]

This may be a lot of code samples, but I find them much more efficient and helpful for understanding the basic logic of our approach. Since we want to use SQL capabilities, let’s create a temporary view.

[code language="python"]
# Register the DataFrame as a temporary view so it can be queried with SQL
# (createOrReplaceTempView replaces the older registerTempTable)
logs.createOrReplaceTempView('logsdata')

devicedf = spark.sql('select Device, count(Device) from logsdata group by Device')
browserdf = spark.sql('select Browser, count(Browser) from logsdata group by Browser')
[/code]

With this in place, we can write SQL queries, which is a huge benefit for filtering and sorting data. One very important thing to do is to filter out poorly processed lines. For further processing, a good approach is to create a DataFrame for each specific category: select the category, group by it, and count. We could do this in pandas too, but Spark SQL gives us much better performance on large data. This is the last step in our tutorial before moving to one of the most popular Python libraries for handling structured datasets, pandas. To simplify working with the matched groups, we create a pandas DataFrame for each of them.
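That conversion is a one-liner per DataFrame thanks to toPandas(). The operating-system query below is assumed to be analogous to the two queries above, since its result is used in the plots that follow:

[code language="python"]
# Collect the small, already aggregated results to the driver as pandas DataFrames
device_df = devicedf.toPandas()
browser_df = browserdf.toPandas()

# Assumed analogous query for the operating-system breakdown used in the plots below
operatingsystems_df = spark.sql('select OS, count(OS) from logsdata group by OS').toPandas()
[/code]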

For plotting we are going to use the matplotlib library.

[code language="python"]
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib.patches as mpatches

# Bar chart of request counts per device type
devicetype = device_df.plot.bar(x='Device', y='count(Device)',
                                color=('firebrick', 'steelblue', 'darkgrey'),
                                figsize=(8, 8), title='Devices', width=0.5, fontsize=15)
devicetype.set_xlabel('Device type')
plt.figure()

# Bar chart of request counts per operating system
operatingsystems_bar = operatingsystems_df.plot.bar(x='OS', y='count(OS)', color='red',
                                                    figsize=(8, 8), title='Operating Systems',
                                                    width=0.5, fontsize=15)
plt.show()
[/code]

On the first plot we can display the device information – whether it is a tablet, touch, mobile, PC or some other type.
We can also dig into the operating systems most commonly used on devices. There are many plot types supported, and they are pretty well documented in the official library documentation.
Let’s see which are the most represented mobile phones and browsers for our users.
Browsers and phones

On this plot we see that the iPhone is in first place. The reason lies in the library itself – it does not recognize the exact iPhone model, unlike models of other brands.
We can also see a pretty tremendous domination of Samsung in the budget phone market and of Chrome among browsers (at least in Serbia, based on our sample).
It is also evident how essential Facebook marketing is, since the Facebook in-app browser is in third place.

This is just a brief introduction to exploratory log analysis with Python, pandas and matplotlib. Pandas gives us the capability to do much deeper analysis using DataFrames, which will be the main focus of my future posts. We will also move on to more advanced analysis, such as mapping user behavior on a website, user segmentation and recommender systems.