gempton
Estoy tratando de ejecutar muchas tareas usando un ThreadPoolExecutor. A continuación se muestra un ejemplo hipotético:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
El problema es que rápidamente obtengo un java.util.concurrent.RejectedExecutionException
ya que el número de tareas excede el tamaño de la cola de trabajo. Sin embargo, el comportamiento deseado que busco es tener el hilo principal bloqueado hasta que haya espacio en la cola. Cuál es la mejor manera de lograr esto?
darren gilroy
En algunas circunstancias muy limitadas, puede implementar un java.util.concurrent.RejectedExecutionHandler que hace lo que necesita.
RejectedExecutionHandler block = new RejectedExecutionHandler() {
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.getQueue().put( r );
}
};
ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);
Ahora. Esto es una muy mala idea por las siguientes razones
- Es propenso al interbloqueo porque todos los subprocesos en el grupo pueden morir antes de que se vea lo que puso en la cola. Mitigue esto estableciendo un tiempo de vida razonable.
- La tarea no está envuelta de la forma en que su Ejecutor puede esperar. Muchas implementaciones de ejecutores envuelven sus tareas en algún tipo de objeto de seguimiento antes de la ejecución. Mira la fuente de la tuya.
- La API desaconseja encarecidamente agregar a través de getQueue(), y puede prohibirse en algún momento.
Una estrategia casi siempre mejor es instalar ThreadPoolExecutor.CallerRunsPolicy, que acelerará su aplicación ejecutando la tarea en el subproceso que llama a execute().
Sin embargo, a veces una estrategia de bloqueo, con todos sus riesgos inherentes, es realmente lo que desea. Yo diría que en estas condiciones
- Solo tienes un hilo llamando a ejecutar ()
- Tienes que (o quieres) tener una longitud de cola muy pequeña
- Es absolutamente necesario limitar la cantidad de subprocesos que ejecutan este trabajo (generalmente por razones externas), y una estrategia de ejecución de llamadas rompería eso.
- Sus tareas son de un tamaño impredecible, por lo que las ejecuciones de llamadas podrían causar inanición si el grupo estuviera momentáneamente ocupado con 4 tareas cortas y su ejecución de llamada de un hilo se atascara con una grande.
Entonces, como digo. Rara vez se necesita y puede ser peligroso, pero ahí lo tienes.
Buena suerte.
-
Una respuesta muy bien pensada. Tomo un problema menor con tu condición de que > “Tienes que (o quieres) tener una longitud de cola muy pequeña”. Es posible que no pueda predecir cuántas tareas pondrá en cola un determinado trabajo. Tal vez esté ejecutando un trabajo diario que procesa datos de alguna base de datos y el lunes hay 500 registros para procesar, pero el martes hay 50 000. Tienes que establecer un límite superior en tu cola de modo que no explotes cuando llegue un gran trabajo. En ese caso, no hay nada de malo en esperar a que se completen algunas tareas antes de hacer más colas.
– skelly
15 de enero de 2015 a las 14:43
-
“Es propenso a interbloqueos porque todos los subprocesos en el grupo pueden morir antes de que se vea lo que puso en la cola. Mitigue esto estableciendo un tiempo de vida razonable”. ¿No se puede evitar completamente el interbloqueo configurando el tamaño mínimo del grupo en un valor mayor que cero? Cualquier otra razón es consecuencia de que Java no tiene soporte incorporado para bloquear las puestas en las colas del ejecutor. Lo cual es interesante, porque parece ser una estrategia bastante razonable. Me pregunto cuál es la razón.
– Tim Pote
30 de septiembre de 2015 a las 12:05
-
Quizás otra condición para una estrategia de bloqueo es cuando el orden de ejecución es importante. CallerRunsPolicy significará que la tarea rechazada probablemente se ejecutará antes que otros elementos pendientes en el ejecutor.
– Adrián Baker
10 de marzo de 2018 a las 19:24
-
@TimPote, la implementación actual de execute() a partir de Java 8 también se ocupa de esa condición. Si una tarea se puede poner en cola con éxito, todavía necesitamos * para verificar dos veces si deberíamos haber agregado un hilo * (porque los existentes murieron desde la última verificación) o que * el grupo se cerró desde la entrada en este método. Así que * volvemos a verificar el estado y, si es necesario, retrocedemos la puesta en cola si * se detuvo, o comenzamos un nuevo hilo si no hay ninguno. Darren, ¿ve algún problema con este enfoque con Java 8 también?
– coronel10 loco
26 de mayo de 2020 a las 22:04
Lo que debe hacer es envolver su ThreadPoolExecutor en Executor, lo que limita explícitamente la cantidad de operaciones ejecutadas simultáneamente en su interior:
private static class BlockingExecutor implements Executor {
final Semaphore semaphore;
final Executor delegate;
private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
semaphore = new Semaphore(concurrentTasksLimit);
this.delegate = delegate;
}
@Override
public void execute(final Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
final Runnable wrapped = () -> {
try {
command.run();
} finally {
semaphore.release();
}
};
delegate.execute(wrapped);
}
}
Puede ajustar concurrentTasksLimit al threadPoolSize + queueSize de su ejecutor delegado y prácticamente resolverá su problema
-
Agradable y suave. ¡Gracias!
– Bastián Voigt
3 de marzo de 2020 a las 15:37
Podrías usar un semaphore
para evitar que los subprocesos entren en el grupo.
ExecutorService service = new ThreadPoolExecutor(
3,
3,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(6, false)
);
Semaphore lock = new Semaphore(6); // equal to queue capacity
for (int i = 0; i < 100000; i++ ) {
try {
lock.acquire();
service.submit(() -> {
try {
task.run();
} finally {
lock.release();
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Algunas trampas:
- Solo use este patrón con un grupo de subprocesos fijos. Es poco probable que la cola se llene con frecuencia, por lo que no se crearán nuevos hilos. Consulte los documentos de Java en ThreadPoolExecutor para obtener más detalles: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html Hay una forma de evitar esto, pero está fuera del alcance de esta respuesta.
-
El tamaño de la cola debe ser mayor que el número de subprocesos principales. Si tuviéramos que hacer la cola de tamaño 3, lo que terminaría sucediendo es:
- T0: los tres subprocesos están funcionando, la cola está vacía, no hay permisos disponibles.
- T1: el subproceso 1 finaliza, libera un permiso.
- T2: el subproceso 1 sondea la cola en busca de nuevos trabajos, no encuentra ninguno y murga.
- T3: el subproceso principal envía el trabajo al grupo, el subproceso 1 comienza a funcionar.
El ejemplo anterior se traduce para enhebrar el hilo principal. bloqueando hilo 1. Puede parecer un período pequeño, pero ahora multiplique la frecuencia por días y meses. De repente, los cortos períodos de tiempo se suman a una gran cantidad de tiempo perdido.
-
El subproceso 1 ya está bloqueado en el momento T2 cuando encuentra que la cola está vacía. No estoy seguro de haber entendido su punto sobre el hilo principal que bloquea ese hilo.
– preguntas
11 de noviembre de 2018 a las 21:54
-
@asgs “El subproceso 1 ya está bloqueado en el momento T2 cuando encuentra que la cola está vacía”. Correcto, y debido a que es responsabilidad del subproceso principal poner trabajo en la cola, puede deducir que el subproceso principal está bloqueando el subproceso 1.
– devkaoru
22 de noviembre de 2018 a las 17:56
Esto es lo que terminé haciendo:
int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 100000; i++) {
try {
lock.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pool.execute(() -> {
try {
// Task logic
} finally {
lock.release();
}
});
}
Una opción bastante sencilla es envolver su BlockingQueue
con una implementación que llama put(..)
cuando offer(..)
se invoca:
public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {
(..)
public boolean offer(E o) {
try {
delegate.put(o);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return true;
}
(.. implement all other methods simply by delegating ..)
}
Esto funciona porque por defecto put(..)
espera hasta que haya capacidad en la cola cuando está llena, ver:
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;
Sin captura de RejectedExecutionException
o bloqueo complicado necesario.
moehoss
Aquí está mi fragmento de código en este caso:
public void executeBlocking( Runnable command ) {
if ( threadPool == null ) {
logger.error( "Thread pool '{}' not initialized.", threadPoolName );
return;
}
ThreadPool threadPoolMonitor = this;
boolean accepted = false;
do {
try {
threadPool.execute( new Runnable() {
@Override
public void run() {
try {
command.run();
}
// to make sure that the monitor is freed on exit
finally {
// Notify all the threads waiting for the resource, if any.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.notifyAll();
}
}
}
} );
accepted = true;
}
catch ( RejectedExecutionException e ) {
// Thread pool is full
try {
// Block until one of the threads finishes its job and exits.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.wait();
}
}
catch ( InterruptedException ignored ) {
// return immediately
break;
}
}
} while ( !accepted );
}
threadPool es una instancia local de java.util.concurrent.ExecutorService que ya se ha inicializado.
TinkerTanque
Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada por un tiempo y luego intenta enviar la tarea nuevamente:
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
Esta clase solo se puede usar en el ejecutor del grupo de subprocesos como un RejectedExecutionHandler como cualquier otro. En este ejemplo:
executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
El único inconveniente que veo es que el subproceso de llamada puede bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Para muchas tareas de ejecución corta, tal vez reduzca el tiempo de espera a 10 ms aproximadamente. Además, dado que este ejecutor se llama de manera recursiva, las esperas muy largas para que un subproceso esté disponible (horas) pueden provocar un desbordamiento de la pila.
Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien. ¿Me estoy perdiendo algo importante?
Echa un vistazo a esta pregunta: stackoverflow.com/questions/2001086/…
– Cirilo
10 de agosto de 2010 a las 4:33
Esta respuesta a otra pregunta sugiere usar un personalizado
BlockingQueue
subclase que bloqueaoffer()
al delegar enput()
. Creo que termina funcionando más o menos igual que elRejectedExecutionHandler
que llamagetQueue().put()
.– Robert Tupelo-Schneck
29/03/2014 a las 20:50
Poner directamente en cola sería incorrecto, como se explica en esta respuesta stackoverflow.com/a/3518588/585903
– Sumit jainista
09/04/2015 a las 13:42
@SumitJain Lea esa respuesta con más cuidado; solo una de las tres objeciones planteadas en esa respuesta se aplica al enfoque sugerido en el comentario de @Robert Tupelo-Schneck. al invocar
put()
desde dentro de la propia cola, no accede a la cola a través degetQueue()
(objeción n.º 3) y el objeto que está poniendo ya está debidamente envuelto si es necesario (objeción n.º 2). Todavía está en riesgo de interbloqueo si todos sus subprocesos mueren antes de que el elemento salga de la cola, pero ese puede ser un riesgo que la mayoría de las personas que buscan esta solución en particular estarían dispuestas a asumir.– Tim
27 de abril de 2017 a las 21:22
Posible duplicado de ¿Cómo hacer que el método de envío() de ThreadPoolExecutor se bloquee si está saturado?
– rogerdpack
5 oct 2018 a las 16:40