Me gustaría leer un archivo CSV en la chispa y convertirla en DataFrame y almacenarlo en HDFS con df.registerTempTable("table_name")

He intentado:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Error que me dieron:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

¿Qué es el derecho de comandos para cargar el archivo CSV como DataFrame en Apache Spark?

InformationsquelleAutor Donbeo | 2015-04-17

11 Comentarios

  1. 132

    chispa-csv es parte del núcleo de la Chispa de la funcionalidad y no requiere de una biblioteca independiente.
    Por lo que sólo podría hacer por ejemplo

    df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

    Scala,(esto funciona para cualquier formato-en el delimitador de mención «,» csv, «\t» para tsv etc)
    val df = sqlContext.read.format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .load("csvfile.csv")

    • Uso df = spark.read.format("csv").option("header", "true").load("PATH/*.csv") para cargar todos los archivos CSV de una carpeta
  2. 145

    Analizar CSV y carga como DataFrame/conjunto de datos con Chispa 2.x

    Primero inicializar SparkSession objeto por defecto estará disponible en las conchas como spark

    val spark = org.apache.spark.sql.SparkSession.builder
    .master("local")
    .appName("Spark CSV Reader")
    .getOrCreate;

    Utilizar cualquiera de las siguientes forma de cargar CSV como DataFrame/DataSet

    1. Hacerlo en forma programática

     val df = spark.read
    .format("csv")
    .option("header", "true") //first line in file has headers
    .option("mode", "DROPMALFORMED")
    .load("hdfs:///csv/file/dir/file.csv")

    2. Usted puede hacer esto de SQL manera

     val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

    Dependencias:

     "org.apache.spark" % "spark-core_2.11" % 2.0.0,
    "org.apache.spark" % "spark-sql_2.11" % 2.0.0,


    Spark versión < 2.0

    val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

    Dependencias:

    "org.apache.spark" % "spark-sql_2.10" % 1.6.0,
    "com.databricks" % "spark-csv_2.10" % 1.6.0,
    "com.univocity" % "univocity-parsers" % LATEST,
    • hacer esta sesión requiere de la colmena? Me estoy haciendo la colmena errores.
    • No hay necesidad de. Sólo spark-core_2.11 y spark-sql_2.11 de 2.0.1 versión está bien. Si es posible agregar el mensaje de error.
    • podemos convertir un tubo de archivo delimitado a un dataframe?
    • Sí, por supuesto! pruebe algo como esto spark.read.format("csv").option("delimiter ", "|") ...
    • La otra opción para programmatic way es dejar fuera de la .format("csv") y reemplazar .load(... con .csv(.... El option método pertenece a la DataFrameReader clase como el que devuelve el read método, donde la load y csv los métodos devuelven un dataframe, así que no puede tener opciones de etiquetado después de que ellos se llaman. Esta respuesta es bastante completa, sino que debe vincularse a la documentación para que la gente pueda ver todos los otros CSV opciones disponibles spark.apache.org/docs/latest/api/scala/…*):org.apache.chispa.sql.DataFrame
    • La documentación que he enlazado en el comentario de arriba menciona sep (default ,): sets a single character as a separator for each field and value. y no dice nada acerca de delimiter así que me gustaría utilizar sep incluso si delimiter está trabajando.

  3. 13

    Es para cuya Hadoop es la 2.6 y la Chispa es de 1.6 y sin «databricks» paquete.

    import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
    import org.apache.spark.sql.Row;
    val csv = sc.textFile("/path/to/file.csv")
    val rows = csv.map(line => line.split(",").map(_.trim))
    val header = rows.first
    val data = rows.filter(_(0) != header(0))
    val rdd = data.map(row => Row(row(0),row(1).toInt))
    val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))
    val df = sqlContext.createDataFrame(rdd, schema)
  4. 11

    Con Spark 2.0, lo siguiente es cómo usted puede leer CSV

    val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder
    .config(conf = conf)
    .appName("spark session example")
    .getOrCreate()
    val path = "/Users/xxx/Downloads/usermsg.csv"
    val base_df = sparkSession.read.option("header","true").
    csv(path)
    • Hay una diferencia entre spark.read.csv(path) y spark.read.format("csv").load(path)?
  5. 8

    En Java 1.8 Este fragmento de código que funcionan a la perfección para leer archivos CSV

    POM.xml

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
    </dependency>
    <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
    </dependency>

    Java

    SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    //create Spark Context
    SparkContext context = new SparkContext(conf);
    //create spark Session
    SparkSession sparkSession = new SparkSession(context);
    Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
    //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
    System.out.println("========== Print Schema ============");
    df.printSchema();
    System.out.println("========== Print Data ==============");
    df.show();
    System.out.println("========== Print title ==============");
    df.select("title").show();
    • Mientras que esto puede ser útil para alguien. La pregunta tiene una Scala etiqueta.
  6. 4

    Centavo de Chispa 2 ejemplo es la manera de hacerlo en spark2. Hay un truco más: tienes que encabezado generado por hacer una exploración inicial de los datos, mediante el establecimiento de la opción inferSchema a true

    Aquí, entonces, assumming que spark es una chispa de la sesión se ha establecido, es la operación de carga en el CSV archivo de índice de todas las imágenes Landsat que amazon host en el S3.

      /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
    .csv("s3a://landsat-pds/scene_list.gz")

    La mala noticia es que esto desencadena un análisis a través del archivo; para algo grande como este 20+MB comprimido en formato CSV, que puede tomar años 30 a través de una larga distancia de conexión. Tener en cuenta: usted es mejor apagar manualmente la codificación del esquema una vez que has conseguido que viene en.

    (fragmento de código de Software Apache License 2.0 con licencia para evitar toda ambigüedad; algo que yo he hecho como una demo o de prueba de integración de S3 integración)

    • Yo no había visto esta csv método o pasar de un mapa de opciones. Acordaron siempre mejor proveer esquema explícito, inferSchema está bien para una rápida n sucio (también conocido como la ciencia de datos), pero terrible para ETL.
  7. 3

    Hay un montón de desafíos para el análisis de un archivo CSV, se siguen sumando si el tamaño del archivo es más grande, si hay no-inglés/escape/separador/otros caracteres en los valores de la columna, que podría causar errores de análisis.

    La magia es, pues, en las opciones que se utilizan. El que funcionó para mí y la esperanza deben cubrir la mayor parte de los casos de borde están en el código a continuación:

    ### Create a Spark Session
    spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
    ### Note the options that are used. You may have to tweak these in case of error
    html_df = spark.read.csv(html_csv_file_path, 
    header=True, 
    multiLine=True, 
    ignoreLeadingWhiteSpace=True, 
    ignoreTrailingWhiteSpace=True, 
    encoding="UTF-8",
    sep=',',
    quote='"', 
    escape='"',
    maxColumns=2,
    inferSchema=True)

    Espero que ayude. Para más referencia: El uso de PySpark 2 para leer CSV tener el código fuente HTML

    Nota: El código de arriba es de Chispa 2 de la API, donde el archivo CSV de la lectura de la API viene con construido-en paquetes de Chispa instalable.

    Nota: PySpark es un wrapper) de Python para la Chispa y comparte la misma API Scala/Java.

    • Esto se debe considerar como la verdadera respuesta actualizada.
  8. 1

    En caso de que usted está construyendo un frasco con la scala 2.11 y Apache 2.0 o superior.

    No hay necesidad de crear un sqlContext o sparkContext objeto. Sólo un SparkSession objeto basta el requisito para todas las necesidades.

    Siguiente es mycode que funciona bien:

    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
    import org.apache.log4j.{Level, LogManager, Logger}
    object driver {
    def main(args: Array[String]) {
    val log = LogManager.getRootLogger
    log.info("**********JAR EXECUTION STARTED**********")
    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter","|")
    .option("inferSchema","true")
    .load("d:/small_projects/spark/test.pos")
    df.show()
    }
    }

    En caso de que usted está ejecutando en el grupo acaba de cambiar .master("local") a .master("yarn") mientras que la definición de la sparkBuilder objeto

    La Chispa Doc cubre este:
    https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

  9. 0

    Formato de archivo predeterminado es de Parquet y con chispa.lea.. y lectura de archivos csv que ¿por qué usted está recibiendo la excepción. Especificar el formato csv con la api de que usted está tratando de utilizar

  10. 0

    Carga un archivo CSV y devuelve el resultado como un DataFrame.

    df=sparksession.read.option("header", true).csv("file_name.csv")

    Dataframe trata de un archivo como archivo de formato csv.

  11. 0

    Intente esto si utilizar spark 2.0+

    For non-hdfs file:
    df = spark.read.csv("file:///csvfile.csv")
    For hdfs file:
    df = spark.read.csv("hdfs:///csvfile.csv")
    For hdfs file (with different delimiter than comma:
    df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

    Nota:- este trabajo para cualquier archivo delimitado. Sólo utilice la opción(«delimitador»,) para cambiar el valor.

    Espero que esto sea útil.

Dejar respuesta

Please enter your comment!
Please enter your name here