【丢弃】【spark】rdd.

2018年03月29日    Author:Guofei

文章归类:    文章编号: 151


版权声明:本文作者是郭飞。转载随意,但需要标明原文链接,并通知本人
原文链接:https://www.guofei.site/2018/03/29/spark1.html

Edit

回收原因:RDD过时了

3. RDD

3.1. 初始化SparkContext

from pyspark import SparkContext, SparkConf
conf=SparkConf().setMaster('local').setAppName('My App')#local可以改为local[*],以增加核心数
sc=SparkContext(conf=conf)

3.2. 创建RDD

lines=sc.parallelize(['pandas','i like pandas','hello','hello world!','error line','warning line'])
lines=sc.textFile('test.txt')

RDD有两种操作:transformationactions

3.3 transformation

RDD1.map(func)#func接受每一个元素,返回每一个元素,作为新RDD的元素
RDD1.filter(func)#筛选满足条件的RDD值,func接受每一个元素,返回一个bool类型
RDD1.flatMap(func) #如果func返回的是一个迭代器,那么把迭代器里所有对象放入新RDD中并摊平

#集合操作
RDD1.distinct()
RDD1.union(RDD2) #并集
RDD1.intersection(RDD2) #交集
RDD1.subtract(RDD2) #差集

RDD1.cartesian(RDD2) #笛卡尔积

RDD1.sample(withReplacement=False,fraction=0.2) #采样

例子:

errorsRDD=lines.filter(lambda line:'error' in line)
warningRDD=lines.filter(lambda line:'warning' in line)

badRDD=errorsRDD.union(warningRDD)# 合并

3.4. actions

RDD1.reduce(lambda x,y:x+y)
RDD1.fold(0,lambda x,y:x+y) #相当于一个带初始值的reduce

RDD1.foreach(func1) #对每个元素使用func1???
RDD1.aggregate(zeroValue,seqOp,combOp)# 没搞太清楚???

RDD1.collect()
RDD1.take(10)
RDD1.top(10)
RDD1.first()

RDD1.count()
RDD1.countByValue() #返回dict,内容是每个元素的个数

3.5. pair RDD

pair RDD是一种特殊的RDD
转化:

pair_RDD1=sc.parallelize([(1,2),(5,2),(3,4),(3,3)])
pair_RDD2=sc.parallelize([(1,5),(6,2)])

pair_RDD1.reduceByKey(func) #分组应用func,例如func=lambda x,y:x+y。是transformation(对比reduce是actions)
pari_RDD1.foldByKey(0func) #类似fold,带初始值的reduce。是transformation,fold是action


pair_RDD1.groupByKey() # 同一个key下的value变成[value1,value2,...]???没明白
combineByKey#???


pair_RDD1.flatMapValues(func) #func 每次接受1个value,返回iterable。返回的RDD保留key,摊平value
pair_RDD1.mapValues(func) #对每个value应用func,key保持不变


pair_RDD1.keys()#返回key组成的RDD
pair_RDD1.values()#返回value组成的RDD
pair_RDD1.sortByKey()#按照key排序
# 按key排序的高级玩法
pair_RDD1.sortByKey(ascending=True,keyfunc=lambda x:str(x)) #ascending默认为True,keyfunc可以把原本元素转化成新对象,然后用新对象排序

双pair RDD操作:


pair_RDD1.subtractByKey(pair_RDD2) #删除key相同的元素
pair_RDD1.join(pair_RDD2)  #内连接,返回类似[(1, (2, 5))]的RDD
pair_RDD1.leftOuterJoin(pair_RDD2) #左边的保留
pair_RDD1.rightOuterJoin(pair_RDD2) #右边的保留
pair_RDD1.cogroup(pair_RDD2) #key-value的RDD,其中value是一个iterable

pari RDD的行动:

countByKey() #返回key的计数
lookup(key) #返回key所对应的所有value

row

from pyspark.sql import Row
row=Row(k1=2,k2='abc',k3=99)

# 取数
row.k1
row['k1']

参考文献

https://blog.csdn.net/wy250229163/article/details/52354278


您的支持将鼓励我继续创作!
WeChatQR AliPayQR qr_wechat