Yo estoy tratando de probar cómo escribir datos en HDFS 2.7 uso de la Chispa 2.1. Mis datos es una simple secuencia de valores ficticios y la salida debe ser dividido por los atributos: id y clave.

 //Simple case class to cast the data
 case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)

 //Actual data to be stored
 val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1),
    SimpleTest("test", 12, 13.5.toFloat, 2),
    SimpleTest("test", 12, 13.5.toFloat, 3),
    SimpleTest("simple", 12, 13.5.toFloat, 1),
    SimpleTest("simple", 12, 13.5.toFloat, 2),
    SimpleTest("simple", 12, 13.5.toFloat, 3)
 )

 //Spark's workflow to distribute, partition and store
 //sc and sql are the SparkContext and SparkSession, respectively
 val testDataP = sc.parallelize(testData, 6)
 val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
 testDf.write.partitionBy("id", "key").parquet("/path/to/file")

Estoy esperando para obtener la siguiente estructura de árbol en HDFS:

- /path/to/file
   |- /id=test/key=1/part-01.parquet
   |- /id=test/key=2/part-02.parquet
   |- /id=test/key=3/part-03.parquet
   |- /id=simple/key=1/part-04.parquet
   |- /id=simple/key=2/part-05.parquet
   |- /id=simple/key=3/part-06.parquet

Pero cuando voy a ejecutar el código anterior, se me sale el siguiente resultado:

/path/to/file/id=/key=24/
 |-/part-01.parquet
 |-/part-02.parquet
 |-/part-03.parquet
 |-/part-04.parquet
 |-/part-05.parquet
 |-/part-06.parquet

No sé si hay algo mal en el código, o hay algo más que la Chispa está haciendo.

Estoy ejecutando spark-submit de la siguiente manera:

chispa-enviar-el nombre de la APLICACIÓN –maestro local –controlador de memoria 30G –ejecutor de la memoria 30G –ejecutor-núcleos 8 –num-ejecutores 8 –conf chispa.io.la compresión.codec=lzf –conf chispa.akka.frameSize=1024 –conf chispa.el controlador.maxResultSize=1 g –conf chispa.sql.orco.la compresión.codec=sin comprimir –conf chispa.sql.de parquet.filterPushdown=true –class miclase myFatJar.jar

InformationsquelleAutor Daniel Lopez | 2017-05-02

2 Comentarios

  1. 10

    Interesante, ya que…bueno…«a mí me funciona».

    Tal como la describe el conjunto de datos utilizando SimpleTest caso de la clase en la Chispa de la 2.1 estás import spark.implicits._ lejos de tener un tipo de Dataset.

    En mi caso, spark es sql.

    En otras palabras, usted no tiene que crear testDataP y testDf (utilizando sql.createDataFrame).

    import spark.implicits._
    ...
    val testDf = testData.toDS
    testDf.write.partitionBy("id", "key").parquet("/path/to/file")

    En otro terminal (después de salvar a /tmp/testDf directorio):

    $ tree /tmp/testDf/
    /tmp/testDf/
    ├── _SUCCESS
    ├── id=simple
    │   ├── key=1
    │   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    │   ├── key=2
    │   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    │   └── key=3
    │       └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    └── id=test
        ├── key=1
        │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
        ├── key=2
        │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
        └── key=3
            └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    
    8 directories, 7 files
  2. 2

    He encontrado una solución! De acuerdo a Cloudera, es un mapred-site.xml problema de configuración (vea el vínculo de abajo). También, en lugar de escribir el dataframe como: testDf.write.partitionBy("id", "key").parquet("/path/to/file")

    Yo lo hice de la siguiente manera: testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file"). Puede sustituir <namenode> y <port> con el HDFS’ masternode nombre y el puerto, respectivamente.

    Agradecimiento especial a @jacek-laskowski, por su valiosa contribución.

    Referencias:

    https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

    Escrito a HDFS en la Chispa de la/Scala

Dejar respuesta

Please enter your comment!
Please enter your name here