DataFrame

chenxin
16
2024-11-04

DataFrame的组成

结构层面

  • StructType对象:描述整个DataFrame表结构
  • StructType对象:描述一个列信息

数据层面

  • Row对象:记录一行对象
  • Column对象:记录一个列的数据以及列信息

DataFrame的创建

基于RDD创建

方式 1:基于 RDD 直接创建

# spark为SparkSession对象
rdd=sc.parallelize([('zhangsan',19),('lisi',20)])
# 指定列名分别是name和age
df=spark.createDataFrame(rdd,schema=['name','age'])

示例

from pyspark.sql import SparkSession
import os
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    # 获取SparkSession对象
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()
    # 创建RDD
    rdd = spark.sparkContext \
        .textFile(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt") \
        .map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
    df = spark.createDataFrame(rdd, schema=['name', 'age'])
    # 打印表结构
    df.printSchema()
    # 展示前20条数据,不截断,全部展示
    df.show(20, False)
    # 将dataframe转换为临时视图表,使用sql
    df.createTempView("people")
    spark.sql("select * from people where age > 19").show()


从图中可以看出,age为long类型,dataframe会根据rdd中的数据类型自动推断

方式2:基于 RDD 和 StructType 对象创建

rdd=sc.parallelize([('zhangsan',19),('lisi',20)])
# field:列名,data_type:列类型,nullable:是否允许为空
schema = StructType().\
      add("name",StringType(),False).\
      add(field="age",data_type=IntegerType(),nullable=True)
df=spark.createDataFrame(rdd,schema=schema)

示例

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,NullType,IntegerType
import os
if __name__ == "__main__":
  os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
  # 获取SparkSession对象
  spark = SparkSession.builder \
      .appName("create_1") \
      .master("local[*]") \
      .getOrCreate()

  rdd = spark.sparkContext \
      .textFile(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt") \
      .map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
  # 只有此处不同,构建StructType对象
  schema = StructType().\
      add("name",StringType(),False).\
      add(field="age",data_type=IntegerType(),nullable=True)
  # 基于rdd和StructType对象构建DataFrame
  df=spark.createDataFrame(rdd,schema=schema)
  df.printSchema()
  df.show(100)

使用 RDD 快捷创建 DataFrame

# toDF,方式一,不指定列类型,只写列名,Spark会根据rdd内容自动推断列类型
df1=rdd.toDF(['name','age'])
# toDF,方式二,使用StructType对象
schema = StructType().\
        add("name",StringType(),False).\
        add(field="age",data_type=IntegerType(),nullable=True)
df2=rdd.toDF(schema=schema)

示例

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,NullType,IntegerType
import os
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    # 获取SparkSession对象
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()

    rdd = spark.sparkContext \
        .textFile(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt") \
        .map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
    # 构建StructType对象
    schema = StructType().\
        add("name",StringType(),False).\
        add(field="age",data_type=IntegerType(),nullable=True)
    # toDF方式1
    df1=rdd.toDF(schema=['name','age'])
    df1.printSchema()
    # toDF方式2
    df2=rdd.toDF(schema=schema)
    df2.printSchema()

基于Pandas的DataFrame转换为Spark的DataFrame

from pyspark.sql import SparkSession
import os
import pandas as pd
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()
    # 创建Pandas的DataFrame
    pdf=pd.DataFrame(
        {
            "id":[1],
            "name":["chenxin"],
            "age":[20]
        }
    )
    df=spark.createDataFrame(pdf)
    df.printSchema()
    df.show()

读取外部数据构建DataFrame

语法

# 可以选多种数据源
df=sparksession.read.format("text|csv|json|parquet")
  .option("K","V") #option可选
  .schema(StructType | String) # String类型可以为 ("name STRING","age INT")
  .load("读取文件路径,支持本地文件系统和hdfs文件系统")

读取text数据源

当数据源是text时,spark会将文本文件中的每一行作为列的一个值,同时dataframe只有一列,若不指定schema,则列名默认是value

from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()
    sc=spark.sparkContext
    # 获取StructType对象
    schema = StructType().add("data",StringType(),nullable=False)
    # 读取text数据源
    df=spark.read.format("text").\
        schema(schema=schema).\
        load(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt")
    df.printSchema()
    df.show(20)

读取Json类型

json类型文件不需要指定schema,因为json自带schema

from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()
    sc=spark.sparkContext
    df=spark.read.format("json").\
        load(r"D:\dev\python-learn\sparklearn\DataFrame\people.json")
    df.printSchema()
    df.show(20)

读取csv数据格式

csv格式的文件有许多的文件属性可以设置

  • sep:
    描述:设置分隔符。
    默认值:,
    示例:.option("sep", ";")
  • header:
    描述:指定 CSV 文件是否包含表头。
    默认值:False
    示例:.option("header", True)
  • encoding:
    描述:设置文件的编码格式。
    默认值:UTF-8
    示例:.option("encoding", "ISO-8859-1")
    schema:
    描述:手动指定 CSV 文件的数据模式(StructType)。
    默认值:None,Spark 可以自动推断数据类型(通过 inferSchema)。
  • inferSchema:
    描述:是否启用自动数据类型推断。
    默认值:False
    示例:.option("inferSchema", True)
  • nullValue:
    描述:指定哪些字符串应被视为 null 值。
    示例:.option("nullValue", "NA")
  • quote:
    描述:指定用于引用字段的字符。
    默认值:"
    示例:.option("quote", "'")
  • escape:
    描述:指定用于转义引号的字符。
    示例:.option("escape", "\")
  • comment:
    描述:将某个字符指定为注释行的标记,行首带此字符的行会被跳过。
    示例:.option("comment", "#")
  • mode:
    描述:控制如何处理文件中有问题的行。
    值:"PERMISSIVE"(默认值,尽量解析)、"DROPMALFORMED"(丢弃有问题的行)、"FAILFAST"(遇到问题时立即报错)。
    示例:.option("mode", "DROPMALFORMED")
  • timestampFormat:
    描述:指定时间戳格式。
    默认值:yyyy-MM-dd'T'HH:mm:ss.SSSXXX
    示例:.option("timestampFormat", "MM/dd/yyyy HH:mm:ss")
  • maxColumns:
    描述:限制 CSV 文件中允许的最大列数。
    示例:.option("maxColumns", 100)
  • maxCharsPerColumn:
    描述:限制单个列允许的最大字符数。
    示例:.option("maxCharsPerColumn", 4096)
  • ignoreLeadingWhiteSpace / ignoreTrailingWhiteSpace:
    描述:是否忽略每个字段的前导或尾随空格。
    默认值:True
    示例:.option("ignoreLeadingWhiteSpace", True)
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()
    sc=spark.sparkContext
    df=spark.read.format("csv").\
        option("sep",",").\ # 设置分隔符为,
        option("header",True).\ # 是否有表头
        option("encoding","UTF-8").\ # 编码格式
        schema("name STRING,age INT").\ # schema
        load("D:\dev\python-learn\sparklearn\DataFrame\people.csv")
    df.printSchema()
    df.show()

结果如下

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 25|
|   Andy| 30|
| Justin| 19|
+-------+---+

读取parquet文件

什么是parquet
Parquet 文件是一种列式存储格式,常用于大数据处理(如 Apache Spark 和 Hive),有以下特点:

  • 列式存储:读取指定列时更高效,减少 I/O。
  • 嵌入式模式:文件自带列名和数据类型信息。
  • 高压缩率:支持列级压缩,节省存储空间。
    parquet与普通文本文件的区别
    存储结构:Parquet 是列式存储,普通文本(如 CSV)是行式存储。
    模式信息:Parquet 内嵌模式,普通文本需外部指定。
    读取性能:Parquet 读取特定列更快,普通文本需读取整行。
    压缩效果:Parquet 更高效,普通文本整体压缩效果较低。

parquet文件无法直接打开查看,需要安装相关插件,如Avro and Parquet Viewer

from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    spark = SparkSession.builder \
        .appName("create_1") \
        .master("local[*]") \
        .getOrCreate()
    sc=spark.sparkContext
    df=spark.read.format("parquet").\
        load("users.parquet")

    df.printSchema()
    df.show()

动物装饰