Luis Thibault
En el código de ejemplo a continuación, me gustaría obtener el valor de retorno de la función worker
. ¿Cómo puedo hacer esto? ¿Dónde se almacena este valor?
Código de ejemplo:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
Producción:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Parece que no puedo encontrar el atributo relevante en los objetos almacenados en jobs
.
vartec
Usar variable compartida comunicar. Por ejemplo como este:
import multiprocessing
def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum
if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_dict.values())
-
Recomendaría usar un
multiprocessing.Queue
preferible aManager
aquí. Usando unManager
requiere generar un proceso completamente nuevo, lo cual es excesivo cuando unQueue
haría.– Dano
19 de abril de 2015 a las 0:54
-
@dano: Me pregunto, si usamos el objeto Queue(), no podemos asegurar el orden cuando cada proceso devuelve el valor. Me refiero a que si necesitamos el orden en el resultado, para hacer el siguiente trabajo. ¿Cómo podríamos estar seguros de dónde es exactamente qué salida es de qué proceso?
– Construcciones de gato
29 de septiembre de 2016 a las 11:08
-
@Catbuilts Puede devolver una tupla de cada proceso, donde un valor es el valor de retorno real que le interesa y el otro es un identificador único del proceso. Pero también me pregunto por qué necesita saber qué proceso devuelve qué valor. Si eso es lo que realmente necesita saber sobre el proceso, ¿o necesita correlacionar su lista de entradas y la lista de salidas? En ese caso, recomendaría usar
multiprocessing.Pool.map
para procesar su lista de elementos de trabajo.– Dano
1 de diciembre de 2016 a las 14:43
-
advertencias para funciones con un solo argumento : debería usar
args=(my_function_argument, )
. Nota la,
coma aquí! O de lo contrario, Python se quejará de “argumentos posicionales faltantes”. Me tomó 10 minutos averiguarlo. Compruebe también el uso manual (en la sección “clase de proceso”).– yuqli
29 de abril de 2019 a las 15:17
-
@vartec un inconveniente de usar un diccionario multipriocessing.Manager() es que es pickles (serializa) el objeto que devuelve, por lo que tiene un cuello de botella dado por la biblioteca pickle de un tamaño máximo de 2GiB para que el objeto regrese. ¿Hay alguna otra forma de hacer esto evitando la serialización del objeto que regresa?
– Hirschme
13 de noviembre de 2019 a las 21:46
Marca
Creo que el enfoque sugerido por @sega_sai es mejor. Pero realmente necesita un ejemplo de código, así que aquí va:
import multiprocessing
from os import getpid
def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))
Que imprimirá los valores de retorno:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
Si está familiarizado con map
(el Python 2 incorporado) esto no debería ser demasiado desafiante. De lo contrario, echa un vistazo a enlace de sega_Sai.
Tenga en cuenta lo poco que se necesita código. (También tenga en cuenta cómo se reutilizan los procesos).
-
Cualquier idea de por qué mi
getpid()
devolver todo el mismo valor? Estoy ejecutando Python3– zelusp
29/10/2016 a las 17:39
-
No estoy seguro de cómo Pool distribuye las tareas entre los trabajadores. ¿Tal vez todos pueden terminar en el mismo trabajador si son realmente rápidos? ¿Ocurre consistentemente? También si agrega un retraso?
– Marca
31/10/2016 a las 15:30
-
También pensé que era algo relacionado con la velocidad, pero cuando me alimento
pool.map
un rango de 1,000,000 usando más de 10 procesos Veo como máximo dos pid diferentes.– zelusp
31/10/2016 a las 19:00
-
Entonces no estoy seguro. Creo que sería interesante abrir una pregunta separada para esto.
– Marca
1 de noviembre de 2016 a las 11:27
-
Si las cosas que desea enviar una función diferente a cada proceso, use
pool.apply_async
: docs.python.org/3/library/…– Kyle
5 de junio de 2019 a las 20:28
Mateo Moisén
Para cualquier otra persona que esté buscando cómo obtener un valor de un Process
usando Queue
:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}
Tenga en cuenta que en Windows o Jupyter Notebook, con multithreading
tienes que guardar esto como un archivo y ejecutar el archivo. Si lo hace en un símbolo del sistema, verá un error como este:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
-
cuando pongo algo en una cola en mi proceso de trabajo, nunca se alcanza mi unión. ¿Alguna idea de cómo podría venir esto?
– Laurens Koppenol
06/10/2016 a las 12:30
-
@LaurensKoppenol, ¿quiere decir que su código principal cuelga en p.join() permanentemente y nunca continúa? ¿Tu proceso tiene un bucle infinito?
– Mateo Moisés
06/10/2016 a las 17:44
-
Sí, cuelga allí infinitamente. Todos mis trabajadores terminan (el ciclo dentro de la función del trabajador finaliza, luego se imprime la declaración de impresión, para todos los trabajadores). La unión no hace nada. si quito el
Queue
desde mi funcion si me deja pasar eljoin()
– Laurens Koppenol
10 de octubre de 2016 a las 8:11
-
@LaurensKoppenol ¿Quizás no estás llamando?
queue.put(ret)
antes de llamarp.start()
? En ese caso, el subproceso de trabajo colgará enqueue.get()
Siempre. Puede replicar esto copiando mi fragmento de arriba mientras comentaqueue.put(ret)
.– Mateo Moisés
16 de agosto de 2017 a las 2:47
-
@Bendemann Alguien editó la respuesta y la hizo incorrecta al colocar el
queue.get
antes de queue.join. Lo he arreglado ahora colocandoqueue.get
despuésp.join
. Inténtalo de nuevo.– Mateo Moisés
28 de julio de 2020 a las 16:58
sudo
Por alguna razón, no pude encontrar un ejemplo general de cómo hacer esto con Queue
en cualquier lugar (incluso los ejemplos de documentos de Python no generan múltiples procesos), así que esto es lo que obtuve después de 10 intentos:
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
es una cola de bloqueo segura para subprocesos que puede usar para almacenar los valores de retorno de los procesos secundarios. Entonces hay que pasar la cola a cada proceso. Algo menos obvio aquí es que tienes que get()
de la cola antes que tú join
la Process
es o de lo contrario la cola se llena y bloquea todo.
Actualizar para aquellos que están orientados a objetos (probado en Python 3.4):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
Este ejemplo muestra cómo utilizar una lista de multiprocesamiento.Tubo instancias para devolver cadenas de un número arbitrario de procesos:
import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
Producción:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
Esta solución utiliza menos recursos que una multiprocesamiento.Cola que utiliza
- un tubo
- al menos un bloqueo
- un amortiguador
- un hilo
o un multiprocesamiento.SimpleQueue que utiliza
- un tubo
- al menos un bloqueo
Es muy instructivo mirar la fuente de cada uno de estos tipos.
-
¿Cuál sería la mejor manera de hacerlo sin convertir las tuberías en una variable global?
– Nickpick
25/10/2016 a las 13:15
-
Puse todos los datos y códigos globales en una función principal y funciona igual. Eso responde tu pregunta?
– usuario3657941
25/10/2016 a las 13:43
-
¿Siempre se debe leer la tubería antes de que se le pueda agregar (enviar) cualquier valor nuevo?
– Nickpick
25/10/2016 a las 14:56
-
Esta respuesta provoca un interbloqueo si el objeto que regresa es grande. En lugar de hacer proc.join() primero, primero intentaría recv() el valor de retorno y luego haría la unión.
– L. Pes
12 de febrero de 2020 a las 20:13
-
Estoy con @L.Pes en esto. Podría ser específico del sistema operativo, pero adapté este ejemplo a mi caso de uso y los trabajadores que intentaron enviar_end. enviar (resultado) para obtener un resultado grande se colgarían indefinidamente. Unirse después de recibir lo arregló. Feliz de proporcionar un ejemplo si N = 2 es demasiado anecdótico para usted.
– Vlad
22 de abril de 2020 a las 2:10
Divyanshu Srivastava
Parece que deberías usar el multiprocesamiento.Pool class en su lugar y use los métodos .apply() .apply_async(), map()
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
-
¿Cuál sería la mejor manera de hacerlo sin convertir las tuberías en una variable global?
– Nickpick
25/10/2016 a las 13:15
-
Puse todos los datos y códigos globales en una función principal y funciona igual. Eso responde tu pregunta?
– usuario3657941
25/10/2016 a las 13:43
-
¿Siempre se debe leer la tubería antes de que se le pueda agregar (enviar) cualquier valor nuevo?
– Nickpick
25/10/2016 a las 14:56
-
Esta respuesta provoca un interbloqueo si el objeto que regresa es grande. En lugar de hacer proc.join() primero, primero intentaría recv() el valor de retorno y luego haría la unión.
– L. Pes
12 de febrero de 2020 a las 20:13
-
Estoy con @L.Pes en esto. Podría ser específico del sistema operativo, pero adapté este ejemplo a mi caso de uso y los trabajadores que intentaron enviar_end. enviar (resultado) para obtener un resultado grande se colgarían indefinidamente. Unirse después de recibir lo arregló. Feliz de proporcionar un ejemplo si N = 2 es demasiado anecdótico para usted.
– Vlad
22 de abril de 2020 a las 2:10
Puedes usar el exit
incorporado para establecer el código de salida de un proceso. Se puede obtener de la exitcode
atributo del proceso:
import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
Producción:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
-
Tenga en cuenta que este enfoque podría volverse confuso. Los procesos generalmente deben salir con el código de salida 0 si se completaron sin errores. Si tiene algo que controle los códigos de salida del proceso del sistema, es posible que los vea como errores.
– rueda ferrosa
23 mayo 2017 a las 21:50
-
Perfecto si solo desea generar una excepción en el proceso principal en caso de error.
– crizCraig
19 de julio de 2018 a las 17:45