Estoy tratando de comparar las diferentes maneras de agregar mis datos.

Esta es mi entrada de datos con 2 elementos de la página,el visitante):

(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)

De trabajo con un comando SQL en la Chispa de SQL con este código:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
                              """select page
                                       ,count(distinct visitor) as visitor
                                   from logs
                               group by page
                              """)
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)

Puedo obtener este resultado:

(PAG1,3) //PAG1 has been visited by 3 different visitors
(PAG2,2) //PAG2 has been visited by 2 different visitors

Ahora, me gustaría obtener el mismo resultado usando Dataframes y thiers API, pero no puedo obtener el mismo resultado:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)

De hecho, eso es lo que se obtiene como resultado:

[PAG1,8]  //just the simple page count for every page
[PAG2,4]

Es probable que haya algo tonto, pero no puedo ver ahora mismo.

Gracias de antemano!

FF

2 Comentarios

  1. 49

    Lo que usted necesita es el DataFrame función de agregación countDistinct:

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    
    case class Log(page: String, visitor: String)
    
    val logs = data.map(p => Log(p._1,p._2))
                .toDF()
    
    val result = logs.select("page","visitor")
                .groupBy('page)
                .agg('page, countDistinct('visitor))
    
    result.foreach(println)
    • Me sale este error –> no encontrado: valor countDistinct
    • es un método en org.apache.spark.sql.functions, importar de que :), editar hecho.
    • con intelliJ i’have a escribir la agg/countDistinct comando como este .agg(org.apache.chispa.sql.funciones.countDistinct(«visitante»)) porque incluso si he importado org.apache.chispa.sql.funciones todavía me da el mismo error… de todos modos esto funciona, pero que sólo recibe el visitante de la columna y no la página de la columna ([2],[3])… lo que me estoy perdiendo?
    • ¿ha agregado el subrayado después de las funciones? org.apache.spark.sql.functions._
    • ops, no me escriba bien… ahora la importación de obras, pero todavía tengo sólo una columna como resultado: el visitante ([2],[3])… y así no puede coincidir con la página de visitante
    • te olvides de añadir «de la página» en agg(), consulte edición de
    • ya allí, actualizar por favor
    • Permítanos continuar esta discusión en el chat.
    • probar: from pyspark.sql.functions import *

  2. 2

    Puede utilizar dataframe del groupBy comando dos veces para hacerlo. Aquí, df1 es su entrada original.

    val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))
    

    Este comando produce el siguiente resultado:

    page  visitor  count
    ----  ------   ----
    PAG2    V2       2
    PAG1    V3       1
    PAG1    V1       5
    PAG1    V2       2
    PAG2    V1       2
    

    A continuación, utilizar la groupBy nuevo comando para obtener el resultado final.

     df2.groupBy($"page").agg(count($"visitor").as("count"))
    

    Resultado Final:

    page   count
    ----   ----
    PAG1    3
    PAG2    2
    

Dejar respuesta

Please enter your comment!
Please enter your name here