Add ZMQ-PUB support for webhooks (#75)

This commit is contained in:
Lee *!* Clagett
2023-07-30 13:27:22 -04:00
committed by Lee *!* Clagett
parent d59fed6da2
commit 15e2be618a
11 changed files with 238 additions and 22 deletions

View File

@@ -135,15 +135,17 @@ namespace rpc
{
struct context
{
explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
explicit context(zcontext comm, socket signal_pub, socket external_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
: comm(std::move(comm))
, signal_pub(std::move(signal_pub))
, external_pub(std::move(external_pub))
, daemon_addr(std::move(daemon_addr))
, sub_addr(std::move(sub_addr))
, rates_conn()
, cache_time()
, cache_interval(interval)
, cached{}
, sync_pub()
, sync_rates()
{
if (std::chrono::minutes{0} < cache_interval)
@@ -152,12 +154,14 @@ namespace rpc
zcontext comm;
socket signal_pub;
socket external_pub;
const std::string daemon_addr;
const std::string sub_addr;
http::http_simple_client rates_conn;
std::chrono::steady_clock::time_point cache_time;
const std::chrono::minutes cache_interval;
rates cached;
boost::mutex sync_pub;
boost::mutex sync_rates;
};
} // detail
@@ -243,6 +247,11 @@ namespace rpc
client::~client() noexcept
{}
bool client::has_publish() const noexcept
{
return ctx && ctx->external_pub;
}
expect<void> client::watch_scan_signals() noexcept
{
MONERO_PRECOND(ctx != nullptr);
@@ -330,6 +339,17 @@ namespace rpc
return success();
}
expect<void> client::publish(epee::byte_slice payload)
{
MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr);
if (ctx->external_pub == nullptr)
return success();
const boost::unique_lock<boost::mutex> guard{ctx->sync_pub};
return net::zmq::send(std::move(payload), ctx->external_pub.get(), 0);
}
expect<rates> client::get_rates() const
{
MONERO_PRECOND(ctx != nullptr);
@@ -343,7 +363,7 @@ namespace rpc
return ctx->cached;
}
context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval)
context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval)
{
zcontext comm{zmq_init(1)};
if (comm == nullptr)
@@ -355,9 +375,19 @@ namespace rpc
if (zmq_bind(pub.get(), signal_endpoint) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
detail::socket external_pub = nullptr;
if (!pub_addr.empty())
{
external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)};
if (external_pub == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
}
return context{
std::make_shared<detail::context>(
std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval
std::move(comm), std::move(pub), std::move(external_pub), std::move(daemon_addr), std::move(sub_addr), rates_interval
)
};
}

View File

@@ -38,6 +38,7 @@
#include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h"
#include "rpc/rates.h"
#include "span.h" // monero/contrib/epee/include
#include "util/source_location.h"
namespace lws
@@ -112,6 +113,9 @@ namespace rpc
return ctx != nullptr;
}
//! \return True if an external pub/sub was setup
bool has_publish() const noexcept;
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
expect<void> watch_scan_signals() noexcept;
@@ -132,6 +136,21 @@ namespace rpc
*/
expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
//! Publish `payload` to ZMQ external pub socket.
expect<void> publish(epee::byte_slice payload);
//! Publish `data` after `topic` to ZMQ external pub socket.
template<typename F, typename T>
expect<void> publish(const boost::string_ref topic, const T& data)
{
epee::byte_stream bytes{};
bytes.write(topic.data(), topic.size());
const std::error_code err = F::to_bytes(bytes, data);
if (err)
return err;
return publish(epee::byte_slice{std::move(bytes)});
}
//! \return Next available RPC message response from server
expect<std::string> get_message(std::chrono::seconds timeout);
@@ -171,10 +190,11 @@ namespace rpc
\note All errors are exceptions; no recovery can occur.
\param daemon_addr Location of ZMQ enabled `monerod` RPC.
\param pub_addr Bind location for publishing ZMQ events.
\param rates_interval Frequency to retrieve exchange rates. Set value to
`<= 0` to disable exchange rate retrieval.
*/
static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval);
static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval);
context(context&&) = default;
context(context const&) = delete;