¿Cómo ejecutar repetidamente una función cada x segundos?

12 minutos de lectura

avatar de usuario de davidmytton
davidmytton

Quiero ejecutar repetidamente una función en Python cada 60 segundos para siempre (al igual que un NSTemporizador en Objective C o setTimeout en JS). Este código se ejecutará como un demonio y es efectivamente como llamar a la secuencia de comandos de python cada minuto usando un cron, pero sin necesidad de que el usuario lo configure.

En esta pregunta sobre un cron implementado en Python, la solución parece efectivamente solo dormir() durante x segundos. No necesito una funcionalidad tan avanzada, así que tal vez algo como esto funcione

while True:
    # Code executed here
    time.sleep(60)

¿Hay algún problema previsible con este código?

  • Un punto pedante, pero puede ser crítico, su código anterior no se ejecuta cada 60 segundos, deja un espacio de 60 segundos entre ejecuciones. Solo sucede cada 60 segundos si su código ejecutado no toma tiempo en absoluto.

    – Simón

    23 de enero de 2009 a las 21:12

  • además time.sleep(60) puede volver tanto antes como después

    – jfs

    19 de marzo de 2014 a las 7:25

  • Todavía me pregunto: ¿Hay algún problema previsible con este código?

    – dwitvliet

    27 de enero de 2015 a las 18:39

  • El “problema previsible” es que no puede esperar 60 iteraciones por hora simplemente usando time.sleep(60). Entonces, si agrega un elemento por iteración y mantiene una lista de longitud establecida … el promedio de esa lista no representará un “período” de tiempo constante; por lo tanto, funciones como “promedio móvil” pueden estar haciendo referencia a puntos de datos que son demasiado antiguos, lo que distorsionará su indicación.

    – litepresencia

    21 de febrero de 2017 a las 14:28


  • @Banana Sí, puede esperar cualquier problema causado porque su secuencia de comandos no se ejecuta EXACTAMENTE cada 60 segundos. Por ejemplo. Empecé a hacer algo como esto para dividir transmisiones de video y cargarlas, y terminé obteniendo transmisiones de 5 a 10 ~ segundos más porque la cola de medios se almacena en búfer mientras proceso los datos dentro del bucle. Depende de tus datos. Si la función es una especie de perro guardián simple que le advierte, por ejemplo, cuando su disco está lleno, no debería tener ningún problema con esto. Si está revisando las alertas de advertencia de una planta de energía nuclear, puede terminar con una ciudad. completamente explotado x

    – DGoiko

    31 de octubre de 2018 a las 11:46


avatar de usuario de nosklo
nosklo

Si su programa aún no tiene un bucle de eventos, use el programado módulo, que implementa un programador de eventos de propósito general.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    sc.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Si ya está utilizando una biblioteca de bucles de eventos como asyncio, trio, tkinter, PyQt5, gobject, kivyy muchos otros: simplemente programe la tarea utilizando los métodos de su biblioteca de bucle de eventos existente.

  • El módulo programado es para programar funciones que se ejecutan después de un tiempo, ¿cómo se usa para repetir una llamada de función cada x segundos sin usar time.sleep()?

    – Baishampayan Ghose

    23 de enero de 2009 a las 21:13

  • Luego apscheduler en paquetes.python.org/APScheduler también debería recibir una mención en este punto.

    – Daniel F.

    27 de enero de 2013 a las 20:06

  • nota: esta versión puede variar. podrías usar enterabs() para evitarlo Aquí hay una versión sin deriva para comparar.

    – jfs

    28/10/2014 a las 16:45

  • @JavaSa: porque “haz tus cosas” no es instantáneo y los errores de time.sleep puede acumularse aquí. “ejecutar cada X segundos” y “ejecutar con un retraso de ~X segundos repetidamente” no son lo mismo. Véase también este comentario

    – jfs

    25 de enero de 2017 a las 15:42


  • podrías moverte s.enter(...) al inicio de la función para reducir la deriva. Además, ¿cuál es el punto de sc?

    – Salomón Ucko

    18 de diciembre de 2019 a las 1:19

Avatar de usuario de Dave Rove
dave rove

Bloquee su ciclo de tiempo al reloj del sistema de esta manera:

import time
starttime = time.time()
while True:
    print("tick")
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))

  • +1. el tuyo y el twisted respuesta son las únicas respuestas que ejecutan una función cada x segundos. El resto ejecuta la función con un retraso de x segundos después de cada llamada.

    – jfs

    18 de septiembre de 2014 a las 7:09

  • Si agregara algún código a esto que tomó más de un segundo… Se perdería el tiempo y comenzaría a retrasarse… La respuesta aceptada en este caso es correcta… Cualquiera puede repetir un comando de impresión simple y hacer que se ejecute cada segundo sin demora…

    – Enojado 84

    1 de enero de 2016 a las 22:48

  • yo prefiero from time import time, sleep por las implicaciones existenciales 😉

    – Voluntad

    16 de febrero de 2016 a las 4:40

  • Funciona fantásticamente. No hay necesidad de restar su starttime si comienzas sincronizándolo a una hora determinada: time.sleep(60 - time.time() % 60) ha estado funcionando bien para mí. lo he usado como time.sleep(1200 - time.time() % 1200) y me da logs en el :00 :20 :40exactamente como yo quería.

    – TemporalWolf

    14/07/2016 a las 19:22


  • @AntonSchigur para evitar la deriva después de múltiples iteraciones. Una iteración individual puede comenzar un poco tarde o temprano dependiendo de sleep(), timer() la precisión y el tiempo que lleva ejecutar el cuerpo del ciclo, pero en promedio, las iteraciones siempre ocurren en los límites del intervalo (incluso si se omiten algunas): while keep_doing_it(): sleep(interval - timer() % interval). compararlo con sólo while keep_doing_it(): sleep(interval) donde los errores pueden acumularse después de varias iteraciones.

    – jfs

    01/08/2016 a las 20:51

Si desea una forma sin bloqueo de ejecutar su función periódicamente, en lugar de un bucle infinito de bloqueo, usaría un temporizador de subprocesos. De esta manera, su código puede seguir ejecutándose y realizar otras tareas y aún así llamar a su función cada n segundos. Utilizo mucho esta técnica para imprimir información de progreso en tareas largas e intensivas de CPU/disco/red.

Aquí está el código que publiqué en una pregunta similar, con el control start() y stop():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Uso:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Características:

  • Solo biblioteca estándar, sin dependencias externas
  • start() y stop() es seguro llamar varias veces incluso si el temporizador ya se ha iniciado/detenido
  • la función a llamar puede tener argumentos posicionales y con nombre
  • Tu puedes cambiar interval en cualquier momento, será efectivo después de la próxima ejecución. Igual por args, kwargs e incluso function!

  • Esta solución parece ir a la deriva con el tiempo; Necesitaba una versión que tenga como objetivo llamar a la función cada n segundos sin deriva. Publicaré una actualización en una pregunta separada.

    – eraoul

    5 de diciembre de 2016 a las 0:21

  • En def _run(self) Estoy tratando de entender por qué llamas self.start() antes de self.function(). ¿Puedes elaborar? Yo pensaría llamando start() primero self.is_running siempre sería False entonces siempre haríamos girar un nuevo hilo.

    – Episcopo Rico

    21 de diciembre de 2016 a las 20:08

  • Creo que llegué al fondo. La solución de @MestreLion ejecuta una función cada x segundos (es decir, t=0, t=1x, t=2x, t=3x, …) donde en el código de ejemplo de los carteles originales se ejecuta una función con X segundo intervalo en el medio. Además, esta solución creo que tiene un error si interval es más corto que el tiempo que toma function ejecutar. En ese caso, self._timer se sobrescribirá en el start función.

    – Episcopo Rico

    21 de diciembre de 2016 a las 23:09

  • Sí, @RichieEpiscopo, la llamada a .function() después .start() es ejecutar la función en t=0. Y no creo que sea un problema si function lleva más tiempo que intervalpero sí, puede haber algunas condiciones de carrera en el código.

    – Mestre León

    6 de febrero de 2017 a las 15:12

  • @eraoul: sí, esta solución se desvía, aunque se necesitan unos cientos o incluso un par de miles de ejecuciones antes de que se desvíe un solo segundo, dependiendo de su sistema. Si tal deriva es relevante para usted, le sugiero encarecidamente que utilice un sistema programador como cron

    – Mestre León

    24 de abril de 2019 a las 16:06

Avatar de usuario de Aaron Maenpaa
Aarón Maenpaa

Es posible que desee considerar Retorcido que es una biblioteca de red de Python que implementa el Patrón de reactores.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Si bien “while True: sleep(60)” probablemente funcionará, es probable que Twisted ya implemente muchas de las características que eventualmente necesitará (daemonización, registro o manejo de excepciones como lo señala bobince) y probablemente sea una solución más robusta.

avatar de usuario de eraoul
Eraoul

Aquí hay una actualización del código de MestreLion que evita la deriva con el tiempo.

La clase RepeatedTimer aquí llama a la función dada cada “intervalo” segundos según lo solicitado por el OP; la programación no depende de cuánto tiempo se tarde en ejecutar la función. Me gusta esta solución porque no tiene dependencias de bibliotecas externas; esto es solo Python puro.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Ejemplo de uso (copiado de la respuesta de MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

  • Acepto que esto es lo mejor: no hay paquetes de terceros y he probado que no se desvía con el tiempo.

    – Elendurwen

    13 de julio de 2021 a las 18:46

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Si desea hacer esto sin bloquear el código restante, puede usarlo para permitir que se ejecute en su propio hilo:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Esta solución combina varias características que rara vez se encuentran combinadas en las otras soluciones:

  • Manejo de excepciones: En la medida de lo posible, en este nivel, las excepciones se manejan correctamente, es decir, se registran con fines de depuración sin abortar nuestro programa.
  • Sin encadenamiento: La implementación similar a una cadena común (para programar el próximo evento) que encuentra en muchas respuestas es frágil en el aspecto de que si algo sale mal dentro del mecanismo de programación (threading.Timer o lo que sea), esto terminará la cadena. No habrá más ejecuciones entonces, incluso si la razón del problema ya está solucionada. Un bucle simple y esperando con un simple sleep() es mucho más robusto en comparación.
  • Sin deriva: Mi solución mantiene un registro exacto de los tiempos en los que se supone que debe ejecutarse. No hay deriva en función del tiempo de ejecución (como en muchas otras soluciones).
  • Salto a la comba: Mi solución omitirá tareas si una ejecución tomó demasiado tiempo (por ejemplo, haga X cada cinco segundos, pero X tomó 6 segundos). Este es el comportamiento estándar de cron (y por una buena razón). Muchas otras soluciones simplemente ejecutan la tarea varias veces seguidas sin demora. En la mayoría de los casos (p. ej., tareas de limpieza), esto no se desea. Si se es deseado, simplemente use next_time += delay en cambio.

  • Acepto que esto es lo mejor: no hay paquetes de terceros y he probado que no se desvía con el tiempo.

    – Elendurwen

    13 de julio de 2021 a las 18:46

Avatar de usuario de Adrian P
Adrián P.

La forma más fácil que creo que es:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

De esta manera se ejecuta su código, luego espera 60 segundos y luego se ejecuta de nuevo, espera, ejecuta, etc. No hay necesidad de complicar las cosas 😀

  • En realidad, esta no es la respuesta: time sleep() solo se puede usar para esperar X segundos después de cada ejecución. Por ejemplo, si su función tarda 0,5 segundos en ejecutarse y usa time.sleep(1), significa que su función se ejecuta cada 1,5 segundos, no 1. Debe usar otros módulos y/o subprocesos para asegurarse de que algo funcione Y veces en cada X segundo.

    – kommradHomer

    18 de septiembre de 2013 a las 10:09

  • @kommradHomer: La respuesta de Dave Rove demuestra que usted pueden usar time.sleep() ejecutar algo cada X segundos

    – jfs

    21 de septiembre de 2014 a las 4:56

  • En mi opinión, el código debería llamar time.sleep() en while True bucle como: def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)

    – Leonard Lepadatu

    17 de noviembre de 2017 a las 13:09

¿Ha sido útil esta solución?