Функция многозадачности в C++
Tạo vào: 18 tháng 1, 2025
Tạo vào: 18 tháng 1, 2025
расскажи подробнее о работе данной функции:
void Brigade::simulateWorkDay(bool multiThreaded) {
generateRandomRelationships(workSites);
std::vectorstd::string siteLogs(workSites.size());
// Лямбда для обработки диапазона [begin, end)
auto processChunk = [&](size_t beginIndex, size_t endIndex)
{
for (size_t i = beginIndex; i < endIndex; ++i) {
auto& site = workSites[i];
if (!site) continue;
texttry { // Имитация работы int production = site->calcProduction(); int remain = site->getRemaingWork(); std::ostringstream oss; oss << "SiteID=" << site->getSiteID() << " produced=" << production << " remain=" << remain << "\n"; siteLogs[i] = oss.str(); } catch (const std::exception& e) { std::ostringstream oss; oss << "Error at site #" << (site ? site->getSiteID() : -1) << ": " << e.what() << "\n"; siteLogs[i] = oss.str(); } } }; if (multiThreaded) { // количество потоков (обычно = числу ядер) size_t hwThreads = std::thread::hardware_concurrency(); if (hwThreads == 0) hwThreads = 2; size_t total = workSites.size(); size_t chunkSize = (total + hwThreads - 1) / hwThreads; // округляем вверх std::vector<std::thread> threads; threads.reserve(hwThreads); size_t startIndex = 0; for (size_t t = 0; t < hwThreads; ++t) { if (startIndex >= total) break; size_t endIndex = std::min(startIndex + chunkSize, total); threads.emplace_back(processChunk, startIndex, endIndex); startIndex = endIndex; } // Ждём все потоки for (auto& th : threads) { if (th.joinable()) { th.join(); } } } else { // Однопоточная обработка processChunk(0, workSites.size()); }
}
Ниже разбирается по шагам, что делает функция simulateWorkDay
и как она устроена внутри.
cppvoid Brigade::simulateWorkDay(bool multiThreaded) { generateRandomRelationships(workSites); std::vector<std::string> siteLogs(workSites.size()); ... }
generateRandomRelationships(workSites);
В начале вызывается функция, которая, судя по названию, случайным образом задаёт какие-то «отношения» между объектами в workSites
. Деталей в данном примере нет, но можно предположить, что там происходит инициализация или модификация данных (например, у разных строительных участков или рабочих задаются связи и зависимости).
std::vector<std::string> siteLogs(workSites.size());
Создаётся вектор строк siteLogs
, размер которого равен количеству элементов в workSites
. В дальнейшем в этот вектор записываются текстовые логи по каждому сайту (участку).
Далее объявляется лямбда:
cppauto processChunk = [&](size_t beginIndex, size_t endIndex) { for (size_t i = beginIndex; i < endIndex; ++i) { auto& site = workSites[i]; if (!site) continue; try { int production = site->calcProduction(); int remain = site->getRemaingWork(); std::ostringstream oss; oss << "SiteID=" << site->getSiteID() << " produced=" << production << " remain=" << remain << "\n"; siteLogs[i] = oss.str(); } catch (const std::exception& e) { std::ostringstream oss; oss << "Error at site #" << (site ? site->getSiteID() : -1) << ": " << e.what() << "\n"; siteLogs[i] = oss.str(); } } };
Зачем нужна лямбда?
Лямбда processChunk
создаётся для того, чтобы обрабатывать определённый диапазон строительных участков (с beginIndex
до endIndex
). Такой подход удобен при параллельной обработке, когда один и тот же код нужно запустить над разными кусками данных.
Что именно делает эта лямбда?
workSites[i]
) в указанном диапазоне.site
равен nullptr
(вектор может содержать пустые указатели), то ничего не делаем и просто пропускаем.calcProduction()
— некая функция, возвращающая «производство» за день (сколько выполнено работ).getRemaingWork()
— возвращающая, сколько ещё осталось работы.siteLogs[i]
.std::exception
), то в лог пишется сообщение об ошибке с номером сайта.Таким образом, processChunk
обрабатывает часть элементов и записывает итоговые сообщения в siteLogs
.
Далее функция проверяет флаг multiThreaded
. Если multiThreaded == true
, то включается параллельная обработка:
cppif (multiThreaded) { // Узнаём количество доступных "железных" потоков size_t hwThreads = std::thread::hardware_concurrency(); if (hwThreads == 0) hwThreads = 2; size_t total = workSites.size(); // Высчитываем размер "куска" для каждого потока size_t chunkSize = (total + hwThreads - 1) / hwThreads; std::vector<std::thread> threads; threads.reserve(hwThreads); size_t startIndex = 0; for (size_t t = 0; t < hwThreads; ++t) { if (startIndex >= total) break; size_t endIndex = std::min(startIndex + chunkSize, total); // Запускаем поток с обработкой части массива [startIndex, endIndex) threads.emplace_back(processChunk, startIndex, endIndex); startIndex = endIndex; } // Дожидаемся завершения всех потоков for (auto& th : threads) { if (th.joinable()) { th.join(); } } } else { // Однопоточная обработка processChunk(0, workSites.size()); }
size_t hwThreads = std::thread::hardware_concurrency();
hardware_concurrency()
вернуло 0 (что бывает, если информация недоступна), то назначаем hwThreads = 2
— некая «запасная» величина, чтобы хоть как-то распараллелить.size_t total = workSites.size();
— общее количество сайтов.size_t chunkSize = (total + hwThreads - 1) / hwThreads;
(10 + 3 - 1) / 3 = 4
. Таким образом, первые потоки обрабатывают по 4 сайта, а если вдруг что-то «не делится» идеально, то последние потоки могут получить меньший остаток.threads
нужного размера.for (size_t t = 0; t < hwThreads; ++t)
) запускаем потоки:
endIndex
как min(startIndex + chunkSize, total)
. То есть берём либо chunkSize
элементов, либо остаток, если элементов меньше, чем предполагается.threads.emplace_back(processChunk, startIndex, endIndex)
мы создаём новый std::thread
, который запускает лямбда-функцию processChunk
с указанным диапазоном.startIndex
на endIndex
, чтобы следующий поток обрабатывал следующую порцию.startIndex
уже превысил общее число сайтов, останавливаемся, так как все элементы распределены.join()
:
Методcppfor (auto& th : threads) { if (th.joinable()) { th.join(); } }
join()
блокирует текущий поток до тех пор, пока объект-поток не завершится.multiThreaded == false
, то весь диапазон [0, workSites.size())
обрабатывается просто в одном потоке вызовом processChunk(0, workSites.size());
.В результате выполнения функции:
Для каждого сайта (вектор workSites
) будет вычислено:
calcProduction()
— условная «производительность» за день;getRemaingWork()
— сколько работы ещё осталось;siteLogs[i]
.Все эти вычисления выполняются либо:
multiThreaded == false
),multiThreaded == true
), где каждый поток отвечает за свою часть вектора.Если в процессе возникло исключение (std::exception
), для такого сайта в лог пишется сообщение об ошибке.
По окончании мы имеем заполненный вектор siteLogs
с результатами по каждому объекту (или с информацией об ошибках). В коде, который вы показали, видимых действий с siteLogs
дальше нет, но обычно лог может выводиться в консоль, файл, или передаваться куда-то ещё.
generateRandomRelationships
даёт предпосылки (входные данные), которые будут учитываться при расчёте calcProduction()
и getRemaingWork()
.processChunk
— основной рабочий кусок, в котором идёт логика по каждому сайту.multiThreaded
— выбор: выполнять всё в один поток или распределить нагрузку между несколькими.std::thread::hardware_concurrency()
позволяет динамически определять оптимальное количество потоков под конкретную машину.nullptr
в if (!site) continue;
говорит о том, что в workSites
могут быть пустые указатели, которые пропускаются без вычислений.try { ... } catch(...) { ... }
важна, чтобы падение одного сайта не обрушило весь процесс.Таким образом, функция моделирует однодневную работу на разных сайтах (сайтах строительства или каких-то производственных точках), и результатом является набор логов о том, что было «произведено» и что ещё остаётся сделать — с возможностью распараллеливания обработки для ускорения.
расскажи еще раз подробнее про этот кусок:
std::vectorstd::thread threads;
threads.reserve(hwThreads);
textsize_t startIndex = 0; for (size_t t = 0; t < hwThreads; ++t) { if (startIndex >= total) break; size_t endIndex = std::min(startIndex + chunkSize, total); threads.emplace_back(processChunk, startIndex, endIndex); startIndex = endIndex; }
Разберём по шагам, что происходит в данном фрагменте кода и зачем он нужен:
cppstd::vector<std::thread> threads; threads.reserve(hwThreads); size_t startIndex = 0; for (size_t t = 0; t < hwThreads; ++t) { if (startIndex >= total) break; size_t endIndex = std::min(startIndex + chunkSize, total); threads.emplace_back(processChunk, startIndex, endIndex); startIndex = endIndex; }
Объявление контейнера для потоков:
cppstd::vector<std::thread> threads; threads.reserve(hwThreads);
std::vector<std::thread> threads;
— это вектор, в котором мы будем хранить объекты std::thread
.threads.reserve(hwThreads);
— заранее выделяем память под нужное число потоков (равное hwThreads
), чтобы при добавлении (emplace_back) не было лишних реаллокаций памяти.Цикл по количеству потоков:
cppfor (size_t t = 0; t < hwThreads; ++t) { ... }
workSites
.t
— это просто счётчик от 0 до hwThreads-1
.Проверка, не вышли ли за пределы данных:
cppif (startIndex >= total) break;
startIndex
— это текущая начальная позиция диапазона, который надо обрабатывать. Если startIndex
уже больше либо равен total
(размеру workSites
), значит, все данные уже распределены между потоками, и нет смысла запускать новые потоки.Вычисление границ обрабатываемого диапазона:
cppsize_t endIndex = std::min(startIndex + chunkSize, total);
chunkSize
— это число элементов (сайтов), которое один поток должен обработать. Было рассчитано выше как (total + hwThreads - 1) / hwThreads
, чтобы распределить всю работу (примерно) поровну.endIndex
— это правая граница текущего куска. Используется std::min
, чтобы не выйти за пределы total
.[startIndex, endIndex)
.Создание потока:
cppthreads.emplace_back(processChunk, startIndex, endIndex);
emplace_back
здесь фактически вызывает конструктор std::thread
с аргументами:
processChunk
(объект callable).Таким образом, мы «встраиваем» новый поток в конец вектора threads
без лишних копирований.
В момент вызова emplace_back
поток сразу запускается и начинает выполнять processChunk(startIndex, endIndex)
параллельно.
Лямбда-функция processChunk
уже определена выше в коде и имеет вид:
cppauto processChunk = [&](size_t beginIndex, size_t endIndex) { for (size_t i = beginIndex; i < endIndex; ++i) { // обработка каждого site, запись лога и т.д. } };
Сюда подставятся значения startIndex
и endIndex
, характерные для данного потока.
Сдвиг startIndex
:
cppstartIndex = endIndex;
[startIndex, endIndex)
, мы делаем startIndex = endIndex
, чтобы следующий поток начал обрабатывать сразу следующий кусок.Таким образом, каждая итерация цикла:
std::thread
, которому передаёт лямбда-функцию processChunk
и границы этого куска.startIndex
, чтобы следующий поток обрабатывал следующий участок.Когда цикл завершается, вектор threads
содержит все запущенные потоки, каждый из которых параллельно занимается своим куском данных. Затем (ниже в коде) происходит вызов join()
для каждого потока, чтобы дождаться окончания работы.