Add support for remote scanning via custom TCP (#118)

This commit is contained in:
Lee Clagett
2024-09-22 19:55:28 -04:00
committed by Lee Clagett
parent a5d802cd9b
commit cd62461578
31 changed files with 2950 additions and 500 deletions

View File

@@ -60,6 +60,8 @@
#include "rpc/json.h"
#include "rpc/lws_pub.h"
#include "rpc/message_data_structs.h" // monero/src
#include "rpc/scanner/queue.h"
#include "rpc/scanner/server.h"
#include "rpc/webhook.h"
#include "util/blocks.h"
#include "util/source_location.h"
@@ -72,8 +74,6 @@
namespace lws
{
std::atomic<bool> scanner::running{true};
// Not in `rates.h` - defaulting to JSON output seems odd
std::ostream& operator<<(std::ostream& out, lws::rates const& src)
{
@@ -87,54 +87,24 @@ namespace lws
{
namespace net = epee::net_utils;
constexpr const std::chrono::seconds account_poll_interval{10};
constexpr const std::chrono::minutes block_rpc_timeout{2};
constexpr const std::chrono::seconds send_timeout{30};
constexpr const std::chrono::seconds sync_rpc_timeout{30};
// Cross-thread signalling for the scanner: `update` is the flag, and
// `user_poll` wakes the account-polling thread when it is set (the scan
// threads set it from a scope-exit helper; see `check_loop`).
struct thread_sync
{
boost::mutex sync; // guards waits on `user_poll`
boost::condition_variable user_poll; // notified on shutdown or when a scan thread exits
std::atomic<bool> update; // set when scan threads must be stopped/restarted
};
// Per-run scanner settings, copied into each scan thread.
// NOTE: member order matters — this struct is aggregate-initialized.
struct options
{
net::ssl_verification_t webhook_verify; // TLS verification mode used when posting webhooks
bool enable_subaddresses; // enables the subaddress lookup path (see subaddress_reader)
bool untrusted_daemon; // verify PoW/tx hashes locally instead of trusting the daemon
};
struct thread_data
{
explicit thread_data(rpc::client client, db::storage disk, std::vector<lws::account> users, options opts)
: client(std::move(client)), disk(std::move(disk)), users(std::move(users)), opts(opts)
explicit thread_data(rpc::client client, db::storage disk, std::vector<lws::account> users, std::shared_ptr<rpc::scanner::queue> queue, scanner_options opts)
: client(std::move(client)), disk(std::move(disk)), users(std::move(users)), queue(std::move(queue)), opts(std::move(opts))
{}
rpc::client client;
db::storage disk;
std::vector<lws::account> users;
options opts;
std::shared_ptr<rpc::scanner::queue> queue;
scanner_options opts;
};
// until we have a signal-handler safe notification system
//
// Sleeps for `wait`, but in short slices so `scanner::is_running()` is
// re-checked at least every 500ms and shutdown is noticed promptly.
void checked_wait(const std::chrono::nanoseconds wait)
{
  static constexpr const std::chrono::milliseconds interval{500};
  const auto begin = std::chrono::steady_clock::now();
  for (;;)
  {
    if (!scanner::is_running())
      break;
    const auto elapsed = std::chrono::steady_clock::now() - begin;
    if (elapsed >= wait)
      break;
    // nap for the remainder, capped at `interval`
    const auto nap = std::min(wait - elapsed, std::chrono::nanoseconds{interval});
    boost::this_thread::sleep_for(boost::chrono::nanoseconds{nap.count()});
  }
}
bool is_new_block(std::string&& chain_msg, db::storage& disk, const account& user)
bool is_new_block(std::string&& chain_msg, std::optional<db::storage>& disk, const account& user)
{
const auto chain = rpc::minimal_chain_pub::from_json(std::move(chain_msg));
if (!chain)
@@ -146,7 +116,13 @@ namespace lws
if (user.scan_height() < db::block_id(chain->top_block_height))
return true;
auto reader = disk.start_read();
if (!disk)
{
MWARNING("Assuming new block - no access to local DB");
return true;
}
auto reader = disk->start_read();
if (!reader)
{
MWARNING("Failed to start DB read: " << reader.error());
@@ -179,7 +155,7 @@ namespace lws
{
rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode);
}
std::size_t get_target_time(db::block_id height)
{
const hardfork_t* fork = nullptr;
@@ -221,15 +197,7 @@ namespace lws
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
}
struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
{
return left.scan_height() < right.scan_height();
}
};
struct add_spend
{
void operator()(lws::account& user, const db::spend& spend) const
@@ -331,12 +299,12 @@ namespace lws
expect<db::storage_reader> reader;
db::cursor::subaddress_indexes cur;
subaddress_reader(db::storage const& disk, const bool enable_subaddresses)
subaddress_reader(std::optional<db::storage> const& disk, const bool enable_subaddresses)
: reader(common_error::kInvalidArgument), cur(nullptr)
{
if (enable_subaddresses)
if (disk && enable_subaddresses)
{
reader = disk.start_read();
reader = disk->start_read();
if (!reader)
MERROR("Subadress lookup failure: " << reader.error().message());
}
@@ -598,7 +566,7 @@ namespace lws
scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, reader, add_spend{}, add_output{});
}
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const options& opts)
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const scanner_options& opts)
{
// uint64::max is for txpool
static const std::vector<std::uint64_t> fake_outs(
@@ -615,7 +583,7 @@ namespace lws
const auto time =
boost::numeric_cast<std::uint64_t>(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
subaddress_reader reader{disk, opts.enable_subaddresses};
subaddress_reader reader{std::optional<db::storage>{disk.clone()}, opts.enable_subaddresses};
send_webhook sender{disk, client, opts.webhook_verify};
for (const auto& tx : parsed->txes)
scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender);
@@ -630,49 +598,106 @@ namespace lws
MINFO("Updated exchange rates: " << *(*new_rates));
}
void scan_loop(thread_sync& self, std::shared_ptr<thread_data> data, const bool untrusted_daemon, const bool leader_thread) noexcept
void do_scan_loop(scanner_sync& self, std::shared_ptr<thread_data> data, const bool leader_thread) noexcept
{
try
struct stop_
{
scanner_sync& self;
~stop_() { self.stop(); }
} stop{self};
// thread entry point, so wrap everything in `try { } catch (...) {}`
try
{
// boost::thread doesn't support move-only types + attributes
rpc::client client{std::move(data->client)};
db::storage disk{std::move(data->disk)};
std::vector<lws::account> users{std::move(data->users)};
const options opts = std::move(data->opts);
assert(!users.empty());
assert(std::is_sorted(users.begin(), users.end(), by_height{}));
const std::shared_ptr<rpc::scanner::queue> queue{std::move(data->queue)};
const scanner_options opts{std::move(data->opts)};
data.reset();
struct stop_
{
thread_sync& self;
~stop_() noexcept
{
self.update = true;
self.user_poll.notify_one();
}
} stop{self};
if (!queue)
return;
// RPC server assumes that `start_height == 0` means use
while (self.is_running())
{
if (!users.empty())
{
auto new_client = MONERO_UNWRAP(client.clone());
MONERO_UNWRAP(new_client.watch_scan_signals());
user_data store_local{disk.clone()};
if (!scanner::loop(self.stop_, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread))
return;
}
users.clear();
auto status = queue->wait_for_accounts();
if (status.replace)
users = std::move(*status.replace);
users.insert(
users.end(),
std::make_move_iterator(status.push.begin()),
std::make_move_iterator(status.push.end())
);
}
}
catch (std::exception const& e)
{
self.shutdown();
MERROR(e.what());
}
catch (...)
{
self.shutdown();
MERROR("Unknown exception");
}
}
} // anonymous
// Takes ownership of the wallet DB and installs a SIGINT handler on the
// internal io_service so Ctrl-C triggers `shutdown()`.
scanner::scanner(db::storage disk)
: disk_(std::move(disk)), sync_(), signals_(sync_.io_)
{
signals_.add(SIGINT);
signals_.async_wait([this] (const boost::system::error_code& error, int)
{
// `operation_aborted` means the wait was cancelled, not a real signal
if (error != boost::asio::error::operation_aborted)
shutdown();
});
}
// No special teardown; members release their own resources.
scanner::~scanner()
{}
bool scanner::loop(const std::atomic<bool>& stop, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread)
{
if (users.empty())
return true;
{ // previous `try` block; leave to prevent git blame spam
std::sort(users.begin(), users.end(), by_height{});
/// RPC server assumes that `start_height == 0` means use
// block ids. This technically skips genesis block.
cryptonote::rpc::GetBlocksFast::Request req{};
req.start_height = std::uint64_t(users.begin()->scan_height());
req.start_height = std::max(std::uint64_t(1), req.start_height);
req.prune = !untrusted_daemon;
req.prune = !opts.untrusted_daemon;
epee::byte_slice block_request = rpc::client::make_message("get_blocks_fast", req);
if (!send(client, block_request.clone()))
return;
return false;
std::vector<crypto::hash> blockchain{};
std::vector<db::pow_sync> new_pow{};
db::pow_window pow_window{};
const db::block_info last_checkpoint = db::storage::get_last_checkpoint();
const db::block_id last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk.start_read()).get_last_pow_block()).id;
while (!self.update && scanner::is_running())
db::block_id last_pow{};
if (opts.untrusted_daemon && disk)
last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk->start_read()).get_last_pow_block()).id;
while (!stop)
{
blockchain.clear();
new_pow.clear();
@@ -684,7 +709,7 @@ namespace lws
if (timeout)
MWARNING("Block retrieval timeout, resetting scanner");
if (timeout || resp.matches(std::errc::interrupted))
return;
return false;
MONERO_THROW(resp.error(), "Failed to retrieve blocks from daemon");
}
@@ -692,7 +717,7 @@ namespace lws
if (!fetched)
{
MERROR("Failed to retrieve next blocks: " << fetched.error().message() << ". Resetting state and trying again");
return;
return false;
}
if (fetched->blocks.empty())
@@ -701,35 +726,46 @@ namespace lws
if (fetched->start_height != req.start_height)
{
MWARNING("Daemon sent wrong blocks, resetting state");
return;
return false;
}
{
expect<std::vector<lws::account>> new_accounts = client.pull_accounts();
if (!new_accounts)
bool resort = false;
auto status = queue.get_accounts();
if (status.replace && status.replace->empty() && status.push.empty())
return true; // no work explicitly given, leave
if (status.replace)
{
MERROR("Failed to pull new accounts: " << new_accounts.error().message());
return; // get all active accounts the easy way
MINFO("Received " << status.replace->size() << " replacement account(s) for scanning");
users = std::move(*status.replace);
resort = true;
}
if (!new_accounts->empty())
if (!status.push.empty())
{
MINFO("Received " << new_accounts->size() << " new account(s) for scanning");
std::sort(new_accounts->begin(), new_accounts->end(), by_height{});
const db::block_id oldest = new_accounts->front().scan_height();
MINFO("Received " << status.push.size() << " new account(s) for scanning");
users.insert(
users.end(),
std::make_move_iterator(new_accounts->begin()),
std::make_move_iterator(new_accounts->end())
std::make_move_iterator(status.push.begin()),
std::make_move_iterator(status.push.end())
);
resort = true;
}
if (resort)
{
assert(!users.empty()); // by logic from above
std::sort(users.begin(), users.end(), by_height{});
const db::block_id oldest = users.front().scan_height();
if (std::uint64_t(oldest) < fetched->start_height)
{
req.start_height = std::uint64_t(oldest);
block_request = rpc::client::make_message("get_blocks_fast", req);
if (!send(client, block_request.clone()))
return;
return false;
continue; // to next get_blocks_fast read
}
// else, the oldest new account is within the newly fetched range
// else, the oldest new account is within the newly fetched range
}
}
@@ -744,7 +780,7 @@ namespace lws
{
expect<std::vector<std::pair<rpc::client::topic, std::string>>> new_pubs = client.wait_for_block();
if (new_pubs.matches(std::errc::interrupted))
return; // reset entire state (maybe shutdown)
return false; // reset entire state (maybe shutdown)
if (!new_pubs)
break; // exit wait for block loop, and try fetching new blocks
@@ -757,9 +793,9 @@ namespace lws
auto message = new_pubs->begin();
for ( ; message != new_pubs->end(); ++message)
{
if (message->first != rpc::client::topic::txpool)
if (!disk || message->first != rpc::client::topic::txpool)
break; // inner for loop
scan_transactions(std::move(message->second), epee::to_mut_span(users), disk, client, opts);
scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, client, opts);
}
for ( ; message != new_pubs->end(); ++message)
@@ -771,19 +807,19 @@ namespace lws
// request next chunk of blocks
if (!send(client, block_request.clone()))
return;
return false;
continue; // to next get_blocks_fast read
} // if only one block was fetched
// request next chunk of blocks
if (!send(client, block_request.clone()))
return;
return false;
if (fetched->blocks.size() != fetched->output_indices.size())
throw std::runtime_error{"Bad daemon response - need same number of blocks and indices"};
blockchain.push_back(cryptonote::get_block_hash(fetched->blocks.front().block));
if (untrusted_daemon)
if (opts.untrusted_daemon)
new_pow.push_back(db::pow_sync{fetched->blocks.front().block.timestamp});
auto blocks = epee::to_mut_span(fetched->blocks);
@@ -798,10 +834,10 @@ namespace lws
else
fetched->start_height = 0;
if (untrusted_daemon)
if (disk && opts.untrusted_daemon)
{
pow_window = MONERO_UNWRAP(
MONERO_UNWRAP(disk.start_read()).get_pow_window(db::block_id(fetched->start_height))
MONERO_UNWRAP(disk->start_read()).get_pow_window(db::block_id(fetched->start_height))
);
}
@@ -836,7 +872,7 @@ namespace lws
reader
);
if (untrusted_daemon)
if (opts.untrusted_daemon)
{
if (block.prev_id != blockchain.back())
MONERO_THROW(error::bad_blockchain, "A blocks prev_id does not match");
@@ -848,19 +884,19 @@ namespace lws
pow_window.median_timestamps.erase(pow_window.median_timestamps.begin());
// longhash takes a while, check is_running
if (!scanner::is_running())
return;
if (stop)
return false;
diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(db::block_id(fetched->start_height)));
// skip POW hashing if done previously
if (last_pow < db::block_id(fetched->start_height))
if (disk && last_pow < db::block_id(fetched->start_height))
{
if (!verify_timestamp(block.timestamp, pow_window.median_timestamps))
MONERO_THROW(error::bad_blockchain, "Block failed timestamp check - possible chain forgery");
const crypto::hash pow =
get_block_longhash(get_block_hashing_blob(block), db::block_id(fetched->start_height), block.major_version, disk, initial_height, epee::to_span(blockchain));
get_block_longhash(get_block_hashing_blob(block), db::block_id(fetched->start_height), block.major_version, *disk, initial_height, epee::to_span(blockchain));
if (!cryptonote::check_hash(pow, diff))
MONERO_THROW(error::bad_blockchain, "Block had too low difficulty");
}
@@ -872,7 +908,7 @@ namespace lws
for (auto tx_data : boost::combine(block.tx_hashes, txes, indices))
{
if (untrusted_daemon)
if (opts.untrusted_daemon)
{
if (cryptonote::get_transaction_hash(boost::get<1>(tx_data)) != boost::get<0>(tx_data))
MONERO_THROW(error::bad_blockchain, "Hash of transaction does not match hash in block");
@@ -889,7 +925,7 @@ namespace lws
);
}
if (untrusted_daemon)
if (opts.untrusted_daemon)
{
const auto last_difficulty =
pow_window.cumulative_diffs.empty() ?
@@ -905,255 +941,130 @@ namespace lws
} // for each block
reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write
auto updated = disk.update(
users.front().scan_height(), epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow)
);
if (!updated)
{
if (updated == lws::error::blockchain_reorg)
{
MINFO("Blockchain reorg detected, resetting state");
return;
}
MONERO_THROW(updated.error(), "Failed to update accounts on disk");
}
if (!store(client, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow), opts))
return false;
if (untrusted_daemon && leader_thread && fetched->start_height % 4 == 0 && last_pow < db::block_id(fetched->start_height))
// TODO
if (opts.untrusted_daemon && leader_thread && fetched->start_height % 4 == 0 && last_pow < db::block_id(fetched->start_height))
{
MINFO("On chain with hash " << blockchain.back() << " and difficulty " << diff << " at height " << fetched->start_height);
}
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return;
}
for (account& user : users)
user.updated(db::block_id(fetched->start_height));
// Publish when all scan threads have past this block
if (!blockchain.empty() && client.has_publish())
rpc::publish_scanned(client, blockchain.back(), epee::to_span(users));
}
}
catch (std::exception const& e)
{
scanner::stop();
MERROR(e.what());
}
catch (...)
{
scanner::stop();
MERROR("Unknown exception");
}
}
// Builds a scannable `lws::account` by pairing the DB account record
// with its stored outputs (spend metadata ids, recipients, and output
// public keys).
lws::account prep_account(db::storage_reader& reader, const lws::db::account& user)
{
  auto output_list = MONERO_UNWRAP(reader.get_outputs(user.id));
  // reserve once up front; both vectors grow in lock-step below
  const std::size_t total = output_list.count();
  std::vector<std::pair<db::output_id, db::address_index>> receives{};
  std::vector<crypto::public_key> pubs{};
  receives.reserve(total);
  pubs.reserve(total);
  for (auto entry = output_list.make_iterator(); !entry.is_end(); ++entry)
  {
    auto out_id = entry.get_value<MONERO_FIELD(db::output, spend_meta.id)>();
    auto recipient = entry.get_value<MONERO_FIELD(db::output, recipient)>();
    receives.emplace_back(std::move(out_id), std::move(recipient));
    pubs.emplace_back(entry.get_value<MONERO_FIELD(db::output, pub)>());
  }
  return lws::account{user, std::move(receives), std::move(pubs)};
}
return false;
} // end scan_loop
namespace
{
/*!
Launches `thread_count` threads to run `scan_loop`, and then polls for
active account changes in background
*/
void check_loop(db::storage disk, rpc::context& ctx, std::size_t thread_count, std::vector<lws::account> users, std::vector<db::account_id> active, const options opts)
void check_loop(scanner_sync& self, db::storage disk, rpc::context& ctx, const std::size_t thread_count, const std::string& lws_server_addr, std::string lws_server_pass, std::vector<lws::account> users, std::vector<db::account_id> active, const scanner_options& opts)
{
assert(0 < thread_count);
assert(0 < users.size());
assert(users.size() == active.size());
assert(thread_count || !lws_server_addr.empty());
assert(!thread_count || !users.empty());
thread_sync self{};
std::vector<boost::thread> threads{};
threads.reserve(thread_count);
std::vector<std::shared_ptr<rpc::scanner::queue>> queues;
queues.resize(thread_count);
struct join_
{
thread_sync& self;
std::vector<boost::thread>& threads;
scanner_sync& self;
rpc::context& ctx;
std::vector<std::shared_ptr<rpc::scanner::queue>>& queues;
std::vector<boost::thread>& threads;
~join_() noexcept
{
self.update = true;
ctx.raise_abort_scan();
self.stop();
if (self.has_shutdown())
ctx.raise_abort_process();
else
ctx.raise_abort_scan();
for (const auto& queue : queues)
{
if (queue)
queue->stop();
}
for (auto& thread : threads)
thread.join();
}
} join{self, threads, ctx};
} join{self, ctx, queues, threads};
/*
The algorithm here is extremely basic. Users are divided evenly amongst
the configurable thread count, and grouped by scan height. If an old
account appears, some accounts (grouped on that thread) will be delayed
in processing waiting for that account to catch up. Its not the greatest,
but this "will have to do" for the first cut.
Its not expected that many people will be running
"enterprise level" of nodes where accounts are constantly added.
but this "will have to do" for now - we're getting closer to fixing
that too.
Another "issue" is that each thread works independently instead of more
cooperatively for scanning. This requires a bit more synchronization, so
was left for later. Its likely worth doing to reduce the number of
transfers from the daemon, and the bottleneck on the writes into LMDB.
If the active user list changes, all threads are stopped/joined, and
everything is re-started.
*/
self.stop_ = false;
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
threads.reserve(thread_count);
std::sort(users.begin(), users.end(), by_height{});
// enable the new bind point before registering pull accounts
lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push());
MINFO("Starting scan loops on " << thread_count << " thread(s) with " << users.size() << " account(s)");
MINFO("Starting scan loops on " << std::min(thread_count, users.size()) << " thread(s) with " << users.size() << " account(s)");
bool leader_thread = true;
bool remaining_threads = true;
while (!users.empty() && --thread_count)
for (std::size_t i = 0; i < queues.size(); ++i)
{
const std::size_t per_thread = std::max(std::size_t(1), users.size() / (thread_count + 1));
const std::size_t count = std::min(per_thread, users.size());
queues[i] = std::make_shared<rpc::scanner::queue>();
// this can create threads with no active accounts, they just wait
const std::size_t count = users.size() / (queues.size() - i);
std::vector<lws::account> thread_users{
std::make_move_iterator(users.end() - count), std::make_move_iterator(users.end())
};
users.erase(users.end() - count, users.end());
rpc::client client = MONERO_UNWRAP(ctx.connect());
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(thread_users), opts
MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts
);
threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread));
leader_thread = false;
threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0));
}
if (!users.empty())
{
rpc::client client = MONERO_UNWRAP(ctx.connect());
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
users.clear();
users.shrink_to_fit();
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(users), opts
{
auto server = std::make_shared<rpc::scanner::server>(
self.io_,
disk.clone(),
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
);
threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread));
remaining_threads = false;
rpc::scanner::server::start_user_checking(server);
if (!lws_server_addr.empty())
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
}
auto last_check = std::chrono::steady_clock::now();
lmdb::suspended_txn read_txn{};
db::cursor::accounts accounts_cur{};
boost::unique_lock<boost::mutex> lock{self.sync};
while (scanner::is_running())
{
update_rates(ctx);
for (;;)
{
//! \TODO use signalfd + ZMQ? Windows is the difficult case...
self.user_poll.wait_for(lock, boost::chrono::seconds{1});
if (self.update || !scanner::is_running())
return;
auto this_check = std::chrono::steady_clock::now();
if (account_poll_interval <= (this_check - last_check))
{
last_check = this_check;
break;
}
}
auto reader = disk.start_read(std::move(read_txn));
if (!reader)
{
if (reader.matches(std::errc::no_lock_available))
{
MWARNING("Failed to open DB read handle, retrying later");
continue;
}
MONERO_THROW(reader.error(), "Failed to open DB read handle");
}
auto current_users = MONERO_UNWRAP(
reader->get_accounts(db::account_status::active, std::move(accounts_cur))
);
if (current_users.count() < active.size())
{
// cannot remove accounts via ZMQ (yet)
MINFO("Decrease in active user accounts detected, stopping scan threads...");
return;
}
std::vector<db::account_id> active_copy = active;
std::vector<lws::account> new_;
for (auto user = current_users.make_iterator(); !user.is_end(); ++user)
{
const db::account_id user_id = user.get_value<MONERO_FIELD(db::account, id)>();
const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id);
if (loc == active_copy.end() || *loc != user_id)
{
new_.emplace_back(prep_account(*reader, user.get_value<db::account>()));
active.insert(
std::lower_bound(active.begin(), active.end(), user_id), user_id
);
}
else
active_copy.erase(loc);
}
if (!active_copy.empty())
{
MINFO("Change in active user accounts detected, stopping scan threads...");
return;
}
if (!new_.empty())
{
if (remaining_threads)
{
MINFO("Received new account(s), starting more thread(s)");
return;
}
const auto pushed = pusher.push(epee::to_span(new_), std::chrono::seconds{1});
if (!pushed)
{
MERROR("Failed to push new account to workers: " << pushed.error().message());
return; // pull in new accounts by resetting state
}
else
MINFO("Pushed " << new_.size() << " new accounts to worker thread(s)");
}
read_txn = reader->finish_read();
accounts_cur = current_users.give_cursor();
} // while scanning
// Blocks until sigint, local scanner issue, or exception
self.io_.run();
}
template<typename R, typename Q>
expect<typename R::response> fetch_chain(rpc::client& client, const char* endpoint, const Q& req)
expect<typename R::response> fetch_chain(const scanner_sync& self, rpc::client& client, const char* endpoint, const Q& req)
{
expect<void> sent{lws::error::daemon_timeout};
@@ -1162,7 +1073,7 @@ namespace lws
while (!(sent = client.send(std::move(msg), std::chrono::seconds{1})))
{
if (!scanner::is_running())
if (self.has_shutdown())
return {lws::error::signal_abort_process};
if (sync_rpc_timeout <= (std::chrono::steady_clock::now() - start))
@@ -1177,7 +1088,7 @@ namespace lws
while (!(resp = client.get_message(std::chrono::seconds{1})))
{
if (!scanner::is_running())
if (self.has_shutdown())
return {lws::error::signal_abort_process};
if (sync_rpc_timeout <= (std::chrono::steady_clock::now() - start))
@@ -1190,7 +1101,7 @@ namespace lws
}
// does not validate blockchain hashes
expect<rpc::client> sync_quick(db::storage disk, rpc::client client)
expect<rpc::client> sync_quick(const scanner_sync& self, db::storage disk, rpc::client client)
{
MINFO("Starting blockchain sync with daemon");
@@ -1203,7 +1114,7 @@ namespace lws
if (req.known_hashes.empty())
return {lws::error::bad_blockchain};
auto resp = fetch_chain<rpc::get_hashes_fast>(client, "get_hashes_fast", req);
auto resp = fetch_chain<rpc::get_hashes_fast>(self, client, "get_hashes_fast", req);
if (!resp)
return resp.error();
@@ -1229,7 +1140,7 @@ namespace lws
}
// validates blockchain hashes
expect<rpc::client> sync_full(db::storage disk, rpc::client client)
expect<rpc::client> sync_full(const scanner_sync& self, db::storage disk, rpc::client client)
{
MINFO("Starting blockchain sync with daemon");
@@ -1245,7 +1156,7 @@ namespace lws
if (req.block_ids.empty())
return {lws::error::bad_blockchain};
auto resp = fetch_chain<rpc::get_blocks_fast>(client, "get_blocks_fast", req);
auto resp = fetch_chain<rpc::get_blocks_fast>(self, client, "get_blocks_fast", req);
if (!resp)
return resp.error();
@@ -1307,7 +1218,7 @@ namespace lws
pow_window.median_timestamps.erase(pow_window.median_timestamps.begin());
// longhash takes a while, check is_running
if (!scanner::is_running())
if (self.has_shutdown())
return {error::signal_abort_process};
diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(height));
@@ -1352,37 +1263,119 @@ namespace lws
}
} // anonymous
expect<rpc::client> scanner::sync(db::storage disk, rpc::client client, const bool untrusted_daemon)
// Writes a batch of scanned blocks/accounts to the DB, then fires the
// configured payment/spend webhooks and publishes the new chain tip.
// Returns false when the caller must reset scanner state and rescan
// (blockchain reorg, or not every account was updated).
bool user_data::store(db::storage& disk, rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
{
if (users.empty())
return true;
// `disk.update` is given the lowest scan height (users[0]) — enforce the invariant
if (!std::is_sorted(users.begin(), users.end(), by_height{}))
throw std::logic_error{"users must be sorted!"};
auto updated = disk.update(users[0].scan_height(), chain, users, pow);
if (!updated)
{
if (updated == lws::error::blockchain_reorg)
{
MINFO("Blockchain reorg detected, resetting state");
return false; // caller resets and re-syncs from DB state
}
MONERO_THROW(updated.error(), "Failed to update accounts on disk");
}
MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)");
// webhooks only fire after the DB commit succeeded
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return false;
}
// Publish when all scan threads have past this block
// only address is printed from users, so height doesn't need updating
if (!chain.empty() && client.has_publish())
rpc::publish_scanned(client, chain[chain.size() - 1], epee::to_span(users));
return true;
}
void scanner::run(db::storage disk, rpc::context ctx, std::size_t thread_count, const epee::net_utils::ssl_verification_t webhook_verify, const bool enable_subaddresses, const bool untrusted_daemon)
bool user_data::operator()(rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
{
thread_count = std::max(std::size_t(1), thread_count);
return store(disk_, client, chain, users, pow, opts);
}
// Pre-scan chain sync with the daemon. An untrusted daemon gets the
// full sync (blockchain hashes validated); a trusted one gets the
// quicker, non-validating sync.
expect<rpc::client> scanner::sync(rpc::client client, const bool untrusted_daemon)
{
  if (has_shutdown())
    MONERO_THROW(common_error::kInvalidArgument, "this has shutdown");
  return untrusted_daemon ?
    sync_full(sync_, disk_.clone(), std::move(client)) :
    sync_quick(sync_, disk_.clone(), std::move(client));
}
void scanner::run(rpc::context ctx, std::size_t thread_count, const std::string& lws_server_addr, std::string lws_server_pass, const scanner_options& opts)
{
if (has_shutdown())
MONERO_THROW(common_error::kInvalidArgument, "this has shutdown");
if (!lws_server_addr.empty() && (opts.enable_subaddresses || opts.untrusted_daemon))
MONERO_THROW(error::configuration, "Cannot use remote scanner with subaddresses or untrusted daemon");
if (lws_server_addr.empty())
thread_count = std::max(std::size_t(1), thread_count);
/*! \NOTE Be careful about references and lifetimes of the callbacks. The
ones below are safe because no `io_service::run()` call is after the
destruction of the references.
\NOTE That `ctx` will need a strand or lock if multiple
`io_service::run()` calls are used. */
boost::asio::steady_timer rate_timer{sync_.io_};
class rate_updater
{
boost::asio::steady_timer& rate_timer_;
rpc::context& ctx_;
const std::chrono::minutes rate_interval_;
public:
explicit rate_updater(boost::asio::steady_timer& rate_timer, rpc::context& ctx)
: rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval())
{}
void operator()(const boost::system::error_code& error = {}) const
{
update_rates(ctx_);
rate_timer_.expires_from_now(rate_interval_);
rate_timer_.async_wait(*this);
}
std::chrono::minutes rate_interval() const noexcept { return rate_interval_; }
};
{
rate_updater updater{rate_timer, ctx};
if (std::chrono::minutes{0} < updater.rate_interval())
updater();
}
rpc::client client{};
for (;;)
{
const auto last = std::chrono::steady_clock::now();
update_rates(ctx);
std::vector<db::account_id> active;
std::vector<lws::account> users;
if (thread_count)
{
MINFO("Retrieving current active account list");
auto reader = MONERO_UNWRAP(disk.start_read());
auto reader = MONERO_UNWRAP(disk_.start_read());
auto accounts = MONERO_UNWRAP(
reader.get_accounts(db::account_status::active)
);
for (db::account user : accounts.make_range())
{
users.emplace_back(prep_account(reader, user));
users.emplace_back(MONERO_UNWRAP(reader.get_full_account(user)));
active.insert(
std::lower_bound(active.begin(), active.end(), user.id), user.id
);
@@ -1391,21 +1384,27 @@ namespace lws
reader.finish_read();
} // cleanup DB reader
if (users.empty())
if (thread_count && users.empty())
{
MINFO("No active accounts");
checked_wait(account_poll_interval - (std::chrono::steady_clock::now() - last));
boost::asio::steady_timer poll{sync_.io_};
poll.expires_from_now(rpc::scanner::account_poll_interval);
poll.async_wait([] (boost::system::error_code) {});
sync_.io_.run_one();
}
else
check_loop(disk.clone(), ctx, thread_count, std::move(users), std::move(active), options{webhook_verify, enable_subaddresses, untrusted_daemon});
check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts);
if (!scanner::is_running())
sync_.io_.reset();
if (has_shutdown())
return;
if (!client)
client = MONERO_UNWRAP(ctx.connect());
expect<rpc::client> synced = sync(disk.clone(), std::move(client), untrusted_daemon);
expect<rpc::client> synced = sync(std::move(client), opts.untrusted_daemon);
if (!synced)
{
if (!synced.matches(std::errc::timed_out))