LibQuicR
Loading...
Searching...
No Matches
server.cpp
// SPDX-FileCopyrightText: Copyright (c) 2024 Cisco Systems
// SPDX-License-Identifier: BSD-2-Clause
#include <condition_variable>
#include <map>
#include <oss/cxxopts.hpp>
#include <set>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
#include <quicr/cache.h>
#include <quicr/detail/defer.h>
#include <quicr/server.h>
#include "signal_handler.h"
using TrackNamespaceHash = uint64_t;
using TrackNameHash = uint64_t;
using FullTrackNameHash = uint64_t;
struct CacheObject
{
};
template<>
struct std::less<CacheObject>
{
constexpr bool operator()(const CacheObject& lhs, const CacheObject& rhs) const noexcept
{
return lhs.headers.object_id < rhs.headers.object_id;
}
};
namespace qserver_vars {
bool force_track_alias{ true };
std::mutex state_mutex;
std::map<quicr::TrackNamespace, std::map<quicr::ConnectionHandle, std::set<quicr::messages::TrackAlias>>>
announce_active;
std::map<quicr::messages::TrackAlias,
std::map<quicr::ConnectionHandle, std::shared_ptr<quicr::PublishTrackHandler>>>
subscribes;
std::map<quicr::ConnectionHandle, std::map<quicr::messages::SubscribeId, quicr::messages::TrackAlias>>
subscribe_alias_sub_id;
struct SubscribeInfo
{
uint64_t connection_handle;
uint64_t subscribe_id;
uint64_t track_alias;
bool operator<(const SubscribeInfo& other) const
{
return connection_handle < other.connection_handle ||
(connection_handle == other.connection_handle && subscribe_id < other.subscribe_id);
}
bool operator==(const SubscribeInfo& other) const
{
return connection_handle == other.connection_handle && subscribe_id == other.subscribe_id;
}
bool operator>(const SubscribeInfo& other) const
{
return connection_handle > other.connection_handle ||
(connection_handle == other.connection_handle && subscribe_id > other.subscribe_id);
}
};
std::map<quicr::TrackNamespace, std::map<TrackNameHash, std::set<SubscribeInfo>>> subscribe_active;
std::map<quicr::messages::TrackAlias,
std::map<quicr::ConnectionHandle, std::shared_ptr<quicr::SubscribeTrackHandler>>>
pub_subscribes;
std::map<quicr::TrackNamespace, std::set<quicr::ConnectionHandle>> subscribes_announces;
std::map<quicr::TrackNamespaceHash, quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>> cache;
std::shared_ptr<quicr::ThreadedTickService> tick_service = std::make_shared<quicr::ThreadedTickService>();
std::unordered_map<uint64_t, std::atomic_bool> stop_fetch;
}
class MySubscribeTrackHandler : public quicr::SubscribeTrackHandler
{
public:
MySubscribeTrackHandler(const quicr::FullTrackName& full_track_name)
: SubscribeTrackHandler(full_track_name,
3,
quicr::messages::GroupOrder::kAscending,
quicr::messages::FilterType::kLatestObject)
{
}
void ObjectReceived(const quicr::ObjectHeaders& object_headers, quicr::BytesSpan data) override
{
if (data.size() > 255) {
SPDLOG_CRITICAL("Example server is for example only, received data > 255 bytes is not allowed!");
SPDLOG_CRITICAL("Use github.com/quicr/laps for full relay functionality");
throw std::runtime_error("Example server is for example only, received data > 255 bytes is not allowed!");
}
latest_group_ = object_headers.group_id;
latest_object_ = object_headers.object_id;
std::lock_guard<std::mutex> _(qserver_vars::state_mutex);
auto track_alias = GetTrackAlias();
if (!track_alias.has_value()) {
SPDLOG_DEBUG("Data without valid track alias");
return;
}
auto sub_it = qserver_vars::subscribes.find(track_alias.value());
if (sub_it == qserver_vars::subscribes.end()) {
SPDLOG_INFO("No subscribes, not relaying data size: {0} ", data.size());
return;
}
// Cache Object
if (qserver_vars::cache.count(*track_alias) == 0) {
qserver_vars::cache.insert(std::make_pair(
*track_alias,
quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>{ 50000, 1, qserver_vars::tick_service }));
}
auto& cache_entry = qserver_vars::cache.at(*track_alias);
CacheObject object{ object_headers, { data.begin(), data.end() } };
if (auto group = cache_entry.Get(object_headers.group_id)) {
group->insert(std::move(object));
} else {
cache_entry.Insert(object_headers.group_id, { std::move(object) }, 50000);
}
// Fan out to all subscribers
for (const auto& [conn_id, pth] : sub_it->second) {
pth->PublishObject(object_headers, data);
}
}
void StatusChanged(Status status) override
{
if (status == Status::kOk) {
SPDLOG_INFO("Track alias: {0} is subscribed", GetTrackAlias().value());
} else {
std::string reason = "";
switch (status) {
reason = "not connected";
break;
reason = "subscribe error";
break;
reason = "not authorized";
break;
reason = "not subscribed";
break;
reason = "pending subscribe response";
break;
reason = "unsubscribing";
break;
default:
break;
}
SPDLOG_INFO("Track alias: {0} failed to subscribe reason: {1}", GetTrackAlias().value(), reason);
}
}
private:
uint64_t latest_group_{ 0 };
uint64_t latest_object_{ 0 };
};
class MyPublishTrackHandler : public quicr::PublishTrackHandler
{
public:
MyPublishTrackHandler(const quicr::FullTrackName& full_track_name,
quicr::TrackMode track_mode,
uint8_t default_priority,
uint32_t default_ttl)
: quicr::PublishTrackHandler(full_track_name, track_mode, default_priority, default_ttl)
{
}
void StatusChanged(Status status) override
{
if (status == Status::kOk) {
SPDLOG_INFO("Publish track alias {0} has subscribers", GetTrackAlias().value());
} else {
std::string reason = "";
switch (status) {
reason = "not connected";
break;
reason = "not announced";
break;
reason = "not authorized";
break;
reason = "pending announce response";
break;
reason = "no subscribers";
break;
reason = "sending unannounce";
break;
default:
break;
}
SPDLOG_INFO("Publish track alias: {0} not ready, reason: {1}", GetTrackAlias().value(), reason);
}
}
void MetricsSampled(const quicr::PublishTrackMetrics& metrics) override
{
SPDLOG_DEBUG("Metrics sample time: {0}"
" track_alias: {1}"
" objects sent: {2}"
" bytes sent: {3}"
" object duration us: {4}"
" queue discards: {5}"
" queue size: {6}",
GetTrackAlias().value(),
metrics.bytes_published,
metrics.quic.tx_queue_size.avg);
}
};
class MyServer : public quicr::Server
{
public:
MyServer(const quicr::ServerConfig& cfg)
: quicr::Server(cfg)
{
}
void NewConnectionAccepted(quicr::ConnectionHandle connection_handle, const ConnectionRemoteInfo& remote) override
{
SPDLOG_INFO("New connection handle {0} accepted from {1}:{2}", connection_handle, remote.ip, remote.port);
}
void MetricsSampled(quicr::ConnectionHandle connection_handle, const quicr::ConnectionMetrics& metrics) override
{
SPDLOG_DEBUG("Metrics sample time: {0}"
" connection handle: {1}"
" rtt_us: {2}"
" srtt_us: {3}"
" rate_bps: {4}"
" lost pkts: {5}",
connection_handle,
metrics.quic.rtt_us.max,
metrics.quic.srtt_us.max,
metrics.quic.tx_rate_bps.max,
metrics.quic.tx_lost_pkts);
}
std::vector<quicr::ConnectionHandle> UnannounceReceived(quicr::ConnectionHandle connection_handle,
const quicr::TrackNamespace& track_namespace) override
{
auto th = quicr::TrackHash({ track_namespace, {}, std::nullopt });
SPDLOG_DEBUG("Received unannounce from connection handle: {0} for namespace hash: {1}, removing all tracks "
"associated with namespace",
connection_handle,
th.track_namespace_hash);
std::vector<quicr::ConnectionHandle> sub_annos_connections;
// TODO: Fix O(prefix namespaces) matching
for (const auto& [ns, conns] : qserver_vars::subscribes_announces) {
if (!ns.HasSamePrefix(track_namespace)) {
continue;
}
for (auto sub_conn_handle : conns) {
SPDLOG_DEBUG(
"Received unannounce matches prefix subscribed from connection handle: {} for namespace hash: {}",
sub_conn_handle,
th.track_namespace_hash);
sub_annos_connections.emplace_back(sub_conn_handle);
}
}
for (auto track_alias : qserver_vars::announce_active[track_namespace][connection_handle]) {
auto ptd = qserver_vars::pub_subscribes[track_alias][connection_handle];
if (ptd != nullptr) {
SPDLOG_INFO(
"Received unannounce from connection handle: {0} for namespace hash: {1}, removing track alias: {2}",
connection_handle,
th.track_namespace_hash,
track_alias);
UnsubscribeTrack(connection_handle, ptd);
}
qserver_vars::pub_subscribes[track_alias].erase(connection_handle);
if (qserver_vars::pub_subscribes[track_alias].empty()) {
qserver_vars::pub_subscribes.erase(track_alias);
}
}
qserver_vars::announce_active[track_namespace].erase(connection_handle);
if (qserver_vars::announce_active[track_namespace].empty()) {
qserver_vars::announce_active.erase(track_namespace);
}
return sub_annos_connections;
}
const quicr::TrackNamespace& prefix_namespace) override
{
auto it = qserver_vars::subscribes_announces.find(prefix_namespace);
if (it == qserver_vars::subscribes_announces.end()) {
return;
}
auto th = quicr::TrackHash({ prefix_namespace, {}, std::nullopt });
SPDLOG_INFO("Unsubscribe announces received connection handle: {} for namespace_hash: {}, removing",
connection_handle,
th.track_namespace_hash);
}
std::pair<std::optional<quicr::messages::SubscribeAnnouncesErrorCode>, std::vector<quicr::TrackNamespace>>
const quicr::TrackNamespace& prefix_namespace,
{
auto th = quicr::TrackHash({ prefix_namespace, {}, std::nullopt });
std::cout << "size of subscribe announces " << qserver_vars::subscribes_announces.size() << std::endl;
auto [it, is_new] = qserver_vars::subscribes_announces.try_emplace(prefix_namespace);
it->second.insert(connection_handle);
if (is_new) {
SPDLOG_INFO("Subscribe announces received connection handle: {} for namespace_hash: {}, adding to state",
connection_handle,
th.track_namespace_hash);
}
std::vector<quicr::TrackNamespace> matched_ns;
// TODO: Fix O(prefix namespaces) matching
for (const auto& [ns, _] : qserver_vars::announce_active) {
if (ns.HasSamePrefix(prefix_namespace)) {
matched_ns.push_back(ns);
}
}
return { std::nullopt, std::move(matched_ns) };
}
void AnnounceReceived(quicr::ConnectionHandle connection_handle,
const quicr::TrackNamespace& track_namespace,
{
auto th = quicr::TrackHash({ track_namespace, {}, std::nullopt });
SPDLOG_INFO("Received announce from connection handle: {0} for namespace_hash: {1}",
connection_handle,
th.track_namespace_hash);
// Add to state if not exist
auto [anno_conn_it, is_new] = qserver_vars::announce_active[track_namespace].try_emplace(connection_handle);
if (!is_new) {
SPDLOG_INFO("Received announce from connection handle: {} for namespace hash: {} is duplicate, ignoring",
connection_handle,
th.track_namespace_hash);
return;
}
AnnounceResponse announce_response;
announce_response.reason_code = quicr::Server::AnnounceResponse::ReasonCode::kOk;
auto& anno_tracks = qserver_vars::announce_active[track_namespace][connection_handle];
std::vector<quicr::ConnectionHandle> sub_annos_connections;
// TODO: Fix O(prefix namespaces) matching
for (const auto& [ns, conns] : qserver_vars::subscribes_announces) {
if (!ns.HasSamePrefix(track_namespace)) {
continue;
}
for (auto sub_conn_handle : conns) {
SPDLOG_DEBUG(
"Received announce matches prefix subscribed from connection handle: {} for namespace hash: {}",
sub_conn_handle,
th.track_namespace_hash);
sub_annos_connections.emplace_back(sub_conn_handle);
}
}
ResolveAnnounce(connection_handle, track_namespace, sub_annos_connections, announce_response);
// Check if there are any subscribes. If so, send subscribe to announce for all tracks matching namespace
for (const auto& [ns, sub_tracks] : qserver_vars::subscribe_active) {
if (!ns.HasSamePrefix(track_namespace)) {
continue;
}
for (const auto& [track_name, si] : sub_tracks) {
if (not si.empty()) { // Have subscribes
auto& a_si = *si.begin();
if (anno_tracks.find(a_si.track_alias) == anno_tracks.end()) {
SPDLOG_INFO("Sending subscribe to announcer connection handle: {0} subscribe track_alias: {1}",
connection_handle,
a_si.track_alias);
anno_tracks.insert(a_si.track_alias); // Add track to state
const auto pub_track_h = qserver_vars::subscribes[a_si.track_alias][a_si.connection_handle];
auto sub_track_handler =
std::make_shared<MySubscribeTrackHandler>(pub_track_h->GetFullTrackName());
sub_track_handler->SetTrackAlias(
a_si.track_alias); // Publisher handler may have different track alias
SubscribeTrack(connection_handle, sub_track_handler);
qserver_vars::pub_subscribes[a_si.track_alias][connection_handle] = sub_track_handler;
}
}
}
}
}
void ConnectionStatusChanged(quicr::ConnectionHandle connection_handle, ConnectionStatus status) override
{
if (status == ConnectionStatus::kConnected) {
SPDLOG_DEBUG("Connection ready connection_handle: {0} ", connection_handle);
} else {
SPDLOG_DEBUG(
"Connection changed connection_handle: {0} status: {1}", connection_handle, static_cast<int>(status));
// Remove all subscribe announces for this connection handle
std::vector<quicr::TrackNamespace> remove_ns;
for (auto& [ns, conns] : qserver_vars::subscribes_announces) {
auto it = conns.find(connection_handle);
if (it != conns.end()) {
conns.erase(it);
if (conns.empty()) {
remove_ns.emplace_back(ns);
}
}
}
for (auto ns : remove_ns) {
qserver_vars::subscribes_announces.erase(ns);
}
}
}
const quicr::ClientSetupAttributes& client_setup_attributes) override
{
ClientSetupResponse client_setup_response;
SPDLOG_INFO("Client setup received from endpoint_id: {0}", client_setup_attributes.endpoint_id);
return client_setup_response;
}
void UnsubscribeReceived(quicr::ConnectionHandle connection_handle, uint64_t subscribe_id) override
{
SPDLOG_INFO("Unsubscribe connection handle: {0} subscribe_id: {1}", connection_handle, subscribe_id);
auto ta_conn_it = qserver_vars::subscribe_alias_sub_id.find(connection_handle);
if (ta_conn_it == qserver_vars::subscribe_alias_sub_id.end()) {
SPDLOG_WARN("Unable to find track alias connection for connection handle: {0} subscribe_id: {1}",
connection_handle,
subscribe_id);
return;
}
auto ta_it = ta_conn_it->second.find(subscribe_id);
if (ta_it == ta_conn_it->second.end()) {
SPDLOG_WARN("Unable to find track alias for connection handle: {0} subscribe_id: {1}",
connection_handle,
subscribe_id);
return;
}
std::lock_guard<std::mutex> _(qserver_vars::state_mutex);
auto track_alias = ta_it->second;
ta_conn_it->second.erase(ta_it);
if (!ta_conn_it->second.size()) {
qserver_vars::subscribe_alias_sub_id.erase(ta_conn_it);
}
auto& track_h = qserver_vars::subscribes[track_alias][connection_handle];
if (track_h == nullptr) {
SPDLOG_WARN("Unsubscribe unable to find track delegate for connection handle: {0} subscribe_id: {1}",
connection_handle,
subscribe_id);
return;
}
const auto& tfn = track_h->GetFullTrackName();
auto th = quicr::TrackHash(tfn);
qserver_vars::subscribes[track_alias].erase(connection_handle);
bool unsub_pub{ false };
if (!qserver_vars::subscribes[track_alias].size()) {
unsub_pub = true;
qserver_vars::subscribes.erase(track_alias);
}
qserver_vars::subscribe_active[tfn.name_space][th.track_name_hash].erase(
qserver_vars::SubscribeInfo{ connection_handle, subscribe_id, th.track_fullname_hash });
if (!qserver_vars::subscribe_active[tfn.name_space][th.track_name_hash].size()) {
qserver_vars::subscribe_active[tfn.name_space].erase(th.track_name_hash);
}
if (!qserver_vars::subscribe_active[tfn.name_space].size()) {
qserver_vars::subscribe_active.erase(tfn.name_space);
}
if (unsub_pub) {
SPDLOG_INFO("No subscribers left, unsubscribe publisher track_alias: {0}", track_alias);
auto anno_ns_it = qserver_vars::announce_active.find(tfn.name_space);
if (anno_ns_it == qserver_vars::announce_active.end()) {
return;
}
for (auto& [pub_connection_handle, tracks] : anno_ns_it->second) {
if (tracks.find(th.track_fullname_hash) != tracks.end()) {
SPDLOG_INFO("Unsubscribe to announcer conn_id: {0} subscribe track_alias: {1}",
pub_connection_handle,
th.track_fullname_hash);
tracks.erase(th.track_fullname_hash); // Add track alias to state
auto sub_track_h = qserver_vars::pub_subscribes[th.track_fullname_hash][pub_connection_handle];
if (sub_track_h != nullptr) {
UnsubscribeTrack(pub_connection_handle, sub_track_h);
}
}
}
}
}
uint64_t subscribe_id,
uint64_t proposed_track_alias,
[[maybe_unused]] quicr::messages::FilterType filter_type,
const quicr::FullTrackName& track_full_name,
const quicr::SubscribeAttributes& attrs) override
{
auto th = quicr::TrackHash(track_full_name);
SPDLOG_INFO("New subscribe connection handle: {} subscribe_id: {} computed track alias: {} proposed "
"track_alias: {} priority: {}",
connection_handle,
subscribe_id,
th.track_fullname_hash,
proposed_track_alias,
attrs.priority);
if (qserver_vars::force_track_alias && proposed_track_alias && proposed_track_alias != th.track_fullname_hash) {
std::ostringstream err;
err << "Use track alias: " << th.track_fullname_hash;
connection_handle,
subscribe_id,
{ quicr::SubscribeResponse::ReasonCode::kRetryTrackAlias, err.str(), th.track_fullname_hash });
return;
}
std::optional<uint64_t> latest_group_id = std::nullopt;
std::optional<uint64_t> latest_object_id = std::nullopt;
auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it != qserver_vars::cache.end()) {
auto& [_, cache] = *cache_entry_it;
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
const auto& latest_object = std::prev(latest_group->end());
latest_group_id = latest_object->headers.group_id;
latest_object_id = latest_object->headers.object_id;
}
}
ResolveSubscribe(connection_handle,
subscribe_id,
{
quicr::SubscribeResponse::ReasonCode::kOk,
std::nullopt,
std::nullopt,
latest_group_id,
latest_object_id,
});
auto pub_track_h =
std::make_shared<MyPublishTrackHandler>(track_full_name, quicr::TrackMode::kStream, attrs.priority, 50000);
if (not qserver_vars::force_track_alias) {
pub_track_h->SetTrackAlias(proposed_track_alias);
}
qserver_vars::subscribes[th.track_fullname_hash][connection_handle] = pub_track_h;
qserver_vars::subscribe_alias_sub_id[connection_handle][subscribe_id] = th.track_fullname_hash;
// record subscribe as active from this subscriber
qserver_vars::subscribe_active[track_full_name.name_space][th.track_name_hash].emplace(
qserver_vars::SubscribeInfo{ connection_handle, subscribe_id, th.track_fullname_hash });
// Create a subscribe track that will be used by the relay to send to subscriber for matching objects
BindPublisherTrack(connection_handle, subscribe_id, pub_track_h, false);
// Subscribe to announcer if announcer is active
bool success = false;
for (auto& [ns, conns] : qserver_vars::announce_active) {
if (!ns.HasSamePrefix(track_full_name.name_space)) {
continue;
}
success = true;
// Loop through connectio handles
for (auto& [conn_h, tracks] : conns) {
// aggregate subscriptions
if (tracks.find(th.track_fullname_hash) == tracks.end()) {
last_subscription_time_ = std::chrono::steady_clock::now();
SPDLOG_INFO("Sending subscribe to announcer connection handler: {0} subscribe track_alias: {1}",
conn_h,
th.track_fullname_hash);
tracks.insert(th.track_fullname_hash); // Add track alias to state
auto sub_track_h = std::make_shared<MySubscribeTrackHandler>(track_full_name);
auto copy_sub_track_h = sub_track_h;
SubscribeTrack(conn_h, sub_track_h);
SPDLOG_INFO("Sending subscription to announcer connection: {0} hash: {1}, handler: {2}",
conn_h,
th.track_fullname_hash,
sub_track_h->GetFullTrackName().track_alias.value());
qserver_vars::pub_subscribes[th.track_fullname_hash][conn_h] = copy_sub_track_h;
} else {
if (!last_subscription_time_.has_value()) {
last_subscription_time_ = std::chrono::steady_clock::now();
}
auto now = std::chrono::steady_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(now - last_subscription_time_.value())
.count();
if (elapsed > kSubscriptionDampenDurationMs_) {
// send subscription update
auto& sub_track_h = qserver_vars::pub_subscribes[th.track_fullname_hash][conn_h];
if (sub_track_h == nullptr) {
return;
}
SPDLOG_INFO("Sending subscription update to announcer connection: {0} hash: {1}",
th.track_namespace_hash,
subscribe_id);
UpdateTrackSubscription(conn_h, sub_track_h);
last_subscription_time_ = std::chrono::steady_clock::now();
}
}
}
}
if (not success) {
SPDLOG_INFO("Subscribe to track namespace hash: {0}, does not have any announcements.",
th.track_namespace_hash);
}
}
bool FetchReceived([[maybe_unused]] quicr::ConnectionHandle connection_handle,
[[maybe_unused]] uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
{
SPDLOG_INFO("Received Fetch for conn_id: {} subscribe_id: {} start_group: {} end_group: {}",
connection_handle,
subscribe_id,
attrs.start_group,
attrs.end_group);
const auto th = quicr::TrackHash(track_full_name);
auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it == qserver_vars::cache.end()) {
SPDLOG_WARN("No cache entry for the hash {}", th.track_fullname_hash);
return false;
}
auto& [_, cache_entry] = *cache_entry_it;
const auto groups = cache_entry.Get(attrs.start_group, attrs.end_group);
if (groups.empty()) {
SPDLOG_WARN("No groups found for requested range");
return false;
}
return std::any_of(groups.begin(), groups.end(), [&](const auto& group) {
return !group->empty() && group->begin()->headers.object_id <= attrs.start_object &&
std::prev(group->end())->headers.object_id >= (attrs.end_object - 1);
});
}
void OnFetchOk(quicr::ConnectionHandle connection_handle,
uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
{
auto pub_track_h =
std::make_shared<MyPublishTrackHandler>(track_full_name, quicr::TrackMode::kStream, attrs.priority, 50000);
BindPublisherTrack(connection_handle, subscribe_id, pub_track_h);
const auto th = quicr::TrackHash(track_full_name);
qserver_vars::stop_fetch.emplace(std::make_pair(subscribe_id, false));
std::thread retrieve_cache_thread(
[=, cache_entries = qserver_vars::cache.at(th.track_fullname_hash).Get(attrs.start_group, attrs.end_group)] {
defer(UnbindPublisherTrack(connection_handle, pub_track_h));
for (const auto& cache_entry : cache_entries) {
for (const auto& object : *cache_entry) {
if (qserver_vars::stop_fetch[subscribe_id]) {
qserver_vars::stop_fetch.erase(subscribe_id);
return;
}
if ((object.headers.group_id < attrs.start_group || object.headers.group_id >= attrs.end_group) ||
(object.headers.object_id < attrs.start_object ||
object.headers.object_id >= attrs.end_object))
continue;
SPDLOG_INFO("Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id);
pub_track_h->PublishObject(object.headers, object.data);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});
retrieve_cache_thread.detach();
}
void FetchCancelReceived(quicr::ConnectionHandle, uint64_t subscribe_id) override
{
SPDLOG_INFO("Canceling fetch for subscribe_id: {0}", subscribe_id);
qserver_vars::stop_fetch[subscribe_id] = true;
}
void NewGroupRequested(quicr::ConnectionHandle conn_id, uint64_t subscribe_id, uint64_t track_alias) override
{
for (auto [_, handler] : qserver_vars::pub_subscribes[track_alias]) {
if (!handler) {
continue;
}
SPDLOG_DEBUG(
"Received New Group Request for conn: {} sub_id: {} track_alias: {}", conn_id, subscribe_id, track_alias);
handler->RequestNewGroup();
}
}
private:
const int kSubscriptionDampenDurationMs_ = 1000;
std::optional<std::chrono::time_point<std::chrono::steady_clock>> last_subscription_time_;
};
/* -------------------------------------------------------------------------------------------------
* Main program
* -------------------------------------------------------------------------------------------------
*/
InitConfig(cxxopts::ParseResult& cli_opts)
{
std::string qlog_path;
if (cli_opts.count("qlog")) {
qlog_path = cli_opts["qlog"].as<std::string>();
}
if (cli_opts.count("debug") && cli_opts["debug"].as<bool>() == true) {
SPDLOG_INFO("setting debug level");
spdlog::default_logger()->set_level(spdlog::level::debug);
}
if (cli_opts.count("version") && cli_opts["version"].as<bool>() == true) {
SPDLOG_INFO("QuicR library version: {}", QUICR_VERSION);
exit(0);
}
if (cli_opts.count("client_track_alias")) {
qserver_vars::force_track_alias = false;
}
config.endpoint_id = cli_opts["endpoint_id"].as<std::string>();
config.server_bind_ip = cli_opts["bind_ip"].as<std::string>();
config.server_port = cli_opts["port"].as<uint16_t>();
config.transport_config.debug = cli_opts["debug"].as<bool>();
config.transport_config.tls_cert_filename = cli_opts["cert"].as<std::string>();
config.transport_config.tls_key_filename = cli_opts["key"].as<std::string>();
config.transport_config.use_reset_wait_strategy = false;
config.transport_config.time_queue_max_duration = 50000;
config.transport_config.quic_qlog_path = qlog_path;
return config;
}
int
main(int argc, char* argv[])
{
int result_code = EXIT_SUCCESS;
cxxopts::Options options("qclient",
std::string("MOQ Example Server using QuicR Version: ") + std::string(QUICR_VERSION));
options.set_width(75).set_tab_expansion().allow_unrecognised_options().add_options()("h,help", "Print help")(
"d,debug", "Enable debugging") // a bool parameter
("client_track_alias", "Set to allow client provided track alias")("v,version",
"QuicR Version") // a bool parameter
("b,bind_ip", "Bind IP", cxxopts::value<std::string>()->default_value("127.0.0.1"))(
"p,port", "Listening port", cxxopts::value<uint16_t>()->default_value("1234"))(
"e,endpoint_id", "This relay/server endpoint ID", cxxopts::value<std::string>()->default_value("moq-server"))(
"c,cert", "Certificate file", cxxopts::value<std::string>()->default_value("./server-cert.pem"))(
"k,key", "Certificate key file", cxxopts::value<std::string>()->default_value("./server-key.pem"))(
"q,qlog", "Enable qlog using path", cxxopts::value<std::string>()); // end of options
auto result = options.parse(argc, argv);
if (result.count("help")) {
std::cout << options.help({ "" }) << std::endl;
return EXIT_SUCCESS;
}
// Install a signal handlers to catch operating system signals
installSignalHandlers();
// Lock the mutex so that main can then wait on it
std::unique_lock<std::mutex> lock(moq_example::main_mutex);
quicr::ServerConfig config = InitConfig(result);
try {
auto server = std::make_shared<MyServer>(config);
if (server->Start() != quicr::Transport::Status::kReady) {
SPDLOG_ERROR("Server failed to start");
exit(-2);
}
// Wait until told to terminate
moq_example::cv.wait(lock, [&]() { return moq_example::terminate; });
// Unlock the mutex
lock.unlock();
} catch (const std::invalid_argument& e) {
std::cerr << "Invalid argument: " << e.what() << std::endl;
result_code = EXIT_FAILURE;
} catch (const std::exception& e) {
std::cerr << "Unexpected exception: " << e.what() << std::endl;
result_code = EXIT_FAILURE;
} catch (...) {
std::cerr << "Unexpected exception" << std::endl;
result_code = EXIT_FAILURE;
}
return result_code;
}
Definition cache.h:17
MOQ track handler for published track.
Definition publish_track_handler.h:22
virtual void StatusChanged(Status status)
Notification of publish track status change.
virtual void MetricsSampled(const PublishTrackMetrics &metrics)
Notification callback to provide sampled metrics.
@ kSendingUnannounce
In this state, callbacks will not be called.
MoQ Server.
Definition server.h:21
virtual void OnFetchOk(ConnectionHandle connection_handle, uint64_t subscribe_id, const FullTrackName &track_full_name, const FetchAttributes &attributes)
Event to run on sending FetchOk.
virtual std::vector< ConnectionHandle > UnannounceReceived(ConnectionHandle connection_handle, const TrackNamespace &track_namespace)=0
Callback notification for unannounce received.
void MetricsSampled(ConnectionHandle connection_handle, const ConnectionMetrics &metrics) override
Notification callback to provide sampled metrics.
virtual void ResolveSubscribe(ConnectionHandle connection_handle, uint64_t subscribe_id, const SubscribeResponse &subscribe_response)
Accept or reject an subscribe that was received.
virtual void SubscribeReceived(ConnectionHandle connection_handle, uint64_t subscribe_id, uint64_t proposed_track_alias, quicr::messages::FilterType filter_type, const FullTrackName &track_full_name, const SubscribeAttributes &subscribe_attributes)
Callback notification for new subscribe received.
virtual void UnsubscribeReceived(ConnectionHandle connection_handle, uint64_t subscribe_id)=0
Callback notification on unsubscribe received.
virtual SubscribeAnnouncesResponse SubscribeAnnouncesReceived(ConnectionHandle connection_handle, const TrackNamespace &prefix_namespace, const PublishAnnounceAttributes &announce_attributes)
virtual void AnnounceReceived(ConnectionHandle connection_handle, const TrackNamespace &track_namespace, const PublishAnnounceAttributes &publish_announce_attributes)
Callback notification for new announce received that needs to be authorized.
virtual ClientSetupResponse ClientSetupReceived(ConnectionHandle connection_handle, const ClientSetupAttributes &client_setup_attributes)=0
Callback on client setup message.
void NewConnectionAccepted(ConnectionHandle connection_handle, const ConnectionRemoteInfo &remote) override
Callback notification on new connection.
void ResolveAnnounce(ConnectionHandle connection_handle, const TrackNamespace &track_namespace, const std::vector< ConnectionHandle > &subscribers, const AnnounceResponse &announce_response)
Accept or reject an announce that was received.
void BindPublisherTrack(ConnectionHandle connection_handle, uint64_t subscribe_id, const std::shared_ptr< PublishTrackHandler > &track_handler, bool ephemeral=false)
Bind a server publish track handler based on a subscribe.
void ConnectionStatusChanged(ConnectionHandle connection_handle, ConnectionStatus status) override
Callback notification for connection status/state change.
virtual bool FetchReceived(ConnectionHandle connection_handle, uint64_t subscribe_id, const FullTrackName &track_full_name, const FetchAttributes &attributes)
Callback notification on Fetch message received.
virtual void UnsubscribeAnnouncesReceived(ConnectionHandle connection_handle, const TrackNamespace &prefix_namespace)=0
Callback notification for Unsubscribe announces received.
MOQ track handler for subscribed track.
Definition subscribe_track_handler.h:22
@ kSendingUnsubscribe
In this state, callbacks will not be called.
virtual void ObjectReceived(const ObjectHeaders &object_headers, BytesSpan data)
Notification of received [full] data object.
virtual void StatusChanged(Status status)
Notification of subscribe status.
Definition subscribe_track_handler.h:192
An N-tuple representation of a MOQ namespace.
Definition track_name.h:21
void UpdateTrackSubscription(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler)
Update Subscription to a track.
void SubscribeTrack(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler)
Subscribe to a track.
void UnsubscribeTrack(ConnectionHandle connection_handle, const std::shared_ptr< SubscribeTrackHandler > &track_handler)
Unsubscribe track.
Definition transport.h:26
std::vector< Byte > Bytes
Definition common.h:22
Span< const Byte > BytesSpan
Definition common.h:23
uint64_t ConnectionHandle
Definition common.h:24
Client Setup Attributes.
Definition common.h:49
const std::string endpoint_id
Definition common.h:50
std::string endpoint_id
Definition config.h:14
quicr::TransportConfig transport_config
Definition config.h:17
Definition metrics.h:14
QuicConnectionMetrics quic
QUIC connection metrics.
Definition metrics.h:17
MetricsTimeStampUs last_sample_time
Last sampled time in microseconds.
Definition metrics.h:15
Fetch attributes.
Definition common.h:79
uint64_t start_group
Fetch starting group in range.
Definition common.h:82
uint64_t end_group
Fetch final group in range.
Definition common.h:84
uint8_t priority
Fetch priority.
Definition common.h:80
Full track name struct.
Definition track_name.h:267
TrackNamespace name_space
Definition track_name.h:268
Object headers struct.
Definition object.h:32
uint64_t object_id
Object ID - Application defined order of generation.
Definition object.h:34
uint64_t group_id
Object group ID - Application defined order of generation.
Definition object.h:33
Publish announce attributes.
Definition common.h:43
MinMaxAvg tx_object_duration_us
TX object time in queue duration in microseconds.
Definition metrics.h:58
uint64_t tx_queue_discards
count of objects discarded due clear and transition to new stream
Definition metrics.h:50
MinMaxAvg tx_queue_size
TX queue size in period.
Definition metrics.h:56
Definition metrics.h:39
uint64_t bytes_published
sum of payload bytes published
Definition metrics.h:42
struct quicr::PublishTrackMetrics::Quic quic
uint64_t objects_published
count of objects published
Definition metrics.h:43
MetricsTimeStampUs last_sample_time
Last sampled time in microseconds.
Definition metrics.h:40
Definition config.h:27
std::string server_bind_ip
Definition config.h:29
uint16_t server_port
Listening port for server.
Definition config.h:31
Subscribe attributes.
Definition common.h:32
uint8_t priority
Subscriber priority.
Definition common.h:33
Definition track_name.h:274