注:
Admin_Log
本文需要用到pyspark模块,请自行在PyCharm中添加。
数据输入
# 导包 from pyspark import SparkConf, SparkContext # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象 rdd1 = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = sc.parallelize((1, 2, 3, 4, 5)) rdd3 = sc.parallelize("abcdefg") rdd4 = sc.parallelize({1, 2, 3, 4, 5}) rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"}) # 如果要查看RDD里面有什么内容,需要用collect()方法 print(rdd1.collect()) print(rdd2.collect()) print(rdd3.collect()) print(rdd4.collect()) print(rdd5.collect())
- 结果
[1, 2, 3, 4, 5] [1, 2, 3, 4, 5] ['a', 'b', 'c', 'd', 'e', 'f', 'g'] [1, 2, 3, 4, 5] ['key1', 'key2']
数据输入 – 文件输入
# 导包 from pyspark import SparkConf, SparkContext # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象 # rdd1 = sc.parallelize([1, 2, 3, 4, 5]) # rdd2 = sc.parallelize((1, 2, 3, 4, 5)) # rdd3 = sc.parallelize("abcdefg") # rdd4 = sc.parallelize({1, 2, 3, 4, 5}) # rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"}) # # # 如果要查看RDD里面有什么内容,需要用collect()方法 # print(rdd1.collect()) # print(rdd2.collect()) # print(rdd3.collect()) # print(rdd4.collect()) # print(rdd5.collect()) # 通过textFile方法,读取文件数据加载到Spark内,成为RDD对象 rdd = sc.textFile("E:/PycharmProjects/test.txt") print(rdd.collect()) # 停止SparkContext对象的运行(停止PySpark程序) sc.stop()
- 结果
['hello world admin', 'hello world admin hello world admin', 'hello world admin', 'hello world admin', 'adminlog www adminlog', 'adminlogcn adminlogcn adminlogcn']
map方法
# 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([1, 2, 3, 4, 5]) # 通过map方法将全部数据都乘以10 # def func(data): # return data * 10 # 组成新的RDD # rdd2 = rdd.map(func) # 链式调用,调用类型事一个时,可以一直点下去 rdd2 = rdd.map(lambda x: x *10).map(lambda x: x + 5) # 链式调用 # rdd3 = rdd2.map(lambda x: x + 5) print(rdd2.collect())
- 结果
[15, 25, 35, 45, 55]
flatmap方法
# 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 准备一个rdd rdd = sc.parallelize(["hello world 666", "admin admin log adminlog", "www admin log cn"]) # 需求:将RDD数据里面的一个个单词提取出来 # rdd2 = rdd.map(lambda x: x.split(" ")) rdd2 = rdd.flatMap(lambda x: x.split(" ")) print(rdd2.collect()) sc.stop()
- 结果
['hello', 'world', '666', 'admin', 'admin', 'log', 'adminlog', 'www', 'admin', 'log', 'cn']
reduceByKey方法
# 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 66), ('女', 77), ('女', 88)]) # 求男生和女生两个组的成绩之和 rdd2 = rdd.reduceByKey(lambda a, b: a + b) print(rdd2.collect()) sc.stop()
- 结果
[('男', 187), ('女', 231)]
单词计数综合案例
# 1. 构建执行环境入口对象 # 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 2. 读取数据文件 rdd = sc.textFile("E:/PycharmProjects/test.txt") # 3. 取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # print(word_rdd.collect()) # 4. 将所有单词都转换成二元元组,单词为Key,Value设置为1 word_with_one_rdd = word_rdd.map(lambda word: (word, 1)) # print(word_with_one_rdd.collect()) # 5. 分组并求和 result = word_with_one_rdd.reduceByKey(lambda a, b: a + b) # 6. 打印输出结果 print(result.collect()) sc.stop()
- 结果
[('world', 5), ('admin', 5), ('www', 1), ('hello', 5), ('adminlog', 2), ('adminlogcn', 3)]
filter方法
# 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([1, 2, 3, 4, 5]) # 对RDD的胡数据进行过滤 [ 保留偶数 ] rdd2 = rdd.filter(lambda num: num % 2 == 0) print(rdd2.collect()) sc.stop()
- 结果
[2, 4]
distinct方法
# distinct算子功能: 对RDD数据进行去重,返回新的RDD # 语法 rdd.distinct() 无需传参 # 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 5, 5, 7, 7, 8, 8, 9, 10]) # 对RDD的数据进行去重 rdd2 = rdd.distinct() print(rdd2.collect()) sc.stop()
- 结果
[1, 2, 3, 5, 7, 8, 9, 10]
sortBy方法
# rdd.sortBy(func,ascending=False,numPartitions=1) # func: (T) -> U:告知按照rdd中的哪个数据进行排序,比如lambda x: x[1]表示按照rdd中的第二列元素进行排序# ascending True升序False降序 # numPartitions:用多少分区排序 # 导包 from pyspark import SparkConf, SparkContext import os # os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe" # 创建SparkConf类对象 conf = SparkConf().setMaster("local[*]").setAppName("test_spark") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 服用单词计数模块 # 2. 读取数据文件 rdd = sc.textFile("E:/PycharmProjects/test.txt") # 3. 取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # print(word_rdd.collect()) # 4. 将所有单词都转换成二元元组,单词为Key,Value设置为1 word_with_one_rdd = word_rdd.map(lambda word: (word, 1)) # print(word_with_one_rdd.collect()) # 5. 分组并求和 result = word_with_one_rdd.reduceByKey(lambda a, b: a + b) # 6. 打印输出结果 # print(result.collect()) # 对结果进行排序 # 将二元元组的1号元素返回出去,sortBy根据返回数据进行排序 # 升降序,False为降序,默认升序? final_rdd = result.sortBy(lambda x: x[1], ascending=False, numPartitions=1) print(final_rdd.collect()) sc.stop()
- 结果
[('world', 5), ('admin', 5), ('hello', 5), ('adminlogcn', 3), ('adminlog', 2), ('www', 1)]