¿Cómo usar `async for` en Python?

7 minutos de lectura

Quiero decir, ¿qué obtengo al usar async for. Aquí está el código con el que escribo async for, AIter(10) podría ser reemplazado con get_range().

Pero el código se ejecuta como sincronizado, no asíncrono.

import asyncio

async def get_range():
    for i in range(10):
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        yield i

class AIter:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        if i >= self.N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def main():
    async for p in AIter(10):
        print(f"finally {p}")

if __name__ == "__main__":
    asyncio.run(main())

El resultado que exceptué debería ser:

start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...

Sin embargo, el resultado real es:

start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2

Sé que podría obtener el resultado exceptuado usando asyncio.gather o asyncio.wait.

Pero es difícil para mí entender lo que obtuve al usar async for aquí en lugar de simple for.

¿Cuál es la forma correcta de usar async for si quiero recorrer varios Feature objeto y utilícelos tan pronto como haya terminado. Por ejemplo:

async for f in feature_objects:
    data = await f
    with open("file", "w") as fi:
        fi.write()

  • @user4815162342, sí, muchas gracias. Pero todavía estoy buscando algún ejemplo de async source. ¿Puede agregar un ejemplo de uso de async for ¿sintaxis?

    – Neutrón pálido

    3 de junio de 2019 a las 7:50

  • Cualquier generador asíncrono puede servir como fuente asíncrona. Para un ejemplo más concreto, vea, por ejemplo, esta respuesta expone una secuencia de invocaciones de devolución de llamada como un iterador asíncrono que es iterable usando async for.

    – usuario4815162342

    3 de junio de 2019 a las 8:52

  • por cierto, puede probar aiofiles para manejar archivos de manera asíncrona

    – Tsonglew

    12 de diciembre de 2019 a las 9:18

  • una pregunta sobre el bucle for de bloqueo. Podría tener un bucle for regular for in range(10): y esperar dentro de él, por ejemplo await asyncio.sleep(i), que devolvería el control a la persona que llama y permitiría la concurrencia. ¿Derecho? Tenga en cuenta que, por supuesto, mi sueño es una tontería, ya que solo pretende simular una operación costosa (también llamada operación vinculada a io).

    –Charlie Parker

    22 de junio de 2022 a las 19:15

  • es un buen ejemplo del uso de async for es eso async for NO bloquea ya que obtiene los siguientes elementos con un implícito await it.anext_step() ¿o algo?

    –Charlie Parker

    22 de junio de 2022 a las 19:20

avatar de usuario de user4815162342
usuario4815162342

Pero es difícil para mí entender lo que obtuve al usar async for aquí en lugar de simple for.

El malentendido subyacente es esperar async for automáticamente paralelizar la iteración No hace eso, simplemente permite la iteración secuencial. sobre una fuente asíncrona. Por ejemplo, puedes usar async for para iterar sobre líneas provenientes de un flujo TCP, mensajes de un websocket o registros de base de datos de un controlador de base de datos asíncrono.

Nada de lo anterior funcionaría con un ordinario for, al menos no sin bloquear el bucle de eventos. Esto es porque for llamadas __next__ como una función de bloqueo y no espera su resultado. No puedes manualmente await elementos obtenidos por for porque for espera __next__ para señalar el final de la iteración elevando StopIteration. Si __next__ es una rutina, la StopIteration la excepción no será visible antes de esperarla. Esta es la razón por async for fue introducido, no sólo en Python, sino también en otro idiomas con async/await y generalizado for.

Si desea ejecutar las iteraciones de bucle en paralelo, debe iniciarlas como corrutinas paralelas y usar asyncio.as_completed o equivalente para recuperar sus resultados tal como vienen:

async def x(i):
    print(f"start {i}")
    await asyncio.sleep(1)
    print(f"end {i}")
    return i

# run x(0)..x(10) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
    result = await f
    # ... do something with the result ...

Si no le importa reaccionar a los resultados inmediatamente cuando llegan, pero los necesita todos, puede hacerlo aún más simple usando asyncio.gather:

# run x(0)..x(10) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])

  • Verificando mi comprensión, ambos fragmentos de código (for f in asyncio.as_completed... y results = await ... tendría que ejecutarse dentro de una función/método asíncrono, dentro de una cadena de llamadas iniciada por asyncio.run(...)¿derecho?

    – hBy2Py

    10 de marzo de 2021 a las 20:17


  • @ hBy2Py Correcto. La pregunta (y por lo tanto la respuesta también) simplemente omite esa parte por brevedad.

    – usuario4815162342

    10/03/2021 a las 21:36

  • Me gusta el explicador, pero me falta un ejemplo para el async for círculo

    – Roelante

    21 de marzo de 2021 a las 18:28

  • @Roelant Tienes razón en que un ejemplo sería útil. Esta respuesta trató de abordar los puntos específicos planteados en la pregunta, que tenía sentido en ese momento, pero reduce su valor como recurso general. Agregar un ejemplo de la vida real en este punto haría que la respuesta fuera un poco más larga de lo que es ahora. Con suerte, hay otras preguntas de SO que aclaren el problema y, si no, tal vez sea hora de una nueva pregunta.

    – usuario4815162342

    21 de marzo de 2021 a las 20:27

  • una pregunta sobre el bucle for de bloqueo. Podría tener un bucle for regular for in range(10): y esperar dentro de él, por ejemplo await asyncio.sleep(i), que devolvería el control a la persona que llama y permitiría la concurrencia. ¿Derecho? Tenga en cuenta que, por supuesto, mi sueño es una tontería, ya que solo pretende simular una operación costosa (también llamada operación vinculada a io).

    –Charlie Parker

    22 de junio de 2022 a las 19:15

(Añadiendo a la respuesta aceptada – por la generosidad de Charlie).

Suponiendo que desea consumir cada valor producido al mismo tiempo, una forma sencilla sería:

import asyncio

async def process_all():
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))
        tasks.append(task)
    
    await asyncio.gather(*tasks)


async def process_obj(obj):
    ...

Explicación:

Considere el siguiente código, sin create_task:

async def process_all():
    async for obj in my_async_generator:
        await process_obj(obj))

Esto es más o menos equivalente a:

async def process_all():
    obj1 = await my_async_generator.__anext__():
    await process_obj(obj1))

    obj2 = await my_async_generator.__anext__():
    await process_obj(obj1))
    
    ...

Básicamente, el ciclo no puede continuar porque su cuerpo está bloqueando. El camino a seguir es delegar el procesamiento de cada iteración a una nueva tarea asyncio que comenzará sin bloquear el ciclo. Él, gather espere a que se procesen todas las tareas, es decir, a que se procese cada iteración.

  • realmente me encanta tu ejemplo! Sin embargo, desearíamos poder ejecutarlo. Un comentario rápido, creo que es útil mencionar que el .create_task(coroutine(args)) La función en realidad envía una corrutina para que se ejecute simultáneamente y no se bloquea.

    –Charlie Parker

    27 de junio de 2022 a las 14:05


  • Entonces, la principal diferencia entre async for y for es la diferencia entre __next__() y __anext__()? ¿Puedes ampliar la respuesta un poco más? (con un verdadero my_async_generator podría ser mucho mejor).

    – Neutrón pálido

    28 de junio de 2022 a las 0:55

Código basado en la fantástica respuesta de @matan129, solo falta el generador asíncrono para que sea ejecutable, una vez que tenga eso (o si alguien quiere contribuir con él) terminaré esto:


import time

import asyncio


async def process_all():
    """
    Example where the async for loop allows to loop through concurrently many things without blocking on each individual
    iteration but blocks (waits) for all tasks to run.
    ref:
    - https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python/72758067#72758067
    """
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))  # concurrently dispatches a coroutine to be executed.
        tasks.append(task)

    await asyncio.gather(*tasks)


async def process_obj(obj):
    await asyncio.sleep(5)  # expensive IO


if __name__ == '__main__':
    # - test asyncio
    s = time.perf_counter()
    asyncio.run(process_all())
    # - print stats
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    print('Success, done!\a')

¿Ha sido útil esta solución?