mirror of
https://codeberg.org/wownero/wownero-lws
synced 2026-01-09 23:25:16 -08:00
Fix several bugs found in new server scanner code (#146)
This commit is contained in:
committed by
Lee *!* Clagett
parent
cd62461578
commit
8080159fc8
@@ -28,6 +28,7 @@
|
|||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
#include <boost/asio/coroutine.hpp>
|
#include <boost/asio/coroutine.hpp>
|
||||||
|
#include <boost/asio/dispatch.hpp>
|
||||||
#include <boost/lexical_cast.hpp>
|
#include <boost/lexical_cast.hpp>
|
||||||
#include <boost/numeric/conversion/cast.hpp>
|
#include <boost/numeric/conversion/cast.hpp>
|
||||||
#include <sodium/utils.h>
|
#include <sodium/utils.h>
|
||||||
@@ -163,12 +164,16 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
|
|
||||||
void operator()(const boost::system::error_code& error = {})
|
void operator()(const boost::system::error_code& error = {})
|
||||||
{
|
{
|
||||||
if (!self_ || error)
|
if (error)
|
||||||
{
|
{
|
||||||
if (error == boost::asio::error::operation_aborted)
|
if (error == boost::asio::error::operation_aborted)
|
||||||
return; // exiting
|
return; // exiting
|
||||||
MONERO_THROW(error, "server acceptor failed");
|
MONERO_THROW(error, "server acceptor failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!self_ || self_->stop_)
|
||||||
|
return;
|
||||||
|
|
||||||
assert(self_->strand_.running_in_this_thread());
|
assert(self_->strand_.running_in_this_thread());
|
||||||
BOOST_ASIO_CORO_REENTER(*this)
|
BOOST_ASIO_CORO_REENTER(*this)
|
||||||
{
|
{
|
||||||
@@ -192,7 +197,7 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
|
|
||||||
void operator()(const boost::system::error_code& error = {}) const
|
void operator()(const boost::system::error_code& error = {}) const
|
||||||
{
|
{
|
||||||
if (!self_ || error == boost::asio::error::operation_aborted)
|
if (!self_ || self_->stop_ || error == boost::asio::error::operation_aborted)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
assert(self_->strand_.running_in_this_thread());
|
assert(self_->strand_.running_in_this_thread());
|
||||||
@@ -223,7 +228,7 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto reader = self_->disk_.start_read(std::move(self_->read_txn_));
|
auto reader = self_->disk_.start_read();
|
||||||
if (!reader)
|
if (!reader)
|
||||||
{
|
{
|
||||||
if (reader.matches(std::errc::no_lock_available))
|
if (reader.matches(std::errc::no_lock_available))
|
||||||
@@ -240,6 +245,8 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
if (current_users.count() < self_->active_.size())
|
if (current_users.count() < self_->active_.size())
|
||||||
{
|
{
|
||||||
// a shrinking user base, re-shuffle
|
// a shrinking user base, re-shuffle
|
||||||
|
reader->finish_read();
|
||||||
|
self_->accounts_cur_ = current_users.give_cursor();
|
||||||
self_->do_replace_users();
|
self_->do_replace_users();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -254,6 +261,8 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value<db::account>())));
|
new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value<db::account>())));
|
||||||
if (replace_threshold < new_accounts.size())
|
if (replace_threshold < new_accounts.size())
|
||||||
{
|
{
|
||||||
|
reader->finish_read();
|
||||||
|
self_->accounts_cur_ = current_users.give_cursor();
|
||||||
self_->do_replace_users();
|
self_->do_replace_users();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -268,6 +277,8 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
|
|
||||||
if (!active_copy.empty())
|
if (!active_copy.empty())
|
||||||
{
|
{
|
||||||
|
reader->finish_read();
|
||||||
|
self_->accounts_cur_ = current_users.give_cursor();
|
||||||
self_->do_replace_users();
|
self_->do_replace_users();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -306,7 +317,7 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
|
|
||||||
self_->next_thread_ %= total_threads;
|
self_->next_thread_ %= total_threads;
|
||||||
}
|
}
|
||||||
self_->read_txn_ = reader->finish_read();
|
reader->finish_read();
|
||||||
self_->accounts_cur_ = current_users.give_cursor();
|
self_->accounts_cur_ = current_users.give_cursor();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -401,6 +412,28 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
active_ = std::move(active);
|
active_ = std::move(active);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void server::do_stop()
|
||||||
|
{
|
||||||
|
assert(strand_.running_in_this_thread());
|
||||||
|
if (stop_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
MDEBUG("Stopping rpc::scanner::server async operations");
|
||||||
|
boost::system::error_code error{};
|
||||||
|
check_timer_.cancel(error);
|
||||||
|
acceptor_.cancel(error);
|
||||||
|
acceptor_.close(error);
|
||||||
|
|
||||||
|
for (auto& remote : remote_)
|
||||||
|
{
|
||||||
|
const auto conn = remote.lock();
|
||||||
|
if (conn)
|
||||||
|
boost::asio::dispatch(conn->strand_, [conn] () { conn->cleanup(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
stop_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address)
|
boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address)
|
||||||
{
|
{
|
||||||
std::string host;
|
std::string host;
|
||||||
@@ -432,12 +465,12 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
active_(std::move(active)),
|
active_(std::move(active)),
|
||||||
disk_(std::move(disk)),
|
disk_(std::move(disk)),
|
||||||
zclient_(std::move(zclient)),
|
zclient_(std::move(zclient)),
|
||||||
read_txn_{},
|
|
||||||
accounts_cur_{},
|
accounts_cur_{},
|
||||||
next_thread_(0),
|
next_thread_(0),
|
||||||
pass_hashed_(),
|
pass_hashed_(),
|
||||||
pass_salt_(),
|
pass_salt_(),
|
||||||
webhook_verify_(webhook_verify)
|
webhook_verify_(webhook_verify),
|
||||||
|
stop_(false)
|
||||||
{
|
{
|
||||||
std::sort(active_.begin(), active_.end());
|
std::sort(active_.begin(), active_.end());
|
||||||
for (const auto& local : local_)
|
for (const auto& local : local_)
|
||||||
@@ -488,6 +521,9 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
{
|
{
|
||||||
self->acceptor_.close();
|
self->acceptor_.close();
|
||||||
self->acceptor_.open(endpoint.protocol());
|
self->acceptor_.open(endpoint.protocol());
|
||||||
|
#if !defined(_WIN32)
|
||||||
|
self->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
|
||||||
|
#endif
|
||||||
self->acceptor_.bind(endpoint);
|
self->acceptor_.bind(endpoint);
|
||||||
self->acceptor_.listen();
|
self->acceptor_.listen();
|
||||||
|
|
||||||
@@ -522,7 +558,17 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
{
|
{
|
||||||
const lws::scanner_options opts{self->webhook_verify_, false, false};
|
const lws::scanner_options opts{self->webhook_verify_, false, false};
|
||||||
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))
|
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))
|
||||||
GET_IO_SERVICE(self->check_timer_).stop();
|
{
|
||||||
|
self->do_stop();
|
||||||
|
self->strand_.context().stop();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void server::stop(const std::shared_ptr<server>& self)
|
||||||
|
{
|
||||||
|
if (!self)
|
||||||
|
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
|
||||||
|
boost::asio::dispatch(self->strand_, [self] () { self->do_stop(); });
|
||||||
|
}
|
||||||
}}} // lws // rpc // scanner
|
}}} // lws // rpc // scanner
|
||||||
|
|||||||
@@ -65,12 +65,12 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
std::vector<db::account_id> active_;
|
std::vector<db::account_id> active_;
|
||||||
db::storage disk_;
|
db::storage disk_;
|
||||||
rpc::client zclient_;
|
rpc::client zclient_;
|
||||||
lmdb::suspended_txn read_txn_;
|
|
||||||
db::cursor::accounts accounts_cur_;
|
db::cursor::accounts accounts_cur_;
|
||||||
std::size_t next_thread_;
|
std::size_t next_thread_;
|
||||||
std::array<unsigned char, 32> pass_hashed_;
|
std::array<unsigned char, 32> pass_hashed_;
|
||||||
std::array<unsigned char, crypto_pwhash_SALTBYTES> pass_salt_;
|
std::array<unsigned char, crypto_pwhash_SALTBYTES> pass_salt_;
|
||||||
const ssl_verification_t webhook_verify_;
|
const ssl_verification_t webhook_verify_;
|
||||||
|
bool stop_;
|
||||||
|
|
||||||
//! Async acceptor routine
|
//! Async acceptor routine
|
||||||
class acceptor;
|
class acceptor;
|
||||||
@@ -79,6 +79,9 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
//! Reset `local_` and `remote_` scanners. Must be called in `strand_`.
|
//! Reset `local_` and `remote_` scanners. Must be called in `strand_`.
|
||||||
void do_replace_users();
|
void do_replace_users();
|
||||||
|
|
||||||
|
//! Stop all async operations
|
||||||
|
void do_stop();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);
|
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);
|
||||||
|
|
||||||
@@ -106,5 +109,8 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
|
|
||||||
//! Update `users` information on local DB
|
//! Update `users` information on local DB
|
||||||
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
|
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
|
||||||
|
|
||||||
|
//! Stop a running instance of all operations
|
||||||
|
static void stop(const std::shared_ptr<server>& self);
|
||||||
};
|
};
|
||||||
}}} // lws // rpc // scanner
|
}}} // lws // rpc // scanner
|
||||||
|
|||||||
@@ -27,6 +27,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <boost/asio/coroutine.hpp>
|
#include <boost/asio/coroutine.hpp>
|
||||||
|
#include <boost/asio/dispatch.hpp>
|
||||||
#include <boost/asio/write.hpp>
|
#include <boost/asio/write.hpp>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
@@ -167,7 +168,7 @@ namespace lws { namespace rpc { namespace scanner
|
|||||||
|
|
||||||
if (msg.empty())
|
if (msg.empty())
|
||||||
{
|
{
|
||||||
self->cleanup();
|
boost::asio::dispatch(self->strand_, [self] () { self->cleanup(); });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,6 +27,7 @@
|
|||||||
#include "scanner.h"
|
#include "scanner.h"
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <boost/asio/use_future.hpp>
|
||||||
#include <boost/numeric/conversion/cast.hpp>
|
#include <boost/numeric/conversion/cast.hpp>
|
||||||
#include <boost/range/combine.hpp>
|
#include <boost/range/combine.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
@@ -1044,23 +1045,27 @@ namespace lws
|
|||||||
users.clear();
|
users.clear();
|
||||||
users.shrink_to_fit();
|
users.shrink_to_fit();
|
||||||
|
|
||||||
{
|
auto server = std::make_shared<rpc::scanner::server>(
|
||||||
auto server = std::make_shared<rpc::scanner::server>(
|
self.io_,
|
||||||
self.io_,
|
disk.clone(),
|
||||||
disk.clone(),
|
MONERO_UNWRAP(ctx.connect()),
|
||||||
MONERO_UNWRAP(ctx.connect()),
|
queues,
|
||||||
queues,
|
std::move(active),
|
||||||
std::move(active),
|
opts.webhook_verify
|
||||||
opts.webhook_verify
|
);
|
||||||
);
|
|
||||||
|
|
||||||
rpc::scanner::server::start_user_checking(server);
|
rpc::scanner::server::start_user_checking(server);
|
||||||
if (!lws_server_addr.empty())
|
if (!lws_server_addr.empty())
|
||||||
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
|
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
|
||||||
}
|
|
||||||
|
|
||||||
// Blocks until sigint, local scanner issue, or exception
|
// Blocks until sigint, local scanner issue, storage issue, or exception
|
||||||
self.io_.run();
|
self.io_.run();
|
||||||
|
self.io_.restart();
|
||||||
|
|
||||||
|
// Make sure server stops because we could re-start after blockchain sync
|
||||||
|
rpc::scanner::server::stop(server);
|
||||||
|
self.io_.poll();
|
||||||
|
self.io_.restart();
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename R, typename Q>
|
template<typename R, typename Q>
|
||||||
@@ -1390,14 +1395,19 @@ namespace lws
|
|||||||
|
|
||||||
boost::asio::steady_timer poll{sync_.io_};
|
boost::asio::steady_timer poll{sync_.io_};
|
||||||
poll.expires_from_now(rpc::scanner::account_poll_interval);
|
poll.expires_from_now(rpc::scanner::account_poll_interval);
|
||||||
poll.async_wait([] (boost::system::error_code) {});
|
const auto ready = poll.async_wait(boost::asio::use_future);
|
||||||
|
|
||||||
sync_.io_.run_one();
|
/* The exchange rates timer could run while waiting, so ensure that
|
||||||
|
the correct timer was run. */
|
||||||
|
while (!has_shutdown() && ready.wait_for(std::chrono::seconds{0}) == std::future_status::timeout)
|
||||||
|
{
|
||||||
|
sync_.io_.run_one();
|
||||||
|
sync_.io_.restart();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts);
|
check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts);
|
||||||
|
|
||||||
sync_.io_.reset();
|
|
||||||
if (has_shutdown())
|
if (has_shutdown())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user