diff --git a/src/rpc/scanner/server.cpp b/src/rpc/scanner/server.cpp index 6672c65..9a955d0 100644 --- a/src/rpc/scanner/server.cpp +++ b/src/rpc/scanner/server.cpp @@ -147,6 +147,8 @@ namespace lws { namespace rpc { namespace scanner if (msg.users.empty()) return true; + MINFO("Client (" << self->remote_endpoint() << ") processed " + << msg.blocks.size() << " block(s) against " << msg.users.size() << " account(s)"); server::store(self->parent_, std::move(msg.users), std::move(msg.blocks)); return true; } diff --git a/src/scanner.cpp b/src/scanner.cpp index ab1abd4..024764e 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -584,7 +584,7 @@ namespace lws scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender); } - void do_scan_loop(scanner_sync& self, std::shared_ptr data, const bool leader_thread) noexcept + void do_scan_loop(scanner_sync& self, std::shared_ptr data, const size_t thread_n) noexcept { struct stop_ { @@ -614,7 +614,7 @@ namespace lws auto new_client = MONERO_UNWRAP(client.clone()); MONERO_UNWRAP(new_client.watch_scan_signals()); user_data store_local{disk.clone()}; - if (!scanner::loop(self, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread)) + if (!scanner::loop(self, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, thread_n)) return; } @@ -656,8 +656,10 @@ namespace lws scanner::~scanner() {} - bool scanner::loop(scanner_sync& self, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread) + bool scanner::loop(scanner_sync& self, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, const size_t thread_n) { + const bool leader_thread = thread_n == 0; + if (users.empty()) return true; @@ -927,6 +929,9 @@ namespace lws } // for each block reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write + + MINFO("Thread " << thread_n << " processed " << blockchain.size() << " blocks(s) @ height " << fetched->start_height << " against " << users.size() << " account(s)"); + if (!store(self.io_, client, self.webhooks_, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow))) return false; @@ -1025,7 +1030,7 @@ namespace lws auto data = std::make_shared( MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts ); - threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0)); + threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i)); } users.clear(); @@ -1280,7 +1285,6 @@ namespace lws MONERO_THROW(updated.error(), "Failed to update accounts on disk"); } - MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)"); send_payment_hook(io, client, webhook, epee::to_span(updated->confirm_pubs)); send_spend_hook(io, client, webhook, epee::to_span(updated->spend_pubs)); if (updated->accounts_updated != users.size()) diff --git a/src/scanner.h b/src/scanner.h index 265733b..fd99614 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -117,7 +117,7 @@ namespace lws \throw std::exception on hard errors (shutdown) conditions \return True iff `queue` indicates thread now has zero accounts. False indicates a soft, typically recoverable error. */ - static bool loop(scanner_sync& self, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread); + static bool loop(scanner_sync& self, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, const size_t thread_n); //! Use `client` to sync blockchain data, and \return client if successful. expect sync(rpc::client client, const bool untrusted_daemon = false, const bool regtest = false); diff --git a/src/server_main.cpp b/src/server_main.cpp index 2839d2c..49971af 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -299,6 +299,8 @@ namespace void run(program prog) { + auto sub_address = prog.daemon_sub; + boost::filesystem::create_directories(prog.db_path); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval, prog.untrusted_daemon); @@ -307,6 +309,9 @@ namespace lws::scanner scanner{disk.clone(), prog.rest_config.webhook_verify}; MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); + if (!sub_address.empty()) + MINFO("Using monerod ZMQ sub at " << sub_address); + auto client = scanner.sync(ctx.connect().value(), prog.untrusted_daemon).value(); const auto enable_subaddresses = bool(prog.rest_config.max_subaddresses);