DataFrame的组成
结构层面
- StructType对象:描述整个DataFrame表结构
- StructType对象:描述一个列信息
数据层面
- Row对象:记录一行对象
- Column对象:记录一个列的数据以及列信息
DataFrame的创建
基于RDD创建
方式 1:基于 RDD 直接创建
# spark为SparkSession对象
rdd=sc.parallelize([('zhangsan',19),('lisi',20)])
# 指定列名分别是name和age
df=spark.createDataFrame(rdd,schema=['name','age'])
示例
from pyspark.sql import SparkSession
import os
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
# 获取SparkSession对象
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
# 创建RDD
rdd = spark.sparkContext \
.textFile(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt") \
.map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
df = spark.createDataFrame(rdd, schema=['name', 'age'])
# 打印表结构
df.printSchema()
# 展示前20条数据,不截断,全部展示
df.show(20, False)
# 将dataframe转换为临时视图表,使用sql
df.createTempView("people")
spark.sql("select * from people where age > 19").show()
从图中可以看出,age为long类型,dataframe会根据rdd中的数据类型自动推断
方式2:基于 RDD 和 StructType 对象创建
rdd=sc.parallelize([('zhangsan',19),('lisi',20)])
# field:列名,data_type:列类型,nullable:是否允许为空
schema = StructType().\
add("name",StringType(),False).\
add(field="age",data_type=IntegerType(),nullable=True)
df=spark.createDataFrame(rdd,schema=schema)
示例
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,NullType,IntegerType
import os
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
# 获取SparkSession对象
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
rdd = spark.sparkContext \
.textFile(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt") \
.map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
# 只有此处不同,构建StructType对象
schema = StructType().\
add("name",StringType(),False).\
add(field="age",data_type=IntegerType(),nullable=True)
# 基于rdd和StructType对象构建DataFrame
df=spark.createDataFrame(rdd,schema=schema)
df.printSchema()
df.show(100)
使用 RDD 快捷创建 DataFrame
# toDF,方式一,不指定列类型,只写列名,Spark会根据rdd内容自动推断列类型
df1=rdd.toDF(['name','age'])
# toDF,方式二,使用StructType对象
schema = StructType().\
add("name",StringType(),False).\
add(field="age",data_type=IntegerType(),nullable=True)
df2=rdd.toDF(schema=schema)
示例
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,NullType,IntegerType
import os
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
# 获取SparkSession对象
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
rdd = spark.sparkContext \
.textFile(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt") \
.map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
# 构建StructType对象
schema = StructType().\
add("name",StringType(),False).\
add(field="age",data_type=IntegerType(),nullable=True)
# toDF方式1
df1=rdd.toDF(schema=['name','age'])
df1.printSchema()
# toDF方式2
df2=rdd.toDF(schema=schema)
df2.printSchema()
基于Pandas的DataFrame转换为Spark的DataFrame
from pyspark.sql import SparkSession
import os
import pandas as pd
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
# 创建Pandas的DataFrame
pdf=pd.DataFrame(
{
"id":[1],
"name":["chenxin"],
"age":[20]
}
)
df=spark.createDataFrame(pdf)
df.printSchema()
df.show()
读取外部数据构建DataFrame
语法
# 可以选多种数据源
df=sparksession.read.format("text|csv|json|parquet")
.option("K","V") #option可选
.schema(StructType | String) # String类型可以为 ("name STRING","age INT")
.load("读取文件路径,支持本地文件系统和hdfs文件系统")
读取text数据源
当数据源是text时,spark会将文本文件中的每一行作为列的一个值,同时dataframe只有一列,若不指定schema,则列名默认是value
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
sc=spark.sparkContext
# 获取StructType对象
schema = StructType().add("data",StringType(),nullable=False)
# 读取text数据源
df=spark.read.format("text").\
schema(schema=schema).\
load(r"D:\dev\python-learn\sparklearn\DataFrame\people.txt")
df.printSchema()
df.show(20)
读取Json类型
json类型文件不需要指定schema,因为json自带schema
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
sc=spark.sparkContext
df=spark.read.format("json").\
load(r"D:\dev\python-learn\sparklearn\DataFrame\people.json")
df.printSchema()
df.show(20)
读取csv数据格式
csv格式的文件有许多的文件属性可以设置
- sep:
描述:设置分隔符。
默认值:,
示例:.option("sep", ";") - header:
描述:指定 CSV 文件是否包含表头。
默认值:False
示例:.option("header", True) - encoding:
描述:设置文件的编码格式。
默认值:UTF-8
示例:.option("encoding", "ISO-8859-1")
schema:
描述:手动指定 CSV 文件的数据模式(StructType)。
默认值:None,Spark 可以自动推断数据类型(通过 inferSchema)。 - inferSchema:
描述:是否启用自动数据类型推断。
默认值:False
示例:.option("inferSchema", True) - nullValue:
描述:指定哪些字符串应被视为 null 值。
示例:.option("nullValue", "NA") - quote:
描述:指定用于引用字段的字符。
默认值:"
示例:.option("quote", "'") - escape:
描述:指定用于转义引号的字符。
示例:.option("escape", "\") - comment:
描述:将某个字符指定为注释行的标记,行首带此字符的行会被跳过。
示例:.option("comment", "#") - mode:
描述:控制如何处理文件中有问题的行。
值:"PERMISSIVE"(默认值,尽量解析)、"DROPMALFORMED"(丢弃有问题的行)、"FAILFAST"(遇到问题时立即报错)。
示例:.option("mode", "DROPMALFORMED") - timestampFormat:
描述:指定时间戳格式。
默认值:yyyy-MM-dd'T'HH:mm:ss.SSSXXX
示例:.option("timestampFormat", "MM/dd/yyyy HH:mm:ss") - maxColumns:
描述:限制 CSV 文件中允许的最大列数。
示例:.option("maxColumns", 100) - maxCharsPerColumn:
描述:限制单个列允许的最大字符数。
示例:.option("maxCharsPerColumn", 4096) - ignoreLeadingWhiteSpace / ignoreTrailingWhiteSpace:
描述:是否忽略每个字段的前导或尾随空格。
默认值:True
示例:.option("ignoreLeadingWhiteSpace", True)
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
sc=spark.sparkContext
df=spark.read.format("csv").\
option("sep",",").\ # 设置分隔符为,
option("header",True).\ # 是否有表头
option("encoding","UTF-8").\ # 编码格式
schema("name STRING,age INT").\ # schema
load("D:\dev\python-learn\sparklearn\DataFrame\people.csv")
df.printSchema()
df.show()
结果如下
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Michael| 25|
| Andy| 30|
| Justin| 19|
+-------+---+
读取parquet文件
什么是parquet
Parquet 文件是一种列式存储格式,常用于大数据处理(如 Apache Spark 和 Hive),有以下特点:
- 列式存储:读取指定列时更高效,减少 I/O。
- 嵌入式模式:文件自带列名和数据类型信息。
- 高压缩率:支持列级压缩,节省存储空间。
parquet与普通文本文件的区别
存储结构:Parquet 是列式存储,普通文本(如 CSV)是行式存储。
模式信息:Parquet 内嵌模式,普通文本需外部指定。
读取性能:Parquet 读取特定列更快,普通文本需读取整行。
压缩效果:Parquet 更高效,普通文本整体压缩效果较低。
parquet文件无法直接打开查看,需要安装相关插件,如Avro and Parquet Viewer
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StringType
import pandas as pd
if __name__ == "__main__":
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
spark = SparkSession.builder \
.appName("create_1") \
.master("local[*]") \
.getOrCreate()
sc=spark.sparkContext
df=spark.read.format("parquet").\
load("users.parquet")
df.printSchema()
df.show()