Tengo un dataframe y quiero insertar en hbase. Yo sigo este documenation .

Esta es la forma en que mi dataframe aspecto:

 --------------------
|id | name | address |
|--------------------|
|23 |marry |france   |
|--------------------|
|87 |zied  |italie   |
 --------------------

Puedo crear un hbase tabla usando este código:

val tableName = "two"
val conf = HBaseConfiguration.create()
if(!admin.isTableAvailable(tableName)) {
          print("-----------------------------------------------------------------------------------------------------------")
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
          admin.createTable(tableDesc)
        }else{
          print("Table already exists!!--------------------------------------------------------------------------------------")
        }

Y ahora ¿cómo puedo insertar este dataframe en hbase ?

En otro ejemplo tengo éxito para insertar en hbase usando este código:

val myTable = new HTable(conf, tableName)
    for (i <- 0 to 1000) {
      var p = new Put(Bytes.toBytes(""+i))
      p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5)))
      p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
      p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i))
      p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i))
      myTable.put(p)
    }
    myTable.flushCommits()

Pero ahora estoy atascado, cómo insertar cada registro de mi dataframe en mi hbase tabla.

Gracias por su tiempo y atención

  • El problema no es claro. Usted está haciendo algo más. hbase.apache.org/book.html#_sparksql_dataframes le dice a definir el catálogo y el uso de sc.paralelizar(datos).toDF.escribir.opciones para guardar DF a HBase.
  • sí y decir que yo estoy usando esa documentación. estoy atascado aquí val data = (0 to 255).map { i => HBaseRecord(i, "extra")} cómo insertar foreach registro de mi dataframe no de 0 a 255
InformationsquelleAutor Zied Hermi | 2017-05-22

2 Comentarios

  1. 0

    utilizando respuesta para el código de formato a los efectos de
    Doc le dice:

    sc.parallelize(data).toDF.write.options(
     Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
     .format("org.apache.hadoop.hbase.spark ")
     .save()

    donde sc.paralelizar(datos).toDF es su DataFrame. Doc ejemplo vueltas scala colección de dataframe utilizando sc.paralelizar(datos).toDF

    Ya tiene su DataFrame, solo intenta llamar

    yourDataFrame.write.options(
         Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
         .format("org.apache.hadoop.hbase.spark ")
         .save()

    Y debería funcionar. Doc es bastante claro…

    UPD

    Dado un DataFrame con esquema especificado, por encima de la voluntad de crear un HBase
    tabla con 5 regiones y guardar el DataFrame en el interior. Tenga en cuenta que si
    HBaseTableCatalog.tablanueva no se especifica, la tabla tiene que ser
    pre-creados.

    Se trata de la partición de datos. Cada HBase tabla puede tener 1…X regiones. Usted debe elegir cuidadosamente el número de regiones. Bajo número de regiones es malo. De la zona alta los números que también es malo.

    • gracias por tu respuesta; puede usted explicar esta línea: HBaseTableCatalog.newTable -> "5"
    • Respuesta actualizada, véase más arriba. 5, significa crear las 5 regiones de la tabla en HBase
    • y donde está el catálogo definido ? case class HBaseRecord( col0: String, col1: String, col2: String ) object HBaseRecord{ def apply(i: Int, t: String): HBaseRecord = { val s = s"""row${"%03d".format(i)}""" HBaseRecord(s, s"String$i: $t", s"String$i: $t") } } qué hacer después? gracias
    • hbase.apache.org/book.html#_define_catalog. Usted debe hacer cosas similares
    • después de agregar el script object HBaseRecord .tengo este error error: too many arguments for method apply: (i: Int, t: String)HBaseRecord in object HBaseRecord <console>:1: error: ';' expected but 'for' found. puede usted explaine me este error
    • No tengo ni idea, mejor proveer completo fragmento de código. Error es bastante obvio. Llame HBaseRecord.solicitar registro en el camino equivocado.
    • yo estoy usando la chispa de shell; tengo ese error al ejecutar este script : object HBaseRecord { def apply(i: Int, t: String): HBaseRecord = { val s = s"""row${"%03d".format(i)}""" HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i: $t", i.toByte) } }
    • se convierte en un reto. Puedes actualizar tu pregunta y poner yuor script allí? Yo no veo que la línea que hace que el error de compilación

  2. 0

    Una alternativa es buscar en rdd.saveAsNewAPIHadoopDataset, para insertar los datos en la hbase tabla.

    def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
        import spark.implicits._
    
        val config = HBaseConfiguration.create()
        config.set("hbase.zookeeper.quorum", "ip's")
        config.set("hbase.zookeeper.property.clientPort","2181")
        config.set(TableInputFormat.INPUT_TABLE, "tableName")
    
        val newAPIJobConfiguration1 = Job.getInstance(config)
        newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
        newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
        val df: DataFrame  = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")
    
        val hbasePuts= df.rdd.map((row: Row) => {
          val  put = new Put(Bytes.toBytes(row.getString(0)))
          put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
          put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
          (new ImmutableBytesWritable(), put)
        })
    
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
        }

    Ref : https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/

    • si usted desea guardar en una tabla no debe ser config.set(TableInputFormat.OUPUT_TABLE, «tableName»)
    • El TableOutputFormat aquí utilizado es un HBase archivo de clase. hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/… Porque nos gustaría enviar datos a HBase tabla , que son la creación de la TableOutputFormat. TableInputFormat habría INPUT_TABLE que podría ser utilizado en caso de la extracción de los datos de HBase.

Dejar respuesta

Please enter your comment!
Please enter your name here