If you're using a Jupyter notebook, here is the code I used to initialize things:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity='all'
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.master("local[*]").getOrCreate()
Required format of JSON objects in files
There are 2 types of json files:
- Single line JSON
- Multiline JSON
Single line json
Every line must contain one valid JSON object. That's it. This format is also known as JSON Lines (JSONL).
Don't wrap the JSON objects in an array.
Correct single line json
- One valid json object per line
- Json objects are not wrapped in an array
- Every JSON object can end with a trailing comma; Spark's reader won't care.
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
Wrong single line json
The objects are wrapped in an array using square brackets. Spark treats each line as a separate JSON document, so the lines containing only the brackets can't be parsed and end up as corrupt records.
Example input:
[
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1},
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2},
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1},
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
]
The loaded dataframe looks like -
+---------------+------+----------+----+-----+----+
|_corrupt_record|amount|customerid| day|month|year|
+---------------+------+----------+----+-----+----+
| [| NULL| NULL|NULL| NULL|NULL|
| NULL| 1000| 1| 1| 1|2022|
| NULL| 1000| 1| 2| 1|2022|
| NULL| 2000| 2| 1| 1|2022|
| NULL| 2000| 2| 2| 1|2022|
| ]| NULL| NULL|NULL| NULL|NULL|
+---------------+------+----------+----+-----+----+
Multiline json
Wrap the JSON objects in an array: [ ...json objects... ]. If you don't, Spark will only read the first JSON object and skip the rest.
Correct multiline json
- Objects are wrapped in an array using square brackets
[
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
},
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 2,
"amount": 2000
}
]
Wrong multiline json
The objects are not wrapped in an array, so Spark only reads the first object and skips the rest.
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
}
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
}
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 2,
"amount": 2000
}
The loaded dataframe looks like -
+------+---+-----+----+
|amount|day|month|year|
+------+---+-----+----+
| 1000| 1| 1|2022|
+------+---+-----+----+
Read JSON
Read single line json
Sample input file:
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
Code:
df = (
spark.read
.format("json")
    .load("json_single.json")
)
Read multiline json
Sample input file:
[
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
},
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 2,
"amount": 2000
}
]
Code:
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_multi.json")
)
You can find all the read options in the Json data source options documentation.
Set schema
When a JSON object is read, Spark parses it and automatically infers the schema.
You can see this using df.printSchema()
Set schema of simple json
Sample input:
[
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
},
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 2,
"amount": 2000
}
]
Code:
schema = (
StructType()
.add("customerid", IntegerType(), True)
.add("amount", IntegerType(), True)
.add("year", IntegerType(), True)
.add("month", IntegerType(), True)
.add("day", IntegerType(), True)
)
df = (
spark.read
.format("json")
.schema(schema)
.option("multiline","true")
.load("json_multi.json")
)
df.show()
df.printSchema()
Output
+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
| 1| 1000|2022| 1| 1|
| 1| 1000|2022| 1| 2|
| 2| 2000|2022| 1| 1|
| 2| 2000|2022| 1| 2|
+----------+------+----+-----+---+
root
|-- customerid: integer (nullable = true)
|-- amount: integer (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
Set schema of complex json
Nested objects
Suppose that instead of directly giving you year, month and day columns, the source wraps them inside a date object. Let's set a schema for this JSON.
Sample input:
[
{
"customerid": 1,
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
}
},
{
"customerid": 1,
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
}
},
{
"customerid": 2,
"amount": 2000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
},
{
"customerid": 2,
"amount": 2000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
"amount": 2000
}
}
]
Basic read without schema
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_nested_object.json")
)
# print dataframe
df.show()
# print schema that spark inferred
df.printSchema()
Output:
+------+----------+------------------+
|amount|customerid| date|
+------+----------+------------------+
| 1000| 1|{1000, 1, 1, 2022}|
| 1000| 1|{1000, 2, 1, 2022}|
| 2000| 2|{2000, 1, 1, 2022}|
| 2000| 2|{2000, 2, 1, 2022}|
+------+----------+------------------+
root
|-- amount: long (nullable = true)
|-- customerid: long (nullable = true)
|-- date: struct (nullable = true)
| |-- amount: long (nullable = true)
| |-- day: long (nullable = true)
| |-- month: long (nullable = true)
| |-- year: long (nullable = true)
Set schema while reading
Set the schema (the datatypes of all fields in the objects).
Spark inferred all the numeric fields as long. Suppose we want them all to be integers.
schema = (
StructType()
.add("customerid", IntegerType(), True)
.add("amount", IntegerType(), True)
.add("date", (
StructType()
.add("year", IntegerType(), True)
.add("month", IntegerType(), True)
.add("day", IntegerType(), True)
)
, True)
)
df = (
spark.read
.format("json")
.schema(schema)
.option("multiline","true")
.load("json_nested_object.json")
)
df.show()
df.printSchema()
Output:
As you can see, all the fields are now integers, just as we defined in the schema.
+----------+------+------------+
|customerid|amount| date|
+----------+------+------------+
| 1| 1000|{2022, 1, 1}|
| 1| 1000|{2022, 1, 2}|
| 2| 2000|{2022, 1, 1}|
| 2| 2000|{2022, 1, 2}|
+----------+------+------------+
root
|-- customerid: integer (nullable = true)
|-- amount: integer (nullable = true)
|-- date: struct (nullable = true)
| |-- year: integer (nullable = true)
| |-- month: integer (nullable = true)
| |-- day: integer (nullable = true)
Bonus: Flatten the dataframe
Suppose that instead of one date column with a struct inside it, you want separate columns for year, month and day. Just select the nested fields from the column.
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_nested_object.json")
)
df_flat = df.select(
    col("customerid"),
col("amount"),
col("date.year").alias("year"),
col("date.month").alias("month"),
col("date.day").alias("day"),
)
df:
+----------+------+------------+
|customerid|amount| date|
+----------+------+------------+
| 1| 1000|{2022, 1, 1}|
| 1| 1000|{2022, 1, 2}|
| 2| 2000|{2022, 1, 1}|
| 2| 2000|{2022, 1, 2}|
+----------+------+------------+
df_flat:
+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
| 1| 1000|2022| 1| 1|
| 1| 1000|2022| 1| 2|
| 2| 2000|2022| 1| 1|
| 2| 2000|2022| 1| 2|
+----------+------+----+-----+---+
Objects nested within array
Sample input:
[
{
"customerid": 1,
"orders": [
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
}
},
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
}
}
]
},
{
"customerid": 2,
"orders": [
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
},
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
"amount": 2000
}
}
]
}
]
Basic read without schema
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_nested_array.json")
)
df.show()
df.printSchema()
Output
+----------+--------------------+
|customerid| orders|
+----------+--------------------+
| 1|[{1000, {1000, 1,...|
| 2|[{1000, {2000, 1,...|
+----------+--------------------+
root
|-- customerid: long (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- amount: long (nullable = true)
| | |-- date: struct (nullable = true)
| | | |-- amount: long (nullable = true)
| | | |-- day: long (nullable = true)
| | | |-- month: long (nullable = true)
| | | |-- year: long (nullable = true)
Set schema while reading
Set all fields to be of type Integer
schema = (
StructType()
.add("customerid", IntegerType(), True)
.add(
"orders",
ArrayType(
StructType()
.add("amount", IntegerType(), True)
.add(
"date",
(
StructType()
.add("year", IntegerType(), True)
.add("month", IntegerType(), True)
.add("day", IntegerType(), True)
),
True,
)
),
True,
)
)
df = (
spark.read.format("json")
.schema(schema)
.option("multiline", "true")
.load("json_nested_array.json")
)
df.show()
df.printSchema()
Output:
+----------+--------------------+
|customerid| orders|
+----------+--------------------+
| 1|[{1000, {2022, 1,...|
| 2|[{1000, {2022, 1,...|
+----------+--------------------+
root
|-- customerid: integer (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- amount: integer (nullable = true)
| | |-- date: struct (nullable = true)
| | | |-- year: integer (nullable = true)
| | | |-- month: integer (nullable = true)
| | | |-- day: integer (nullable = true)
Bonus: Flatten the dataframe
We'll use the same sample input as in the previous section (json_nested_array.json).
from pyspark.sql import functions as F
schema = (
StructType()
.add("customerid", IntegerType(), True)
.add(
"orders",
ArrayType(
StructType()
.add("amount", IntegerType(), True)
.add(
"date",
(
StructType()
.add("year", IntegerType(), True)
.add("month", IntegerType(), True)
.add("day", IntegerType(), True)
),
True,
)
),
True,
)
)
# read dataframe
df = (
spark.read.format("json")
.schema(schema)
.option("multiline", "true")
.load("json_nested_array.json")
)
df.show()
# explode the dataframe on orders, to create one row for every array object
df_exploded = df.withColumn("orders", F.explode(col("orders")))
df_exploded.show()
# flatten the dataframe
df_flat = (
df.withColumn("orders", F.explode(col("orders")))
.select(
        col("customerid"),
col("orders.amount"),
col("orders.date.year"),
col("orders.date.month"),
col("orders.date.day")
)
)
df_flat.show()
Outputs:
df
+----------+--------------------+
|customerid| orders|
+----------+--------------------+
| 1|[{1000, {2022, 1,...|
| 2|[{1000, {2022, 1,...|
+----------+--------------------+
df_exploded
+----------+--------------------+
|customerid| orders|
+----------+--------------------+
| 1|{1000, {2022, 1, 1}}|
| 1|{1000, {2022, 1, 2}}|
| 2|{1000, {2022, 1, 1}}|
| 2|{1000, {2022, 1, 2}}|
+----------+--------------------+
df_flat
+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
| 1| 1000|2022| 1| 1|
| 1| 1000|2022| 1| 2|
| 2| 1000|2022| 1| 1|
| 2| 1000|2022| 1| 2|
+----------+------+----+-----+---+
Write JSON
The barebones write method template is:
(
orders_df
.write
.format("json")
.mode("overwrite")
.save("data/orders")
)
Partition written data on specific columns
Partitioning the written data lets readers skip partitions a query doesn't need (partition pruning) and increases read parallelism.
Eg - Partition by year, month, day:
(
orders_df
.write
.format("json")
.mode("overwrite")
.partitionBy("year", "month", "day")
.save("data/sales")
)