教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

如何基于RDD方式完成DataFrame的代码构建?

更新时间:2023年07月28日16时16分 来源:传智教育 浏览次数:

DataFrame对象可以从RDD转换而来,都是分布式数据集 其实就是转换一下内部存储的结构,转换为二维表结构。

将RDD转换为DataFrame方式1:

调用spark

# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
  map(lambda x: x.split(',')).\
  map(lambda x: [x[0], int(x[1])])               # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

# coding:utf8
# 演示DataFrame创建的三种方式
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession.builder.\
       appName("create df").\
master("local[*]").\
getOrCreate()

sc = spark.sparkContext
# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
# 打印表结构
df.printSchema()
# 打印20行数据
df.show()
df.createTempView("ttt")
spark.sql("select * from ttt where age< 30").show()
将RDD转换为DataFrame方式2:

通过StructType对象来定义DataFrame的“表结构”转换RDD

# 创建DF , 首先创建RDD 将RDD转DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
  map(lambda x:x.split(',')).\
  map(lambda x:(int(x[0]), x[1], int(x[2])))

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
df = spark.createDataFrame(rdd, schema)
# coding:utf8
# 需求: 基于StructType的方式构建DataFrame 同样是RDD转DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
  spark = SparkSession.builder.\
    appName("create_df"). \
    config("spark.sql.shuffle.partitions", "4"). \
    getOrCreate()
  # SparkSession对象也可以获取 SparkContext
  sc = spark.sparkContext
  # 创建DF , 首先创建RDD 将RDD转DF
  rdd = sc.textFile("../data/sql/stu_score.txt").\
    map(lambda x:x.split(',')).\
    map(lambda x:(int(x[0]), x[1], int(x[2])))
  # StructType 类
  # 这个类 可以定义整个DataFrame中的Schema
  schema = StructType().\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=True).\
    add("score", IntegerType(), nullable=False)
  # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
  # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
  df = spark.createDataFrame(rdd, schema)
  df.printSchema()
  df.show()

将RDD转换为DataFrame方式3:

使用RDD的toDF方法转换RDD

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空

# 方式1: 只传列名, 类型靠推断, 是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()

# 方式2: 传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()
# coding:utf8
# 需求: 使用toDF方法将RDD转换为DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
    spark = SparkSession.builder.\
      appName("create_df"). \
      config("spark.sql.shuffle.partitions", "4"). \
      getOrCreate()
    # SparkSession对象也可以获取 SparkContext
    sc = spark.sparkContext
    # 创建DF , 首先创建RDD 将RDD转DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
      map(lambda x:x.split(',')).\
      map(lambda x:(int(x[0]), x[1], int(x[2])))
    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
       add("id", IntegerType(), nullable=False).\
       add("name", StringType(), nullable=True).\
       add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    # 方式2: 传入完整的Schema描述对象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()

0 分享到:
和我们在线交谈!