Tengo un parquet de tabla con una de las columnas que se

array<struct<col1,col2,..colN>>

Se pueden ejecutar consultas contra esta tabla en la Colmena mediante VISTA LATERAL de la sintaxis.

Cómo leer esta tabla en un dispositivo de este tipo, y lo que es más importante cómo filtrar, mapa etc esta anidada colección en la Chispa?

No podía encontrar ninguna referencia a esto en la Chispa de la documentación. Gracias de antemano por cualquier información!

ps. Sentía que podría ser útil dar algunas estadísticas sobre la mesa.
Número de columnas en la tabla principal de ~600. Número de filas ~200m.
Número de «columnas» en anidados colección de ~10. Avg número de registros en anidados colección de ~35.

InformationsquelleAutor Tagar | 2015-05-02

4 Comentarios

  1. 20

    No hay magia en el caso de las de la colección. La chispa se encargará de la misma manera que un RDD[(String, String)] y un RDD[(String, Seq[String])].

    La lectura de tales anidada de la colección de Parquet archivos puede ser difícil, sin embargo.

    Vamos a tomar un ejemplo de la spark-shell (1.3.1):

    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    
    scala> case class Inner(a: String, b: String)
    defined class Inner
    
    scala> case class Outer(key: String, inners: Seq[Inner])
    defined class Outer
    

    Escribir el parqué de archivo:

    scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
    outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
    
    scala> outers.toDF.saveAsParquetFile("outers.parquet")
    

    Leer el parqué de archivo:

    scala> import org.apache.spark.sql.catalyst.expressions.Row
    import org.apache.spark.sql.catalyst.expressions.Row
    
    scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
    dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   
    
    scala> val outers = dataFrame.map { row =>
         |   val key = row.getString(0)
         |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
         |   Outer(key, inners)
         | }
    outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
    

    La parte importante es row.getAs[Seq[Row]](1). La representación interna de una secuencia anidada de struct es ArrayBuffer[Row], usted podría utilizar cualquier super-tipo de lugar de Seq[Row]. El 1 es el índice de columna en la fila exterior. He utilizado el método de getAs aquí, pero hay alternativas en las últimas versiones de Chispa. Ver el código fuente de la Fila rasgo.

    Ahora que usted tiene un RDD[Outer], se puede aplicar cualquier quería transformación o de la acción.

    //Filter the outers
    outers.filter(_.inners.nonEmpty)
    
    //Filter the inners
    outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
    

    Nota que hemos utilizado la chispa-SQL sólo la biblioteca para leer el parqué de archivo. Usted podría, por ejemplo, seleccionar sólo la quería columnas directamente en el DataFrame, antes de asignar a un dispositivo de este tipo.

    dataFrame.select('col1, 'col2).map { row => ... }
    
    • Gracias Lomig por la respuesta detallada. He marcado como respuesta correcta. Aunque todavía no estamos en Spark 1.3, la planificación de la actualización de este mes. Es posible prescindir de la trama de datos de la API en la Chispa de la 1.2? Podría usted por favor hágamelo saber cómo getAs[Seq[Fila]](1) obras? Índice [1] es la posición de la columna que contiene las matrices anidadas, esto es correcto?
    • Véase mi edición. Para Spark 1.2, se puede usar el mismo código para la transformación de Row a su caso de la clase. Por favor refiérase a la documentación oficial para la sintaxis para leer un parquet archivo en versiones anteriores, está muy cerca.
    • Lo consiguió. Muchas gracias. github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/… GetSeq[Fila](1) haría así?
    • Eres bienvenido. Sí, getSeq[Row] será una alternativa. No estoy seguro de que este método estaba disponible en la Chispa de la 1.2, aunque. Yo deje de comprobar.
    • Vi un post de hoy en [email protected] lista esa Chispa de SQL soporta VISTA LATERAL de la sintaxis directamente. Le intente de las dos maneras una vez que estamos en la Chispa 1.3; (a la espera de la CDH 5.4.1 para ser puesto en libertad antes de que podamos actualizar)
  2. 8

    Voy a dar un Python basado en la respuesta, ya que es lo que estoy usando. Creo Scala tiene algo similar.

    La explode función fue añadida en la Chispa de la 1.4.0 para manejar matrices anidadas en DataFrames, de acuerdo a la Python API docs.

    Crear una prueba dataframe:

    from pyspark.sql import Row
    
    df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
    df.show()
    
    ## +-+--------------------+
    ## |a|             intlist|
    ## +-+--------------------+
    ## |1|ArrayBuffer(1, 2, 3)|
    ## |2|ArrayBuffer(4, 5, 6)|
    ## +-+--------------------+
    

    Uso explode para aplanar la columna de la lista:

    from pyspark.sql.functions import explode
    
    df.select(df.a, explode(df.intlist)).show()
    
    ## +-+---+
    ## |a|_c0|
    ## +-+---+
    ## |1|  1|
    ## |1|  2|
    ## |1|  3|
    ## |2|  4|
    ## |2|  5|
    ## |2|  6|
    ## +-+---+
    
    • Gracias dnlbrky. Parece más sencillo de leer que la Scala. Definitivamente voy a probar tu python ejemplo.., probablemente no habría Chispa 1.4 pesar de que hasta en algún momento a finales de este año, una vez Cloudera libera de la CDH 5.5 🙂 Esperamos tener la Chispa de 1,5 por ese tiempo.
  3. 3

    Otro enfoque podría ser el uso de la coincidencia de patrones como este:

    val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
      case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
        case List(a:String, b: String) => (a, b)
      }).toList
    })
    

    Puede patrón coincide directamente en Fila, pero probablemente no para un par de razones.

  4. 1

    Respuestas de arriba son todos grandes respuestas y abordar esta cuestión desde diferentes lados; la Chispa de SQL también es muy útil para el acceso de datos anidadas.

    Aquí el ejemplo de cómo utilizar la función explode() en SQL directamente a la consulta anidada colección.

    SELECT hholdid, tsp.person_seq_no 
    FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
            FROM disc_mrt.unified_fact uf
         )
    

    tsp_ids es un entramado de estructuras, que tiene muchos atributos, incluyendo person_seq_no que estoy seleccionando en la consulta externa de arriba.

    Anterior fue probado en la Chispa de la 2.0. Hice una pequeña prueba y no funciona en la Chispa de la 1.6. Esta pregunta se la hicieron cuando la Chispa 2 no estaba alrededor, por lo que esta respuesta agrega muy bien a la lista de opciones disponibles para tratar con estructuras anidadas.

    Notable no se ha resuelto JIRAs en explode() para el acceso SQL:

Dejar respuesta

Please enter your comment!
Please enter your name here