zeromq: ¿cómo evitar la espera infinita?

5 minutos de lectura

avatar de usuario
jesvin jose

Acabo de empezar con ZMQ. Estoy diseñando una aplicación cuyo flujo de trabajo es:

  1. uno de los muchos clientes (que tienen direcciones PULL aleatorias) PUSH una solicitud a un servidor en 5555
  2. el servidor siempre está esperando los PUSHes del cliente. Cuando llega uno, se genera un proceso de trabajo para esa solicitud en particular. Sí, los procesos de trabajo pueden existir simultáneamente.
  3. Cuando ese proceso completa su tarea, ENVÍA el resultado al cliente.

Supongo que la arquitectura PUSH/PULL es adecuada para esto. Por favor corrígeme en este.


Pero, ¿cómo manejo estos escenarios?

  1. client_receiver.recv() esperará un tiempo infinito cuando el servidor no responda.
  2. el cliente puede enviar una solicitud, pero fallará inmediatamente después, por lo tanto, un proceso de trabajo permanecerá atascado en server_sender.send() para siempre.

Entonces, ¿cómo configuro algo como un se acabó el tiempo en el modelo PUSH/PULL?


EDITAR: Gracias a las sugerencias de user938949, obtuve un respuesta de trabajo y lo comparto para la posteridad.

  • No soy un experto en 0mq, pero en muchas situaciones como esta es mejor crear su grupo de trabajadores al inicio en lugar de crear trabajadores en respuesta a los mensajes. Tal vez te estoy malinterpretando.

    – zarzamora

    24 de septiembre de 2011 a las 15:09

  • Buen punto. De hecho, planeo pre-bifurcar a los trabajadores. Me acabo de dar cuenta de que puede ser trivial con 0mq.

    – Jesvin José

    24 de septiembre de 2011 a las 16:14

Si está utilizando zeromq >= 3.0, entonces puede configurar la opción de socket RCVTIMEO:

client_receiver.RCVTIMEO = 1000 # in milliseconds

Pero en general, puedes usar encuestadores:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

Y poller.poll() toma un tiempo de espera:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts será una lista vacía si no hay nada que recibir.

Puedes votar con zmq.POLLOUTpara comprobar si un envío tendrá éxito.

O, para manejar el caso de un compañero que podría haber fallado, a:

worker.send(msg, zmq.NOBLOCK)

podría ser suficiente, que siempre regresará de inmediato, generando un ZMQError (zmq.EAGAIN) si el envío no se pudo completar.

  • ¿podría dar más detalles sobre zmq.NOBLOCK?

    – Jesvin José

    24 de septiembre de 2011 a las 16:53

  • Hola, ¿tenemos que volver a registrar un enchufe (en un sondeador) cada vez que lo desconectamos y lo volvemos a conectar?

    – mariolpantunes

    11 de mayo de 2014 a las 0:20

  • No, solo si cierra el socket y abre uno nuevo, necesita volver a registrarse.

    – visón

    11 mayo 2014 a las 21:37

  • Como @Adobri y @mknaf dijeron a continuación: si usa zmq.RCVTIMEOtambién debe configurar zmq.LINGER o de lo contrario, el socket aún no se cerrará incluso después del tiempo de espera. En Python, es socket.setsockopt(zmq.RCVTIMEO, 1000) socket.setsockopt(zmq.LINGER, 0) message = socket.recv()

    – dthor

    2 de noviembre de 2016 a las 16:10

  • Ambas líneas funcionan en python: results_receiver.RCVTIMEO = 1000 y results_receiver.setsockopt(zmq.RCVTIMEO, 1000)

    – silgón

    23 de agosto de 2017 a las 19:11

avatar de usuario
jesvin jose

esto fue un truco rápido Hice después de referir la respuesta del usuario 938949 y http://taotetek.wordpress.com/2011/02/02/python-multiprocesamiento-con-zeromq/ . Si lo hace mejor, por favor publique su respuesta, recomendare tu respuesta.

para los que quieren soluciones duraderas sobre la fiabilidad, consulte http://zguide.zeromq.org/page:all#toc64

Compatible con la versión 3.0 de zeromq (beta ATM) se acabó el tiempo en ZMQ_RCVTIMEO y ZMQ_SNDTIMEO. http://api.zeromq.org/3-0:zmq-setsockopt

Servidor

El zmq.NOBLOCK asegura que cuando un cliente no existe, el send() no se bloquea.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

Cliente

El objeto poller puede escuchar en muchos sockets de recepción (consulte el enlace “Multiprocesamiento de Python con ZeroMQ” arriba. Lo vinculé solo en trabajo_receptor. En el ciclo infinito, el cliente sondea con un intervalo de 1000ms. los medias el objeto devuelve vacío si no se ha recibido ningún mensaje en ese tiempo.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"

  • ¿Python tiene select? La biblioteca de Ruby tiene uno como IO.select. Puedes: if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end

    – mixónico

    6 de marzo de 2012 a las 14:18

El envío no se bloqueará si usa ZMQ_NOBLOCK, pero si intenta cerrar el socket y el contexto, este paso bloquearía la salida del programa.

La razón es que el socket espera a cualquier compañero para garantizar que los mensajes salientes se pongan en cola. Para cerrar el socket inmediatamente y vaciar los mensajes salientes del búfer, use ZMQ_LINGER y configúrelo en 0.

  • zmq.RCVTIMEO no lo ayudará si no usa zmq.LINGER porque después del tiempo de espera, el socket aún no se cerrará. Esto debe agregarse a la respuesta elegida.

    – mknaf

    28 de marzo de 2014 a las 11:21


Si solo está esperando un socket, en lugar de crear un PollerPuedes hacerlo:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

Puede usar esto si su tiempo de espera cambia según la situación, en lugar de configurar work_receiver.RCVTIMEO.

¿Ha sido útil esta solución?