Cuando disminuye el número de particiones que se puede utilizar de coalesce, lo cual es genial porque no causan un shuffle y parece que funciona al instante (no requiere un trabajo adicional de la etapa).

Me gustaría hacer lo contrario a veces, pero repartition induce un shuffle. Creo que hace unos meses, de hecho, me dieron este trabajo mediante el uso de CoalescedRDD con balanceSlack = 1.0 – entonces, ¿qué pasaría se podría dividir una partición, de modo que la resultante de las particiones lugar donde todos en el mismo nodo (tan pequeño neta IO).

Este tipo de funcionalidad es automática en Hadoop, uno sólo ajustes el tamaño de división. No parece trabajar de esta manera en la Chispa a menos que uno está disminuyendo el número de particiones. Creo que la solución podría ser escribir una costumbre particionador junto con una costumbre RDD donde definimos getPreferredLocations … pero he pensado que es una simple y común, cosa que seguramente debe de ser sencilla de hacerlo?

Cosas nuevas:

.set("spark.default.parallelism", partitions) en mi SparkConf, y cuando en el contexto de la lectura de parquet he intentado sqlContext.sql("set spark.sql.shuffle.partitions= ..., que en 1.0.0, se produce un error Y realmente no quieren que yo quiero, yo quiero el número de partición para cambiar a través de todos los tipos de puestos de trabajo, no sólo baraja.

  • Cualquier suerte de encontrar una solución para esto?
InformationsquelleAutor samthebest | 2014-11-20

3 Comentarios

  1. 0

    Yo no entender exactamente lo que su punto. Qué quiere decir que usted tiene ahora 5 particiones, pero después de la siguiente operación desea que los datos se distribuyen a 10? Porque tener 10, pero aún con 5 no tiene mucho sentido… El proceso de envío de datos a nuevas particiones que tiene que suceder en algún momento.

    Al hacer coalesce, usted puede deshacerse de unsued particiones, por ejemplo: si usted tenía inicialmente 100, pero después de reduceByKey tienes 10 (como allí donde sólo el 10 claves), se puede establecer coalesce.

    Si desea que el proceso de ir a otro lado, sólo podía forzar algún tipo de partición:

    [RDD].partitionBy(new HashPartitioner(100))

    No estoy seguro de que es lo que estás buscando, pero espero.

    • Cada partición tiene un lugar, es decir, un nodo, supongamos que tengo 5 particiones y 5 nodos. Si me llaman repartition, o su código, a 10 particiones, esta baraja los datos que es de datos para cada uno de los 5 nodos pueden pasar a través de la red hacia otros nodos. Lo que yo quiero, es esa Chispa simplemente divide cada partición en 2 sin mover los datos a su alrededor – esto es lo que sucede en Hadoop al ajustar configuración de split.
    • No estoy seguro de si usted puede hacerlo. Supongo que tendría algún tipo de .forEachNode función. Pero nunca he visto nada como esto. Y no estoy seguro de si se puede implementar fácilmente. El particionador que tiene que devolver la misma partición para el mismo objeto cada vez. Por defecto Chispa uso HashPartitioner, que hacer hashCode modulo number_of_partitions. Si usted acaba de dividir los datos en dos nuevas particiones, que definitivamente iba a terminar en no sus lugares. Por eso shuffle es necesario. Tal vez si usted tiene su propio particionador, podría aumentar el número de particiones sin arrastrando los pies sobre la red.
  2. -1

    Como usted sabe pyspark el uso de algún tipo de «perezoso» modo de funcionamiento. Sólo hacer el cálculo cuando hay algún tipo de acción a realizar (por ejemplo un «df.count()» o un «df.show()». Así que lo que pueden hacer es definir el shuffle de partición entre esas acciones.

    Se puede escribir :

    sparkSession.sqlContext().sql("set spark.sql.shuffle.partitions=100")
    # you spark code here with some transformation and at least one action
    df = df.withColumn("sum", sum(df.A).over(your_window_function))
    df.count() # your action
    
    df = df.filter(df.B <10)
    df = df.count()   
    
    sparkSession.sqlContext().sql("set spark.sql.shuffle.partitions=10")
    # you reduce the number of partition because you know you will have a lot 
    # less data
    df = df.withColumn("max", max(df.A).over(your_other_window_function))
    df.count() # your action
    • spark.sql.shuffle.partitionssólo tendrá un efecto sobre la transposición de operaciones tales como suma, aggegation y clasificación… pero no en el filtrado

Dejar respuesta

Please enter your comment!
Please enter your name here