¿Cómo obtener el valor de retorno de una función pasada a multiprocessing.Process?

10 minutos de lectura

Avatar de usuario de Louis Thibault
Luis Thibault

En el código de ejemplo a continuación, me gustaría obtener el valor de retorno de la función worker. ¿Cómo puedo hacer esto? ¿Dónde se almacena este valor?

Código de ejemplo:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Producción:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Parece que no puedo encontrar el atributo relevante en los objetos almacenados en jobs.

avatar de usuario de vartec
vartec

Usar variable compartida comunicar. Por ejemplo como este:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

  • Recomendaría usar un multiprocessing.Queuepreferible a Manager aquí. Usando un Manager requiere generar un proceso completamente nuevo, lo cual es excesivo cuando un Queue haría.

    – Dano

    19 de abril de 2015 a las 0:54

  • @dano: Me pregunto, si usamos el objeto Queue(), no podemos asegurar el orden cuando cada proceso devuelve el valor. Me refiero a que si necesitamos el orden en el resultado, para hacer el siguiente trabajo. ¿Cómo podríamos estar seguros de dónde es exactamente qué salida es de qué proceso?

    – Construcciones de gato

    29 de septiembre de 2016 a las 11:08


  • @Catbuilts Puede devolver una tupla de cada proceso, donde un valor es el valor de retorno real que le interesa y el otro es un identificador único del proceso. Pero también me pregunto por qué necesita saber qué proceso devuelve qué valor. Si eso es lo que realmente necesita saber sobre el proceso, ¿o necesita correlacionar su lista de entradas y la lista de salidas? En ese caso, recomendaría usar multiprocessing.Pool.map para procesar su lista de elementos de trabajo.

    – Dano

    1 de diciembre de 2016 a las 14:43

  • advertencias para funciones con un solo argumento : debería usar args=(my_function_argument, ). Nota la , coma aquí! O de lo contrario, Python se quejará de “argumentos posicionales faltantes”. Me tomó 10 minutos averiguarlo. Compruebe también el uso manual (en la sección “clase de proceso”).

    – yuqli

    29 de abril de 2019 a las 15:17


  • @vartec un inconveniente de usar un diccionario multipriocessing.Manager() es que es pickles (serializa) el objeto que devuelve, por lo que tiene un cuello de botella dado por la biblioteca pickle de un tamaño máximo de 2GiB para que el objeto regrese. ¿Hay alguna otra forma de hacer esto evitando la serialización del objeto que regresa?

    – Hirschme

    13 de noviembre de 2019 a las 21:46

Avatar de usuario de Mark
Marca

Creo que el enfoque sugerido por @sega_sai es mejor. Pero realmente necesita un ejemplo de código, así que aquí va:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Que imprimirá los valores de retorno:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Si está familiarizado con map (el Python 2 incorporado) esto no debería ser demasiado desafiante. De lo contrario, echa un vistazo a enlace de sega_Sai.

Tenga en cuenta lo poco que se necesita código. (También tenga en cuenta cómo se reutilizan los procesos).

  • Cualquier idea de por qué mi getpid() devolver todo el mismo valor? Estoy ejecutando Python3

    – zelusp

    29/10/2016 a las 17:39

  • No estoy seguro de cómo Pool distribuye las tareas entre los trabajadores. ¿Tal vez todos pueden terminar en el mismo trabajador si son realmente rápidos? ¿Ocurre consistentemente? También si agrega un retraso?

    – Marca

    31/10/2016 a las 15:30

  • También pensé que era algo relacionado con la velocidad, pero cuando me alimento pool.map un rango de 1,000,000 usando más de 10 procesos Veo como máximo dos pid diferentes.

    – zelusp

    31/10/2016 a las 19:00

  • Entonces no estoy seguro. Creo que sería interesante abrir una pregunta separada para esto.

    – Marca

    1 de noviembre de 2016 a las 11:27

  • Si las cosas que desea enviar una función diferente a cada proceso, use pool.apply_async: docs.python.org/3/library/…

    – Kyle

    5 de junio de 2019 a las 20:28

Avatar de usuario de Matthew Moisen
Mateo Moisén

Para cualquier otra persona que esté buscando cómo obtener un valor de un Process usando Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Tenga en cuenta que en Windows o Jupyter Notebook, con multithreading tienes que guardar esto como un archivo y ejecutar el archivo. Si lo hace en un símbolo del sistema, verá un error como este:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

  • cuando pongo algo en una cola en mi proceso de trabajo, nunca se alcanza mi unión. ¿Alguna idea de cómo podría venir esto?

    – Laurens Koppenol

    06/10/2016 a las 12:30

  • @LaurensKoppenol, ¿quiere decir que su código principal cuelga en p.join() permanentemente y nunca continúa? ¿Tu proceso tiene un bucle infinito?

    – Mateo Moisés

    06/10/2016 a las 17:44

  • Sí, cuelga allí infinitamente. Todos mis trabajadores terminan (el ciclo dentro de la función del trabajador finaliza, luego se imprime la declaración de impresión, para todos los trabajadores). La unión no hace nada. si quito el Queue desde mi funcion si me deja pasar el join()

    – Laurens Koppenol

    10 de octubre de 2016 a las 8:11

  • @LaurensKoppenol ¿Quizás no estás llamando? queue.put(ret) antes de llamar p.start() ? En ese caso, el subproceso de trabajo colgará en queue.get() Siempre. Puede replicar esto copiando mi fragmento de arriba mientras comenta queue.put(ret).

    – Mateo Moisés

    16 de agosto de 2017 a las 2:47

  • @Bendemann Alguien editó la respuesta y la hizo incorrecta al colocar el queue.get antes de queue.join. Lo he arreglado ahora colocando queue.get después p.join. Inténtalo de nuevo.

    – Mateo Moisés

    28 de julio de 2020 a las 16:58

avatar de usuario de sudo
sudo

Por alguna razón, no pude encontrar un ejemplo general de cómo hacer esto con Queue en cualquier lugar (incluso los ejemplos de documentos de Python no generan múltiples procesos), así que esto es lo que obtuve después de 10 intentos:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue es una cola de bloqueo segura para subprocesos que puede usar para almacenar los valores de retorno de los procesos secundarios. Entonces hay que pasar la cola a cada proceso. Algo menos obvio aquí es que tienes que get() de la cola antes que tú join la Processes o de lo contrario la cola se llena y bloquea todo.

Actualizar para aquellos que están orientados a objetos (probado en Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

Este ejemplo muestra cómo utilizar una lista de multiprocesamiento.Tubo instancias para devolver cadenas de un número arbitrario de procesos:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Producción:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Esta solución utiliza menos recursos que una multiprocesamiento.Cola que utiliza

  • un tubo
  • al menos un bloqueo
  • un amortiguador
  • un hilo

o un multiprocesamiento.SimpleQueue que utiliza

  • un tubo
  • al menos un bloqueo

Es muy instructivo mirar la fuente de cada uno de estos tipos.

  • ¿Cuál sería la mejor manera de hacerlo sin convertir las tuberías en una variable global?

    – Nickpick

    25/10/2016 a las 13:15

  • Puse todos los datos y códigos globales en una función principal y funciona igual. Eso responde tu pregunta?

    usuario3657941

    25/10/2016 a las 13:43

  • ¿Siempre se debe leer la tubería antes de que se le pueda agregar (enviar) cualquier valor nuevo?

    – Nickpick

    25/10/2016 a las 14:56

  • Esta respuesta provoca un interbloqueo si el objeto que regresa es grande. En lugar de hacer proc.join() primero, primero intentaría recv() el valor de retorno y luego haría la unión.

    – L. Pes

    12 de febrero de 2020 a las 20:13

  • Estoy con @L.Pes en esto. Podría ser específico del sistema operativo, pero adapté este ejemplo a mi caso de uso y los trabajadores que intentaron enviar_end. enviar (resultado) para obtener un resultado grande se colgarían indefinidamente. Unirse después de recibir lo arregló. Feliz de proporcionar un ejemplo si N = 2 es demasiado anecdótico para usted.

    – Vlad

    22 de abril de 2020 a las 2:10


Avatar de usuario de Divyanshu Srivastava
Divyanshu Srivastava

Parece que deberías usar el multiprocesamiento.Pool class en su lugar y use los métodos .apply() .apply_async(), map()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

  • ¿Cuál sería la mejor manera de hacerlo sin convertir las tuberías en una variable global?

    – Nickpick

    25/10/2016 a las 13:15

  • Puse todos los datos y códigos globales en una función principal y funciona igual. Eso responde tu pregunta?

    usuario3657941

    25/10/2016 a las 13:43

  • ¿Siempre se debe leer la tubería antes de que se le pueda agregar (enviar) cualquier valor nuevo?

    – Nickpick

    25/10/2016 a las 14:56

  • Esta respuesta provoca un interbloqueo si el objeto que regresa es grande. En lugar de hacer proc.join() primero, primero intentaría recv() el valor de retorno y luego haría la unión.

    – L. Pes

    12 de febrero de 2020 a las 20:13

  • Estoy con @L.Pes en esto. Podría ser específico del sistema operativo, pero adapté este ejemplo a mi caso de uso y los trabajadores que intentaron enviar_end. enviar (resultado) para obtener un resultado grande se colgarían indefinidamente. Unirse después de recibir lo arregló. Feliz de proporcionar un ejemplo si N = 2 es demasiado anecdótico para usted.

    – Vlad

    22 de abril de 2020 a las 2:10


Puedes usar el exit incorporado para establecer el código de salida de un proceso. Se puede obtener de la exitcode atributo del proceso:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Producción:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

  • Tenga en cuenta que este enfoque podría volverse confuso. Los procesos generalmente deben salir con el código de salida 0 si se completaron sin errores. Si tiene algo que controle los códigos de salida del proceso del sistema, es posible que los vea como errores.

    – rueda ferrosa

    23 mayo 2017 a las 21:50

  • Perfecto si solo desea generar una excepción en el proceso principal en caso de error.

    – crizCraig

    19 de julio de 2018 a las 17:45


¿Ha sido útil esta solución?