  • Hackers Realm

Mastering Machine Learning with PySpark | Loan Prediction | Python

The goal of this project tutorial is to demonstrate the use of PySpark and Machine Learning to predict loan approvals. The project will involve the following steps:

  • Data Collection: Collecting data related to loan approvals, including features such as income, credit score, loan amount, and loan status.

  • Data Preparation: Preprocessing and cleaning up the data to ensure it is ready for use in Machine Learning algorithms.

  • Feature Engineering: Selecting and engineering relevant features to improve the accuracy of the model.

  • Model Training: Using PySpark to train a Machine Learning model on the prepared data.

  • Model Evaluation: Testing the trained model on a validation set to evaluate its accuracy and adjust the parameters if necessary.

  • Loan Prediction: Using the trained model to predict whether a loan will be approved or not based on input features.

Mastering Machine Learning with PySpark

The project requires knowledge of PySpark, Machine Learning, and Python programming. The end result is a Machine Learning model that predicts loan approvals from applicant features, which financial institutions or individuals can use to make more informed decisions regarding loans. Let us dive into the steps listed above.


You can watch the video-based tutorial with step by step explanation down below.

Dataset Information


Dream Housing Finance company deals in all home loans. They have a presence across all urban, semi-urban and rural areas. A customer first applies for a home loan, after which the company validates the customer's eligibility for the loan. The company wants to automate the loan eligibility process (in real time) based on the customer details provided in the online application form. These details are Gender, Marital Status, Education, Number of Dependents, Income, Loan Amount, Credit History and others. To automate this process, they have posed the problem of identifying the customer segments that are eligible for a loan, so that they can specifically target these customers.


This is a standard supervised classification task: we have to predict whether a loan will be approved or not. Below are the dataset attributes with their descriptions.

Dataset information of Loan Sanction

Download the Dataset here


Installing PySpark


Follow the below steps to install PySpark

  1. Install Java

  2. Install Apache spark with Hadoop

  3. Set the environment variables

  4. Next install pyspark using the command

!pip install pyspark

Import Modules

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import pandas as pd
import warnings
warnings.filterwarnings('ignore')
  • pyspark - It is a Python library and interface for Apache Spark, which is an open-source distributed computing system. PySpark allows you to write Spark applications using Python, leveraging the power and scalability of Spark for big data processing and analytics

  • pyspark.sql - provides a programming interface for working with structured data, such as structured data files, tables, and relational databases. It offers a high-level API for querying and manipulating structured data using SQL-like expressions and DataFrame operations

  • pyspark.sql.functions - provides a collection of built-in functions that can be used for data manipulation, transformation, and aggregation on DataFrame columns

  • pandas - used to perform data manipulation and analysis

  • warnings - used to control warning messages; filterwarnings('ignore') suppresses the warnings raised by the modules, which keeps the output clean


Next we will initialize the spark session

# initialize the session
spark = SparkSession.builder.appName('loan_prediction').getOrCreate()
  • SparkSession is the entry point for any Spark functionality. It provides a way to interact with various Spark features, such as DataFrame and SQL operations, streaming, machine learning, and more

  • The builder pattern is used to create and configure the SparkSession. The builder method returns a Builder object that allows you to set various configuration options for the SparkSession

  • The appName method sets the name of the Spark application. In this case, it is set to "loan_prediction"

  • The getOrCreate method tries to reuse an existing SparkSession or creates a new one if it doesn't exist


Load the Dataset


Next we will load the dataset

df = spark.read.csv('Loan Prediction Dataset.csv', header=True, sep=',', inferSchema=True)
df.show(5)
  • spark.read.csv() function is used to read data from a CSV file and create a DataFrame

  • header=True option indicates that the first row of the CSV file contains the header or column names. By setting this option to True, the column names are taken from the header row

  • sep=',' option specifies the delimiter used in the CSV file. In this case, the delimiter is set to a comma (','), which is a common delimiter in CSV files

  • inferSchema=True option instructs Spark to automatically infer the data types for each column in the DataFrame based on the contents of the CSV file

Loan Prediction DataFrame
  • These are the first 5 rows of the DataFrame


Next let us see the schema of DataFrame

df.printSchema()
Schema Information of Loan Sanction Data


Next let us see the data types of columns

df.dtypes
loan prediction data with column datatype


Next let us convert spark DataFrame to pandas DataFrame

# convert spark dataframe to pandas
pandas_df = df.toPandas()
pandas_df.head()
  • The toPandas() method is used to convert a PySpark DataFrame to a Pandas DataFrame. It collects the data from the distributed Spark DataFrame and brings it into a single machine's memory as a Pandas DataFrame

  • The resulting Pandas DataFrame pandas_df contains the same data as the original PySpark DataFrame df, but it is now in a Pandas format, allowing you to use Pandas-specific operations and libraries for further analysis and processing

Pandas Dataframe of Loan Sanction Dataset


Data Analysis


First let us display count based on loan status

# display count based on loan status
df.groupBy('Loan_Status').count().show()
  • df.groupBy() groups the DataFrame df by the 'Loan_Status' column. It creates a GroupedData object that allows you to perform aggregation operations on the grouped data

  • count() is an aggregation function that calculates the number of rows in each group. In this case, it counts the occurrences of each unique value in the 'Loan_Status' column

  • show() function is used to display the result of the aggregation operation

Distribution of Class Labels
  • We can see that the count of status Y is 422 and status N is 192
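
As a rough mental model (plain Python rather than Spark, with made-up sample values), the grouping and counting above amounts to tallying rows per distinct key. One detail worth knowing is that Spark's groupBy keeps null keys as a group of their own, which the sketch mimics with None.

```python
from collections import Counter

# Hypothetical sample of the Loan_Status column; None stands in for a null value.
loan_status = ['Y', 'N', 'Y', 'Y', None, 'N']

# Tally rows per distinct key, which is what a group-by followed by a count reports.
counts = Counter(loan_status)

for status, n in counts.items():
    print(status, n)
```

The None key gets its own tally here instead of being dropped, so a missing label would show up as an extra row in the result.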


Next let us perform grouping and aggregation operation on the DataFrame

df.select("Credit_History", "Loan_Status").groupBy('Loan_Status').agg(F.avg('Credit_History')).show()
  • The above code snippet selects the columns "Credit_History" and "Loan_Status" from the DataFrame, groups the data by the "Loan_Status" column, and then calculates the average of the "Credit_History" column for each group

Average Credit History for Loan Status

Next we will perform grouping and counting operation on the DataFrame

df.select('Gender', 'Loan_Status').groupBy('Loan_Status', 'Gender').count().show()
  • It selects the columns 'Gender' and 'Loan_Status' from the DataFrame, groups the data by both 'Loan_Status' and 'Gender', and then counts the number of occurrences for each combination

Distribution of Gender and Loan Status

Correlation Matrix


Next we will create a correlation matrix

columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']
corr_df = pd.DataFrame()
for i in columns:
    corr = []
    for j in columns:
        corr.append(round(df.stat.corr(i, j), 2))
    corr_df = pd.concat([corr_df, pd.Series(corr)], axis=1)
corr_df.columns = columns
corr_df.insert(0, '', columns)
corr_df.set_index('')
  • We will calculate the correlation matrix for the columns 'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History'. The coefficients are computed by Spark; the Pandas library is used only to assemble them into a table

  • The correlation between each pair of columns is calculated using the df.stat.corr() method of the Spark DataFrame, which computes the Pearson correlation coefficient. The resulting coefficients are rounded to two decimal places and stored in the corr_df DataFrame. Finally, the column names and index of corr_df are set

Correlation Matrix using pyspark
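
To make the numbers in the matrix less of a black box, here is a minimal plain-Python sketch of the Pearson correlation coefficient, which is what df.stat.corr() computes by default. The helper name pearson and the three small columns are made up for illustration; they are not taken from the loan dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up values, only to show the shape of the result.
cols = {
    'income': [1.0, 2.0, 3.0, 4.0],
    'loan': [2.0, 4.0, 6.0, 8.0],   # rises in step with income
    'term': [4.0, 3.0, 2.0, 1.0],   # falls as income rises
}

# Same nested-loop idea as above: one coefficient per pair of columns.
matrix = {a: {b: round(pearson(cols[a], cols[b]), 2) for b in cols} for a in cols}
```

A value near 1 means the two columns rise together, a value near -1 means one falls as the other rises, and the diagonal is always 1 because every column is perfectly correlated with itself.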

Perform SQL Operations


Next let us perform analysis using SQL operations

import pyspark.sql as sparksql
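  • This imports the pyspark.sql module under the alias sparksql. The queries below are run through the existing session object (spark.sql), so the alias itself is not used in the rest of this tutorial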


Next we will create a temporary view named "table" from the DataFrame df in Apache Spark. A temporary view allows you to query the DataFrame using SQL-like syntax

df.createOrReplaceTempView('table')

Next we will display the top rows from the table

# display top rows from the table
spark.sql("select * from table limit 5").show()
Display top 5 rows using SQL

Next we will execute a sql query on the temporary view created

spark.sql('select Loan_ID from table where Credit_History=1').show()
  • This will execute the SQL query on the "table" temporary view and display the result, which includes the Loan_ID values for rows where the Credit_History column equals 1

displaying top 20 rows loan id with credit history

Data Preprocessing


First we will display the null values in the columns

# display null values
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()
  • This uses the select function in Apache Spark to count the number of null values for each column in the DataFrame df and display the result

Count of null values in the columns


Next we will calculate the mean value of the column

# get mean value of column
mean = df.select(F.mean(df['LoanAmount'])).collect()[0][0]
mean
  • This calculates the mean value of the 'LoanAmount' column in the DataFrame df using the mean() function from the pyspark.sql.functions module

146.41216216216216

  • This is the mean value of the 'LoanAmount' column


Next we will fill the null values

# fill null value
df = df.na.fill(mean, ['LoanAmount'])
  • This fills the null values in the 'LoanAmount' column of the DataFrame df with the mean value


Next we will get mode value of a column

# get mode value of column
df.groupby('Gender').count().orderBy("count", ascending=False).first()[0]

'Male'

  • This performs a group by operation on the 'Gender' column of the DataFrame df, counts the number of occurrences for each gender, orders the result in descending order based on the count, and retrieves the first row. Finally, it returns the value in the first column of the first row, which corresponds to the gender with the highest count


Next we will fill the null values for all the columns

# fill null values for all the columns
numerical_cols = ['LoanAmount', 'Loan_Amount_Term']
categorical_cols = ['Gender', 'Married', 'Dependents', 'Self_Employed', 'Credit_History']
  • First we will create a list of numerical columns and categorical columns


Next let us first fill the null values for numerical columns

for col in numerical_cols:
    mean = df.select(F.mean(df[col])).collect()[0][0]
    df = df.na.fill(mean, [col])
  • This fills the null values in multiple numerical columns of the DataFrame df with their respective mean values. It iterates over each column name in the list numerical_cols, calculates the mean value for each column, and then fills the null values in that column with the corresponding mean value


Next let us fill the null values for categorical columns

for col in categorical_cols:
    mode = df.groupby(col).count().orderBy("count", ascending=False).first()[0]
    df = df.na.fill(mode, [col])
  • This fills the null values in multiple categorical columns of the DataFrame df with their respective mode values. It iterates over each column name in the list categorical_cols, calculates the mode value for each column, and then fills the null values in that column with the corresponding mode value
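
The two imputation loops above can be pictured with a small plain-Python sketch. The helper names fill_with_mean and fill_with_mode and the sample values are hypothetical; the sketch only illustrates the idea and is not how Spark implements it.

```python
from collections import Counter

def fill_with_mean(values):
    """Replace None with the mean of the non-missing values."""
    present = [v for v in values if v is not None]
    mean = sum(present) / len(present)
    return [mean if v is None else v for v in values]

def fill_with_mode(values):
    """Replace None with the most frequent non-missing value."""
    present = [v for v in values if v is not None]
    mode = Counter(present).most_common(1)[0][0]
    return [mode if v is None else v for v in values]

# Made-up columns; None marks a missing entry.
loan_amount = [100.0, None, 200.0, 150.0]
gender = ['Male', 'Female', None, 'Male']

filled_amount = fill_with_mean(loan_amount)
filled_gender = fill_with_mode(gender)
```

Note that the sketch drops missing entries before looking for the mode. In Spark, groupBy keeps null as its own group, so the one-liner used above would return null if missing values happened to be the most frequent entry in a column; filtering them out first avoids that.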


Now let us display the count of null values in the respective columns

# display null values
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()
Count of null values after preprocessing
  • We can see that there are no null values left in any of the columns



Next let us create a new column

# create new feature column
df = df.withColumn('TotalIncome', F.col('ApplicantIncome') + F.col('CoapplicantIncome'))
df.show(2)
  • The withColumn() function is used to add a new column named 'TotalIncome' to the DataFrame df.

  • The new column is created by performing the addition operation F.col('ApplicantIncome') + F.col('CoapplicantIncome'), where F.col() is used to reference the corresponding columns

Dataframe with new feature column
  • We can see that a new column TotalIncome is added with corresponding values


Let us see how we can find and replace the values

# how to find and replace values
df = df.withColumn('Loan_Status', F.when(df['Loan_Status']=='Y', 1).otherwise(0))
df.show(2)
  • The withColumn() function is used here to replace the existing 'Loan_Status' column, because a column with that name already exists in the DataFrame df.

  • The new column is created using the F.when() function, which checks the condition df['Loan_Status'] == 'Y'. If the condition is true, it assigns the value 1 to the 'Loan_Status' column; otherwise, it assigns the value 0 using the otherwise() function

loan prediction dataframe
  • We can see that Loan_Status column values are replaced with 1 and 0


Feature Engineering


Feature engineering is the process of transforming raw data into meaningful features that can improve the performance of a machine learning model. It involves creating new features or modifying existing features to capture patterns, relationships, or important information in the data that may be useful for the model's predictive task


First let us print the schema of the dataframe

df.printSchema()
Schema of the dataframe

Next let us import required modules

from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
  • pyspark.ml.feature - provides various feature engineering techniques and transformations for machine learning tasks

  • pyspark.ml - provides a high-level API for building machine learning pipelines. It offers a set of tools and algorithms for various stages of the machine learning workflow, including feature extraction, transformation, model training, and evaluation


Next we will perform feature engineering workflow using pyspark.ml to handle categorical and numerical columns

categorical_columns = ['Gender', 'Married', 'Dependents', 'Education', 'Self_Employed', 'Property_Area', 'Credit_History']
numerical_columns = ['ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'TotalIncome']

# index the string columns
indexers = [StringIndexer(inputCol=col, outputCol="{0}_index".format(col)) for col in categorical_columns]

# encode the indexed values
encoders = [OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol()))
           for indexer in indexers]

input_columns = [encoder.getOutputCol() for encoder in encoders] + numerical_columns

# vectorize the encoded values
assembler = VectorAssembler(inputCols=input_columns, outputCol="feature")
  • First we will create a list of categorical columns and a list of numerical columns that you want to process

  • The categorical columns are indexed using StringIndexer. Each categorical column is transformed into a numerical column with unique indices. A separate StringIndexer is created for each categorical column, and the output column names are suffixed with _index

  • Next, the indexed values are encoded using OneHotEncoder. Each indexed column is transformed into a binary vector representation, where each category is represented by a separate binary column. Again, a separate OneHotEncoder is created for each indexed column, and the output column names are suffixed with _encoded

  • The encoded values and the numerical columns are combined into a single list , which will serve as input for the subsequent steps.

  • Finally, the combined values are vectorized using VectorAssembler. The VectorAssembler takes the list of input columns and combines them into a single feature vector column named "feature"
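
The three steps described above (index, one-hot encode, assemble) can be sketched in plain Python to show what ends up inside the feature vector. The helpers string_index and one_hot and the sample values are made up for illustration. The sketch follows StringIndexer's default of giving the most frequent category index 0, and keeps one slot per category as dropLast=False does; Spark itself stores the result as vectors rather than Python lists.

```python
from collections import Counter

def string_index(values):
    """Map each category to an index, most frequent category first."""
    ordered = [cat for cat, _ in Counter(values).most_common()]
    mapping = {cat: float(i) for i, cat in enumerate(ordered)}
    return [mapping[v] for v in values], len(ordered)

def one_hot(index, size):
    """Vector of zeros with a single 1.0 at the position of the category index."""
    vec = [0.0] * size
    vec[int(index)] = 1.0
    return vec

# Made-up sample: one categorical column and one numerical column.
property_area = ['Urban', 'Rural', 'Urban', 'Semiurban', 'Urban', 'Rural']
loan_amount = [128.0, 66.0, 120.0, 141.0, 267.0, 95.0]

indexed, n_categories = string_index(property_area)
encoded = [one_hot(i, n_categories) for i in indexed]

# Assemble: concatenate the encoded slots and the numerical value into one row vector.
features = [vec + [amount] for vec, amount in zip(encoded, loan_amount)]
```

Each row of features now has three one-hot slots followed by the loan amount, which is the same layout idea as the assembled column: encoded categorical blocks first, numerical columns after them.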


Next we will create a pipeline to transform data

# create the pipeline to transform the data
pipeline = Pipeline(stages = indexers + encoders + [assembler])
  • The Pipeline object is created with the specified stages, which include the indexers, encoders, and assembler steps. The stages are added to the pipeline using the + operator, and [assembler] is wrapped in a list to ensure it is passed as a single element

  • The Pipeline object allows you to chain together multiple stages into a single workflow, where each stage represents a transformation or modeling step. This enables you to streamline the feature engineering process and apply it consistently to different datasets


Next we will fit the pipeline to the DataFrame

data_model = pipeline.fit(df)
  • This fits the defined Pipeline object (pipeline) to the DataFrame df.

  • The fit() method works through the stages in order: each estimator stage (such as a StringIndexer) is fitted to the data and then used to transform it, so that the next stage sees the output of the previous one. The result is a fitted PipelineModel

Next we will transform the data

transformed_df = data_model.transform(df)
  • This applies the fitted PipelineModel (data_model) to the DataFrame df to perform the feature engineering transformations defined in the pipeline. The result is a new DataFrame transformed_df that contains the original columns from df along with the engineered features

  • The transform() method of a PipelineModel applies each transformation stage in the pipeline to the input DataFrame, starting from the first stage and sequentially passing the transformed output of each stage as input to the next stage
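
The difference between fit() and transform() is easier to see in a toy version. The classes below (Scale, Center, ToyPipeline and their models) are hypothetical stand-ins written in plain Python; they only imitate the estimator/transformer pattern and are not part of pyspark.ml.

```python
class Scale:
    """Toy transformer: nothing to learn, it only has transform()."""
    def __init__(self, factor):
        self.factor = factor

    def transform(self, rows):
        return [r * self.factor for r in rows]

class CenterModel:
    """Fitted model that subtracts a previously learned mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [r - self.mean for r in rows]

class Center:
    """Toy estimator: fit() learns the mean and returns a fitted model."""
    def fit(self, rows):
        return CenterModel(sum(rows) / len(rows))

class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the output of the previous stages;
            # plain transformers are used as they are.
            model = stage.fit(rows) if hasattr(stage, 'fit') else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)

train = [1.0, 2.0, 3.0]
model = ToyPipeline([Scale(10.0), Center()]).fit(train)
train_out = model.transform(train)   # the learned mean of [10, 20, 30] is 20
new_out = model.transform([4.0])     # reuses the mean learned from train
```

The point of the split is that whatever is learned during fit() (here a mean, in the tutorial the category-to-index mappings) is frozen in the fitted model and reused unchanged on any later data.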


Next we will display the transformed dataframe

transformed_df.show(1)
Transformed Dataframe

Next let us get the input and output columns

# get input feature and output columns
transformed_df = transformed_df.select(['feature', 'Loan_Status'])
  • This selects the specified columns "feature" and "Loan_Status" from the DataFrame transformed_df and returns a new DataFrame that contains only those columns


Next let us split the train and test data

# split the data for train and test
train_data, test_data = transformed_df.randomSplit([0.8, 0.2], seed=42)
  • The randomSplit() method is called on the DataFrame transformed_df and takes two arguments

  • The first argument is a list [0.8, 0.2], which specifies the relative weights or proportions for splitting the data. In this case, it indicates that roughly 80% of the data will be assigned to train_data and roughly 20% to test_data. The split is random, so the exact sizes are not guaranteed.

  • The second argument is the seed parameter, which sets a seed value for the random number generator. This ensures that the data is split in a consistent manner when the code is executed multiple times
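
A conceptual plain-Python sketch of a weighted random split is shown below. The function random_split is a hypothetical helper, and Spark's actual implementation differs in its details, but the sketch shows the two properties described above: the seed makes the split repeatable, and the weights control the expected proportions rather than exact sizes.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using an independent random draw per row."""
    total = sum(weights)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, w in enumerate(weights):
            cumulative += w
            if draw < cumulative:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding at the upper edge.
            splits[-1].append(row)
    return splits

rows = list(range(614))  # stand-in row ids, same size as the loan dataset
train, test = random_split(rows, [0.8, 0.2], seed=42)
train_again, test_again = random_split(rows, [0.8, 0.2], seed=42)
```

Running the split twice with the same seed yields identical partitions, while a different seed would generally move some rows between the two sets.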


Next let us display the train data

train_data.show(5)
Loan Prediction Train Data

Model Training & Testing


First let us import the required modules

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
  • pyspark.ml.classification - provides classes and algorithms for performing classification tasks using machine learning. It includes various classification algorithms and evaluation metrics to train and evaluate classification models

  • pyspark.ml.evaluation - provides classes for evaluating the performance of machine learning models. It includes evaluation metrics for both binary and multiclass classification, as well as regression tasks


Next let us train logistic regression model

lr = LogisticRegression(featuresCol='feature', labelCol='Loan_Status')
lr_model = lr.fit(train_data)
  • The LogisticRegression class is used for binary classification tasks and implements logistic regression, which is a popular algorithm for modeling the probability of a binary outcome

  • The arguments featuresCol and labelCol specify the names of the columns in the DataFrame that contain the features and the target variable, respectively

  • After creating the LogisticRegression instance, the code fits the logistic regression model to the training data . The fit() method is called on the LogisticRegression object, and it trains the model using the features and target variable from the specified DataFrame

  • The resulting lr_model is an instance of the LogisticRegressionModel class, which represents the trained logistic regression model. This model can be used to make predictions on new data or evaluate its performance
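
For intuition about what "modeling the probability of a binary outcome" means, here is a minimal plain-Python sketch of how a fitted logistic regression turns a feature vector into a probability and then a label. The weights and intercept are invented for the example; a real model learns them from train_data. The 0.5 cut-off matches the default threshold of Spark's LogisticRegression.

```python
import math

def sigmoid(z):
    """Squash any real number into the range (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(features, weights, intercept):
    """Probability of the positive class for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

# Hypothetical coefficients, chosen only for illustration.
weights = [2.0, -1.0]
intercept = -0.5

p = predict_proba([1.0, 0.5], weights, intercept)  # z = -0.5 + 2.0 - 0.5 = 1.0
label = 1 if p >= 0.5 else 0
```

The probability is what the model actually estimates; the predicted label is just that probability compared against a threshold.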


Next we will make predictions on the test data

# predict on test data
predictions = lr_model.transform(test_data)
predictions.show(5)
  • The transform() method of the logistic regression model takes the test data as input and produces a new DataFrame predictions that includes the original columns from test_data along with additional columns, including the "prediction" column

Prediction on test data

Next we will evaluate the LogisticRegressionModel using the AUC metric

predictions = lr_model.transform(test_data)
auc = BinaryClassificationEvaluator().setLabelCol('Loan_Status')
print('AUC:', str(auc.evaluate(predictions)))
  • We will apply the logistic regression model (lr_model) to the test data to generate predictions. The resulting DataFrame predictions contains the original columns from test_data along with additional columns, including the "prediction" column that contains the predicted class labels

  • The code BinaryClassificationEvaluator().setLabelCol('Loan_Status') creates an instance of the BinaryClassificationEvaluator and sets the label column to 'Loan_Status'. The BinaryClassificationEvaluator is used to evaluate the performance of binary classification models, such as logistic regression, by computing evaluation metrics like AUC-ROC

AUC metric for LogisticRegressionModel
  • We got an AUC of around 0.78 using LogisticRegressionModel. Note that this is the area under the ROC curve, not classification accuracy
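
The value printed by the evaluator is an area under the ROC curve, which is a different quantity from accuracy. The plain-Python sketch below uses one common interpretation of AUC (the fraction of positive/negative pairs that the model ranks correctly) and compares it with accuracy at a 0.5 threshold. The labels and scores are made up, and the helper names auc and accuracy are hypothetical; Spark computes its value from the ROC curve itself, so this is an illustration of the idea rather than its implementation.

```python
def auc(labels, scores):
    """Fraction of (positive, negative) pairs where the positive scores higher; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

def accuracy(labels, scores, threshold=0.5):
    """Share of rows whose thresholded score matches the label."""
    preds = [1 if s >= threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)

# Made-up labels and predicted probabilities.
labels = [1, 1, 1, 0, 0]
scores = [0.9, 0.8, 0.4, 0.6, 0.1]

auc_value = auc(labels, scores)       # 5 of the 6 pairs are ranked correctly
acc_value = accuracy(labels, scores)  # 3 of the 5 rows are classified correctly
```

On this toy data the two metrics disagree noticeably, which is why an AUC of about 0.78 should not be read as "78% of loans classified correctly".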


Next let us train the RandomForestClassifier model

rf = RandomForestClassifier(featuresCol='feature', labelCol='Loan_Status')
rf_model = rf.fit(train_data)
  • This creates an instance of the RandomForestClassifier in PySpark. The RandomForestClassifier is an ensemble classifier that fits multiple decision trees on different sub-samples of the dataset and averages the predictions to make the final classification

  • The arguments featuresCol and labelCol specify the names of the columns in the DataFrame that contain the features and the target variable, respectively

  • After creating the RandomForestClassifier instance, the code rf_model = rf.fit(train_data) fits the random forest model to the training data. The fit() method is called on the RandomForestClassifier object, and it trains the model using the features and target variable from the specified DataFrame


Next let us see the metric for RandomForestClassifier model

predictions = rf_model.transform(test_data)
auc = BinaryClassificationEvaluator().setLabelCol('Loan_Status')
print('AUC:', str(auc.evaluate(predictions)))
  • The code applies the trained random forest model (rf_model) to the test data (test_data) to make predictions. The transform() method of the random forest model takes the test data as input and produces a new DataFrame predictions that includes the original columns from test_data along with additional columns, including the "prediction" column

  • The BinaryClassificationEvaluator is used to evaluate the performance of binary classification models, such as random forest, by computing evaluation metrics like AUC-ROC

  • The evaluate() method computes the area under the ROC curve (AUC-ROC), which is the evaluator's default metric, from the "rawPrediction" column (its default input column) and the specified label column ('Loan_Status' in this case). The AUC value represents how well the random forest model distinguishes between the two classes

AUC metric for RandomForestClassifier
  • We got an AUC of around 0.80 using RandomForestClassifier. As before, this is the area under the ROC curve, not classification accuracy


Final Thoughts

  • PySpark provides a powerful and scalable framework for performing machine learning tasks

  • With PySpark's MLlib library, you can build and train machine learning models on large-scale datasets using distributed computing capabilities

  • Overall, PySpark simplifies the process of building and deploying machine learning models on large datasets by leveraging distributed computing capabilities. It allows you to harness the power of Apache Spark for efficient data processing and modeling, making it a valuable tool for big data analytics and machine learning tasks

In this project tutorial we have seen how to create a Machine Learning model that predicts loan approvals based on various features, and we explored the Logistic Regression and Random Forest algorithms. This project can be extended by exploring different Machine Learning algorithms and improving the AUC of the model.

Get the project notebook from here


Thanks for reading the article!!!


Check out more project videos from the YouTube channel Hackers Realm
