Added webhook tx-confirmation support (#66)

This commit is contained in:
Lee *!* Clagett
2023-05-11 13:13:10 -04:00
committed by Lee *!* Clagett
parent 990e86f701
commit 3e0555e07d
32 changed files with 2051 additions and 122 deletions

View File

@@ -31,4 +31,4 @@ set(monero-lws-db_headers account.h data.h fwd.h storage.h string.h)
add_library(monero-lws-db ${monero-lws-db_sources} ${monero-lws-db_headers})
target_include_directories(monero-lws-db PUBLIC "${LMDB_INCLUDE}")
target_link_libraries(monero-lws-db monero::libraries ${LMDB_LIB_PATH})
target_link_libraries(monero-lws-db monero::libraries monero-lws-common monero-lws-wire-msgpack ${LMDB_LIB_PATH})

View File

@@ -29,8 +29,11 @@
#include <cstring>
#include <memory>
#include "wire/crypto.h"
#include "wire.h"
#include "wire/crypto.h"
#include "wire/json/write.h"
#include "wire/msgpack.h"
#include "wire/uuid.h"
namespace lws
{
@@ -99,7 +102,7 @@ namespace db
template<typename F, typename T>
void map_transaction_link(F& format, T& self)
{
wire::object(format, WIRE_FIELD(height), WIRE_FIELD(tx_hash));
wire::object(format, WIRE_FIELD_ID(0, height), WIRE_FIELD_ID(1, tx_hash));
}
}
WIRE_DEFINE_OBJECT(transaction_link, map_transaction_link);
@@ -124,19 +127,19 @@ namespace db
nullptr : std::addressof(payment_bytes);
wire::object(dest,
wire::field("id", std::cref(self.spend_meta.id)),
wire::field("block", self.link.height),
wire::field("index", self.spend_meta.index),
wire::field("amount", self.spend_meta.amount),
wire::field("timestamp", self.timestamp),
wire::field("tx_hash", std::cref(self.link.tx_hash)),
wire::field("tx_prefix_hash", std::cref(self.tx_prefix_hash)),
wire::field("tx_public", std::cref(self.spend_meta.tx_public)),
wire::optional_field("rct_mask", rct_mask),
wire::optional_field("payment_id", payment_id),
wire::field("unlock_time", self.unlock_time),
wire::field("mixin_count", self.spend_meta.mixin_count),
wire::field("coinbase", coinbase)
wire::field<0>("id", std::cref(self.spend_meta.id)),
wire::field<1>("block", self.link.height),
wire::field<2>("index", self.spend_meta.index),
wire::field<3>("amount", self.spend_meta.amount),
wire::field<4>("timestamp", self.timestamp),
wire::field<5>("tx_hash", std::cref(self.link.tx_hash)),
wire::field<6>("tx_prefix_hash", std::cref(self.tx_prefix_hash)),
wire::field<7>("tx_public", std::cref(self.spend_meta.tx_public)),
wire::optional_field<8>("rct_mask", rct_mask),
wire::optional_field<9>("payment_id", payment_id),
wire::field<10>("unlock_time", self.unlock_time),
wire::field<11>("mixin_count", self.spend_meta.mixin_count),
wire::field<12>("coinbase", coinbase)
);
}
@@ -206,9 +209,99 @@ namespace db
);
}
namespace
{
constexpr const char* map_webhook_type[] = {"tx-confirmation"};
template<typename F, typename T>
void map_webhook_key(F& format, T& self)
{
wire::object(format, WIRE_FIELD_ID(0, user), WIRE_FIELD_ID(1, type));
}
template<typename F, typename T>
void map_webhook_data(F& format, T& self)
{
wire::object(format,
WIRE_FIELD_ID(0, url),
WIRE_FIELD_ID(1, token),
WIRE_FIELD_ID(2, confirmations)
);
}
template<typename F, typename T>
void map_webhook_value(F& format, T& self, crypto::hash8& payment_id)
{
static_assert(sizeof(payment_id) == sizeof(self.first.payment_id), "bad memcpy");
wire::object(format,
wire::field<0>("payment_id", std::ref(payment_id)),
wire::field<1>("event_id", std::ref(self.first.event_id)),
wire::field<2>("token", std::ref(self.second.token)),
wire::field<3>("confirmations", self.second.confirmations),
wire::field<4>("url", std::ref(self.second.url))
);
}
}
WIRE_DEFINE_ENUM(webhook_type, map_webhook_type);
WIRE_DEFINE_OBJECT(webhook_key, map_webhook_key);
WIRE_MSGPACK_DEFINE_OBJECT(webhook_data, map_webhook_data);
void read_bytes(wire::reader& source, webhook_value& dest)
{
crypto::hash8 payment_id{};
map_webhook_value(source, dest, payment_id);
std::memcpy(std::addressof(dest.first.payment_id), std::addressof(payment_id), sizeof(payment_id));
}
void write_bytes(wire::writer& dest, const webhook_value& source)
{
crypto::hash8 payment_id;
std::memcpy(std::addressof(payment_id), std::addressof(source.first.payment_id), sizeof(payment_id));
map_webhook_value(dest, source, payment_id);
}
void write_bytes(wire::json_writer& dest, const webhook_tx_confirmation& self)
{
crypto::hash8 payment_id;
static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy");
std::memcpy(std::addressof(payment_id), std::addressof(self.value.first.payment_id), sizeof(payment_id));
// to be sent to remote url
wire::object(dest,
wire::field<0>("event", std::cref(self.key.type)),
wire::field<1>("payment_id", std::cref(payment_id)),
wire::field<2>("token", std::cref(self.value.second.token)),
wire::field<3>("confirmations", std::cref(self.value.second.confirmations)),
wire::field<4>("event_id", std::cref(self.value.first.event_id)),
WIRE_FIELD_ID(5, tx_info)
);
}
void write_bytes(wire::json_writer& dest, const webhook_event& self)
{
crypto::hash8 payment_id;
static_assert(sizeof(payment_id) == sizeof(self.link_webhook.payment_id), "bad memcpy");
std::memcpy(std::addressof(payment_id), std::addressof(self.link_webhook.payment_id), sizeof(payment_id));
wire::object(dest,
wire::field<0>("tx_info", std::cref(self.link.tx)),
wire::field<1>("output_id", std::cref(self.link.out)),
wire::field<2>("payment_id", std::cref(payment_id)),
wire::field<3>("event_id", std::cref(self.link_webhook.event_id))
);
}
bool operator<(const webhook_dupsort& left, const webhook_dupsort& right) noexcept
{
return left.payment_id == right.payment_id ?
std::memcmp(std::addressof(left.event_id), std::addressof(right.event_id), sizeof(left.event_id)) < 0 :
left.payment_id < right.payment_id;
}
/*! TODO consider making an `operator<` for `crypto::tx_hash`. Not known to be
needed elsewhere yet. */
bool operator==(transaction_link const& left, transaction_link const& right) noexcept
{
return left.height == right.height &&
std::memcmp(std::addressof(left.tx_hash), std::addressof(right.tx_hash), sizeof(left.tx_hash)) == 0;
}
bool operator<(transaction_link const& left, transaction_link const& right) noexcept
{
return left.height == right.height ?

View File

@@ -1,4 +1,4 @@
// Copyright (c) 2018-2020, The Monero Project
// Copyright (c) 2018-2020, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
@@ -26,15 +26,19 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/uuid/uuid.hpp>
#include <cassert>
#include <cstdint>
#include <iosfwd>
#include <string>
#include <utility>
#include "crypto/crypto.h"
#include "lmdb/util.h"
#include "ringct/rctTypes.h" //! \TODO brings in lots of includes, try to remove
#include "wire/fwd.h"
#include "wire/json/fwd.h"
#include "wire/msgpack/fwd.h"
#include "wire/traits.h"
namespace lws
@@ -237,6 +241,76 @@ namespace db
static_assert(sizeof(request_info) == 64 + 32 + 8 + (4 * 2), "padding in request_info");
void write_bytes(wire::writer& dest, const request_info& self, bool show_key = false);
enum class webhook_type : std::uint8_t
{
tx_confirmation = 0,
// unconfirmed_tx,
// new_block
// confirmed_tx,
// double_spend_tx,
// tx_confidence
};
WIRE_DECLARE_ENUM(webhook_type);
//! Key for upcoming webhooks or in-progress webhooks
struct webhook_key
{
account_id user;
webhook_type type;
char reserved[3];
};
static_assert(sizeof(webhook_key) == 4 + 1 + 3, "padding in webhook_key");
WIRE_DECLARE_OBJECT(webhook_key);
//! Webhook values used to sort by duplicate keys
struct webhook_dupsort
{
std::uint64_t payment_id; //!< Only used with `tx_confirmation` type.
boost::uuids::uuid event_id;
};
static_assert(sizeof(webhook_dupsort) == 8 + 16, "padding in webhoook");
//! Variable length data for a webhook key/event
struct webhook_data
{
std::string url;
std::string token;
std::uint32_t confirmations;
};
WIRE_MSGPACK_DECLARE_OBJECT(webhook_data);
//! Compatible with lmdb::table code
using webhook_value = std::pair<webhook_dupsort, webhook_data>;
WIRE_DECLARE_OBJECT(webhook_value);
//! Returned by DB when a webhook event "tripped"
struct webhook_tx_confirmation
{
webhook_key key;
webhook_value value;
output tx_info;
};
void write_bytes(wire::json_writer&, const webhook_tx_confirmation&);
//! References a specific output that triggered a webhook
struct webhook_output
{
transaction_link tx;
output_id out;
};
//! References all info from a webhook that triggered
struct webhook_event
{
webhook_output link;
webhook_dupsort link_webhook;
};
void write_bytes(wire::json_writer&, const webhook_event&);
bool operator==(transaction_link const& left, transaction_link const& right) noexcept;
bool operator<(transaction_link const& left, transaction_link const& right) noexcept;
bool operator<=(transaction_link const& left, transaction_link const& right) noexcept;
inline constexpr bool operator==(output_id left, output_id right) noexcept
{
return left.high == right.high && left.low == right.low;
@@ -255,9 +329,28 @@ namespace db
return left.high == right.high ?
left.low <= right.low : left.high < right.high;
}
inline constexpr bool operator<(const webhook_key& left, const webhook_key& right) noexcept
{
return left.user == right.user ?
left.type < right.type : left.user < right.user;
}
bool operator<(const webhook_dupsort& left, const webhook_dupsort& right) noexcept;
inline bool operator==(const webhook_output& left, const webhook_output& right) noexcept
{
return left.out == right.out && left.tx == right.tx;
}
inline bool operator<(const webhook_output& left, const webhook_output& right) noexcept
{
return left.tx == right.tx ? left.out < right.out : left.tx < right.tx;
}
inline bool operator<(const webhook_event& left, const webhook_event& right) noexcept
{
return left.link == right.link ?
left.link_webhook < right.link_webhook : left.link < right.link;
}
bool operator<(transaction_link const& left, transaction_link const& right) noexcept;
bool operator<=(transaction_link const& left, transaction_link const& right) noexcept;
/*!
Write `address` to `out` in base58 format using `lws::config::network` to

View File

@@ -1,4 +1,4 @@
// Copyright (c) 2018, The Monero Project
// Copyright (c) 2018-2023, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
@@ -29,7 +29,9 @@
#include <boost/container/static_vector.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/counting_range.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/uuid/uuid_hash.hpp>
#include <cassert>
#include <chrono>
#include <limits>
@@ -48,12 +50,15 @@
#include "lmdb/database.h"
#include "lmdb/error.h"
#include "lmdb/key_stream.h"
#include "lmdb/msgpack_table.h"
#include "lmdb/table.h"
#include "lmdb/util.h"
#include "lmdb/value_stream.h"
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
#include "span.h"
#include "wire/filters.h"
#include "wire/json.h"
#include "wire/vector.h"
namespace lws
{
@@ -201,6 +206,12 @@ namespace db
constexpr const lmdb::basic_table<request, request_info> requests{
"requests_by_type,address", (MDB_CREATE | MDB_DUPSORT), MONERO_COMPARE(request_info, address.spend_public)
};
constexpr const lmdb::msgpack_table<webhook_key, webhook_dupsort, webhook_data> webhooks{
"webhooks_by_account_id,payment_id", (MDB_CREATE | MDB_DUPSORT), &lmdb::less<db::webhook_dupsort>
};
constexpr const lmdb::basic_table<account_id, webhook_event> events_by_account_id{
"webhook_events_by_account_id,type,block_id,tx_hash,output_id,payment_id,event_id", (MDB_CREATE | MDB_DUPSORT), &lmdb::less<webhook_event>
};
template<typename D>
expect<void> check_cursor(MDB_txn& txn, MDB_dbi tbl, std::unique_ptr<MDB_cursor, D>& cur) noexcept
@@ -451,6 +462,8 @@ namespace db
MDB_dbi spends;
MDB_dbi images;
MDB_dbi requests;
MDB_dbi webhooks;
MDB_dbi events;
} tables;
const unsigned create_queue_max;
@@ -469,6 +482,8 @@ namespace db
tables.spends = spends.open(*txn).value();
tables.images = images.open(*txn).value();
tables.requests = requests.open(*txn).value();
tables.webhooks = webhooks.open(*txn).value();
tables.events = events_by_account_id.open(*txn).value();
check_blockchain(*txn, tables.blocks);
@@ -645,6 +660,46 @@ namespace db
return requests.get_value<request_info>(value);
}
expect<std::vector<std::pair<webhook_key, std::vector<webhook_value>>>>
storage_reader::get_webhooks(cursor::webhooks cur)
{
MONERO_PRECOND(txn != nullptr);
assert(db != nullptr);
MONERO_CHECK(check_cursor(*txn, db->tables.webhooks, cur));
std::vector<std::pair<webhook_key, std::vector<webhook_value>>> out;
MDB_val key{};
MDB_val value{};
int err = mdb_cursor_get(cur.get(), &key, &value, MDB_FIRST);
for (;/* every key */;)
{
if (err)
{
if (err == MDB_NOTFOUND)
return {std::move(out)};
return {lmdb::error(err)};
}
out.emplace_back(MONERO_UNWRAP(webhooks.get_key(key)), std::vector<webhook_value>{});
for (; /* every dup key */ ;)
{
if (err)
{
if (err == MDB_NOTFOUND)
break; // inner duplicate key loop
return {lmdb::error(err)};
}
out.back().second.push_back(MONERO_UNWRAP(webhooks.get_value(value)));
err = mdb_cursor_get(cur.get(), &key, &value, MDB_NEXT_DUP);
}
err = mdb_cursor_get(cur.get(), &key, &value, MDB_NEXT);
}
return {std::move(out)};
}
namespace
{
//! `write_bytes` implementation will forward a third argument for `show_keys`.
@@ -695,6 +750,14 @@ namespace db
);
}
static void write_bytes(wire::json_writer& dest, const std::pair<webhook_key, std::vector<webhook_value>>& self)
{
wire::object(dest,
wire::field("key", std::cref(self.first)),
wire::field("value", std::cref(self.second))
);
}
expect<void> storage_reader::json_debug(std::ostream& out, bool show_keys)
{
using boost::adaptors::reverse;
@@ -713,6 +776,8 @@ namespace db
cursor::spends spends_cur;
cursor::images images_cur;
cursor::requests requests_cur;
cursor::webhooks webhooks_cur;
cursor::webhooks events_cur;
MONERO_CHECK(check_cursor(*txn, db->tables.blocks, curs.blocks_cur));
MONERO_CHECK(check_cursor(*txn, db->tables.accounts, accounts_cur));
@@ -722,6 +787,8 @@ namespace db
MONERO_CHECK(check_cursor(*txn, db->tables.spends, spends_cur));
MONERO_CHECK(check_cursor(*txn, db->tables.images, images_cur));
MONERO_CHECK(check_cursor(*txn, db->tables.requests, requests_cur));
MONERO_CHECK(check_cursor(*txn, db->tables.webhooks, webhooks_cur));
MONERO_CHECK(check_cursor(*txn, db->tables.events, events_cur));
auto blocks_partial =
get_blocks<boost::container::static_vector<block_info, 12>>(*curs.blocks_cur, 0);
@@ -760,6 +827,15 @@ namespace db
if (!requests_stream)
return requests_stream.error();
// This list should be smaller ... ?
const auto webhooks_data = webhooks.get_all(*webhooks_cur);
if (!webhooks_data)
return webhooks_data.error();
auto events_stream = events_by_account_id.get_key_stream(std::move(events_cur));
if (!events_stream)
return events_stream.error();
const wire::as_array_filter<toggle_key_output> toggle_keys_filter{{show_keys}};
wire::json_stream_writer json_stream{out};
wire::object(json_stream,
@@ -770,7 +846,9 @@ namespace db
wire::field(outputs.name, wire::as_object(outputs_stream->make_range(), wire::as_integer, wire::as_array)),
wire::field(spends.name, wire::as_object(spends_stream->make_range(), wire::as_integer, wire::as_array)),
wire::field(images.name, wire::as_object(images_stream->make_range(), output_id_key{}, wire::as_array)),
wire::field(requests.name, wire::as_object(requests_stream->make_range(), wire::enum_as_string, toggle_keys_filter))
wire::field(requests.name, wire::as_object(requests_stream->make_range(), wire::enum_as_string, toggle_keys_filter)),
wire::field(webhooks.name, std::cref(*webhooks_data)),
wire::field(events_by_account_id.name, wire::as_object(events_stream->make_range(), wire::as_integer, wire::as_array))
);
json_stream.finish();
@@ -955,6 +1033,42 @@ namespace db
return bulk_insert(*accounts_bh_cur, new_height, epee::to_span(new_by_heights));
}
expect<void> rollback_events(storage_internal::tables_ const& tables, MDB_txn& txn, const block_id height)
{
cursor::webhooks webhooks_cur;
cursor::events events_cur;
MONERO_CHECK(check_cursor(txn, tables.webhooks, webhooks_cur));
MONERO_CHECK(check_cursor(txn, tables.events, events_cur));
MDB_val key = lmdb::to_val(height);
MDB_val value{};
int err = mdb_cursor_get(events_cur.get(), &key, &value, MDB_LAST);
for ( ; /* every user */ ; )
{
for ( ; /* every event */ ;)
{
if (err)
{
if (err == MDB_NOTFOUND)
return success();
return {lmdb::error(err)};
}
const webhook_event event =
MONERO_UNWRAP(events_by_account_id.get_value<webhook_event>(value));
if (event.link.tx.height < height)
break; // inner for loop
MONERO_LMDB_CHECK(mdb_cursor_del(events_cur.get(), 0));
err = mdb_cursor_get(events_cur.get(), &key, &value, MDB_PREV);
}
err = mdb_cursor_get(events_cur.get(), &key, &value, MDB_PREV_NODUP);
}
return success();
}
expect<void> rollback_chain(storage_internal::tables_ const& tables, MDB_txn& txn, MDB_cursor& cur, block_id height)
{
MDB_val key;
@@ -971,7 +1085,8 @@ namespace db
if (err != MDB_NOTFOUND)
return {lmdb::error(err)};
return rollback_accounts(tables, txn, height);
MONERO_CHECK(rollback_accounts(tables, txn, height));
return rollback_events(tables, txn, height);
}
template<typename T>
@@ -1706,22 +1821,127 @@ namespace db
}
return success();
}
expect<void> check_hooks(MDB_cursor& webhooks_cur, MDB_cursor& events_cur, const lws::account& user)
{
const account_id user_id = user.id();
const webhook_key hook_key{user_id, webhook_type::tx_confirmation};
// check payment_id == x (match specific) webhooks second
for (const output& out : user.outputs())
{
webhook_dupsort sorter{};
static_assert(sizeof(sorter.payment_id) == sizeof(out.payment_id.short_), "bad memcpy");
std::memcpy(
std::addressof(sorter.payment_id), std::addressof(out.payment_id.short_), sizeof(sorter.payment_id)
);
MDB_val key = lmdb::to_val(hook_key);
MDB_val value = lmdb::to_val(sorter);
int err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_GET_BOTH_RANGE);
for (; /* all user/payment_id==x entries */ ;)
{
if (err)
{
if (err != MDB_NOTFOUND)
return {lmdb::error(err)};
break;
}
const webhook_dupsort db_sorter = MONERO_UNWRAP(webhooks.get_fixed_value<webhook_dupsort>(value));
if (db_sorter.payment_id != sorter.payment_id)
break;
const webhook_event event{
webhook_output{out.link, out.spend_meta.id}, db_sorter
};
MDB_val ekey = lmdb::to_val(user_id);
MDB_val evalue = lmdb::to_val(event);
MONERO_LMDB_CHECK(mdb_cursor_put(&events_cur, &ekey, &evalue, 0));
err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_NEXT_DUP);
}
}
return success();
}
expect<void>
add_ongoing_hooks(std::vector<webhook_tx_confirmation>& events, MDB_cursor& webhooks_cur, MDB_cursor& outputs_cur, MDB_cursor& events_cur, const account_id user, const block_id begin, const block_id end)
{
if (begin == end)
return success();
const webhook_key hook_key{user, webhook_type::tx_confirmation};
MDB_val key = lmdb::to_val(user);
MDB_val value{};
int err = mdb_cursor_get(&events_cur, &key, &value, MDB_SET_KEY);
for ( ; /* every ongoing event from this user */ ; )
{
if (err)
{
if (err != MDB_NOTFOUND)
return {lmdb::error(err)};
return success();
}
const webhook_event event =
MONERO_UNWRAP(events_by_account_id.get_value<webhook_event>(value));
MDB_val rkey = lmdb::to_val(hook_key);
MDB_val rvalue = lmdb::to_val(event.link_webhook);
MONERO_LMDB_CHECK(mdb_cursor_get(&webhooks_cur, &rkey, &rvalue, MDB_GET_BOTH));
MDB_val okey = lmdb::to_val(user);
MDB_val ovalue = lmdb::to_val(event.link);
MONERO_LMDB_CHECK(mdb_cursor_get(&outputs_cur, &okey, &ovalue, MDB_GET_BOTH));
events.push_back(
webhook_tx_confirmation{
MONERO_UNWRAP(webhooks.get_key(rkey)),
MONERO_UNWRAP(webhooks.get_value(rvalue)),
MONERO_UNWRAP(outputs.get_value<output>(ovalue))
}
);
const std::uint32_t requested_confirmations =
events.back().value.second.confirmations;
events.back().value.second.confirmations =
lmdb::to_native(begin) - lmdb::to_native(event.link.tx.height) + 1;
// copy next blocks from first
for (const auto block_num : boost::counting_range(lmdb::to_native(begin) + 1, lmdb::to_native(end)))
{
if (requested_confirmations <= events.back().value.second.confirmations)
break;
events.push_back(events.back());
++(events.back().value.second.confirmations);
}
if (requested_confirmations <= events.back().value.second.confirmations)
MONERO_LMDB_CHECK(mdb_cursor_del(&events_cur, 0));
err = mdb_cursor_get(&events_cur, &key, &value, MDB_NEXT_DUP);
}
return success();
}
} // anonymous
expect<std::size_t> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users)
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users)
{
if (users.empty() && chain.empty())
return 0;
return {std::make_pair(0, std::vector<webhook_tx_confirmation>{})};
MONERO_PRECOND(!chain.empty());
MONERO_PRECOND(db != nullptr);
return db->try_write([this, height, chain, users] (MDB_txn& txn) -> expect<std::size_t>
return db->try_write([this, height, chain, users] (MDB_txn& txn) -> expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
{
epee::span<const crypto::hash> chain_copy{chain};
const std::uint64_t last_update =
lmdb::to_native(height) + chain.size() - 1;
const std::uint64_t first_new = lmdb::to_native(height) + 1;
// collect all .value() errors
std::pair<std::size_t, std::vector<webhook_tx_confirmation>> updated;
if (get_checkpoints().get_max_height() <= last_update)
{
cursor::blocks blocks_cur;
@@ -1732,22 +1952,15 @@ namespace db
MONERO_LMDB_CHECK(mdb_cursor_get(blocks_cur.get(), &key, &value, MDB_SET));
MONERO_LMDB_CHECK(mdb_cursor_get(blocks_cur.get(), &key, &value, MDB_LAST_DUP));
const expect<block_info> last_block = blocks.get_value<block_info>(value);
if (!last_block)
return last_block.error();
if (last_block->id < height)
const block_info last_block = MONERO_UNWRAP(blocks.get_value<block_info>(value));
if (last_block.id < height)
return {lws::error::bad_blockchain};
const std::uint64_t last_same =
std::min(lmdb::to_native(last_block->id), last_update);
const expect<crypto::hash> hash_check =
do_get_block_hash(*blocks_cur, block_id(last_same));
if (!hash_check)
return hash_check.error();
std::min(lmdb::to_native(last_block.id), last_update);
const std::uint64_t offset = last_same - lmdb::to_native(height);
if (*hash_check != *(chain_copy.begin() + offset))
if (MONERO_UNWRAP(do_get_block_hash(*blocks_cur, block_id(last_same))) != *(chain_copy.begin() + offset))
return {lws::error::blockchain_reorg};
chain_copy.remove_prefix(offset + 1);
@@ -1764,18 +1977,21 @@ namespace db
cursor::outputs outputs_cur;
cursor::spends spends_cur;
cursor::images images_cur;
cursor::webhooks webhooks_cur;
cursor::events events_cur;
MONERO_CHECK(check_cursor(txn, this->db->tables.accounts, accounts_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.accounts_bh, accounts_bh_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.outputs, outputs_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.spends, spends_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.images, images_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.webhooks, webhooks_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.events, events_cur));
// for bulk inserts
boost::container::static_vector<account_lookup, 127> heights{};
static_assert(sizeof(heights) <= 1024, "stack vector is large");
std::size_t updated = 0;
for (auto user = users.begin() ;; ++user)
{
if (heights.size() == heights.capacity() || user == users.end())
@@ -1812,12 +2028,8 @@ namespace db
continue; // to next account
}
const expect<account_lookup> lookup =
accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup)>(temp_value);
if (!lookup)
return lookup.error();
status_key = lookup->status;
status_key =
accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup)>(temp_value).value().status;
MONERO_LMDB_CHECK(mdb_cursor_get(accounts_cur.get(), &key, &value, MDB_GET_BOTH));
}
expect<account> existing = accounts.get_value<account>(value);
@@ -1840,10 +2052,154 @@ namespace db
MONERO_CHECK(bulk_insert(*outputs_cur, user->id(), epee::to_span(user->outputs())));
MONERO_CHECK(add_spends(*spends_cur, *images_cur, user->id(), epee::to_span(user->spends())));
++updated;
MONERO_CHECK(check_hooks(*webhooks_cur, *events_cur, *user));
MONERO_CHECK(
add_ongoing_hooks(
updated.second, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
)
);
++updated.first;
} // ... for every account being updated ...
return updated;
return {std::move(updated)};
});
}
expect<void> storage::add_webhook(const webhook_type type, const account_address& address, const webhook_value& event)
{
{
epee::net_utils::http::url_content url{};
if (event.second.url.empty() || !epee::net_utils::parse_url(event.second.url, url))
return {error::bad_url};
if (url.schema != "http" && url.schema != "https")
return {error::bad_url};
}
return db->try_write([this, type, &address, &event] (MDB_txn& txn) -> expect<void>
{
cursor::accounts_by_address accounts_ba_cur;
cursor::webhooks webhooks_cur;
MONERO_CHECK(check_cursor(txn, this->db->tables.accounts_ba, accounts_ba_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.webhooks, webhooks_cur));
webhook_key key{account_id::invalid, type};
MDB_val lmkey = lmdb::to_val(by_address_version);
MDB_val lmvalue = lmdb::to_val(address);
{
const int err = mdb_cursor_get(accounts_ba_cur.get(), &lmkey, &lmvalue, MDB_GET_BOTH);
if (err && err != MDB_NOTFOUND)
return {lmdb::error(err)};
if (err != MDB_NOTFOUND)
key.user = MONERO_UNWRAP(accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup.id)>(lmvalue));
}
if (key.user == account_id::invalid && type == webhook_type::tx_confirmation)
return {error::bad_webhook};
lmkey = lmdb::to_val(key);
const epee::byte_slice value = webhooks.make_value(event.first, event.second);
lmvalue = MDB_val{value.size(), const_cast<void*>(static_cast<const void*>(value.data()))};
MONERO_LMDB_CHECK(mdb_cursor_put(webhooks_cur.get(), &lmkey, &lmvalue, 0));
return success();
});
}
expect<void> storage::clear_webhooks(const epee::span<const account_address> addresses)
{
if (addresses.empty())
return success();
return db->try_write([this, addresses] (MDB_txn& txn) -> expect<void>
{
cursor::accounts_by_address accounts_ba_cur;
cursor::webhooks webhooks_cur;
cursor::events events_cur;
MONERO_CHECK(check_cursor(txn, this->db->tables.accounts_ba, accounts_ba_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.webhooks, webhooks_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.events, events_cur));
webhook_key key{account_id::invalid, webhook_type::tx_confirmation};
for (const auto& address : addresses)
{
MDB_val lmkey = lmdb::to_val(by_address_version);
MDB_val lmvalue = lmdb::to_val(address);
MONERO_LMDB_CHECK(mdb_cursor_get(accounts_ba_cur.get(), &lmkey, &lmvalue, MDB_GET_BOTH));
key.user = MONERO_UNWRAP(accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup.id)>(lmvalue));
lmkey = lmdb::to_val(key);
int err = mdb_cursor_get(webhooks_cur.get(), &lmkey, &lmvalue, MDB_SET);
if (!err)
MONERO_LMDB_CHECK(mdb_cursor_del(webhooks_cur.get(), MDB_NODUPDATA));
lmkey = lmdb::to_val(key.user);
err = mdb_cursor_get(events_cur.get(), &lmkey, &lmvalue, MDB_SET);
if (!err)
mdb_cursor_del(events_cur.get(), MDB_NODUPDATA);
}
return success();
});
}
expect<void> storage::clear_webhooks(std::vector<boost::uuids::uuid> ids)
{
if (ids.empty())
return success();
std::sort(ids.begin(), ids.end());
return db->try_write([this, &ids] (MDB_txn& txn) -> expect<void>
{
cursor::webhooks webhooks_cur;
cursor::events events_cur;
MONERO_CHECK(check_cursor(txn, this->db->tables.webhooks, webhooks_cur));
MONERO_CHECK(check_cursor(txn, this->db->tables.events, events_cur));
MDB_val key{};
MDB_val value{};
int err = mdb_cursor_get(webhooks_cur.get(), &key, &value, MDB_FIRST);
for ( ; /* every webhook */ ; )
{
if (err)
{
if (err == MDB_NOTFOUND)
break;
return {lmdb::error(err)};
}
const boost::uuids::uuid id =
MONERO_UNWRAP(webhooks.get_fixed_value<MONERO_FIELD(webhook_dupsort, event_id)>(value));
if (std::binary_search(ids.begin(), ids.end(), id))
MONERO_LMDB_CHECK(mdb_cursor_del(webhooks_cur.get(), 0));
err = mdb_cursor_get(webhooks_cur.get(), &key, &value, MDB_NEXT);
}
err = mdb_cursor_get(events_cur.get(), &key, &value, MDB_FIRST);
for ( ; /* every event */ ; )
{
if (err)
{
if (err == MDB_NOTFOUND)
break;
return {lmdb::error(err)};
}
const webhook_dupsort event =
MONERO_UNWRAP(events_by_account_id.get_value<MONERO_FIELD(webhook_event, link_webhook)>(value));
if (std::binary_search(ids.begin(), ids.end(), event.event_id))
MONERO_LMDB_CHECK(mdb_cursor_del(events_cur.get(), 0));
err = mdb_cursor_get(events_cur.get(), &key, &value, MDB_NEXT);
}
return success();
});
}
} // db
} // lws

View File

@@ -56,6 +56,9 @@ namespace db
MONERO_CURSOR(blocks);
MONERO_CURSOR(accounts_by_address);
MONERO_CURSOR(accounts_by_height);
MONERO_CURSOR(webhooks);
MONERO_CURSOR(events);
}
struct storage_internal;
@@ -130,6 +133,10 @@ namespace db
expect<request_info>
get_request(request type, account_address const& address, cursor::requests cur = nullptr) noexcept;
//! \return All webhooks in the DB
expect<std::vector<std::pair<webhook_key, std::vector<webhook_value>>>>
get_webhooks(cursor::webhooks cur = nullptr);
//! Dump the contents of the database in JSON format to `out`.
expect<void> json_debug(std::ostream& out, bool show_keys);
@@ -229,7 +236,28 @@ namespace db
\return True iff LMDB successfully committed the update.
*/
expect<std::size_t> update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> accts);
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> accts);
/*!
Add webhook to be tracked in the database. The webhook will "call"
the specified URL with JSON/msgpack information when the event occurs.
\param type The webhook event type to be tracked by the DB.
\param address is required for `type == tx_confirmation`, and is not
not needed for all other types (use default construction of zeroes).
\param event Additional information for the webhook. A valid "http"
or "https" URL must be provided (or else error). All other information
is optional.
*/
expect<void> add_webhook(webhook_type type, const account_address& address, const webhook_value& event);
/*! Delete all webhooks associated with every value in `addresses`. This is
likely only valid for `tx_confirmation` event types. */
expect<void> clear_webhooks(epee::span<const account_address> addressses);
//! Delete all webhooks associated with every value in `ids`
expect<void> clear_webhooks(std::vector<boost::uuids::uuid> ids);
//! `txn` must have come from a previous call on the same thread.
expect<storage_reader> start_read(lmdb::suspended_txn txn = nullptr) const;