Quiero correr mi aplicación existente con Apache Spark y MySQL.

InformationsquelleAutor pangkaj paul | 2014-12-31

10 Comentarios

  1. 30

    De pySpark, es el trabajo para mí :

    dataframe_mysql = mySqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/my_bd_name",
        driver = "com.mysql.jdbc.Driver",
        dbtable = "my_tablename",
        user="root",
        password="root").load()
    • mySqlContext debe ser sqlContext
    • ^Esta es sólo una variable. Puedes nombrar como quieras. any_name_of_SQL_Context = SQLContext(sc)
    • Si estoy usando ODBC y en lugar de JDBC, sería exactamente el mismo, sólo con esos dos pasaron en el texto anterior?
    • Para spark2.x, el uso de dataframe = spark_session.read.format("jdbc").options(...).load()
  2. 14

    Usando Scala, esto funcionó para mí :
    Utilice los comandos siguientes:

    sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0
    
    import org.apache.spark.sql.SQLContext
    
    val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    
    val dataframe_mysql = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://Public_IP:3306/DB_NAME").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "tblage").option("user", "sqluser").option("password", "sqluser").load()
    
    dataframe_mysql.show()
  3. 14

    Con spark 2.0.x,puede utilizar DataFrameReader y DataFrameWriter.
    Uso SparkSession.leer para acceder a DataFrameReader y uso del conjunto de datos.escribir para tener acceso a DataFrameWriter.

    Suponga el uso de chispa-shell.

    leer ejemplo

    val prop=new java.util.Properties()
    prop.put("user","username")
    prop.put("password","yourpassword")
    val url="jdbc:mysql://host:port/db_name"
    
    val df=spark.read.jdbc(url,"table_name",prop) 
    df.show()

    leer el ejemplo 2

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()

    de chispa doc

    leer ejemplo3

    Si quieres leer los datos de un resultado de la consulta en lugar de una tabla.

    val sql="""select * from db.your_table where id>1"""
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql:dbserver")
      .option("dbtable",  s"( $sql ) t")
      .option("user", "username")
      .option("password", "password")
      .load()

    escribir ejemplo

    import org.apache.spark.sql.SaveMode
    
    val prop=new java.util.Properties()
    prop.put("user","username")
    prop.put("password","yourpassword")
    val url="jdbc:mysql://host:port/db_name"
    //df is a dataframe contains the data which you want to write.
    df.write.mode(SaveMode.Append).jdbc(url,"table_name",prop)

    中文版戳我

    • Trabajó muy bien y limpiamente! Gracias por este
  4. 10

    Para Scala, si utiliza el sbt esto también va a trabajar.

    En su build.sbt de archivo:

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "1.6.2",
        "org.apache.spark" %% "spark-sql" % "1.6.2",
        "org.apache.spark" %% "spark-mllib" % "1.6.2",
        "mysql" % "mysql-connector-java" % "5.1.12"
    )

    A continuación, sólo tiene que declarar el uso del conductor.

    Class.forName("com.mysql.jdbc.Driver").newInstance
    
    val conf = new SparkConf().setAppName("MY_APP_NAME").setMaster("MASTER")
    
    val sc = new SparkContext(conf)
    
    val sqlContext = new SQLContext(sc)
    
    val data = sqlContext.read
    .format("jdbc")
    .option("url", "jdbc:mysql://<HOST>:3306/<database>")
    .option("user", <USERNAME>)
    .option("password", <PASSWORD>)
    .option("dbtable", "MYSQL_QUERY")
    .load()
    • parece un autocompletar faux pas, com.a mí mismo.jdbc.Conductor -> com.mysql.jdbc.Conductor?
    • Estás en lo cierto! gracias por la captura de ese.
  5. 6
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<String, String>();
        options.put("url","jdbc:postgresql://<DBURL>:<PORT>/<Database>?user=<UserName>&password=<Password>");
        options.put("dbtable", "<TableName>");
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DBConnection").setMaster("local[*]"));
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        // DataFrame jdbcDF = sqlContext.load("jdbc", options).cache();
        DataFrame jdbcDF = sqlContext.jdbc(options.get("url"),options.get("dbtable"));
        System.out.println("Data------------------->" + jdbcDF.toJSON().first());
        Row[] rows = jdbcDF.collect();
        System.out.println("Without Filter \n ------------------------------------------------- ");
        for (Row row2 : rows) {
            System.out.println(row2.toString());
        }
        System.out.println("Filter Data\n ------------------------------------------------- ");
        jdbcDF = jdbcDF.select("agency_id","route_id").where(jdbcDF.col("route_id").$less$eq(3));
        rows = jdbcDF.collect();
        for (Row row2 : rows) {
            System.out.println(row2.toString());
        }
    }
    • Este código se halp para conectar chispa con la base de datos
  6. 5

    Para Java(usando maven), añadir chispa a las dependencias y controlador sql dependencias de la pom.xml archivo,

    <properties>
        <java.version>1.8</java.version>
        <spark.version>1.6.3</spark.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
     <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    Código de ejemplo, supongamos que el mysql localiza a nivel local, nombre de base de datos es prueba, nombre de usuario es raíz y contraseña es contraseña, y dos tablas en la db test son tabla1 y tabla2

    SparkConf sparkConf = new SparkConf();
    SparkContext sc = new SparkContext("local", "spark-mysql-test", sparkConf);
    SQLContext sqlContext = new SQLContext(sc);
    
    // here you can run sql query
    String sql = "(select * from table1 join table2 on table1.id=table2.table1_id) as test_table";
    // or use an existed table directly
    // String sql = "table1";
    DataFrame dataFrame = sqlContext
        .read()
        .format("jdbc")
        .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true")
        .option("user", "root")
        .option("password", "password")
        .option("dbtable", sql)
        .load();
    
    // continue your logical code
    ......
  7. 4

    Basado en este infoobjects artículo tratar el siguiente (suponiendo que Java o Scala, no estoy seguro de cómo esto funciona con python):

    • agregar el mysql-connector-java a la ruta de su chispa de clúster
    • inicializar el controlador: Class.forName("com.mysql.jdbc.Driver")
    • crear un JdbcRDD fuente de datos:

    val myRDD = new JdbcRDD( sc, () => 
                                   DriverManager.getConnection(url,username,password),
                            "select first_name,last_name,gender from person limit ?, ?",
                            1,//lower bound
                            5,//upper bound
                            2,//number of partitions
                            r =>
                              r.getString("last_name") + ", " + r.getString("first_name"))
    • JdbcRDD es desanimado ahora. Mejor mirar el DataFrame interfaz en la Chispa de la 1.4 y posterior.
    • Eso es cierto, aunque cuando la pregunta se le preguntó y respondió, no estaba disponible.
    • Sí, claro que sí. Acabo de encontrar esto cuando la búsqueda de mí mismo y de los otros pueden hacer lo mismo, así que he actualizado, para cerciorarse de nuevo a la gente a encontrar el material más reciente.
  8. 3

    Para Java, esto funcionó para mí:

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);
    
        return sparkConf;
    }
    
    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }
    
    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName("Java Spark SQL basic example")
                .getOrCreate();
    }

    Properties properties = new Properties();
            properties.put("user", "root");
            properties.put("password", "root");
            properties.put("driver", "com.mysql.cj.jdbc.Driver");
            sparkSession.read()
                        .jdbc("jdbc:mysql://localhost:3306/books?useSSL=false", "(SELECT books.BOOK_ID as BOOK_ID, books.BOOK_TITLE as BOOK_TITLE, books.BOOK_AUTHOR as BOOK_AUTHOR, borrowers.BORR_NAME as BORR_NAME FROM books LEFT OUTER JOIN borrowers ON books.BOOK_ID = borrowers.BOOK_ID) as t", properties) // join example
                        .show();

    de curso, para MySQL, necesitaba el conector:

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>

    Y tengo

    +-------+------------------+--------------+---------------+
    |BOOK_ID|        BOOK_TITLE|   BOOK_AUTHOR|      BORR_NAME|
    +-------+------------------+--------------+---------------+
    |      1|        Gyűrű kúra|J.R.K. Tolkien|   Sára Sarolta|
    |      2|     Kecske-eledel|     Mekk Elek|Maláta Melchior|
    |      3|      Répás tészta| Vegán Eleazár|           null|
    |      4|Krumpli és pityóka| Farmer Emília|           null|
    +-------+------------------+--------------+---------------+
  9. 2

    Por Chispa 2.1.0 y Scala (En Windows 7 OS), a continuación el código funciona bastante bien para mí:

    import org.apache.spark.sql.SparkSession
    
    object MySQL {
      def main(args: Array[String]) {
        //At first create a Spark Session as the entry point of your app
        val spark:SparkSession = SparkSession
          .builder()
          .appName("JDBC")
          .master("local[*]")
          .config("spark.sql.warehouse.dir", "C:/Exp/")
          .getOrCreate();    
    
        val dataframe_mysql = spark.read.format("jdbc")
                              .option("url", "jdbc:mysql://localhost/feedback")
                              .option("driver", "com.mysql.jdbc.Driver")
                              .option("dbtable", "person") //replace with own
                              .option("user", "root") //replace with own 
                              .option("password", "vertrigo") // replace with own
                              .load()
    
        dataframe_mysql.show()
      }
    }
    • Indica la opción de controlador, como en su respuesta era necesario para mí para hacer que funcione
  10. 1
       val query: String =
        "select col1, col2 from schema.table_name where condition"
    
      val url= "jdbc:mysql://<ip>:3306/<schema>"
      val username = ""
      val password = ""
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.load("jdbc", Map(
        "url" -> (url + "/?user=" + username + "&password=" + password),
        "dbtable" -> s"($query) as tbl",
        "driver" -> "com.mysql.jdbc.Driver"))
    
    df.show()
    • SQLContext.la carga es obsoleto ahora y será eliminado en 2.0

Dejar respuesta

Please enter your comment!
Please enter your name here