Switch from epee http server to boost::beast http server. Min boost 1.70 (#136)

There is roughly a 7.4% increase in performance in the switch to
boost::beast. Additionally, the REST endpoints `/daemon_status`,
`/get_unspent_outs`, and `/submit_raw_tx` do not block in ZMQ calls,
allowing for better response times regardless of `monerod` status.

The REST endpoints `/login and `/get_random_outs` still need updates
to prevent blocking (`/login` is conditional on DB state).
This commit is contained in:
Lee *!* Clagett
2024-10-23 11:31:03 -04:00
committed by Lee *!* Clagett
parent 8080159fc8
commit 075dc5d7c2
15 changed files with 1525 additions and 411 deletions

View File

@@ -39,6 +39,7 @@
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
#include "net/zmq_async.h"
#include "scanner.h"
#include "serialization/json_object.h" // monero/src
#include "wire/msgpack.h"
@@ -140,7 +141,7 @@ namespace rpc
break;
const int err = zmq_errno();
if (err != EINTR)
return net::zmq::make_error_code(err);
return ::net::zmq::make_error_code(err);
}
if (items[0].revents)
return success();
@@ -186,7 +187,7 @@ namespace rpc
{
struct context
{
explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon)
explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon)
: comm(std::move(comm))
, signal_pub(std::move(signal_pub))
, external_pub(std::move(external_pub))
@@ -207,8 +208,8 @@ namespace rpc
}
zcontext comm;
socket signal_pub;
socket external_pub;
net::zmq::socket signal_pub;
net::zmq::socket external_pub;
rcontext rmq;
const std::string daemon_addr;
const std::string sub_addr;
@@ -223,19 +224,15 @@ namespace rpc
};
} // detail
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc)
expect<void> parse_response(cryptonote::rpc::Message& parser, std::string msg, source_location loc)
{
expect<std::string> message = get_message(timeout);
if (!message)
return message.error();
try
{
cryptonote::rpc::FullMessage fm{std::move(*message)};
cryptonote::rpc::FullMessage fm{std::move(msg)};
const cryptonote::rpc::error json_error = fm.getError();
if (!json_error.use)
{
response.fromJson(fm.getMessage());
parser.fromJson(fm.getMessage());
return success();
}
@@ -249,6 +246,30 @@ namespace rpc
return {lws::error::bad_daemon_response};
}
expect<net::zmq::socket> client::make_daemon(const std::shared_ptr<detail::context>& ctx) noexcept
{
assert(ctx != nullptr);
net::zmq::socket daemon{zmq_socket(ctx->comm.get(), ZMQ_REQ)};
if (daemon.get() == nullptr)
return net::zmq::get_error_code();
MONERO_CHECK(do_set_option(daemon.get(), ZMQ_LINGER, daemon_zmq_linger));
if (ctx->untrusted_daemon)
MONERO_CHECK(do_set_option(daemon.get(), ZMQ_MAXMSGSIZE, max_msg_req));
MONERO_ZMQ_CHECK(zmq_connect(daemon.get(), ctx->daemon_addr.c_str()));
return daemon;
}
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc)
{
expect<std::string> message = get_message(timeout);
if (!message)
return message.error();
return parse_response(response, std::move(*message), loc);
}
expect<std::string> client::get_message(std::chrono::seconds timeout)
{
MONERO_PRECOND(ctx != nullptr);
@@ -274,13 +295,10 @@ namespace rpc
client out{std::move(ctx)};
out.daemon.reset(zmq_socket(out.ctx->comm.get(), ZMQ_REQ));
if (out.daemon.get() == nullptr)
return net::zmq::get_error_code();
MONERO_CHECK(do_set_option(out.daemon.get(), ZMQ_LINGER, daemon_zmq_linger));
if (out.ctx->untrusted_daemon)
MONERO_CHECK(do_set_option(out.daemon.get(), ZMQ_MAXMSGSIZE, max_msg_req));
MONERO_ZMQ_CHECK(zmq_connect(out.daemon.get(), out.ctx->daemon_addr.c_str()));
expect<net::zmq::socket> daemon = make_daemon(out.ctx);
if (!daemon)
return daemon.error();
out.daemon = std::move(*daemon);
if (!out.ctx->sub_addr.empty())
{
@@ -381,6 +399,16 @@ namespace rpc
return {lws::error::bad_daemon_response};
}
expect<net::zmq::async_client> client::make_async_client(boost::asio::io_service& io) const
{
MONERO_PRECOND(ctx != nullptr);
expect<net::zmq::socket> daemon = make_daemon(ctx);
if (!daemon)
return daemon.error();
return net::zmq::async_client::make(io, std::move(*daemon));
}
expect<void> client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept
{
MONERO_PRECOND(ctx != nullptr);
@@ -399,7 +427,7 @@ namespace rpc
return success();
}
expect<void> client::publish(epee::byte_slice payload)
expect<void> client::publish(epee::byte_slice payload) const
{
MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr);
@@ -451,16 +479,16 @@ namespace rpc
if (comm == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_init");
detail::socket pub{zmq_socket(comm.get(), ZMQ_PUB)};
net::zmq::socket pub{zmq_socket(comm.get(), ZMQ_PUB)};
if (pub == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
if (zmq_bind(pub.get(), signal_endpoint) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
detail::socket external_pub = nullptr;
net::zmq::socket external_pub = nullptr;
if (!pub_addr.empty())
{
external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)};
external_pub = net::zmq::socket{zmq_socket(comm.get(), ZMQ_PUB)};
if (external_pub == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)

View File

@@ -26,6 +26,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/optional/optional.hpp>
#include <chrono>
#include <memory>
@@ -36,27 +37,19 @@
#include "byte_slice.h" // monero/contrib/epee/include
#include "db/fwd.h"
#include "common/expect.h" // monero/src
#include "net/zmq.h" // monero/src
#include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h"
#include "rpc/rates.h"
#include "util/source_location.h"
namespace net { namespace zmq { struct async_client; }}
namespace lws
{
namespace rpc
{
namespace detail
{
struct close
{
void operator()(void* ptr) const noexcept
{
if (ptr)
zmq_close(ptr);
}
};
using socket = std::unique_ptr<void, close>;
struct context;
}
@@ -68,18 +61,23 @@ namespace rpc
std::string routing;
};
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
expect<void> parse_response(cryptonote::rpc::Message& parser, std::string msg, source_location loc = {});
//! Abstraction for ZMQ RPC client. All `const` and `static` methods are thread-safe.
class client
{
std::shared_ptr<detail::context> ctx;
detail::socket daemon;
detail::socket daemon_sub;
detail::socket signal_sub;
net::zmq::socket daemon;
net::zmq::socket daemon_sub;
net::zmq::socket signal_sub;
explicit client(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub()
{}
//! \return Connection to daemon REQ/REP.
static expect<net::zmq::socket> make_daemon(const std::shared_ptr<detail::context>& ctx) noexcept;
//! Expect `response` as the next message payload unless error.
expect<void> get_response(cryptonote::rpc::Message& response, std::chrono::seconds timeout, source_location loc);
@@ -140,6 +138,9 @@ namespace rpc
return cryptonote::rpc::FullMessage::getRequest(name, message, 0);
}
//! \return `async_client` to daemon. Thread safe.
expect<net::zmq::async_client> make_async_client(boost::asio::io_service& io) const;
/*!
Queue `message` for sending to daemon. If the queue is full, wait a
maximum of `timeout` seconds or until `context::raise_abort_scan` or
@@ -148,11 +149,11 @@ namespace rpc
expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
//! Publish `payload` to ZMQ external pub socket.
expect<void> publish(epee::byte_slice payload);
expect<void> publish(epee::byte_slice payload) const;
//! Publish `data` after `topic` to ZMQ external pub socket.
template<typename F, typename T>
expect<void> publish(const boost::string_ref topic, const T& data)
expect<void> publish(const boost::string_ref topic, const T& data) const
{
epee::byte_stream bytes{};
bytes.write(topic.data(), topic.size());
@@ -174,15 +175,8 @@ namespace rpc
return response;
}
//! Retrieve new accounts to be scanned on this thread.
expect<std::vector<lws::account>> pull_accounts();
/*!
\note This is the one function that IS thread-safe. Multiple threads can
call this function with the same `this` argument.
\return Recent exchange rates.
*/
/*! Never blocks for I/O - that is performed on another thread.
\return Recent exchange rates. */
expect<rates> get_rates() const;
};

View File

@@ -34,6 +34,7 @@
#include <stdexcept>
#include <type_traits>
#include "config.h"
#include "db/string.h"
#include "error.h"
#include "time_helper.h" // monero/contrib/epee/include
@@ -198,6 +199,14 @@ namespace lws
namespace rpc
{
daemon_status_response::daemon_status_response()
: outgoing_connections_count(0),
incoming_connections_count(0),
height(0),
network(lws::rpc::network_type(lws::config::network)),
state(daemon_state::unavailable)
{}
namespace
{
constexpr const char* map_daemon_state[] = {"ok", "no_connections", "synchronizing", "unavailable"};

View File

@@ -94,7 +94,9 @@ namespace rpc
struct daemon_status_response
{
daemon_status_response() = delete;
//! Defaults to current network in unavailable state
daemon_status_response();
std::uint64_t outgoing_connections_count;
std::uint64_t incoming_connections_count;
std::uint64_t height;

View File

@@ -138,7 +138,7 @@ namespace lws { namespace rpc
}
template<typename T>
void zmq_send(rpc::client& client, const epee::span<const T> events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic)
void zmq_send(const rpc::client& client, const epee::span<const T> events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic)
{
// Each `T` should have a unique count. This is desired.
struct zmq_order
@@ -174,7 +174,7 @@ namespace lws { namespace rpc
template<typename T>
void send_webhook(
rpc::client& client,
const rpc::client& client,
const epee::span<const T> events,
const boost::string_ref json_topic,
const boost::string_ref msgpack_topic,