Notes and code written while reading Data Analysis with Python and PySpark
by Jonathan Rioux.
brew install apache-spark
sudo ln -sfn $(brew --prefix)/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk
Run the symlink command above after installation; it makes sure the system Java wrappers find the Java Development Kit (JDK) associated with this package.
conda create -n pyspark python=3.8 pandas pyspark=3.0.0
conda activate pyspark
conda install -c conda-forge notebook
conda install -c anaconda ipykernel
python -m ipykernel install --user --name=pyspark
Example of 4 workers working together to calculate an average of one column (diagram omitted).
Python, R, and Java are eagerly evaluated; Spark is lazily evaluated.
Spark distinguishes between transformations and actions.
Transformations are lazy instructions that describe how the data should change, e.g. select(), filter(), groupby().
Actions are operations that trigger actual work, e.g. show(), write(), count().
A Spark program will avoid performing any data work until an action triggers the computation chain. Before that, the master will cache your instructions. Benefits are: instructions take less room than intermediate data, and Spark can optimize the whole chain of operations before running it.
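A minimal sketch of the distinction (assuming a data frame df is already loaded): the transformations build a plan, and only the action on the last line triggers any work.
# Transformations: nothing is computed yet
filtered = df.select("value").where(df.value.isNotNull())
# Action: triggers the whole computation chain
filtered.show(5)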
Most data-driven applications follow the Extract-Transform-Load (ETL) pipeline.
SparkSession entry point
- SparkSession provides an entry point to Spark. It wraps the SparkContext and provides functionality for interacting with the data.
- SparkSession.builder: builder pattern with a set of methods to create a configurable object.
Creating a SparkSession entry point from scratch:
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("Analyzing the vocabulary of Pride and Prejudice.")
.getOrCreate())
sparkContext
can be invoked from the SparkSession
object like below.
(Older code may present sparkContext
as an sc
variable)
sc = spark.sparkContext
sc
By default, the log level is WARN. Change it with spark.sparkContext.setLogLevel(KEYWORD). The keywords are:
| Keyword | Description |
|---|---|
| OFF | No logging at all (not recommended). |
| FATAL | Only fatal errors. A fatal error will crash your Spark cluster. |
| ERROR | My personal favorite, will show FATAL as well as other recoverable errors. |
| WARN | Add warnings (and there is quite a lot of them). |
| INFO | Will give you runtime information, such as repartitioning and data recovery (see chapter 1). |
| DEBUG | Will provide debug information on your jobs. |
| TRACE | Will trace your jobs (more verbose debug logs). Can be quite pedagogic, but very annoying. |
| ALL | Everything that PySpark can spit, it will spit. As useful as OFF. |
spark.sparkContext.setLogLevel('ERROR')
Goal: What are the most popular words in Jane Austen's Pride and Prejudice?
Steps: read the text, tokenize it into words, clean the words, count their occurrences, and return the most popular ones.
PySpark provides two main structures for storing data when performing manipulations: the RDD and the data frame.
spark.read
Reading data into a data frame is done through the DataFrameReader object, which we can access through spark.read.
The result has a single column, value: string, with the text of the book within that column.
book = spark.read.text("data/Ch02/1342-0.txt")
book
# Check schema
display(book.printSchema())
display(book.dtypes)
show()
The show() method takes three optional parameters:
- n can be set to any positive integer, and will display that number of rows.
- truncate, if set to True, will truncate the columns to display only 20 characters. Set it to False to display the whole length, or any positive integer to truncate to a specific number of characters.
- vertical takes a Boolean value and, when set to True, will display each record as a small table. If you need to check some records in detail, this is a very useful option.
# play with params
book.show(2, truncate=False, vertical=True)
You need to call show() to see data frame content; this follows Spark's idea of lazy evaluation until some action is needed. To have the REPL evaluate data frames eagerly, set the config below.
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.config("spark.sql.repl.eagerEval.enabled", "True")
.getOrCreate())
select() and split()
select() selects the data, similar to SQL. The syntax is similar to pandas; the four forms below are equivalent:
book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")
split() transforms a string column into an array column containing n string elements (i.e. tokens). Note that it uses JVM-based regexes instead of Python's.
alias() renames transformed columns for easier reference. When applied to a column, it takes a single string as an argument.
Another way to set an alias is calling .withColumnRenamed() on the data frame. If you just want to rename a column without changing the rest of the data frame, use .withColumnRenamed().
from pyspark.sql.functions import col, split
# Read, tokenize and alias the column
lines = book.select(split(col('value'), " ").alias("line"))
display(lines)
lines.printSchema()
lines.show(5)
# Changing alias name using withColumnRenamed
alternative = lines.withColumnRenamed("line",
"here is an alternate alias")
alternative.printSchema()
explode()
When applied to a column containing a container-like data structure (such as an array), explode() will take each element and give it its own row.
# Explode column of arrays into rows of elements
from pyspark.sql.functions import explode, col
words = lines.select(explode(col("line")).alias("word"))
words.show(10)
from pyspark.sql.functions import lower, regexp_extract
# Lowercase
words_lower = words.select(lower("word").alias("word_lower"))
words_lower.show()
# Naive punctuation normalization using regex
word_norm = words_lower.select(regexp_extract(col("word_lower"), "[a-z]*", 0).alias("word_normalized"))
word_norm.show()
# Remove empty records
word_nonull = word_norm.filter(col("word_normalized") != "") \
.withColumnRenamed('word_normalized', 'word_nonull')
word_nonull.show()
Rewrite the following code snippet, removing the withColumnRenamed method. Which version is clearer and easier to read?
from pyspark.sql.functions import col, length
# The `length` function returns the number of characters in a string column.
ex21 = (
spark.read.text("./data/Ch02/1342-0.txt")
.select(length(col("value")))
.withColumnRenamed("length(value)", "number_of_char")
)
from pyspark.sql.functions import col, length
ex21 = (
spark.read.text("./data/Ch02/1342-0.txt")
.select(length(col("value")).alias("number_of_char"))
)
ex21.show(5)
The following code block gives an error. What is the problem and how can you solve it?
from pyspark.sql.functions import col, greatest
ex22.printSchema()
# root
# |-- key: string (containsNull = true)
# |-- value1: long (containsNull = true)
# |-- value2: long (containsNull = true)
# `greatest` will return the greatest value of the list of column names,
# skipping null value
# The following statement will return an error
ex22.select(
greatest(col("value1"), col("value2")).alias("maximum_value")
).select(
"key", "max_value"
)
The first select() returns a data frame that contains only maximum_value, so the second select() can't find key, and it also refers to max_value instead of the alias maximum_value. Fix: select key together with the aliased column in a single select().
Let's take our words_nonull data frame, available in listing 2.19. You can load the code in the repository (code/Ch02/end_of_chapter.py) into your REPL to get the data frame loaded.
a) Remove all of the occurrences of the word "is"
b) (Challenge) Using the length function explained in exercise 2.1, keep only the words with more than 3 characters.
# 1. Remove all of the occurrences of the word "is",
# 2. Using the length function explained in exercise 2.1, keep only the words with more than 3 characters.
word_nonull.filter(col("word_nonull") != "is") \
.filter(length(col("word_nonull")) > 3) \
.withColumnRenamed('word_nonull', 'words_greater_than_3') \
.show()
Remove the words is, not, the and if from your list of words, using a single where()
method on the words_nonull data frame (see exercise 2.3). Write the code to do so.
word_nonull.where(~col("word_nonull").isin(['is', 'not', 'the', 'if'])) \
.show()
One of your friends comes to you with the following code. They have no idea why it doesn't work. Can you diagnose the problem, explain why it is an error, and provide a fix?
from pyspark.sql.functions import col, split
book = spark.read.text("./data/ch02/1342-0.txt")
book = book.printSchema()
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
They're assigning the output of book.printSchema() (which returns None) to book, overwriting the Spark data frame.
from pyspark.sql.functions import col, split, explode
book = spark.read.text("./data/ch02/1342-0.txt")
# Don't assign it back to `book`
book.printSchema()
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
words.show()
# Set up
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(
"Analyzing the vocabulary of Pride and Prejudice."
).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Data Frame Setup
# Set up
from pyspark.sql.functions import col, split, lower, explode, regexp_extract
book = spark.read.text("data/Ch02/1342-0.txt")
lines = book.select(split(col("value"), " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
words_lower = words.select(lower("word").alias("word_lower"))
word_norm = words_lower.select(
regexp_extract(col("word_lower"), "[a-z]*", 0).alias("word_normalized")
)
word_nonull = word_norm.filter(col("word_normalized") != "").withColumnRenamed(
"word_normalized", "word_nonull"
)
groupBy and count
- GroupedData allows you to perform an aggregate function on each group.
- Use groupby to count record occurrences, passing the columns we want to group by. The returned value is a GroupedData object, not a DataFrame. Once you apply a function to it like count(), it returns a DataFrame.
- groupby and groupBy are the same thing; orderBy only exists as camel case.
groups = word_nonull.groupBy(col("word_nonull"))
display(groups)
results = groups.count().orderBy("count", ascending=False)
results.show()
Writing to csv
- Use the write method, which can be chained with csv.
- The output is one file per partition, plus a _SUCCESS file.
- Use coalesce(1) to concatenate the output into 1 file.
- Use mode('overwrite') to force the write.
TIP: Never assume that your data frame will keep the same ordering of records unless you explicitly ask via orderBy().
# Write multiple partitions + success file
results.write.mode("overwrite").csv("./output/results")
# Concatenate into 1 file, then write to disk
results.coalesce(1).write.mode("overwrite").csv("./output/result_single_partition")
# qualified import; import the whole module
import pyspark.sql.functions as F
# chain methods together instead of multiple variables
results = (
spark.read.text("./data/ch02/1342-0.txt")
.select(F.split(F.col("value"), " ").alias("line"))
.select(F.explode(F.col("line")).alias("word"))
.select(F.lower(F.col("word")).alias("word"))
.select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
.where(F.col("word") != "")
.groupby("word")
.count()
)
spark-submit
When wrapping a script to be executed with spark-submit rather than with the pyspark command, you'll need to define your SparkSession first.
# This can be wrapped into a `word_counter.py` file and be executed
# using `spark-submit`
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName(
"Analyzing the vocabulary of Pride and Prejudice."
).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
results = (
spark.read.text("./data/ch02/*.txt")
.select(F.split(F.col("value"), " ").alias("line"))
.select(F.explode(F.col("line")).alias("word"))
.select(F.lower(F.col("word")).alias("word"))
.select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
.where(F.col("word") != "")
.groupby("word")
.count()
.orderBy("count", ascending=False)
)
results.show()
See chapter 3 code
- Use the SparkReader object to read any kind of data directly into a data frame. The specialized CSV SparkReader is used to ingest comma-separated value (CSV) files. Just like when reading text, the only mandatory parameter is the source location.
- CSV parameters include the field delimiter, the record delimiter, and the quotation character. All of those parameters have sensible defaults.
- PySpark can infer the schema of a CSV file by setting the inferSchema optional parameter to True. PySpark accomplishes this by reading the data twice: once for setting the appropriate types for each column, and another time to ingest the data in the inferred format.
- You can select, drop, and create columns using select(), drop() and withColumn(), respectively.
- You can rename columns one by one with the withColumnRenamed() method, or all at once by using the toDF() method.
- You can get summary statistics with the describe() or summary() method. describe() has a fixed set of metrics, while summary() will take functions as parameters and apply them to all columns.
- PySpark operates either on whole data frame objects (via methods such as select() and groupby()) or on Column objects (for instance when using a function like split()).
- The data frame is column-major, so its API focuses on manipulating the columns to transform the data.
- Hence with data transformations, think about what operations to do and which columns will be impacted.
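A small sketch of that distinction, reusing the value column from the word-count example earlier in these notes:
import pyspark.sql.functions as F
# Data frame method: takes the whole data frame and returns a new one
lines = book.select("value")
# Column function: builds a Column object; it only does work once it is
# used inside a data frame method such as select()
words_col = F.split(F.col("value"), " ")
book.select(words_col.alias("line")).show(5)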
# setup
import os
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
For this exercise, we’ll use some open data from the Government of Canada, more specifically the CRTC (Canadian Radio-television and Telecommunications Commission). Every broadcaster is mandated to provide a complete log of the programs, commercials and all, showcased to the Canadian public.
This gives us a lot of potential questions to answer, but we’ll select one specific one: what are the channels with the most and least proportion of commercials?
spark.createDataFrame
# Example creating a data frame with toy data
my_grocery_list = [
["Banana", 2, 1.74],
["Apple", 4, 2.04],
["Carrot", 1, 1.09],
["Cake", 1, 10.99],
]
df_grocery_list = spark.createDataFrame(my_grocery_list, ["Item", "Quantity", "Price"])
df_grocery_list.printSchema()
A delimited file is composed of a row delimiter (e.g. newline \n) and a column delimiter (e.g. tab \t for TSVs).
DIRECTORY = "./data/Ch04"
logs = spark.read.csv(
os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
sep="|", # default is ","
quote='"', # default is double quote.
header=True, # set first row as column names
inferSchema=True, # infer schema from column names default False
)
logs.printSchema()
sample = spark.read.csv(
os.path.join(DIRECTORY, "ch4_exercise.csv"),
sep=",",
header=True,
quote="$",
inferSchema=True,
)
sample.show()
Re-read the data in a logs_raw
data frame, taking inspiration from the code in listing 4.3, this time without passing any optional parameters. Print the first 5 rows of data, as well as the schema. What are the differences in terms of data and schema between logs and logs_raw?
DIRECTORY = "./data/Ch04"
raw_logs = spark.read.csv(
os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
)
raw_logs.show(5, False) # False = show entire contents
raw_logs.printSchema()
# Result shows entire row concatenated into one column (_c0). Not what we want.
Wiki:
In computing, the star schema is the simplest style of data mart schema and is the approach most widely used to develop data warehouses and dimensional data marts. The star schema consists of one or more fact tables referencing any number of dimension tables.
Star schemas are common in the relational database world because of normalization, a process used to avoid duplicating pieces of data and improve data integrity.
Spark uses denormalized tables (ie fat tables). Why? Mainly because it is easier to run analyses on a single table.
select-ing what we want to see
Four ways to select columns in PySpark, all equivalent in terms of results:
# Using the string to column conversion
logs.select("BroadCastLogID", "LogServiceID", "LogDate")
logs.select(
*["BroadCastLogID", "LogServiceID", "LogDate"]
) # Unpack list with star prefix
# Passing the column object explicitly
logs.select(F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate"))
logs.select(
*[F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate")]
) # Unpack list with star prefix
Because of the width of our data frame, we could split our columns into manageable sets of three to keep the output tidy on the screen. This gives a high-level view of what the data frame contains.
# Splitting columns in groups of three using numpy
display("Columns in groups of three")
column_split = np.array_split(np.array(logs.columns), len(logs.columns) // 3)
display(column_split)
# Show columns in groups of three
display("Table display in column groups of three")
for x in column_split:
logs.select(*x).show(5, False)
drop-ing columns we don't need
Remove BroadCastLogID (primary key, not needed in a single table) and SequenceNo. drop() returns a new data frame.
Warning with drop: Unlike select(), where selecting a column that doesn't exist will return a runtime error, dropping a non-existent column is a no-op. PySpark will just ignore the columns it doesn't find. Careful with the spelling of your column names!
logs = logs.drop("BroadCastLogID", "SequenceNo")
assert all(col not in logs.columns for col in ["BroadCastLogID", "SequenceNo"])
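A quick sketch of the warning above: dropping a misspelled column is silently ignored, while selecting it raises an error (the column name below is deliberately wrong).
from pyspark.sql.utils import AnalysisException
# No-op: the misspelled column is simply ignored
logs.drop("SequenceNoo")
# AnalysisException: select() fails on a column that doesn't exist
try:
    logs.select("SequenceNoo")
except AnalysisException as err:
    print("AnalysisException:", err)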
An alternate method to the above, just using select with a list comprehension.
logs = logs.select(
*[col for col in logs.columns if col not in ["BroadCastLogID", "SequenceNo"]]
)
assert all(col not in logs.columns for col in ["BroadCastLogID", "SequenceNo"])
print([col for col in logs.columns if col[-2:] != "ID"])
# Load original CSV again
DIRECTORY = "./data/Ch04"
logs = spark.read.csv(
os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
sep="|", # default is ","
quote='"', # default is double quote.
header=True, # set first row as column names
inferSchema=True, # infer schema from column names default False
)
# Filter to columns that don't end with "ID"
logs_no_id = logs.select(*[col for col in logs.columns if col[-2:].lower() != "id"])
print("Filtered results (not end with 'ID')")
logs_no_id.printSchema()
assert all("id" not in col[-2:] for col in logs_no_id.columns)
logs.select(F.col("Duration")).show(5)
print(
"dtype of 'Duration' column is 'string'. Best to convert to timestamp:\n",
logs.select(F.col("Duration")).dtypes,
)
logs.select(
F.col("Duration"),
F.col("Duration").substr(1, 2).cast("int").alias("hours"),
F.col("Duration").substr(4, 2).cast("int").alias("minutes"),
F.col("Duration").substr(7, 2).cast("int").alias("seconds"),
# Add final column converting duration into total seconds
(
F.col("Duration").substr(1, 2).cast("int") * 60 * 60
+ F.col("Duration").substr(4, 2).cast("int") * 60
+ F.col("Duration").substr(7, 2).cast("int")
).alias("duration_seconds"),
).distinct().show(
5
) # only show distinct entries
withColumn() to add 'duration_seconds' to the original data frame
logs = logs.withColumn(
"duration_seconds",
F.col("Duration").substr(1, 2).cast("int") * 60 * 60
+ F.col("Duration").substr(4, 2).cast("int") * 60
+ F.col("Duration").substr(7, 2).cast("int"),
)
assert "duration_seconds" in logs.columns
Warning: If you’re creating a column withColumn() and give it a name that already exists in your data frame, PySpark will happily overwrite the column.
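A minimal sketch of that overwrite behavior (reusing the logs data frame): giving withColumn() an existing name replaces the column instead of adding a new one.
# "Duration" is replaced in place; the schema still has a single Duration column
logs.withColumn("Duration", F.col("Duration").substr(1, 2)).printSchema()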
toDF()
logs.toDF(*[x.lower() for x in logs.columns]).printSchema()
sort
logs.select(sorted(logs.columns)).printSchema()
describe and summary
describe only works for numerical and string columns.
# Show stats for the first three columns
for i in logs.columns[:3]:
logs.describe(i).show()
summary shows extra stats like the 25%, 50% and 75% percentiles.
# Show stats for the first three columns
for i in logs.columns[:3]:
logs.select(i).summary().show()
# write checkpoint file
logs.coalesce(1).write.mode("overwrite").csv("./output/ch04/logs.csv", header=True)
logs.printSchema()
# Set up
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Read the data
DIRECTORY = "./data/Ch04"
logs = spark.read.csv(
"./output/ch04/logs.csv", # read in data transformed in Ch04
sep=",", # default is ","
quote='"', # default is double quote.
header=True, # set first row as column names
inferSchema=True, # infer schema from column names default False
)
logs.printSchema()
# Read link table and filter to only primary channels (ie. PrimaryFG == 1)
log_identifier = spark.read.csv(
os.path.join(DIRECTORY, "ReferenceTables", "LogIdentifier.csv"),
sep="|",
header=True,
inferSchema=True,
)
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)
# Show results
log_identifier.printSchema()
log_identifier.show(5)
print("Unique primary channels: ", log_identifier.count())
join recipe
[LEFT].join(
    [RIGHT],
    on=[PREDICATES],
    how=[METHOD]
)
You can combine multiple and predicates into a list, like:
[
    left["col1"] == right["colA"],
    left["col2"] > right["colB"],  # value on left table is greater than the right
    left["col3"] != right["colC"]
]
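A small sketch of passing a predicate list (hypothetical data frames left and right with the columns above); the predicates in the list are combined with and:
joined_multi = left.join(
    right,
    on=[left["col1"] == right["colA"], left["col2"] > right["colB"]],
    how="inner",
)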
how
- cross: returns a record for every record pair. Not common.
- inner: returns a record if the predicate is true, otherwise drops it. Most common; PySpark's join default.
- left & right: similar to inner, except on what to do with false predicates. A left join adds unmatched records from the left table to the joined table, filling in the columns from the right table with None; a right join adds unmatched records and fills in columns vice versa.
- outer: adds unmatched records from both the left and right tables, padding with None.
- left_semi: same as an inner join but only keeps the columns of the left table.
- left_anti: returns only records that don't match the predicate with any record in the right table; the opposite of left_semi. (See the sketch after the inner join below.)
# Join `logs` with `log_identifier` using the 'LogServiceID' column
joined = logs.join(log_identifier, on="LogServiceID", how="inner")
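As a side sketch (not from the book listing), the same key with left_semi keeps only the logs rows that have a match, with logs columns only, while left_anti keeps only the rows without a match:
logs_on_primary = logs.join(log_identifier, on="LogServiceID", how="left_semi")
logs_not_primary = logs.join(log_identifier, on="LogServiceID", how="left_anti")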
# Additionally join CategoryID and ProgramClassID table
# Use left joins since keys may not be available in the link table.
# CategoryID
cd_category = spark.read.csv(
os.path.join(DIRECTORY, "ReferenceTables", "CD_Category.csv"),
sep="|",
header=True,
inferSchema=True,
).select(
"CategoryID",
"CategoryCD",
F.col("EnglishDescription").alias("Category_Description"),
)
# ProgramClass
cd_program_class = spark.read.csv(
os.path.join(DIRECTORY, "ReferenceTables", "CD_ProgramClass.csv"),
sep="|",
header=True,
inferSchema=True,
).select(
"ProgramClassID",
"ProgramClassCD",
F.col("EnglishDescription").alias("ProgramClass_Description"),
)
# Join all to joined table
full_log = joined.join(cd_category, "CategoryID", how="left",).join(
cd_program_class, "ProgramClassID", how="left",
)
# Check if additional columns were joined to original log data frame
full_log.printSchema()
To be able to process a comparison between records, the data needs to be on the same machine. If not, PySpark will move the data in an operation called a shuffle, which is slow and expensive. More on join strategies in later chapters.
PySpark happily joins the two data frames together but fails when we try to work with the ambiguous column.
# Joining two tables with the same LogServiceID column
logs_and_channels_verbose = logs.join(
log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)
logs_and_channels_verbose.printSchema()
print(
'Joined table now has two "LogServiceID" columns: ',
[col for col in logs_and_channels_verbose.columns if col == "LogServiceID"],
"\n",
)
print('Selecting "LogServiceID" will now throw an error')
# Selecting "LogServiceID" will throw an error
try:
logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as err:
print("AnalysisException: ", err)
Solution 1: use the simplified join syntax (the key column name as a string), which keeps only one copy of the column.
logs_and_channels = logs.join(log_identifier, "LogServiceID")
Solution 2: refer to the column via its origin data frame.
logs_and_channels_verbose.select(log_identifier["LogServiceID"])
Solution 3: alias the data frames and use the Column object directly.
logs_and_channels_verbose = logs.alias("left").join(
log_identifier.alias("right"),
logs["LogServiceID"] == log_identifier["LogServiceID"],
)
logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
"LogServiceID"
)
groupby with GroupedData
Goal: What channels have the most and least proportion of commercials?
Task: groupby on multiple columns.
- groupby returns GroupedData objects, not a data frame; you can't call show() on it. Aggregate first, e.g. with F.sum.
- The GroupedData object holds all non-key columns in a group cell (see fig 5.7).
- agg() vs sum(): agg can take an arbitrary number of aggregate functions, unlike sum.
# Group by ProgramClassCD and ProgramClass_Description, sum total duration for each
full_log.groupby("ProgramClassCD", "ProgramClass_Description").agg(
F.sum("duration_seconds").alias("duration_total")
).orderBy("duration_total", ascending=False).show(100, False)
# Another way by passing dictionary to agg
# full_log.groupby("ProgramClassCD", "ProgramClass_Description").agg(
# {"duration_seconds": "sum"}
# ).withColumnRenamed("sum(duration_seconds)", "duration_total").orderBy(
# "duration_total", ascending=False
# ).show(
# 100, False
# )
when() logic:
(
F.when([BOOLEAN TEST], [RESULT IF TRUE])
.when([ANOTHER BOOLEAN TEST], [RESULT IF TRUE])
.otherwise([DEFAULT RESULT, WILL DEFAULT TO null IF OMITTED])
)
# Goal: Compute only the commercial time for each program
# Create custom column logic - get duration_seconds if ProgramClassCD matches an item in
# the list
is_commercial = F.when(
F.trim(F.col("ProgramClassCD")).isin(
["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
),
F.col("duration_seconds"),
).otherwise(0)
# Use custom column logic to build a duration_commercial column,
# along with duration_total
commercial_time = (
full_log.groupby("LogIdentifierID")
.agg(
F.sum(is_commercial).alias("duration_commercial"),
F.sum("duration_seconds").alias("duration_total"),
)
.withColumn(
"commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
)
)
commercial_time.orderBy("commercial_ratio", ascending=False).show(20, False)
dropna + fillna
dropna parameters:
- how, which can take the value any or all. If any is selected, PySpark will drop records where at least one of the fields is null. In the case of all, only the records where all fields are null will be removed. By default, PySpark will take the any mode.
- thresh takes an integer value. If set (its default is None), PySpark will ignore the how parameter and only drop the records with less than thresh non-null values.
- subset will take an optional list of columns that drop will use to make its decision.
# Drop records that have a commercial_ratio of null
c_time_no_null = commercial_time.dropna(subset=["commercial_ratio"])
c_time_no_null.orderBy("commercial_ratio", ascending=False).show()
# Check record counts for each
print("Records in commercial_time: ", commercial_time.count())
print("Records in c_time_no_null: ", c_time_no_null.count())
# Fill null fields
c_time_fill_null = commercial_time.fillna(0)
c_time_fill_null.orderBy("commercial_ratio", ascending=False).show()
# Check record counts for each
print("Records in commercial_time: ", commercial_time.count())
print("Records in c_time_no_null: ", c_time_fill_null.count())
Summary code of all the steps taken in this notebook as a Spark script:
# Set up
import os
import numpy as np
import json
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
spark = SparkSession.builder.getOrCreate()
For this chapter, we use a JSON dump of the information about the TV Show Silicon Valley, from TV Maze.
The multiLine option: one JSON document, one file, one record.
# Import a single JSON document
sv = "data/ch06/shows-silicon-valley.json"
shows = spark.read.json(sv)
display(shows.count())
# Read multiple JSON documents using multiLine param
three_shows = spark.read.json("data/ch06/shows-*.json", multiLine=True)
display(three_shows.count())
# Inspect the schema
shows.printSchema()
array, map and struct
array
Functions to work with arrays live in pyspark.sql.functions.
# Selecting the name and genres columns of the shows dataframe
import pyspark.sql.functions as F
array_subset = shows.select("name", "genres")
array_subset.show(1, False)
# Multiple methods to extract the same array
array_subset = array_subset.select(
"name",
array_subset.genres[0].alias("dot_and_index"),
F.col("genres")[0].alias("col_and_index"),
array_subset.genres.getItem(0).alias("dot_and_method"),
F.col("genres").getItem(0).alias("col_and_method"),
)
array_subset.show()
WARNING: Although the square bracket approach looks very Pythonic, you can’t use it as a slicing tool. PySpark will accept only one integer as an index.
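Since bracket slicing isn't supported, a hedged sketch using F.slice() to take a sub-array instead (first two genres of each show; column names as above):
shows.select("name", F.slice(F.col("genres"), 1, 2).alias("first_two_genres")).show(1, False)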
Use lit() to create scalar columns, array() to combine them into an array of possible genres, and array_repeat() to create a column repeating the "Comedy" string.
"""
1. Create three literal columns (using lit() to create scalar columns,
   then array() to create an array of possible genres).
2. Use the function array_repeat() to create a column repeating the "Comedy" string.
"""
array_subset_repeated = array_subset.select(
"name",
F.lit("Comedy").alias("one"),
F.lit("Horror").alias("two"),
F.lit("Drama").alias("three"),
F.col("dot_and_index"),
).select(
"name",
F.array("one", "two", "three").alias("Some_Genres"),
F.array_repeat("dot_and_index", 5).alias("Repeated_Genres"),
)
array_subset_repeated.show(1, False)
F.size to show the number of elements in an array
array_subset_repeated.select(
"name", F.size("Some_Genres"), F.size("Repeated_Genres")
).show()
F.array_distinct() to remove duplicates (like SQL)
array_subset_repeated.select(
"name",
F.array_distinct("Some_Genres"),
F.array_distinct("Repeated_Genres")
).show(1, False)
F.array_intersect to show common values across arrays
array_subset_repeated = array_subset_repeated.select(
"name",
F.array_intersect("Some_Genres", "Repeated_Genres").alias("Genres")
)
array_subset_repeated.show()
array_position() to get the position of an item in an array if it exists
WARNING:
array_position
is 1-based, unlike Python lists or extracting elements from arrays (e.g.array_subset.genres[0]
orgetItems(0)
)
# When using array_position(), the first item of the array has position 1,
# not 0 like in python.
array_subset_repeated.select(
"name",
F.array_position("Genres", "Comedy").alias("Genres"),
).show()
map
Like an array, the keys need to be of the same type, and the values need to be of the same type.
# Creating a map from two arrays: one for the keys, one for the values.
# This creates a hash-map within the column record.
# 1. Create two columns of arrays
columns = ["name", "language", "type"]
shows_map = shows.select(
*[F.lit(column) for column in columns],
F.array(*columns).alias("values")
)
shows_map = shows_map.select(F.array(*columns).alias("keys"), "values")
print("Two columns of arays")
shows_map.show(1, False)
# 2. Map them together using one array as the key, and other as value
shows_map = shows_map.select(
F.map_from_arrays("keys", "values").alias("mapped")
)
shows_map.printSchema()
print("1 column of map")
shows_map.show(1, False)
# 3. 3 ways to select a key in a map column
print("3 ways to select a key in a map")
shows_map.select(
F.col("mapped.name"), # dot_notation with col
F.col("mapped")["name"], # Python dictionary style
shows_map.mapped["name"] # dot_notation to get the column + bracket
).show()
struct
# The "schedule" column contains an array of strings and a string
shows.select("schedule").printSchema()
# A more complex struct
shows.select("_embedded").printSchema()
The struct above, visualized (diagram omitted).
# Drop useless _embedded column and promote the fields within
shows_clean = shows.withColumn("episodes", F.col("_embedded.episodes")).drop(
"_embedded"
)
shows_clean.select("episodes").printSchema()
explode to split arrays into rows
# "episodes.name" == array of strings
episodes_name = shows_clean.select(F.col("episodes.name"))
episodes_name.printSchema()
# Just showing episodes_name is messy, so explode the array to show the names
episodes_name.select(F.explode("name").alias("name")).show(3, False)
Schemas are built with pyspark.sql.types, usually imported as T. Two kinds of objects in pyspark.sql.types:
- Types: LongType(), DecimalType(precision, scale), ArrayType(StringType()), etc.
- Fields: StructField takes a name (str) and a dataType (type).
Putting it all together:
T.StructField("summary", T.StringType())
# For reference
shows.select("_embedded").printSchema()
# Full schema from scratch
# episode links
episode_links_schema = T.StructType(
[T.StructField("self", T.StructType([T.StructField("href", T.StringType())]))]
)
# episode image
episode_image_schema = T.StructType(
[
T.StructField("medium", T.StringType()),
T.StructField("original", T.StringType()),
]
)
# episode metadata
episode_schema = T.StructType(
[
T.StructField("_links", episode_links_schema),
T.StructField("airdate", T.DateType()),
T.StructField("airstamp", T.TimestampType()),
T.StructField("airtime", T.StringType()),
T.StructField("id", T.StringType()),
T.StructField("image", episode_image_schema),
T.StructField("name", T.StringType()),
T.StructField("number", T.LongType()),
T.StructField("runtime", T.LongType()),
T.StructField("season", T.LongType()),
T.StructField("summary", T.StringType()),
T.StructField("url", T.StringType()),
]
)
# set top level array
embedded_schema = T.StructType([T.StructField("episodes", T.ArrayType(episode_schema))])
# network
network_schema = T.StructType(
[
T.StructField(
"country",
T.StructType(
[
T.StructField("code", T.StringType()),
T.StructField("name", T.StringType()),
T.StructField("timezone", T.StringType()),
]
),
),
T.StructField("id", T.LongType()),
T.StructField("name", T.StringType()),
]
)
# shows (with embedded_schema and network_schema)
shows_schema = T.StructType(
[
T.StructField("_embedded", embedded_schema),
T.StructField("language", T.StringType()),
T.StructField("name", T.StringType()),
T.StructField("network", network_schema),
T.StructField("officialSite", T.StringType()),
T.StructField("premiered", T.StringType()),
T.StructField(
"rating", T.StructType([T.StructField("average", T.DoubleType())])
),
T.StructField("runtime", T.LongType()),
T.StructField(
"schedule",
T.StructType(
[
T.StructField("days", T.ArrayType(T.StringType())),
T.StructField("time", T.StringType()),
]
),
),
T.StructField("status", T.StringType()),
T.StructField("summary", T.StringType()),
T.StructField("type", T.StringType()),
T.StructField("updated", T.LongType()),
T.StructField("url", T.StringType()),
T.StructField("webChannel", T.StringType()),
T.StructField("weight", T.LongType()),
]
)
Read the JSON file using the schema that we built up:
- mode="FAILFAST" is a param to throw an error if it reads a malformed record versus the schema provided.
- Date and timestamp parsing can be customized with dateFormat or timestampFormat.
The default for the mode parameter is PERMISSIVE, which sets malformed records to null.
shows_with_schema = spark.read.json("./data/Ch06/shows-silicon-valley.json",
schema=shows_schema,
mode="FAILFAST")
# Check format for modified columns:
for column in ["airdate", "airstamp"]:
shows_with_schema.select(f"_embedded.episodes.{column}") \
.select(F.explode(column)) \
.show(5, False)
Example of FAILFAST
error due to conflicting schema
from py4j.protocol import Py4JJavaError
shows_schema2 = T.StructType(
[
T.StructField("_embedded", embedded_schema),
T.StructField("language", T.StringType()),
T.StructField("name", T.StringType()),
T.StructField("network", network_schema),
T.StructField("officialSite", T.StringType()),
T.StructField("premiered", T.StringType()),
T.StructField(
"rating", T.StructType([T.StructField("average", T.DoubleType())])
),
T.StructField("runtime", T.LongType()),
T.StructField(
"schedule",
T.StructType(
[
T.StructField("days", T.ArrayType(T.StringType())),
T.StructField("time", T.StringType()),
]
),
),
T.StructField("status", T.StringType()),
T.StructField("summary", T.StringType()),
T.StructField("type", T.LongType()), # switch to LongType
T.StructField("updated", T.LongType()), # switch to LongType
T.StructField("url", T.LongType()), # switch to LongType
T.StructField("webChannel", T.StringType()),
T.StructField("weight", T.LongType()),
]
)
shows_with_schema_wrong = spark.read.json(
"data/Ch06/shows-silicon-valley.json", schema=shows_schema2, mode="FAILFAST",
)
try:
shows_with_schema_wrong.show()
except Py4JJavaError:
pass
# Huge Spark ERROR stacktrace, relevant bit:
#
# Caused by: java.lang.RuntimeException: Failed to parse a value for data type
# bigint (current token: VALUE_STRING).
StructType comes with two methods for exporting its content into a JSON-esque format.
- json() outputs a string containing the JSON-formatted schema.
- jsonValue() returns the schema as a dictionary.
from pprint import pprint
pprint(shows_with_schema.select('schedule').schema.jsonValue())
You can use jsonValue on a complex schema to see its JSON representation. This is helpful when trying to remember a complex schema:
- ArrayType: containsNull, elementType, type (always array).
pprint(T.StructField("array_example", T.ArrayType(T.StringType())).jsonValue())
- MapType: keyType, valueType, valueContainsNull, type (always map).
# Example 1
pprint(
T.StructField("map_example", T.MapType(T.StringType(), T.LongType())).jsonValue()
)
# With both
pprint(
T.StructType(
[
T.StructField("map_example", T.MapType(T.StringType(), T.LongType())),
T.StructField("array_example", T.ArrayType(T.StringType())),
]
).jsonValue()
)
Finally, we can close the loop by making sure that our JSON-schema is consistent with the one currently being used. For this, we’ll export the schema of shows_with_schema in a JSON string, load it as a JSON object and then use StructType.fromJson() method to re-create the schema.
other_shows_schema = T.StructType.fromJson(json.loads(shows_with_schema.schema.json()))
print(other_shows_schema == shows_with_schema.schema) # True
If we were to make the shows
data frame in a traditional relational database, we could have a shows
table linked to an episodes
table using a star schema.
shows table

| show_id | name |
|---|---|
| 143 | silicon valley |
episodes table, joined to shows by show_id

| show_id | episode_id | name |
|---|---|---|
| 143 | 1 | Minimal Viable Product |
| 143 | 2 | The Cap Table |
| 143 | 3 | Articles of Incorporation |
episodes could be extended with more columns, but starts to have duplicate entries

| show_id | episode_id | name | genre | day |
|---|---|---|---|---|
| 143 | 1 | Minimal Viable Product | Comedy | Sunday |
| 143 | 2 | The Cap Table | Comedy | Sunday |
| 143 | 3 | Articles of Incorporation | Comedy | Sunday |
In contrast, a hierarchical data frame contains complex columns with arrays and struct columns: the shows data frame uses a hierarchical model.
explode and collect operations to go from hierarchical to tabular and back
We will now revisit the exploding operation by generalizing it to the map, looking at the behavior when your data frame has multiple columns, and seeing the different options PySpark provides for exploding.
# Exploding _embeedded.episodes
episodes = shows.select("id", F.explode("_embedded.episodes").alias("episodes"))
episodes.printSchema()
episodes.show(5)
Exploding a map
- posexplode: explodes the column and also returns an additional column before the data that contains the array positions (LongType).
- explode / posexplode skip null values.
episode_name_id = shows.select(
F.map_from_arrays(
F.col("_embedded.episodes.id"), F.col("_embedded.episodes.name")
).alias("name_id")
)
episode_name_id = episode_name_id.select(
F.posexplode("name_id").alias("position", "id", "name")
)
episode_name_id.show(5, False)
collected = episodes.groupby("id").agg(F.collect_list("episodes").alias("episodes"))
print(collected.count())
collected.printSchema()
struct()
The struct() function takes columns as params and returns a struct column containing the passed columns as fields.
# Creating a struct column
struct_ex = shows.select(
F.struct(
F.col("status"), F.col("weight"), F.lit(True).alias("has_watched")
).alias("info")
)
struct_ex.show(1, False)
struct_ex.printSchema()
shows.printSchema()
This chapter is dedicated to using SQL with, and on top of, PySpark. I cover how we can move from one language to the other. I also cover how we can use a SQL-like syntax within data frame methods to speed up your code, and some of the trade-offs you can face. Finally, we blend Python and SQL code together to get the best of both worlds.
Key methods: where(), expr() and selectExpr(), which can simplify the syntax for complex filtering and selection.
We will be using a periodic table of elements database for the initial section, followed by a public data set provided by Backblaze, which provides hard drive data and statistics.
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
import numpy as np
spark = SparkSession.builder.getOrCreate()
# Read in table of elements data
elements = spark.read.csv(
"data/Ch07/Periodic_Table_Of_Elements.csv",
header=True,
inferSchema=True,
)
# Inspect the data frame
elements.printSchema()
# View the data frame in chunks of 3-4 columns
# column_split = np.array_split(np.array(elements.columns), len(elements.columns) // 3)
# for x in column_split:
# elements.select(*x).show(3, False)
The code below keeps records where the phase column equals "liq", then runs groupby and count.
SQL equivalent would be:
SELECT
period,
count(*)
FROM elements
WHERE phase = "liq"
GROUP BY period;
elements.where(F.col("phase") == "liq").groupby("period").count().show()
Use createOrReplaceTempView() to register a data frame as a Spark SQL reference. Functionally equivalent to CREATE OR REPLACE TEMP VIEW in SQL.
# Directly querying a data frame SQL-style does not work
try:
spark.sql(
"select period, count(*) from elements where phase='liq' group by period"
).show(5)
except AnalysisException as e:
print(e)
# Using createOrReplaceTempView
elements.createOrReplaceTempView("elements")
spark.sql(
"select period, count(*) from elements where phase='liq' group by period"
).show(5)
In SQL, they are distinct concepts: the table is materialized in memory and the view is computed on the fly. Spark’s temp views are conceptually closer to a view than a table. Spark SQL also has tables but we will not be using them, preferring reading and materializing our data into a data frame.
# Instantiate
spark.catalog
# List tables we've registered
display(spark.catalog.listTables())
# Drop a table
spark.catalog.dropTempView("elements")
display(spark.catalog.listTables())
(Note: Only reading in Q3 data due to local compute)
# Read backblaze data set into a data frame and register a SQL view
DATA_DIRECTORY = "./data/Ch07/"
# q1 = spark.read.csv(
# DATA_DIRECTORY + "drive_stats_2019_Q1", header=True, inferSchema=True
# )
# q2 = spark.read.csv(
# DATA_DIRECTORY + "data_Q2_2019", header=True, inferSchema=True
# )
q3 = spark.read.csv(
DATA_DIRECTORY + "data_Q3_2019", header=True, inferSchema=True
)
# q4 = spark.read.csv(
# DATA_DIRECTORY + "data_Q4_2019", header=True, inferSchema=True
# )
# Q4 has two more fields than the rest
# q4_fields_extra = set(q4.columns) - set(q1.columns)
# for i in q4_fields_extra:
# q1 = q1.withColumn(i, F.lit(None).cast(T.StringType()))
# q2 = q2.withColumn(i, F.lit(None).cast(T.StringType()))
# q3 = q3.withColumn(i, F.lit(None).cast(T.StringType()))
# Union the data frames
# if you are only using the minimal set of data, use this version
backblaze_2019 = q3
# if you are using the full set of data, use this version
# backblaze_2019 = (
# q1.select(q4.columns)
# .union(q2.select(q4.columns))
# .union(q3.select(q4.columns))
# .union(q4)
# )
# Setting the layout for each column according to the schema
backblaze_2019 = backblaze_2019.select(
[
F.col(x).cast(T.LongType()) if x.startswith("smart") else F.col(x)
for x in backblaze_2019.columns
]
)
# Register the view
backblaze_2019.createOrReplaceTempView("backblaze_stats_2019")
backblaze_2019.printSchema()
select
and where
¶Use select and where to show a few hard drives serial numbers that have failed at some point (failure = 1)
# SQL order of operations: 1) select columns, then 2) filter
spark.sql("select serial_number, model, capacity_bytes from backblaze_stats_2019 where failure = 1").show(5)
# PySpark order of operations: 1) filter, then 2) select columns
backblaze_2019.where("failure=1").select(
F.col('serial_number'),
F.col('model'),
F.col('capacity_bytes')
).show(5)
groupby
and orderby
¶Look at the capacity in gigabytes of the hard drives included in the data, by model.
# Groupby and order in SQL
spark.sql(
"""
SELECT
model,
min(capacity_bytes / pow(1024, 3)) min_GB,
max(capacity_bytes / pow(1024, 3)) max_GB
FROM backblaze_stats_2019
GROUP BY model
ORDER BY max_GB DESC
"""
).show(5)
# PySpark
backblaze_2019.groupby(F.col("model")).agg(
F.min(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("min_GB"),
F.max(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("max_GB"),
).orderBy(F.col("max_GB"), ascending=False).show(5)
having
¶having
in SQL is a condition block used after grouping is done.
Filter the groupby with only those that have different min_GB and max_GB numbers
spark.sql(
"""
SELECT
model,
min(capacity_bytes / pow(1024, 3)) min_GB,
max(capacity_bytes / pow(1024, 3)) max_GB
FROM backblaze_stats_2019
GROUP BY model
HAVING min_GB <> max_GB
ORDER BY max_GB DESC
"""
).show(5)
backblaze_2019.groupby(F.col("model")).agg(
F.min(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("min_GB"),
F.max(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("max_GB"),
).where(F.col("min_GB") != F.col("max_GB")).orderBy(
F.col("max_GB"), ascending=False
).show(5)
create (CREATE TABLE/VIEW)
Compute the number of days of operation a model has and the number of drive failures it has had
# SQL
spark.catalog.dropTempView('drive_days')
spark.catalog.dropTempView('failures')
spark.sql(
"""
CREATE TEMP VIEW drive_days AS
SELECT model, count(*) AS drive_days
FROM backblaze_stats_2019
GROUP BY model
""")
spark.sql(
"""
CREATE TEMP VIEW failures AS
SELECT model, count(*) AS failures
FROM backblaze_stats_2019
WHERE failure = 1
GROUP BY model
""")
# PySpark
drive_days = backblaze_2019.groupBy(F.col("model")).agg(
F.count(F.col("*")).alias("drive_days")
)
failures = (
backblaze_2019.where(F.col("failure") == 1)
.groupBy(F.col("model"))
.agg(F.count(F.col("*")).alias("failures"))
)
failures.show(5)
UNION and JOIN
- SQL's UNION removes duplicate records, while PySpark's union() doesn't; union() is equal to SQL UNION ALL.
- To get the SQL UNION equivalent with PySpark, run distinct() after union(). (See the sketch after the union code below.)
(Note: Not running the 2 cells below since I only loaded Q3 data)
columns_backblaze = ", ".join(q4.columns)
q1.createOrReplaceTempView("Q1")
q1.createOrReplaceTempView("Q2")
q1.createOrReplaceTempView("Q3")
q1.createOrReplaceTempView("Q4")
spark.sql(
"""
CREATE VIEW backblaze_2019 AS
SELECT {col} FROM Q1 UNION ALL
SELECT {col} FROM Q2 UNION ALL
SELECT {col} FROM Q3 UNION ALL
SELECT {col} FROM Q4
""".format(
col=columns_backblaze
)
)
backblaze_2019 = (
q1.select(q4.columns)
.union(q2.select(q4.columns))
.union(q3.select(q4.columns))
.union(q4)
)
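A minimal sketch of the SQL UNION equivalent mentioned above (q3 unioned with itself purely for illustration):
# union() behaves like UNION ALL; distinct() afterwards mimics SQL UNION
deduplicated = q3.union(q3).distinct()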
Joining drive_days
and failures
tables together
spark.sql(
"""
SELECT
drive_days.model,
drive_days,
failures
FROM drive_days
LEFT JOIN failures
ON
drive_days.model = failures.model
"""
).show(5)
drive_days.join(failures, on="model", how="left").show(5)
Take the drive_days and failures table definitions and bundle them into a single query using a CTE (common table expression).
spark.sql("""
WITH drive_days as (
SELECT
model,
count(*) AS drive_days
FROM backblaze_stats_2019
GROUP BY model),
failures as (
SELECT
model,
count(*) AS failures
FROM backblaze_stats_2019
WHERE failure = 1
GROUP BY model)
SELECT
drive_days.model,
failures / drive_days failure_rate
FROM drive_days
INNER JOIN failures
ON drive_days.model = failures.model
ORDER BY failure_rate DESC
""").show(5)
# CTE sort of similar to python functions
def failure_rate(drive_stats):
drive_days = drive_stats.groupby(F.col("model")).agg(
F.count(F.col("*")).alias("drive_days")
)
failures = (
drive_stats.where(F.col("failure") == 1)
.groupby(F.col("model"))
.agg(F.count(F.col("*")).alias("failures"))
)
answer = (
drive_days.join(failures, on="model", how="inner")
.withColumn("failure_rate", F.col("failures") / F.col("drive_days"))
.orderBy(F.col("failure_rate").desc())
)
return answer
failure_rate(backblaze_2019).show(5)
print("drive_days" in dir())
This section will build on the code we’ve written so far. We’re going to write a function that, for a given capacity, will return the top 3 most reliable drives according to our failure rate.
selectExpr() is just like select(), but will process SQL-style operations. Nice because it removes the F.col sort of syntax.
expr() wraps a SQL-style expression into a PySpark column. You can use it in lieu of F.col() when you want to modify a column.
# Data Ingestion using Python
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
DATA_DIRECTORY = "./data/Ch07/"
DATA_FILES = [
# "drive_stats_2019_Q1",
# "data_Q2_2019",
"data_Q3_2019",
# "data_Q4_2019",
]
data = [
spark.read.csv(DATA_DIRECTORY + file, header=True, inferSchema=True)
for file in DATA_FILES
]
common_columns = list(
reduce(lambda x, y: x.intersection(y), [set(df.columns) for df in data])
)
assert set(["model", "capacity_bytes", "date", "failure"]).issubset(
set(common_columns)
)
full_data = reduce(
lambda x, y: x.select(common_columns).union(y.select(common_columns)), data
)
# Processing data for the query function with selectExpr
full_data = full_data.selectExpr( # <===
"model", "capacity_bytes / pow(1024, 3) capacity_GB", "date", "failure"
)
drive_days = full_data.groupby("model", "capacity_GB").agg(
F.count("*").alias("drive_days")
)
failures = (
full_data.where("failure = 1")
.groupby("model", "capacity_GB")
.agg(F.count("*").alias("failures"))
)
summarized_data = (
drive_days.join(failures, on=["model", "capacity_GB"], how="left")
.fillna(0.0, ["failures"])
.selectExpr("model", "capacity_GB", "failures / drive_days failure_rate")
.cache()
)
# creating failures variable with expr
failures = (
full_data.where("failure = 1")
.groupby("model", "capacity_GB")
.agg(F.expr("count(*) failures")) # <===
)
# Turning failure_rate in to a function using a mix of PySpark and SQL syntax
def most_reliable_drive_for_capacity(data, capacity_GB=2048, precision=0.25, top_n=3):
"""Returns the top 3 drives for a given approximate capacity.
Given a capacity in GB and a precision as a decimal number, we keep the N
drives where:
- the capacity is between (capacity * 1/(1+precision)), capacity * (1+precision)
- the failure rate is the lowest
"""
capacity_min = capacity_GB / (1 + precision)
capacity_max = capacity_GB * (1 + precision)
answer = (
data.where(f"capacity_GB between {capacity_min} and {capacity_max}")
.orderBy("failure_rate", "capacity_GB", ascending=[True, False])
.limit(top_n)
)
return answer
RDD's pros: it can hold any Python object, including key-value pairs (i.e. Python dicts).
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
collection = [1, "two", 3.0, ("four", 4), {"five": 5}]
sc = spark.sparkContext
collection_rdd = sc.parallelize(collection)
print(collection_rdd)
# Map a simple function to each element to an RDD.
# This will raise an error because not all of the elements are integers
from py4j.protocol import Py4JJavaError
import re
def add_one(value):
return value + 1
collection_rdd = collection_rdd.map(add_one)
try:
print(collection_rdd.collect())
except Py4JJavaError as e:
pass
# Stack trace galore! The important bit, you'll get one of the following:
# TypeError: can only concatenate str (not "int") to str
# TypeError: unsupported operand type(s) for +: 'dict' and 'int'
# TypeError: can only concatenate tuple (not "int") to tuple
# Safer option with a try/except inside the function
def safer_add_one(value):
try:
return value + 1
except TypeError:
return value
# reset rdd
collection_rdd = sc.parallelize(collection)
print("Before: ", collection)
# run safe adding method
collection_rdd = collection_rdd.map(safer_add_one)
print("After : ", collection_rdd.collect())
filter
# Filtering RDD with lambda function to keep only ints and floats
collection_rdd = sc.parallelize(collection)
collection_rdd = collection_rdd.filter(lambda x: isinstance(x, (float, int)))
print(collection_rdd.collect())
# Alternative: creating a separate named function (this one keeps only strings)
collection_rdd = sc.parallelize(collection)
def is_string(elem):
return True if isinstance(elem, str) else False
collection_rdd = collection_rdd.filter(is_string)
print(collection_rdd.collect())
reduce
# Add a list of numbers through reduce
from operator import add
collection_rdd = sc.parallelize(range(10))
print(collection_rdd.reduce(add))
Only give reduce commutative and associative functions.
- subtract is not, because (a - b) - c != a - (b - c).
- add, multiply, min and max are both associative and commutative.
import pyspark.sql.functions as F
import pyspark.sql.types as T
fractions = [[x,y] for x in range(100) for y in range(1, 100)]
frac_df = spark.createDataFrame(fractions, ["numerator", "denominator"])
frac_df = frac_df.select(
F.array(F.col("numerator"), F.col("denominator")).alias("fraction"),
)
frac_df.show(5, False)
This section will create a function to reduce a fraction and one to transform a fraction into a floating-point number.
from fractions import Fraction
from typing import Tuple, Optional
Frac = Tuple[int, int]
def py_reduce_fraction(frac: Frac) -> Optional[Frac]:
"""Reduce a fraction represented as a 2-tuple of integers"""
num, denom = frac
if denom:
answer = Fraction(num, denom)
return answer.numerator, answer.denominator
return None
assert py_reduce_fraction((3,6)) == (1, 2)
assert py_reduce_fraction((1, 0)) is None
def py_fraction_to_float(frac: Frac) -> Optional[float]:
"""Transforms a fraction represented as a 2-tuple of integer into a float"""
num, denom = frac
if denom:
return num / denom
return None
assert py_fraction_to_float((2, 8)) == 0.25
assert py_fraction_to_float((10, 0)) is None
SparkFrac = T.ArrayType(T.LongType())
# Promote python func to udf, passing SparkFrac type alias
reduce_fraction = F.udf(py_reduce_fraction, SparkFrac)
# apply to existing dataframe
frac_df = frac_df.withColumn(
"reduced_fraction", reduce_fraction(F.col("fraction"))
)
frac_df.show(5, False)
Using udf() as a decorator
@F.udf(T.DoubleType())
def fraction_to_float(frac: Frac) -> Optional[float]:
num, denom = frac
if denom:
return num / denom
return None
frac_df = frac_df.withColumn(
"fraction_float", fraction_to_float(F.col("reduced_fraction"))
)
frac_df.select("reduced_fraction", "fraction_float").distinct().show(5, False)
assert fraction_to_float.func((1, 2)) == 0.5
This chapter will use pandas UDFs. The chapter assumes you are using PySpark 3.0 and above. The main flavors (see the sketch after this list for the last one):
- Series to Series: takes Column objects, converts them to pandas Series, and returns a Series object that gets promoted back to a Column object.
- Iterator of Series to Iterator of Series: the Column is batched, then fed as an Iterator object; takes a single Column, returns a single Column.
- Iterator of multiple Series to Iterator of Series: takes multiple Columns as input but preserves the iterator pattern.
We will use the National Oceanic and Atmospheric Administration (NOAA) Global Surface Summary of the Day (GSOD) dataset.
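A minimal sketch of the third flavor, Iterator of multiple Series to Iterator of Series (hypothetical columns a and b, divided batch by batch):
from typing import Iterator, Tuple
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.pandas_udf(T.DoubleType())
def ratio(batches: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Each batch is a tuple of pandas Series, one per input column
    for num, denom in batches:
        yield num / denom

# Usage, assuming a data frame df with numeric columns "a" and "b":
# df.select(ratio(F.col("a"), F.col("b")).alias("a_over_b"))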
After setting up a Google Cloud Platform account, initialize PySpark with the BigQuery connector enabled.
The code below doesn't work due to a lot of issues with PyArrow compatibility with Java 11. I've skipped this part and just downloaded the dataset from the author's GitHub.
Reference:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
conf.set(
"spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true"
)
conf.set("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
conf.set(
"spark.jars.packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.19.1",
)
# spark = (
# SparkSession.builder
# .config(
# "spark.jars.packages",
# "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.19.1",
# )
# .config(
# "spark.driver.extraJavaOptions",
# "-Dio.netty.tryReflectionSetAccessible=true"
# )
# .config(
# "spark.executor.extraJavaOptions",
# "-Dio.netty.tryReflectionSetAccessible=true"
# )
# .getOrCreate()
# )
spark = SparkSession.builder.config(conf=conf).getOrCreate()
After initializing, read the stations
and gsod
tables for 2010 to 2020
from functools import reduce
import pyspark.sql.functions as F
def read_df_from_bq(year):
return (
spark.read.format("bigquery").option(
"table", f"bigquery-public-data.noaa_gsod.gsod{year}"
)
.option("credentialsFile", "/Users/taichinakatani/dotfiles/keys/bq-key.json")
.option("parentProject", "still-vim-244001")
.load()
)
# Because gsod2020 has an additional date column that the previous years do not have,
# unionByName will fill the values with null
gsod = (
reduce(
lambda x, y: x.unionByName(y, allowMissingColumns=True),
[read_df_from_bq(year) for year in range(2020, 2021)],
)
.dropna(subset=["year", "mo", "da", "temp"])
.where(F.col("temp") != 9999.9)
.drop("date")
)
gsod.select(F.col('year')).show(5)
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Read from local parquet instead
gsod = spark.read.load("data/gsod_noaa/gsod2018.parquet")
pandas_udf helps with this. (Using the pandas_udf decorator is killing the kernel for some reason, so it is commented out below.)
import pandas as pd
import pyspark.sql.types as T
import pyspark.sql.functions as F
# note the syntax "pandas_udf" and how it returns a pd.Series
# @F.pandas_udf(T.DoubleType())
def f_to_c(degrees: pd.Series) -> pd.Series:
"""Transforms Farhenheit to Celcius."""
return (degrees - 32) * 5 / 9
gsod = gsod.withColumn("temp_c", f_to_c(F.col("temp")))
gsod.select("temp", "temp_c").distinct().show(5)
Going from (pd.Series) → pd.Series to (Iterator[pd.Series]) → Iterator[pd.Series]: use yield rather than return so the function returns an iterator.
from time import sleep
from typing import Iterator
@F.pandas_udf(T.DoubleType())
def f_to_c2(degrees: Iterator[pd.Series]) -> Iterator[pd.Series]:
"""Transforms Farhenheit to Celcius."""
sleep(5)
for batch in degrees:
yield (batch - 32) * 5 / 9
gsod.select(
"temp", f_to_c2(F.col("temp")).alias("temp_c")
).distinct().show(5)
# +-----+-------------------+
# | temp| temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows
# Setup
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
conf = SparkConf()
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Read from local parquet
gsod = spark.read.parquet("data/gsod_noaa/gsod*.parquet")
# Using vanilla groupBy, we can get the lowest temperature but not when.
coldest_temp = gsod.groupby("year").agg(F.min("temp").alias("temp"))
coldest_temp.orderBy("temp").show()
# Using left-semi self-join to get the "when"
# Self joins are generally an anti-pattern because it is SLOW.
coldest_when = gsod.join(coldest_temp, how="left_semi", on=["year", "temp"]) \
.select("stn", "year", "mo", "da", "temp")
coldest_when.orderBy("year", "mo", "da").show()
# Using a window function instead
from pyspark.sql.window import Window
# To partition according to the values of one or more columns,
# we pass the column name (or a Column object) to the partitionBy() method.
each_year = Window.partitionBy("year")
# Window is a builder class, just like SparkSession.builder
print(each_year)
each_year runs the aggregate function F.min("temp") over each year, rather than over the entire data frame. F.min("temp") applies the minimum temperature for that year to all rows. This is then filtered to the rows whose temp matches the aggregate min_temp.
gsod.withColumn("min_temp", F.min("temp").over(each_year)).where(
"temp = min_temp"
).select("year", "mo", "da", "stn", "temp").orderBy(
"year", "mo", "da"
).show()
Bonus:
- partitionBy() can be used on more than one column.
- A window function can also be used inside a select:
# Using window function inside a select
gsod.select(
"year",
"mo",
"da",
"stn",
"temp",
F.min("temp").over(each_year).alias("min_temp"),
).where("temp = min_temp").drop("min_temp").orderBy(
"year", "mo", "da"
).show()
rank()
, dense_rank()
, percent_rank()
, ntile()
and row_number()
# Load lightweight dataset
gsod_light = spark.read.parquet("data/Window/gsod_light.parquet")
# Inspect
gsod_light.printSchema()
gsod_light.show()
rank & dense_rank
- rank gives Olympic ranking (non-consecutive: when you have multiple records that tie for a rank, the next one will be offset by the number of ties).
- dense_rank ranks consecutively. Ties share the same rank, but there won't be any gap between the ranks. Useful when you just want a cardinal position over a window.
# Inspect
gsod_light.printSchema()
gsod_light.show()
# Create new window, partitioning by year and ordering by number of temperature readings
temp_per_year_asc = Window.partitionBy("year").orderBy("count_temp")
temp_per_month_asc = Window.partitionBy("mo").orderBy("count_temp")
# Using rank() with window, we get the rank according to the value of the count_temp column
print("Using rank()")
gsod_light.withColumn("rank_tpm", F.rank().over(temp_per_month_asc)).show()
# Using dense_rank() instead to get consecutive ranking by month
print("Using dense_rank()")
gsod_light.withColumn("rank_tpm", F.dense_rank().over(temp_per_month_asc)).show()
percent_rank
For every window, percent_rank() computes the percentage rank (0 to 1) based on the ordered value.
formula: percent_rank = (# records with a lower value than the current) / (# records in the window - 1)
temp_each_year = each_year.orderBy("temp")
gsod_light.withColumn("rank_tpm", F.percent_rank().over(temp_each_year)).show()
ntile()
Gives the n-tile for a given parameter.
gsod_light.withColumn("rank_tpm", F.ntile(2).over(temp_each_year)).show()
row_number()
Given an ordered window, it will give an increasing rank regardless of ties.
gsod_light.withColumn("row_number", F.row_number().over(temp_each_year)).show()
# Creating a window with a descending ordered column
temp_per_month_desc = Window.partitionBy("mo").orderBy(F.col("count_temp").desc())
gsod_light.withColumn("row_number", F.row_number().over(temp_per_month_desc)).show()
lag and lead
The two most important functions of the analytics functions family are called
lag(col, n=1, default=None)
andlead(col, n=1, default=None)
, which will give you the value of the col column of the n-th record before and after the record you’re over, respectively.
# Get temp of previous two records using lag()
print("Temp of previous two records over each year")
gsod_light.withColumn(
"previous_temp", F.lag("temp").over(temp_each_year)
).withColumn(
"previous_temp_2", F.lag("temp", 2).over(temp_each_year)
).show()
print("Temp delta of previous record over each year")
gsod_light.withColumn(
"previous_temp_delta", F.round(F.col("temp") - F.lag("temp").over(temp_each_year), 2)
).select(["year", "mo", "temp", "previous_temp_delta"]).show()
cume_dist()
Computes the cumulative distribution F(x) for the records in the data frame.
print("Percent rank vs. Cumulative distribution of temperature over each year")
gsod_light.withColumn(
"percen_rank" , F.percent_rank().over(temp_each_year)
).withColumn("cume_dist", F.cume_dist().over(temp_each_year)).show()
...more to come