1. Create a SparkContext

Initialize a SparkContext:

In [1]:
import pyspark
from pyspark import SparkContext
sc = SparkContext()
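
Only one SparkContext can be active per JVM, so re-running this cell in the same session raises an error. A minimal sketch of the workaround:

sc.stop()            # release the existing context
sc = SparkContext()  # then create a fresh one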

Basic Operations

In [2]:
# Create an RDD (Resilient Distributed Dataset), Spark's core data collection.
# Computation on an RDD is automatically parallelized across the cluster.

nums = sc.parallelize([1, 2, 3, 4])
In [3]:
# Access the first element with take()
nums.take(1) 
Out[3]:
[1]
In [4]:
# Apply a transformation to each element with a lambda function

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16 
In [5]:
[num for num in squared]
Out[5]:
[1, 4, 9, 16]
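
Transformations like map() are lazy; nothing executes until an action such as collect(), take(), or reduce() is called. As a quick sketch, reduce() aggregates the whole RDD:

nums.reduce(lambda a, b: a + b)  # 1 + 2 + 3 + 4 = 10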

2. SQLContext

In [7]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
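
SQLContext is used throughout this tutorial; in Spark 2.x and later the same functionality is also available through SparkSession, which you could create instead:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()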

Step 1 -- Create a list of tuples with the information

In [8]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

Step 2 -- Build an RDD

In [9]:
rdd = sc.parallelize(list_p)

Step 3 -- Convert the tuples to Row objects

In [10]:
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Step 4 -- Create the DataFrame

In [11]:
DF_ppl = sqlContext.createDataFrame(ppl)
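
To eyeball the resulting rows, you could call show() on it (output omitted here):

DF_ppl.show()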

If you want to check the type of each column, you can use printSchema():

In [12]:
DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

3. Use Case -- Basic Operations with Spark

In [13]:
from pyspark.sql import SQLContext
from pyspark import SparkFiles
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)
sqlContext = SQLContext(sc)

You can read the CSV file with sqlContext.read.csv.

Setting inferSchema to True tells Spark to guess each column's data type automatically. Left at False, as below, every column is read as a string, and we will cast the numeric columns by hand in a moment.

In [14]:
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=False)
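
For comparison, enabling inference is a one-line change; Spark then samples the file and reports the numeric columns as int instead of string (df_inferred is just an illustrative name):

df_inferred = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)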
In [15]:
df.printSchema()
root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

In [16]:
df.show(5, truncate=False)
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv  |Husband     |White|Male  |0           |0           |40            |United-States |>50K  |
|4  |44 |Private  |160323|Some-college|10             |Married-civ-spouse|Machine-op-inspct|Husband     |Black|Male  |7688        |0           |40            |United-States |>50K  |
|5  |18 |?        |103497|Some-college|10             |Never-married     |?                |Own-child   |White|Female|0           |0           |30            |United-States |<=50K |
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
only showing top 5 rows

In [17]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
In [18]:
# List of continuous features
CONTI_FEATURES = ['age', 'fnlwgt', 'capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']

# Convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())

# Check the dataset
df.printSchema()
root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

Select columns

In [19]:
df.select('age','fnlwgt').show(5)
+----+--------+
| age|  fnlwgt|
+----+--------+
|25.0|226802.0|
|38.0| 89814.0|
|28.0|336951.0|
|44.0|160323.0|
|18.0|103497.0|
+----+--------+
only showing top 5 rows

Count by group

In [20]:
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+

Describe the data

In [ ]:
# To get summary statistics of the data, use describe():
### - count
### - mean
### - standard deviation
### - min
### - max
In [22]:
df.describe('age','fnlwgt').show()
+-------+------------------+------------------+
|summary|               age|            fnlwgt|
+-------+------------------+------------------+
|  count|             48842|             48842|
|   mean| 38.64358543876172|189664.13459727284|
| stddev|13.710509934443502|105604.02542315757|
|    min|              17.0|           12285.0|
|    max|              90.0|         1490400.0|
+-------+------------------+------------------+

In [26]:
df.select('age','income').show(5)
+----+------+
| age|income|
+----+------+
|25.0| <=50K|
|38.0| <=50K|
|28.0|  >50K|
|44.0|  >50K|
|18.0| <=50K|
+----+------+
only showing top 5 rows

CrossTab

A crosstab gives the pairwise counts of two categorical columns. For instance, you can count the number of people with income below or above 50k by age:

In [34]:
df.crosstab('age', 'income').sort("age_income").show()
+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|      17.0|  595|   0|
|      18.0|  862|   0|
|      19.0| 1050|   3|
|      20.0| 1112|   1|
|      21.0| 1090|   6|
|      22.0| 1161|  17|
|      23.0| 1307|  22|
|      24.0| 1162|  44|
|      25.0| 1119|  76|
|      26.0| 1068|  85|
|      27.0| 1117| 115|
|      28.0| 1101| 179|
|      29.0| 1025| 198|
|      30.0| 1031| 247|
|      31.0| 1050| 275|
|      32.0|  957| 296|
|      33.0| 1045| 290|
|      34.0|  949| 354|
|      35.0|  997| 340|
|      36.0|  948| 400|
+----------+-----+----+
only showing top 20 rows
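
The crosstab column is named by joining the two inputs, here age_income. The same call works for any pair of low-cardinality columns; for example, income by education level (output omitted):

df.crosstab('education', 'income').sort("education_income").show()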

Drop column

  • drop(): drop a column
  • dropna(): drop rows containing NA values (see the sketch after the example below)
In [36]:
df.drop('educational-num').columns
Out[36]:
['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']
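
dropna() works on rows rather than columns; a minimal sketch, dropping any row with a null in the continuous features (df_clean is an illustrative name, and how/subset are optional arguments):

df_clean = df.dropna(how='any', subset=CONTI_FEATURES)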

Filter data

In [37]:
df.filter(df.age > 40).count()
Out[37]:
20211
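
Conditions can be combined with & and |; wrap each clause in parentheses because of Python's operator precedence. For instance (output omitted):

df.filter((df.age > 40) & (df.income == '>50K')).count()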

Descriptive statistics by group

In [40]:
df.groupby('marital-status').agg({'capital-gain': 'mean', 'capital-loss': 'sum'}).show()
+--------------------+-----------------+------------------+
|      marital-status|sum(capital-loss)| avg(capital-gain)|
+--------------------+-----------------+------------------+
|           Separated|          86627.0| 581.8424836601307|
|       Never-married|         872350.0|  384.382639449029|
|Married-spouse-ab...|          39680.0| 629.0047770700637|
|            Divorced|         448751.0| 793.6755615860094|
|             Widowed|         123900.0| 603.6442687747035|
|   Married-AF-spouse|           3136.0|2971.6216216216217|
|  Married-civ-spouse|        2699344.0|1739.7006121810625|
+--------------------+-----------------+------------------+
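
The dictionary form of agg() allows only one aggregate per column; the pyspark.sql.functions API lifts that limit and lets you alias the output columns, a sketch:

from pyspark.sql import functions as F

df.groupBy('marital-status').agg(
    F.mean('capital-gain').alias('mean_gain'),
    F.sum('capital-loss').alias('total_loss')).show()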

Data preprocessing

In [41]:
from pyspark.sql.functions import *

# 1 Select the column and compute its square (returns a new one-column DataFrame)
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)
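
Note that withColumn() returns a new DataFrame rather than modifying df in place; the assignment back to df is what keeps the new column. To verify the transformation you could run (output omitted):

df.select('age', 'age_square').show(5)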