He dataframe con estas columnas id, price, timestamp.

Me gustaría encontrar el valor de la mediana agrupada por id.

Estoy usando este código para encontrarlo pero me da este error.

from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
                                 [0.5],
                                 0) \
                 .over(windowSpec)

return df.withColumn("Median", median)

No es posible utilizar DataFrameStatFunctions para llenar los valores en la nueva columna?

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
InformationsquelleAutor BK C. | 2017-07-24

2 Comentarios

  1. 34

    Bien, de hecho es no posible utilizar approxQuantile para rellenar valores en una nueva dataframe columna, pero esto no es por eso que usted está recibiendo este error. Por desgracia, todo debajo de la historia es bastante frustrante, como He argumentado que es el caso con muchos de Chispa (especialmente PySpark) características y su falta de documentación adecuada.

    Para empezar, no es uno, sino dos approxQuantile métodos; el primero es parte de la norma DataFrame clase, es decir, no es necesario importar DataFrameStatFunctions:

    spark.version
    # u'2.1.1'
    
    sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]
    
    df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
    df.show()
    # +------+---------+------+ 
    # |  Name|     Role|Salary|
    # +------+---------+------+
    # |   bob|Developer|125000| 
    # |  mark|Developer|108000|
    # |  carl|   Tester| 70000|
    # | peter|Developer|185000|
    # |   jon|   Tester| 65000|
    # | roman|   Tester| 82000|
    # | simon|Developer| 98000|
    # |  eric|Developer|144000|
    # |carlos|   Tester| 75000|
    # | henry|Developer|110000|
    # +------+---------+------+
    
    med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
    med
    # [98000.0]

    El segundo es parte de DataFrameStatFunctions, pero si lo usas como usted lo hace, usted sale el error que informe:

    from pyspark.sql import DataFrameStatFunctions as statFunc
    med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
    # TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

    debido a que el uso correcto es

    med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
    med2
    # [82000.0]

    aunque usted no será capaz de encontrar un ejemplo sencillo en el PySpark documentación acerca de esto (me tomó algo de tiempo para averiguarlo yo mismo)… La mejor parte? Los dos valores son no es igual:

    med == med2
    # False

    Sospecho que esto es debido a la no-determinista algoritmo utilizado (después de todo, se supone que debe ser un aproximado mediana), y aún si vuelve a ejecutar los comandos con el mismo juguete de datos se pueden obtener diferentes valores y diferentes de los que me informe aquí) – sugiero que experimentar un poco para conseguir la sensación de…

    Pero, como ya he dicho, esta no es la razón por la que usted no puede utilizar approxQuantile para rellenar valores en una nueva dataframe columna – incluso si utiliza la sintaxis correcta, obtendrá un error diferente:

    df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
    # AssertionError: col should be Column

    Aquí, col se refiere al segundo argumento de la withColumn operación, es decir, el approxQuantile uno, y el mensaje de error dice que no es un Column tipo – de hecho, se trata de una lista:

    type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
    # list

    Así, cuando el llenado de los valores de la columna, la Chispa de espera argumentos de tipo Column, y usted puede utilizar las listas; aquí está un ejemplo de cómo crear una nueva columna con los valores de la media por Función en lugar de la mediana: la

    import pyspark.sql.functions as func
    from pyspark.sql import Window
    
    windowSpec = Window.partitionBy(df['Role'])
    df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
    df2.show()
    # +------+---------+------+------------------+
    # |  Name|     Role|Salary|       mean_salary| 
    # +------+---------+------+------------------+
    # |  carl|   Tester| 70000|           73000.0| 
    # |   jon|   Tester| 65000|           73000.0|
    # | roman|   Tester| 82000|           73000.0|
    # |carlos|   Tester| 75000|           73000.0|
    # |   bob|Developer|125000|128333.33333333333|
    # |  mark|Developer|108000|128333.33333333333| 
    # | peter|Developer|185000|128333.33333333333| 
    # | simon|Developer| 98000|128333.33333333333| 
    # |  eric|Developer|144000|128333.33333333333|
    # | henry|Developer|110000|128333.33333333333| 
    # +------+---------+------+------------------+

    que funciona porque, contrariamente a approxQuantile, mean devuelve un Column:

    type(func.mean(df['Salary']).over(windowSpec))
    # pyspark.sql.column.Column
  2. 1

    El cálculo de cuantiles en grupos (agregados) ejemplo

    Como agregados de la función que falta para grupos, voy a agregar un ejemplo de la construcción de la función llamada por el nombre (percentile_approx para este caso) :

    from pyspark.sql.column import Column, _to_java_column, _to_seq
    
    def from_name(sc, func_name, *params):
        """
           create call by function name 
        """
        callUDF = sc._jvm.org.apache.spark.sql.functions.callUDF
        func = callUDF(func_name, _to_seq(sc, *params, _to_java_column))
        return Column(func)

    Aplicar percentile_approx función en groupBy:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f
    
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    
    # build percentile_approx function call by name: 
    target = from_name(sc, "percentile_approx", [f.col("salary"), f.lit(0.95)])
    
    
    # load dataframe for persons data 
    # with columns "person_id", "group_id" and "salary"
    persons = spark.read.parquet( ... )
    
    # apply function for each group
    persons.groupBy("group_id").agg(
        target.alias("target")).show()

Dejar respuesta

Please enter your comment!
Please enter your name here