He leído muchos documentos acerca de google bigquery-python, pero no puedo entender cómo manejar bigquery de datos por código de python.

Al principio, tengo que hacer una nueva tabla como la siguiente.

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials = credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'string'},
...
]}

table = service.tables().insert(body = table, **dataset_ref).execute()

Y, a continuación, quiero insertar datos en esta tabla, por lo que traté de hacer, como el de abajo.

fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)

table = service.tables().patch(body = fetch_list, **table_ref).execute()

Pero no pasó nada.

¿Cómo puedo actualizar los datos nuevos en bigquery tabla?

Por favor me muestran algunos ejemplos de códigos.

OriginalEl autor Luis Kang | 2016-04-17

1 Comentario

  1. 16

    EDICIÓN de Noviembre de 2018:

    La respuesta a esta pregunta ya está desfasada como la de google cloud cliente ha evolucionado considerablemente desde este último post.

    Este enlace se muestra cómo utilizar el cliente más reciente para la transmisión y la este tiene un ejemplo de que el trabajo de inserción de la operación como se mencionó anteriormente en la respuesta.

    Original Respuesta:

    Hay un par de maneras diferentes que puede utilizar para insertar datos a BQ.

    Para una comprensión más profunda de cómo el python-api funciona, aquí descubrirás todo lo que necesitas: bq-python-api (en primer lugar los documentos que son un poco de miedo pero después de obtener un bloqueo de la misma es algo muy simple).

    Hay 2 métodos principales que utilizo para insertar datos a BQ. La primera es la transmisión de datos y se supone que debe usarse cuando se puede insertar una fila por fila en tiempo real de la moda. Ejemplo de código:

    import uuid
    def stream_data(self, table, data, schema):
        # first checks if table already exists. If it doesn't, then create it
        r = self.service.tables().list(projectId=your_project_id,
                                         datasetId=your_dataset_id).execute()
        table_exists = [row['tableReference']['tableId'] for row in
                        r['tables'] if
                        row['tableReference']['tableId'] == table]
        if not table_exists:
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': your_project_id,
                    'datasetId': your_dataset_id
                },
                'schema': schema
            }
            self.service.tables().insert(projectId=your_project_id,
                                         datasetId=your_dataset_id,
                                         body=body).execute()
    
        # with table created, now we can stream the data
        # to do so we'll use the tabledata().insertall() function.
        body = {
            'rows': [
                {
                    'json': data,
                    'insertId': str(uuid.uuid4())
                }
            ]
        }
        self.service.tabledata().insertAll(projectId=your_project_id),
                                           datasetId=your_dataset_id,
                                           tableId=table,
                                             body=body).execute(num_retries=5)

    Aquí mi self.service es corresponsal para su service objeto.

    Un ejemplo de entrada data que tenemos en nuestro proyecto:

    data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}

    Y su corresponsal schema:

    schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}

    La otra forma de insertar datos es el uso de la trabajo de inserción función. Como se puede ver en la documentación, se acepta varias fuentes de datos. Tengo un ejemplo de cómo se pueden cargar los resultados de una consulta en otra tabla:

    def create_table_from_query(self,
                                query,
                                dest_table,
                                how):
        body = {
            'configuration': {
                'query': {
                    'destinationTable': {
                        'projectId': your_project_id,
                        'tableId': dest_table,
                        'datasetId': your_dataset_id
                    },
                    'writeDisposition': how,
                    'query': query,
                },
            }
        }
    
        response = self.connector.jobs().insert(projectId=self._project_id,
                                                body=body).execute()
        self.wait_job_completion(response['jobReference']['jobId'])
    
    def wait_job_completion(self, job_id):
        while True:
            response = self.connector.jobs().get(projectId=self._project_id,
                                                 jobId=job_id).execute()
            if response['status']['state'] == 'DONE':
                return

    La how de entrada acepta las opciones disponibles para este campo en la documentación (tales como «WRITE_TRUNCATE», o «WRITE_APPEND»).

    Puede cargar los datos desde un archivo csv por ejemplo, en este caso, el configuration variable debe ser definida a lo largo de las líneas:

    "configuration": {
      "load": {
        "fieldDelimiter": "\t"
        "sourceFormat": "CSV"
        "destinationTable": {
          "projectId": your_project_id,
          "tableId": table_id,
          "datasetId": your_dataset_id
        },
        "writeDisposition": "WRITE_TRUNCATE"
        "schema": schema,
        "sourceUris": file_location_in_google_cloud_storage
      },
    }

    (Usando como ejemplo un archivo csv delimitado por tabulaciones. Podría ser un archivo json así, en la documentación que le guiará a través de las opciones disponibles).

    Ejecución de los trabajos (a) requerir algún tiempo para que se complete (por eso hemos creado el wait_job_completion método). Debería ser más barato, aunque como en comparación con transmisión en tiempo real.

    Cualquier pregunta, háganoslo saber,

    Hola a partir de la documentación sólo veo que usted puede upload CSV o los resultados de la Consulta a la tabla, pero me pregunto ¿cómo puedo añadir una fila (python lista) a una tabla utilizando el método insert ()?
    Tal vez esto va a ayudar a: googlecloudplatform.github.io/google-cloud-python/estable/…. El python cliente de google ha evolucionado mucho desde la última vez publicado esta respuesta, puede ser vale la pena aprender a usarlo así: googlecloudplatform.github.io/google-cloud-python/estable/… fíjese que en su caso podría ser mejor usar el live stream’ método con el que usted sólo desea insertar un par de listas de BQ.
    eso es impresionante! gracias!
    Enlaces están rotos @WillianFuks
    estás en lo correcto. Aquí está el enlace correcto para la transmisión en vivo: gcloud-python.readthedocs.io/es/última/bigquery/…. Para leer un archivo CSV de almacenamiento, he aquí un ejemplo: gcloud-python.readthedocs.io/es/última/bigquery/…. Voy a actualizar mi respuesta con estos enlaces. Gracias por el mensaje 🙂

    OriginalEl autor Willian Fuks

Dejar respuesta

Please enter your comment!
Please enter your name here