Cuando estoy tratando de hacer lo mismo en mi código como se menciona a continuación

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

He tomado la referencia anterior a partir de aquí:
Scala: Cómo se puede reemplazar el valor en Dataframs usando scala
Pero estoy recibiendo encoder error como

Incapaz de encontrar un codificador de tipo almacenado en un conjunto de datos. Los tipos primitivos
(Int, S tring, etc) y tipos de Productos (clases) son soportados por
la importación de chispa.im plicits._ Apoyo para serializar otros tipos de
añadirán en futuras versiones.

Nota: estoy usando spark 2.0!

  • Usted necesita import spark.implicits._.
  • Gracias @Yuval . pero no funcionó.
InformationsquelleAutor Advika | 2016-09-11

2 Comentarios

  1. 67

    No hay nada inesperado aquí. Estás tratando de usar el código que ha escrito con Chispa 1.x y ya no se admite en la Chispa de la 2.0:

    • en 1.x DataFrame.map es ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
    • en 2.x Dataset[Row].map es ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

    Para ser honesto, no tenía mucho sentido en 1.x cualquiera. Independiente de la versión que usted puede simplemente utilizar DataFrame API:

    import org.apache.spark.sql.functions.{when, lower}
    
    val df = Seq(
      (2012, "Tesla", "S"), (1997, "Ford", "E350"),
      (2015, "Chevy", "Volt")
    ).toDF("year", "make", "model")
    
    df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

    Si usted realmente desea utilizar map debe utilizar de tipo estático Dataset:

    import spark.implicits._
    
    case class Record(year: Int, make: String, model: String)
    
    df.as[Record].map {
      case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
      case rec => rec
    }

    o, al menos, devolver un objeto que se tienen implícito encoder:

    df.map {
      case Row(year: Int, make: String, model: String) => 
        (year, if(make.toLowerCase == "tesla") "S" else make, model)
    }

    Por último, si para algunos completamente loco razón desea mapa sobre Dataset[Row] usted tiene que proporcionar requiere encoder:

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    //Yup, it would be possible to reuse df.schema here
    val schema = StructType(Seq(
      StructField("year", IntegerType),
      StructField("make", StringType),
      StructField("model", StringType)
    ))
    
    val encoder = RowEncoder(schema)
    
    df.map {
      case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
        Row(year, "S", model)
      case row => row
    } (encoder)
    • Esta es probablemente la peor que uno que usted podría elegir (en realidad, no hay ninguna buena razón para tratar con Row / Any aquí), pero me alegro de que fue útil.
    • Gracias @zero323. Ya después de hacer el auto examen me di cuenta de que el enfoque que he tomado es mejor que todo el propósito de la chispa. Tengo que venir para arriba con mejor lógica. 🙂
    • ¿Por qué es «completamente loco» a la mapa sobre Dataset[Row]? De hecho, tengo un caso de uso donde quiero flatMap más de Dataset[Row].
    • Porque a) no obtener mejoras en el rendimiento ofrecido por DataFrame y binario Encoders b) usted no consigue la seguridad de tipo c) de forma explícita que coincida con cada tipo que hace que sea detallado y propenso a error. d) usted tiene que especificar el esquema de Encoder. Una vez más detallado de una propenso a error. Para flatMap como en Dataframe explode normalmente es más que suficiente.
    • hablando de código en el map sí, creo que explícitamente que coincida con cada tipo en caso de implícito codificadores mientras que en el último ejemplo, sólo se especifica el tipo de atributo que se utiliza en la lógica.
  2. 5

    Para el escenario donde dataframe esquema se conoce de antemano la respuesta dada por @zero323 es la solución

    pero para el escenario con esquema dinámico /o pasar varias dataframe a una función genérica:
    Código siguiente se ha trabajado para nosotros, mientras que la migración desde 1.6.1 de 2.2.0

    import org.apache.spark.sql.Row
    
    val df = Seq(
       (2012, "Tesla", "S"), (1997, "Ford", "E350"),
       (2015, "Chevy", "Volt")
     ).toDF("year", "make", "model")
    
    val data = df.rdd.map(row => {
      val row1 = row.getAs[String](1)
      val make = if (row1.toLowerCase == "tesla") "S" else row1
      Row(row(0),make,row(2))
    })

    este código se ejecuta en las versiones de chispa.

    desventaja : la optimización de siempre
    por spark en dataframe/conjuntos de datos de la api de no ser aplicado.

Dejar respuesta

Please enter your comment!
Please enter your name here