Функция многозадачности в C++
Tạo vào: 18 tháng 1, 2025
Tạo vào: 18 tháng 1, 2025
Simulating workday processes
I’m detailing the function simulateWorkDay
. It generates random relationships among work sites, then either multi-threads or single-threads chunked range processing to simulate and log each site's production and remaining work, with error handling.
Breaking down the function
I’m detailing the simulateWorkDay
function. It generates relationships, initializes logs, and defines processChunk
to simulate site tasks. The function's mode, multi-threaded or single-threaded, determines the processing approach.
Thought about function simulation work day для 9 секунд
Ниже разбирается по шагам, что делает функция simulateWorkDay
и как она устроена внутри.
void Brigade::simulateWorkDay(bool multiThreaded) { generateRandomRelationships(workSites); std::vector<std::string> siteLogs(workSites.size()); ... }
generateRandomRelationships(workSites);
В начале вызывается функция, которая, судя по названию, случайным образом задаёт какие-то «отношения» между объектами в workSites
. Деталей в данном примере нет, но можно предположить, что там происходит инициализация или модификация данных (например, у разных строительных участков или рабочих задаются связи и зависимости).
std::vector<std::string> siteLogs(workSites.size());
Создаётся вектор строк siteLogs
, размер которого равен количеству элементов в workSites
. В дальнейшем в этот вектор записываются текстовые логи по каждому сайту (участку).
Далее объявляется лямбда:
auto processChunk = [&](size_t beginIndex, size_t endIndex) { for (size_t i = beginIndex; i < endIndex; ++i) { auto& site = workSites[i]; if (!site) continue; try { int production = site->calcProduction(); int remain = site->getRemaingWork(); std::ostringstream oss; oss << "SiteID=" << site->getSiteID() << " produced=" << production << " remain=" << remain << "\n"; siteLogs[i] = oss.str(); } catch (const std::exception& e) { std::ostringstream oss; oss << "Error at site #" << (site ? site->getSiteID() : -1) << ": " << e.what() << "\n"; siteLogs[i] = oss.str(); } } };
Зачем нужна лямбда?
Лямбда processChunk
создаётся для того, чтобы обрабатывать определённый диапазон строительных участков (с beginIndex
до endIndex
). Такой подход удобен при параллельной обработке, когда один и тот же код нужно запустить над разными кусками данных.
Что именно делает эта лямбда?
workSites[i]
) в указанном диапазоне.site
равен nullptr
(вектор может содержать пустые указатели), то ничего не делаем и просто пропускаем.calcProduction()
— некая функция, возвращающая «производство» за день (сколько выполнено работ).getRemaingWork()
— возвращающая, сколько ещё осталось работы.siteLogs[i]
.std::exception
), то в лог пишется сообщение об ошибке с номером сайта.Таким образом, processChunk
обрабатывает часть элементов и записывает итоговые сообщения в siteLogs
.
Далее функция проверяет флаг multiThreaded
. Если multiThreaded == true
, то включается параллельная обработка:
if (multiThreaded) { // Узнаём количество доступных "железных" потоков size_t hwThreads = std::thread::hardware_concurrency(); if (hwThreads == 0) hwThreads = 2; size_t total = workSites.size(); // Высчитываем размер "куска" для каждого потока size_t chunkSize = (total + hwThreads - 1) / hwThreads; std::vector<std::thread> threads; threads.reserve(hwThreads); size_t startIndex = 0; for (size_t t = 0; t < hwThreads; ++t) { if (startIndex >= total) break; size_t endIndex = std::min(startIndex + chunkSize, total); // Запускаем поток с обработкой части массива [startIndex, endIndex) threads.emplace_back(processChunk, startIndex, endIndex); startIndex = endIndex; } // Дожидаемся завершения всех потоков for (auto& th : threads) { if (th.joinable()) { th.join(); } } } else { // Однопоточная обработка processChunk(0, workSites.size()); }
size_t hwThreads = std::thread::hardware_concurrency();
hardware_concurrency()
вернуло 0 (что бывает, если информация недоступна), то назначаем hwThreads = 2
— некая «запасная» величина, чтобы хоть как-то распараллелить.size_t total = workSites.size();
— общее количество сайтов.size_t chunkSize = (total + hwThreads - 1) / hwThreads;
(10 + 3 - 1) / 3 = 4
. Таким образом, первые потоки обрабатывают по 4 сайта, а если вдруг что-то «не делится» идеально, то последние потоки могут получить меньший остаток.threads
нужного размера.for (size_t t = 0; t < hwThreads; ++t)
) запускаем потоки:
endIndex
как min(startIndex + chunkSize, total)
. То есть берём либо chunkSize
элементов, либо остаток, если элементов меньше, чем предполагается.threads.emplace_back(processChunk, startIndex, endIndex)
мы создаём новый std::thread
, который запускает лямбда-функцию processChunk
с указанным диапазоном.startIndex
на endIndex
, чтобы следующий поток обрабатывал следующую порцию.startIndex
уже превысил общее число сайтов, останавливаемся, так как все элементы распределены.join()
:
Методfor (auto& th : threads) { if (th.joinable()) { th.join(); } }
join()
блокирует текущий поток до тех пор, пока объект-поток не завершится.multiThreaded == false
, то весь диапазон [0, workSites.size())
обрабатывается просто в одном потоке вызовом processChunk(0, workSites.size());
.В результате выполнения функции:
Для каждого сайта (вектор workSites
) будет вычислено:
calcProduction()
— условная «производительность» за день;getRemaingWork()
— сколько работы ещё осталось;siteLogs[i]
.Все эти вычисления выполняются либо:
multiThreaded == false
),multiThreaded == true
), где каждый поток отвечает за свою часть вектора.Если в процессе возникло исключение (std::exception
), для такого сайта в лог пишется сообщение об ошибке.
По окончании мы имеем заполненный вектор siteLogs
с результатами по каждому объекту (или с информацией об ошибках). В коде, который вы показали, видимых действий с siteLogs
дальше нет, но обычно лог может выводиться в консоль, файл, или передаваться куда-то ещё.
generateRandomRelationships
даёт предпосылки (входные данные), которые будут учитываться при расчёте calcProduction()
и getRemaingWork()
.processChunk
— основной рабочий кусок, в котором идёт логика по каждому сайту.multiThreaded
— выбор: выполнять всё в один поток или распределить нагрузку между несколькими.std::thread::hardware_concurrency()
позволяет динамически определять оптимальное количество потоков под конкретную машину.nullptr
в if (!site) continue;
говорит о том, что в workSites
могут быть пустые указатели, которые пропускаются без вычислений.try { ... } catch(...) { ... }
важна, чтобы падение одного сайта не обрушило весь процесс.Таким образом, функция моделирует однодневную работу на разных сайтах (сайтах строительства или каких-то производственных точках), и результатом является набор логов о том, что было «произведено» и что ещё остаётся сделать — с возможностью распараллеливания обработки для ускорения.
Setting up threads
I’m mapping out a vector of threads and using reserve(hwThreads)
to prepare space. Iterating from 0 to hwThreads
, managing endIndex
, and launching threads with emplace_back(processChunk, startIndex, endIndex)
ensures efficient and controlled task processing.
Thought about thread management in simulateWorkDay
для пара секунд
Разберём по шагам, что происходит в данном фрагменте кода и зачем он нужен:
std::vector<std::thread> threads; threads.reserve(hwThreads); size_t startIndex = 0; for (size_t t = 0; t < hwThreads; ++t) { if (startIndex >= total) break; size_t endIndex = std::min(startIndex + chunkSize, total); threads.emplace_back(processChunk, startIndex, endIndex); startIndex = endIndex; }
Объявление контейнера для потоков:
std::vector<std::thread> threads; threads.reserve(hwThreads);
std::vector<std::thread> threads;
— это вектор, в котором мы будем хранить объекты std::thread
.threads.reserve(hwThreads);
— заранее выделяем память под нужное число потоков (равное hwThreads
), чтобы при добавлении (emplace_back) не было лишних реаллокаций памяти.Цикл по количеству потоков:
for (size_t t = 0; t < hwThreads; ++t) { ... }
workSites
.t
— это просто счётчик от 0 до hwThreads-1
.Проверка, не вышли ли за пределы данных:
if (startIndex >= total) break;
startIndex
— это текущая начальная позиция диапазона, который надо обрабатывать. Если startIndex
уже больше либо равен total
(размеру workSites
), значит, все данные уже распределены между потоками, и нет смысла запускать новые потоки.Вычисление границ обрабатываемого диапазона:
size_t endIndex = std::min(startIndex + chunkSize, total);
chunkSize
— это число элементов (сайтов), которое один поток должен обработать. Было рассчитано выше как (total + hwThreads - 1) / hwThreads
, чтобы распределить всю работу (примерно) поровну.endIndex
— это правая граница текущего куска. Используется std::min
, чтобы не выйти за пределы total
.[startIndex, endIndex)
.Создание потока:
threads.emplace_back(processChunk, startIndex, endIndex);
emplace_back
здесь фактически вызывает конструктор std::thread
с аргументами:
processChunk
(объект callable).Таким образом, мы «встраиваем» новый поток в конец вектора threads
без лишних копирований.
В момент вызова emplace_back
поток сразу запускается и начинает выполнять processChunk(startIndex, endIndex)
параллельно.
Лямбда-функция processChunk
уже определена выше в коде и имеет вид:
auto processChunk = [&](size_t beginIndex, size_t endIndex) { for (size_t i = beginIndex; i < endIndex; ++i) { // обработка каждого site, запись лога и т.д. } };
Сюда подставятся значения startIndex
и endIndex
, характерные для данного потока.
Сдвиг startIndex
:
startIndex = endIndex;
[startIndex, endIndex)
, мы делаем startIndex = endIndex
, чтобы следующий поток начал обрабатывать сразу следующий кусок.Таким образом, каждая итерация цикла:
std::thread
, которому передаёт лямбда-функцию processChunk
и границы этого куска.startIndex
, чтобы следующий поток обрабатывал следующий участок.Когда цикл завершается, вектор threads
содержит все запущенные потоки, каждый из которых параллельно занимается своим куском данных. Затем (ниже в коде) происходит вызов join()
для каждого потока, чтобы дождаться окончания работы.