Estoy tratando de escribir una aplicación que se aplica una función simultáneamente con un multiprocessing.Pool. Me gustaría que esta función a un método de instancia (por lo que podemos definir de manera diferente en diferentes subclases). Esto no parece ser posible; ya que he aprendido en otros lugares, al parecer métodos vinculados no puede ser en escabeche. Entonces, ¿por qué iniciar un multiprocessing.Process con un método vinculado como un objetivo de trabajo? El código siguiente:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Produce la siguiente salida:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

¿Por qué los Procesos de manejar los métodos vinculados, pero no en la piscina?

  • Es porque ellos no pueden ser serializados por pickle. Si usted necesita estar en python2.7, y usted necesita para hacer su trabajo de código de como es… usted debe utilizar un tenedor de multiprocessing que puede pickle métodos de instancia y puede pickle un Pool. Mira pathos.multiprocessing, que se puede encontrar en la stackoverflow enlace que se cita en el post anterior.
  • Más específicamente, este enlace se muestra cómo los métodos de instancia en 2.x puede ser trivialmente serializados en un Pool: stackoverflow.com/a/21345273/2379433
  • ¿Tiene que ser el método de instancia? Son capaces de utilizar classmethod? Lo he probado y me ha funcionado muy bien para mí.
  • este no es el caso de python 3 afaik
InformationsquelleAutor dpitch40 | 2014-12-05

3 Comentarios

  1. 32

    La pickle módulo normalmente no puede pickle métodos de instancia:

    >>> import pickle
    >>> class A(object):
    ...  def z(self): print "hi"
    ... 
    >>> a = A()
    >>> pickle.dumps(a.z)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
        Pickler(file, protocol).dump(obj)
      File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/local/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle instancemethod objects

    Sin embargo, la multiprocessing módulo tiene una costumbre de Pickler que añade algo de código para habilitar esta característica:

    #
    # Try making some callable types picklable
    #
    
    from pickle import Pickler
    class ForkingPickler(Pickler):
        dispatch = Pickler.dispatch.copy()
    
        @classmethod
        def register(cls, type, reduce):
            def dispatcher(self, obj):
                rv = reduce(obj)
                self.save_reduce(obj=obj, *rv)
            cls.dispatch[type] = dispatcher
    
    def _reduce_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)
    ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

    Puede replicar esta usando el copy_reg módulo para ver que funcione para usted:

    >>> import copy_reg
    >>> def _reduce_method(m):
    ...     if m.im_self is None:
    ...         return getattr, (m.im_class, m.im_func.func_name)
    ...     else:
    ...         return getattr, (m.im_self, m.im_func.func_name)
    ... 
    >>> copy_reg.pickle(type(a.z), _reduce_method)
    >>> pickle.dumps(a.z)
    "c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."

    Cuando se utiliza Process.start a generar un nuevo proceso en Windows, es encurtidos todos los parámetros que se le pasan al proceso hijo el uso de esta costumbre ForkingPickler:

    #
    # Windows
    #
    
    else:
        # snip...
        from pickle import load, HIGHEST_PROTOCOL
    
        def dump(obj, file, protocol=None):
            ForkingPickler(file, protocol).dump(obj)
    
        #
        # We define a Popen class similar to the one from subprocess, but
        # whose constructor takes a process object as its argument.
        #
    
        class Popen(object):
            '''
            Start a subprocess to run the code of a process object
            '''
            _tls = thread._local()
    
            def __init__(self, process_obj):
                # create pipe for communication with child
                rfd, wfd = os.pipe()
    
                # get handle for read end of the pipe and make it inheritable
                ...
                # start process
                ...
    
                # set attributes of self
                ...
    
                # send information to child
                prep_data = get_preparation_data(process_obj._name)
                to_child = os.fdopen(wfd, 'wb')
                Popen._tls.process_handle = int(hp)
                try:
                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
                finally:
                    del Popen._tls.process_handle
                    to_child.close()

    Nota «enviar la información al niño» sección. Es el uso de la dump función, que utiliza ForkingPickler a pickle los datos, lo que significa que su método de instancia puede ser en escabeche.

    Ahora, al utilizar los métodos de multiprocessing.Pool para enviar un método para un proceso hijo, es el uso de un multiprocessing.Pipe a pickle los datos. En Python 2.7, multiprocessing.Pipe está implementado en C, y llama a pickle_dumps directamente, así que no tomar ventaja de la ForkingPickler. Eso significa que el decapado de la instancia método no funciona.

    Sin embargo, si utiliza copy_reg para registrar el instancemethod tipo, en vez de una costumbre Pickler, todos intentos de decapado se verán afectadas. Así que usted puede utilizar para activar el decapado de los métodos de instancia, incluso a través de Pool:

    import multiprocessing
    import copy_reg
    import types
    
    def _reduce_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)
    copy_reg.pickle(types.MethodType, _reduce_method)
    
    def test1():
        print("Hello, world 1")
    
    def increment(x):
        return x + 1
    
    class testClass():
        def process(self):
            process1 = multiprocessing.Process(target=test1)
            process1.start()
            process1.join()
            process2 = multiprocessing.Process(target=self.test2)
            process2.start()
            process2.join()
    
        def pool(self):
            pool = multiprocessing.Pool(1)
            for answer in pool.imap(increment, range(10)):
                print(answer)
            print
            for answer in pool.imap(self.square, range(10)):
                print(answer)
    
        def test2(self):
            print("Hello, world 2")
    
        def square(self, x):
            return x * x
    
    def main():
        c = testClass()
        c.process()
        c.pool()
    
    if __name__ == "__main__":
        main()

    De salida:

    Hello, world 1
    Hello, world 2
    GOT (0, 0, (True, 1))
    GOT (0, 1, (True, 2))
    GOT (0, 2, (True, 3))
    GOT (0, 3, (True, 4))
    GOT (0, 4, (True, 5))
    1GOT (0, 5, (True, 6))
    GOT (0, 6, (True, 7))
    2
    GOT (0, 7, (True, 8))
    3
    GOT (0, 8, (True, 9))
    GOT (0, 9, (True, 10))
    4
    5
    6
    7
    8
    9
    10
    GOT (1, 0, (True, 0))
    0
    GOT (1, 1, (True, 1))
    1
    GOT (1, 2, (True, 4))
    4
    GOT (1, 3, (True, 9))
    9
    GOT (1, 4, (True, 16))
    16
    GOT (1, 5, (True, 25))
    25
    GOT (1, 6, (True, 36))
    36
    GOT (1, 7, (True, 49))
    49
    GOT (1, 8, (True, 64))
    64
    GOT (1, 9, (True, 81))
    81
    GOT None

    También tenga en cuenta que en Python 3.x, pickle puede pickle método de instancia tipos de forma nativa, por lo que ninguna de estas cosas importa más. 🙂

    • Gracias por la sugerencia; me sorprendió que el módulo de multiprocesamiento no implementar esta ya. Su solución exacta no funciona para mí, porque implica el decapado de la instancia que el método está obligado a, que la causa de otros problemas, pero me señaló en la dirección correcta. Por el contrario, soy la definición de los métodos que se ejecutarán durante el multiprocesamiento en el nivel superior de un módulo para eludir sus problemas y obtener el comportamiento quiero.
  2. 10

    Aquí una alternativa que yo uso a veces, y funciona en Python2.x:

    Puede crear un nivel superior «alias» de todo tipo a los métodos de instancia, que acepta un objeto cuyos métodos de instancia que desea ejecutar en una piscina, y tiene que llamar a los métodos de instancia para usted:

    import functools
    import multiprocessing
    def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool
    """
    obj.instance_method(arg)
    return
    class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool
    """
    def __init__(self):
    self.my_string = "From MyClass: {}"
    def instance_method(self, arg):
    """
    Some arbitrary instance method
    """
    print(self.my_string.format(arg))
    return
    # create an object of MyClass
    obj = MyClass()
    # use functools.partial to create a new method that always has the 
    # MyClass object passed as its first argument
    _bound_instance_method_alias = functools.partial(_instance_method_alias, obj)
    # create our list of things we will use the pool to map
    l = [1,2,3]
    # create the pool of workers
    pool = multiprocessing.Pool()
    # call pool.map, passing it the newly created function
    pool.map(_bound_instance_method_alias, l)
    # cleanup
    pool.close()
    pool.join()

    Este código produce la siguiente salida:

    De MyClass: 1

    De MyClass: 2

    De MyClass: 3

    Una limitación es que usted no puede utilizar este para los métodos que modificar el objeto. Cada proceso recibe una copia del objeto que se está llamando a los métodos, de modo que los cambios no se propaguen al proceso principal. Si usted no necesita modificar el objeto de los métodos que usted está llamando sin embargo, esto puede ser una solución sencilla.

    • Gracias por el post, no tiene sentido para mí después de bucear en el decapado, etc y esto funciona para mí. Python 3 (eventualmente) de cerrar esta brecha. Saludos!
  3. 4

    Aquí es una manera más fácil trabajar en Python 2, simplemente envuelva el original método de instancia. Funciona bien en MacOSX y Linux, no funciona en Windows, prueba de Python 2.7

    from multiprocessing import Pool
    class Person(object):
    def __init__(self):
    self.name = 'Weizhong Tu'
    def calc(self, x):
    print self.name
    return x ** 5
    def func(x, p=Person()):
    return p.calc(x)
    pool = Pool()
    print pool.map(func, range(10))

Dejar respuesta

Please enter your comment!
Please enter your name here