Here are five practice programs covering essential Spark DataFrame operations, along with their solutions:
Practice Program 1: Reading and Writing DataFrames
Problem Statement: Read a CSV file into a Spark DataFrame, apply a filter and a column selection, and write the transformed DataFrame to a new CSV file.
Solution:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("ReadWriteDataFrame").getOrCreate()
# Read CSV file into DataFrame
df = spark.read.csv("input.csv", header=True, inferSchema=True)
# Perform transformations
df_filtered = df.filter(df['age'] > 30)
df_selected = df_filtered.select('name', 'age')
# Write transformed DataFrame to a new CSV file
df_selected.write.csv("output.csv", header=True)
# Stop Spark session
spark.stop()
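One thing to note: write.csv produces a directory of part files and fails if the output path already exists. As a small variation (run in the same session, before spark.stop(), and reusing df_selected from above), you could overwrite previous output and collapse the result into a single part file:

# Overwrite any existing output and write a single part file
df_selected.coalesce(1).write.mode("overwrite").csv("output.csv", header=True)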
Practice Program 2: DataFrame Aggregations
Problem Statement: Calculate the average salary for each department from a given DataFrame.
Solution:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameAggregation").getOrCreate()
# Sample data
data = [("John", "HR", 3000), ("Doe", "HR", 4000),
("Jane", "IT", 5000), ("Mary", "IT", 6000)]
# Create DataFrame
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Group by department and calculate average salary
avg_salary_df = df.groupBy("department").avg("salary")
# Show the result
avg_salary_df.show()
# Stop Spark session
spark.stop()
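If you need more than one aggregate per group, or friendlier column names than the default avg(salary), the agg function with explicit aggregate expressions is the usual approach. A minimal sketch, reusing the same df from above (before spark.stop()):

from pyspark.sql.functions import avg, max as spark_max

# Compute several aggregates per department with readable column names
stats_df = df.groupBy("department").agg(
    avg("salary").alias("avg_salary"),
    spark_max("salary").alias("max_salary")
)
stats_df.show()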
Practice Program 3: Joining DataFrames
Problem Statement: Join two DataFrames on a common column and select specific columns from the resulting DataFrame.
Solution:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("JoinDataFrames").getOrCreate()
# Sample data
data1 = [("John", 1), ("Doe", 2), ("Jane", 3)]
data2 = [(1, "HR"), (2, "IT"), (3, "Finance")]
# Create DataFrames
columns1 = ["name", "dept_id"]
columns2 = ["dept_id", "department"]
df1 = spark.createDataFrame(data1, columns1)
df2 = spark.createDataFrame(data2, columns2)
# Join DataFrames on dept_id
joined_df = df1.join(df2, "dept_id")
# Select specific columns
result_df = joined_df.select("name", "department")
# Show the result
result_df.show()
# Stop Spark session
spark.stop()
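By default, join performs an inner join, so any row of df1 whose dept_id has no match in df2 is dropped. A quick sketch of a left outer join that keeps every employee, again reusing df1 and df2 from above before the session is stopped:

# Keep all rows from df1, filling unmatched departments with null
left_df = df1.join(df2, on="dept_id", how="left").select("name", "department")
left_df.show()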
Practice Program 4: Handling Missing Data
Problem Statement: Handle missing data in a DataFrame by filling missing values with a default value.
Solution:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("HandleMissingData").getOrCreate()
# Sample data
data = [("John", None), ("Doe", 25), ("Jane", None), ("Mary", 30)]
# Create DataFrame
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# Fill missing values with a default value
df_filled = df.fillna({'age': 0})
# Show the result
df_filled.show()
# Stop Spark session
spark.stop()
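Filling with a default is one option; sometimes dropping incomplete rows is more appropriate. A short sketch using dropna on the same df, run before spark.stop():

# Drop rows where age is null
df_dropped = df.dropna(subset=["age"])
df_dropped.show()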
Practice Program 5: DataFrame UDFs (User-Defined Functions)
Problem Statement: Create a custom function to transform a column in a DataFrame using a UDF.
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameUDF").getOrCreate()
# Sample data
data = [("John", "HR"), ("Doe", "IT"), ("Jane", "Finance")]
# Create DataFrame
columns = ["name", "department"]
df = spark.createDataFrame(data, columns)
# Define UDF to convert department to uppercase
def convert_uppercase(department):
    return department.upper()
# Register UDF
convert_uppercase_udf = udf(convert_uppercase, StringType())
# Apply UDF to DataFrame
df_transformed = df.withColumn("department_upper", convert_uppercase_udf(df["department"]))
# Show the result
df_transformed.show()
# Stop Spark session
spark.stop()
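Python UDFs carry a cost: each row is serialized and shipped to a Python worker. For a simple transformation like uppercasing, the built-in upper function from pyspark.sql.functions gives the same result while staying inside the JVM. A sketch, assuming the same df is still available in the session before it is stopped:

from pyspark.sql.functions import upper

# Same transformation without a Python UDF
df_builtin = df.withColumn("department_upper", upper(df["department"]))
df_builtin.show()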
These practice programs cover reading/writing data, aggregations, joins, handling missing data, and using UDFs in Spark DataFrame operations. They provide a solid foundation for working with Spark DataFrames in various scenarios.