{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "PySpark_Regression_Analysis",
"provenance": [],
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/tensorvijay/1f5b860c9bd08548f7dd17de04494278/pyspark_regression_analysis.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"metadata": {
"id": "sq8U3BtmhtRx"
},
"cell_type": "markdown",
"source": [
"\n",
"# **Running Pyspark in Colab**\n",
"\n",
"To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python.\n",
"Follow the steps to install the dependencies:"
]
},
{
"metadata": {
"id": "lh5NCoc8fsSO"
},
"cell_type": "code",
"source": [
"!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n",
"!wget -q https://www-us.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz\n",
"!tar xf spark-2.4.1-bin-hadoop2.7.tgz\n",
"!pip install -q findspark"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"id": "ILheUROOhprv"
},
"cell_type": "markdown",
"source": [
"Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:"
]
},
{
"metadata": {
"id": "v1b8k_OVf2QF"
},
"cell_type": "code",
"source": [
"import os\n",
"os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-2.3.2-bin-hadoop2.7\""
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"id": "KwrqMk3HiMiE"
},
"cell_type": "markdown",
"source": [
"Run a local spark session to test your installation:"
]
},
{
"metadata": {
"id": "9_Uz1NL4gHFx"
},
"cell_type": "code",
"source": [
"import findspark\n",
"findspark.init()\n",
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.master(\"local[*]\").getOrCreate()"
],
"execution_count": null,
"outputs": []
},
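{
"metadata": {},
"cell_type": "markdown",
"source": [
"As a quick sanity check (an extra step, not in the original walkthrough), confirm which Spark version the session is running:"
]
},
{
"metadata": {},
"cell_type": "code",
"source": [
"# Should print the version unpacked above, i.e. 2.3.2\n",
"print(spark.version)"
],
"execution_count": null,
"outputs": []
},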
{
"metadata": {
"id": "JEb4HTRwiaJx"
},
"cell_type": "markdown",
"source": [
"Congrats! Your Colab is ready to run Pyspark. Let's build a simple Linear Regression model.\n",
"\n",
"# Linear Regression Model\n",
"\n",
"\n",
"Linear Regression model is one the oldest and widely used machine learning approach which assumes a relationship between dependent and independent variables. For example, a modeler might want to predict the forecast of the rain based on the humidity ratio. Linear Regression consists of the best fitting line through the scattered points on the graph and the best fitting line is known as the regression line.\n",
"\n",
"The goal of this exercise to predict the housing prices by the given features. Let's predict the prices of the Boston Housing dataset by considering MEDV as the output variable and all the other variables as input.\n",
"\n",
"Download the dataset from [here](https://github.com/asifahmed90/pyspark-ML-in-Colab/blob/master/BostonHousing.csv) and keep it somewhere on your computer. Load the dataset into your Colab directory from your local system:"
]
},
{
"metadata": {
"id": "PAISFqHXf7dt"
},
"cell_type": "code",
"source": [
"from google.colab import files\n",
"files.upload()"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"id": "LNsM_jHqrjBg"
},
"cell_type": "markdown",
"source": [
"Check the dataset is uploaded correctly in the system by the following command"
]
},
{
"metadata": {
"id": "m606eNuQgA82",
"outputId": "8559ee1e-b25f-47b3-a25a-da5a9b5ded4c",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 50
}
},
"cell_type": "code",
"source": [
"!ls"
],
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"text": [
"BostonHousing.csv spark-2.3.2-bin-hadoop2.7\n",
"sample_data\t spark-2.3.2-bin-hadoop2.7.tgz\n"
],
"name": "stdout"
}
]
},
{
"metadata": {
"id": "21D9EANUvnwF"
},
"cell_type": "markdown",
"source": [
"Now that we have uploaded the dataset, we can start analyzing.\n",
"For our linear regression model we need to import two modules from Pyspark i.e. Vector Assembler and Linear Regression. Vector Assembler is a transformer that assembles all the features into one vector from multiple columns that contain type double. We could have used StringIndexer if any of our columns contains string values to convert it into numeric values. Luckily, the BostonHousing dataset only contains double values, so we don't need to worry about StringIndexer for now."
]
},
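{
"metadata": {},
"cell_type": "markdown",
"source": [
"For reference, here is a minimal, hypothetical StringIndexer sketch. The tiny demo DataFrame and its column names are invented for illustration and are not part of BostonHousing:"
]
},
{
"metadata": {},
"cell_type": "code",
"source": [
"# Hypothetical demo: StringIndexer maps each distinct string value to a\n",
"# numeric index (by default, the most frequent value gets index 0.0)\n",
"from pyspark.ml.feature import StringIndexer\n",
"\n",
"demo = spark.createDataFrame([('river',), ('inland',), ('river',)], ['location'])\n",
"indexer = StringIndexer(inputCol='location', outputCol='location_index')\n",
"indexer.fit(demo).transform(demo).show()"
],
"execution_count": null,
"outputs": []
},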
{
"metadata": {
"id": "0ZeJ7WQCgM8g"
},
"cell_type": "code",
"source": [
"from pyspark.ml.feature import VectorAssembler\n",
"from pyspark.ml.regression import LinearRegression\n",
"\n",
"dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"id": "UJLoAfqVv8-E"
},
"cell_type": "markdown",
"source": [
"Notice that we used InferSchema inside read.csv mofule. InferSchema enables us to infer automatically different data types for each column.\n",
"\n",
"Let us print look into the dataset to see the data types of each column:"
]
},
{
"metadata": {
"id": "Gok1FXWugYkE",
"outputId": "b1262434-2849-4b9a-b866-5b16f3ad8e44",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 286
}
},
"cell_type": "code",
"source": [
"dataset.printSchema()"
],
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"text": [
"root\n",
" |-- crim: double (nullable = true)\n",
" |-- zn: double (nullable = true)\n",
" |-- indus: double (nullable = true)\n",
" |-- chas: integer (nullable = true)\n",
" |-- nox: double (nullable = true)\n",
" |-- rm: double (nullable = true)\n",
" |-- age: double (nullable = true)\n",
" |-- dis: double (nullable = true)\n",
" |-- rad: integer (nullable = true)\n",
" |-- tax: integer (nullable = true)\n",
" |-- ptratio: double (nullable = true)\n",
" |-- b: double (nullable = true)\n",
" |-- lstat: double (nullable = true)\n",
" |-- medv: double (nullable = true)\n",
"\n"
],
"name": "stdout"
}
]
},
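{
"metadata": {},
"cell_type": "markdown",
"source": [
"As an aside (a sketch added for illustration, not part of the original flow): if you prefer not to rely on inference, you can declare the schema explicitly and pass it to read.csv. Only a few of the fourteen columns are shown here:"
]
},
{
"metadata": {},
"cell_type": "code",
"source": [
"# Sketch: an explicit schema as an alternative to inferSchema=True\n",
"# (only three of the fourteen columns are listed, for brevity)\n",
"from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType\n",
"\n",
"explicit_schema = StructType([\n",
"    StructField('crim', DoubleType(), True),\n",
"    StructField('zn', DoubleType(), True),\n",
"    StructField('chas', IntegerType(), True),\n",
"    # ...the remaining columns would be listed the same way\n",
"])\n",
"# dataset = spark.read.csv('BostonHousing.csv', schema=explicit_schema, header=True)"
],
"execution_count": null,
"outputs": []
},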
{
"metadata": {
"id": "3L9VJqsHwEGf"
},
"cell_type": "markdown",
"source": [
"Next step is to convert all the features from different columns into a single column and let's call this new vector column as 'Attributes' in the outputCol."
]
},
{
"metadata": {
"id": "sKSqdT9QgkfD",
"outputId": "2c0f6417-6710-4bf1-d23f-41677499ce38",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 454
}
},
"cell_type": "code",
"source": [
"#Input all the features in one vector column\n",
"assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')\n",
"\n",
"output = assembler.transform(dataset)\n",
"\n",
"#Input vs Output\n",
"finalized_data = output.select(\"Attributes\",\"medv\")\n",
"\n",
"finalized_data.show()"
],
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"text": [
"+--------------------+----+\n",
"| Attributes|medv|\n",
"+--------------------+----+\n",
"|[0.00632,18.0,2.3...|24.0|\n",
"|[0.02731,0.0,7.07...|21.6|\n",
"|[0.02729,0.0,7.07...|34.7|\n",
"|[0.03237,0.0,2.18...|33.4|\n",
"|[0.06905,0.0,2.18...|36.2|\n",
"|[0.02985,0.0,2.18...|28.7|\n",
"|[0.08829,12.5,7.8...|22.9|\n",
"|[0.14455,12.5,7.8...|27.1|\n",
"|[0.21124,12.5,7.8...|16.5|\n",
"|[0.17004,12.5,7.8...|18.9|\n",
"|[0.22489,12.5,7.8...|15.0|\n",
"|[0.11747,12.5,7.8...|18.9|\n",
"|[0.09378,12.5,7.8...|21.7|\n",
"|[0.62976,0.0,8.14...|20.4|\n",
"|[0.63796,0.0,8.14...|18.2|\n",
"|[0.62739,0.0,8.14...|19.9|\n",
"|[1.05393,0.0,8.14...|23.1|\n",
"|[0.7842,0.0,8.14,...|17.5|\n",
"|[0.80271,0.0,8.14...|20.2|\n",
"|[0.7258,0.0,8.14,...|18.2|\n",
"+--------------------+----+\n",
"only showing top 20 rows\n",
"\n"
],
"name": "stdout"
}
]
},
{
"metadata": {
"id": "dNgFCto2wHLd"
},
"cell_type": "markdown",
"source": [
"Here, 'Attributes' are in the input features from all the columns and 'medv' is the target column.\n",
"Next, we should split the training and testing data according to our dataset (0.8 and 0.2 in this case)."
]
},
{
"metadata": {
"id": "kwe1VT0UNOIN",
"outputId": "956613f7-a5a5-4748-cc75-4ab5b1feb015",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 454
}
},
"cell_type": "code",
"source": [
"#Split training and testing data\n",
"train_data,test_data = finalized_data.randomSplit([0.8,0.2])\n",
"\n",
"\n",
"regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')\n",
"\n",
"#Learn to fit the model from training set\n",
"regressor = regressor.fit(train_data)\n",
"\n",
"#To predict the prices on testing set\n",
"pred = regressor.evaluate(test_data)\n",
"\n",
"#Predict the model\n",
"pred.predictions.show()"
],
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"text": [
"+--------------------+----+------------------+\n",
"| Attributes|medv| prediction|\n",
"+--------------------+----+------------------+\n",
"|[0.01301,35.0,1.5...|32.7| 30.07670363535312|\n",
"|[0.01538,90.0,3.7...|44.0| 37.75244575519337|\n",
"|[0.01778,95.0,1.4...|32.9|30.596108327253294|\n",
"|[0.0187,85.0,4.15...|23.1|25.717620889129734|\n",
"|[0.01965,80.0,1.7...|20.1|19.992379582220035|\n",
"|[0.02729,0.0,7.07...|34.7|30.425294527192754|\n",
"|[0.03113,0.0,4.39...|17.5|16.330496893793097|\n",
"|[0.03237,0.0,2.18...|33.4|28.578543755284294|\n",
"|[0.03306,0.0,5.19...|20.6| 22.16010760013387|\n",
"|[0.03359,75.0,2.9...|34.9| 34.42265990782376|\n",
"|[0.03537,34.0,6.0...|22.0|28.784081950984906|\n",
"|[0.03584,80.0,3.3...|23.5| 30.77179427151925|\n",
"|[0.03738,0.0,5.19...|20.7| 21.65956978285279|\n",
"|[0.04297,52.5,5.3...|24.8|26.706348196385573|\n",
"|[0.0456,0.0,13.89...|23.3|26.369847201011538|\n",
"|[0.04684,0.0,3.41...|22.6|26.949731074397704|\n",
"|[0.04981,21.0,5.6...|23.4| 23.90871028835852|\n",
"|[0.05372,0.0,13.9...|27.1|27.156639422924407|\n",
"|[0.05425,0.0,4.05...|24.6| 29.54769429196901|\n",
"|[0.06466,70.0,2.2...|22.5|29.459287514682245|\n",
"+--------------------+----+------------------+\n",
"only showing top 20 rows\n",
"\n"
],
"name": "stdout"
}
]
},
{
"metadata": {
"id": "Y3vYyp5dwOm_"
},
"cell_type": "markdown",
"source": [
"We can also print the coefficient and intercept of the regression model by using the following command:"
]
},
{
"metadata": {
"id": "Eja1BLiaTThT",
"outputId": "ebaaeadc-ac17-4e59-a494-62810e1e9438",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 50
}
},
"cell_type": "code",
"source": [
"#coefficient of the regression model\n",
"coeff = regressor.coefficients\n",
"\n",
"#X and Y intercept\n",
"intr = regressor.intercept\n",
"\n",
"print (\"The coefficient of the model is : %a\" %coeff)\n",
"print (\"The Intercept of the model is : %f\" %intr)\n"
],
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"text": [
"The coefficient of the model is : DenseVector([-0.1239, 0.056, 0.0205, 2.7283, -16.8634, 3.218, 0.0163, -1.4331, 0.3657, -0.0134, -0.9328, 0.0096, -0.6229])\n",
"The Intercept of the model is : 39.049826\n"
],
"name": "stdout"
}
]
},
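{
"metadata": {},
"cell_type": "markdown",
"source": [
"To make these numbers concrete, here is a small sketch (added for illustration): a prediction is simply the dot product of the coefficients with a row's feature vector, plus the intercept:"
]
},
{
"metadata": {},
"cell_type": "code",
"source": [
"# Recompute the model's prediction for the first test row by hand:\n",
"# y_hat = coefficients . features + intercept\n",
"row = test_data.first()\n",
"manual = coeff.dot(row['Attributes']) + intr\n",
"print(\"Manual prediction for the first test row: %f\" % manual)"
],
"execution_count": null,
"outputs": []
},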
{
"metadata": {
"id": "bYORz3Q9wTSW"
},
"cell_type": "markdown",
"source": [
"# Basic Statistical Analysis\n",
"\n",
"Once we are done with the basic linear regression operation, we can go a bit further and analyze our model statistically by importing RegressionEvaluator module from Pyspark."
]
},
{
"metadata": {
"id": "8qrQdEj62ptt",
"outputId": "94ff1f60-0678-4942-a07e-e3173ea50602",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 84
}
},
"cell_type": "code",
"source": [
"from pyspark.ml.evaluation import RegressionEvaluator\n",
"eval = RegressionEvaluator(labelCol=\"medv\", predictionCol=\"prediction\", metricName=\"rmse\")\n",
"\n",
"# Root Mean Square Error\n",
"rmse = eval.evaluate(pred.predictions)\n",
"print(\"RMSE: %.3f\" % rmse)\n",
"\n",
"# Mean Square Error\n",
"mse = eval.evaluate(pred.predictions, {eval.metricName: \"mse\"})\n",
"print(\"MSE: %.3f\" % mse)\n",
"\n",
"# Mean Absolute Error\n",
"mae = eval.evaluate(pred.predictions, {eval.metricName: \"mae\"})\n",
"print(\"MAE: %.3f\" % mae)\n",
"\n",
"# r2 - coefficient of determination\n",
"r2 = eval.evaluate(pred.predictions, {eval.metricName: \"r2\"})\n",
"print(\"r2: %.3f\" %r2)\n",
"\n"
],
"execution_count": null,
"outputs": [
{
"output_type": "stream",
"text": [
"RMSE: 4.703\n",
"MSE: 22.118\n",
"MAE: 3.457\n",
"r2: 0.738\n"
],
"name": "stdout"
}
]
}
]
}