Estoy usando Chispa 1.5.

Tengo dos dataframes de la forma:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF ha 766,151 registros mientras linkPersonItemLessThan500DF ha 26,694,353 registros. Tenga en cuenta que estoy usando repartition(number) en linkPersonItemLessThan500DF ya que tengo la intención de unir estos dos más tarde. Estoy siguiendo el código anterior con:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))

por que estoy obteniendo este resultado:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
at $iwC$$iwC$$iwC.<init>(<console>:93)
at $iwC$$iwC.<init>(<console>:95)
at $iwC.<init>(<console>:97)
at <init>(<console>:99)
at .<init>(<console>:103)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

y no entiendo cuál es el problema. Es tan simple como aumentar el tiempo de espera? Es la combinación demasiado intenso? Necesito más memoria? Es el shufffling intensivo? Alguien puede ayudar?

4 Comentarios

  1. 62

    Esto sucede porque la Chispa que trata de hacer la Difusión de Hash Join y uno de los DataFrames es muy grande, por lo que el envío de la que consume mucho tiempo.

    Usted puede:

    1. Mayor spark.sql.broadcastTimeout para aumentar el tiempo de espera – spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
    2. persist() tanto DataFrames, entonces Chispa a usar Shuffle Unirse referencia de aquí

    PySpark

    En PySpark, puede establecer la configuración cuando la generación de la chispa contexto de la siguiente manera:

    spark = SparkSession
    .builder
    .appName("Your App")
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
  2. 21

    Sólo para añadir un poco de código contexto para la muy concisa respuesta de @T. Gawęda.


    En la Chispa de la aplicación, la Chispa de SQL hizo elegir un de difusión hash join para la combinación porque «libriFirstTable50Plus3DF ha 766,151 registros», que pasó a ser menos que el llamado de difusión umbral (el valor predeterminado es de 10 mb).

    Puede controlar la emisión del umbral de uso de chispa.sql.autoBroadcastJoinThreshold propiedad de configuración.

    chispa.sql.autoBroadcastJoinThreshold se Configura el tamaño máximo en bytes de una tabla que será transmitido a todos los nodos de trabajo cuando se realiza una combinación. Al establecer este valor a -1 de radiodifusión puede ser deshabilitado. Tenga en cuenta que en la actualidad las estadísticas sólo son compatibles para la Colmena Metastore mesas donde el comando ANALYZE TABLE CALCULAR ESTADÍSTICAS noscan se ha ejecutado.

    Usted puede encontrar que tipo particular de unirse en el seguimiento de la pila:

    org.apache.chispa.sql.de la ejecución.se une.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)

    BroadcastHashJoin operador físico en la Chispa de SQL utiliza un difusión de la variable para distribuir el conjunto de datos más pequeño para encender la Chispa de los ejecutores (en vez de enviar una copia de cada tarea).

    Si usted utiliza explain a la revisión de la física de plan de consulta que se nota la consulta utiliza BroadcastExchangeExec físico del operador. Aquí es donde usted puede ver la subyacente maquinaria para la difusión de la tabla más pequeña (y el tiempo de espera).

    override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
    }

    doExecuteBroadcast es parte de SparkPlan contrato que cada operador físico en la Chispa de la SQL de la siguiente manera que permite la radiodifusión, si es necesario. BroadcastExchangeExec sucede a necesitar.

    La tiempo de espera parámetro es lo que usted está buscando.

    private val timeout: Duration = {
    val timeoutValue = sqlContext.conf.broadcastTimeout
    if (timeoutValue < 0) {
    Duration.Inf
    } else {
    timeoutValue.seconds
    }
    }

    Como se puede ver se puede desactivar completamente (con un valor negativo) que podría implicar a esperar a la emisión de la variable a ser enviados a los ejecutores de forma indefinida o uso sqlContext.conf.broadcastTimeout que es exactamente chispa.sql.broadcastTimeout propiedad de configuración. El valor predeterminado es 5 * 60 segundos que se puede ver en el stacktrace:

    java.util.concurrente.TimeoutException: Futuros agotado después de [300 segundos]

  3. 0

    En mi caso, fue causada por una difusión a través de una gran dataframe:

    df.join(broadcast(largeDF))

    Así, con base en las respuestas anteriores, me fijo por la eliminación de la transmisión:

    df.join(largeDF)
  4. 0

    Además de aumentar spark.sql.broadcastTimeout o persist() ambos DataFrames,

    Puede probar:

    1.deshabilitar la difusión mediante el establecimiento de spark.sql.autoBroadcastJoinThreshold a -1

    2.aumentar la chispa de la memoria del controlador mediante el establecimiento de spark.driver.memory a un valor más alto.

Dejar respuesta

Please enter your comment!
Please enter your name here