Cómo hacer coincidir Dataframe los nombres de columna de la Scala caso de los atributos de la clase?

Los nombres de columna en este ejemplo de chispa-sql provienen de la case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... //An RDD of case class objects, from the previous example.

//The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

Sin embargo, en muchos casos, los nombres de parámetro puede ser cambiado. Esto haría que las columnas de a no ser encontrada si el archivo no se ha actualizado para reflejar el cambio.

¿Cómo puedo especificar una asignación apropiada?

Estoy pensando en algo como:

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)
  ))


  val ps: Seq[Person] = ???

  val personRDD = sc.parallelize(ps)

  //Apply the schema to the RDD.
  val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
Por desgracia, no está claro lo que quiere. 1. Escribir parquet con nombres arbitrarios? 2. Cambiar el parquet de los nombres de columna después? 3. Leer un parquet con diferentes nombres de columna y «match»/mapa de campo de caso de la clase?
¿Cómo es eso? Quiero nombres de columna del conjunto de forma manual y mapa de caso de la clase params a estas columnas.
Pero la intención de los han emparejado automáticamente?
por favor ampliar. Como he dicho quiero partido manualmente.

OriginalEl autor BAR | 2015-09-12

1 Kommentar

  1. 8

    Básicamente, todo lo que la asignación que usted necesita hacer se puede lograr con DataFrame.select(...). (Aquí, supongo, de que ningún tipo de conversiones se deben realizar.)
    Dado el hacia adelante y hacia atrás asignación como los mapas, la parte esencial es

    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
    //personsDF your original dataframe  
    val mappedDF = personsDF.select( mapping: _* )

    donde el mapeo es una matriz de Columns con alias.

    Código de ejemplo

    object Example {   
    
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkContext, SparkConf}
    
      case class Person(name: String, age: Int)
    
      object Mapping {
        val from = Map("name" -> "a", "age" -> "b")
        val to = Map("a" -> "name", "b" -> "age")
      }
    
      def main(args: Array[String]) : Unit = {
        //init
        val conf = new SparkConf()
          .setAppName( "Example." )
          .setMaster( "local[*]")
    
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        //create persons
        val persons = Seq(Person("bob", 35), Person("alice", 27))
        val personsRDD = sc.parallelize(persons, 4)
        val personsDF = personsRDD.toDF
    
        writeParquet( personsDF, "persons.parquet", sc, sqlContext)
    
        val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
      }
    
      def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
        import Mapping.from
    
        val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
    
        val mappedDF = personsDF.select( mapping: _* )
        mappedDF.write.parquet("/output/path.parquet") //parquet with columns "a" and "b"
      }
    
      def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
        import Mapping.to
        val df = sqlContext.read.parquet(path) //this df has columns a and b
    
        val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray
        df.select( mapping: _* )
      }
    }

    Comentario

    Si usted necesita para convertir un dataframe de nuevo a un dispositivo de este tipo[de una Persona], entonces

    val rdd : RDD[Row] = personsDF.rdd
    val personsRDD : Rdd[Person] = rdd.map { r: Row => 
      Person( r.getAs("person"), r.getAs("age") )
    }

    Alternativas

    También un vistazo a Cómo convertir chispa SchemaRDD en RDD de mi caso de la clase?

    Buen enfoque. ¿Crees que esto tendría un impacto en el rendimiento, o no debería ser un factor ya que este es compilado y optimizado una vez en el interior de la tubería?
    Supongo que el último. En primer lugar, como no hay Catalizador de optimización / compilación. Segundo, selecciona (con el alias) no parecen ser costosas operaciones. Aunque, estaría interesado en ver algunas de las medidas de rendimiento ….

    OriginalEl autor Martin Senne

Kommentieren Sie den Artikel

Bitte geben Sie Ihren Kommentar ein!
Bitte geben Sie hier Ihren Namen ein

Pruebas en línea