Me gustaría volver a crear particiones /se unen mis datos para que se guarda en uno de Parquet archivo de cada partición. También me gustaría utilizar la Chispa de SQL partitionBy de la API. Así que yo podría hacer así:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append).parquet(s"$location")

He probado esto y no parecen funcionar bien. Esto es debido a que sólo hay una partición en el conjunto de datos y todos los de la partición, la compresión y el almacenamiento de los archivos tiene que ser hecho por un núcleo de la CPU.

Podía volver a escribir este para hacer las particiones manualmente (usando el filtro con las distintas partición de valores, por ejemplo) antes de llamar a fusionarse.

Pero hay una manera mejor de hacer esto usando el estándar de la Chispa de la API de SQL?

  • Encontraste la solución?

2 Comentarios

  1. 70

    Tuve el mismo problema y he encontrado una manera de hacer esto utilizando DataFrame.repartition(). El problema con el uso de coalesce(1) es que el paralelismo gotas a 1, y puede ser lento en el mejor y el error en el peor. Aumentar ese número no ayuda, si usted no coalesce(10) obtener más paralelismo, pero terminan con 10 archivos por partición.

    Para obtener un archivo por partición sin usar coalesce(), uso repartition() con las mismas columnas que desea que la salida particiones. Así que, en su caso, hacer esto:

    import spark.implicits._
    df.repartition($"entity", $"year", $"month", $"day", $"status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")
    

    Una vez que hago eso me llega uno de parquet archivo por salida de la partición, en lugar de varios archivos.

    He probado esto en Python, pero supongo que en Scala debe ser el mismo.

    • Gracias por este. Voy a probarlo y responder con resultados…
    • Supongo que @PatrickMcGloin no informe de la espalda, pero este funciona muy bien y voy a animar a Patrick a aceptar la respuesta.
    • estás en lo correcto. Respuesta aceptada. Gracias user3033652.
    • sólo una nota que en scala 2.0 que uno necesita para dar una nueva org.apache.chispa.sql.Columna (la»entidad»), etcétera. como argumento para volver a crear particiones
    • O simplemente $"entity".
    • El uso de Chispa 1.6 esto funciona perfectamente para Parquet. Sin embargo, con Avro yo todavía acabar con varios archivos de cada partición.
    • Me estoy poniendo org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns a pesar de que yo estoy usando sólo 4 columnas de los 30. Alguna idea?
    • Hay algún lugar esta diferencia entre coalesce y repartition es mejor explicado? Específicamente las implicaciones para escribir el paralelismo. Comentarios en el código: github.com/apache/spark/blob/master/sql/core/src/main/scala/org/…

  2. 9

    Por definición :

    unen(numPartitions: Int): DataFrame
    Devuelve un nuevo DataFrame que tiene exactamente numPartitions particiones.

    Se puede utilizar para disminuir el número de particiones en la RDD/DataFrame con el numPartitions parámetro. Es útil para la ejecución de las operaciones de manera más eficiente después de la filtración de un gran conjunto de datos.

    Respecto a su código, no funciona bien porque lo que está haciendo en realidad es :

    1. poner todo en 1 partición que sobrecarga el conductor, ya que es sacar todos los datos en la partición 1 en el controlador (y también que no es una buena práctica)

    2. coalesce realidad baraja todos los datos en la red que puede también resultar en la pérdida de rendimiento.

    El shuffle es la Chispa del mecanismo para la re-distribución de los datos, de manera que se agrupan de manera diferente a través de las particiones. Normalmente, esto implica la copia de datos a través de los ejecutores y de las máquinas, haciendo que el shuffle de una operación compleja y costosa.

    La shuffle concepto es muy importante para manejar y entender. Siempre es preferible mezclar el mínimo posible porque es una operación costosa, ya que implica el disco I/O, de serialización de datos, y de la red I/O. Para organizar los datos para el shuffle, Chispa genera conjuntos de tareas – mapa de tareas para organizar los datos, y un conjunto de reducir las tareas para el agregado. Esta nomenclatura proviene de MapReduce y no se refieren directamente a la Chispa del mapa y reducir las operaciones.

    Internamente, los resultados individuales de mapa de tareas se guardan en la memoria hasta que no puede encajar. Entonces, estas son clasificadas de acuerdo a la partición de destino y escrito en un solo archivo. En el reducir lado, las tareas de leer atentamente las correspondientes ordenadas bloques.

    Sobre partición de parquet, sugiero que lea la respuesta aquí sobre Spark DataFrames con suelos de Parquet Particiones y también en este sección en la Chispa de la Guía de Programación para la Optimización del Rendimiento.

    Espero que esto ayude !

    • Hola, gracias por su respuesta. Estoy muy bien con la unen a tener un costo. En mi actual código de particionar los datos manualmente, entonces la llamada se unen y ahorrar en cada una de las particiones que se ejecuta bien. Pero en lugar de escribir el partitionBy paso a mí mismo que me gustaría utilizar la API adecuada. Pero, al hacerlo, el se unen tiene que venir antes de la partitionBy. Que es donde estoy atascado.
    • Pero lo que estamos haciendo es poner todo en 1 partición y, a continuación, partitionBy, usted debe simplemente partitionBy lugar
    • Pero luego me terminan con muchos archivos. Quiero limitar el número de Parquet archivos creados. Estoy de transmisión y el ahorro de una vez por minuto, por lo que cada partición ya tiene 1440 archivos. No quiero que multiplicar.
    • Ok déjame ponerlo de esta manera, el código se escribe una parquet archivo por partición de sistema de archivos (local o HDFS). Esto significa que si usted tiene 10 entidad distinta y 3 años distintos de 12 meses cada uno, etc usted podría terminar creando 1440 archivos.
    • Creo que no hay cables cruzados. Estoy transmisión de los datos y llamar a salvar a cada minuto. Así como no hay datos de esa partición, se obtendrá un nuevo archivo cada minuto. SaveMode.Anexar en Parquet crea un nuevo archivo.
    • ¿qué piensa usted acerca de esto ?
    • Todavía no sé cómo de manera eficiente guardar datos en uno de Parquet archivo por partición utilizando el API estándar, así que no creo que responda a la pregunta.
    • Me estaba preguntando zero323 @PatrickMcGloin y no a usted. 🙂
    • Si se me permite preguntar, ¿cuántas particiones ¿crees que tu DataFrame debe tener ?
    • Ja, lo siento. Por día, alrededor de las 10.
    • y cómo muchos de ellos únicos (la»entidad», «año», «mes», «día», «el estado») tupla tiene usted ? Yo diría 1440…
    • gracias … tengo una gran cantidad de datos , que es accumalated cada año , trimestral sabio. Estos datos se tuerce un poco , cuando yo trate de obtener todos los datos en un dataframe por repartitoning en («año», «cuarto») se está barajando una gran cantidad de datos en el disco derrame que está haciendo mi trabajo lento , es más sólo un ejecutor de trabajo el 80% del tiempo. Por lo tanto he decidido 1) obtener los distintos grupos de dataframe , la agrupación por año y trimestre sabio. 2) repetir/loop esta distinta marco de datos por grupo para obtener los datos de grupo donde a = año del grupo de guardar este dataframe/grop como parquet archivo
    • continuar la iteración. En Java se puede utilizar para el bucle de grupos, pero en chispa con la scala de cómo hacerlo ? su ayuda es muy apreciada.
    • Por lo tanto he decidido 1) obtener los distintos grupos de dataframe , la agrupación por año y trimestre sabio. 2) repetir/loop esta distinta marco de datos por grupo ; para obtener los datos de grupo donde a = año del grupo ; guardar este dataframe/grop como parquet archivo ; continuar la iteración;;;; En Java se puede utilizar para el bucle de grupos, pero en chispa con la scala de cómo hacerlo ?

Dejar respuesta

Please enter your comment!
Please enter your name here