Apache Spark

According to Databricks’ definition, “Apache Spark is a lightning-fast unified analytics engine for big data and machine learning. It was originally developed at UC Berkeley in 2009.”

Databricks is one of the major contributors to Spark; other contributors include Yahoo!, Intel, and many more. Apache Spark is one of the largest open-source projects for data processing. It is a fast, in-memory data processing engine.

History of Spark:
Spark started in 2009 as a research project in UC Berkeley's AMPLab. In 2010, Spark became open source under a BSD license, and in June 2013 the project was transferred to the ASF (Apache Software Foundation). The Spark researchers had previously worked on Hadoop MapReduce and observed that it was inefficient for iterative and interactive computing jobs. Spark was therefore designed to be fast for interactive queries and iterative algorithms, with support for in-memory storage and efficient fault recovery.

Features of Spark:

  • Apache Spark can be used to perform batch processing.
  • Apache Spark can also be used to perform stream processing. Before Spark, stream processing was typically done with Apache Storm or S4.
  • It can be used for interactive processing. Previously, Apache Impala or Apache Tez were used for interactive processing.
  • Spark is also useful for graph processing. Neo4j or Apache Giraph were previously used for graph processing.
  • Spark can process data in both real-time and batch mode.

So, we can say that Spark is a powerful open-source engine for data processing.

Components of Apache Spark

Spark is a cluster computing system. It is faster than other cluster computing systems such as Hadoop. It provides high-level APIs in Python, Scala, and Java, and parallel jobs are easy to write in Spark. In this article, we will discuss the different components of Apache Spark.

Spark processes huge datasets and is one of the most active Apache projects today. Spark is written in Scala and provides APIs in Python, Scala, Java, and R. The most vital feature of Apache Spark is its in-memory cluster computing, which increases data processing speed. Spark is a more general and faster processing platform: it can run programs up to a hundred times faster than Hadoop in memory and about ten times faster on disk. The main features of Spark are:

  1. Multiple Language Support: Apache Spark supports multiple languages; it provides APIs written in Scala, Java, Python, and R. This lets users write applications in any of these languages.
  2. Quick Speed: The most vital feature of Apache Spark is its processing speed. It allows an application to run on a Hadoop cluster up to one hundred times faster in memory and ten times faster on disk.
  3. Runs Everywhere: Spark runs on multiple platforms without sacrificing processing speed. It runs on Hadoop, Kubernetes, Mesos, standalone, and even in the cloud.
  4. General Purpose: It is powered by a rich set of libraries, including MLlib for machine learning, DataFrames and SQL, along with Spark Streaming and GraphX. Users are free to combine these libraries coherently in one application; this ability to mix streaming, SQL, and complex analytics within the same application makes Spark a general-purpose framework.
  5. Advanced Analytics: Apache Spark also supports the “Map” and “Reduce” operations mentioned earlier. Along with MapReduce, it supports streaming data, SQL queries, graph algorithms, and machine learning, so it can be used for advanced analytics.

Components of Spark:

Spark is built from several components. Let’s understand each of them in detail:

  1. Spark Core: All the functionality provided by Apache Spark is built on top of Spark Core. It delivers speed by providing in-memory computation and is the foundation for parallel and distributed processing of large datasets. It is also the backbone of the essential I/O functionality and plays a central role in programming and monitoring the Spark cluster, holding everything related to scheduling, distributing, and monitoring jobs on a cluster, task dispatching, and fault recovery. The functionalities of this component are:
    1. It contains the basic functionality of Spark (task scheduling, memory management, fault recovery, interacting with storage systems).
    2. It is home to the API that defines RDDs.
  2. Spark SQL (structured data): The Spark SQL component is built on top of Spark Core and provides structured processing of data. It offers standard access to a range of data sources, including Hive, JSON, and JDBC, and supports querying data either via SQL or via the Hive Query Language. It works with both structured and semi-structured data and powers interactive, analytical applications across streaming and historical data. Spark SQL is the Spark module that integrates relational processing with Spark's programming API; a short DataFrame/SQL example follows this list. The main functionality of this module is:
    1. It is a Spark package for working with structured data.
    2. It supports many sources of data, including Hive tables, Parquet, and JSON.
    3. It allows developers to intermix SQL with programmatic data manipulation supported by RDDs in Python, Scala, and Java.
  3. Spark Streaming: Spark Streaming permits scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark can ingest data from sources such as Flume or a TCP socket, run various algorithms on it, and push the results to a file system, database, or live dashboard. Spark uses micro-batching for real-time streaming: a technique that lets a process or task treat a stream as a sequence of small batches of data. Spark Streaming therefore groups the live data into small batches and delivers them to the batch engine for processing. The functionality of this module is:
    1. It enables processing of live streams of data, such as log files generated by production web servers.
    2. The APIs defined in this module are quite similar to the Spark Core RDD APIs.
  4. MLlib (machine learning): MLlib is Spark's scalable machine learning library, containing various machine learning algorithms. The motive behind MLlib is to make the implementation of machine learning simple. It provides machine learning libraries and implementations of various algorithms, for example clustering, regression, classification, and collaborative filtering.
  5. GraphX (graph processing): GraphX is an API for graphs and graph-parallel computation. It can be used for network analytics, and clustering, classification, traversal, searching, and pathfinding are all possible on graphs. It optimizes how vertices and edges are represented, particularly when they are primitive data types. To support graph computation, it provides fundamental operators such as subgraph, joinVertices, and aggregateMessages, as well as an optimized variant of the Pregel API.
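To make the Spark SQL component more concrete, here is a minimal sketch of the DataFrame/SQL example referred to in the list above. The tiny in-memory dataset, the column names, and the app name are assumptions made purely for illustration; only standard SparkSession, DataFrame, and SQL calls are used.

from pyspark.sql import SparkSession

# SparkSession is the entry point for DataFrame and Spark SQL functionality
spark = SparkSession.builder.appName("components_demo").getOrCreate()

# hypothetical in-memory data, used purely for illustration
people = spark.createDataFrame(
    [("Alice", 34), ("Bob", 45), ("Cathy", 29)],
    ["name", "age"])

# register the DataFrame as a temporary view and query it with SQL
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 30").show()

# the same result using the DataFrame API instead of SQL
people.filter(people.age > 30).select("name", "age").show()

Both forms are executed by the same engine, so mixing SQL and the programmatic API in one application, as described above, is natural in Spark.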

Uses of Apache Spark: The main applications of the Spark framework are:

  1. The data generated by different systems is often not consistent enough to combine for analysis. To fetch consistent information from such systems, we can use extract, transform, and load (ETL) processes, and Spark reduces their time and cost because they are implemented very efficiently on it (a short ETL sketch follows this list).
  2. It is tough to handle continuously generated data such as log files. Spark works well with streams of information and can reuse operations on them.
  3. Since Spark can keep data in memory and run repeated queries quickly, it makes it straightforward to work out which machine learning algorithms suit a particular kind of data.
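As a rough illustration of the ETL use case in point 1 above, here is a minimal sketch. The file names and the column name amount are assumptions made only for this example and are not part of the original text; the pattern is simply read, filter/derive, write.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("etl_sketch").getOrCreate()

# Extract: read raw records from a hypothetical CSV file
raw = spark.read.csv("raw_events.csv", header=True, inferSchema=True)

# Transform: keep valid rows and derive a new column (column name is assumed)
clean = (raw.filter(col("amount") > 0)
            .withColumn("amount_doubled", col("amount") * 2))

# Load: write the cleaned result out as Parquet for downstream analysis
clean.write.mode("overwrite").parquet("clean_events.parquet")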

Introduction to PySpark – Distributed Computing with Apache Spark

Datasets are becoming huge. In fact, data is growing faster than processing speeds. Therefore, algorithms involving large amounts of data and heavy computation are often run on a distributed computing system. A distributed computing system involves nodes (networked computers) that run processes in parallel and communicate with each other if necessary.

MapReduce – The programming model used for distributed computing is known as MapReduce. The MapReduce model involves two stages, Map and Reduce.

  1. Map – The mapper processes each line of the input data (which comes from a file) and produces key–value pairs.

Input data → Mapper → list([key, value])

  2. Reduce – The key–value pairs produced by the Mapper are first grouped by key; the reducer then processes each key together with its list of values and outputs a new set of key–value pairs (a minimal sketch of both stages follows).

list([key, value]) → grouped by key → list([key, list(values)]) → Reducer → list([key, result])
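To see the model end to end, here is a minimal pure-Python sketch of a word-count job: the Map stage, the implicit grouping between the stages, and the Reduce stage. The input lines are made up for illustration and no Spark or Hadoop API is involved here.

from itertools import groupby
from operator import itemgetter

# hypothetical input: each element stands for one line of the input file
lines = ["spark hadoop spark", "hadoop mapreduce"]

# Map stage: every line is turned into (key, value) pairs
mapped = [(word, 1) for line in lines for word in line.split()]

# grouping (shuffle) stage: pairs are grouped by key
mapped.sort(key=itemgetter(0))
grouped = [(key, [value for _, value in group])
           for key, group in groupby(mapped, key=itemgetter(0))]

# Reduce stage: each (key, list(values)) pair is reduced to a single result
reduced = [(key, sum(values)) for key, values in grouped]
print(reduced)   # [('hadoop', 2), ('mapreduce', 1), ('spark', 2)]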

Spark – Spark (an open-source big-data processing engine from Apache) is a cluster computing system. It is faster than other cluster computing systems (such as Hadoop) and provides high-level APIs in Python, Scala, and Java. Parallel jobs are easy to write in Spark. We will cover PySpark (Python + Apache Spark), because this makes the learning curve flatter. Instructions for installing Spark on a Linux system and for running it on a multi-node cluster are available in the official Spark documentation. Below, we will see how to create RDDs, the fundamental data structure of Spark.

RDDs (Resilient Distributed Datasets) – RDDs are immutable collections of objects. Since we are using PySpark, these objects can be of multiple Python types. This will become clearer in the examples below.

SparkContext – To create a standalone application in Spark, we first define a SparkContext:

from pyspark import SparkConf, SparkContext

# setMaster("local") – we are running tasks on a single machine
conf = SparkConf().setMaster("local").setAppName("Test")
sc = SparkContext(conf=conf)

RDD transformations – Now that a SparkContext object has been created, we will create RDDs and apply some transformations to them.

# create an RDD called lines from 'file_name.txt', split into 2 partitions
lines = sc.textFile("file_name.txt", 2)

# lines.collect() returns the whole RDD as a Python list and prints it
print(lines.collect())

One major advantage of using Spark is that it does not load the dataset into memory eagerly; lines is a lazy handle to the 'file_name.txt' file, and nothing is read until an action such as collect() is called.
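As a small illustration of this laziness, the following word count reuses the sc and lines objects from the snippets above (assuming 'file_name.txt' contains plain text). The chained transformations only record a lineage; the file is read only when the final action runs.

# flatMap, map and reduceByKey are transformations: they only build a lineage,
# no data is read or processed at this point
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# take() is an action: only now does Spark read the file and run the pipeline
print(counts.take(5))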

A simple PySpark app to count the degree of each vertex for a given graph 

from pyspark import SparkConf, SparkContext

# setMaster("local") – we are running tasks on a single machine
conf = SparkConf().setMaster("local").setAppName("Test")
sc = SparkContext(conf=conf)

def conv(line):
    # "1    2" -> (1, [2]): the source vertex mapped to a one-element neighbour list
    line = line.split()
    return (int(line[0]), [int(line[1])])

lines = sc.textFile('graph.txt')
edges = lines.map(conv)

# concatenate the neighbour lists belonging to each vertex,
# then count the neighbours to get the degree of each vertex
Adj_list = edges.reduceByKey(lambda x, y: x + y)
Degrees = Adj_list.mapValues(len)
print(Degrees.collect())

Understanding the above code 

  1. Our text file is in the following format – (each line represents an edge of a directed graph)
    1    2
    1    3
    2    3
    3    4
    .    .
    .    .
    .    .
  2. Large datasets may contain millions of nodes and edges.
  3. The first few lines set up the SparkContext, which we then use to create the lines RDD from the input file.
  4. Then, we transform the lines RDD into the edges RDD. The function conv acts on each line, and key–value pairs of the form (1, [2]), (1, [3]), (2, [3]), (3, [4]), … are stored in the edges RDD.
  5. After this, reduceByKey concatenates all the neighbour lists belonging to a particular key, giving the adjacency-list RDD Adj_list, and mapValues(len) turns each list into the degree of the corresponding vertex, so Degrees has the form (1, 2), (2, 1), (3, 1), …

Running the code 

  1. The above code can be run by the following commands –

$ cd /home/arik/Downloads/spark-1.6.0/

$ ./bin/spark-submit degree.py

  2. You can use your Spark installation path in the first line.

PySpark Linear Regression using Apache MLlib

Problem Statement: Build a predictive model for a shipping company to estimate how many crew members a ship requires.


The dataset contains 159 instances with 9 features.

The description of the dataset is as below:


Let’s build the linear regression model to predict the number of crew members.

Attached dataset: cruise_ship_info

import pyspark
from pyspark.sql import SparkSession

# SparkSession is now the entry point of Spark
# SparkSession can also be construed as a gateway to the Spark libraries

# create an instance of the SparkSession class
spark = SparkSession.builder.appName('housing_price_model').getOrCreate()

# create a Spark dataframe from the input CSV file
df = spark.read.csv(r'D:\python coding\pyspark_tutorial\Linear regression\cruise_ship_info.csv',
                    inferSchema=True, header=True)
df.show(10)

Output :

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23|            70.367|     20.56|  8.55| 10.22|            34.23| 9.2|
|Fascination|   Carnival| 19|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Freedom|   Carnival|  6|110.23899999999999|      37.0|  9.51| 14.87|            29.79|11.5|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
# prints the structure of the dataframe along with the datatypes
df.printSchema()

Output :

# in our predictive model, below are the columns
df.columns

Output :

# columns identified as features are as below:
# ['Cruise_line', 'Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']
# to work on the features, Spark MLlib expects every value to be in numeric form
# the feature 'Cruise_line' is of string datatype
# using StringIndexer, the string type will be converted to a numeric datatype

# import StringIndexer for the conversion
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='Cruise_line', outputCol='cruise_cat')
indexed = indexer.fit(df).transform(df)

# the code above converts the string feature to a numeric one and creates a new dataframe
# the new dataframe contains a new feature 'cruise_cat' which can be used further
# the feature 'cruise_cat' is now numeric and can be fed to the model
for item in indexed.head(5):
    print(item)
    print('\n')

Output :

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, 
Tonnage=30.276999999999997, passengers=6.94, length=5.94, 
cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)
 
Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, 
Tonnage=30.276999999999997, passengers=6.94, length=5.94, 
cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)
 
Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, 
Tonnage=47.262, passengers=14.86, length=7.22, 
cabins=7.43, passenger_density=31.8, crew=6.7, cruise_cat=1.0)
 
Row(Ship_name='Conquest', Cruise_line='Carnival', Age=11, 
Tonnage=110.0, passengers=29.74, length=9.53,
 cabins=14.88, passenger_density=36.99, crew=19.1, cruise_cat=1.0)
 
Row(Ship_name='Destiny', Cruise_line='Carnival', Age=17, 
Tonnage=101.353, passengers=26.42, length=8.92, 
cabins=13.21, passenger_density=38.36, crew=10.0, cruise_cat=1.0)
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# creating a single vector column from the feature columns
# Apache MLlib expects its input in vector form
assembler = VectorAssembler(inputCols=['Age', 'Tonnage', 'passengers', 'length',
                                       'cabins', 'passenger_density', 'cruise_cat'],
                            outputCol='features')
output = assembler.transform(indexed)

# output is as below
output.select('features', 'crew').show(5)

Output :

# the final data consists of the features and the label, which is crew
final_data = output.select('features', 'crew')

# splitting the data into train and test sets
train_data, test_data = final_data.randomSplit([0.7, 0.3])
train_data.describe().show()

Output :

test_data.describe().show()

Output :

# import the LinearRegression library
from pyspark.ml.regression import LinearRegression

# creating an object of the LinearRegression class
# the object takes features and label as input arguments
ship_lr = LinearRegression(featuresCol='features', labelCol='crew')

# pass train_data to train the model
trained_ship_model = ship_lr.fit(train_data)

# evaluating the trained model for R-squared error
ship_results = trained_ship_model.evaluate(train_data)
print('Rsquared Error :', ship_results.r2)

# the R2 value shows the model accuracy on the training data is about 92%
# the model accuracy is very good and can be used for predictive analysis

Output :

# testing the model on unlabeled data
# create unlabeled data from test_data
unlabeled_data = test_data.select('features')
unlabeled_data.show(5)

Output :

predictions = trained_ship_model.transform(unlabeled_data)
# below are the predicted results on the test data
predictions.show()

Output :

PySpark Linear Regression with Advanced Feature Dataset using Apache MLlib

Ames Housing Data: The Ames Housing dataset was compiled by Dean De Cock for use in data science education and is an expanded version of the often-cited Boston Housing dataset. The dataset provided has 80 features and 1459 instances.


Dataset description is as below:


For the demo only a few columns are displayed, but there are many more columns in the dataset.

Examples:
Input Attached Dataset: Ames_housing_dataset

Code :

# SparkSession is now the entry point of Spark
# SparkSession can also be construed as a gateway to the Spark libraries
import pyspark
from pyspark.sql import SparkSession

# create an instance of the SparkSession class
spark = SparkSession.builder.appName('ames_housing_price_model').getOrCreate()

df_train = spark.read.csv(r'D:\python coding\pyspark_tutorial\Linear regression'
                          r'\housing price multiple features'
                          r'\house-prices-advanced-regression-techniques'
                          r'\train.csv', inferSchema=True, header=True)

Code :

# identifying the columns having less meaningful data on the basis of datatypes
l_int = []
for item in df_train.dtypes:
    if item[1] == 'int':
        l_int.append(item[0])
print(l_int)

l_str = []
for item in df_train.dtypes:
    if item[1] == 'string':
        l_str.append(item[0])
print(l_str)

Output

Integer Datatypes:
['Id', 'MSSubClass', 'LotArea', 'OverallQual', 'OverallCond', 'YearBuilt', 
'YearRemodAdd', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 
'1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath',
 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 
'TotRmsAbvGrd', 'Fireplaces', 'GarageCars', 'GarageArea', 'WoodDeckSF', 
'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 
'MiscVal', 'MoSold', 'YrSold', 'SalePrice']
 
String Datatypes:
['MSZoning', 'LotFrontage', 'Street', 'Alley', 'LotShape', 'LandContour',
 'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 
'BldgType', 'HouseStyle', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 
'MasVnrType', 'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 
'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Heating', 'HeatingQC',
 'CentralAir', 'Electrical', 'KitchenQual', 'Functional', 'FireplaceQu', 'GarageType',
 'GarageYrBlt', 'GarageFinish', 'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence',
 'MiscFeature', 'SaleType', 'SaleCondition']


Code :

# identifying integer column records having less meaningful data
from pyspark.sql.functions import col

for i in df_train.columns:
    if i in l_int:
        ct_total = df_train.select(i).count()
        ct_zeros = df_train.filter(col(i) == 0).count()
        per_zeros = (ct_zeros / ct_total) * 100
        print('total count / zeros count / zeros percent '
              + i + ' ' + str(ct_total) + ' / ' + str(ct_zeros) + ' / ' + str(per_zeros))

Output of zeros percentage:

total count/zeros count/zeros_percent OpenPorchSF 1460 / 656 / 44.93150684931507
total count/zeros count/zeros_percent EnclosedPorch 1460 / 1252 / 85.75342465753425
total count/zeros count/zeros_percent 3SsnPorch 1460 / 1436 / 98.35616438356163
total count/zeros count/zeros_percent ScreenPorch 1460 / 1344 / 92.05479452054794
total count/zeros count/zeros_percent PoolArea 1460 / 1453 / 99.52054794520548
total count/zeros count/zeros_percent PoolQC 1460 / 1453 / 99.52054794520548
total count/zeros count/zeros_percent Fence 1460 / 1453 / 99.52054794520548
total count/zeros count/zeros_percent MiscFeature 1460 / 1453 / 99.52054794520548
total count/zeros count/zeros_percent MiscVal 1460 / 1408 / 96.43835616438356
total count/zeros count/zeros_percent MoSold 1460 / 0 / 0.0
total count/zeros count/zeros_percent YrSold 1460 / 0 / 0.0

Code :

# the above calculation gives us an insight into the useful features
# now drop the columns having a zeros or NA percentage of more than 75%
df_new = df_train.drop(*['BsmtFinSF2', 'LowQualFinSF', 'BsmtHalfBath',
                         'EnclosedPorch', '3SsnPorch', 'ScreenPorch',
                         'PoolArea', 'PoolQC', 'Fence', 'MiscFeature',
                         'MiscVal', 'Alley'])
df_new = df_new.drop(*['Id'])
# now we have clean data to work with

Code :

# converting string features to numeric features
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

feat_list = ['MSZoning', 'LotFrontage', 'Street', 'LotShape', 'LandContour',
             'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1',
             'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle',
             'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType',
             'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation',
             'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2',
             'Heating', 'HeatingQC', 'CentralAir', 'Electrical', 'KitchenQual',
             'Functional', 'FireplaceQu', 'GarageType',
             'GarageYrBlt', 'GarageFinish', 'GarageQual', 'GarageCond',
             'PavedDrive', 'SaleType', 'SaleCondition']
print('indexed list created')

# there are multiple features to convert
# using a Pipeline we can convert multiple features to indexers in one pass
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(df_new)
            for column in feat_list]
type(indexers)

pipeline = Pipeline(stages=indexers)
df_feat = pipeline.fit(df_new).transform(df_new)
df_feat.columns
# using the code above we have converted the list of string features into indexes

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# VectorAssembler combines a given list of columns into a single vector column
# inputCols: columns to be assembled
# returns a dataframe with the assembled 'features' column
# we will convert the columns below into the features vector to work with
assembler = VectorAssembler(inputCols=['MSSubClass', 'LotArea', 'OverallQual',
                                       'OverallCond', 'YearBuilt', 'YearRemodAdd',
                                       'BsmtFinSF1', 'BsmtUnfSF', 'TotalBsmtSF',
                                       '1stFlrSF', '2ndFlrSF', 'GrLivArea',
                                       'BsmtFullBath', 'FullBath', 'HalfBath',
                                       'GarageArea', 'MoSold', 'YrSold',
                                       'MSZoning_index', 'LotFrontage_index',
                                       'Street_index', 'LotShape_index',
                                       'LandContour_index', 'Utilities_index',
                                       'LotConfig_index', 'LandSlope_index',
                                       'Neighborhood_index', 'Condition1_index',
                                       'Condition2_index', 'BldgType_index',
                                       'HouseStyle_index', 'RoofStyle_index',
                                       'RoofMatl_index', 'Exterior1st_index',
                                       'Exterior2nd_index', 'MasVnrType_index',
                                       'MasVnrArea_index', 'ExterQual_index',
                                       'ExterCond_index', 'Foundation_index',
                                       'BsmtQual_index', 'BsmtCond_index',
                                       'BsmtExposure_index', 'BsmtFinType1_index',
                                       'BsmtFinType2_index', 'Heating_index',
                                       'HeatingQC_index', 'CentralAir_index',
                                       'Electrical_index', 'KitchenQual_index',
                                       'Functional_index', 'FireplaceQu_index',
                                       'GarageType_index', 'GarageYrBlt_index',
                                       'GarageFinish_index', 'GarageQual_index',
                                       'GarageCond_index', 'PavedDrive_index',
                                       'SaleType_index', 'SaleCondition_index'],
                            outputCol='features')
output = assembler.transform(df_feat)

final_data = output.select('features', 'SalePrice')

# splitting data for training and validation
train_data, test_data = final_data.randomSplit([0.7, 0.3])

Code :

train_data.describe().show()
test_data.describe().show()

Code :

from pyspark.ml.regression import LinearRegression

house_lr = LinearRegression(featuresCol='features', labelCol='SalePrice')
trained_house_model = house_lr.fit(train_data)

# evaluate the model on the training data
house_results = trained_house_model.evaluate(train_data)
print('Rsquared Error :', house_results.r2)
# Rsquared Error : 0.8279155904297449
# model accuracy is 82 % with the train data

# evaluate the model on test_data
test_results = trained_house_model.evaluate(test_data)
print('Rsquared error :', test_results.r2)
# Rsquared error : 0.8431420382408793
# the result is quite good, with 84 % accuracy

# create unlabelled data from test_data
# test_data.show()
unlabeled_data = test_data.select('features')
unlabeled_data.show()

Code :

predictions = trained_house_model.transform(unlabeled_data)
predictions.show()