在使用jupython notebook编写pyspark程序的时候,总是报我少模块,我重装了多次,仍然是报缺少模块。
后面才发现,我安装了多个python解释器,虽然我重新使用anaconda使用新的环境,但jupyter notebook的内核依然是旧的内核(运行环境)。
当我切换内核之后,就可以正常导入pyspark并使用。
创建并激活新的环境
# 在D:\python-envs\pyspark-env创建环境,并指定python版本为3,11,任意修改
conda create --prefix D:\python-envs\pyspark-env python=3.11
# 激活环境
conda activate D:\python-envs\pyspark-env
# 安装pyspark,我选择的版本是3.4.1,无妨
conda install pyspark=3.4.1
如果在命令行前出现了环境标记则证明环境已经激活成功
创建并切换内核
创建内核
jupyter notebook的内核也就是代码的执行环境,内核可以支持多种编程语言,最常见的就是python。
ipykernel 是 Jupyter 项目的一部分,它提供了 Jupyter Notebook 和 JupyterLab 的 Python 内核。
# 安装 ipykernel
conda install ipykernel
# 在指定的目录创建内核,名字随意
python -m ipykernel install --name pyspark-env --display-name "Python (pyspark-env)" --prefix D:\python-envs\pyspark-env
# 查看内核列表
jupyter kernelspec list
# 执行完上述步骤之后,在打开jupyter notebook后可能仍然会出现没有我们创建的内核的情景,这个时候就需要我们更新jupyter和ipykernel
pip install --upgrade jupyter ipykernel
启动jupyter notebook
# 启动jupyter notebook,需要在激活环境的情况下启动,否则是默认base环境
jupyter notebook
切换内核
测试
需要注意的是,pyspark是一个独立的工具,它可能在不同的系统、环境和集群上运行。为了确保其独立运行,需要指定其 Python 解释器,而不是直接使用jupyter notebook指定的内核版本。
from pyspark import SparkConf, SparkContext
import random
import os
def main():
# 指定pyspark的解释器
os.environ['PYSPARK_PYTHON'] = "D:\python-envs\pyspark-env\python.exe"
conf = SparkConf().setAppName("RDD Operations").setMaster("local[2]")
sc = SparkContext(conf=conf)
# 1. 从1万个数中抽100个,找出里面最大的3个奇数和最小的3个偶数
numbers_rdd = sc.parallelize(range(1, 10001))
sampled_numbers = numbers_rdd.takeSample(False, 100, seed=random.randint(0, 100))
sampled_rdd = sc.parallelize(sampled_numbers)
odds = sampled_rdd.filter(lambda x: x % 2 != 0).takeOrdered(3, key=lambda x: -x)
evens = sampled_rdd.filter(lambda x: x % 2 == 0).takeOrdered(3)
print("最大3个奇数: " + ", ".join(map(str, odds)))
print("最小3个偶数: " + ", ".join(map(str, evens)))
# 2. 读取文件,提取所有单词,按单词里数字的大小降序排序
file_rdd = sc.textFile("D:/dev/python-learn/sparkwork/file.txt")
words_rdd = file_rdd.flatMap(lambda line: line.split("\W+"))
words_with_numbers = words_rdd.filter(lambda word: any(char.isdigit() for char in word))
sorted_words = words_with_numbers.sortBy(lambda word: int(''.join(filter(str.isdigit, word))), ascending=False)
print("包含数字的单词按数字大小降序排序: ")
for word in sorted_words.collect():
print(word)
# 3. 生成一个整数RDD(1000个),随机抽100个,计算每个分区的数据和,并降序排序
int_rdd = sc.parallelize(range(1, 1001))
sampled_numbers = int_rdd.takeSample(False, 100, seed=random.randint(0, 100))
sampled_rdd = sc.parallelize(sampled_numbers, numSlices=10)
partition_sums_rdd = sampled_rdd.mapPartitions(lambda iter: [sum(iter)])
sorted_partition_sums = partition_sums_rdd.sortBy(lambda x: x, ascending=False)
print("每个分区的和降序排列: ")
for partition_sum in sorted_partition_sums.collect():
print(partition_sum)
sc.stop()
if __name__ == "__main__":
main()
为什么pyspark也要单独指定python解释器
- 独立性:
PySpark 是一个独立的工具,它可能在不同的系统、环境和集群上运行。为了确保其独立运行,需要指定其 Python 解释器。 - 环境一致性:
Jupyter Notebook 运行的内核和 PySpark 运行的环境可能有所不同。通过设置 PYSPARK_PYTHON 环境变量,可以确保 PySpark 使用与 Jupyter Notebook 相同的 Python 环境,从而避免由于环境不同引起的兼容性问题。 - 多样性:
同一个 Jupyter Notebook 服务器可能支持多个内核,每个内核可能有不同的 Python 版本和依赖项。PySpark 需要明确知道使用哪个 Python 解释器来确保其依赖项的一致性和功能的正常运行。 - 集群配置:
在分布式计算环境中,PySpark 任务可能会在不同的节点上运行。这些节点上的 Python 版本可能与提交任务的节点不同。通过显式指定 PYSPARK_PYTHON,可以确保所有节点都使用相同的 Python 环境。
总结: 尽管 Jupyter Notebook 指定了内核,但 PySpark 作为一个独立的分布式计算框架,为了保证其运行时的独立性和一致性,需要显式地指定使用哪个 Python 解释器。这有助于确保环境的一致性,避免可能的兼容性问题。