New accounts are 'pushed' to worker threads (#102)

This commit is contained in:
Lee *!* Clagett
2024-04-08 12:58:43 -04:00
committed by Lee *!* Clagett
parent f300bff69f
commit 80604e8133
21 changed files with 743 additions and 50 deletions

View File

@@ -36,7 +36,7 @@
#include "error.h"
#include "span.h" // monero/contrib/epee/include
#include "wire.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/error.h"
#include "wire/json/write.h"
#include "wire/traits.h"

View File

@@ -34,11 +34,14 @@
#include <system_error>
#include "common/error.h" // monero/contrib/epee/include
#include "db/account.h"
#include "error.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
#include "scanner.h"
#include "serialization/json_object.h" // monero/src
#include "wire/msgpack.h"
#if MLWS_RMQ_ENABLED
#include <amqp.h>
#include <amqp_tcp_socket.h>
@@ -53,11 +56,13 @@ namespace rpc
namespace
{
constexpr const char signal_endpoint[] = "inproc://signal";
constexpr const char account_endpoint[] = "inproc://account"; // append integer every new `account_push`
constexpr const char abort_scan_signal[] = "SCAN";
constexpr const char abort_process_signal[] = "PROCESS";
constexpr const char minimal_chain_topic[] = "json-minimal-chain_main";
constexpr const char full_txpool_topic[] = "json-full-txpool_add";
constexpr const int daemon_zmq_linger = 0;
constexpr const int account_zmq_linger = 0;
constexpr const std::int64_t max_msg_sub = 10 * 1024 * 1024; // 50 MiB
constexpr const std::int64_t max_msg_req = 350 * 1024 * 1024; // 350 MiB
constexpr const std::chrono::seconds chain_poll_timeout{20};
@@ -192,6 +197,7 @@ namespace rpc
, cache_time()
, cache_interval(interval)
, cached{}
, account_counter(0)
, sync_pub()
, sync_rates()
, untrusted_daemon(untrusted_daemon)
@@ -210,12 +216,70 @@ namespace rpc
std::chrono::steady_clock::time_point cache_time;
const std::chrono::minutes cache_interval;
rates cached;
std::atomic<unsigned> account_counter;
boost::mutex sync_pub;
boost::mutex sync_rates;
const bool untrusted_daemon;
};
} // detail
expect<account_push> account_push::make(std::shared_ptr<detail::context> ctx) noexcept
{
MONERO_PRECOND(ctx != nullptr);
account_push out{ctx};
out.sock.reset(zmq_socket(ctx->comm.get(), ZMQ_PUSH));
if (out.sock == nullptr)
return {net::zmq::get_error_code()};
const std::string bind = account_endpoint + std::to_string(++ctx->account_counter);
MONERO_CHECK(do_set_option(out.sock.get(), ZMQ_LINGER, account_zmq_linger));
MONERO_ZMQ_CHECK(zmq_bind(out.sock.get(), bind.c_str()));
return {std::move(out)};
}
account_push::~account_push() noexcept
{}
expect<void> account_push::push(epee::span<const lws::account> accounts, std::chrono::seconds timeout)
{
MONERO_PRECOND(ctx != nullptr);
assert(sock.get() != nullptr);
for (const lws::account& account : accounts)
{
// use integer id values (quick and fast)
wire::msgpack_slice_writer dest{true};
try
{
wire_write::bytes(dest, account);
}
catch (const wire::exception& e)
{
return {e.code()};
}
epee::byte_slice message{dest.take_sink()};
/* This is being pushed by the thread that monitors for shutdown, so
no signal is expected. */
expect<void> sent;
const auto start = std::chrono::steady_clock::now();
while (!(sent = net::zmq::send(message.clone(), sock.get(), ZMQ_DONTWAIT)))
{
if (sent != net::zmq::make_error_code(EAGAIN))
return sent.error();
if (!scanner::is_running())
return {error::signal_abort_process};
const auto elapsed = std::chrono::steady_clock::now() - start;
if (timeout <= elapsed)
return {error::daemon_timeout};
boost::this_thread::sleep_for(boost::chrono::milliseconds{10});
}
}
return success();
}
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc)
{
expect<std::string> message = get_message(timeout);
@@ -312,6 +376,18 @@ namespace rpc
return do_subscribe(signal_sub.get(), abort_scan_signal);
}
expect<void> client::enable_pull_accounts()
{
detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)};
if (new_sock == nullptr)
return {net::zmq::get_error_code()};
const std::string connect =
account_endpoint + std::to_string(ctx->account_counter);
MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str()));
account_pull = std::move(new_sock);
return success();
}
expect<std::vector<std::pair<client::topic, std::string>>> client::wait_for_block()
{
MONERO_PRECOND(ctx != nullptr);
@@ -425,6 +501,32 @@ namespace rpc
return rc;
}
expect<std::vector<lws::account>> client::pull_accounts()
{
MONERO_PRECOND(ctx != nullptr);
if (!account_pull)
MONERO_CHECK(enable_pull_accounts());
std::vector<lws::account> out{};
for (;;)
{
expect<std::string> next = net::zmq::receive(account_pull.get(), ZMQ_DONTWAIT);
if (!next)
{
if (net::zmq::make_error_code(EAGAIN))
break;
return next.error();
}
out.emplace_back();
const std::error_code error =
wire::msgpack::from_bytes(epee::byte_slice{std::move(*next)}, out.back());
if (error)
return error;
}
return {std::move(out)};
}
expect<rates> client::get_rates() const
{
MONERO_PRECOND(ctx != nullptr);
@@ -459,7 +561,7 @@ namespace rpc
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
}
rcontext rmq{};
#ifdef MLWS_RMQ_ENABLED
if (!rmq_info.address.empty())

View File

@@ -34,6 +34,7 @@
#include <zmq.h>
#include "byte_slice.h" // monero/contrib/epee/include
#include "db/fwd.h"
#include "common/expect.h" // monero/src
#include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h"
@@ -67,6 +68,31 @@ namespace rpc
std::string routing;
};
//! Every scanner "reset", a new socket is created so old messages are discarded
class account_push
{
std::shared_ptr<detail::context> ctx;
detail::socket sock;
explicit account_push(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), sock()
{}
public:
static expect<account_push> make(std::shared_ptr<detail::context> ctx) noexcept;
account_push(const account_push&) = delete;
account_push(account_push&&) = default;
~account_push() noexcept;
account_push& operator=(const account_push&) = delete;
account_push& operator=(account_push&&) = default;
//! Push new `accounts` to worker threads. Each account is sent in unique message
expect<void> push(epee::span<const lws::account> accounts, std::chrono::seconds timeout);
};
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
class client
{
@@ -74,9 +100,10 @@ namespace rpc
detail::socket daemon;
detail::socket daemon_sub;
detail::socket signal_sub;
detail::socket account_pull;
explicit client(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub()
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub(), account_pull()
{}
//! Expect `response` as the next message payload unless error.
@@ -129,6 +156,9 @@ namespace rpc
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
expect<void> watch_scan_signals() noexcept;
//! Register `this` client as listening for new accounts
expect<void> enable_pull_accounts();
//! Wait for new block announce or internal timeout.
expect<std::vector<std::pair<topic, std::string>>> wait_for_block();
@@ -173,6 +203,9 @@ namespace rpc
return response;
}
//! Retrieve new accounts to be scanned on this thread.
expect<std::vector<lws::account>> pull_accounts();
/*!
\note This is the one function that IS thread-safe. Multiple threads can
call this function with the same `this` argument.
@@ -232,6 +265,12 @@ namespace rpc
return client::make(ctx);
}
//! Create a new account push state
expect<account_push> bind_push() const noexcept
{
return account_push::make(ctx);
}
/*!
All block `client::send`, `client::receive`, and `client::wait` calls
originating from `this` object AND whose `watch_scan_signal` method was

View File

@@ -29,7 +29,7 @@
#include "cryptonote_basic/cryptonote_basic.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/error.h"
#include "wire/field.h"
#include "wire/traits.h"

View File

@@ -31,7 +31,7 @@
#include "cryptonote_config.h" // monero/src
#include "crypto/crypto.h" // monero/src
#include "rpc/message_data_structs.h" // monero/src
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/json.h"
#include "wire/wrapper/array.h"
#include "wire/wrapper/variant.h"

View File

@@ -40,7 +40,7 @@
#include "ringct/rctOps.h" // monero/src
#include "span.h" // monero/contrib/epee/include
#include "util/random_outputs.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/error.h"
#include "wire/json.h"
#include "wire/traits.h"

View File

@@ -31,7 +31,7 @@
#include "db/account.h"
#include "rpc/client.h"
#include "rpc/webhook.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/wrapper/array.h"
#include "wire/wrappers_impl.h"
#include "wire/write.h"