Tengo un dataframe con un esquema como tal:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]

Buscando una manera de grupo (o tal vez acumulativo?) este dataframe por visitorid donde el trackingIds y emailIds columnas gustaría añadir juntos. Así, por ejemplo, si mi inicial df parece:

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b]      |    [12]
|7g21|      [c0b5]      |    [45]
|7g21|      [c0b4]      |    [87]
|a158|      [666b, 777c]|    []

Me gustaría que mi salida de df a este aspecto

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b,666b,777c]|      [12,'']
|7g21|      [c0b5,c0b4]     |      [45, 87]

Intentar utilizar groupBy y agg los operadores, pero no tienen mucha suerte.

3 Comentarios

  1. 38

    Chispa >= 2.4

    Puede reemplazar flatten udf con construido-en acoplar función

    import org.apache.spark.sql.functions.flatten

    dejando el resto como está.

    Chispa >= 2.0, < 2.4

    Es posible, pero bastante caro. Utilizando los datos que usted ha proporcionado:

    case class Record(
        visitorId: String, trackingIds: Array[String], emailIds: Array[String])
    
    val df = Seq(
      Record("a158", Array("666b"), Array("12")),
      Record("7g21", Array("c0b5"), Array("45")),
      Record("7g21", Array("c0b4"), Array("87")),
      Record("a158", Array("666b",  "777c"), Array.empty[String])).toDF

    y una función auxiliar:

    import org.apache.spark.sql.functions.udf
    
    val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

    podemos llenar los espacios en blanco con marcadores de posición:

    import org.apache.spark.sql.functions.{array, lit, when}
    
    val dfWithPlaceholders = df.withColumn(
      "emailIds", 
      when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))

    collect_lists y flatten:

    import org.apache.spark.sql.functions.{array, collect_list}
    
    val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
    val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")
    
    df
      .groupBy($"visitorId")
      .agg(trackingIds, emailIds)
    
    //+---------+------------------+--------+
    //|visitorId|       trackingIds|emailIds|
    //+---------+------------------+--------+
    //|     a158|[666b, 666b, 777c]|  [12, ]|
    //|     7g21|      [c0b5, c0b4]|[45, 87]|
    //+---------+------------------+--------+

    Con estáticamente con Dataset:

    df.as[Record]
      .groupByKey(_.visitorId)
      .mapGroups { case (key, vs) => 
        vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
          case (trackingIds, emailIds) => 
            Record(key, trackingIds.flatten, emailIds.flatten)
      }}
    
    //+---------+------------------+--------+
    //|visitorId|       trackingIds|emailIds|
    //+---------+------------------+--------+
    //|     a158|[666b, 666b, 777c]|  [12, ]|
    //|     7g21|      [c0b5, c0b4]|[45, 87]|
    //+---------+------------------+--------+

    Chispa 1.x

    Usted puede convertir el RDD y grupo

    import org.apache.spark.sql.Row
    
    dfWithPlaceholders.rdd
      .map {
         case Row(id: String, 
           trcks: Seq[String @ unchecked],
           emails: Seq[String @ unchecked]) => (id, (trcks, emails))
      }
      .groupByKey
      .map {case (key, vs) => vs.toArray.unzip match {
        case (trackingIds, emailIds) => 
          Record(key, trackingIds.flatten, emailIds.flatten)
      }}
      .toDF
    
    //+---------+------------------+--------+
    //|visitorId|       trackingIds|emailIds|
    //+---------+------------------+--------+
    //|     7g21|      [c0b5, c0b4]|[45, 87]|
    //|     a158|[666b, 666b, 777c]|  [12, ]|
    //+---------+------------------+--------+
    • Lo que si tenemos que eliminar los duplicados en trackingIds?
    • A menos que me estoy perdiendo algo, el tipo estático ejemplo también se puede hacer con reduceGroups en lugar de mapGroups (y requerirá un mapa subsecuente además para obtener el resultado en el formato deseado). La documentación para el mapGroups parecen implicar que reducen puede ser más eficiente («Si una aplicación tiene la intención de realizar una agregación encima de cada tecla, es mejor utilizar el de reducir función o un Agregador»).
    • Su observación parece ser válida, pero no sería de aplicación al caso, donde la función de agregado opera en la memoria constante. Aquí, donde size(f(x, y)) = size(x) + size(y), el costo de agregación (y repite la copia de datos) superan con creces los beneficios de mapa del lado de reducción, en casi todos los casos.
    • href=»https://stackoverflow.com/q/54616498/6910411″>Chispa SQL: el uso de collect_set sobre la matriz de valores?
    • la implementación de un aplanar udf para Spark >= 2.0, < 2.4 .. ¿por qué dicen que va a ser muy caro?
  2. 20

    @zero323 la respuesta es bastante muy completa, pero la Chispa que nos da más flexibilidad. Cómo sobre la siguiente solución?

    import org.apache.spark.sql.functions._
    inventory
      .select($"*", explode($"trackingIds") as "tracking_id")
      .select($"*", explode($"emailIds") as "email_id")
      .groupBy("visitorId")
      .agg(
        collect_list("tracking_id") as "trackingIds",
        collect_list("email_id") as "emailIds")

    Que sin embargo deja fuera todos los vacíos de las colecciones (así que hay margen de mejora :))

    • En esta solución, es posible aplicar un orderBy() después de la groupBy y antes de la agg()? O en esta situación orderBy va a ser no determinista?
    • En mi opinión, la respuesta no es la novela para el seguimiento razones a)explotar está en desuso en la chispa.2.2 . b) collect_list en un conjunto de datos muy grande puede bloquear el proceso del controlador con OutOfMemoryError
    • Por favor, no se confunda con explode operador y explode función. B) posiblemente.
    • Laskowski Hace java tienen equivalente para explotar la función?
    • ¿por qué controlador de ir fuera de la memoria? es que no collect_list ejecutor lado? no recoger.
  3. 2

    Puede utilizar definido por el Usuario agregada funciones.

    1) crear una personalizada UDAF el uso de la scala clase llamada customAggregation.

    package com.package.name
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    import scala.collection.JavaConverters._
    class CustomAggregation() extends UserDefinedAggregateFunction {
    //Input Data Type Schema
    def inputSchema: StructType = StructType(Array(StructField("col5", ArrayType(StringType))))
    //Intermediate Schema
    def bufferSchema = StructType(Array(
    StructField("col5_collapsed",  ArrayType(StringType))))
    //Returned Data Type .
    def dataType: DataType = ArrayType(StringType)
    //Self-explaining
    def deterministic = true
    //This function is called whenever key changes
    def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = Array.empty[String] //initialize array
    }
    //Iterate over each entry of a group
    def update(buffer: MutableAggregationBuffer, input: Row) = {
    buffer(0) =
    if(!input.isNullAt(0))
    buffer.getList[String](0).toArray ++ input.getList[String](0).toArray
    else
    buffer.getList[String](0).toArray
    }
    //Merge two partial aggregates
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(0) = buffer1.getList[String](0).toArray ++ buffer2.getList[String](0).toArray
    }
    //Called after all the entries are exhausted.
    def evaluate(buffer: Row) = {
    buffer.getList[String](0).asScala.toList.distinct
    }
    }

    2), a Continuación, utilizar la UDAF en el código como

    //define UDAF
    val CustomAggregation = new CustomAggregation()
    DataFrame
    .groupBy(col1,col2,col3)
    .agg(CustomAggregation(DataFrame(col5))).show()
    • Usted debe realmente evitar UserDefinedAggregateFunctions con complejo de búferes.
    • por qué así?
    • He construido una UDAF con un Scala caso de la clase en el búfer. No quiero decir que es muy eficiente, pero desde que he usado esto en una canalización de la PNL, la agregación de los costes de procesamiento ~2 órdenes de magnitud menor que los costos para el documento de análisis. Por lo que dependiendo de su aplicación, un UDAF, mientras que el lento, podría estar bien.

Dejar respuesta

Please enter your comment!
Please enter your name here