Quiero cargar una gran cantidad de entradas (~ 600k) en una tabla simple en una base de datos PostgreSQL, con una clave externa, una marca de tiempo y 3 flotantes por cada entrada. Sin embargo, se necesitan 60 ms por cada entrada para ejecutar la inserción masiva central descrita aquí, por lo que toda la ejecución tardaría 10 h. Descubrí que es un problema de rendimiento de executemany()
método, sin embargo se ha resuelto con el execute_values()
método en psicopg2 2.7.
El código que ejecuto es el siguiente:
#build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(),
values) # around 600k dicts in a list
Veo que es un problema común, sin embargo no he logrado encontrar una solución en sqlalchemy en sí. ¿Hay alguna forma de decirle a sqlalchemy que llame execute_values()
en algunas ocasiones? ¿Hay alguna otra forma de implementar inserciones enormes sin construir las declaraciones SQL por mí mismo?
¡Gracias por la ayuda!
Mientras tanto, se hizo posible (desde SqlAlchemy 1.2.0) con el use_batch_mode
bandera en el create_engine()
función. Ver el documentos. utiliza el execute_batch()
función de psycopg.extras
.
No es la respuesta que está buscando en el sentido de que esto no aborda el intento de instruir a SQLAlchemy para que use los extras de psycopg, y requiere, en cierto modo, SQL manual, pero: puede acceder a las conexiones subyacentes de psycopg desde un motor con raw_connection()
, que permite utilizar COPIADO DE:
import io
import csv
from psycopg2 import sql
def bulk_copy(engine, table, values):
csv_file = io.StringIO()
headers = list(values[0].keys())
writer = csv.DictWriter(csv_file, headers)
writer.writerows(values)
csv_file.seek(0)
# NOTE: `format()` here is *not* `str.format()`, but
# `SQL.format()`. Never use plain string formatting.
copy_stmt = sql.SQL("COPY {} (" +
",".join(["{}"] * len(headers)) +
") FROM STDIN CSV").
format(sql.Identifier(str(table.name)),
*(sql.Identifier(col) for col in headers))
# Fetch a raw psycopg connection from the SQLAlchemy engine
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.copy_expert(copy_stmt, csv_file)
conn.commit()
except:
conn.rollback()
raise
finally:
conn.close()
y luego
bulk_copy(engine, SimpleTable.__table__, values)
Esto debería ser bastante rápido en comparación con la ejecución de instrucciones INSERT. Mover 600 000 registros en esta máquina tomó alrededor de 8 segundos, ~13 µs/registro. También puede usar las conexiones sin procesar y el cursor con el paquete de extras.
.
Estaba a punto de sugerir usar
SimpleTable.__table__.insert().values(values)
, que se compilaría en una sola declaración INSERT con múltiples tuplas VALUES, pero resultó que en realidad era aún más lento en mi máquina. La compilación en sí fue tan lenta como usar su método que se basa enexecutemany()
.– Ilja Everila
10 abr.