Estoy un poco confundida a encontrar la manera correcta de guardar los datos en HDFS, después de procesarlos con chispa.

Esto es lo que estoy tratando de hacer. Estoy calculando el mínimo, máximo y desviación estándar de los campos numéricos. Mis archivos de entrada de millones de filas, pero la producción sólo tienen alrededor de 15-20 campos. Así, la salida es un valor único(escalares) para cada campo.

Por ejemplo: voy a cargar todas las filas de CAMPO1 en un dispositivo de este tipo, y al final, voy a tener 3 valores individuales para el CAMPO 1(MIN, MAX, SD). Yo concatenados estos tres valores en la cadena temporal. En el final, voy a tener de 15 a veinte filas, que contiene 4 columnas en este formato siguiente

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD

Este es un fragmento de código:

//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))

val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev

Así, tengo 3 variables, min_value, max_value y SD que quiero guardar de nuevo a hdfs.

Pregunta 1:
Desde la salida va a ser bastante pequeño, sólo debo guardar de forma local en el servidor? o debo volcado a HDFS. A mí me parece como volcar el archivo localmente hace mejor sentido.

Pregunta 2:
En la chispa, me pueden llamar a los siguientes para guardar un DDR en el fichero de texto

some_RDD.saveAsTextFile("hdfs://namenode/path")

¿Cómo puedo lograr lo mismo en el de una variable de Cadena que no es un dispositivo de este tipo en la scala? debo paralelizar mi resultado en un dispositivo de este tipo primero y, a continuación, llamar a saveAsTextFile?

InformationsquelleAutor user2773013 | 2014-06-30

2 Comentarios

  1. 12

    Para guardar localmente acaba de hacer

    some_RDD.collect()

    A continuación, guardar la matriz resultante con algo como el de este pregunta. Y sí, si el conjunto de datos es pequeño, y puede caber fácilmente en la memoria debe recoger y llevar al conductor del programa. Otra opción si los datos es un poco grande para almacenar en la memoria es sólo some_RDD.coalesce(numParitionsToStoreOn). Tenga en cuenta coalesce también toma un valor booleano shuffle, si usted está haciendo cálculos sobre los datos antes de coalescencia, debe establecer esta a true para obtener más paralelismo en los cálculos. Se unen, se reduce el número de nodos que almacenan los datos cuando usted llame some_RDD.saveAsTextFile("hdfs://namenode/path"). Si el archivo es muy pequeño, pero que lo necesite en hdfs, llamada repartition(1), que es el mismo que coalesce(1,true), esto se asegurará de que sus datos sólo se guarda en un nodo.

    ACTUALIZACIÓN:
    Así que si todo lo que quieres hacer es salvar los tres valores en HDFS usted puede hacer esto.
    sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")

    Básicamente, usted está poniendo sólo el 3 vars en una tupla, envoltura de que en una Lista y establecer el paralelismo a uno, ya que los datos es muy pequeña

    • Lo siento, no era lo suficientemente clara. Mi resultado final no es en RDD formato. Mi resultado final es un único valor de la unidad(escalares). Así, voy a través de la RDD, calcular los cuartiles para cada campo, y almacenar cada valor escalar en una cadena temporal. Así que, realmente no puedo usar saveAsTextFile
    • está usted seguro de que no sólo significa un valor, tipo de unidad en la scala medio vacío, incluso si sólo tiene un valor todavía puede utilizar recoger
    • si el resultado final no es un dispositivo de este tipo, ¿cómo llegaste a ese punto, tu pregunta es bastante detallada de la onu. He actualizado para mostrar cómo podría ahorrar a hdfs, pero en un solo nodo
    • thx una tonelada aaronman. Lo sentimos acerca de ser claro. He actualizado a la pregunta. Hoepfully es un poco más claro.
    • echa un vistazo a la sección de actualización creo que hace exactamente lo que desea
  2. 5

    Respuesta 1: Ya que sólo necesita varios escalar, me gustaría decir almacenarlos en el sistema de archivos local. En primer lugar puede hacer val localValue = rdd.collect(), que recogerá todos los datos de los trabajadores a la maestra. Y, a continuación, llamar a java.io a escribir cosas en el disco.

    Respuesta 2: Se puede hacer sc.paralelizar(yourString).saveAsTextFile(«hdfs://host/yourFile»). El va a escribir cosas a parte-000*. Si quieres tener todas las cosas en un solo archivo, hdfs dfs -getmerge está aquí para ayudarle.

    • sc.paralelizar(yourString) no funcionará como paralelizar el método con excepción de la lista no cadena, si conoces alguna manera de pasar la cadena a esto, por favor responder

Dejar respuesta

Please enter your comment!
Please enter your name here