JSON - read and write



If you're using a Jupyter notebook, here is the code I used to initialize things:

from IPython.core.interactiveshell import InteractiveShell
# show the output of every expression in a cell, not just the last one
InteractiveShell.ast_node_interactivity = 'all'


from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# local session using all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()

Required format of JSON objects in files

There are two types of JSON files:

  1. Single line JSON
  2. Multiline JSON

Single line JSON

Every line must contain one valid JSON object. That's it. This is also called the JSON Lines (jsonl) format.

Don't wrap the JSON objects in an array.

Correct single line JSON

  • One valid JSON object per line
  • JSON objects are not wrapped in an array
  • A JSON object may end with a trailing comma; the Spark reader won't care
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}

Wrong single line JSON

Here the objects are wrapped in an array using square brackets. Spark parses each line as its own JSON object, so the lines containing only [ or ] fail to parse and show up as corrupt records.

Example input:

[
    {"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1},
    {"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2},
    {"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1},
    {"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
]
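
For reference, reading this file with the plain line-oriented reader produces the corrupt-record rows shown below. The filename json_wrong_single.json is just a placeholder:

df = (
    spark.read
    .format("json")
    .load("json_wrong_single.json")
)
df.show()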

The loaded dataframe looks like this:

+---------------+------+----------+----+-----+----+
|_corrupt_record|amount|customerid| day|month|year|
+---------------+------+----------+----+-----+----+
|              [|  NULL|      NULL|NULL| NULL|NULL|
|           NULL|  1000|         1|   1|    1|2022|
|           NULL|  1000|         1|   2|    1|2022|
|           NULL|  2000|         2|   1|    1|2022|
|           NULL|  2000|         2|   2|    1|2022|
|              ]|  NULL|      NULL|NULL| NULL|NULL|
+---------------+------+----------+----+-----+----+
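
That said, an array-wrapped file like this is itself valid JSON, so you don't have to fix the file: reading it with the multiline option (covered next) parses it correctly.

df = (
    spark.read
    .format("json")
    .option("multiline", "true")
    .load("json_wrong_single.json")  # the same array-wrapped placeholder file
)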

Multiline JSON

You'll have to wrap the JSON objects in an array: [ ...json objects... ]. If you don't, Spark will only read the first JSON object and skip the rest.

Correct multiline JSON

  • Objects are wrapped in an array using square brackets
[
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 1000
  },
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 1000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 2000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 2000
  }
]

Wrong multiline JSON

The objects are not wrapped in an array using square brackets, so Spark will only read the first object and skip the rest:

{
  "customerid": 1,
  "year": 2022,
  "month": 1,
  "day": 1,
  "amount": 1000
}
{
  "customerid": 1,
  "year": 2022,
  "month": 1,
  "day": 2,
  "amount": 1000
}
{
  "customerid": 2,
  "year": 2022,
  "month": 1,
  "day": 1,
  "amount": 2000
}
{
  "customerid": 2,
  "year": 2022,
  "month": 1,
  "day": 2,
  "amount": 2000
}

The loaded dataframe looks like this:

+------+---+-----+----+
|amount|day|month|year|
+------+---+-----+----+
|  1000|  1|    1|2022|
+------+---+-----+----+

Read JSON

Read single line JSON

Sample input file:

{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}

Code:

df = (
    spark.read
    .format("json")
    .load("json_multi.json")
)

Read multiline JSON

Sample input file:

[
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 1000
  },
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 1000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 2000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 2000
  }
]

Code:

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_multi.json")
)

You can find all the JSON read options in the Json data source options documentation.
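
For example, a few commonly used ones (a sketch; all of these are documented JSON options):

df = (
    spark.read
    .format("json")
    .option("mode", "FAILFAST")            # fail on malformed records instead of nulling them
    .option("dateFormat", "yyyy-MM-dd")    # pattern used to parse date-typed fields
    .option("primitivesAsString", "true")  # infer every primitive value as a string
    .load("json_multi.json")
)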

Set schema

When a JSON file is read, Spark parses the objects and automatically infers the schema. You can see it using df.printSchema().
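
Inference scans the input, which can be slow for large files. The samplingRatio option (one of the documented JSON options) tells Spark to infer the schema from only a fraction of the records:

df = (
    spark.read
    .format("json")
    .option("samplingRatio", "0.1")  # infer the schema from ~10% of the records
    .load("json_multi.json")
)
df.printSchema()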

Set schema of simple JSON

Sample input:

[
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 1000
  },
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 1000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 2000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 2000
  }
]

Code:

schema = (
    StructType()
    .add("customerid", IntegerType(), True)
    .add("amount", IntegerType(), True)
    .add("year", IntegerType(), True)
    .add("month", IntegerType(), True)
    .add("day", IntegerType(), True)
)
df = (
    spark.read
    .format("json")
    .schema(schema)
    .option("multiline","true")
    .load("json_multi.json")
)
df.show()
df.printSchema()

Output

+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  2000|2022|    1|  1|
|         2|  2000|2022|    1|  2|
+----------+------+----+-----+---+

root
 |-- customerid: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
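
As an aside, .schema() also accepts a DDL-formatted string, which is often more compact than building a StructType by hand:

df = (
    spark.read
    .format("json")
    .schema("customerid INT, amount INT, year INT, month INT, day INT")
    .option("multiline", "true")
    .load("json_multi.json")
)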

Set schema of complex JSON

Nested objects

Suppose that instead of giving you year, month and day columns directly, the fields are wrapped inside a date object. Let's set the schema for this JSON.

Sample input:

[
  {
    "customerid": 1,
    "amount": 1000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 1,
      "amount": 1000
    }
  },
  {
    "customerid": 1,
    "amount": 1000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 2,
      "amount": 1000
    }
  },
  {
    "customerid": 2,
    "amount": 1000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 1,
      "amount": 2000
    }
  },
  {
    "customerid": 2,
    "amount": 1000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 2,
      "amount": 2000
    }
  }
]

Basic read without schema

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_nested_object.json")
)
# print dataframe
df.show()
# print schema that spark inferred
df.printSchema()

Output:

+------+----------+------------------+
|amount|customerid|              date|
+------+----------+------------------+
|  1000|         1|{1000, 1, 1, 2022}|
|  1000|         1|{1000, 2, 1, 2022}|
|  2000|         2|{2000, 1, 1, 2022}|
|  2000|         2|{2000, 2, 1, 2022}|
+------+----------+------------------+

root
 |-- amount: long (nullable = true)
 |-- customerid: long (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- amount: long (nullable = true)
 |    |-- day: long (nullable = true)
 |    |-- month: long (nullable = true)
 |    |-- year: long (nullable = true)

Set schema while reading

Set the schema, i.e. the datatypes of all the fields in the objects.

Spark inferred all the numeric fields as long. Suppose we want them all to be integers.

schema = (
    StructType()
    .add("customerid", IntegerType(), True)
    .add("amount", IntegerType(), True)
    .add(
        "date",
        StructType()
        .add("year", IntegerType(), True)
        .add("month", IntegerType(), True)
        .add("day", IntegerType(), True),
        True,
    )
)
df = (
    spark.read
    .format("json")
    .schema(schema)
    .option("multiline","true")
    .load("json_nested_object.json")
)
df.show()
df.printSchema()

Output:

As you can see, all the fields are now treated as integers, just as we defined in the schema. Note that date.amount, which we left out of the schema, was dropped from the dataframe.

+----------+------+------------+
|customerid|amount|        date|
+----------+------+------------+
|         1|  1000|{2022, 1, 1}|
|         1|  1000|{2022, 1, 2}|
|         2|  2000|{2022, 1, 1}|
|         2|  2000|{2022, 1, 2}|
+----------+------+------------+

root
 |-- customerid: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- year: integer (nullable = true)
 |    |-- month: integer (nullable = true)
 |    |-- day: integer (nullable = true)

Bonus: Flatten the dataframe

Suppose that instead of the date column with a JSON object inside it, you want separate columns for year, month and day. Just select the nested fields from the column.

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_nested_object.json")
)
df_flat = df.select(
    col("customerId"),
    col("amount"),
    col("date.year").alias("year"),
    col("date.month").alias("month"),
    col("date.day").alias("day"),
)

df:

+----------+------+------------+
|customerid|amount|        date|
+----------+------+------------+
|         1|  1000|{2022, 1, 1}|
|         1|  1000|{2022, 1, 2}|
|         2|  2000|{2022, 1, 1}|
|         2|  2000|{2022, 1, 2}|
+----------+------+------------+

df_flat:

+----------+------+----+-----+---+
|customerId|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  2000|2022|    1|  1|
|         2|  2000|2022|    1|  2|
+----------+------+----+-----+---+
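
As a side note, select() also accepts dotted string paths, and a nested field comes out named after its leaf field, so the same flattening can be written without col() and alias():

df_flat = df.select("customerid", "amount", "date.year", "date.month", "date.day")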

Objects nested within array

Sample input:

[
  {
    "customerid": 1,
    "orders": [
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 1,
          "amount": 1000
        }
      },
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 2,
          "amount": 1000
        }
      }
    ]
  },
  {
    "customerid": 2,
    "orders": [
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 1,
          "amount": 2000
        }
      },
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 2,
          "amount": 2000
        }
      }
    ]
  }
]

Basic read without schema

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_nested_array.json")
)
df.show()
df.printSchema()

Output

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {1000, 1,...|
|         2|[{1000, {2000, 1,...|
+----------+--------------------+

root
 |-- customerid: long (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: long (nullable = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- amount: long (nullable = true)
 |    |    |    |-- day: long (nullable = true)
 |    |    |    |-- month: long (nullable = true)
 |    |    |    |-- year: long (nullable = true)

Set schema while reading

Set all fields to be of type integer:

schema = (
    StructType()
    .add("customerid", IntegerType(), True)
    .add(
        "orders",
        ArrayType(
            StructType()
            .add("amount", IntegerType(), True)
            .add(
                "date",
                (
                    StructType()
                    .add("year", IntegerType(), True)
                    .add("month", IntegerType(), True)
                    .add("day", IntegerType(), True)
                ),
                True,
            )
        ),
        True,
    )
)
df = (
    spark.read.format("json")
    .schema(schema)
    .option("multiline", "true")
    .load("json_nested_array.json")
)
df.show()
df.printSchema()

Output:

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {2022, 1,...|
|         2|[{1000, {2022, 1,...|
+----------+--------------------+

root
 |-- customerid: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: integer (nullable = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- year: integer (nullable = true)
 |    |    |    |-- month: integer (nullable = true)
 |    |    |    |-- day: integer (nullable = true)

Bonus: Flatten the dataframe

The input is the same nested-array sample shown above.

Code:

from pyspark.sql import functions as F

schema = (
    StructType()
    .add("customerid", IntegerType(), True)
    .add(
        "orders",
        ArrayType(
            StructType()
            .add("amount", IntegerType(), True)
            .add(
                "date",
                (
                    StructType()
                    .add("year", IntegerType(), True)
                    .add("month", IntegerType(), True)
                    .add("day", IntegerType(), True)
                ),
                True,
            )
        ),
        True,
    )
)

# read dataframe
df = (
    spark.read.format("json")
    .schema(schema)
    .option("multiline", "true")
    .load("json_nested_array.json")
)
df.show()

# explode the dataframe on orders, to create one row for every array object
df_exploded = df.withColumn("orders", F.explode(col("orders")))
df_exploded.show()

# flatten the dataframe
df_flat = (
    df.withColumn("orders", F.explode(col("orders")))
    .select(
        col("customerId"),
        col("orders.amount"),
        col("orders.date.year"),
        col("orders.date.month"),
        col("orders.date.day"),
    )
)
df_flat.show()

Outputs:

df

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {2022, 1,...|
|         2|[{1000, {2022, 1,...|
+----------+--------------------+

df_exploded

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|{1000, {2022, 1, 1}}|
|         1|{1000, {2022, 1, 2}}|
|         2|{1000, {2022, 1, 1}}|
|         2|{1000, {2022, 1, 2}}|
+----------+--------------------+

df_flat

+----------+------+----+-----+---+
|customerId|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  1000|2022|    1|  1|
|         2|  1000|2022|    1|  2|
+----------+------+----+-----+---+
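
One caveat the sample data doesn't show: F.explode drops rows whose array is NULL or empty. If a customer could have no orders and you want to keep such rows (with NULL order fields), use F.explode_outer instead:

df_exploded = df.withColumn("orders", F.explode_outer(col("orders")))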

Write JSON

The barebones write method template is:

(
    orders_df
    .write
    .format("json")
    .mode("overwrite")
    .save("data/orders")
)
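
Note that save() writes a directory of part files (one per partition of the dataframe), not a single .json file. If you genuinely need one file for a small output, you can coalesce to a single partition first — a sketch (the output path is just an example), with the caveat that it funnels all data through one task:

(
    orders_df
    .coalesce(1)
    .write
    .format("json")
    .mode("overwrite")
    .save("data/orders_single_file")
)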

Partition written data on specific columns

Partitioning the output by specific columns lets later reads skip irrelevant partitions (partition pruning) and increases read parallelism.

E.g., partition by year, month, day:

(
    orders_df
    .write
    .format("json")
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .save("data/sales")
)
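
When you read the partitioned directory back, Spark discovers year, month and day as partition columns from the directory names, and filters on them skip whole directories instead of scanning all the data:

orders = spark.read.format("json").load("data/sales")
# only the year=2022/month=1/day=1 directory is scanned
jan_first = orders.filter(
    (col("year") == 2022) & (col("month") == 1) & (col("day") == 1)
)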
