Reconnect RMQ on publish failure (#181)

This commit is contained in:
Lee *!* Clagett
2025-09-10 11:28:33 -04:00
committed by Lee *!* Clagett
parent 48060d1111
commit 8955547536

View File

@@ -112,17 +112,89 @@ namespace rpc
{
using connection = std::unique_ptr<amqp_connection_state_t_, rdestroy>;
explicit rcontext(rmq_details&& info)
: conn(), info(std::move(info))
{}
bool is_available() const noexcept { return conn != nullptr; }
bool connect()
{
if (info.address.empty())
return true;
epee::net_utils::http::url_content url{};
if (!epee::net_utils::parse_url(info.address, url))
MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ");
if (url.port == 0)
MONERO_THROW(error::configuration, "No port specified for RMQ");
if (url.uri.empty())
url.uri = "/";
std::string user;
std::string pass;
boost::regex expression{"(\\w+):(\\w+)"};
boost::smatch matcher;
if (boost::regex_search(info.credentials, matcher, expression))
{
user = matcher[1];
pass = matcher[2];
}
conn.reset(amqp_new_connection());
if (!conn)
MONERO_THROW(error::rmq_failure, "Failed to create new RMQ connection");
const auto socket = amqp_tcp_socket_new(conn.get());
if (!socket)
MONERO_THROW(error::rmq_failure, "Unable to create RMQ socket");
int status = amqp_socket_open(socket, url.host.c_str(), url.port);
if (status != 0)
{
MERROR("Unable to open RMQ socket: " << status);
conn.reset();
return false;
}
if (!user.empty() || !pass.empty())
{
if (amqp_login(conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL)
{
MERROR("Failure to login RMQ socket");
conn.reset();
return false;
}
}
if (amqp_channel_open(conn.get(), rmq_channel) == nullptr)
{
MERROR("Unable to open RMQ channel");
conn.reset();
return false;
}
if (amqp_get_rpc_reply(conn.get()).reply_type != AMQP_RESPONSE_NORMAL)
{
MERROR("Failed receiving channel open reply");
conn.reset();
return false;
}
MINFO("Connected to RMQ server " << url.host << ":" << url.port);
return true;
}
connection conn;
std::string exchange;
std::string routing;
const rmq_details info;
};
#else // !MLWS_RMQ_ENABLED
struct rcontext
{
constexpr explicit rcontext(const rmq_details&)
{}
static constexpr bool is_available() noexcept { return false; }
static constexpr bool connect() noexcept { return true; }
};
#endif
@@ -196,11 +268,11 @@ namespace rpc
{
struct context
{
explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon)
explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq_src, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon)
: comm(std::move(comm))
, signal_pub(std::move(signal_pub))
, external_pub(std::move(external_pub))
, rmq(std::move(rmq))
, rmq(std::move(rmq_src))
, daemon_addr(std::move(daemon_addr))
, sub_addr(std::move(sub_addr))
, rates_conn(epee::net_utils::ssl_verification_t::system_ca)
@@ -213,6 +285,8 @@ namespace rpc
, rates_running()
{
rates_running.clear();
if (!rmq.connect())
MONERO_THROW(error::configuration, "RMQ misconfigured");
}
zcontext comm;
@@ -448,6 +522,7 @@ namespace rpc
rc = net::zmq::send(payload.clone(), ctx->external_pub.get(), 0);
#ifdef MLWS_RMQ_ENABLED
if (ctx->rmq.is_available() && boost::algorithm::starts_with(payload, boost::string_ref{payment_topic_json()}))
{
const auto topic = reinterpret_cast<const std::uint8_t*>(std::memchr(payload.data(), ':', payload.size()));
@@ -457,12 +532,27 @@ namespace rpc
amqp_bytes_t message{};
message.len = payload.size();
message.bytes = const_cast<std::uint8_t*>(payload.data());
const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 0, 0, nullptr, message);
if (rmq_rc != 0)
int rmq_rc = 0;
unsigned tries = 0;
for (; tries < 2; ++tries)
{
MERROR("Failed RMQ Publish with return code: " << rmq_rc);
return {error::rmq_failure};
rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.info.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.info.routing.c_str()), 0, 0, nullptr, message);
if (rmq_rc == 0)
break;
if (tries == 0)
{
MWARNING("Failed RMQ Publish with return code: " << rmq_rc << ". Retrying.");
if (!ctx->rmq.connect())
break;
}
else
MERROR("Failed RMQ Publish with return code: " << rmq_rc << ". Dropping.");
}
if (rmq_rc)
return {error::rmq_failure};
}
#endif
return rc;
@@ -502,64 +592,15 @@ namespace rpc
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
}
rcontext rmq{};
#ifdef MLWS_RMQ_ENABLED
if (!rmq_info.address.empty())
{
rmq.exchange = std::move(rmq_info.exchange);
rmq.routing = std::move(rmq_info.routing);
epee::net_utils::http::url_content url{};
if (!epee::net_utils::parse_url(rmq_info.address, url))
MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ");
if (url.port == 0)
MONERO_THROW(error::configuration, "No port specified for RMQ");
if (url.uri.empty())
url.uri = "/";
std::string user;
std::string pass;
boost::regex expression{"(\\w+):(\\w+)"};
boost::smatch matcher;
if (boost::regex_search(rmq_info.credentials, matcher, expression))
{
user = matcher[1];
pass = matcher[2];
}
rmq.conn.reset(amqp_new_connection());
const auto socket = amqp_tcp_socket_new(rmq.conn.get());
if (!socket)
MONERO_THROW(error::configuration, "Unable to create RMQ socket");
int status = amqp_socket_open(socket, url.host.c_str(), url.port);
if (status != 0)
{
MERROR("Unable to open RMQ socket: " << status);
MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket");
}
if (!user.empty() || !pass.empty())
{
if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL)
MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket");
}
if (amqp_channel_open(rmq.conn.get(), rmq_channel) == nullptr)
MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel");
if (amqp_get_rpc_reply(rmq.conn.get()).reply_type != AMQP_RESPONSE_NORMAL)
MONERO_THROW(error::rmq_failure, "Failed receiving channel open reply");
MINFO("Connected to RMQ server " << url.host << ":" << url.port);
}
#else // !MLWS_RMQ_ENABLED
#ifndef MLWS_RMQ_ENABLED
if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty() || !rmq_info.credentials.empty())
MONERO_THROW(error::configuration, "RabbitMQ support not enabled");
#endif
return context{
std::make_shared<detail::context>(
std::move(comm), std::move(pub), std::move(external_pub), std::move(rmq), std::move(daemon_addr), std::move(sub_addr), rates_interval, untrusted_daemon
std::move(comm), std::move(pub), std::move(external_pub), rcontext{std::move(rmq_info)}, std::move(daemon_addr), std::move(sub_addr), rates_interval, untrusted_daemon
)
};
}