Spark RDD编程模型及算子介绍(二)
创始人
2024-01-29 09:25:26
0

文章目录

    • 常见的Action算子
    • 常见分区操作算子

常见的Action算子

  • countByKey算子:统计Key出现的次数,部分代码如下:
rdd_file = sc.textFile("../Data/input/words.txt")
rdd_map = rdd_file.flatMap(lambda line: line.split(" ")).map(lambda x:(x, 1))
rdd_count = rdd_map.countByKey()
print(rdd_count)
print(type(rdd_count))
# 返回结果为字典
# defaultdict(, {'Apple': 4, 'Banana': 5, 'Orange': 4, 'Peach': 2})
# 
  • collect算子:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。RDD是分布式对象,数据量可以很大,所以用这个算子之前需要知道如果数据集结果很大,就会把driver内存撑爆,出现oom。

  • reduce算子:对RDD数据集按照传入的逻辑进行聚合操作,部分代码如下:

rdd = sc.parallelize(range(1,10))
rdd_reduce = rdd.reduce(lambda a,b : a+b)
print(rdd_reduce)
# 45
  • fold算子:和reduce一样接收传入逻辑进行聚合,聚合是带有初始值的。这个初始值既要作用在分区内,也要作用在分区间,部分代码如下:
rdd = sc.parallelize(range(1,10),3)
rdd_reduce = rdd.fold(10,lambda a,b : a+b)
print(rdd_reduce)
# 1 分为[1,2,3] [4,5,6] [7,8,9]
# 2 每个分区+10
# 3 最后汇总再+10 得到结果85
  • first算子:取出RDD第一个元素
sc.parallelize([1,2,3,4]).first()
# 1
  • take算子:取出RDD的前N个元素
sc.parallelize([1,2,3,4],3).take(2)
# [1,2]
  • top算子:对RDD元素进行降序排序,取前N个
sc.parallelize([1,2,3,4],3).top(2)
# [4, 3]
  • count算子:计算RDD有多少条数据,返回值为一个数字
sc.parallelize([1,2,3,4],3).count()
# 4
  • takeSample算子:随机抽样RDD的数据,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6,7,6,5,4,3,2,1],1)
rdd_takeSample1 = rdd.takeSample(True, 18)
print(rdd_takeSample1)
rdd_takeSample2 = rdd.takeSample(False, 18)
print(rdd_takeSample2)# [1, 1, 1, 4, 6, 4, 1, 1, 5, 4, 6, 7, 5, 1, 6, 6, 6, 2]
# [2, 4, 2, 5, 5, 6, 3, 7, 4, 1, 6, 3, 1]
# 参数一:bool型,True表示运行取同一个数据,False表示不允许取同一个数据,与数据内容无关,是否重复表示的是同一个位置的数据。
# 参数二:抽样的数目(设置为false则无法超越RDD总数)
# 参数三:随机种子(一般不需要传参)
  • takeOrdered算子:对RDD排序取前N个,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6,7])
#升序
rdd_takeOrdered1 = rdd.takeOrdered(4)
#降序
rdd_takeOrdered2 = rdd.takeOrdered(4,lambda x : -x)print(rdd_takeOrdered1)
print(rdd_takeOrdered2)
# [1, 2, 3, 4]
# [7, 6, 5, 4]
  • foreach算子:对RDD的每个元素,执行逻辑操作与map类似,但是这个方法没有返回值。如果想显示值,只能在里面自行打印(无需经过Driver,直接在Executor打印效率更高)。
rdd = sc.parallelize([1,2,3,4,5,6,7],1)
rdd1 = rdd.foreach(lambda x : 2*x +1)
rdd2 = rdd.foreach(lambda x : print(2*x +1))
print(rdd1)
3
5
7
9
11
13
15
None
  • saveAsTextFile算子:保存文件API,分布式执行,不经过Driver,每个分区所在的Executor直接控制数据写出到目标文件系统中,每个分区产生1个结果文件。
#设置为三个分区
rdd_file = sc.textFile("hdfs://node1:8020/Test/WordCount.txt",3)
rdd_words = rdd_file.flatMap(lambda line: line.split(" "))
rdd_map = rdd_words.map(lambda x:(x, 1))
rdd_total = rdd_map.reduceByKey(lambda a,b: a + b)
rdd_rs = rdd_total.saveAsTextFile("hdfs://node1:8020/Test/word_rs1")

结果如下图所示在HDFS WebUI上查看
在这里插入图片描述

常见分区操作算子

  • mapPartitions算子:与map相似,只是一次被传递的是一整个分区的数据,虽然在执行次数上与map相同,但是可以因为减少了网络io的传输次数,效率会大大的提高。部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
def func(iter):rs = list()for it  in iter:rs.append(2 * it + 1)return rs
rdd_part = rdd.mapPartitions(func)
rdd_rs = rdd_part.collect()
print(rdd_rs)# [3, 5, 7, 9, 11, 13]
  • foreachPartition算子:与普通foreach一样,只是一次被传递的是一整个分区的数据,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
# 因为没有返回值所以不需要return
def func(iter):rs = list()for it  in iter:rs.append(2 * it + 1)print(rs)rdd_part = rdd.foreachPartition(func)# [3, 5]
# [7, 9]
# [11, 13]
  • partitionBy算子:对RDD进行自定义分区操作,部分代码如下
# 参数1 重新分区后有几个分区
# 参数2 自定义分区规则,函数传入(返回编号为int类型,分区编号从0开始,不要超过分区数)
rdd = sc.parallelize([('a',1),('b',2),('c',3),('d',4),('e',5),('f',6)])def func(key):if key == 'a' or key == 'b' : return 0if key == 'c' or key == 'd' : return 1return 2rdd_part = rdd.partitionBy(3,func)
rdd_rs = rdd_part.glom().collect()
print(rdd_rs)# [[('a', 1), ('b', 2)], [('c', 3), ('d', 4)], [('e', 5), ('f', 6)]]
  • repartition算子:对RDD的分区执行重新分区。不建议使用此算子,除非做全局排序的时候,将其设置为1。如果修改尽量减少,不要增加,增加会导致shuffle。不管是增加还是减少都会影响并行计算(内存迭代并行的管道数量),部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
rdd_re1 = rdd.getNumPartitions()
print(rdd_re1)
rdd_re2 = rdd.repartition(1).getNumPartitions()
print(rdd_re2)
rdd_re3 = rdd.repartition(5).getNumPartitions()
print(rdd_re3)
# 3
# 1
# 5
  • coalesce算子:对分区数量进行增减,部分代码如下:
# 参数1:分区数
# 参数2:Bool True表示允许shuffle,False表示不允许(默认)。
rdd_re4 = rdd.coalesce(1).getNumPartitions()
print(rdd_re4)
rdd_re5 = rdd.coalesce(5).getNumPartitions()
print(rdd_re5)
rdd_re6 = rdd.coalesce(5,shuffle=True).getNumPartitions()
print(rdd_re6)
# 1
# 3 没有加shuffle=True这里有个API安全机制,分区不会增加
# 5
  • 在源码中我们可以发现reparation算子底层调用的就是coalesce算子,只不过shuffle定义为true。源码如下:
def repartition(self, numPartitions):return self.coalesce(numPartitions, shuffle=True)

相关内容

热门资讯

证券转银行怎么开通业务权限,中...   证券行业总资产突破10万亿元,财富管理转型迎来光明时刻。      中国证券业协会近日发布的数据...
创业基金申请计划,申请创业基金...   天眼超显示,11月29日,海南三亚田波产业私募基金管理有限公司成立,注册资本1000万元。经营范...
创业怎么贷款比较好,创业怎么去...   一、贷款对象      曲靖户籍范围内,宣威辖区内有创业实体、有营业执照的小微企业和个体工商户。...
开一家自媒体公司需要多少钱,自...   轻资产、小投资、全媒体广告创业项目、全媒体代理商加盟。      互联网广告代理是近两年非常热门...
适合穷人的18个创业项目投资小...   比尔盖茨说过:“巧妙地花一笔钱和赚一笔钱一样困难”。很多人之所以穷,不是因为铺张浪费,相反,他们...
开文具店的禁忌,2020年文具...   学校门口的文具店赚钱吗?我们今天来看看。      #就业介绍      你上学的时候去过学校附...
大学生创业项目有哪些项目,大学...   今天要给大家带来的案例是:广东有一家生鲜店,老板用“没钱买菜”在短短一年内狂赚103万。    ...
创业者必须必备的能力,中国(上...   学风建设作为学生管理的重中之重,一直是安徽万通的一项常规工作。随着新学期课程的逐步开展,为了更好...
身无分文去义乌创业,身无分文如...   马云、刘、任等。以前的摊位视频;刚刷了屏幕。      让我给你看:      1、马云摆摊: ...
南昌人才10条让人恶心,南昌创...   这两年,除了北京,几乎所有城市都加入了抢人大战。各大城市提供的条件非常诱人,包括各种但不限于租房...