jupyter notebook使用pyspark

chenxin
30
2024-10-08

在使用jupython notebook编写pyspark程序的时候,总是报我少模块,我重装了多次,仍然是报缺少模块。
后面才发现,我安装了多个python解释器,虽然我重新使用anaconda使用新的环境,但jupyter notebook的内核依然是旧的内核(运行环境)。
当我切换内核之后,就可以正常导入pyspark并使用。

创建并激活新的环境

# 在D:\python-envs\pyspark-env创建环境,并指定python版本为3,11,任意修改
conda create --prefix D:\python-envs\pyspark-env python=3.11
# 激活环境
conda activate D:\python-envs\pyspark-env
# 安装pyspark,我选择的版本是3.4.1,无妨
conda install pyspark=3.4.1

如果在命令行前出现了环境标记则证明环境已经激活成功
图片已过期

创建并切换内核

创建内核

jupyter notebook的内核也就是代码的执行环境,内核可以支持多种编程语言,最常见的就是python。
ipykernel 是 Jupyter 项目的一部分,它提供了 Jupyter Notebook 和 JupyterLab 的 Python 内核。

# 安装 ipykernel
conda install ipykernel
# 在指定的目录创建内核,名字随意
python -m ipykernel install --name pyspark-env --display-name "Python (pyspark-env)" --prefix D:\python-envs\pyspark-env
# 查看内核列表
jupyter kernelspec list
# 执行完上述步骤之后,在打开jupyter notebook后可能仍然会出现没有我们创建的内核的情景,这个时候就需要我们更新jupyter和ipykernel
pip install --upgrade jupyter ipykernel

启动jupyter notebook

# 启动jupyter notebook,需要在激活环境的情况下启动,否则是默认base环境
jupyter notebook

切换内核

图片已过期

测试

图片已过期
需要注意的是,pyspark是一个独立的工具,它可能在不同的系统、环境和集群上运行。为了确保其独立运行,需要指定其 Python 解释器,而不是直接使用jupyter notebook指定的内核版本。

from pyspark import SparkConf, SparkContext
import random
import os

def main():
  # 指定pyspark的解释器
    os.environ['PYSPARK_PYTHON'] = "D:\python-envs\pyspark-env\python.exe"
    conf = SparkConf().setAppName("RDD Operations").setMaster("local[2]")
    sc = SparkContext(conf=conf)

    # 1. 从1万个数中抽100个,找出里面最大的3个奇数和最小的3个偶数
    numbers_rdd = sc.parallelize(range(1, 10001))
    sampled_numbers = numbers_rdd.takeSample(False, 100, seed=random.randint(0, 100))
    sampled_rdd = sc.parallelize(sampled_numbers)

    odds = sampled_rdd.filter(lambda x: x % 2 != 0).takeOrdered(3, key=lambda x: -x)
    evens = sampled_rdd.filter(lambda x: x % 2 == 0).takeOrdered(3)

    print("最大3个奇数: " + ", ".join(map(str, odds)))
    print("最小3个偶数: " + ", ".join(map(str, evens)))

    # 2. 读取文件,提取所有单词,按单词里数字的大小降序排序
    file_rdd = sc.textFile("D:/dev/python-learn/sparkwork/file.txt")
    words_rdd = file_rdd.flatMap(lambda line: line.split("\W+"))
    words_with_numbers = words_rdd.filter(lambda word: any(char.isdigit() for char in word))
    sorted_words = words_with_numbers.sortBy(lambda word: int(''.join(filter(str.isdigit, word))), ascending=False)

    print("包含数字的单词按数字大小降序排序: ")
    for word in sorted_words.collect():
        print(word)

    # 3. 生成一个整数RDD(1000个),随机抽100个,计算每个分区的数据和,并降序排序
    int_rdd = sc.parallelize(range(1, 1001))
    sampled_numbers = int_rdd.takeSample(False, 100, seed=random.randint(0, 100))
    sampled_rdd = sc.parallelize(sampled_numbers, numSlices=10)

    partition_sums_rdd = sampled_rdd.mapPartitions(lambda iter: [sum(iter)])
    sorted_partition_sums = partition_sums_rdd.sortBy(lambda x: x, ascending=False)

    print("每个分区的和降序排列: ")
    for partition_sum in sorted_partition_sums.collect():
        print(partition_sum)

    sc.stop()

if __name__ == "__main__":
    main()

为什么pyspark也要单独指定python解释器

  • 独立性:
    PySpark 是一个独立的工具,它可能在不同的系统、环境和集群上运行。为了确保其独立运行,需要指定其 Python 解释器。
  • 环境一致性:
    Jupyter Notebook 运行的内核和 PySpark 运行的环境可能有所不同。通过设置 PYSPARK_PYTHON 环境变量,可以确保 PySpark 使用与 Jupyter Notebook 相同的 Python 环境,从而避免由于环境不同引起的兼容性问题。
  • 多样性:
    同一个 Jupyter Notebook 服务器可能支持多个内核,每个内核可能有不同的 Python 版本和依赖项。PySpark 需要明确知道使用哪个 Python 解释器来确保其依赖项的一致性和功能的正常运行。
  • 集群配置:
    在分布式计算环境中,PySpark 任务可能会在不同的节点上运行。这些节点上的 Python 版本可能与提交任务的节点不同。通过显式指定 PYSPARK_PYTHON,可以确保所有节点都使用相同的 Python 环境。
    总结: 尽管 Jupyter Notebook 指定了内核,但 PySpark 作为一个独立的分布式计算框架,为了保证其运行时的独立性和一致性,需要显式地指定使用哪个 Python 解释器。这有助于确保环境的一致性,避免可能的兼容性问题。
动物装饰