diff --git a/src/debug/Logger.cpp b/src/debug/Logger.cpp index 3f80881a..03b50e21 100644 --- a/src/debug/Logger.cpp +++ b/src/debug/Logger.cpp @@ -9,7 +9,7 @@ using namespace debug; std::ofstream Logger::file; std::mutex Logger::mutex; std::string Logger::utcOffset = ""; -unsigned Logger::moduleLen = 16; +unsigned Logger::moduleLen = 20; LogMessage::~LogMessage() { logger->log(level, ss.str()); diff --git a/src/graphics/render/ChunksRenderer.cpp b/src/graphics/render/ChunksRenderer.cpp index 14a994ac..eed947ec 100644 --- a/src/graphics/render/ChunksRenderer.cpp +++ b/src/graphics/render/ChunksRenderer.cpp @@ -38,9 +38,10 @@ ChunksRenderer::ChunksRenderer( const EngineSettings& settings ) : level(level), threadPool( + "chunks-render-pool", [=](){return std::make_shared(level, cache, settings);}, [=](RendererResult& mesh){ - meshes[mesh.key] = std::shared_ptr(mesh.renderer->createMesh()); + meshes[mesh.key].reset(mesh.renderer->createMesh()); inwork.erase(mesh.key); }) { diff --git a/src/util/ThreadPool.h b/src/util/ThreadPool.h index e6b20e39..c65f1393 100644 --- a/src/util/ThreadPool.h +++ b/src/util/ThreadPool.h @@ -2,11 +2,14 @@ #define UTIL_THREAD_POOL_H_ #include +#include #include +#include #include #include #include "../delegates.h" +#include "../debug/Logger.h" namespace util { @@ -28,15 +31,19 @@ public: template class ThreadPool { + debug::Logger logger; std::queue jobs; std::queue> results; + std::mutex resultsMutex; std::vector threads; std::condition_variable jobsMutexCondition; std::mutex jobsMutex; - bool working = true; std::vector> workersBlocked; consumer resultConsumer; - std::mutex resultsMutex; + consumer onJobFailed = nullptr; + runnable onComplete = nullptr; + std::atomic busyWorkers = 0; + bool working = true; void threadLoop(int index, std::shared_ptr> worker) { std::condition_variable variable; @@ -54,27 +61,37 @@ class ThreadPool { } job = jobs.front(); jobs.pop(); + + busyWorkers++; } - R result = (*worker)(job); - { - resultsMutex.lock(); - results.push(ThreadPoolResult {variable, index, locked, result}); - locked = true; - resultsMutex.unlock(); - } - { - std::unique_lock lock(mutex); - variable.wait(lock, [&] { - return !working || !locked; - }); + try { + R result = (*worker)(job); + { + std::lock_guard lock(resultsMutex); + results.push(ThreadPoolResult {variable, index, locked, result}); + locked = true; + busyWorkers--; + } + { + std::unique_lock lock(mutex); + variable.wait(lock, [&] { + return !working || !locked; + }); + } + } catch (std::exception& err) { + if (onJobFailed) { + onJobFailed(job); + } + logger.error() << "uncaught exception: " << err.what(); } } } public: ThreadPool( + std::string name, supplier>> workersSupplier, consumer resultConsumer - ) : resultConsumer(resultConsumer) { + ) : logger(name), resultConsumer(resultConsumer) { const uint num_threads = std::thread::hardware_concurrency(); for (uint i = 0; i < num_threads; i++) { threads.emplace_back(&ThreadPool::threadLoop, this, i, workersSupplier()); @@ -82,6 +99,13 @@ public: } } ~ThreadPool(){ + terminate(); + } + + void terminate() { + if (!working) { + return; + } { std::lock_guard lock(jobsMutex); working = false; @@ -113,6 +137,13 @@ public: entry.locked = false; entry.variable.notify_all(); } + + if (onComplete && busyWorkers == 0) { + std::lock_guard lock(jobsMutex); + if (jobs.empty()) { + onComplete(); + } + } } void enqueueJob(T job) { @@ -122,6 +153,14 @@ public: } jobsMutexCondition.notify_one(); } + + void setOnJobFailed(consumer callback) { + this->onJobFailed = callback; + } + + void setOnComplete(runnable callback) { + this->onComplete = callback; + } }; } // namespace util