Restarting on reorg is broken; fix asio::io_context::restart calls (#177)

Lee Clagett
2025-09-02 11:07:35 -04:00
committed by Lee Clagett
parent 8d854e9bc6
commit 44278d0d11

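The root cause: once asio::io_context::stop() has been invoked, run(), run_one(), and poll() all return immediately until restart() is called. The old code called restart() after run() returned, so when a blockchain reorg (or shutdown) stopped the context and the scanner re-entered this path, the context was still flagged as stopped and run() executed nothing. The fix calls restart() immediately before each run()/run_one()/poll(). The hunk below also stops passing std::move(server) into start_acceptor, so the later rpc::scanner::server::stop(server) appears to act on a live handle rather than a moved-from (null) shared_ptr.

A minimal standalone sketch of the restart() semantics (plain Boost.Asio; illustrative only, not code from this repo):

    #include <boost/asio/io_context.hpp>
    #include <boost/asio/post.hpp>
    #include <iostream>

    int main()
    {
      boost::asio::io_context io;

      // Simulate a reorg/shutdown signal arriving while handlers run.
      boost::asio::post(io, [&io] { io.stop(); });
      io.run(); // returns because stop() was called; io is now "stopped"

      boost::asio::post(io, [] { std::cout << "handler ran\n"; });
      io.run();     // executes nothing: the stopped flag is still set
      io.restart(); // clears the stopped flag; must precede the next run()/poll()
      io.run();     // now runs the queued handler
      return 0;
    }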

@@ -960,95 +960,98 @@ namespace lws
       std::vector<std::shared_ptr<rpc::scanner::queue>> queues;
       queues.resize(thread_count);
       struct join_
       {
         scanner_sync& self;
         rpc::context& ctx;
         std::vector<std::shared_ptr<rpc::scanner::queue>>& queues;
         std::vector<boost::thread>& threads;
         ~join_() noexcept
         {
           self.stop();
           if (self.has_shutdown())
             ctx.raise_abort_process();
           else
             ctx.raise_abort_scan();
           for (const auto& queue : queues)
           {
             if (queue)
               queue->stop();
           }
           for (auto& thread : threads)
             thread.join();
         }
       } join{self, ctx, queues, threads};
       /*
         The algorithm here is extremely basic. Users are divided evenly amongst
         the configurable thread count, and grouped by scan height. If an old
         account appears, some accounts (grouped on that thread) will be delayed
         in processing waiting for that account to catch up. Its not the greatest,
         but this "will have to do" - but we're getting closer to fixing that
         too.

         Another "issue" is that each thread works independently instead of more
         cooperatively for scanning. This requires a bit more synchronization, so
         was left for later. Its likely worth doing to reduce the number of
         transfers from the daemon, and the bottleneck on the writes into LMDB.
       */
       self.stop_ = false;
       boost::thread::attributes attrs;
       attrs.set_stack_size(THREAD_STACK_SIZE);
       std::sort(users.begin(), users.end(), by_height{});
       MINFO("Starting scan loops on " << thread_count << " thread(s) with " << users.size() << " account(s)");
       for (std::size_t i = 0; i < queues.size(); ++i)
       {
         queues[i] = std::make_shared<rpc::scanner::queue>();
         // this can create threads with no active accounts, they just wait
         const std::size_t count = users.size() / (queues.size() - i);
         std::vector<lws::account> thread_users{
           std::make_move_iterator(users.end() - count), std::make_move_iterator(users.end())
         };
         users.erase(users.end() - count, users.end());
         auto data = std::make_shared<thread_data>(
           MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts
         );
         threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0));
       }
       users.clear();
       users.shrink_to_fit();
       auto server = std::make_shared<rpc::scanner::server>(
         self.io_,
         disk.clone(),
         MONERO_UNWRAP(ctx.connect()),
         queues,
         std::move(active),
         self.webhooks_.ssl_context()
       );
       rpc::scanner::server::start_user_checking(server);
       if (!lws_server_addr.empty())
-        rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
+        rpc::scanner::server::start_acceptor(server, lws_server_addr, std::move(lws_server_pass));
       // Blocks until sigint, local scanner issue, storage issue, or exception
-      self.io_.run();
       self.io_.restart();
-      rpc::scanner::server::stop(server);
-      self.io_.poll();
+      self.io_.run();
     } // block until all threads join
+
+    // Make sure server stops because we could re-start after blockchain sync
+    rpc::scanner::server::stop(server);
+    self.io_.restart();
+    self.io_.poll();
   }
template<typename R, typename Q>
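For reference, the even split described in the scheduling comment in the hunk above works like this; a standalone sketch with illustrative values (not code from the repo):

    #include <cstddef>
    #include <iostream>

    int main()
    {
      std::size_t users = 10;        // accounts awaiting assignment, pre-sorted by scan height
      const std::size_t threads = 4; // configurable scan-thread count

      for (std::size_t i = 0; i < threads; ++i)
      {
        // same formula as the scanner loop: divide what remains evenly
        // across the queues that have not been filled yet
        const std::size_t count = users / (threads - i);
        users -= count;
        std::cout << "thread " << i << ": " << count << " account(s)\n";
      }
      // 10 accounts over 4 threads -> 2, 2, 3, 3
      return 0;
    }

Because the accounts are sorted by height before each slice is taken, every thread receives a contiguous height range, which is why one old account only delays the group sharing its thread.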
@@ -1389,8 +1392,8 @@ namespace lws
          the correct timer was run. */
       while (!has_shutdown() && ready.wait_for(std::chrono::seconds{0}) == std::future_status::timeout)
       {
-        sync_.io_.run_one();
         sync_.io_.restart();
+        sync_.io_.run_one();
       }
     }
     else