1. Word Count with FlatMap, Map, and ReduceByKey
Objective: Count the frequency of each word in a large text file.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Load the data
text_file = sc.textFile("path/to/your/textfile.txt")
# Word count
word_counts = (text_file
               .flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
# Collect and print results
print(word_counts.collect())
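Note that collect() ships every (word, count) pair back to the driver, which can be impractical for a genuinely large file. A minimal variant (assuming the same word_counts RDD) that fetches only the ten most frequent words:
# Only the 10 most frequent words are returned to the driver
top_words = word_counts.takeOrdered(10, key=lambda wc: -wc[1])
print(top_words)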
2. Filter and Aggregation
Objective: Filter out even numbers and compute the sum and average of the remaining numbers.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Load the data
numbers = sc.parallelize(range(1, 101))
# Filter and aggregate
odd_numbers = numbers.filter(lambda x: x % 2 != 0)
sum_of_odds = odd_numbers.reduce(lambda a, b: a + b)
count_of_odds = odd_numbers.count()
average_of_odds = sum_of_odds / count_of_odds
print(f"Sum: {sum_of_odds}, Average: {average_of_odds}")
3. Join Operations
Objective: Perform an inner join on two RDDs.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create two RDDs
rdd1 = sc.parallelize([(1, "Alice"), (2, "Bob"), (3, "Charlie")])
rdd2 = sc.parallelize([(1, 100), (2, 200), (4, 400)])
# Perform inner join
joined_rdd = rdd1.join(rdd2)
# Collect and print results
print(joined_rdd.collect())
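join() is an inner join, so key 3 (present only in rdd1) and key 4 (present only in rdd2) are dropped from the result. If the unmatched keys matter, the outer-join variants keep them and fill the missing side with None; a quick sketch using the same RDDs:
# Keep every key from rdd1; missing values from rdd2 become None
print(rdd1.leftOuterJoin(rdd2).collect())
# Keep keys from either RDD
print(rdd1.fullOuterJoin(rdd2).collect())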
4. GroupByKey and MapValues
Objective: Group transactions by customer and calculate the total amount spent by each customer.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create an RDD
transactions = sc.parallelize([
(1, 100), (2, 150), (1, 200), (3, 300), (2, 100), (1, 50)
])
# Group by key and sum the values
customer_spend = transactions.groupByKey().mapValues(lambda amounts: sum(amounts))
# Collect and print results
print(customer_spend.collect())
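groupByKey() shuffles every individual transaction across the network before the amounts are summed. For a plain aggregation like this, reduceByKey() is usually the better choice because it partially sums values on each partition before the shuffle; an equivalent sketch:
# Same per-customer totals, but values are combined map-side before the shuffle
customer_spend = transactions.reduceByKey(lambda a, b: a + b)
print(customer_spend.collect())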
5. Partitioning and Repartitioning
Objective: Demonstrate partitioning and repartitioning of an RDD.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create an RDD
rdd = sc.parallelize(range(1, 101), numSlices=10)
# Print number of partitions
print(f"Initial number of partitions: {rdd.getNumPartitions()}")
# Repartition the RDD
repartitioned_rdd = rdd.repartition(5)
# Print number of partitions after repartitioning
print(f"Number of partitions after repartitioning: {repartitioned_rdd.getNumPartitions()}")
6. Sorting and Collecting Top N
Objective: Sort an RDD and collect the top N elements.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create an RDD
rdd = sc.parallelize([(1, 200), (2, 100), (3, 300), (4, 400), (5, 150)])
# Sort by value and collect top 3
top_3 = rdd.takeOrdered(3, key=lambda x: -x[1])
# Print top 3 elements
print(top_3)
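takeOrdered() returns only the N smallest elements under the given key (here the three largest values, because the key is negated) without sorting the whole dataset on the cluster. If a fully sorted RDD is needed, for example to write it back out, sortBy() keeps the data distributed; a sketch:
# Full descending sort by value; the data stays distributed
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
print(sorted_rdd.take(3))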
7. Broadcast Variables and Accumulators
Objective: Use broadcast variables and accumulators in a distributed computation.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create an RDD
rdd = sc.parallelize(range(1, 101))
# Broadcast variable
broadcast_var = sc.broadcast(10)
# Accumulator
accum = sc.accumulator(0)
# Use broadcast variable and accumulator
def process(x):
    accum.add(x * broadcast_var.value)
    return x
processed = rdd.map(process).collect()  # collect() triggers the map, which updates the accumulator
# Print accumulator value
print(f"Accumulated value: {accum.value}")
8. Cartesian Product and Action
Objective: Perform a Cartesian product of two RDDs and apply an action.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create two RDDs
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(["a", "b", "c"])
# Cartesian product
cartesian_rdd = rdd1.cartesian(rdd2)
# Collect and print results
print(cartesian_rdd.collect())
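cartesian() produces one pair for every combination, so the result has len(rdd1) × len(rdd2) elements (3 × 3 = 9 here) and grows very quickly on larger inputs. It is often safer to count or sample the result rather than collect it; a sketch:
# Count the pairs without pulling them all to the driver
print(cartesian_rdd.count())
# Peek at a few pairs
print(cartesian_rdd.take(3))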
9. Caching and Persistence
Objective: Demonstrate the use of caching and persistence in Spark.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create an RDD
rdd = sc.parallelize(range(1, 10001))
# Cache the RDD
rdd.cache()
# Perform actions
sum_value = rdd.sum()
count_value = rdd.count()
# Print results
print(f"Sum: {sum_value}, Count: {count_value}")
10. Complex Transformation and Action
Objective: Apply a series of transformations and actions on an RDD.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create an RDD
rdd = sc.parallelize(range(1, 101))
# Apply transformations
transformed_rdd = (rdd
                   .filter(lambda x: x % 2 == 0)
                   .map(lambda x: (x, x * 2))
                   # emit each (x, 2x) pair plus a companion pair (x, 2x + 1)
                   .flatMap(lambda pair: [pair, (pair[0], pair[1] + 1)])
                   .distinct())
# Apply actions
result = transformed_rdd.collect()
# Print results
print(result)
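To see what this chain actually builds before the action runs, the RDD's lineage can be inspected with toDebugString(), which in PySpark returns bytes; a sketch:
# Print the recorded lineage of transformations
print(transformed_rdd.toDebugString().decode("utf-8"))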
These programs cover a range of Spark transformations and actions, providing practical examples of Spark’s data processing capabilities. Each one is deliberately small, yet demonstrates a specific piece of Spark functionality, from word counts and joins to partitioning, shared variables, and caching.