Cuando ejecuto algo como:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

funciona bien. Sin embargo, poniendo esto como una función de una clase:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Me da el siguiente error:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

He visto un post de Alex Martelli tratar con el mismo tipo de problema, pero no fue suficientemente explícito.

  • «esto como una función de una clase»? Puedes publicar el código que realmente se obtiene el error real. Sin el código real que sólo podemos adivinar lo que estás haciendo mal.
  • he publicado el código
  • Como observación general, existen decapado módulos más poderoso que el de Python estándar módulo pickle (como el picloud módulo mencionado en esta respuesta).
  • He tenido un problema similar con los cierres en IPython.Parallel, pero no se podía solucionar el problema, empujando los objetos a los nodos. Parece bastante molesto para conseguir alrededor de este problema con el multiprocesamiento.
  • Aquí calculate es picklable, así que parece que esto se puede resolver mediante 1) la creación de un objeto de función con un constructor que copia a través de un calculate instancia y, a continuación, 2) la aprobación de una instancia de este objeto de la función a Pool‘s map método. No?
  • No creo que a ninguno de Python «cambios recientes» va a ser de ninguna ayuda. Algunas limitaciones de la multiprocessing módulo son debido a su objetivo de ser una cruz-plataforma de implementación, y la falta de un fork(2)-como la llamada de sistema en Windows. Si usted no se preocupan por Win32 apoyo, puede ser un proceso más sencillo basado en la solución. O, si usted está preparado para usar los hilos en lugar de procesos, puede sustituir from multiprocessing import Pool con from multiprocessing.pool import ThreadPool as Pool.

InformationsquelleAutor Mermoz | 2010-07-20

16 Comentarios

  1. 67

    Yo también estaba molesto por restricciones en cuanto a qué tipo de funciones a la piscina.mapa podría aceptar. Me escribió lo siguiente para evitar esto. Parece que funciona, incluso para uso recursivo de parmap.

    from multiprocessing import Process, Pipe
    from itertools import izip
    
    def spawn(f):
        def fun(pipe,x):
            pipe.send(f(x))
            pipe.close()
        return fun
    
    def parmap(f,X):
        pipe=[Pipe() for x in X]
        proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
        [p.start() for p in proc]
        [p.join() for p in proc]
        return [p.recv() for (p,c) in pipe]
    
    if __name__ == '__main__':
        print parmap(lambda x:x**x,range(1,5))
    • Esto ha funcionado muy bien para mí, gracias. He encontrado una debilidad: he intentado utilizar parmap en algunas de las funciones que pasa alrededor de un defaultdict y consiguió el PicklingError de nuevo. Yo no encontrar una solución a esto, y me reelaborado mi código para no utilizar el defaultdict.
    • Esto no funciona en Python 2.7.2 (por defecto, Jun 12 2011, 15:08:59) [MSC v. 1500 32 bits (Intel)] en win32
    • Esto funciona en Python 2.7.3 Ago 1,2012, 05:14:39. Esto no funciona en el gigante de iterables -> hace un OSError: [Errno 24] Demasiados archivos abiertos debido a que el número de tubos se abre.
    • Esta solución genera un proceso para cada elemento de trabajo. La solución de «klaus se» de abajo es más eficiente.
  2. 79

    Yo no podía usar los códigos publicado hasta el momento, ya que los códigos de usar «multiprocesamiento.La piscina de» no trabajo con expresiones lambda y los códigos no utilizar la «multiprocesamiento.La piscina de» generar el mayor número de procesos, ya que hay elementos de trabajo.

    He adaptado el código.t. genera un monto predefinido de los trabajadores y sólo se itera a través de la lista de entrada, si existe un trabajador inactivo. También he activado el «demonio» de los trabajadores de la seg.t. ctrl-c funciona como se esperaba.

    import multiprocessing
    
    
    def fun(f, q_in, q_out):
        while True:
            i, x = q_in.get()
            if i is None:
                break
            q_out.put((i, f(x)))
    
    
    def parmap(f, X, nprocs=multiprocessing.cpu_count()):
        q_in = multiprocessing.Queue(1)
        q_out = multiprocessing.Queue()
    
        proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
                for _ in range(nprocs)]
        for p in proc:
            p.daemon = True
            p.start()
    
        sent = [q_in.put((i, x)) for i, x in enumerate(X)]
        [q_in.put((None, None)) for _ in range(nprocs)]
        res = [q_out.get() for _ in range(len(sent))]
    
        [p.join() for p in proc]
    
        return [x for i, x in sorted(res)]
    
    
    if __name__ == '__main__':
        print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
    • Cómo se obtiene una barra de progreso para trabajar correctamente con este parmap función?
    • Una pregunta: he utilizado esta solución, pero notó que el python de los procesos que generó permaneció activo en memoria. Cualquier pensamiento rápido sobre cómo matar a aquellos en los que su parmap sale?
    • Sé que nos disuaden de simplemente decir gracias en los comentarios, pero su respuesta es demasiado valioso para mí, no pude resistir. Me gustaría poder dar más de una reputación…
    • ¿cuál es la razón para [q_in.put((None,None)) for _ in range(nprocs)]?
    • pasando (None, None) como el último elemento indica a fun que ha llegado a la final de la secuencia de elementos de cada proceso.
    • usted puede con una recompensa, si usted tiene la reputación suficiente a sí mismo 🙂
    • sí, no funciona para mí. Todavía me sale un error: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
    • En realidad a mí me funciona ahora! Muchas gracias! Tuve un problema porque mi función toma varios argumentos, pero me fijo.
    • se: ¿Qué quieres decir con «ctrl-c funciona como se espera»?
    • esto probablemente significa Ctrl+C se detiene como se esperaba. Strg es el nombre de la tecla Ctrl en el teclado alemán
    • Esto es realmente genial! Ya solucione mi problema!
    • _pickle.PicklingError: Can't pickle <function <lambda> at 0x00000254E1FDE6A8>: attribute lookup <lambda> on __main__ failed, usando Python 3.7.0 en Windows 10.

  3. 45

    Multiprocesamiento y decapado es discontinua y limitada a menos que salte fuera de la biblioteca estándar.

    Si utiliza un tenedor de multiprocessing llamado pathos.multiprocesssing, usted puede usar directamente las clases y métodos de la clase en el multiprocesamiento del map funciones. Esto es debido a que dill se utiliza en lugar de pickle o cPickle, y dill puede serializar casi nada en python.

    pathos.multiprocessing también proporciona una asincronía en función del mapa… y puede map funciones con múltiples argumentos (por ejemplo,map(math.pow, [1,2,3], [4,5,6]))

    El debate:
    ¿Qué puede multiprocesamiento y eneldo hacer juntos?

    y:
    http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

    Incluso maneja el código que escribió inicialmente, sin modificación, y desde el intérprete. ¿Por qué hacer nada más que es más frágil y específico para un solo caso?

    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> class calculate(object):
    ...  def run(self):
    ...   def f(x):
    ...    return x*x
    ...   p = Pool()
    ...   return p.map(f, [1,2,3])
    ... 
    >>> cl = calculate()
    >>> print cl.run()
    [1, 4, 9]

    Obtener el código aquí:
    https://github.com/uqfoundation/pathos

    Y, simplemente para mostrar un poco más de lo que puede hacer:

    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> 
    >>> p = Pool(4)
    >>> 
    >>> def add(x,y):
    ...   return x+y
    ... 
    >>> x = [0,1,2,3]
    >>> y = [4,5,6,7]
    >>> 
    >>> p.map(add, x, y)
    [4, 6, 8, 10]
    >>> 
    >>> class Test(object):
    ...   def plus(self, x, y): 
    ...     return x+y
    ... 
    >>> t = Test()
    >>> 
    >>> p.map(Test.plus, [t]*4, x, y)
    [4, 6, 8, 10]
    >>> 
    >>> res = p.amap(t.plus, x, y)
    >>> res.get()
    [4, 6, 8, 10]
    • el pathos.multiprocesamiento también tiene una asincronía en el mapa (amap) que permite el uso de barras de progreso y otras programación asincrónica.
    • Me gusta el pathos.multiprocesamiento, que puede servir casi una gota en el reemplazo de la no-paralelo mapa mientras que goza de los multiprocesadores. Tengo un simple contenedor de pathos.multiprocesamiento.mapa, de tal manera que es más eficaz memoria cuando el procesamiento de sólo-lectura de datos de gran tamaño de la estructura a través de múltiples núcleos, consulte este repositorio git.
    • Parece interesante, pero no se instala. Este es el mensaje de pip da: Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
    • Sí. No he publicado en un tiempo como he sido dividir la funcionalidad en paquetes separados, y también en la conversión de 2/3 compatible con el código. Mucho de lo anterior se ha modularizado en multiprocess que es de 2/3 compatible. Consulte stackoverflow.com/questions/27873093/… y pypi.python.org/pypi/multiprocess.
    • Probablemente, usted debe agregar un descargo de responsabilidad si usted es el responsable, por el camino.
    • Yo todavía tengo el pepinillo de error – los demás métodos que se trabajó muy bien para mí, sin embargo. Esta podría ser mi propio error, pero no hay mucho en el camino de documentos en el paquete de sitio, aunque esta fue mi primera opción.
    • ¿Cómo se instala? Ver: github.com/uqfoundation/pathos/issues/2 para obtener instrucciones sobre cómo instalar la versión más actualizada mediante pip. O, si usted está buscando para pathos.multiprocessing en realidad vive en un paquete llamado multiprocess, donde sólo se puede pip install multiprocess. Ver los comentarios de arriba.
    • Así como un seguimiento, pathos ha tenido una nueva versión estable y es también el 2.x y 3.x compatible.
    • ¿Cómo puedo ejecutar el multiprocesamiento para una clase anidada?
    • pido esto como es, por eso, la pregunta, o en el multiprocess o pathos página de GitHub. La respuesta corta es que (a) necesito ver algo de código, y el comentario no es un formato ideal para que, y (b) en el resumen, se podría trabajar mediante multiprocess.Pool si la instancia de la clase es serializable con dill. Tengo la sensación de que se está produciendo una clase dentro de otra clase… eso es difícil de manejar.
    • gracias @MikeMcKerns, sí, estoy hablando de una clase dentro de otra clase.. Pero va a crear un separado, de MODO que la pregunta tal como se sugiere. Gracias
    • por favor, me apunte a hacer (mencionar o lo que sea).

  4. 39

    Actualmente no hay solución a su problema, por lo que yo sé: la función que se le da a map() debe ser accesible a través de una importación de su módulo. Esta es la razón por robert código funciona: la función f() puede ser obtenido por el importador el siguiente código:

    def f(x):
        return x*x
    
    class Calculate(object):
        def run(self):
            p = Pool()
            return p.map(f, [1,2,3])
    
    if __name__ == '__main__':
        cl = Calculate()
        print cl.run()

    He añadido una sección «principal», porque de esta manera se sigue la recomendaciones para la plataforma Windows («asegúrese de que el módulo principal puede ser de forma segura importados por un nuevo intérprete de Python, sin causar efectos secundarios no deseados»).

    También he añadido una letra mayúscula en frente de Calculate, así como el seguimiento de PEP 8. 🙂

  5. 18

    La solución por mrule es correcto, pero tiene un fallo: si el niño responde con una gran cantidad de datos, puede llenar el tubo del amortiguador, bloqueo en el niño pipe.send(), mientras que el padre está a la espera para que el niño salga en pipe.join(). La solución es leer el hijo de datos antes de join()ing en el niño. Además, el niño debe cerrar el padre del final de la tubería, para evitar un punto muerto. El código de abajo de las revisiones. También ser conscientes de que este parmap crea un proceso por elemento en X. Una más avanzada solución es utilizar multiprocessing.cpu_count() dividir X en un número de trozos, y luego combinar los resultados antes de regresar. Dejo como ejercicio para el lector con el fin de no estropear la concisión de la bonita respuesta por mrule. 😉

    from multiprocessing import Process, Pipe
    from itertools import izip
    
    def spawn(f):
        def fun(ppipe, cpipe,x):
            ppipe.close()
            cpipe.send(f(x))
            cpipe.close()
        return fun
    
    def parmap(f,X):
        pipe=[Pipe() for x in X]
        proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
        [p.start() for p in proc]
        ret = [p.recv() for (p,c) in pipe]
        [p.join() for p in proc]
        return ret
    
    if __name__ == '__main__':
        print parmap(lambda x:x**x,range(1,5))
    • ¿Cómo se puede elegir el número de procesos?
    • FUNCIONA!!! Gracias. Ninguna de las otras soluciones que funcionó para mí 😀
    • Sin embargo muere muy rápidamente debido al error OSError: [Errno 24] Too many open files. Creo que no hay necesidad de ser algún tipo de límites en el número de procesos para que funcione correctamente…
  6. 13

    También he luchado con esto. Yo tenía funciones como miembros de datos de una clase, como un ejemplo simplificado:

    from multiprocessing import Pool
    import itertools
    pool = Pool()
    class Example(object):
        def __init__(self, my_add): 
            self.f = my_add  
        def add_lists(self, list1, list2):
            # Needed to do something like this (the following line won't work)
            return pool.map(self.f,list1,list2)  

    Necesitaba utilizar la función de auto.f en una Piscina.mapa() llamada desde dentro de la misma clase y yo.f no tomar una tupla como argumento. Ya que esta función se ha incrustado en una clase, no estaba claro para mí cómo escribir el tipo de contenedor otras respuestas sugeridas.

    He resuelto este problema mediante el uso de un contenedor que lleva a una tupla/lista, donde el primer elemento es la función, y el resto de elementos son los argumentos de esa función, llamada eval_func_tuple(f_args). El uso de este, la problemática de la línea puede ser reemplazado por el retorno de la piscina.mapa(eval_func_tuple, itertools.izip(itertools.repetir(auto.f), lista1, lista2)). Aquí está el código completo:

    Archivo: util.py

    def add(a, b): return a+b
    
    def eval_func_tuple(f_args):
        """Takes a tuple of a function and args, evaluates and returns result"""
        return f_args[0](*f_args[1:])  

    Archivo: main.py

    from multiprocessing import Pool
    import itertools
    import util  
    
    pool = Pool()
    class Example(object):
        def __init__(self, my_add): 
            self.f = my_add  
        def add_lists(self, list1, list2):
            # The following line will now work
            return pool.map(util.eval_func_tuple, 
                itertools.izip(itertools.repeat(self.f), list1, list2)) 
    
    if __name__ == '__main__':
        myExample = Example(util.add)
        list1 = [1, 2, 3]
        list2 = [10, 20, 30]
        print myExample.add_lists(list1, list2)  

    Ejecutando main.py dará [11, 22, 33]. Siéntase libre de mejorar esta situación, por ejemplo eval_func_tuple también podría ser modificado para tomar los parámetros de palabra clave.

    En otra nota, en otra de las respuestas, la función de «parmap» puede ser más eficaz, para el caso de Procesos más que el número de CPUs disponibles. Estoy copiando una versión editada de abajo. Este es mi primer post y no estaba seguro de si debo editar directamente la respuesta original. También he cambiado el nombre de algunas variables.

    from multiprocessing import Process, Pipe  
    from itertools import izip  
    
    def spawn(f):  
        def fun(pipe,x):  
            pipe.send(f(x))  
            pipe.close()  
        return fun  
    
    def parmap(f,X):  
        pipe=[Pipe() for x in X]  
        processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
        numProcesses = len(processes)  
        processNum = 0  
        outputList = []  
        while processNum < numProcesses:  
            endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
            for proc in processes[processNum:endProcessNum]:  
                proc.start()  
            for proc in processes[processNum:endProcessNum]:  
                proc.join()  
            for proc,c in pipe[processNum:endProcessNum]:  
                outputList.append(proc.recv())  
            processNum = endProcessNum  
        return outputList    
    
    if __name__ == '__main__':  
        print parmap(lambda x:x**x,range(1,5))         
  7. 7

    Funciones definidas en las clases (incluso dentro de las funciones dentro de las clases) realmente no pepinillos. Sin embargo, esto funciona:

    def f(x):
        return x*x
    
    class calculate(object):
        def run(self):
            p = Pool()
        return p.map(f, [1,2,3])
    
    cl = calculate()
    print cl.run()
    • gracias, pero me parece un poco sucio para definir la función fuera de la clase. La clase debe bundle todo lo que necesita para llevar a cabo una determinada tarea.
    • la clase debe agrupar todos los que necesita» Realmente? No puedo encontrar muchos ejemplos de esto. La mayoría de las clases dependen de otras clases o funciones. ¿Por qué llamar a una clase de dependencia «sucio»? ¿Qué hay de malo con una dependencia?
    • Así, la función no debe modificar los datos de la clase, porque sería modificar la versión en el otro proceso, por lo que podría ser un método estático. Usted puede ordenar de pepinillo un método estático: stackoverflow.com/questions/1914261/… O, para algo tan trivial, puede utilizar una expresión lambda.
  8. 7

    Tomé klaus se y aganders3 la respuesta, y de hecho documentado un módulo que es más legible y mantiene en un archivo. Usted puede simplemente añadir a su proyecto. Incluso dispone de una opción de la barra de progreso !

    """
    The ``processes`` module provides some convenience functions
    for using parallel processes in python.
    Adapted from http://stackoverflow.com/a/16071616/287297
    Example usage:
    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)
    Comments:
    "It spawns a predefined amount of workers and only iterates through the input list
    if there exists an idle worker. I also enabled the "daemon" mode for the workers so
    that KeyboardInterupt works as expected."
    Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.
    Alternatively, use this fork of multiprocessing: 
    https://github.com/uqfoundation/multiprocess
    """
    # Modules #
    import multiprocessing
    from tqdm import tqdm
    ################################################################################
    def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
    num, obj = queue_in.get()
    queue_out.put((num, func_to_apply(obj)))
    ################################################################################
    def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
    proc.daemon = True
    proc.start()
    # Display progress bar or not #
    if verbose:
    results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
    results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]
    ################################################################################
    def test():
    def slow_square(x):
    import time
    time.sleep(2)
    return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

    EDITAR: Agregado @alexander-mcfarlane sugerencia y una función de prueba

    • un problema con la barra de progreso… El bar solo mide la ineficiencia con la carga de trabajo se divide entre los procesadores. Si la carga de trabajo es perfectamente split, a continuación, todos los procesadores se join(), al mismo tiempo, y usted acaba de conseguir un flash de 100% completado en el tqdm pantalla. El único momento en que va a ser útil es si cada procesador tiene una visión sesgada de la carga de trabajo
    • mover tqdm() para envolver la línea: result = [q_out.get() for _ in tqdm(sent)] y funciona mucho mejor – mucho esfuerzo a pesar de apreciar esto realmente, así que +1
    • Gracias por los consejos, voy a probar y, a continuación, actualizar la respuesta !
    • La respuesta es actualizado, y la barra de progreso funciona mucho mejor!
  9. 6

    Yo sé que esto fue preguntado hace más de 6 años ahora, pero sólo quería añadir mi solución, ya que algunas de las sugerencias anteriores parecen tremendamente complicado, pero mi solución fue realmente muy simple.

    Todo lo que tenía que hacer era envoltura de la piscina.mapa() llamada a una función auxiliar. Pasando el objeto de la clase junto con los argumentos para el método como una tupla, que parecía un poco como esta.

    def run_in_parallel(args):
    return args[0].method(args[1])
    myclass = MyClass()
    method_args = [1,2,3,4,5,6]
    args_map = [ (myclass, arg) for arg in method_args ]
    pool = Pool()
    pool.map(run_in_parallel, args_map)
  10. 3

    He modificado klaus se del método, porque mientras estaba trabajando para mí, con listas pequeñas, iba a colgar cuando el número de elementos era de ~1000 o más. En lugar de empujar a los puestos de trabajo de una en una con el None condición de parada, me carga hasta la entrada de la cola de todos a la vez y dejar que los procesos de munch en él hasta que se vacía.

    from multiprocessing import cpu_count, Queue, Process
    def apply_func(f, q_in, q_out):
    while not q_in.empty():
    i, x = q_in.get()
    q_out.put((i, f(x)))
    # map a function using a pool of processes
    def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]
    return [x for i,x in sorted(res)]

    Edit: por desgracia ahora estoy corriendo en este error en mi sistema: Multiprocesamiento Cola maxsize límite es de 32767, esperemos que las soluciones no serán de ayuda.

  11. 1

    Sé que esta pregunta se la hicieron a 8 años y 10 meses, pero quiero presentar mi solución:

    from multiprocessing import Pool
    class Test:
    def __init__(self):
    self.main()
    @staticmethod
    def methodForMultiprocessing(x):
    print(x*x)
    def main(self):
    if __name__ == "__main__":
    p = Pool()
    p.map(Test.methodForMultiprocessing, list(range(1, 11)))
    p.close()
    TestObject = Test()

    Sólo debe hacer su función de clase en una función estática. Pero también es posible con un método de clase:

    from multiprocessing import Pool
    class Test:
    def __init__(self):
    self.main()
    @classmethod
    def methodForMultiprocessing(cls, x):
    print(x*x)
    def main(self):
    if __name__ == "__main__":
    p = Pool()
    p.map(Test.methodForMultiprocessing, list(range(1, 11)))
    p.close()
    TestObject = Test()

    Probado en Python 3.7.3

  12. 0

    No estoy seguro de si este enfoque ha sido tomada, pero un trabajo que estoy usando es:

    from multiprocessing import Pool
    t = None
    def run(n):
    return t.f(n)
    class Test(object):
    def __init__(self, number):
    self.number = number
    def f(self, x):
    print x * self.number
    def pool(self):
    pool = Pool(2)
    pool.map(run, range(10))
    if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

    La salida debe ser:

    0
    9
    18
    27
    36
    45
    54
    63
    72
    81
    0
    9
    18
    27
    36
    45
    54
    63
    72
    81
  13. 0
    class Calculate(object):
    # Your instance method to be executed
    def f(self, x, y):
    return x*y
    if __name__ == '__main__':
    inp_list = [1,2,3]
    y = 2
    cal_obj = Calculate()
    pool = Pool(2)
    results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

    Hay una posibilidad de que usted quiere aplicar esta función para cada instancia de la clase. Entonces aquí está la solución para que también

    class Calculate(object):
    # Your instance method to be executed
    def __init__(self, x):
    self.x = x
    def f(self, y):
    return self.x*y
    if __name__ == '__main__':
    inp_list = [Calculate(i) for i in range(3)]
    y = 2
    pool = Pool(2)
    results = pool.map(lambda x: x.f(y), inp_list)
  14. 0

    Aquí está mi solución, que creo que es un poco menos hackish que la mayoría de los otros aquí. Es similar a nightowl la respuesta.

    someclasses = [MyClass(), MyClass(), MyClass()]
    def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()
    othermethod = partial(method_caller, some_method='othermethod')
    with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)
  15. 0

    De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 y http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

    Podemos hacer una función externa y de la semilla con la clase de auto objeto:

    from joblib import Parallel, delayed
    def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)
    class square_class:
    def square_int(self, i):
    return i * i
    def run(self, num):
    results = []
    results = Parallel(n_jobs= -1, backend="threading")\
    (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
    print(results)

    O sin joblib:

    from multiprocessing import Pool
    import time
    def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)
    class C:
    def f(self, name):
    print 'hello %s,'%name
    time.sleep(5)
    print 'nice to meet you.'
    def run(self):
    pool = Pool(processes=2)
    names = ('frank', 'justin', 'osi', 'thomas')
    pool.map(unwrap_self_f, zip([self]*len(names), names))
    if __name__ == '__main__':
    c = C()
    c.run()
  16. 0

    Usted puede ejecutar su código sin ningún problema si de alguna manera manualmente ignorar la Pool objeto de la lista de objetos de la clase, porque no es picklepoder como el error dice. Usted puede hacer esto con la __getstate__ función (mirar aquí también) como sigue. El Pool objeto de tratar de encontrar la __getstate__ y __setstate__ funciones y ejecutarlas si se encuentra cuando se ejecuta map, map_async etc:

    class calculate(object):
    def __init__(self):
    self.p = Pool()
    def __getstate__(self):
    self_dict = self.__dict__.copy()
    del self_dict['p']
    return self_dict
    def __setstate__(self, state):
    self.__dict__.update(state)
    def f(self, x):
    return x*x
    def run(self):
    return self.p.map(self.f, [1,2,3])

    A continuación, hacer:

    cl = calculate()
    cl.run()

    le dará la salida:

    [1, 4, 9]

    He probado el código en Python 3.x y funciona.

Dejar respuesta

Please enter your comment!
Please enter your name here