Here’s a detailed list of PySpark DataFrame operations, categorized for clarity:
1. Creating DataFrames
- From existing RDD
df = spark.createDataFrame(rdd, schema)
- From a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
- From a JSON file
df = spark.read.json("path/to/file.json")
- From a Parquet file
df = spark.read.parquet("path/to/file.parquet")
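- Quick way to try the snippets below: build a small DataFrame from in-memory data (an illustrative sketch; the column names are made up, and spark is assumed to be an active SparkSession)
from pyspark.sql import SparkSession
# Create (or reuse) a session; the app name is arbitrary
spark = SparkSession.builder.appName("dataframe-demo").getOrCreate()
# A list of tuples plus a list of column names is enough for a quick DataFrame
data = [("Alice", 34), ("Bob", 45), ("Cara", 29)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()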
2. Basic Operations
- Show data
df.show()
df.show(n=20, truncate=True, vertical=False)
- Print schema
df.printSchema()
- Get data types
df.dtypes
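- A few more quick inspection helpers that pair well with these (assuming the sample df sketched in section 1)
df.columns  # list of column names
df.count()  # number of rows (triggers a job)
df.describe().show()  # basic summary statistics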
3. Data Selection and Filtering
- Select columns
df.select("column1", "column2").show()
- Filter rows
df.filter(df["column"] > value).show()
- Distinct values
df.select("column").distinct().show()
4. Column Operations
- Add new column
df.withColumn("new_column", df["existing_column"] + 1).show()
- Rename column
df.withColumnRenamed("old_name", "new_name").show()
- Drop column
df.drop("column").show()
5. Aggregations and Grouping
- Group by and aggregate
df.groupBy("column").count().show()
df.groupBy("column").agg({"another_column": "sum"}).show()
- Aggregate without grouping
df.agg({"column": "max"}).show()
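- The dictionary form of agg yields generated names like sum(column); named functions with alias read better (a sketch with hypothetical region/sales/customer_id columns)
from pyspark.sql import functions as F
df.groupBy("region").agg(
    F.sum("sales").alias("total_sales"),
    F.avg("sales").alias("avg_sales"),
    F.countDistinct("customer_id").alias("customers"),
).show()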
6. Joins
- Inner join
df1.join(df2, df1["key"] == df2["key"], "inner").show()
- Left join
df1.join(df2, df1["key"] == df2["key"], "left").show()
- Right join
df1.join(df2, df1["key"] == df2["key"], "right").show()
- Full outer join
df1.join(df2, df1["key"] == df2["key"], "outer").show()
7. Sorting and Ordering
- Sort by column
df.sort("column").show()
df.sort(df["column"].desc()).show()
- Order by column
df.orderBy("column").show()
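- sort and orderBy are aliases; the asc/desc helpers keep mixed sort directions readable (a sketch)
from pyspark.sql import functions as F
df.orderBy(F.asc("name"), F.desc("age")).show()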
8. Window Functions
- Define a window
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("column").orderBy("another_column")
- Apply window function
from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(windowSpec)).show()
9. Handling Missing Data
- Drop missing data
df.dropna().show()
df.dropna(subset=["column1", "column2"]).show()
- Fill missing data
df.fillna({"column1": 0, "column2": "unknown"}).show()
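- dropna also accepts how and thresh to tune how aggressively rows are dropped (a sketch)
df.dropna(how="all").show()  # drop rows where every column is null
df.dropna(thresh=2).show()  # keep rows with at least 2 non-null values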
10. DataFrame Set Operations
- Union
df1.union(df2).show()
- Intersect
df1.intersect(df2).show()
- Subtract
df1.subtract(df2).show()
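- union matches columns by position, which silently misaligns data if the column order differs between the two DataFrames; unionByName matches by name (a sketch; allowMissingColumns needs Spark 3.1+)
df1.unionByName(df2).show()
df1.unionByName(df2, allowMissingColumns=True).show()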
11. Advanced Functions
- UDF (user-defined functions)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Plain Python function; give the wrapped UDF a distinct name to avoid shadowing
def add_prefix(column_value):
    # Nulls arrive as None, so guard before concatenating
    if column_value is None:
        return None
    return "prefix_" + column_value
add_prefix_udf = udf(add_prefix, StringType())
df.withColumn("new_column", add_prefix_udf(df["existing_column"])).show()
- Pivoting
df.groupBy("column").pivot("pivot_column").sum("value_column").show()
12. Saving DataFrames
- Save as CSV
df.write.csv("path/to/save.csv", header=True)
- Save as JSON
df.write.json("path/to/save.json")
- Save as Parquet
df.write.parquet("path/to/save.parquet")
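- Each write produces a directory of part files rather than a single file, and errors if the path already exists unless a save mode is set (a sketch)
(df.write
   .mode("overwrite")  # or "append", "ignore", "error"
   .partitionBy("column")
   .parquet("path/to/save.parquet"))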
This list covers a broad range of PySpark DataFrame operations that are commonly used in data processing and analysis. If you need more detailed examples or have specific use cases in mind, feel free to ask!