Adding ZeroMQ Sub for chain events

This commit is contained in:
Lee Clagett
2020-08-24 22:58:29 -04:00
parent 5cb8de224c
commit b1c61c5e81
10 changed files with 293 additions and 53 deletions

View File

@@ -26,8 +26,8 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(monero-lws-rpc_sources client.cpp daemon_zmq.cpp light_wallet.cpp rates.cpp)
set(monero-lws-rpc_headers client.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h)
set(monero-lws-rpc_sources client.cpp daemon_pub.cpp daemon_zmq.cpp light_wallet.cpp rates.cpp)
set(monero-lws-rpc_headers client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h)
add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers})
target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json)

View File

@@ -28,12 +28,14 @@
#include "client.h"
#include <boost/thread/mutex.hpp>
#include <boost/utility/string_ref.hpp>
#include <cassert>
#include <system_error>
#include "common/error.h" // monero/contrib/epee/include
#include "error.h"
#include "net/http_client.h" // monero/contrib/epee/include/net
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
namespace lws
@@ -47,8 +49,11 @@ namespace rpc
constexpr const char signal_endpoint[] = "inproc://signal";
constexpr const char abort_scan_signal[] = "SCAN";
constexpr const char abort_process_signal[] = "PROCESS";
constexpr const char minimal_chain_topic[] = "json-minimal-chain_main";
constexpr const int daemon_zmq_linger = 0;
constexpr const std::chrono::seconds chain_poll_timeout{20};
constexpr const std::chrono::minutes chain_sub_timeout{2};
struct terminate
{
void operator()(void* ptr) const noexcept
@@ -112,14 +117,14 @@ namespace rpc
template<std::size_t N>
expect<void> do_signal(void* signal_pub, const char (&signal)[N]) noexcept
{
MONERO_ZMQ_CHECK(zmq_send(signal_pub, signal, sizeof(signal), 0));
MONERO_ZMQ_CHECK(zmq_send(signal_pub, signal, sizeof(signal) - 1, 0));
return success();
}
template<std::size_t N>
expect<void> do_subscribe(void* signal_sub, const char (&signal)[N]) noexcept
{
MONERO_ZMQ_CHECK(zmq_setsockopt(signal_sub, ZMQ_SUBSCRIBE, signal, sizeof(signal)));
MONERO_ZMQ_CHECK(zmq_setsockopt(signal_sub, ZMQ_SUBSCRIBE, signal, sizeof(signal) - 1));
return success();
}
} // anonymous
@@ -128,10 +133,11 @@ namespace rpc
{
struct context
{
explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::chrono::minutes interval)
explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
: comm(std::move(comm))
, signal_pub(std::move(signal_pub))
, daemon_addr(std::move(daemon_addr))
, sub_addr(std::move(sub_addr))
, rates_conn()
, cache_time()
, cache_interval(interval)
@@ -144,7 +150,8 @@ namespace rpc
zcontext comm;
socket signal_pub;
std::string daemon_addr;
const std::string daemon_addr;
const std::string sub_addr;
http::http_simple_client rates_conn;
std::chrono::steady_clock::time_point cache_time;
const std::chrono::minutes cache_interval;
@@ -176,14 +183,26 @@ namespace rpc
{
MONERO_PRECOND(ctx != nullptr);
const int linger = daemon_zmq_linger;
int option = daemon_zmq_linger;
client out{std::move(ctx)};
out.daemon.reset(zmq_socket(out.ctx->comm.get(), ZMQ_REQ));
if (out.daemon.get() == nullptr)
return net::zmq::get_error_code();
MONERO_ZMQ_CHECK(zmq_connect(out.daemon.get(), out.ctx->daemon_addr.c_str()));
MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon.get(), ZMQ_LINGER, &linger, sizeof(linger)));
MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon.get(), ZMQ_LINGER, &option, sizeof(option)));
if (!out.ctx->sub_addr.empty())
{
out.daemon_sub.reset(zmq_socket(out.ctx->comm.get(), ZMQ_SUB));
if (out.daemon_sub.get() == nullptr)
return net::zmq::get_error_code();
option = 1; // keep only last pub message from daemon
MONERO_ZMQ_CHECK(zmq_connect(out.daemon_sub.get(), out.ctx->sub_addr.c_str()));
MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon_sub.get(), ZMQ_CONFLATE, &option, sizeof(option)));
MONERO_CHECK(do_subscribe(out.daemon_sub.get(), minimal_chain_topic));
}
out.signal_sub.reset(zmq_socket(out.ctx->comm.get(), ZMQ_SUB));
if (out.signal_sub.get() == nullptr)
@@ -204,12 +223,35 @@ namespace rpc
return do_subscribe(signal_sub.get(), abort_scan_signal);
}
expect<void> client::wait(std::chrono::seconds timeout) noexcept
expect<minimal_chain_pub> client::wait_for_block()
{
MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr);
assert(signal_sub != nullptr);
return do_wait(daemon.get(), signal_sub.get(), 0, timeout);
if (daemon_sub == nullptr)
{
MONERO_CHECK(do_wait(daemon.get(), signal_sub.get(), 0, chain_poll_timeout));
return {lws::error::daemon_timeout};
}
{
const expect<void> ready = do_wait(daemon_sub.get(), signal_sub.get(), ZMQ_POLLIN, chain_sub_timeout);
if (!ready)
{
if (ready == lws::error::daemon_timeout)
MWARNING("ZeroMQ Pub/Sub chain timeout, check connection settings");
return ready.error();
}
}
expect<std::string> pub = net::zmq::receive(daemon_sub.get(), ZMQ_DONTWAIT);
if (!pub)
return pub.error();
if (!boost::string_ref{*pub}.starts_with(minimal_chain_topic))
return {lws::error::bad_daemon_response};
pub->erase(0, sizeof(minimal_chain_topic));
return minimal_chain_pub::from_json(std::move(*pub));
}
expect<void> client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept
@@ -243,7 +285,7 @@ namespace rpc
return ctx->cached;
}
context context::make(std::string daemon_addr, std::chrono::minutes rates_interval)
context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval)
{
zcontext comm{zmq_init(1)};
if (comm == nullptr)
@@ -257,7 +299,7 @@ namespace rpc
return context{
std::make_shared<detail::context>(
std::move(comm), std::move(pub), std::move(daemon_addr), rates_interval
std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval
)
};
}

View File

@@ -35,8 +35,9 @@
#include "byte_slice.h" // monero/contrib/epee/include
#include "common/expect.h" // monero/src
#include "rates.h"
#include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h"
#include "rpc/rates.h"
namespace lws
{
@@ -62,16 +63,17 @@ namespace rpc
{
std::shared_ptr<detail::context> ctx;
detail::socket daemon;
detail::socket daemon_sub;
detail::socket signal_sub;
explicit client(std::shared_ptr<detail::context> ctx)
: ctx(std::move(ctx)), daemon(), signal_sub()
explicit client(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub()
{}
public:
//! A client with no connection (all send/receive functions fail).
explicit client() noexcept
: ctx(), daemon(), signal_sub()
: ctx(), daemon(), daemon_sub(), signal_sub()
{}
static expect<client> make(std::shared_ptr<detail::context> ctx) noexcept;
@@ -103,8 +105,8 @@ namespace rpc
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
expect<void> watch_scan_signals() noexcept;
//! Block until `timeout` or until `context::stop()` is invoked.
expect<void> wait(std::chrono::seconds timeout) noexcept;
//! Wait for new block announce or internal timeout.
expect<minimal_chain_pub> wait_for_block();
//! \return A JSON message for RPC request `M`.
template<typename M>
@@ -167,7 +169,7 @@ namespace rpc
\param rates_interval Frequency to retrieve exchange rates. Set value to
`<= 0` to disable exchange rate retrieval.
*/
static context make(std::string daemon_addr, std::chrono::minutes rates_interval);
static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval);
context(context&&) = default;
context(context const&) = delete;

83
src/rpc/daemon_pub.cpp Normal file
View File

@@ -0,0 +1,83 @@
// Copyright (c) 2020, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "daemon_pub.h"
#include "wire/crypto.h"
#include "wire/error.h"
#include "wire/field.h"
#include "wire/traits.h"
#include "wire/json/read.h"
namespace
{
struct dummy_chain_array
{
using value_type = crypto::hash;
std::uint64_t count;
std::reference_wrapper<crypto::hash> id;
void clear() noexcept {}
void reserve(std::size_t) noexcept {}
crypto::hash& back() noexcept { return id; }
void emplace_back() { ++count; }
};
}
namespace wire
{
template<>
struct is_array<dummy_chain_array>
: std::true_type
{};
}
namespace lws
{
namespace rpc
{
static void read_bytes(wire::json_reader& src, minimal_chain_pub& self)
{
dummy_chain_array chain{0, std::ref(self.top_block_id)};
wire::object(src,
wire::field("first_height", std::ref(self.top_block_height)),
wire::field("ids", std::ref(chain))
);
self.top_block_height += chain.count - 1;
if (chain.count == 0)
WIRE_DLOG_THROW(wire::error::schema::binary, "expected at least one block hash");
}
expect<minimal_chain_pub> minimal_chain_pub::from_json(std::string&& source)
{
return wire::json::from_bytes<minimal_chain_pub>(std::move(source));
}
}
}

50
src/rpc/daemon_pub.h Normal file
View File

@@ -0,0 +1,50 @@
// Copyright (c) 2020, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <cstdint>
#include <string>
#include "common/expect.h" // monero/src
#include "crypto/hash.h" // monero/src
#include "wire/json/fwd.h"
namespace lws
{
namespace rpc
{
//! Represents only the last block listed in "minimal-chain_main" pub.
struct minimal_chain_pub
{
std::uint64_t top_block_height;
crypto::hash top_block_id;
static expect<minimal_chain_pub> from_json(std::string&&);
};
}
}