Cómo son las etapas de división en tareas de Chispa?

Supongamos que para el siguiente que sólo una Chispa de trabajo se está ejecutando en cada momento.

Lo que quiero llegar tan lejos

Aquí es lo que yo entiendo de lo que sucede en la Chispa de la:

  1. Cuando un SparkContext se crea, cada trabajador nodo inicia un albacea.
    Los ejecutores son procesos separados (JVM), que se conecta de nuevo con el programa del controlador. Cada ejecutor tiene la jarra de que el conductor del programa. Dejar de un conductor, se apaga el ejecutores. Cada ejecutor puede contener algunas de las particiones.
  2. Cuando un trabajo se ejecuta un plan de ejecución se crea de acuerdo con el linaje gráfico.
  3. La ejecución del trabajo se divide en etapas, donde las etapas que contiene como muchos de los vecinos (en el linaje gráfico) de las transformaciones y de la acción, pero no baraja. Así, las fases están separadas por baraja.

Cómo son las etapas de división en tareas de Chispa?

Entiendo que

  • Una tarea es un comando enviado desde el controlador a un albacea por serializar el objeto de la Función.
  • El ejecutor deserializa (con el controlador de tarro) el comando (tarea) y se ejecuta en una partición.

pero

Pregunta(s)

¿Cómo puedo dividir la etapa en esas tareas?

Específicamente:

  1. Son las tareas determinadas por las transformaciones y las acciones o pueden ser múltiples transformaciones/acciones de estar en una tarea?
  2. Son las tareas determinadas por la partición (por ejemplo, una tarea por cada etapa por partición).
  3. Son las tareas determinadas por los nodos (por ejemplo, una tarea por etapa por nodo)?

Lo que yo creo (sólo respuesta parcial, incluso si la derecha)

En https://0x0fff.com/spark-architecture-shuffle, el shuffle se explica con la imagen

Cómo son las etapas de división en tareas de Chispa?

y tengo la impresión de que la regla es

cada etapa está dividida en #número de particiones de tareas, sin tener en cuenta el número de nodos

Para mi primera imagen me gustaría decir que me gustaría tener 3 mapa de tareas y de 3 a reducir las tareas.

Para la imagen de 0x0fff, yo diría que hay 8 mapa de tareas y de 3 a reducir las tareas (suponiendo que sólo hay tres de naranja y tres de color verde oscuro archivos).

Preguntas abiertas en cualquier caso

¿Es correcto? Pero incluso si eso es correcto, mis preguntas no son respondidas, porque está todavía abierto, si varias operaciones (por ejemplo, varios mapas) están dentro de una tarea o se separan en una de las tareas por operación.

Lo que otros dicen

Lo que es una tarea en la Chispa? ¿Cómo funciona la Chispa trabajador ejecutar el archivo jar? y ¿Cómo funciona el Apache Spark programador de dividir archivos en tareas? son similares, pero yo no siento que mi pregunta fue respondida claramente allí.

InformationsquelleAutor Make42 | 2016-05-30

3 Kommentare

  1. 42

    Usted tiene una muy buena esbozar aquí. Para responder a sus preguntas

    • Separado task hace necesidad de ser lanzado para cada partición de datos para cada stage. Considere la posibilidad de que cada partición es probable que residen en distintas ubicaciones físicas – por ejemplo, los bloques en HDFS o directorios/volúmenes para un sistema de archivos local.

    Nota de que la presentación de Stages es impulsado por el DAG Scheduler. Esto significa que las etapas que no son interdependientes, podrá ser sometido a la agrupación para la ejecución en paralelo: esto maximiza la paralelización de capacidad en el clúster. Así que si sus operaciones en nuestro flujo de datos pueden suceder simultáneamente vamos a esperar a ver varias etapas en marcha.

    Podemos verlo en acción en el siguiente juguete ejemplo en el que podemos hacer los siguientes tipos de operaciones:

    • de carga de dos orígenes de datos
    • realizar algunas mapa de la operación tanto de las fuentes de datos por separado
    • unirse a ellos
    • realizar algunas mapa y las operaciones de filtro en el resultado
    • guardar el resultado

    Así que entonces, ¿cuántas etapas vamos a terminar con?

    • 1 etapa de cada uno para la carga de los dos orígenes de datos en paralelo = 2 etapas
    • Una tercera etapa que representa el join que es dependiente de la en las otras dos etapas
    • Nota: todas las operaciones sucesivas de trabajo en el conjunto de datos se puede realizar en el mismo escenario, ya que ellos deben ocurrir de forma secuencial. No hay ningún beneficio para el lanzamiento de etapas adicionales porque no se les puede empezar a trabajar hasta que el antes de la operación se ha completado.

    Aquí es que toy programa

    val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
    val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
    val spj = sfi.join(sp)
    val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
    val sf = sm.filter{ case (k,v) => v % 10 == 0 }
    sf.saveAsTextFile("/data/blah/out")
    

    Y aquí está el DAG de el resultado

    Cómo son las etapas de división en tareas de Chispa?

    Ahora: cuántos tareas ? El número de tareas debe ser igual a

    Suma de (Stage * #Partitions in the stage)

    • Gracias! Por favor, pon tu respuesta en cuanto a mi texto: 1) Es mi definición de las etapas no-integral? Parece que he perdido el requisito de que una etapa no puede contener las operaciones que puedan estar en paralelo. O es la descripción de mi estrictamente lo que implica que ya? 2) El número de tareas que deben ser ejecutadas para que el trabajo está determinado por el número de particiones, pero no el número de procesadores o nodos, mientras que el número de tareas que se pueden ejecutar al mismo tiempo es dependía del número de procesadores, ¿verdad? 3) Una tarea puede contener varias operaciones?
    • 4) ¿Qué quiere decir con la última frase? Después de todo, el número de particiones que pueden variar de una etapa a otra. ¿Quieres decir que esto es cómo haya configurado su puesto de trabajo durante todas las etapas?
    • Por supuesto, el número de particiones que puede variar de una etapa a otra – estás en lo correcto. Era mi intención diciendo sum(..) que tomar en cuenta la variación.
    • wow, su respuesta fue totalmente bien, pero por desgracia, la última frase es definitivamente un concepto erróneo. Esto no significa que los números de partición en una etapa es igual al número de procesadores, sin embargo, puede establecer el número de particiones de un dispositivo de dispersión radiactiva, de acuerdo con el número de núcleos presentado en su máquina.
    • Era un caso especial – pero estoy de acuerdo en que sería engañosa, así que me estoy quitando de ella.
  2. 17

    Esto podría ayudar a comprender mejor las diferentes piezas:

    • Etapa: es una colección de tareas. Mismo proceso que se ejecuta en contra de
      diferentes subconjuntos de datos (particiones).
    • Tarea: representa una unidad de
      trabajo en una partición de un conjunto de datos distribuidas. Así, en cada etapa,
      número de tareas = número de particiones, o como usted dice «una de las tareas por
      la etapa por la partición».
    • Cada executer se ejecuta en un hilo de contenedores, y
      cada contenedor se encuentra en un nodo.
    • Cada etapa utiliza múltiples ejecutores, cada executer se asigna múltiples vcores.
    • Cada vcore puede ejecutar exactamente una tarea a la vez
    • Así, en cualquier etapa, múltiples tareas pueden ser ejecutadas en paralelo. número de tareas que se están ejecutando = número de vcores ser utilizado.
  3. 12

    Si entiendo correctamente hay 2 ( relativa ) de las cosas que confundir:

    1) ¿Qué determina el contenido de una tarea?

    2) Lo que determina el número de tareas a ser ejecutadas?

    Chispa del motor «colas» juntos simple operaciones consecutivas ddr, por ejemplo:

    rdd1 = sc.textFile( ... )
    rdd2 = rdd1.filter( ... )
    rdd3 = rdd2.map( ... )
    rdd3RowCount = rdd3.count
    

    así que cuando rdd3 es (pereza) calculada, la chispa se genera una tarea por partición de rdd1 y cada tarea se ejecutará tanto el filtro y el mapa por línea de resultado en rdd3.

    El número de tareas se determina por el número de particiones. Cada RDD tiene definido un número de particiones. Para una fuente de DDR que se lee de HDFS ( utilizando sc.archivo de texto( … ) por ejemplo ) el número de particiones es el número de divisiones generadas por el formato de entrada. Algunas operaciones en RDD(s) puede resultar en un dispositivo de este tipo con un número diferente de particiones:

    rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
    

    Otro ejemplo es une:

    rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).
    

    ( La mayoría ) de las operaciones que cambian el número de particiones que implican un shuffle, Cuando hacemos por ejemplo:

    rdd2 = rdd1.repartition( 1000 ) 
    

    lo que en realidad sucede es que el trabajo de cada una de las particiones de rdd1 necesita para producir un final de salida que puede ser leído por las siguientes etapas a fin de hacer rdd2 tienen exactamente 1000 particiones ( Cómo lo hacen? Hash o Tipo ). Las tareas en este lado se refiere a veces como «Mapa ( de lado ) tareas».
    Una tarea que posteriormente se ejecutan en rdd2 actuará en una de las particiones ( de rdd2! y habría que averiguar cómo leer/combinar el mapa del lado de las salidas relevantes para esa partición. Las tareas en este lado se refiere a veces como «Reducir ( de lado ) tareas».

    Las 2 preguntas están relacionadas con: el número de tareas en una etapa es el número de particiones ( común a los consecutivos de los ddr «pegados» juntos ) y el número de particiones de un dispositivo de este tipo puede cambiar entre las etapas ( especificando el número de particiones para algunos shuffle causando la operación, por ejemplo ).

    Una vez que la ejecución de una etapa y el comienzo, sus tareas pueden ocupar tarea ranuras. El número de concurrentes tarea-ranuras es numExecutors * ExecutorCores. En general, estos pueden ser ocupados por las tareas de diferente, no dependiente de las etapas.

Kommentieren Sie den Artikel

Bitte geben Sie Ihren Kommentar ein!
Bitte geben Sie hier Ihren Namen ein

Pruebas en línea