Esta pregunta no es nueva, sin embargo, me estoy encontrando comportamiento sorprendente en la Chispa. Necesito agregar una columna de Identificadores de fila a un DataFrame. He utilizado el DataFrame método monotonically_increasing_id() y no me dan un adicional de col de uniques Identificadores de fila (que NO son consecutivos por el camino, pero son únicos).

El problema que estoy teniendo es que cuando me filtrar el DataFrame los Identificadores de fila en el resultado DataFrame son re-asignado. Los dos DataFrames se muestra a continuación.

  • la primera de ellas es la inicial de DataFrame con Identificadores de fila agregado como sigue:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • la segunda DataFrame es la que se obtiene después de filtrado en la col P a través de df.filter(col("P")).

El problema es ilustrado por el rowId para custId 169 de la oit, el cual fue de 5 en la parte inicial del DataFrame, pero después de filtrado que rowId (5) fue re-asignado a custmId 773 cuando custId 169 se filtran! No sé por qué este es el comportamiento predeterminado.

Me gustaría que la rowIds a ser «pegajosa»; si puedo eliminar filas de la DataFrame no quieren que su Id de «re-uso», yo quiero que ellos se han ido también, junto con sus filas. Es posible hacer eso? Yo no veo ninguna banderas para solicitar este comportamiento de monotonically_increasing_id método.

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
|      875|[54,120,1...|   true|    4|
|      773|[3136,317...|   true|    5|
  • Podría usted compartir su código completo para la generación de los dos ejemplo DataFrames? Para lo que vale, esto es probablemente debido a la optimización de consultas SQL que tiene lugar en el cual «independiente» mapa de fases puede ser reorganizado.
  • Hamel, realmente no hay otro transformaciones o acción que he publicado. Las tramas de datos que se muestran son el resultado de la df.show(). Usted puede muy fácilmente recrear este comportamiento, crear un marco de datos y agregar un IDENTIFICADOR de fila de la columna como la de arriba, a continuación, añadir una aleatorias booleanas columna a ella. Luego de filtro en la columna y ver cómo los Identificadores de fila que usted obtiene de monótonamente creciente son «re-utilizados» como lo describo.
  • Yo en realidad iba a añadir que la forma más sencilla de reproducir es el uso de una única partición.
  • Problema en la Chispa tracker para esto: CHISPA 14241
  • Gracias Nick por tomar esto.
  • Este comportamiento se está fijo en la Chispa de la 2.1.0. Consulte CHISPA 14393 y el vinculado PRs.
  • De niza. Gracias por actualizar el hilo.

InformationsquelleAutor Kai | 2016-02-29

6 Comentarios

  1. 16

    Spark 2.0

    • Este es un problema que se ha resuelto en la Chispa de la 2.0 con CHISPA-14241.

    • Otro problema similar se ha resuelto en la Chispa de la 2.1 con CHISPA-14393

    Chispa 1.x

    Problema que la experiencia es más bien sutil, pero puede ser reducido a un simple hecho de monotonically_increasing_id es una muy fea función. Claramente no es puro y su valor depende de algo que está completamente fuera de su control.

    No toma ningún parámetro, por lo que desde el optimizador de perspectiva no importa cuando se le llama y puede ser empujado después de todas las demás operaciones. Por lo tanto el comportamiento que se observa.

    Si usted toma un vistazo al código que encontrarás este es explícitamente marcado por la ampliación de MonotonicallyIncreasingID expresión con Nondeterministic.


    Creo que no hay ninguna solución elegante pero de una manera que usted puede manejar esto es añadir una artificial de la dependencia en el filtrado de valor. Por ejemplo, con un UDF como esto:

    from pyspark.sql.types import LongType
    from pyspark.sql.functions import udf
    
    bound = udf(lambda _, v: v, LongType()) 
    
    (df
      .withColumn("rn", monotonically_increasing_id())
      # Due to nondeterministic behavior it has to be a separate step
      .withColumn("rn", bound("P", "rn"))  
      .where("P"))

    En general, podría ser más limpio para añadir índices a partir de los zipWithIndex en un RDD y, a continuación, volver a convertir un DataFrame.


    * Solución mostrada anteriormente ya no es una solución válida (ni necesario) en la Chispa de la 2.x donde Python Udf son sujetos de la ejecución del plan de optimizaciones.

  2. 4

    No podía reproducir este. Estoy usando Spark 2.0, así que tal vez el comportamiento ha cambiado, o que no estoy haciendo lo mismo que usted.

    val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
    .toDF("name", "value","flag")
    .withColumn("rowd", monotonically_increasing_id())
    
    df.show
    
    val df2 = df.filter(col("flag")=== true)
    
    df2.show
    
    df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
    +-----+-----+-----+----+
    | name|value| flag|rowd|
    +-----+-----+-----+----+
    |  one|    1| true|   0|
    |  two|    2|false|   1|
    |three|    3| true|   2|
    | four|    4| true|   3|
    +-----+-----+-----+----+
    df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
    +-----+-----+----+----+
    | name|value|flag|rowd|
    +-----+-----+----+----+
    |  one|    1|true|   0|
    |three|    3|true|   2|
    | four|    4|true|   3|
    +-----+-----+----+----+
    
  3. 3

    Recientemente estuve trabajando en un problema similar. Aunque monotonically_increasing_id() es muy rápido, no es confiable y no va a dar usted consecutivos números de fila, sólo el aumento de enteros únicos.

    La creación de una partición de windows y, a continuación, utilizando row_number().over(some_windows_partition) consume mucho tiempo.

    La mejor solución hasta el momento es el uso de zip con el índice y, a continuación, convertir el archivo comprimido de nuevo a la original dataframe, con el nuevo esquema, incluyendo el índice de la columna.

    Intente esto:

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, LongType
    
    new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
    zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
    indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
    

    Donde original_dataframe es el dataframe tienes que añadir un índice en y row_with_index es el nuevo esquema con el índice de columna que puede escribir como

    row_with_index = Row(
    "calendar_date"
    ,"year_week_number"
    ,"year_period_number"
    ,"realization"
    ,"index"
    )
    

    Aquí, calendar_date, year_week_number, year_period_number, y realization fueron las columnas de mi original dataframe. Puede reemplazar los nombres con los nombres de las columnas. El índice es el nuevo nombre de la columna había que añadir para los números de fila.

    Este proceso es en gran medida más eficiente y más suave cuando se compara con row_number().over(some_windows_partition) método.

    Espero que esto ayude.

  4. 2

    Para obtener todo el cambio de la evaluación de monotonically_increasing_id(), usted podría intentar escribir el dataframe en el disco, y la re-lectura. A continuación, la columna de id de ahora es simplemente un campo de datos que se lee, sino que calcula dinámicamente en algún punto de la tubería. Aunque es bastante feo solución, funcionó cuando hice una prueba rápida.

  5. 2

    Esto funcionó para mí. Crea otra columna de identidad y utiliza la ventana de la función row_number

    import org.apache.spark.sql.functions.{row_number}
    import org.apache.spark.sql.expressions.Window
    
    val df1: DataFrame = df.withColumn("Id",lit(1))
    
    df1
    .select(
    ...,
    row_number()
    .over(Window
    .partitionBy("Id"
    .orderBy(col("...").desc))
    )
    .alias("Row_Nbr")
    )
    
    • Esto iba a funcionar, pero la fuerza de los datos en la partición 1 extracción de la distribución y posiblemente conducir a la memoria excepciones en conjuntos de datos grandes

Dejar respuesta

Please enter your comment!
Please enter your name here