Me gustaría hacer algo de limpieza en el inicio de mi programa Spark (Pyspark). Por ejemplo, me gustaría eliminar datos de anteriores HDFS ejecutar. En el cerdo esto se puede hacer usando comandos como

fs -copyFromLocal ....

rmf /path/to-/hdfs

o localmente usando el comando sh.

Me preguntaba cómo hacer lo mismo con Pyspark.

  • Usted no puede hacer tal cosa con Chispa. Tal vez la mejor opción es utilizar un oozie flujo de trabajo en el que se puede poner tanto en HDFS comandos y la Chispa de puestos de trabajo y se pueden combinar de acuerdo a la lógica que usted prefiera.
InformationsquelleAutor user3803714 | 2015-12-01

3 Comentarios

  1. 12

    Puede ejecutar arbitraria de comandos de shell usando el formulario de ejemplo subproceso.llame o sh biblioteca así que algo como esto debería de funcionar bien:

    import subprocess
    
    some_path = ...
    subprocess.call(["hadoop", "fs", "-rm", "-f", some_path])

    Si el uso de Python 2.x puede intentar usar spotify/mordedura de serpiente:

    from snakebite.client import Client
    
    host = ...
    port = ...
    client = Client(host, port)
    client.delete(some_path, recurse=True)

    hdfs3 es otra biblioteca que puede ser utilizado para hacer la misma cosa:

    from hdfs3 import HDFileSystem
    
    hdfs = HDFileSystem(host=host, port=port)
    HDFileSystem.rm(some_path)

    Apache Flecha enlaces Python son la última opción (y que a menudo ya está disponible en la Chispa de clúster, como se requiere para pandas_udf):

    from pyarrow import hdfs
    
    fs = hdfs.connect(host, port)
    fs.delete(some_path, recursive=True)
  2. 3

    Puede eliminar un hdfs ruta en pyspark sin el uso de la tercera parte de las dependencias de la siguiente manera:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('abc').getOrCreate()
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    path = "Your/hdfs/path"
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

    Para mejorar un paso más allá, puedes envolver la idea en una función auxiliar que puede volver a utilizar a través de trabajos/paquetes:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('abc').getOrCreate()
    
    def delete_path(spark, path):
        sc = spark.sparkContext
        fs = (sc._jvm.org
              .apache.hadoop
              .fs.FileSystem
              .get(sc._jsc.hadoopConfiguration())
              )
        fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
    
    delete_path(spark, "Your/hdfs/path")
  3. 0

    de https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/
    utilizando sólo PySpark

    ######
    # Get fs handler from java gateway
    ######
    URI = sc._gateway.jvm.java.net.URI
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(URI("hdfs://somehost:8020"), sc._jsc.hadoopConfiguration())
    
    # We can now use the Hadoop FileSystem API (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)
    fs.listStatus(Path('/user/hive/warehouse'))
    # or
    fs.delete(Path('some_path'))

    las otras soluciones que no funcionó en mi caso, pero este blog ayuda 🙂

Dejar respuesta

Please enter your comment!
Please enter your name here