¿Cuál es la forma correcta de filtro de marco de datos por el campo de marca de hora?

He intentado diferentes formatos de fecha y formas de filtrado, nada ayuda: pyspark devuelve 0 objetos, o tira un error que no entiendo formato datetime

Aquí es lo que tengo hasta el momento:

from pyspark import SparkContext
from pyspark.sql import SQLContext

from django.utils import timezone
from django.conf import settings

from myapp.models import Collection

sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')

rango de campo de marca de hora:

system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)

intento 1

date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf = sf.filter(date_filter)
sf.count()

Out[12]: 0

intento 2

sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()

---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o63.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 4.0 failed 1 times, most recent failure: 
Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException: 
ERROR: syntax error at or near "18"
# 
# ups.. JDBC doesn't understand 24h time format??

intento de 3

sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \
     (date_from.isoformat(), date_to.isoformat())
     )
---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o97.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 17.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"

los datos existentes en la tabla, aunque:

django_filters = {
    'my_col__gte': date_from,
    'my_col__lte': date_to
    }
Collection.objects.filter(**django_filters).count()

Out[17]: 1093436

O de esta manera

django_range_filter = {'my_col__range': (date_from, date_to)}
Collection.objects.filter(**django_range_filter).count()

Out[19]: 1093436

OriginalEl autor funkifunki | 2015-07-14

2 Comentarios

  1. 8

    Permite asumir el marco de datos es como sigue:

    sf = sqlContext.createDataFrame([
        [datetime.datetime(2013, 6, 29, 11, 34, 29)],
        [datetime.datetime(2015, 7, 14, 11, 34, 27)],
        [datetime.datetime(2012, 3, 10, 19, 00, 11)],
        [datetime.datetime(2016, 2, 8, 12, 21)],
        [datetime.datetime(2014, 4, 4, 11, 28, 29)]
    ], ('my_col', ))

    con el esquema:

    root
     |-- my_col: timestamp (nullable = true)

    y desea encontrar las fechas en el siguiente rango:

    import datetime, time 
    dates = ("2013-01-01 00:00:00",  "2015-07-01 00:00:00")
    
    timestamps = (
        time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
        for s in dates)

    Es posible consultar el uso de las marcas de tiempo calculada en un lado del conductor:

    q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps)
    sf.where(q1).show()

    o el uso de unix_timestamp función:

    q2 = """CAST(my_col AS INT)
            BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')
            AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates)
    
    sf.where(q2).show()

    También es posible el uso de udf en forma similar a lo que he descrito en un otra respuesta.

    Si utiliza raw SQL es posible extraer de los diferentes elementos de marca de tiempo utilizando year, date, etc.

    sqlContext.sql("""SELECT * FROM sf
        WHERE YEAR(my_col) BETWEEN 2014 AND 2015").show()

    EDITAR:

    Desde Chispa 1.5 puede utilizar las funciones integradas:

    dates = ("2013-01-01",  "2015-07-01")
    date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]
    
    sf.where((sf.my_col > date_from) & (sf.my_col < date_to))
    la primera solución que funciona, muchas gracias!
    la segunda solución de error: Py4JJavaError: se ha producido Un error al llamar a o32.el filtro. : java.util.NoSuchElementException: la clave no encontrado: unix_timestamp no es un problema, aunque, probablemente causado por mi específicos de instalación
    Es posible. Como lo que me refiero unix_timestamp es parte de la Colmena Udf.

    OriginalEl autor zero323

  2. 0

    ¿Algo como esto:

    import pyspark.sql.functions as func
    
    df = sf.select(func.to_date(sf.my_col).alias("time"))
    sf = df.filter(sf.time > date_from).filter(sf.time < date_to)
    sf = sf.seleccionar …. se debe df = sf.seleccione …

    OriginalEl autor Sean

Dejar respuesta

Please enter your comment!
Please enter your name here