Webhooks for New Accounts (#79)

This commit is contained in:
Lee *!* Clagett
2023-08-23 16:07:37 -04:00
committed by Lee *!* Clagett
parent 524e26e1a4
commit aa171b77c3
13 changed files with 434 additions and 222 deletions

View File

@@ -29,6 +29,7 @@
#include <cstring>
#include <memory>
#include "db/string.h"
#include "wire.h"
#include "wire/crypto.h"
#include "wire/json/write.h"
@@ -215,7 +216,7 @@ namespace db
namespace
{
constexpr const char* map_webhook_type[] = {"tx-confirmation"};
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account"};
template<typename F, typename T>
void map_webhook_key(F& format, T& self)
@@ -292,6 +293,15 @@ namespace db
);
}
void write_bytes(wire::writer& dest, const webhook_new_account& self)
{
wire::object(dest,
wire::field<0>("event_id", std::cref(self.value.first.event_id)),
wire::field<1>("token", std::cref(self.value.second.token)),
wire::field<2>("address", address_string(self.account))
);
}
bool operator<(const webhook_dupsort& left, const webhook_dupsort& right) noexcept
{
return left.payment_id == right.payment_id ?

View File

@@ -252,7 +252,8 @@ namespace db
enum class webhook_type : std::uint8_t
{
tx_confirmation = 0,
tx_confirmation = 0, // cannot change values - stored in DB
new_account
// unconfirmed_tx,
// new_block
// confirmed_tx,
@@ -316,6 +317,14 @@ namespace db
};
void write_bytes(wire::json_writer&, const webhook_event&);
//! Returned by DB when a webhook event "tripped"
struct webhook_new_account
{
webhook_value value;
account_address account;
};
void write_bytes(wire::writer&, const webhook_new_account&);
bool operator==(transaction_link const& left, transaction_link const& right) noexcept;
bool operator<(transaction_link const& left, transaction_link const& right) noexcept;
bool operator<=(transaction_link const& left, transaction_link const& right) noexcept;

View File

@@ -1635,14 +1635,14 @@ namespace db
});
}
expect<void> storage::creation_request(account_address const& address, crypto::secret_key const& key, account_flags flags) noexcept
expect<std::vector<webhook_new_account>> storage::creation_request(account_address const& address, crypto::secret_key const& key, account_flags flags) noexcept
{
MONERO_PRECOND(db != nullptr);
if (!db->create_queue_max)
return {lws::error::create_queue_max};
return db->try_write([this, &address, &key, flags] (MDB_txn& txn) -> expect<void>
return db->try_write([this, &address, &key, flags] (MDB_txn& txn) -> expect<std::vector<webhook_new_account>>
{
const expect<db::account_time> current_time = get_account_time();
if (!current_time)
@@ -1651,10 +1651,12 @@ namespace db
cursor::accounts_by_address accounts_ba_cur;
cursor::blocks blocks_cur;
cursor::accounts requests_cur;
cursor::webhooks webhooks_cur;
MONERO_CHECK(check_cursor(txn, this->db->tables.accounts_ba, accounts_ba_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.blocks, blocks_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.requests, requests_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.webhooks, webhooks_cur));
MDB_val keyv = lmdb::to_val(by_address_version);
MDB_val value = lmdb::to_val(address);
@@ -1709,7 +1711,24 @@ namespace db
if (err)
return {lmdb::error(err)};
return success();
std::vector<webhook_new_account> hooks{};
webhook_key wkey{account_id::invalid, webhook_type::new_account};
keyv = lmdb::to_val(wkey);
err = mdb_cursor_get(webhooks_cur.get(), &keyv, &value, MDB_SET_KEY);
for (;;)
{
if (err)
{
if (err == MDB_NOTFOUND)
break;
return {lmdb::error(err)};
}
hooks.push_back(webhook_new_account{MONERO_UNWRAP(webhooks.get_value(value)), address});
err = mdb_cursor_get(webhooks_cur.get(), &keyv, &value, MDB_NEXT_DUP);
}
return hooks;
});
}
@@ -2190,7 +2209,7 @@ namespace db
});
}
expect<void> storage::add_webhook(const webhook_type type, const account_address& address, const webhook_value& event)
expect<void> storage::add_webhook(const webhook_type type, const boost::optional<account_address>& address, const webhook_value& event)
{
if (event.second.url != "zmq")
{
@@ -2210,10 +2229,13 @@ namespace db
MONERO_CHECK(check_cursor(txn, this->db->tables.webhooks, webhooks_cur));
webhook_key key{account_id::invalid, type};
MDB_val lmkey = lmdb::to_val(by_address_version);
MDB_val lmvalue = lmdb::to_val(address);
MDB_val lmkey{};
MDB_val lmvalue{};
if (address)
{
lmkey = lmdb::to_val(by_address_version);
lmvalue = lmdb::to_val(*address);
const int err = mdb_cursor_get(accounts_ba_cur.get(), &lmkey, &lmvalue, MDB_GET_BOTH);
if (err && err != MDB_NOTFOUND)
return {lmdb::error(err)};

View File

@@ -212,7 +212,7 @@ namespace db
rescan(block_id height, epee::span<const account_address> addresses);
//! Add an account for later approval. For use with the login endpoint.
expect<void> creation_request(account_address const& address, crypto::secret_key const& key, account_flags flags) noexcept;
expect<std::vector<webhook_new_account>> creation_request(account_address const& address, crypto::secret_key const& key, account_flags flags) noexcept;
/*!
Request lock height of an existing account. No effect if the `start_height`
@@ -249,12 +249,12 @@ namespace db
\param type The webhook event type to be tracked by the DB.
\param address is required for `type == tx_confirmation`, and is not
not needed for all other types (use default construction of zeroes).
not needed for all other types.
\param event Additional information for the webhook. A valid "http"
or "https" URL must be provided (or else error). All other information
is optional.
*/
expect<void> add_webhook(webhook_type type, const account_address& address, const webhook_value& event);
expect<void> add_webhook(webhook_type type, const boost::optional<account_address>& address, const webhook_value& event);
/*! Delete all webhooks associated with every value in `addresses`. This is
likely only valid for `tx_confirmation` event types. */

View File

@@ -33,21 +33,23 @@
#include <string>
#include <utility>
#include "common/error.h" // monero/src
#include "common/expect.h" // monero/src
#include "crypto/crypto.h" // monero/src
#include "cryptonote_config.h" // monero/src
#include "common/error.h" // monero/src
#include "common/expect.h" // monero/src
#include "crypto/crypto.h" // monero/src
#include "cryptonote_config.h" // monero/src
#include "db/data.h"
#include "db/storage.h"
#include "error.h"
#include "lmdb/util.h" // monero/src
#include "net/http_base.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
#include "lmdb/util.h" // monero/src
#include "net/http_base.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/admin.h"
#include "rpc/client.h"
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/light_wallet.h"
#include "rpc/rates.h"
#include "rpc/webhook.h"
#include "util/http_server.h"
#include "util/gamma_picker.h"
#include "util/random_outputs.h"
@@ -134,17 +136,20 @@ namespace lws
return {std::make_pair(user->second, std::move(*reader))};
}
namespace
std::atomic_flag rates_error_once = ATOMIC_FLAG_INIT;
struct runtime_options
{
std::atomic_flag rates_error_once = ATOMIC_FLAG_INIT;
}
epee::net_utils::ssl_verification_t webhook_verify;
bool disable_admin_auth;
};
struct get_address_info
{
using request = rpc::account_credentials;
using response = rpc::get_address_info_response;
static expect<response> handle(const request& req, db::storage disk, rpc::client const& client)
static expect<response> handle(const request& req, db::storage disk, rpc::client const& client, runtime_options const&)
{
auto user = open_account(req, std::move(disk));
if (!user)
@@ -217,7 +222,7 @@ namespace lws
using request = rpc::account_credentials;
using response = rpc::get_address_txs_response;
static expect<response> handle(const request& req, db::storage disk, rpc::client const&)
static expect<response> handle(const request& req, db::storage disk, rpc::client const&, runtime_options const&)
{
auto user = open_account(req, std::move(disk));
if (!user)
@@ -340,7 +345,7 @@ namespace lws
using request = rpc::get_random_outs_request;
using response = rpc::get_random_outs_response;
static expect<response> handle(request req, const db::storage&, rpc::client const& gclient)
static expect<response> handle(request req, const db::storage&, rpc::client const& gclient, runtime_options const&)
{
using distribution_rpc = cryptonote::rpc::GetOutputDistribution;
using histogram_rpc = cryptonote::rpc::GetOutputHistogram;
@@ -482,7 +487,7 @@ namespace lws
using request = rpc::get_unspent_outs_request;
using response = rpc::get_unspent_outs_response;
static expect<response> handle(request req, db::storage disk, rpc::client const& gclient)
static expect<response> handle(request req, db::storage disk, rpc::client const& gclient, runtime_options const&)
{
using rpc_command = cryptonote::rpc::GetFeeEstimate;
@@ -554,7 +559,7 @@ namespace lws
using request = rpc::account_credentials;
using response = rpc::import_response;
static expect<response> handle(request req, db::storage disk, rpc::client const&)
static expect<response> handle(request req, db::storage disk, rpc::client const&, runtime_options const&)
{
bool new_request = false;
bool fulfilled = false;
@@ -594,7 +599,7 @@ namespace lws
using request = rpc::login_request;
using response = rpc::login_response;
static expect<response> handle(request req, db::storage disk, rpc::client const&)
static expect<response> handle(request req, db::storage disk, rpc::client const& gclient, runtime_options const& options)
{
if (!key_check(req.creds))
return {lws::error::bad_view_key};
@@ -620,7 +625,19 @@ namespace lws
}
const auto flags = req.generated_locally ? db::account_generated_locally : db::default_account;
MONERO_CHECK(disk.creation_request(req.creds.address, req.creds.key, flags));
const auto hooks = disk.creation_request(req.creds.address, req.creds.key, flags);
if (!hooks)
return hooks.error();
if (!hooks->empty())
{
expect<rpc::client> client = gclient.clone();
if (!client)
return client.error();
rpc::send_webhook(
*client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify
);
}
return response{true, req.generated_locally};
}
};
@@ -630,7 +647,7 @@ namespace lws
using request = rpc::submit_raw_tx_request;
using response = rpc::submit_raw_tx_response;
static expect<response> handle(request req, const db::storage& disk, const rpc::client& gclient)
static expect<response> handle(request req, const db::storage& disk, const rpc::client& gclient, const runtime_options&)
{
using transaction_rpc = cryptonote::rpc::SendRawTxHex;
@@ -656,7 +673,7 @@ namespace lws
};
template<typename E>
expect<epee::byte_slice> call(std::string&& root, db::storage disk, const rpc::client& gclient, const bool)
expect<epee::byte_slice> call(std::string&& root, db::storage disk, const rpc::client& gclient, const runtime_options& options)
{
using request = typename E::request;
using response = typename E::response;
@@ -666,7 +683,7 @@ namespace lws
if (error)
return error;
expect<response> resp = E::handle(std::move(req), std::move(disk), gclient);
expect<response> resp = E::handle(std::move(req), std::move(disk), gclient, options);
if (!resp)
return resp.error();
@@ -695,7 +712,7 @@ namespace lws
}
template<typename E>
expect<epee::byte_slice> call_admin(std::string&& root, db::storage disk, const rpc::client&, const bool disable_auth)
expect<epee::byte_slice> call_admin(std::string&& root, db::storage disk, const rpc::client&, const runtime_options& options)
{
using request = typename E::request;
@@ -706,7 +723,7 @@ namespace lws
return error;
}
if (!disable_auth)
if (!options.disable_admin_auth)
{
if (!req.auth)
return {error::account_not_found};
@@ -735,7 +752,7 @@ namespace lws
struct endpoint
{
char const* const name;
expect<epee::byte_slice> (*const run)(std::string&&, db::storage, rpc::client const&, bool);
expect<epee::byte_slice> (*const run)(std::string&&, db::storage, rpc::client const&, const runtime_options&);
const unsigned max_size;
};
@@ -796,15 +813,15 @@ namespace lws
rpc::client client;
boost::optional<std::string> prefix;
boost::optional<std::string> admin_prefix;
bool disable_auth;
runtime_options options;
explicit internal(boost::asio::io_service& io_service, lws::db::storage disk, rpc::client client, const bool disable_auth)
explicit internal(boost::asio::io_service& io_service, lws::db::storage disk, rpc::client client, runtime_options options)
: lws::http_server_impl_base<rest_server::internal, context>(io_service)
, disk(std::move(disk))
, client(std::move(client))
, prefix()
, admin_prefix()
, disable_auth(disable_auth)
, options(std::move(options))
{
assert(std::is_sorted(std::begin(endpoints), std::end(endpoints), by_name));
}
@@ -870,7 +887,7 @@ namespace lws
}
// \TODO remove copy of json string here :/
auto body = handler->run(std::string{query.m_body}, disk.clone(), client, disable_auth);
auto body = handler->run(std::string{query.m_body}, disk.clone(), client, options);
if (!body)
{
MINFO(body.error().message() << " from " << ctx.m_remote_address.str() << " on " << handler->name);
@@ -999,15 +1016,16 @@ namespace lws
};
bool any_ssl = false;
const runtime_options options{config.webhook_verify, config.disable_admin_auth};
for (const std::string& address : addresses)
{
ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), config.disable_admin_auth);
ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), options);
any_ssl |= init_port(ports_.back(), address, config, false);
}
for (const std::string& address : admin)
{
ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), config.disable_admin_auth);
ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), options);
any_ssl |= init_port(ports_.back(), address, config, true);
}

View File

@@ -53,6 +53,7 @@ namespace lws
epee::net_utils::ssl_authentication_t auth;
std::vector<std::string> access_controls;
std::size_t threads;
epee::net_utils::ssl_verification_t webhook_verify;
bool allow_external;
bool disable_admin_auth;
};

View File

@@ -239,26 +239,35 @@ namespace lws { namespace rpc
expect<void> webhook_add_::operator()(wire::writer& dest, db::storage disk, request&& req) const
{
if (req.address)
switch (req.type)
{
std::uint64_t payment_id = 0;
static_assert(sizeof(payment_id) == sizeof(crypto::hash8), "invalid memcpy");
if (req.payment_id)
std::memcpy(std::addressof(payment_id), std::addressof(*req.payment_id), sizeof(payment_id));
db::webhook_value event{
db::webhook_dupsort{payment_id, boost::uuids::random_generator{}()},
db::webhook_data{
std::move(req.url),
std::move(req.token).value_or(std::string{}),
req.confirmations.value_or(1)
}
};
MONERO_CHECK(disk.add_webhook(req.type, *req.address, event));
write_bytes(dest, event);
case db::webhook_type::tx_confirmation:
if (!req.address)
return {error::bad_webhook};
break;
case db::webhook_type::new_account:
if (req.address)
return {error::bad_webhook};
break;
default:
return {error::bad_webhook};
}
else if (req.type == db::webhook_type::tx_confirmation)
return {error::bad_webhook};
std::uint64_t payment_id = 0;
static_assert(sizeof(payment_id) == sizeof(crypto::hash8), "invalid memcpy");
if (req.payment_id)
std::memcpy(std::addressof(payment_id), std::addressof(*req.payment_id), sizeof(payment_id));
db::webhook_value event{
db::webhook_dupsort{payment_id, boost::uuids::random_generator{}()},
db::webhook_data{
std::move(req.url),
std::move(req.token).value_or(std::string{}),
req.confirmations.value_or(1)
}
};
MONERO_CHECK(disk.add_webhook(req.type, req.address, event));
write_bytes(dest, event);
return success();
}

159
src/rpc/webhook.h Normal file
View File

@@ -0,0 +1,159 @@
#include <boost/thread/mutex.hpp>
#include <boost/utility/string_ref.hpp>
#include <chrono>
#include <string>
#include "byte_slice.h" // monero/contrib/epee/include
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include
#include "span.h"
#include "wire/json.h"
#include "wire/msgpack.h"
namespace lws { namespace rpc
{
namespace net = epee::net_utils;
template<typename T>
void http_send(net::http::http_simple_client& client, boost::string_ref uri, const T& event, const net::http::fields_list& params, const std::chrono::milliseconds timeout)
{
if (uri.empty())
uri = "/";
epee::byte_slice bytes{};
const std::string& url = event.value.second.url;
const std::error_code json_error = wire::json::to_bytes(bytes, event);
const net::http::http_response_info* info = nullptr;
if (json_error)
{
MERROR("Failed to generate webhook JSON: " << json_error.message());
return;
}
MINFO("Sending webhook to " << url);
if (!client.invoke(uri, "POST", std::string{bytes.begin(), bytes.end()}, timeout, std::addressof(info), params))
{
MERROR("Failed to invoke http request to " << url);
return;
}
if (!info)
{
MERROR("Failed to invoke http request to " << url << ", internal error (null response ptr)");
return;
}
if (info->m_response_code != 200)
{
MERROR("Failed to invoke http request to " << url << ", wrong response code: " << info->m_response_code);
return;
}
}
template<typename T>
void http_send(const epee::span<const T> events, const std::chrono::milliseconds timeout, net::ssl_verification_t verify_mode)
{
if (events.empty())
return;
net::http::url_content url{};
net::http::http_simple_client client{};
net::http::fields_list params;
params.emplace_back("Content-Type", "application/json; charset=utf-8");
for (const auto& event : events)
{
if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url))
{
MERROR("Bad URL for webhook event: " << event.value.second.url);
continue;
}
const bool https = (url.schema == "https");
if (!https && url.schema != "http")
{
MERROR("Only http or https connections: " << event.value.second.url);
continue;
}
const net::ssl_support_t ssl_mode = https ?
net::ssl_support_t::e_ssl_support_enabled : net::ssl_support_t::e_ssl_support_disabled;
net::ssl_options_t ssl_options{ssl_mode};
if (https)
ssl_options.verification = verify_mode;
if (url.port == 0)
url.port = https ? 443 : 80;
client.set_server(url.host, std::to_string(url.port), boost::none, std::move(ssl_options));
if (client.connect(timeout))
http_send(client, url.uri, event, params, timeout);
else
MERROR("Unable to send webhook to " << event.value.second.url);
client.disconnect();
}
}
template<typename T>
struct zmq_index_single
{
const std::uint64_t index;
const T& event;
};
template<typename T>
void write_bytes(wire::writer& dest, const zmq_index_single<T>& self)
{
wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(event));
}
template<typename T>
void zmq_send(rpc::client& client, const epee::span<const T> events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic)
{
// Each `T` should have a unique count. This is desired.
struct zmq_order
{
std::uint64_t current;
boost::mutex sync;
zmq_order()
: current(0), sync()
{}
};
static zmq_order ordering{};
//! \TODO monitor XPUB to cull the serialization
if (!events.empty() && client.has_publish())
{
// make sure the event is queued to zmq in order.
const boost::unique_lock<boost::mutex> guard{ordering.sync};
for (const auto& event : events)
{
const zmq_index_single<T> index{ordering.current++, event};
MINFO("Sending ZMQ-PUB topics " << json_topic << " and " << msgpack_topic);
expect<void> result = success();
if (!(result = client.publish<wire::json>(json_topic, index)))
MERROR("Failed to serialize+send " << json_topic << " " << result.error().message());
if (!(result = client.publish<wire::msgpack>(msgpack_topic, index)))
MERROR("Failed to serialize+send " << msgpack_topic << " " << result.error().message());
}
}
}
template<typename T>
void send_webhook(
rpc::client& client,
const epee::span<const T> events,
const boost::string_ref json_topic,
const boost::string_ref msgpack_topic,
const std::chrono::seconds timeout,
epee::net_utils::ssl_verification_t verify_mode)
{
http_send(events, timeout, verify_mode);
zmq_send(client, events, json_topic, msgpack_topic);
}
}} // lws // rpc

View File

@@ -49,18 +49,16 @@
#include "db/account.h"
#include "db/data.h"
#include "error.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h"
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/message_data_structs.h" // monero/src
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "rpc/json.h"
#include "rpc/message_data_structs.h" // monero/src
#include "rpc/webhook.h"
#include "util/source_location.h"
#include "util/transactions.h"
#include "wire/adapted/span.h"
#include "wire/json.h"
#include "wire/msgpack.h"
#include "serialization/json_object.h"
@@ -165,130 +163,9 @@ namespace lws
return true;
}
void send_via_http(net::http::http_simple_client& client, boost::string_ref uri, const db::webhook_tx_confirmation& event, const net::http::fields_list& params, const std::chrono::milliseconds timeout)
void send_payment_hook(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events, net::ssl_verification_t verify_mode)
{
if (uri.empty())
uri = "/";
epee::byte_slice bytes{};
const std::string& url = event.value.second.url;
const std::error_code json_error = wire::json::to_bytes(bytes, event);
const net::http::http_response_info* info = nullptr;
if (json_error)
{
MERROR("Failed to generate webhook JSON: " << json_error.message());
return;
}
MINFO("Sending webhook to " << url);
if (!client.invoke(uri, "POST", std::string{bytes.begin(), bytes.end()}, timeout, std::addressof(info), params))
{
MERROR("Failed to invoke http request to " << url);
return;
}
if (!info)
{
MERROR("Failed to invoke http request to " << url << ", internal error (null response ptr)");
return;
}
if (info->m_response_code != 200 && info->m_response_code != 201)
{
MERROR("Failed to invoke http request to " << url << ", wrong response code: " << info->m_response_code);
return;
}
}
void send_via_http(const epee::span<const db::webhook_tx_confirmation> events, const std::chrono::milliseconds timeout, net::ssl_verification_t verify_mode)
{
if (events.empty())
return;
net::http::url_content url{};
net::http::http_simple_client client{};
net::http::fields_list params;
params.emplace_back("Content-Type", "application/json; charset=utf-8");
for (const db::webhook_tx_confirmation& event : events)
{
if (event.value.second.url == "zmq")
continue;
if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url))
{
MERROR("Bad URL for webhook event: " << event.value.second.url);
continue;
}
const bool https = (url.schema == "https");
if (!https && url.schema != "http")
{
MERROR("Only http or https connections: " << event.value.second.url);
continue;
}
const net::ssl_support_t ssl_mode = https ?
net::ssl_support_t::e_ssl_support_enabled : net::ssl_support_t::e_ssl_support_disabled;
net::ssl_options_t ssl_options{ssl_mode};
if (https)
ssl_options.verification = verify_mode;
if (url.port == 0)
url.port = https ? 443 : 80;
client.set_server(url.host, std::to_string(url.port), boost::none, std::move(ssl_options));
if (client.connect(timeout))
send_via_http(client, url.uri, event, params, timeout);
else
MERROR("Unable to send webhook to " << event.value.second.url);
client.disconnect();
}
}
struct zmq_index_single
{
const std::uint64_t index;
const db::webhook_tx_confirmation& event;
};
void write_bytes(wire::writer& dest, const zmq_index_single& self)
{
wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(event));
}
void send_via_zmq(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events)
{
struct zmq_order
{
std::uint64_t current;
boost::mutex sync;
zmq_order()
: current(0), sync()
{}
};
static zmq_order ordering{};
//! \TODO monitor XPUB to cull the serialization
if (!events.empty() && client.has_publish())
{
// make sure the event is queued to zmq in order.
const boost::unique_lock<boost::mutex> guard{ordering.sync};
for (const auto& event : events)
{
const zmq_index_single index{ordering.current++, event};
MINFO("Sending ZMQ-PUB topics json-full-payment_hook and msgpack-full-payment_hook");
expect<void> result = success();
if (!(result = client.publish<wire::json>("json-full-payment_hook:", index)))
MERROR("Failed to serialize+send json-full-payment_hook: " << result.error().message());
if (!(result = client.publish<wire::msgpack>("msgpack-full-payment_hook:", index)))
MERROR("Failed to serialize+send msgpack-full-payment_hook: " << result.error().message());
}
}
rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode);
}
struct by_height
@@ -381,8 +258,7 @@ namespace lws
else
events.pop_back(); //cannot compute tx_hash
}
send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_);
send_via_zmq(client_, epee::to_span(events));
send_payment_hook(client_, epee::to_span(events), verify_mode_);
return true;
}
};
@@ -789,8 +665,7 @@ namespace lws
}
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_via_http(epee::to_span(updated->second), std::chrono::seconds{5}, webhook_verify);
send_via_zmq(client, epee::to_span(updated->second));
send_payment_hook(client, epee::to_span(updated->second), webhook_verify);
if (updated->first != users.size())
{
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");
@@ -1033,16 +908,10 @@ namespace lws
return {std::move(client)};
}
void scanner::run(db::storage disk, rpc::context ctx, std::size_t thread_count, const boost::string_ref webhook_ssl_verification)
void scanner::run(db::storage disk, rpc::context ctx, std::size_t thread_count, const epee::net_utils::ssl_verification_t webhook_verify)
{
thread_count = std::max(std::size_t(1), thread_count);
net::ssl_verification_t webhook_verify = net::ssl_verification_t::none;
if (webhook_ssl_verification == "system_ca")
webhook_verify = net::ssl_verification_t::system_ca;
else if (webhook_ssl_verification != "none")
MONERO_THROW(lws::error::configuration, "Invalid webhook ssl verification mode");
rpc::client client{};
for (;;)
{

View File

@@ -32,6 +32,7 @@
#include <string>
#include "db/storage.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/client.h"
namespace lws
@@ -48,7 +49,7 @@ namespace lws
static expect<rpc::client> sync(db::storage disk, rpc::client client);
//! Poll daemon until `stop()` is called, using `thread_count` threads.
static void run(db::storage disk, rpc::context ctx, std::size_t thread_count, boost::string_ref webhook_ssl_verification);
static void run(db::storage disk, rpc::context ctx, std::size_t thread_count, epee::net_utils::ssl_verification_t webhook_verify);
//! \return True if `stop()` has never been called.
static bool is_running() noexcept { return running; }

View File

@@ -196,6 +196,13 @@ namespace
opts.set_network(args); // do this first, sets global variable :/
mlog_set_log_level(command_line::get_arg(args, opts.log_level));
const auto webhook_verify_raw = command_line::get_arg(args, opts.webhook_ssl_verification);
epee::net_utils::ssl_verification_t webhook_verify = epee::net_utils::ssl_verification_t::none;
if (webhook_verify_raw == "system_ca")
webhook_verify = epee::net_utils::ssl_verification_t::system_ca;
else if (webhook_verify_raw != "none")
MONERO_THROW(lws::error::configuration, "Invalid webhook ssl verification mode");
program prog{
command_line::get_arg(args, opts.db_path),
command_line::get_arg(args, opts.rest_servers),
@@ -204,6 +211,7 @@ namespace
{command_line::get_arg(args, opts.rest_ssl_key), command_line::get_arg(args, opts.rest_ssl_cert)},
command_line::get_arg(args, opts.access_controls),
command_line::get_arg(args, opts.rest_threads),
webhook_verify,
command_line::get_arg(args, opts.external_bind),
command_line::get_arg(args, opts.disable_admin_auth)
},
@@ -236,6 +244,7 @@ namespace
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();
const auto webhook_verify = prog.rest_config.webhook_verify;
lws::rest_server server{
epee::to_span(prog.rest_servers), prog.admin_rest_servers, disk.clone(), std::move(client), std::move(prog.rest_config)
};
@@ -245,7 +254,7 @@ namespace
MINFO("Listening for REST admin clients at " << address);
// blocks until SIGINT
lws::scanner::run(std::move(disk), std::move(ctx), prog.scan_threads, prog.webhook_ssl_verification);
lws::scanner::run(std::move(disk), std::move(ctx), prog.scan_threads, webhook_verify);
}
} // anonymous