Cómo guardar la salida de la chispa de la consulta sql en un archivo de texto

Estoy escribiendo un simple consumidor de programa con spark streaming. Mi código de salvar a algunos de los datos en el archivo, pero no TODOS los datos. Alguien me puede ayudar como solucionar esto. No estoy seguro de donde me estoy perdiendo los datos. Puedo obtener los datos de kafka tema, a continuación, aplico mi esquema de la clase java Bean.

public class ConsumerFile {
public static void main(String[] args){
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
String topic = args[0];
final String path=new String(args[2]);
String broker = args[1];
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

    HashMap<String, String> kafkaParams = new HashMap<String, String>();

    kafkaParams.put("metadata.broker.list", broker);
    JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
    ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
topicsSet
);

JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>() 

                                         {
                   public String call(Tuple2<String, String> message)

                                             {
                                                 return message._2();}});
words.foreachRDD(
          new Function2<JavaRDD<String>, Time, Void>() {
       public Void call(JavaRDD<String> rdd, Time time) {
   SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
              //Convert RDD[String] to RDD[case class] to DataFrame
              JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
                public JavaRow call(String line) throws Exception{
                String[] fields = line.split(",");
                  JavaRow record = new JavaRow(fields[0], fields[1],fields[2]  );

                  return record;

                }

              });

              DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
              wordsDataFrame.registerTempTable("Data");
              DataFrame wDataFrame = sqlContext.sql(" select * from Data");  
              if(!wDataFrame.rdd().isEmpty()){
             wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
              return null;
            }} );
ssc.start();
 ssc.awaitTermination();}

}

InformationsquelleAutor Hoda Moradi | 2016-03-17

3 Kommentare

  1. 3

    Que podría ser debido a que no especifica el modo de escritura. En lugar de utilizar este,

    df.write.mode('append').text("/path/to/file")
    

    P. s: no estoy acostumbrado a hacerlo en java, el que me dio es un scala/python equivalente

  2. 2
    val sqlContext = new HiveContext(sc)
    val df = sqlContext.sql("select * from tableName)
    df.write.text("/path/to/file")
    

    será escrito como un particiones archivo de texto, por lo que tendrá sus resultados espaciados entre un montón de archivos de la etiqueta de la parte-00000, pero va a estar ahí.

  3. 1

    Me enteré de por qué lo hace, en caso de que alguien más tiene el mismo problema. Al hacer foreachRDD es esencialmente ejecuta su función en cada una de las RDD de la DStream de guardar todo en el mismo archivo. Así que sobrescribir los demás datos y el primer o el último escritor gana. La forma más fácil de arreglar es guardarlos en un archivo con un nombre único. Así que he usado saveAsTextFile(ruta + tiempo().milisegundos().toString()) y se ha solucionado el problema. Pero, usted podría tener la misma marca de tiempo dos veces, así que puedo hacer esto aún más mediante la adición de un número aleatorio.

Kommentieren Sie den Artikel

Bitte geben Sie Ihren Kommentar ein!
Bitte geben Sie hier Ihren Namen ein

Recent Articles

Python «set» con duplicados/elementos repetidos

Hay una forma estándar de representar un "conjunto" que puede contener elementos duplicados. Como yo lo entiendo, un conjunto tiene exactamente un cero o...

Python: generador de expresión vs rendimiento

En Python, ¿hay alguna diferencia entre la creación de un generador de objetos a través de un generador de expresión versus el uso de...

Cómo exportar/importar la Masilla lista de sesiones?

Hay una manera de hacer esto? O tengo que tomar manualmente cada archivo de Registro? InformationsquelleAutor s.webbandit | 2012-10-23

no distingue mayúsculas de minúsculas coincidentes en xpath?

Por ejemplo, para el xml a continuación <CATALOG> <CD title="Empire Burlesque"/> <CD title="empire burlesque"/> <CD...