diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 1f6bdbb..b830e72 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -112,17 +112,89 @@ namespace rpc { using connection = std::unique_ptr; + explicit rcontext(rmq_details&& info) + : conn(), info(std::move(info)) + {} + bool is_available() const noexcept { return conn != nullptr; } + bool connect() + { + if (info.address.empty()) + return true; + + epee::net_utils::http::url_content url{}; + if (!epee::net_utils::parse_url(info.address, url)) + MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ"); + if (url.port == 0) + MONERO_THROW(error::configuration, "No port specified for RMQ"); + if (url.uri.empty()) + url.uri = "/"; + + std::string user; + std::string pass; + boost::regex expression{"(\\w+):(\\w+)"}; + boost::smatch matcher; + if (boost::regex_search(info.credentials, matcher, expression)) + { + user = matcher[1]; + pass = matcher[2]; + } + + conn.reset(amqp_new_connection()); + if (!conn) + MONERO_THROW(error::rmq_failure, "Failed to create new RMQ connection"); + const auto socket = amqp_tcp_socket_new(conn.get()); + if (!socket) + MONERO_THROW(error::rmq_failure, "Unable to create RMQ socket"); + + int status = amqp_socket_open(socket, url.host.c_str(), url.port); + if (status != 0) + { + MERROR("Unable to open RMQ socket: " << status); + conn.reset(); + return false; + } + + if (!user.empty() || !pass.empty()) + { + if (amqp_login(conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL) + { + MERROR("Failure to login RMQ socket"); + conn.reset(); + return false; + } + } + if (amqp_channel_open(conn.get(), rmq_channel) == nullptr) + { + MERROR("Unable to open RMQ channel"); + conn.reset(); + return false; + } + + if (amqp_get_rpc_reply(conn.get()).reply_type != AMQP_RESPONSE_NORMAL) + { + MERROR("Failed receiving channel open reply"); + conn.reset(); + return false; + } + + MINFO("Connected to RMQ server " << url.host << ":" << url.port); + return true; + } + connection conn; - std::string exchange; - std::string routing; + const rmq_details info; }; #else // !MLWS_RMQ_ENABLED struct rcontext { + constexpr explicit rcontext(const rmq_details&) + {} + static constexpr bool is_available() noexcept { return false; } + static constexpr bool connect() noexcept { return true; } }; #endif @@ -196,11 +268,11 @@ namespace rpc { struct context { - explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon) + explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq_src, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon) : comm(std::move(comm)) , signal_pub(std::move(signal_pub)) , external_pub(std::move(external_pub)) - , rmq(std::move(rmq)) + , rmq(std::move(rmq_src)) , daemon_addr(std::move(daemon_addr)) , sub_addr(std::move(sub_addr)) , rates_conn(epee::net_utils::ssl_verification_t::system_ca) @@ -213,6 +285,8 @@ namespace rpc , rates_running() { rates_running.clear(); + if (!rmq.connect()) + MONERO_THROW(error::configuration, "RMQ misconfigured"); } zcontext comm; @@ -448,6 +522,7 @@ namespace rpc rc = net::zmq::send(payload.clone(), ctx->external_pub.get(), 0); #ifdef MLWS_RMQ_ENABLED + if (ctx->rmq.is_available() && boost::algorithm::starts_with(payload, boost::string_ref{payment_topic_json()})) { const auto topic = reinterpret_cast(std::memchr(payload.data(), ':', payload.size())); @@ -457,12 +532,27 @@ namespace rpc amqp_bytes_t message{}; message.len = payload.size(); message.bytes = const_cast(payload.data()); - const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 0, 0, nullptr, message); - if (rmq_rc != 0) + + int rmq_rc = 0; + unsigned tries = 0; + for (; tries < 2; ++tries) { - MERROR("Failed RMQ Publish with return code: " << rmq_rc); - return {error::rmq_failure}; + rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.info.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.info.routing.c_str()), 0, 0, nullptr, message); + if (rmq_rc == 0) + break; + + if (tries == 0) + { + MWARNING("Failed RMQ Publish with return code: " << rmq_rc << ". Retrying."); + if (!ctx->rmq.connect()) + break; + } + else + MERROR("Failed RMQ Publish with return code: " << rmq_rc << ". Dropping."); } + + if (rmq_rc) + return {error::rmq_failure}; } #endif return rc; @@ -502,64 +592,15 @@ namespace rpc if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0) MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); } - - rcontext rmq{}; -#ifdef MLWS_RMQ_ENABLED - if (!rmq_info.address.empty()) - { - rmq.exchange = std::move(rmq_info.exchange); - rmq.routing = std::move(rmq_info.routing); - epee::net_utils::http::url_content url{}; - if (!epee::net_utils::parse_url(rmq_info.address, url)) - MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ"); - if (url.port == 0) - MONERO_THROW(error::configuration, "No port specified for RMQ"); - if (url.uri.empty()) - url.uri = "/"; - std::string user; - std::string pass; - boost::regex expression{"(\\w+):(\\w+)"}; - boost::smatch matcher; - if (boost::regex_search(rmq_info.credentials, matcher, expression)) - { - user = matcher[1]; - pass = matcher[2]; - } - - rmq.conn.reset(amqp_new_connection()); - const auto socket = amqp_tcp_socket_new(rmq.conn.get()); - if (!socket) - MONERO_THROW(error::configuration, "Unable to create RMQ socket"); - - int status = amqp_socket_open(socket, url.host.c_str(), url.port); - if (status != 0) - { - MERROR("Unable to open RMQ socket: " << status); - MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket"); - } - - if (!user.empty() || !pass.empty()) - { - if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL) - MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket"); - } - if (amqp_channel_open(rmq.conn.get(), rmq_channel) == nullptr) - MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel"); - - if (amqp_get_rpc_reply(rmq.conn.get()).reply_type != AMQP_RESPONSE_NORMAL) - MONERO_THROW(error::rmq_failure, "Failed receiving channel open reply"); - - MINFO("Connected to RMQ server " << url.host << ":" << url.port); - } -#else // !MLWS_RMQ_ENABLED +#ifndef MLWS_RMQ_ENABLED if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty() || !rmq_info.credentials.empty()) MONERO_THROW(error::configuration, "RabbitMQ support not enabled"); #endif - + return context{ std::make_shared( - std::move(comm), std::move(pub), std::move(external_pub), std::move(rmq), std::move(daemon_addr), std::move(sub_addr), rates_interval, untrusted_daemon + std::move(comm), std::move(pub), std::move(external_pub), rcontext{std::move(rmq_info)}, std::move(daemon_addr), std::move(sub_addr), rates_interval, untrusted_daemon ) }; }