回收原因:RDD过时了
3. RDD
3.1. 初始化SparkContext
from pyspark import SparkContext, SparkConf
conf=SparkConf().setMaster('local').setAppName('My App')#local可以改为local[*],以增加核心数
sc=SparkContext(conf=conf)
3.2. 创建RDD
lines=sc.parallelize(['pandas','i like pandas','hello','hello world!','error line','warning line'])
lines=sc.textFile('test.txt')
RDD有两种操作:transformation, actions
3.3 transformation
RDD1.map(func)#func接受每一个元素,返回每一个元素,作为新RDD的元素
RDD1.filter(func)#筛选满足条件的RDD值,func接受每一个元素,返回一个bool类型
RDD1.flatMap(func) #如果func返回的是一个迭代器,那么把迭代器里所有对象放入新RDD中并摊平
#集合操作
RDD1.distinct()
RDD1.union(RDD2) #并集
RDD1.intersection(RDD2) #交集
RDD1.subtract(RDD2) #差集
RDD1.cartesian(RDD2) #笛卡尔积
RDD1.sample(withReplacement=False,fraction=0.2) #采样
例子:
errorsRDD=lines.filter(lambda line:'error' in line)
warningRDD=lines.filter(lambda line:'warning' in line)
badRDD=errorsRDD.union(warningRDD)# 合并
3.4. actions
RDD1.reduce(lambda x,y:x+y)
RDD1.fold(0,lambda x,y:x+y) #相当于一个带初始值的reduce
RDD1.foreach(func1) #对每个元素使用func1???
RDD1.aggregate(zeroValue,seqOp,combOp)# 没搞太清楚???
RDD1.collect()
RDD1.take(10)
RDD1.top(10)
RDD1.first()
RDD1.count()
RDD1.countByValue() #返回dict,内容是每个元素的个数
3.5. pair RDD
pair RDD是一种特殊的RDD
转化:
pair_RDD1=sc.parallelize([(1,2),(5,2),(3,4),(3,3)])
pair_RDD2=sc.parallelize([(1,5),(6,2)])
pair_RDD1.reduceByKey(func) #分组应用func,例如func=lambda x,y:x+y。是transformation(对比reduce是actions)
pari_RDD1.foldByKey(0,func) #类似fold,带初始值的reduce。是transformation,fold是action
pair_RDD1.groupByKey() # 同一个key下的value变成[value1,value2,...]???没明白
combineByKey#???
pair_RDD1.flatMapValues(func) #func 每次接受1个value,返回iterable。返回的RDD保留key,摊平value
pair_RDD1.mapValues(func) #对每个value应用func,key保持不变
pair_RDD1.keys()#返回key组成的RDD
pair_RDD1.values()#返回value组成的RDD
pair_RDD1.sortByKey()#按照key排序
# 按key排序的高级玩法
pair_RDD1.sortByKey(ascending=True,keyfunc=lambda x:str(x)) #ascending默认为True,keyfunc可以把原本元素转化成新对象,然后用新对象排序
双pair RDD操作:
pair_RDD1.subtractByKey(pair_RDD2) #删除key相同的元素
pair_RDD1.join(pair_RDD2) #内连接,返回类似[(1, (2, 5))]的RDD
pair_RDD1.leftOuterJoin(pair_RDD2) #左边的保留
pair_RDD1.rightOuterJoin(pair_RDD2) #右边的保留
pair_RDD1.cogroup(pair_RDD2) #key-value的RDD,其中value是一个iterable
pari RDD的行动:
countByKey() #返回key的计数
lookup(key) #返回key所对应的所有value
row
from pyspark.sql import Row
row=Row(k1=2,k2='abc',k3=99)
# 取数
row.k1
row['k1']
参考文献
https://blog.csdn.net/wy250229163/article/details/52354278