diff --git a/src/scanner.cpp b/src/scanner.cpp index a976fa2..bb3afa0 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -960,95 +960,98 @@ namespace lws std::vector> queues; queues.resize(thread_count); - struct join_ { - scanner_sync& self; - rpc::context& ctx; - std::vector>& queues; - std::vector& threads; - - ~join_() noexcept + struct join_ { - self.stop(); - if (self.has_shutdown()) - ctx.raise_abort_process(); - else - ctx.raise_abort_scan(); + scanner_sync& self; + rpc::context& ctx; + std::vector>& queues; + std::vector& threads; - for (const auto& queue : queues) + ~join_() noexcept { - if (queue) - queue->stop(); + self.stop(); + if (self.has_shutdown()) + ctx.raise_abort_process(); + else + ctx.raise_abort_scan(); + + for (const auto& queue : queues) + { + if (queue) + queue->stop(); + } + for (auto& thread : threads) + thread.join(); } - for (auto& thread : threads) - thread.join(); + } join{self, ctx, queues, threads}; + + /* + The algorithm here is extremely basic. Users are divided evenly amongst + the configurable thread count, and grouped by scan height. If an old + account appears, some accounts (grouped on that thread) will be delayed + in processing waiting for that account to catch up. Its not the greatest, + but this "will have to do" - but we're getting closer to fixing that + too. + + Another "issue" is that each thread works independently instead of more + cooperatively for scanning. This requires a bit more synchronization, so + was left for later. Its likely worth doing to reduce the number of + transfers from the daemon, and the bottleneck on the writes into LMDB. + */ + + self.stop_ = false; + + boost::thread::attributes attrs; + attrs.set_stack_size(THREAD_STACK_SIZE); + + std::sort(users.begin(), users.end(), by_height{}); + + MINFO("Starting scan loops on " << thread_count << " thread(s) with " << users.size() << " account(s)"); + + for (std::size_t i = 0; i < queues.size(); ++i) + { + queues[i] = std::make_shared(); + + // this can create threads with no active accounts, they just wait + const std::size_t count = users.size() / (queues.size() - i); + std::vector thread_users{ + std::make_move_iterator(users.end() - count), std::make_move_iterator(users.end()) + }; + users.erase(users.end() - count, users.end()); + + auto data = std::make_shared( + MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts + ); + threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0)); } - } join{self, ctx, queues, threads}; - /* - The algorithm here is extremely basic. Users are divided evenly amongst - the configurable thread count, and grouped by scan height. If an old - account appears, some accounts (grouped on that thread) will be delayed - in processing waiting for that account to catch up. Its not the greatest, - but this "will have to do" - but we're getting closer to fixing that - too. + users.clear(); + users.shrink_to_fit(); - Another "issue" is that each thread works independently instead of more - cooperatively for scanning. This requires a bit more synchronization, so - was left for later. Its likely worth doing to reduce the number of - transfers from the daemon, and the bottleneck on the writes into LMDB. - */ - - self.stop_ = false; - - boost::thread::attributes attrs; - attrs.set_stack_size(THREAD_STACK_SIZE); - - std::sort(users.begin(), users.end(), by_height{}); - - MINFO("Starting scan loops on " << thread_count << " thread(s) with " << users.size() << " account(s)"); - - for (std::size_t i = 0; i < queues.size(); ++i) - { - queues[i] = std::make_shared(); - - // this can create threads with no active accounts, they just wait - const std::size_t count = users.size() / (queues.size() - i); - std::vector thread_users{ - std::make_move_iterator(users.end() - count), std::make_move_iterator(users.end()) - }; - users.erase(users.end() - count, users.end()); - - auto data = std::make_shared( - MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts + auto server = std::make_shared( + self.io_, + disk.clone(), + MONERO_UNWRAP(ctx.connect()), + queues, + std::move(active), + self.webhooks_.ssl_context() ); - threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0)); - } - users.clear(); - users.shrink_to_fit(); + rpc::scanner::server::start_user_checking(server); + if (!lws_server_addr.empty()) + rpc::scanner::server::start_acceptor(server, lws_server_addr, std::move(lws_server_pass)); - auto server = std::make_shared( - self.io_, - disk.clone(), - MONERO_UNWRAP(ctx.connect()), - queues, - std::move(active), - self.webhooks_.ssl_context() - ); + // Blocks until sigint, local scanner issue, storage issue, or exception + self.io_.restart(); + self.io_.run(); - rpc::scanner::server::start_user_checking(server); - if (!lws_server_addr.empty()) - rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass)); - - // Blocks until sigint, local scanner issue, storage issue, or exception - self.io_.run(); - self.io_.restart(); + rpc::scanner::server::stop(server); + } // block until all threads join // Make sure server stops because we could re-start after blockchain sync - rpc::scanner::server::stop(server); - self.io_.poll(); self.io_.restart(); + self.io_.poll(); } template @@ -1389,8 +1392,8 @@ namespace lws the correct timer was run. */ while (!has_shutdown() && ready.wait_for(std::chrono::seconds{0}) == std::future_status::timeout) { - sync_.io_.run_one(); sync_.io_.restart(); + sync_.io_.run_one(); } } else