pyspark

"pyspark"

Posted by zwt on August 3, 2021

Operations

1. pandas to pyspark: spark_df = sqlContext.createDataFrame(df)
2. Split one row into multiple rows: movies.withColumn("genre", explode(split($"genre", "[|]"))).show
3. Collect a set per group: df.groupby("id").agg(F.collect_set("code"))
4. Split one column into multiple columns:
        val Df2 = Df1.withColumn("splitcol", split(col("contents"), ",")).select(
            col("splitcol").getItem(0).as("col1"),
            col("splitcol").getItem(1).as("col2"),
            col("splitcol").getItem(2).as("col3")
        ).drop("splitcol");
5. Rename a column: df.withColumnRenamed("user_id", "user")
6. service_result = df_info.withColumn("service_explode", explode(col("service_prod_code")))
7. Add an id column: df.withColumn("id", monotonically_increasing_id())
8. Intersection: sentenceDataFrame1.select("sentence").intersect(sentenceDataFrame.select("sentence"))
9. union: set union (see the sketch after this list for union/subtract/crossJoin/summary)
10. subtract: set difference
11. Filter: numeric_filtered = df.where(col('LOW') != 'null').show()
12. Add a constant column: df.withColumn('column', lit(10))
13. Write to MySQL: df.write.format('jdbc').options(url='', driver='', dbtable='', user='', password='').mode('append').save()
14. Deduplicate: df = df.dropDuplicates()
15. Change a column type: df.select(df.col("col1").cast(DoubleType)).show()
16. Add an auto-increment column:
        val spec = Window.partitionBy().orderBy($"lon")
        val df1 = dataframe.withColumn("id", row_number().over(spec))
17. Sort: df.orderBy('column_name', ascending=False)
18. Save a dataframe as a table: df.write.mode('overwrite').partitionBy("c").saveAsTable("t")
19. The summary function computes basic dataframe statistics: count, mean, stddev, min/max and quantiles
20. crossJoin: Cartesian product
21. goods.write.csv("goods", sep="\t", encoding="utf-8", mode='overwrite')
22. get_json_object(data, '$[*].id') as data
23. title.filter("""category_id != {0}""".format(45)).select("title").rdd.takeSample(withReplacement=False, num=5)
24. data = (spark.read
        .option("inferSchema", "true")  # infer column types automatically
        .option("header", "true")       # read column names from the header
        .csv("/user/root/image.csv"))   # file path
25. Register a temp table: .registerTempTable("table2")
26. data = data.withColumn("pay_ment", data.pay_ment.astype("float"))
27. spark.yarn.executor.memoryOverhead=6000 (unit: MB)
28. Date difference: F.datediff(F.current_date(), F.col("time")).alias("datediff")
29. F.hour(F.col("time")).alias("hour"),
    F.minute(F.col("time")).alias("minute"),
    F.second(F.col("time")).alias("second")
30. df.sample(fraction=0.5, seed=3)
31. Compute quantiles: df.approxQuantile(col='value', probabilities=[0.1, 0.5, 0.9], relativeError=0.1)
32. spark.read.load('hdfs')
33. Change a column's data type: data.withColumn("recallsize", data.recallsize.astype("int"))
34. Drop multiple columns: dataframe.drop(*("col1", "col2", "colN"))
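
A minimal sketch of items 9, 10, 19 and 20 above; the dataframes df_a and df_b and their values are made up for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# two toy dataframes with identical schemas
df_a = spark.createDataFrame([(1, 10.0), (2, 20.0)], ["id", "value"])
df_b = spark.createDataFrame([(2, 20.0), (3, 30.0)], ["id", "value"])

df_a.union(df_b).show()      # set union; keeps duplicates, append .distinct() to drop them
df_a.subtract(df_b).show()   # set difference: rows of df_a that do not appear in df_b
df_a.crossJoin(df_b).show()  # Cartesian product of the two dataframes
df_a.summary().show()        # count, mean, stddev, min, quantiles and max per column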

udf

1.
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

def udf_test(column1, column2, constant_var):
    if column1 == column2:
        return column1
    else:
        return constant_var
apply_test = udf(udf_test, StringType())
# the constant argument must be passed as a Column, hence lit(); 'NA' is just a placeholder value
df = df.withColumn('new_column', apply_test('column1', 'column2', lit('NA')))
2.
@udf(returnType=StringType())
def test(dd):
    return dd
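
The same UDF can also be registered for use inside spark.sql. A minimal sketch, assuming the udf_test function above; the registered name, the temp view t and the 'NA' literal are only for illustration:

spark.udf.register("udf_test_sql", udf_test, StringType())
df.createOrReplaceTempView("t")
spark.sql("SELECT udf_test_sql(column1, column2, 'NA') AS new_column FROM t").show()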

model

Export to PMML

1. pip install pyspark2pmml
2. Download the matching jar: https://github.com/jpmml/pyspark2pmml
3. Pass the jar on submit: 'spark.jars', 'jpmml-sparkml-executable-1.5.12.jar'
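
With the jar attached, the export itself is only a few lines. A minimal sketch following the pyspark2pmml README, assuming sc is the active SparkContext, df the training dataframe and pipelineModel a fitted pyspark.ml pipeline:

from pyspark2pmml import PMMLBuilder

# build a PMML document from the fitted pipeline and write it to a local file
pmmlBuilder = PMMLBuilder(sc, df, pipelineModel)
pmmlBuilder.buildFile("model.pmml")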

LogisticRegressionWithLBFGS

When you need probability values rather than 0/1 labels, call model.clearThreshold() first.
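
A minimal sketch of this behaviour with pyspark.mllib (the toy data is made up for illustration; sc is the active SparkContext):

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

# tiny toy training set
data = sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]), LabeledPoint(1.0, [1.0, 0.0])])
model = LogisticRegressionWithLBFGS.train(data)

model.predict([1.0, 0.0])  # returns a 0/1 label while a threshold is set
model.clearThreshold()
model.predict([1.0, 0.0])  # now returns the raw probability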

Export to MySQL

df.write.format('jdbc').options(
      url='',
      driver='com.mysql.jdbc.Driver',
      dbtable='',
      user='',
      password='').mode('append').save()
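
The blank options are left for your own connection details; the url follows the usual JDBC form (for MySQL, something like jdbc:mysql://host:3306/dbname), and the MySQL driver jar must be available to Spark, e.g. via --jars or the spark.jars setting.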

submit

org.apache.spark.shuffle.FetchFailedException: Failed to connect. Cause:

The shuffle takes longer and heavy memory use leaves the executor unable to answer heartbeats; once the default spark.network.timeout=120s is exceeded, the executor is removed and its tasks are lost.

Solutions (example settings shown after the list):

Reduce the use of shuffle-triggering operations such as reduceByKey, so less memory is used
Increase spark.network.timeout, allowing more time to wait for heartbeat responses
Increase spark.executor.cores, so fewer executors are created and total memory use drops
At the same time, increase spark.executor.memory so that each executor has enough memory available
Increase spark.shuffle.memoryFraction, default 0.2 (requires spark.memory.useLegacyMode=true, only applies to Spark 1.5 and older, and is deprecated)
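
A hedged example of how these settings might be applied when building the session (the values are illustrative, not recommendations):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("shuffle-tuning")
    .config("spark.network.timeout", "300s")  # more time for heartbeats before an executor is dropped
    .config("spark.executor.cores", "4")      # fewer, larger executors
    .config("spark.executor.memory", "8g")    # enough memory per executor
    .getOrCreate())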

Bucketing

from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(
    inputCol='values1',
    outputCol='buckets',
    splits=[-float("inf"), 0.5, 1.4, float("inf")],
    handleInvalid='keep',)
bucketed = bucketizer.transform(df)
bucketed.show()
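
With handleInvalid='keep', rows whose value is NaN are not dropped; they are placed in an extra bucket after the three regular buckets defined by the splits (bucket index 3 here). Use 'skip' to drop such rows or 'error' to fail instead.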
