Me gustaría crear un JSON a partir de una Chispa v. 1.6 (usando scala) dataframe. Sé que no es la solución simple de hacer df.toJSON.

Sin embargo, mi problema es un poco diferente. Considere, por ejemplo, un dataframe con las siguientes columnas:

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

Me gustaría tener al final un dataframe con

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |

donde C es un JSON que contiene C1, C2, C3. Por desgracia, creo que en tiempo de compilación no sé lo que el dataframe parece (a excepción de las columnas A y B que siempre están «fijos»).

Como por la razón de por qué tengo esto: yo estoy usando Protobuf para el envío de alrededor de los resultados. Por desgracia, mi dataframe, a veces, tiene más columnas que se espera y todavía iba a enviar a aquellos que a través de Protobuf, pero no quiero especificar todas las columnas en la definición.

¿Cómo puedo lograr esto?

  • todavía un dataframe
  • No, lo siento, y no me refiero a cómo agregar C1, C2, C3 como una cadena JSON columna a la existente dataframe. He actualizado el post para aclarar la versión de la Chispa y la scala como lenguaje.
  • Lo siento! Seguro, acabo de actualizar a la pregunta (junto con una razón de por qué le gustaría lograr este) y se añadió un ejemplo.
InformationsquelleAutor navige | 2016-03-22

4 Comentarios

  1. 18

    Chispa 2.1 debe tener soporte nativo para este caso de uso (ver #15354).

    import org.apache.spark.sql.functions.to_json
    df.select(to_json(struct($"c1", $"c2", $"c3")))
  2. 5

    Primera permite convertir C a un struct:

    val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))

    Esta es la estructura puede ser convertido a JSONL utilizando toJSON como antes:

    dfStruct.toJSON.collect
    //Array[String] = Array(
    //  {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
    //  {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})

    Yo no soy consciente de que cualquier método integrado que puede convertir una sola columna, pero puede convertir de forma individual y join o utilizar su favorito JSON parser en una UDF.

    case class C(C1: String, C2: Int, C3: Boolean)
    
    object CJsonizer {
      import org.json4s._
      import org.json4s.JsonDSL._
      import org.json4s.jackson.Serialization
      import org.json4s.jackson.Serialization.write
    
      implicit val formats = Serialization.formats(org.json4s.NoTypeHints)
    
      def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
    }
    
    
    val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => 
      CJsonizer.toJSON(c1, c2, c3))
    
    df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
    • En realidad, mi pregunta es, realmente, se trata de la segunda parte de cómo convertir las columnas individuales a JSON. Usted está mencionando join-ción de las columnas, pero que en realidad no funciona como me tienen por un lado un RDD[String] y en la otra mano un DataFrame
    • Como él dice, sólo tiene que utilizar un UDF. Usted incluso no tiene que utilizar un completo soplado JSON parser en el UDF – sólo puede elaborar una cadena JSON sobre la marcha usando map y mkString. Usted probablemente tendrá que usar DataFrame.columns o posiblemente DataFrame.dtypes tanto a las embarcaciones de las select declaración y como la base de la map en el UDF.
    • Estoy de acuerdo con @DavidGriffin – udf puede ser la solución más simple aquí. Y Jackson y json4s ya son arrastrados con otras dependencias.
    • Mi problema con el JSON de los analizadores que he visto es que usted necesita saber de antemano lo que el esquema se ve como — como con su solución de @zero323-sólo funciona para aquellos columnas específicas. ¿Y si los nombres eran diferentes? Lo que si hay más de 3 columnas?
    • El único problema que veo es que Row es muy fea estructura de datos. De lo contrario, usted puede simplemente construir un complejo arbitrario AST con Ascensor / json4s y convertir a JSON. Pero la verdad es que es mucho esfuerzo para ponerlo en un MODO de respuesta.
    • Row es feo por la misma razón odio a tratar con JSON en la Scala-es un choque de culturas Afloja, goosey vs fuerte, estáticos. SQL es afloja goosey-usted es un select lejos de la definición de un nuevo tipo, por lo tanto, Row es desordenado. Avro del GenericRecord tiene el mismo problema.

  3. 5

    Aquí, no JSON parser, y se adapta a su esquema:

    import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
    
    df.select(
      col(df.columns(0)),
      col(df.columns(1)),
      concat(
        lit("{"), 
        concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
          val c = dt._1;
          val t = dt._2;
          concat(
            lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "")  ),
            col(c),
            lit(if(t=="StringType") "\""; else "") 
          )
        }):_*), 
        lit("}")
      ) as "C"
    ).collect()
    • parece un poco chapucero pero funciona 🙂
    • Sí y sí. JSON es pero hacky en general, si usted me pregunta.
  4. 1

    Yo use este comando para resolver el to_json problema:

    output_df = (df.select(to_json(struct(col("*"))).alias("content")))

Dejar respuesta

Please enter your comment!
Please enter your name here