Los nombres de columna en este ejemplo de chispa-sql provienen de la case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... //An RDD of case class objects, from the previous example.

//The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

Sin embargo, en muchos casos, los nombres de parámetro puede ser cambiado. Esto haría que las columnas de a no ser encontrada si el archivo no se ha actualizado para reflejar el cambio.

¿Cómo puedo especificar una asignación apropiada?

Estoy pensando en algo como:

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)
  ))


  val ps: Seq[Person] = ???

  val personRDD = sc.parallelize(ps)

  //Apply the schema to the RDD.
  val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
Por desgracia, no está claro lo que quiere. 1. Escribir parquet con nombres arbitrarios? 2. Cambiar el parquet de los nombres de columna después? 3. Leer un parquet con diferentes nombres de columna y «match»/mapa de campo de caso de la clase?
¿Cómo es eso? Quiero nombres de columna del conjunto de forma manual y mapa de caso de la clase params a estas columnas.
Pero la intención de los han emparejado automáticamente?
por favor ampliar. Como he dicho quiero partido manualmente.

OriginalEl autor BAR | 2015-09-12

1 Comentario

  1. 8

    Básicamente, todo lo que la asignación que usted necesita hacer se puede lograr con DataFrame.select(...). (Aquí, supongo, de que ningún tipo de conversiones se deben realizar.)
    Dado el hacia adelante y hacia atrás asignación como los mapas, la parte esencial es

    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
    //personsDF your original dataframe  
    val mappedDF = personsDF.select( mapping: _* )

    donde el mapeo es una matriz de Columns con alias.

    Código de ejemplo

    object Example {   
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}
    case class Person(name: String, age: Int)
    object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
    }
    def main(args: Array[String]) : Unit = {
    //init
    val conf = new SparkConf()
    .setAppName( "Example." )
    .setMaster( "local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    //create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF
    writeParquet( personsDF, "persons.parquet", sc, sqlContext)
    val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
    }
    def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.from
    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
    val mappedDF = personsDF.select( mapping: _* )
    mappedDF.write.parquet("/output/path.parquet") //parquet with columns "a" and "b"
    }
    def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) //this df has columns a and b
    val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray
    df.select( mapping: _* )
    }
    }

    Comentario

    Si usted necesita para convertir un dataframe de nuevo a un dispositivo de este tipo[de una Persona], entonces

    val rdd : RDD[Row] = personsDF.rdd
    val personsRDD : Rdd[Person] = rdd.map { r: Row => 
    Person( r.getAs("person"), r.getAs("age") )
    }

    Alternativas

    También un vistazo a Cómo convertir chispa SchemaRDD en RDD de mi caso de la clase?

    Buen enfoque. ¿Crees que esto tendría un impacto en el rendimiento, o no debería ser un factor ya que este es compilado y optimizado una vez en el interior de la tubería?
    Supongo que el último. En primer lugar, como no hay Catalizador de optimización / compilación. Segundo, selecciona (con el alias) no parecen ser costosas operaciones. Aunque, estaría interesado en ver algunas de las medidas de rendimiento ….

    OriginalEl autor Martin Senne

Dejar respuesta

Please enter your comment!
Please enter your name here