Agrupación de subprocesos en C++ 11

6 minutos de lectura

Agrupacion de subprocesos en C 11
yktula

Preguntas relevantes:

Acerca de C++11:

  • C++ 11: std::thread agrupado?
  • ¿Async(launch::async) en C++ 11 hará obsoletos los grupos de subprocesos para evitar la costosa creación de subprocesos?

Acerca de impulsar:

  • C ++ impulsar hilo reutilizando hilos
  • boost::thread y creando un grupo de ellos!

¿Cómo obtengo un piscina de hilos para enviar tareas a, sin crearlos y borrarlos una y otra vez? Esto significa hilos persistentes para resincronizar sin unirse.


Tengo un código que se ve así:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

En lugar de crear y unir subprocesos en cada iteración, prefiero enviar tareas a mis subprocesos de trabajo en cada iteración y solo crearlos una vez.

  • Aquí hay una pregunta relacionada y mi respuesta.

    – didierc

    01/04/2013 a las 22:29

  • pensó en usar tbb (es Intel, pero de código abierto y gratuito, y hace exactamente lo que quiere: simplemente envía tareas (divisibles recursivamente) y no se preocupa por los hilos)?

    – Gualterio

    01/04/2013 a las 22:55


  • Este proyecto FOSS es mi intento de crear una biblioteca de grupo de subprocesos, échale un vistazo si quieres. -> código.google.com/p/threadpool11

    – Etéreo solo

    14/07/2013 a las 14:55

  • ¿Qué tiene de malo usar tbb?

    – Gualterio

    16 de junio de 2014 a las 8:04

1646967915 961 Agrupacion de subprocesos en C 11
Doctorado AP EcE

Esto se copia de mi respuesta a otra publicación muy similar:

  1. Comience con la cantidad máxima de subprocesos que el sistema puede admitir:

    int num_threads = std::thread::hardware_concurrency();
    
  2. Para una implementación eficiente del grupo de subprocesos, una vez que se crean los subprocesos de acuerdo con num_threads, es mejor no crear nuevos ni destruir los antiguos (uniéndose). Habrá una penalización de rendimiento, e incluso podría hacer que su aplicación vaya más lenta que la versión en serie.

Cada subproceso de C++ 11 debe ejecutarse en su función con un bucle infinito, esperando constantemente nuevas tareas para tomar y ejecutar.

A continuación se muestra cómo adjuntar una función de este tipo al grupo de subprocesos:

int num_threads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++)
{
    pool.push_back(std::thread(Infinite_loop_function));
}
  1. La función de bucle infinito. Esto es un while (true) bucle esperando la cola de tareas.
void Pool::Infinite_loop_function()
{
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            condition.wait(lock, [this](){
                return !queue.empty() || terminate_pool;
            });
            Job = queue.front();
            queue.pop();
        }

        Job(); // function<void()> type
    }
};
  1. Haz una función para agregar trabajo a tu cola
void Pool::Add_Job(function<void()> New_Job)
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        queue.push(New_Job);
    }

    condition.notify_one();
}
  1. Vincule una función arbitraria a su cola
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Una vez que integre estos ingredientes, tendrá su propio grupo de subprocesamiento dinámico. Estos subprocesos siempre se ejecutan, esperando que se realice el trabajo.

Pido disculpas si hay algunos errores de sintaxis, escribí este código y tengo mala memoria. Lamento no poder proporcionarle el código completo del conjunto de subprocesos; eso violaría la integridad de mi trabajo.

Editar: para terminar el grupo, llame al shutdown() método:

Pool::shutdown()
{
    {
        std::unique_lock<std::mutex> lock(threadpool_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }

    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for (std::thread &th : threads)
    {
        th.join();
    }

    pool.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}

Nota: los bloques de código anónimo se utilizan para que cuando se salgan, el std::unique_lock las variables creadas dentro de ellos quedan fuera del alcance, desbloqueando el mutex.

  • ¿Cómo tienes un vector cuando thread(const thread&) = delete?

    – Christopher Pisz

    19 de diciembre de 2017 a las 0:05

  • @ChristopherPisz std::vector no requiere que sus elementos sean copiables. Puede usar vectores con tipos de solo movimiento (unique_ptr, thread, futureetc).

    – Daniel Langr

    20 de diciembre de 2017 a las 8:12


  • En shutdown(), debería ser thread_vector.clear(); en lugar de thread_vector.empty(); ¿Correcto?

    – sudheerbb

    25 de febrero de 2020 a las 5:40

  • ¿Qué pasa cuando terminas y no quedan trabajos?

    – usuario877329

    29 de abril de 2020 a las 19:09

  • “Infinite_loop_function” es un nombre gracioso para una función que consume tareas de una cola y las ejecuta.

    – Salomón Lento

    30 de abril de 2020 a las 15:12

1646967915 900 Agrupacion de subprocesos en C 11
vit-vit

Puede usar la biblioteca de grupos de subprocesos de C++, https://github.com/vit-vit/ctpl.

Luego, el código que escribió se puede reemplazar con el siguiente

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

Obtendrá la cantidad deseada de subprocesos y no los creará y eliminará una y otra vez en las iteraciones.

  • Esta debería ser la respuesta; Biblioteca C++11 de un solo encabezado, legible, directa, concisa y compatible con los estándares. ¡Buen trabajo!

    – Jonathan H.

    21/10/2015 a las 18:52


  • @ vit-vit, ¿puede dar un ejemplo con una función, por favor? ¿Cómo empujas un función miembro de clase en results[j] = p.push([&arr, j](int){ arr[j] +=2; });

    – Hani Goc

    22/10/2015 a las 17:05


  • @HaniGoc Simplemente capture la instancia por referencia.

    – Jonathan H.

    22/10/2015 a las 21:26

  • @vit-vit Le envió una solicitud de extracción para mejorar la versión STL.

    – Jonathan H.

    22/10/2015 a las 21:26

  • @vit-vit: es difícil ponerse en contacto con el mantenedor de esa biblioteca con preguntas, sugerencia sugerencia.

    – einpoklum

    20 de marzo de 2016 a las 21:29

Un conjunto de subprocesos significa que todos sus subprocesos se están ejecutando, todo el tiempo; en otras palabras, la función de subprocesos nunca regresa. Para dar a los subprocesos algo significativo que hacer, debe diseñar un sistema de comunicación entre subprocesos, tanto con el propósito de decirle al subproceso que hay algo que hacer, como para comunicar los datos de trabajo reales.

Por lo general, esto implicará algún tipo de estructura de datos concurrente, y presumiblemente cada subproceso dormirá en algún tipo de variable de condición, que se notificará cuando haya trabajo por hacer. Al recibir la notificación, uno o varios de los subprocesos se activan, recuperan una tarea de la estructura de datos concurrente, la procesan y almacenan el resultado de manera análoga.

Luego, el hilo continuaría para verificar si hay aún más trabajo por hacer y, si no, volvería a dormir.

El resultado es que tienes que diseñar todo esto tú mismo, ya que no existe una noción natural de “trabajo” que sea universalmente aplicable. Es bastante trabajo, y hay algunos problemas sutiles que debe corregir. (Puede programar en Go si le gusta un sistema que se encargue de la gestión de subprocesos detrás de escena).

  • “tienes que diseñar todo esto tú mismo” <- eso es lo que estoy tratando de evitar hacer. Sin embargo, las gorutinas parecen fantásticas.

    – Yktula

    01/04/2013 a las 22:38


  • @Yktula: Bueno, es una tarea nada trivial. Ni siquiera está claro en su publicación qué tipo de trabajo desea realizar, y eso es profundamente fundamental para la solución. Puede implementar Go en C++, pero será algo muy específico, y la mitad de las personas se quejarían de que querrían algo diferente.

    – KerrekSB

    01/04/2013 a las 22:46


Agrupacion de subprocesos en C 11
didierc

Un grupo de subprocesos es, en esencia, un conjunto de subprocesos, todos vinculados a una función que funciona como un bucle de eventos. Estos subprocesos esperarán interminablemente a que se ejecute una tarea o su propia finalización.

El trabajo del grupo de subprocesos proporciona una interfaz para enviar trabajos, definir (y tal vez modificar) la política de ejecución de estos trabajos (reglas de programación, creación de instancias de subprocesos, tamaño del grupo) y monitorear el estado de los subprocesos y los recursos relacionados.

Entonces, para un grupo versátil, uno debe comenzar definiendo qué es una tarea, cómo se inicia, cómo se interrumpe, cuál es el resultado (consulte la noción de promesa y futuro para esa pregunta), qué tipo de eventos tendrán que responder los subprocesos a, cómo los manejarán, cómo se discriminarán estos eventos de los manejados por las tareas. Como puede ver, esto puede volverse bastante complicado e imponer restricciones sobre cómo funcionarán los subprocesos, ya que la solución se vuelve cada vez más complicada.

Las herramientas actuales para el manejo de eventos son bastante básicas.

: primitivos como mutexes, variables de condición y algunas abstracciones además de eso (bloqueos, barreras). Pero en algunos casos, estas abstracciones pueden resultar inadecuadas (ver esta pregunta relacionada), y uno debe volver a usar las primitivas.

  • También hay que gestionar otros problemas:
  • señal
  • i/o

hardware (afinidad del procesador, configuración heterogénea)

¿Cómo se desarrollarían en su entorno? esta respuesta

a una pregunta similar apunta a una implementación existente destinada a boost y stl.


Ofrecí una implementación muy cruda de un grupo de subprocesos para otra pregunta, que no aborda muchos de los problemas descritos anteriormente. Es posible que desee construir sobre él. También es posible que desee echar un vistazo a los marcos existentes en otros idiomas, para encontrar inspiración.

Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

No lo veo como un problema, todo lo contrario. Creo que es el mismo espíritu de C++ heredado de C.

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

pool_funciones.h

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}

  • pool_funciones.cpp

    principal.cpp

    ¡Gracias! Esto realmente me ayudó a comenzar con las operaciones de subprocesos paralelos. Terminé usando una versión ligeramente modificada de su implementación.

  • – Robbie Caps 14/03/2019 a las 20:44 no necesitas m_aceptar_funciones ser de tipo atómico.

    m_aceptar_funciones

    protegido por exclusión mutua.


  • – dmikos

    20 de septiembre de 2020 a las 9:58

    Es bueno que podamos llamar a join()

1646967916 401 Agrupacion de subprocesos en C 11
– yo

6 ene a las 13:05 Amir Fo Puedes usar

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

subproceso_pool de la biblioteca de impulso: También puedes usar

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}

  • grupo de hilos

    de la comunidad de código abierto:

    ¡Gracias! Esto realmente me ayudó a comenzar con las operaciones de subprocesos paralelos. Terminé usando una versión ligeramente modificada de su implementación.

  • – Robbie Caps 14/03/2019 a las 20:44 no necesitas m_aceptar_funciones ser de tipo atómico.

    m_aceptar_funciones

    protegido por exclusión mutua.


  • – dmikos

    20 de septiembre de 2020 a las 9:58

    Es bueno que podamos llamar a join()

– yo

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

6 ene a las 13:05

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Algo como esto podría ayudar (tomado de una aplicación que funciona). Puedes usarlo así: Tenga en cuenta que reinventar un

eficiente

  • mecanismo de cola asíncrona no es trivial. std::thread Boost::asio::io_service es una implementación muy eficiente, o en realidad es una colección de envoltorios específicos de la plataforma (por ejemplo, envuelve los puertos de finalización de E/S en Windows).

    ¿Es necesario tanto impulso con C++ 11? ¿No sería, digamos, un

    ¿satisfacer?

  • – einpoklum std 20/03/2016 a las 21:34 boost::thread_groupNo hay equivalente en boost::thread_group por boost::thread . boost::thread_group es una colección de vector instancias. Pero por supuesto, es muy fácil de reemplazar std::threadcon un

    de

    s.


¿Ha sido útil esta solución?

Esta web utiliza cookies propias y de terceros para su correcto funcionamiento y para fines analíticos y para mostrarte publicidad relacionada con sus preferencias en base a un perfil elaborado a partir de tus hábitos de navegación. Al hacer clic en el botón Aceptar, acepta el uso de estas tecnologías y el procesamiento de tus datos para estos propósitos. Configurar y más información
Privacidad