Acabo de crear python lista de range(1,100000)
.
Utilizando SparkContext realiza los siguientes pasos:
a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])
c = a.zip(b)
>>> [(1, 1), (2, 2), -----]
sum = sc.accumulator(0)
c.foreach(lambda (x, y): life.add((y-x)))
Que da aviso de la siguiente manera:
ARN TaskSetManager: Etapa 3 contiene una tarea de gran tamaño (4644 KB). El máximo recomendado para el tamaño de la tarea es de 100 KB.
Cómo resolver esta advertencia? Es allí cualquier manera de manejar el tamaño? Y también, afectará a la vez la complejidad del big data?
- Primero de todo, ¿qué es exactamente
life
? Did you meansum
acumulador? De cualquier forma, no debería ser un problema aquí. Consulte este para obtener más detalles.
La expansión de @leo9r comentario: considere la posibilidad de usar no es un python
range
, perosc.range
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext.range.Así evitar la transferencia de la enorme lista de su controlador a los ejecutores.
De curso, los Ddr se utilizan generalmente para propósitos de prueba, por lo que no desea para ser difundido.
sc.range
en lugar derange
obras en el juguete ejemplo, pero se pierde en el problema más general (cómo se transmiten los datos entre python y java)Chispa de forma nativa se incluye una copia de cada variable durante el envío de la tarea. Para tamaños grandes de estas variables puede que desee utilizar Difusión De Las Variables
Si usted todavía se enfrentan a problemas de tamaño, Entonces tal vez este tipo de datos debe ser un dispositivo de este tipo en sí mismo
edit: Actualizado el enlace
mapPartitions
)? Creo que no puede ser envuelto en el interior de una emisión de la variable.[i for i in range(1, 100000)]
La idea general es que PySpark crea como muchos procesos java que no son los ejecutores, y luego envía los datos para cada proceso. Si hay muy pocos procesos, una memoria de cuello de botella que sucede en el espacio del montón java.
En su caso, el error específico es que el RDD que creó con
sc.parallelize([...])
no se especifica el número de partición (argumentonumSlices
, ver la docs). Y el RDD valores predeterminados para un número de partición que es demasiado pequeño (posiblemente está constituido por una sola partición).Para resolver este problema, basta con especificar el número de particiones que quería:
Como se especifica más y más alto número de sectores, se observa una disminución en el tamaño indicado en el mensaje de advertencia. Aumentar el número de rebanadas hasta que no obtenga más mensaje de advertencia. Por ejemplo,
significa que usted necesita para especificar más sectores.
Otro consejo que puede ser útil cuando se trata de problemas de memoria (pero esto no está relacionado con el mensaje de advertencia): por defecto, la memoria disponible para cada ejecutor es de 1 GB o así. Puede especificar cantidades mayores a través de la línea de comandos, por ejemplo, con
--executor-memory 64G
.blabla contains a task of very large size
..) – yo no conozco ninguna otra manera de obtener esta información antes de intentar.