Tengo un spark dataframe ‘mydataframe’ con muchas columnas. Estoy tratando de ejecutar kmeans sólo en dos columnas: lat y long (latitud & longitud) utilizando como valores simples). Quiero extraer 7 grupos basados sólo en esos 2 columnas y, a continuación, quiero adjuntar el clúster de asignación a mi original dataframe. He intentado:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")

Pero estoy recibiendo un error después de un tiempo:

org.apache.chispa.SparkException: Trabajo abortado debido a la falla de fase: Tarea 1 en la etapa 5191.0 fallado 4 veces, la más reciente fracaso: la pérdida de la tarea 1.3 en la etapa 5191.0 (TID 260738, 10.19.211.69, albacea 1): org.apache.chispa.la api.python.PythonException: Traceback (la mayoría de llamada reciente pasado)

He probado a desconectar y volver a conectar el clúster. Mismo resultado. ¿Qué estoy haciendo mal?

Muchas gracias!

  • Se puede añadir el resultado de data_rdd.tomar(5)?
  • Por favor, muestran un ejemplo de tus datos y aplicaciones; la Chispa de la versión. También, ¿por qué el uso de la antigua MLlib (que requiere Ddr) en lugar de la nueva & recomienda ML, que puede trabajar directamente con dataframes?
  • En los datos geográficos, el uso de Haversine distancia, y no uso kmeans.
  • gracias! ¿Qué recomendaría usted como el más apropiado para la latitud y la longitud de la agrupación?
  • Haversine distancia, y la ÓPTICA de la agrupación.
  • Gracias! Se han implementado en la Chispa?
  • Estoy viendo sólo LDA y GMM aquí: spark.apache.org/docs/latest/ml-clustering.html

InformationsquelleAutor user3245256 | 2017-12-01

2 Comentarios

  1. 38

    Ya que en base a otro reciente de la cuestión de la tuya, supongo que usted está en sus primeros pasos con la Chispa de la agrupación (que son incluso importar sqrt & array, sin tener que hacer uso de ellos, probablemente porque es como que en el docs ejemplo), permítanme ofrecer asesoramiento en un nivel más general, en lugar de en la pregunta específica que usted está pidiendo aquí (esperemos que también ahorra posteriormente la apertura de 3-4 más preguntas, tratando de conseguir su clúster de asignaciones de nuevo en su dataframe)…

    Desde

    1. los datos se encuentran ya en un dataframe

    2. desea fijar la pertenencia al clúster de nuevo en su inicial
      dataframe

    usted no tiene ninguna razón para volver a un dispositivo de este tipo y el uso de la (pronto será obsoleta) MLlib paquete; que va a hacer su trabajo mucho más fácil, elegante, y el uso eficiente del (ahora recomendado) ML paquete, que trabaja directamente con dataframes.

    Paso 0 – hacer un juguete de datos se asemeja a la suya:

    spark.version
    # u'2.2.0'
    
    df = spark.createDataFrame([[0, 33.3, -17.5],
                                  [1, 40.4, -20.5],
                                  [2, 28., -23.9],
                                  [3, 29.5, -19.0],
                                  [4, 32.8, -18.84]
                                 ],
                                  ["other","lat", "long"])
    
    df.show()
    # +-----+----+------+
    # |other| lat|  long|
    # +-----+----+------+
    # |    0|33.3| -17.5|
    # |    1|40.4| -20.5| 
    # |    2|28.0| -23.9|
    # |    3|29.5| -19.0|
    # |    4|32.8|-18.84|
    # +-----+----+------+

    Paso 1 a armar sus características

    En contraste con la mayoría ML de paquetes, la Chispa ML requiere su entrada características para ser reunidos en un sola columna de su dataframe, normalmente denominada features; y proporciona un método específico para hacer esto, VectorAssembler:

    from pyspark.ml.feature import VectorAssembler
    
    vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
    new_df = vecAssembler.transform(df)
    new_df.show()
    # +-----+----+------+-------------+ 
    # |other| lat|  long|     features|
    # +-----+----+------+-------------+
    # |    0|33.3| -17.5| [33.3,-17.5]|
    # |    1|40.4| -20.5| [40.4,-20.5]|
    # |    2|28.0| -23.9| [28.0,-23.9]| 
    # |    3|29.5| -19.0| [29.5,-19.0]|
    # |    4|32.8|-18.84|[32.8,-18.84]|
    # +-----+----+------+-------------+ 

    Como tal vez ya adivinó, el argumento inputCols sirve para contar VectoeAssembler que las columnas particulares en nuestro dataframe son para ser utilizado como características.

    Paso 2 de ajuste KMeans modelo

    from pyspark.ml.clustering import KMeans
    
    kmeans = KMeans(k=2, seed=1)  # 2 clusters here
    model = kmeans.fit(new_df.select('features'))

    select('features') aquí sirve para contar el algoritmo que la columna de la dataframe a utilizar para la agrupación de recordar que, después del Paso 1 anterior, su original lat & long características no son más utilizados directamente.

    Paso 3 – transformar su inicial dataframe incluir clúster de asignaciones

    transformed = model.transform(new_df)
    transformed.show()    
    # +-----+----+------+-------------+----------+ 
    # |other| lat|  long|     features|prediction|
    # +-----+----+------+-------------+----------+
    # |    0|33.3| -17.5| [33.3,-17.5]|         0| 
    # |    1|40.4| -20.5| [40.4,-20.5]|         1|
    # |    2|28.0| -23.9| [28.0,-23.9]|         0|
    # |    3|29.5| -19.0| [29.5,-19.0]|         0|
    # |    4|32.8|-18.84|[32.8,-18.84]|         0|
    # +-----+----+------+-------------+----------+

    La última columna de la transformed dataframe, prediction, muestra el clúster de asignación – en mi caso de los juguetes, he acabado con 4 registros en clúster #0 y 1 récord en la categoría #1.

    Puede manipular la transformed dataframe con select declaraciones, o incluso drop la features columna (que ahora ha cumplido su función y puede ser que ya no es necesario)…

    Esperemos que usted está mucho más cerca ahora a lo que realmente quería lograr en el primer lugar. Para la extracción de clúster de estadísticas, etc., otro reciente respuesta de mina podría ser útil…

    • Bonita respuesta (+1), pero me falta la causa del error explicación
    • Yo era (soy…) sólo acerca de para agregarlo (por separado como una respuesta, sin embargo)… 😮
    • No tengo tiempo de responder, pero tengo tiempo para leer a veces 🙂
    • es bueno saber que tengo a mi espalda 😉
    • Queridos desertnaut, muchísimas gracias por tomarte tu tiempo y escribir la mejor respuesta de stackoverflow que he leído jamás. Voy a estar seguro para mantener una excelente fuente de ir hacia adelante. Sí, lo has adivinado correctamente – me hubiera pedido más preguntas! 🙂 Yo no tenía idea de que yo estoy usando algunos de los antiguos, depreciado de la biblioteca y estoy muy contento de que usted me mostró el «camino correcto». Lo he comprendido todo en tu excelente explicación. Una pequeña pregunta (más Chispa relacionados de kMeans relacionados): Está bien – de almacenamiento y memoria perspectiva – para producir más y más nuevos dataframes (df, luego df_new) – incluso si el df es enorme?
    • estándar de práctica que tiene para ceder sus datos transformados en nuevos dataframes como usted va. En cualquier caso, el experimento y ver…
    • muy buena descripción y recommeded.

  2. 4

    A pesar de mi otra respuesta general, y en el caso de que, por cualquier razón, debe pegarse con MLlib & RDDs, aquí es lo que hace que su error con el mismo juguete df.

    Cuando select columnas de un dataframe para convertir a la RDD, como usted lo hace, el resultado es un DDR de Filas:

    df.select('lat', 'long').rdd.collect()
    # [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]

    que no es adecuado como una entrada a MLlib KMeans. Usted necesitará un map operación para que esto funcione:

    df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
    # [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]

    Así, el código debe ser similar a esto:

    from pyspark.mllib.clustering import KMeans, KMeansModel
    
    rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
    clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
    clusters.centers
    # [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]
    • lo prometido es deuda… 🙂
    • gran adición. Una cosa, collect() devolver la lista y usted puede enviar dataframe a kmeans modelo de formación también.
    • Utilizamos collect sólo para los resultados finales; si podemos utilizar aquí, no habría razón para molestarse con Chispa alguna, estaríamos mucho mejor con scikit-learn o similar…

Dejar respuesta

Please enter your comment!
Please enter your name here