Switch from epee http client to boost::beast. All HTTP now non-blocking. (#150)

This commit is contained in:
Lee *!* Clagett
2024-12-04 17:25:07 -05:00
committed by Lee *!* Clagett
parent 66b7497a34
commit 29358f1323
22 changed files with 1095 additions and 257 deletions

View File

@@ -56,6 +56,7 @@
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "rpc/json.h"
@@ -75,18 +76,9 @@
namespace lws
{
// Not in `rates.h` - defaulting to JSON output seems odd
std::ostream& operator<<(std::ostream& out, lws::rates const& src)
{
wire::json_stream_writer dest{out};
lws::write_bytes(dest, src);
dest.finish();
return out;
}
namespace
{
namespace net = epee::net_utils;
namespace enet = epee::net_utils;
constexpr const std::chrono::minutes block_rpc_timeout{2};
constexpr const std::chrono::seconds send_timeout{30};
@@ -152,9 +144,9 @@ namespace lws
return true;
}
void send_payment_hook(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events, net::ssl_verification_t verify_mode)
void send_payment_hook(boost::asio::io_context& io, rpc::client& client, net::http::client& http, const epee::span<const db::webhook_tx_confirmation> events)
{
rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode);
rpc::send_webhook_async(io, client, http, events, "json-full-payment_hook:", "msgpack-full-payment_hook:");
}
std::size_t get_target_time(db::block_id height)
@@ -194,9 +186,9 @@ namespace lws
vec.erase(vec.begin());
};
void send_spend_hook(rpc::client& client, const epee::span<const db::webhook_tx_spend> events, net::ssl_verification_t verify_mode)
void send_spend_hook(boost::asio::io_context& io, rpc::client& client, net::http::client& http, const epee::span<const db::webhook_tx_spend> events)
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
rpc::send_webhook_async(io, client, http, events, "json-full-spend_hook:", "msgpack-full-spend_hook:");
}
struct add_spend
@@ -219,7 +211,7 @@ namespace lws
{
db::storage const& disk_;
rpc::client& client_;
net::ssl_verification_t verify_mode_;
scanner_sync& http_;
std::unordered_map<crypto::hash, crypto::hash> txpool_;
bool operator()(expect<db::storage_reader>& reader, lws::account& user, const db::output& out)
@@ -290,7 +282,7 @@ namespace lws
else
events.pop_back(); //cannot compute tx_hash
}
send_payment_hook(client_, epee::to_span(events), verify_mode_);
send_payment_hook(http_.io_, client_, http_.webhooks_, epee::to_span(events));
return true;
}
};
@@ -567,7 +559,7 @@ namespace lws
scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, reader, add_spend{}, add_output{});
}
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const scanner_options& opts)
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, scanner_sync& self, rpc::client& client, const scanner_options& opts)
{
// uint64::max is for txpool
static const std::vector<std::uint64_t> fake_outs(
@@ -585,20 +577,11 @@ namespace lws
boost::numeric_cast<std::uint64_t>(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
subaddress_reader reader{std::optional<db::storage>{disk.clone()}, opts.enable_subaddresses};
send_webhook sender{disk, client, opts.webhook_verify};
send_webhook sender{disk, client, self};
for (const auto& tx : parsed->txes)
scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender);
}
void update_rates(rpc::context& ctx)
{
const expect<boost::optional<lws::rates>> new_rates = ctx.retrieve_rates();
if (!new_rates)
MERROR("Failed to retrieve exchange rates: " << new_rates.error().message());
else if (*new_rates)
MINFO("Updated exchange rates: " << *(*new_rates));
}
void do_scan_loop(scanner_sync& self, std::shared_ptr<thread_data> data, const bool leader_thread) noexcept
{
struct stop_
@@ -629,7 +612,7 @@ namespace lws
auto new_client = MONERO_UNWRAP(client.clone());
MONERO_UNWRAP(new_client.watch_scan_signals());
user_data store_local{disk.clone()};
if (!scanner::loop(self.stop_, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread))
if (!scanner::loop(self, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread))
return;
}
@@ -657,8 +640,8 @@ namespace lws
}
} // anonymous
scanner::scanner(db::storage disk)
: disk_(std::move(disk)), sync_(), signals_(sync_.io_)
scanner::scanner(db::storage disk, epee::net_utils::ssl_verification_t webhook_verify)
: disk_(std::move(disk)), sync_(webhook_verify), signals_(sync_.io_)
{
signals_.add(SIGINT);
signals_.async_wait([this] (const boost::system::error_code& error, int)
@@ -671,7 +654,7 @@ namespace lws
scanner::~scanner()
{}
bool scanner::loop(const std::atomic<bool>& stop, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread)
bool scanner::loop(scanner_sync& self, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread)
{
if (users.empty())
return true;
@@ -698,7 +681,7 @@ namespace lws
if (opts.untrusted_daemon && disk)
last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk->start_read()).get_last_pow_block()).id;
while (!stop)
while (!self.stop_)
{
blockchain.clear();
new_pow.clear();
@@ -796,7 +779,7 @@ namespace lws
{
if (!disk || message->first != rpc::client::topic::txpool)
break; // inner for loop
scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, client, opts);
scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, self, client, opts);
}
for ( ; message != new_pubs->end(); ++message)
@@ -885,7 +868,7 @@ namespace lws
pow_window.median_timestamps.erase(pow_window.median_timestamps.begin());
// longhash takes a while, check is_running
if (stop)
if (self.stop_)
return false;
diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(db::block_id(fetched->start_height)));
@@ -942,7 +925,7 @@ namespace lws
} // for each block
reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write
if (!store(client, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow), opts))
if (!store(self.io_, client, self.webhooks_, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow)))
return false;
// TODO
@@ -1051,7 +1034,7 @@ namespace lws
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
self.webhooks_.ssl_context()
);
rpc::scanner::server::start_user_checking(server);
@@ -1274,7 +1257,7 @@ namespace lws
}
} // anonymous
bool user_data::store(db::storage& disk, rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
bool user_data::store(boost::asio::io_context& io, db::storage& disk, rpc::client& client, net::http::client& webhook, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow)
{
if (users.empty())
return true;
@@ -1293,8 +1276,8 @@ namespace lws
}
MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
send_payment_hook(io, client, webhook, epee::to_span(updated->confirm_pubs));
send_spend_hook(io, client, webhook, epee::to_span(updated->spend_pubs));
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
@@ -1309,9 +1292,9 @@ namespace lws
return true;
}
bool user_data::operator()(rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
bool user_data::operator()(boost::asio::io_context& io, rpc::client& client, net::http::client& webhook, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow)
{
return store(disk_, client, chain, users, pow, opts);
return store(io, disk_, client, webhook, chain, users, pow);
}
expect<rpc::client> scanner::sync(rpc::client client, const bool untrusted_daemon)
@@ -1335,26 +1318,25 @@ namespace lws
/*! \NOTE Be careful about references and lifetimes of the callbacks. The
ones below are safe because no `io_context::run()` call is after the
destruction of the references.
\NOTE That `ctx` will need a strand or lock if multiple
`io_context::run()` calls are used. */
destruction of the references. */
boost::asio::steady_timer rate_timer{sync_.io_};
class rate_updater
{
boost::asio::io_context& io_;
boost::asio::steady_timer& rate_timer_;
rpc::context& ctx_;
const std::chrono::minutes rate_interval_;
public:
explicit rate_updater(boost::asio::steady_timer& rate_timer, rpc::context& ctx)
: rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval())
explicit rate_updater(boost::asio::io_context& io, boost::asio::steady_timer& rate_timer, rpc::context& ctx)
: io_(io), rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval())
{}
void operator()(const boost::system::error_code& error = {}) const
{
update_rates(ctx_);
const expect<void> status = ctx_.retrieve_rates_async(io_);
if (!status)
MERROR("Unable to retrieve exchange rates: " << status.error());
rate_timer_.expires_from_now(rate_interval_);
rate_timer_.async_wait(*this);
}
@@ -1363,7 +1345,7 @@ namespace lws
};
{
rate_updater updater{rate_timer, ctx};
rate_updater updater{sync_.io_, rate_timer, ctx};
if (std::chrono::minutes{0} < updater.rate_interval())
updater();
}