Actualmente estoy tirando de datos de SQL Server mediante PyODBC y tratando de insertar en una tabla de Hive en un Tiempo casi Real (NRT) manera.

Tengo una sola fila de la fuente y se convierte en la Lista[Strings] y la creación de esquemas mediante programación, pero mientras que la creación de un DataFrame, la Chispa es tirar StructType error.

>>> cnxn = pyodbc.connect(con_string)
>>> aj = cnxn.cursor()
>>>
>>> aj.execute("select * from tjob")
<pyodbc.Cursor object at 0x257b2d0>
>>> row = aj.fetchone()
>>> row
(1127, u'', u'8196660', u'', u'', 0, u'', u'', None, 35, None, 0, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, u'', 0, None, None)
>>> rowstr = map(str,row)
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']
>>> schemaString = " ".join([row.column_name for row in aj.columns(table='tjob')])
>>> schemaString
u'ID ExternalID Name Description Notes Type Lot SubLot ParentJobID ProductID PlannedStartDateTime PlannedDurationSeconds Capture01 Capture02 Capture03 Capture04 Capture05 Capture06 Capture07 Capture08 Capture09 Capture10 Capture11 Capture12 Capture13 Capture14 Capture15 Capture16 Capture17 Capture18 Capture19 Capture20 User UserState ModifiedDateTime UploadedDateTime'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> [f.dataType for f in schema.fields]
[StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType]
>>> myrdd = sc.parallelize(rowstr)
>>> myrdd.collect()
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']
>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
_verify_type(row, schema)
File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/types.py", line 1132, in _verify_type
raise TypeError("StructType can not accept object in type %s" % type(obj))
TypeError: StructType can not accept object in type <type 'str'>
InformationsquelleAutor ThirdEye | 2016-04-17

1 Comentario

  1. 12

    aquí está la razón por mensaje de error:

    >>> rowstr
    ['1127', '', '8196660', '', '', '0', '', '', 'None' ... ]   
    #rowstr is a list of str
    >>> myrdd = sc.parallelize(rowstr)
    #myrdd is a rdd of str
    >>> schema = StructType(fields)
    #schema is StructType([StringType, StringType, ....])
    >>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
    #myrdd should have been RDD([StringType, StringType,...]) but is RDD(str)

    para solucionar esto, hay que hacer el DDR de tipo adecuado:

    >>> myrdd = sc.parallelize([rowstr])
    • En cuanto a «buen tipo» ¿qué es eso? Una lista de str? sc.paralelizar()?

Dejar respuesta

Please enter your comment!
Please enter your name here