1. pandas to PySpark: spark_df = sqlContext.createDataFrame(df)
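A minimal sketch of the conversion, assuming a local SparkSession (the names here are made up):

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("pandas-to-spark").getOrCreate()
    pdf = pd.DataFrame({"id": [1, 2], "code": ["a", "b"]})  # hypothetical pandas frame
    spark_df = spark.createDataFrame(pdf)                   # schema inferred from pandas dtypes
    spark_df.show()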
2. Split one row into multiple rows: movies.withColumn("genre", explode(split($"genre", "[|]"))).show
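The snippet above is Scala; a PySpark equivalent, assuming a movies DataFrame with a pipe-separated genre column:

    from pyspark.sql import functions as F

    # "[|]" escapes the pipe so split treats it literally; explode emits one row per genre
    movies = movies.withColumn("genre", F.explode(F.split(F.col("genre"), "[|]")))
    movies.show()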
3. Collect distinct values per group: df.groupby("id").agg(F.collect_set("code"))
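A sketch with made-up data, assuming a SparkSession spark; collect_set keeps the distinct codes per id:

    from pyspark.sql import functions as F

    df = spark.createDataFrame([(1, "a"), (1, "a"), (1, "b")], ["id", "code"])
    df.groupBy("id").agg(F.collect_set("code").alias("codes")).show()
    # codes for id 1 is ["a", "b"] (set order is not guaranteed)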
4. Split one column into multiple columns: val Df2 = Df1.withColumn("splitcol", split(col("contents"), ",")).select(
col("splitcol").getItem(0).as("col1"),
col("splitcol").getItem(1).as("col2"),
col("splitcol").getItem(2).as("col3")
).drop("splitcol");
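A PySpark version of the same idea, assuming Df1 has a comma-separated contents column:

    from pyspark.sql import functions as F

    df2 = (Df1
        .withColumn("splitcol", F.split(F.col("contents"), ","))
        .select(
            F.col("splitcol").getItem(0).alias("col1"),
            F.col("splitcol").getItem(1).alias("col2"),
            F.col("splitcol").getItem(2).alias("col3"),
        ))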
5. Rename a column: df.withColumnRenamed("user_id", "user")
6. Explode an array column: service_result = df_info.withColumn("service_explode", explode(col("service_prod_code")))
7. Add an id column: df.withColumn("id", monotonically_increasing_id())
8. Intersection: sentenceDataFrame1.select("sentence").intersect(sentenceDataFrame.select("sentence"))
9. union: set union (union() keeps duplicate rows, so chain distinct() for true set semantics)
10. subtract: set difference
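A sketch of all three set operations on single-column DataFrames (the data is made up, spark is an existing SparkSession):

    a = spark.createDataFrame([("x",), ("y",)], ["sentence"])
    b = spark.createDataFrame([("y",), ("z",)], ["sentence"])

    a.intersect(b).show()          # rows in both: y
    a.union(b).distinct().show()   # union() keeps duplicates, hence the distinct()
    a.subtract(b).show()           # rows in a but not in b: x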
11. Filter: numeric_filtered = df.where(col('LOW') != 'null'); numeric_filtered.show() (this matches the literal string 'null'; use col('LOW').isNotNull() for real NULLs)
12. Add a constant column: df.withColumn('column', lit(10))
13. Write to MySQL over JDBC: df.write.format('jdbc').options(url='', driver='', dbtable='', user='', password='').mode('append').save()
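A fuller sketch of the JDBC write; the URL, driver, table, and credentials below are placeholders:

    (df.write
        .format("jdbc")
        .option("url", "jdbc:mysql://host:3306/db")    # placeholder connection URL
        .option("driver", "com.mysql.cj.jdbc.Driver")  # MySQL Connector/J driver class
        .option("dbtable", "my_table")                 # placeholder target table
        .option("user", "user")
        .option("password", "password")
        .mode("append")                                # mode(), not model()
        .save())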
14. Deduplicate: df = df.dropDuplicates()
15. Change a column's type: df.select(df.col("col1").cast(DoubleType)).show()
16. Add an auto-increment id column: val spec = Window.partitionBy().orderBy($"lon")
val df1 = dataframe.withColumn("id", row_number().over(spec))
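A PySpark equivalent of this window-based id; note that an empty partitionBy pulls all rows into a single partition, which can be slow on large data:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    spec = Window.partitionBy().orderBy("lon")
    df1 = dataframe.withColumn("id", F.row_number().over(spec))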
17. Sort: df.orderBy('column_name', ascending=False)
18. Save a DataFrame as a table: df.write.mode('overwrite').partitionBy("c").saveAsTable("t")
19. The summary function computes statistics over a DataFrame: count, mean, stddev, min/max, quartiles, and so on
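For example (df is any DataFrame):

    df.summary().show()                                # count, mean, stddev, min, quartiles, max
    df.summary("mean", "stddev", "min", "max").show()  # or only selected statistics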
20. crossJoin: Cartesian product
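A one-line sketch; the result has |df1| x |df2| rows, so use with care:

    pairs = df1.crossJoin(df2)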
21. Write CSV: goods.write.csv("goods", sep="\t", encoding="utf-8", mode='overwrite')
22. Extract fields from JSON: get_json_object(data,'$[*].id') as data
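A sketch in PySpark, assuming a string column data holding a JSON array of objects:

    df = spark.createDataFrame([('[{"id": 1}, {"id": 2}]',)], ["data"])
    df.selectExpr("get_json_object(data, '$[*].id') as data").show()
    # the ids come back as a single string, "[1,2]"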
23. Filter, then take a random sample: title.filter("""category_id != {0}""".format(45)).select("title").rdd.takeSample(withReplacement=False, num=5)
24. data = (spark.read
        .option("inferSchema", "true")   # infer column types automatically
        .option("header", "true")        # treat the first row as column names
        .csv("/user/root/image.csv"))    # file path
25. df.registerTempTable("table2")  # deprecated since Spark 2.0; prefer df.createOrReplaceTempView("table2")
26. Cast with astype: data = data.withColumn("pay_ment", data.pay_ment.astype("float"))
27. spark.yarn.executor.memoryOverhead=6000 (unit: MB)
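A sketch of setting it at session creation (the value is illustrative; since Spark 2.3 the preferred key is spark.executor.memoryOverhead):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .config("spark.yarn.executor.memoryOverhead", "6000")  # MB of off-heap headroom per executor
        .getOrCreate())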
28. Days between dates: F.datediff(F.current_date(), F.col("time")).alias("datediff")
29. Extract time components: F.hour(F.col("time")).alias("hour"),
F.minute(F.col("time")).alias("minute"),
F.second(F.col("time")).alias("second")
30. Random sample: df.sample(fraction=0.5, seed=3)
31. Compute approximate quantiles: df.approxQuantile(col='value', probabilities=[0.1, 0.5, 0.9], relativeError=0.1)
32. Load from HDFS (default format is parquet): spark.read.load('hdfs')
33. Change a column's data type: data.withColumn("recallsize", data.recallsize.astype("int"))
34. Drop multiple columns: dataframe.drop(*("col1", "col2", "colN"))