Quiero unirme a los datos dos veces de la siguiente manera:

rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])

res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()

Luego me sale algo de error :

pyspark.sql.utils.AnalysisException: u’Cartesian une podría ser
prohibitivamente caros y están deshabilitados de forma predeterminada. Para habilitar de forma explícita, por favor conjunto de chispa.sql.crossJoin.enabled = true;’

Pero creo que este no es un cross join

ACTUALIZACIÓN:

res2.explain()

== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
:  :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
:  :  +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
:  :     +- *Filter isnotnull(idx#0L)
:  :        +- Scan ExistingRDD[idx#0L,val#1]
:  +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
:     +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
:        +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
:           +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
InformationsquelleAutor Zhang Tong | 2017-02-27

3 Comentarios

  1. 10

    Esto sucede porque usted join estructuras que comparten el mismo linaje y esto conduce a una trivialmente la igualdad de condición:

    res2.explain()

    == Physical Plan ==
    org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
    Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L))
    :- Filter isnotnull(idx#204L)
    :  +- LogicalRDD [idx#204L, val#205]
    +- Filter ((isnotnull(key2#210L) && (key2#210L = key1#209L)) && isnotnull(key1#209L))
       +- LogicalRDD [key1#209L, key2#210L, val#211L]
    and
    LogicalRDD [idx#235L, val#236]
    Join condition is missing or trivial.
    Use the CROSS JOIN syntax to allow cartesian products between these relations.;

    En caso de que como esto, usted debe utilizar alias:

    from pyspark.sql.functions import col
    
    rdd1 = spark.createDataFrame(...).alias('rdd1')
    rdd2 = spark.createDataFrame(...).alias('rdd2')
    
    res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).alias('res1')
    res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx')).explain()
    == Physical Plan ==
    *SortMergeJoin [key2#297L], [idx#360L], Inner
    :- *Sort [key2#297L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(key2#297L, 200)
    :     +- *SortMergeJoin [idx#290L], [key1#296L], Inner
    :        :- *Sort [idx#290L ASC NULLS FIRST], false, 0
    :        :  +- Exchange hashpartitioning(idx#290L, 200)
    :        :     +- *Filter isnotnull(idx#290L)
    :        :        +- Scan ExistingRDD[idx#290L,val#291]
    :        +- *Sort [key1#296L ASC NULLS FIRST], false, 0
    :           +- Exchange hashpartitioning(key1#296L, 200)
    :              +- *Filter (isnotnull(key2#297L) && isnotnull(key1#296L))
    :                 +- Scan ExistingRDD[key1#296L,key2#297L,val#298L]
    +- *Sort [idx#360L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(idx#360L, 200)
          +- *Filter isnotnull(idx#360L)
             +- Scan ExistingRDD[idx#360L,val#361]

    Para más detalles, ver CHISPA-6459.

    • cuando te refieres al «mismo linaje» creo que es un problema con chispa dataframe la evaluación diferida y planificador de consultas . OPs consulta no es un producto cartesiano de sql punto de vista, ¿verdad?
    • Se puede decir que. En resumen, si usted tiene df1 y df2 ambos derivados de df, y todos los tres de compartir col luego df1.col op df2.col podría resolverse como trivialmente verdadero o falso, aunque técnicamente (en función de las reglas de resolución) no.
    • tener sentido. He encontrado que este problema puede ser evitado mediante el uso de sql y ejecutar a través de sparkSession.sql("your sql") en lugar de dataframe basado en dsl.
  2. 3

    Yo también fue un éxito cuando se almacena el dataframe antes de la segunda combinación.

    Algo como:

    res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).persist()
    
    res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx'))
    • Hoy me gustaría utilizar .caché() en lugar de .persist() pero es sólo el estilo de codificación …
    • esto funcionó para mí también, pero no tengo idea de por qué. ¿Cuál es la diferencia?
    • Persisten no funciona en mi caso. Por favor ayuda
  3. 1

    Persistente que no me funciona.

    Lo superé con alias en DataFrames

    from pyspark.sql.functions import col
    
    df1.alias("buildings").join(df2.alias("managers"), col("managers.distinguishedName") == col("buildings.manager"))

Dejar respuesta

Please enter your comment!
Please enter your name here