#include <condition_variable>
#include <map>
#include <oss/cxxopts.hpp>
#include <set>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
#include <quicr/detail/defer.h>
#include "signal_handler.h"
// Aliases for 64-bit hash values. TrackNameHash is the key type of the inner
// map of qserver_vars::subscribe_active; the other two aliases are not
// referenced in the visible part of this file.
using TrackNamespaceHash = uint64_t;
using TrackNameHash = uint64_t;
using FullTrackNameHash = uint64_t;
// Element type stored (inside std::set) in the per-track object cache.
// NOTE(review): no members are visible here, yet other code in this file reads
// `headers` and `data` from CacheObject and brace-initializes it from an
// object-headers value plus a byte range. The member declarations appear to be
// missing from this view; confirm against the full source.
struct CacheObject
{
};
/**
 * @brief Ordering for CacheObject used by std::set<CacheObject>.
 *
 * Objects are ordered by `headers.object_id` only, so two objects carrying the
 * same object id compare equivalent regardless of their other fields.
 */
template<>
struct std::less<CacheObject>
{
    constexpr bool operator()(const CacheObject& lhs, const CacheObject& rhs) const noexcept
    {
        const auto& left_id = lhs.headers.object_id;
        const auto& right_id = rhs.headers.object_id;
        return left_id < right_id;
    }
};
namespace qserver_vars {
// Mutex for the shared relay state in this namespace. It is taken by the
// data-receive, subscribe-done and unsubscribe handlers in this file.
// NOTE(review): several other handlers read and modify the same maps without
// taking it — confirm the intended threading model.
std::mutex state_mutex;
// track namespace -> announcing connection -> track aliases for which a
// subscribe has been sent toward that announcer.
std::map<quicr::TrackNamespace, std::map<quicr::ConnectionHandle, std::set<quicr::messages::TrackAlias>>>
announce_active;
// track alias -> subscriber connection -> publish handler used to forward
// received objects to that subscriber.
std::map<quicr::messages::TrackAlias,
std::map<quicr::ConnectionHandle, std::shared_ptr<quicr::PublishTrackHandler>>>
subscribes;
// subscriber connection -> request id -> track alias; used to resolve an
// unsubscribe (which only carries the request id) back to a track alias.
std::map<quicr::ConnectionHandle, std::map<quicr::messages::RequestID, quicr::messages::TrackAlias>>
subscribe_alias_req_id;
/**
 * @brief Identifies one subscription by connection and subscribe (request) id.
 *
 * Ordering and equality consider only (connection_handle, subscribe_id), in
 * that order; track_alias is carried along as payload and never takes part in
 * any comparison.
 */
struct SubscribeInfo
{
    uint64_t connection_handle;
    uint64_t subscribe_id;
    uint64_t track_alias;

    /// Lexicographic "less than" on (connection_handle, subscribe_id).
    bool operator<(const SubscribeInfo& other) const
    {
        if (connection_handle != other.connection_handle) {
            return connection_handle < other.connection_handle;
        }
        return subscribe_id < other.subscribe_id;
    }

    /// Equal when both connection_handle and subscribe_id match.
    bool operator==(const SubscribeInfo& other) const
    {
        if (connection_handle != other.connection_handle) {
            return false;
        }
        return subscribe_id == other.subscribe_id;
    }

    /// Lexicographic "greater than" on (connection_handle, subscribe_id).
    bool operator>(const SubscribeInfo& other) const
    {
        if (connection_handle != other.connection_handle) {
            return connection_handle > other.connection_handle;
        }
        return subscribe_id > other.subscribe_id;
    }
};
// track namespace -> track name hash -> set of active subscriptions.
std::map<quicr::TrackNamespace, std::map<TrackNameHash, std::set<SubscribeInfo>>> subscribe_active;
// track alias -> publisher connection -> subscribe handler receiving that
// track from the publisher.
std::map<quicr::messages::TrackAlias,
std::map<quicr::ConnectionHandle, std::shared_ptr<quicr::SubscribeTrackHandler>>>
pub_subscribes;
// NOTE(review): the declaration below has one more `>` than `<`, and the code
// in this file indexes it as [connection_handle][request_id]; an outer
// `std::map<quicr::ConnectionHandle,` line appears to be missing from this
// view — confirm against the full source.
std::map<quicr::messages::RequestID, std::shared_ptr<quicr::SubscribeTrackHandler>>>
pub_subscribes_by_req_id;
// namespace prefix -> connections that subscribed to announces for it.
std::map<quicr::TrackNamespace, std::set<quicr::ConnectionHandle>> subscribes_announces;
// track alias -> cache of received objects, keyed by group id; each group is a
// set of CacheObject.
std::map<quicr::messages::TrackAlias, quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>> cache;
// Tick service handed to every cache instance created in this file.
std::shared_ptr<quicr::ThreadedTickService> tick_service = std::make_shared<quicr::ThreadedTickService>();
// (connection, request id) -> flag polled by the fetch worker thread; when the
// flag reads true the worker erases the entry and stops.
std::map<std::pair<quicr::ConnectionHandle, quicr::messages::RequestID>, std::atomic_bool> stop_fetch;
// Track aliases with an outstanding "new group requested" status; an alias is
// removed again when the next object for it is received.
std::set<quicr::messages::TrackAlias> track_aliases_new_group_request;
}
{
public:
MySubscribeTrackHandler(const quicr::FullTrackName& full_track_name, bool is_publisher_initiated = false)
: SubscribeTrackHandler(full_track_name,
3,
quicr::messages::GroupOrder::kAscending,
quicr::messages::FilterType::kLargestObject,
std::nullopt,
is_publisher_initiated)
{
}
{
// Handles one received object: stores it in the per-track cache and forwards
// it to every subscriber registered for this track alias.
// NOTE(review): the signature line and the declaration of `track_alias` are
// not visible in this view; `track_alias` is used below as an optional value.

// The example server refuses payloads larger than 255 bytes by throwing.
if (data.size() > 255) {
SPDLOG_CRITICAL("Example server is for example only, received data > 255 bytes is not allowed!");
SPDLOG_CRITICAL("Use github.com/quicr/laps for full relay functionality");
throw std::runtime_error("Example server is for example only, received data > 255 bytes is not allowed!");
}
// Recorded before the state mutex is taken.
latest_group_ = object_headers.
group_id;
std::lock_guard<std::mutex> _(qserver_vars::state_mutex);
if (!track_alias.has_value()) {
SPDLOG_DEBUG("Data without valid track alias");
return;
}
// Without subscribers the object is dropped and is not cached either.
auto sub_it = qserver_vars::subscribes.find(track_alias.value());
if (sub_it == qserver_vars::subscribes.end()) {
SPDLOG_TRACE("No subscribes, ignoring data size: {0} ", data.size());
return;
}
// Create the cache for this track on first use.
// NOTE(review): 50000 and 1000 are presumably a time-to-live and an interval
// in milliseconds — confirm against quicr::Cache.
if (qserver_vars::cache.count(*track_alias) == 0) {
qserver_vars::cache.insert(std::make_pair(*track_alias,
quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>{
50000, 1000, qserver_vars::tick_service }));
}
auto& cache_entry = qserver_vars::cache.at(*track_alias);
CacheObject object{ object_headers, { data.begin(), data.end() } };
// Add to the existing group when one is cached, otherwise start a new group.
if (
auto group = cache_entry.Get(object_headers.
group_id)) {
group->insert(std::move(object));
} else {
cache_entry.Insert(object_headers.
group_id, { std::move(object) }, 50000);
}
// Fan out to all subscribers. An exception aborts the remaining fan-out for
// this object and is only logged.
try {
for (const auto& [conn_id, pth] : sub_it->second) {
pth->PublishObject(object_headers, data);
}
} catch (const std::exception& e) {
SPDLOG_ERROR("Caught exception trying to publish. (error={})", e.what());
}
// Any pending new-group request for this alias is cleared once an object has
// been received.
if (qserver_vars::track_aliases_new_group_request.count(*track_alias) > 0) {
qserver_vars::track_aliases_new_group_request.erase(*track_alias);
}
}
{
// Logs a subscribe-track status change; kOk is reported as subscribed, every
// other status is logged as a failure with a textual reason.
// NOTE(review): the signature line is not visible in this view.
// NOTE(review): GetTrackAlias().value() throws std::bad_optional_access when
// no alias is set — confirm an alias always exists when this is called.
if (status == Status::kOk) {
SPDLOG_INFO(
"Track alias: {0} is subscribed",
GetTrackAlias().value());
} else {
std::string reason = "";
switch (status) {
case Status::kNotConnected:
reason = "not connected";
break;
case Status::kError:
reason = "subscribe error";
break;
case Status::kNotAuthorized:
reason = "not authorized";
break;
case Status::kNotSubscribed:
reason = "not subscribed";
break;
case Status::kPendingResponse:
reason = "pending subscribe response";
break;
case Status::kSendingUnsubscribe:
reason = "unsubscribing";
break;
case Status::kPaused:
reason = "paused";
break;
case Status::kNewGroupRequested:
reason = "new group requested";
break;
default:
// Unlisted statuses are logged with an empty reason.
break;
}
SPDLOG_INFO(
"Track alias: {0} failed to subscribe reason: {1}",
GetTrackAlias().value(), reason);
}
}
private:
// Group id of the most recently received object; written by the data-receive
// handler.
uint64_t latest_group_{ 0 };
// NOTE(review): not read or written anywhere in the visible code.
uint64_t latest_object_{ 0 };
};
{
public:
// Constructs the handler by forwarding all four arguments unchanged to
// quicr::PublishTrackHandler; no additional state is set up.
MyPublishTrackHandler(const quicr::FullTrackName& full_track_name,
quicr::TrackMode track_mode,
uint8_t default_priority,
uint32_t default_ttl)
: quicr::PublishTrackHandler(full_track_name, track_mode, default_priority, default_ttl)
{
}
{
// Logs a publish-track status change; kOk means the track has subscribers,
// every other status is logged as "not ready" with a textual reason.
// NOTE(review): the signature line is not visible in this view.
// NOTE(review): GetTrackAlias().value() throws std::bad_optional_access when
// no alias is set — confirm an alias always exists when this is called.
if (status == Status::kOk) {
SPDLOG_INFO(
"Publish track alias {0} has subscribers",
GetTrackAlias().value());
} else {
std::string reason = "";
switch (status) {
case Status::kNotConnected:
reason = "not connected";
break;
case Status::kNotAnnounced:
reason = "not announced";
break;
case Status::kAnnounceNotAuthorized:
reason = "not authorized";
break;
case Status::kPendingAnnounceResponse:
reason = "pending announce response";
break;
case Status::kNoSubscribers:
reason = "no subscribers";
break;
case Status::kSendingUnannounce:
reason = "sending unannounce";
break;
case Status::kNewGroupRequested:
reason = "new group requested";
// Besides logging, remember the alias so the request is visible to the
// data-receive path, which clears it on the next received object.
// NOTE(review): this set is modified here without taking
// qserver_vars::state_mutex, while the data-receive path holds it.
qserver_vars::track_aliases_new_group_request.emplace(
GetTrackAlias().value());
break;
case Status::kPaused:
reason = "paused";
break;
default:
// Unlisted statuses are logged with an empty reason.
break;
}
SPDLOG_INFO(
"Publish track alias: {0} not ready, reason: {1}",
GetTrackAlias().value(), reason);
}
}
// Debug-logs a sample of the publish track metrics.
// NOTE(review): the format string has seven placeholders but only the track
// alias argument is visible, and the call is not closed — the remaining
// arguments and the closing `);` appear to be missing from this view.
void MetricsSampled(
const quicr::PublishTrackMetrics& metrics)
override
{
SPDLOG_DEBUG("Metrics sample time: {0}"
" track_alias: {1}"
" objects sent: {2}"
" bytes sent: {3}"
" object duration us: {4}"
" queue discards: {5}"
" queue size: {6}",
GetTrackAlias().value(),
}
};
{
/**
 * @brief Construct a fetch handler that relays fetched objects.
 *
 * The range parameters arrive as (start_group, start_object, end_group,
 * end_object) and are handed to the FetchTrackHandler base as
 * (start_group, end_group, start_object, end_object).
 *
 * @param publish_fetch_handler  Handler that received objects are forwarded to; may be null.
 * @param full_track_name        Track to fetch.
 * @param priority               Priority forwarded to the base class.
 * @param group_order            Group order forwarded to the base class.
 * @param start_group            First group of the fetch range.
 * @param start_object           First object within the start group.
 * @param end_group              Last group of the fetch range.
 * @param end_object             Last object within the end group.
 */
MyFetchTrackHandler(const std::shared_ptr<quicr::PublishFetchHandler> publish_fetch_handler,
                    const quicr::FullTrackName& full_track_name,
                    quicr::messages::SubscriberPriority priority,
                    quicr::messages::GroupOrder group_order,
                    uint64_t start_group,
                    uint64_t start_object,
                    uint64_t end_group,
                    uint64_t end_object)
  : FetchTrackHandler(full_track_name, priority, group_order, start_group, end_group, start_object, end_object)
  , publish_fetch_handler_(publish_fetch_handler)
{
}
public:
/**
 * @brief Create a shared MyFetchTrackHandler.
 *
 * Uses `new` rather than std::make_shared because the constructor is declared
 * ahead of the `public:` label of this class.
 *
 * The range arguments are passed to the constructor in the order it declares
 * them: (start_group, start_object, end_group, end_object). Passing them as
 * (start_group, end_group, start_object, end_object) would swap start_object
 * and end_group and fetch the wrong range.
 *
 * @param publish_fetch_handler  Handler that received objects are forwarded to.
 * @param full_track_name        Track to fetch.
 * @param priority               Priority of the fetch.
 * @param group_order            Requested group order.
 * @param start_group            First group of the fetch range.
 * @param start_object           First object within the start group.
 * @param end_group              Last group of the fetch range.
 * @param end_object             Last object within the end group.
 * @return Shared pointer owning the new handler.
 */
static auto Create(
  const std::shared_ptr<quicr::PublishFetchHandler> publish_fetch_handler,
  const quicr::FullTrackName& full_track_name,
  std::uint8_t priority,
  quicr::messages::GroupOrder group_order,
  uint64_t start_group,
  uint64_t start_object,
  uint64_t end_group,
  uint64_t end_object)
{
    return std::shared_ptr<MyFetchTrackHandler>(new MyFetchTrackHandler(publish_fetch_handler,
                                                                        full_track_name,
                                                                        priority,
                                                                        group_order,
                                                                        start_group,
                                                                        start_object,
                                                                        end_group,
                                                                        end_object));
}
{
// Forwards a fetched object to the publish-fetch handler, when one is set;
// otherwise the object is dropped.
// NOTE(review): the signature line is not visible in this view.
if (publish_fetch_handler_) {
publish_fetch_handler_->PublishObject(headers, data);
}
}
{
// Logs fetch status changes: kOk (only when a track alias is set) and kError;
// all other statuses are ignored.
// NOTE(review): the signature line is not visible in this view.
switch (status) {
case Status::kOk: {
if (
auto track_alias =
GetTrackAlias(); track_alias.has_value()) {
SPDLOG_INFO("Track alias: {0} is ready to read", track_alias.value());
}
} break;
case Status::kError: {
SPDLOG_INFO("Fetch failed");
break;
}
default:
break;
}
}
private:
// Destination for fetched objects; set by the constructor and checked for null
// before each use.
std::shared_ptr<quicr::PublishFetchHandler> publish_fetch_handler_;
};
{
public:
MyServer(const quicr::ServerConfig& cfg)
: quicr::Server(cfg)
{
}
{
// Logs the connection handle and the remote IP/port of an accepted connection.
// NOTE(review): the signature line is not visible in this view.
SPDLOG_INFO("New connection handle {0} accepted from {1}:{2}", connection_handle, remote.ip, remote.port);
}
{
// Debug-logs sampled connection metrics.
// NOTE(review): the signature line is not visible in this view.
// NOTE(review): the format string has six placeholders but only four
// arguments are visible, so the values do not line up with the labels
// (e.g. {0} "sample time" receives connection_handle) — arguments appear to be
// missing from this view; confirm against the full source.
SPDLOG_DEBUG("Metrics sample time: {0}"
" connection handle: {1}"
" rtt_us: {2}"
" srtt_us: {3}"
" rate_bps: {4}"
" lost pkts: {5}",
connection_handle,
metrics.
quic.srtt_us.max,
metrics.
quic.tx_rate_bps.max,
metrics.
quic.tx_lost_pkts);
}
const quicr::TrackNamespace& track_namespace) override
{
// Handles an unannounce: removes every track state tied to this announcer
// connection for the namespace and returns the connections whose
// subscribe-announces prefix matches the namespace.
// NOTE(review): the first line of the signature is not visible in this view.
// NOTE(review): qserver_vars::state_mutex is not taken here although shared
// maps are modified — confirm the intended threading model.
auto th = quicr::TrackHash({ track_namespace, {} });
SPDLOG_DEBUG("Received unannounce from connection handle: {0} for namespace hash: {1}, removing all tracks "
"associated with namespace",
connection_handle,
th.track_namespace_hash);
// Collect connections subscribed to announces with a matching prefix.
std::vector<quicr::ConnectionHandle> sub_annos_connections;
for (const auto& [ns, conns] : qserver_vars::subscribes_announces) {
if (!ns.HasSamePrefix(track_namespace)) {
continue;
}
for (auto sub_conn_handle : conns) {
SPDLOG_DEBUG(
"Received unannounce matches prefix subscribed from connection handle: {} for namespace hash: {}",
sub_conn_handle,
th.track_namespace_hash);
sub_annos_connections.emplace_back(sub_conn_handle);
}
}
// Drop the subscribe handlers that were created toward this announcer.
// operator[] creates missing entries on the way; they are erased again below.
for (auto track_alias : qserver_vars::announce_active[track_namespace][connection_handle]) {
auto ptd = qserver_vars::pub_subscribes[track_alias][connection_handle];
if (ptd != nullptr) {
SPDLOG_INFO(
"Received unannounce from connection handle: {0} for namespace hash: {1}, removing track alias: {2}",
connection_handle,
th.track_namespace_hash,
track_alias);
}
qserver_vars::pub_subscribes[track_alias].erase(connection_handle);
if (qserver_vars::pub_subscribes[track_alias].empty()) {
qserver_vars::pub_subscribes.erase(track_alias);
}
}
// Remove the announcer, and the namespace entry once no announcer remains.
qserver_vars::announce_active[track_namespace].erase(connection_handle);
if (qserver_vars::announce_active[track_namespace].empty()) {
qserver_vars::announce_active.erase(track_namespace);
}
return sub_annos_connections;
}
const quicr::TrackNamespace& prefix_namespace) override
{
// Handles an unsubscribe-announces for a namespace prefix; unknown prefixes
// are ignored silently.
// NOTE(review): the first line of the signature is not visible in this view.
// NOTE(review): the log message says "removing", but no visible statement
// erases the connection or the prefix from subscribes_announces — confirm
// whether a removal is missing.
auto it = qserver_vars::subscribes_announces.find(prefix_namespace);
if (it == qserver_vars::subscribes_announces.end()) {
return;
}
auto th = quicr::TrackHash({ prefix_namespace, {} });
SPDLOG_INFO("Unsubscribe announces received connection handle: {} for namespace_hash: {}, removing",
connection_handle,
th.track_namespace_hash);
}
// Handles a subscribe-announces request: records the connection under the
// prefix and returns (no error, list of currently announced namespaces whose
// prefix matches).
// NOTE(review): the line carrying the function name and first parameter is
// not visible in this view.
std::pair<std::optional<quicr::messages::SubscribeNamespaceErrorCode>, std::vector<quicr::TrackNamespace>>
const quicr::TrackNamespace& prefix_namespace,
const quicr::PublishAnnounceAttributes&) override
{
auto th = quicr::TrackHash({ prefix_namespace, {} });
// try_emplace keeps an existing connection set; the connection is inserted
// either way.
auto [it, is_new] = qserver_vars::subscribes_announces.try_emplace(prefix_namespace);
it->second.insert(connection_handle);
// Logged only when the prefix itself is new, not for each new connection.
if (is_new) {
SPDLOG_INFO("Subscribe announces received connection handle: {} for namespace_hash: {}, adding to state",
connection_handle,
th.track_namespace_hash);
}
std::vector<quicr::TrackNamespace> matched_ns;
for (const auto& [ns, _] : qserver_vars::announce_active) {
if (ns.HasSamePrefix(prefix_namespace)) {
matched_ns.push_back(ns);
}
}
return { std::nullopt, std::move(matched_ns) };
}
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
const quicr::messages::PublishAttributes& publish_attributes) override
{
// Handles a publish request: creates a publisher-initiated subscribe handler
// for the track, registers it by track alias and by request id, and pauses it
// when nobody is subscribed yet.
// NOTE(review): the first line of the signature is not visible in this view.
auto th = quicr::TrackHash(track_full_name);
SPDLOG_INFO("Received publish from connection handle: {} using track alias: {} request_id: {}",
connection_handle,
th.track_fullname_hash,
request_id);
quicr::PublishResponse publish_response;
publish_response.reason_code = quicr::PublishResponse::ReasonCode::kOk;
auto sub_track_handler = std::make_shared<MySubscribeTrackHandler>(track_full_name, true);
sub_track_handler->SetRequestId(request_id);
sub_track_handler->SetReceivedTrackAlias(publish_attributes.track_alias);
sub_track_handler->SetPriority(publish_attributes.priority);
qserver_vars::pub_subscribes[th.track_fullname_hash][connection_handle] = sub_track_handler;
qserver_vars::pub_subscribes_by_req_id[connection_handle][request_id] = sub_track_handler;
// NOTE(review): the next five lines are the tail of a call whose first line
// is not visible in this view (presumably the call that resolves the publish
// with publish_response) — confirm against the full source.
request_id,
true,
publish_attributes.priority,
publish_attributes.group_order,
publish_response);
// operator[] creates an empty subscriber map for the alias when none exists.
if (qserver_vars::subscribes[th.track_fullname_hash].empty()) {
SPDLOG_INFO("No subscribers, pause publish connection handle: {0} using track alias: {1}",
connection_handle,
th.track_fullname_hash);
sub_track_handler->Pause();
}
}
const quicr::TrackNamespace& track_namespace,
const quicr::PublishAnnounceAttributes& attrs) override
{
// Handles an announce: registers the announcer for the namespace and, for
// every active subscription under a matching namespace, creates a subscribe
// handler toward the new announcer.
// NOTE(review): the first line of the signature is not visible in this view.
auto th = quicr::TrackHash({ track_namespace, {} });
SPDLOG_INFO("Received announce from connection handle: {0} for namespace_hash: {1}",
connection_handle,
th.track_namespace_hash);
// A second announce of the same namespace from the same connection is ignored.
auto [anno_conn_it, is_new] = qserver_vars::announce_active[track_namespace].try_emplace(connection_handle);
if (!is_new) {
SPDLOG_INFO("Received announce from connection handle: {} for namespace hash: {} is duplicate, ignoring",
connection_handle,
th.track_namespace_hash);
return;
}
// NOTE(review): announce_response and sub_annos_connections are built but not
// used by any visible statement; the call that consumes them is presumably
// missing from this view.
AnnounceResponse announce_response;
auto& anno_tracks = qserver_vars::announce_active[track_namespace][connection_handle];
std::vector<quicr::ConnectionHandle> sub_annos_connections;
for (const auto& [ns, conns] : qserver_vars::subscribes_announces) {
if (!ns.HasSamePrefix(track_namespace)) {
continue;
}
for (auto sub_conn_handle : conns) {
SPDLOG_DEBUG(
"Received announce matches prefix subscribed from connection handle: {} for namespace hash: {}",
sub_conn_handle,
th.track_namespace_hash);
sub_annos_connections.emplace_back(sub_conn_handle);
}
}
for (const auto& [ns, sub_tracks] : qserver_vars::subscribe_active) {
if (!ns.HasSamePrefix(track_namespace)) {
continue;
}
for (const auto& [track_name, si] : sub_tracks) {
if (not si.empty()) {
// Only the first subscription of the set is used as the template for the
// upstream subscribe.
auto& a_si = *si.begin();
if (anno_tracks.find(a_si.track_alias) == anno_tracks.end()) {
SPDLOG_INFO("Sending subscribe to announcer connection handle: {0} subscribe track_alias: {1}",
connection_handle,
a_si.track_alias);
anno_tracks.insert(a_si.track_alias);
// NOTE(review): operator[] yields a null pointer when no handler is stored
// for this alias/connection, and pub_track_h is dereferenced without a check.
const auto pub_track_h = qserver_vars::subscribes[a_si.track_alias][a_si.connection_handle];
auto sub_track_handler =
std::make_shared<MySubscribeTrackHandler>(pub_track_h->GetFullTrackName());
sub_track_handler->SetTrackAlias(
a_si.track_alias);
qserver_vars::pub_subscribes[a_si.track_alias][connection_handle] = sub_track_handler;
}
}
}
}
}
{
// Handles a connection status change. For any status other than kConnected,
// the connection is removed from every subscribe-announces prefix, and
// prefixes left without connections are erased.
// NOTE(review): the signature line is not visible in this view.
if (status == ConnectionStatus::kConnected) {
SPDLOG_DEBUG("Connection ready connection_handle: {0} ", connection_handle);
} else {
SPDLOG_DEBUG(
"Connection changed connection_handle: {0} status: {1}", connection_handle, static_cast<int>(status));
// Namespaces are collected first and erased afterwards so the map is not
// modified while it is being iterated.
std::vector<quicr::TrackNamespace> remove_ns;
for (auto& [ns, conns] : qserver_vars::subscribes_announces) {
auto it = conns.find(connection_handle);
if (it != conns.end()) {
conns.erase(it);
if (conns.empty()) {
remove_ns.emplace_back(ns);
}
}
}
for (auto ns : remove_ns) {
qserver_vars::subscribes_announces.erase(ns);
}
}
// Snapshot the request ids of this connection's subscriptions.
// NOTE(review): this part runs for every status, including kConnected, and
// the loop over subscribe_ids has an empty body — the per-subscription cleanup
// call appears to be missing from this view.
std::vector<quicr::messages::RequestID> subscribe_ids;
auto ta_conn_it = qserver_vars::subscribe_alias_req_id.find(connection_handle);
if (ta_conn_it != qserver_vars::subscribe_alias_req_id.end()) {
for (const auto& [sub_id, _] : ta_conn_it->second) {
subscribe_ids.push_back(sub_id);
}
}
for (const auto& sub_id : subscribe_ids) {
}
}
const quicr::ClientSetupAttributes& client_setup_attributes) override
{
// Logs the client's endpoint id and returns a default-constructed
// ClientSetupResponse.
// NOTE(review): the first line of the signature is not visible in this view.
ClientSetupResponse client_setup_response;
SPDLOG_INFO(
"Client setup received from endpoint_id: {0}", client_setup_attributes.
endpoint_id);
return client_setup_response;
}
{
// Handles Subscribe Done from a publisher: removes the subscribe handler
// registered for (connection, request id) from both lookup maps. Unknown
// connections or request ids are logged and ignored.
// NOTE(review): the signature line is not visible in this view.
SPDLOG_INFO("Subscribe Done connection handle: {0} request_id: {1}", connection_handle, request_id);
std::lock_guard<std::mutex> _(qserver_vars::state_mutex);
auto req_it = qserver_vars::pub_subscribes_by_req_id.find(connection_handle);
if (req_it == qserver_vars::pub_subscribes_by_req_id.end()) {
SPDLOG_WARN("Subscribe Done connection handle: {0} request_id: {1} does not have a connection entry in "
"state, ignoring",
connection_handle,
request_id);
return;
}
auto sub_it = req_it->second.find(request_id);
if (sub_it == req_it->second.end()) {
SPDLOG_WARN(
"Subscribe Done connection handle: {0} request_id: {1} does not matching existing state, ignoring",
connection_handle,
request_id);
return;
}
auto th = quicr::TrackHash(sub_it->second->GetFullTrackName());
// The per-alias map is left in place even when it becomes empty.
qserver_vars::pub_subscribes[th.track_fullname_hash].erase(connection_handle);
req_it->second.erase(sub_it);
if (req_it->second.empty()) {
qserver_vars::pub_subscribes_by_req_id.erase(req_it);
}
}
{
// Handles an unsubscribe: resolves the request id to a track alias, removes
// the subscriber's state and, when it was the last subscriber of the track,
// pauses publisher-initiated handlers and drops the upstream subscriptions
// toward announcers.
// NOTE(review): the signature line is not visible in this view.
SPDLOG_INFO("Unsubscribe received connection handle: {0} subscribe_id: {1}", connection_handle, request_id);
// NOTE(review): these lookups run before state_mutex is taken further down.
auto ta_conn_it = qserver_vars::subscribe_alias_req_id.find(connection_handle);
if (ta_conn_it == qserver_vars::subscribe_alias_req_id.end()) {
SPDLOG_WARN("Unable to find track alias connection for connection handle: {0} request_id: {1}",
connection_handle,
request_id);
return;
}
auto ta_it = ta_conn_it->second.find(request_id);
if (ta_it == ta_conn_it->second.end()) {
SPDLOG_WARN(
"Unable to find track alias for connection handle: {0} request_id: {1}", connection_handle, request_id);
return;
}
std::lock_guard<std::mutex> _(qserver_vars::state_mutex);
auto track_alias = ta_it->second;
ta_conn_it->second.erase(ta_it);
if (!ta_conn_it->second.size()) {
qserver_vars::subscribe_alias_req_id.erase(ta_conn_it);
}
// NOTE(review): operator[] inserts a null handler when none exists; on the
// early return below that null entry stays in qserver_vars::subscribes, where
// the data-receive path dereferences every stored handler.
auto& track_h = qserver_vars::subscribes[track_alias][connection_handle];
if (track_h == nullptr) {
SPDLOG_WARN("Unsubscribe unable to find track delegate for connection handle: {0} request_id: {1}",
connection_handle,
request_id);
return;
}
// NOTE(review): if GetFullTrackName() returns a reference into the handler,
// `tfn` dangles once the erase below releases the last owner of the handler —
// confirm the return type and ownership.
const auto& tfn = track_h->GetFullTrackName();
auto th = quicr::TrackHash(tfn);
qserver_vars::subscribes[track_alias].erase(connection_handle);
bool unsub_pub{ false };
if (!qserver_vars::subscribes[track_alias].size()) {
unsub_pub = true;
qserver_vars::subscribes.erase(track_alias);
}
// SubscribeInfo compares by connection and subscribe id only, so the alias
// value passed here does not affect which element is erased.
qserver_vars::subscribe_active[tfn.name_space][th.track_name_hash].erase(
qserver_vars::SubscribeInfo{ connection_handle, request_id, th.track_fullname_hash });
if (!qserver_vars::subscribe_active[tfn.name_space][th.track_name_hash].size()) {
qserver_vars::subscribe_active[tfn.name_space].erase(th.track_name_hash);
}
if (!qserver_vars::subscribe_active[tfn.name_space].size()) {
qserver_vars::subscribe_active.erase(tfn.name_space);
}
if (unsub_pub) {
SPDLOG_INFO("No subscribers left, unsubscribe publisher track_alias: {0}", track_alias);
for (const auto& [pub_connection_handle, handler] : qserver_vars::pub_subscribes[track_alias]) {
if (handler->IsPublisherInitiated()) {
handler->Pause();
}
}
auto anno_ns_it = qserver_vars::announce_active.find(tfn.name_space);
if (anno_ns_it == qserver_vars::announce_active.end()) {
return;
}
for (auto& [pub_connection_handle, tracks] : anno_ns_it->second) {
if (tracks.find(th.track_fullname_hash) != tracks.end()) {
SPDLOG_INFO("Unsubscribe to announcer conn_id: {0} subscribe track_alias: {1}",
pub_connection_handle,
th.track_fullname_hash);
tracks.erase(th.track_fullname_hash);
auto sub_track_h = qserver_vars::pub_subscribes[th.track_fullname_hash][pub_connection_handle];
// NOTE(review): the body of this `if` is empty; the call that unsubscribes
// the upstream track appears to be missing from this view.
if (sub_track_h != nullptr) {
}
qserver_vars::pub_subscribes[th.track_fullname_hash].erase(pub_connection_handle);
}
}
if (qserver_vars::pub_subscribes[th.track_fullname_hash].empty()) {
qserver_vars::pub_subscribes.erase(th.track_fullname_hash);
}
}
}
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
[[maybe_unused]] const quicr::messages::SubscribeAttributes& subscribe_attributes) override
{
// Handles a track status request: answers kOk when some other connection
// publishes the track, otherwise kTrackDoesNotExist. The largest cached
// location is included in both answers when one is known.
// NOTE(review): the first line of the signature is not visible in this view.
auto th = quicr::TrackHash(track_full_name);
SPDLOG_INFO("Track status request connection handle: {} request_id: {} track alias: {}",
connection_handle,
request_id,
th.track_fullname_hash);
// Largest location = last object (highest object id) of the cache's last group.
std::optional<quicr::messages::Location> largest_location = std::nullopt;
auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it != qserver_vars::cache.end()) {
auto& [_, cache] = *cache_entry_it;
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
const auto& latest_object = std::prev(latest_group->end());
largest_location = { latest_object->headers.group_id, latest_object->headers.object_id };
}
}
const auto pubs_it = qserver_vars::pub_subscribes.find(th.track_fullname_hash);
if (pubs_it != qserver_vars::pub_subscribes.end()) {
for (const auto& [conn_id, _] : pubs_it->second) {
// The requester itself does not count as a publisher of the track.
if (conn_id != connection_handle) {
// NOTE(review): the lines below are the tail of a call whose first line is
// not visible in this view (presumably the call that sends the response).
request_id,
th.track_fullname_hash,
{
quicr::SubscribeResponse::ReasonCode::kOk,
std::nullopt,
largest_location,
});
return;
}
}
}
// NOTE(review): as above, the first line of this call is not visible.
request_id,
th.track_fullname_hash,
{
quicr::SubscribeResponse::ReasonCode::kTrackDoesNotExist,
"Track does not exist",
largest_location,
});
}
uint64_t request_id,
[[maybe_unused]] quicr::messages::FilterType filter_type,
const quicr::FullTrackName& track_full_name,
const quicr::messages::SubscribeAttributes& attrs) override
{
// Handles a new subscribe: creates a publish handler toward the subscriber,
// records the subscription, resumes publisher-initiated handlers for the
// track, and subscribes toward every announcer of a matching namespace that
// has no subscription for the track yet.
// NOTE(review): the first line of the signature is not visible in this view.
// NOTE(review): qserver_vars::state_mutex is not taken here although shared
// maps are modified — confirm the intended threading model.
auto th = quicr::TrackHash(track_full_name);
SPDLOG_INFO("New subscribe connection handle: {} request_id: {} track alias: {} "
"priority: {}",
connection_handle,
request_id,
th.track_fullname_hash,
attrs.priority);
// Largest location = last object (highest object id) of the cache's last group.
std::optional<quicr::messages::Location> largest_location = std::nullopt;
auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it != qserver_vars::cache.end()) {
auto& [_, cache] = *cache_entry_it;
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
const auto& latest_object = std::prev(latest_group->end());
largest_location = { latest_object->headers.group_id, latest_object->headers.object_id };
}
}
// Use the subscriber's delivery timeout as TTL; fall back to 50000 when it is
// zero.
const std::uint32_t ttl =
attrs.delivery_timeout != std::chrono::milliseconds::zero() ? attrs.delivery_timeout.count() : 50000;
const auto pub_track_h =
std::make_shared<MyPublishTrackHandler>(track_full_name, quicr::TrackMode::kStream, attrs.priority, ttl);
// The full track name hash doubles as the track alias.
const auto track_alias = th.track_fullname_hash;
// NOTE(review): the lines below are the tail of a call whose first line is
// not visible in this view (presumably the call that resolves the subscribe);
// a statement binding pub_track_h to the connection may be missing as well.
request_id,
track_alias,
{
quicr::SubscribeResponse::ReasonCode::kOk,
std::nullopt,
largest_location,
});
qserver_vars::subscribes[track_alias][connection_handle] = pub_track_h;
qserver_vars::subscribe_alias_req_id[connection_handle][request_id] = track_alias;
qserver_vars::subscribe_active[track_full_name.
name_space][th.track_name_hash].emplace(
qserver_vars::SubscribeInfo{ connection_handle, request_id, track_alias });
// A subscriber now exists, so publisher-initiated handlers can resume.
for (const auto& [pub_connection_handle, handler] : qserver_vars::pub_subscribes[track_alias]) {
if (handler->IsPublisherInitiated()) {
handler->Resume();
}
}
bool success = false;
for (auto& [ns, conns] : qserver_vars::announce_active) {
if (!ns.HasSamePrefix(track_full_name.
name_space)) {
continue;
}
success = true;
for (auto& [conn_h, tracks] : conns) {
if (tracks.find(track_alias) == tracks.end()) {
last_subscription_time_ = std::chrono::steady_clock::now();
SPDLOG_INFO("Sending subscribe to announcer connection handler: {0} subscribe track_alias: {1}",
conn_h,
track_alias);
tracks.insert(track_alias);
auto sub_track_h = std::make_shared<MySubscribeTrackHandler>(track_full_name);
auto copy_sub_track_h = sub_track_h;
SPDLOG_INFO("Sending subscription to announcer connection: {0} hash: {1}, handler: {2}",
conn_h,
th.track_fullname_hash,
track_alias);
// NOTE(review): no visible statement sends the subscribe toward conn_h; the
// call is presumably missing from this view.
qserver_vars::pub_subscribes[track_alias][conn_h] = copy_sub_track_h;
} else {
// Already subscribed toward this announcer: subscription updates are
// dampened to at most one per kSubscriptionDampenDurationMs_.
if (!last_subscription_time_.has_value()) {
last_subscription_time_ = std::chrono::steady_clock::now();
}
auto now = std::chrono::steady_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(now - last_subscription_time_.value())
.count();
if (elapsed > kSubscriptionDampenDurationMs_) {
// NOTE(review): operator[] inserts a null handler when none exists, and the
// early return leaves the remaining announcers unprocessed.
auto& sub_track_h = qserver_vars::pub_subscribes[track_alias][conn_h];
if (sub_track_h == nullptr) {
return;
}
SPDLOG_INFO("Sending subscription update to connection: hash: {0} request: {1}",
th.track_namespace_hash,
request_id);
last_subscription_time_ = std::chrono::steady_clock::now();
}
}
}
}
if (not success) {
SPDLOG_INFO("Subscribe to track namespace hash: {0}, does not have any announcements.",
th.track_namespace_hash);
}
}
/**
 * @brief Report the largest cached location for a track.
 *
 * Looks up the track's cache by full track name hash and takes the last
 * object (highest object id) of the cache's last group.
 *
 * @param track_name  Full track name to look up.
 * @return Group/object id of that object, or std::nullopt when the track has
 *         no cache or the cache has no non-empty last group.
 */
std::optional<quicr::messages::Location>
GetLargestAvailable(
const quicr::FullTrackName& track_name)
override
{
    std::optional<quicr::messages::Location> largest_available;

    const auto track_hash = quicr::TrackHash(track_name);
    const auto entry_it = qserver_vars::cache.find(track_hash.track_fullname_hash);
    if (entry_it == qserver_vars::cache.end()) {
        return largest_available;
    }

    const auto& newest_group = entry_it->second.Last();
    if (!newest_group || newest_group->empty()) {
        return largest_available;
    }

    // The group is an ordered set, so its last element carries the highest object id.
    const auto& newest_object = *newest_group->rbegin();
    largest_available = { newest_object.headers.group_id, newest_object.headers.object_id };
    return largest_available;
}
uint64_t request_id,
const quicr::FullTrackName& track_full_name,
const quicr::messages::FetchAttributes& attributes) override
{
// Handles a fetch request by relaying it to an announcer of the track's
// namespace. Returns false when nobody announced the namespace.
// NOTE(review): the first line of the signature is not visible in this view.
auto anno_ns_it = qserver_vars::announce_active.find(track_full_name.
name_space);
if (anno_ns_it == qserver_vars::announce_active.end()) {
return false;
}
// NOTE(review): the next line is the tail of a statement whose beginning is
// not visible in this view — presumably the header of the
// `setup_fetch_handler` lambda and the creation of `pub_fetch_h` (with a TTL
// of 50000); confirm against the full source.
track_full_name, attributes.priority, request_id, attributes.group_order, 50000);
// A missing end object is passed on as 0.
auto fetch_track_handler =
MyFetchTrackHandler::Create(pub_fetch_h,
track_full_name,
attributes.priority,
attributes.group_order,
attributes.start_location.group,
attributes.start_location.object,
attributes.end_group,
attributes.end_object.has_value() ? attributes.end_object.value() : 0);
FetchTrack(pub_connection_handle, fetch_track_handler);
return true;
};
// The loop returns on its first iteration, so only the first announcer
// connection is used.
for (auto& [pub_connection_handle, _] : anno_ns_it->second) {
return setup_fetch_handler(pub_connection_handle);
}
return false;
}
uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::messages::FetchAttributes& attrs) override
{
// Serves a fetch from the local cache: copies the cached groups in the
// requested range and publishes their objects from a detached worker thread.
// Returns false when the range holds no cached groups.
// NOTE(review): the first line of the signature is not visible in this view.
// NOTE(review): the `pub_fetch_h` initializer is cut off — the rest of that
// statement is not visible in this view.
auto pub_fetch_h =
const auto th = quicr::TrackHash(track_full_name);
// Register the stop flag (initially false) polled by the worker thread.
qserver_vars::stop_fetch.try_emplace({ connection_handle, subscribe_id }, false);
// NOTE(review): cache.at() throws std::out_of_range when the track has no
// cache; the cache is read under moq_example::main_mutex rather than
// qserver_vars::state_mutex.
const auto cache_entries = [&] {
std::lock_guard lock(moq_example::main_mutex);
return qserver_vars::cache.at(th.track_fullname_hash).Get(attrs.start_location.group, attrs.end_group + 1);
}();
if (cache_entries.empty())
return false;
// NOTE(review): the worker reads and erases qserver_vars::stop_fetch without
// a lock while other threads may modify the map. The entry is erased only
// when the fetch is stopped, not when it completes normally.
std::thread retrieve_cache_thread([=, this, cache_entries = cache_entries] {
for (const auto& cache_entry : cache_entries) {
for (const auto& object : *cache_entry) {
if (qserver_vars::stop_fetch[{ connection_handle, subscribe_id }]) {
qserver_vars::stop_fetch.erase({ connection_handle, subscribe_id });
return;
}
// In the end group, stop at the end object; an end object of 0 is treated as
// "no limit". The break only leaves the current group.
if (attrs.end_object.has_value() && *attrs.end_object != 0 &&
object.headers.group_id == attrs.end_group && object.headers.object_id > *attrs.end_object)
break;
SPDLOG_DEBUG(
"Fetching group: {} object: {}",
object.headers.
group_id,
object.headers.object_id);
// A failed publish is logged and the remaining objects are still attempted.
try {
pub_fetch_h->PublishObject(object.headers, object.data);
} catch (const std::exception& e) {
SPDLOG_ERROR("Caught exception trying to publish. (error={})", e.what());
}
}
}
});
retrieve_cache_thread.detach();
return true;
}
{
    // Handles a fetch cancel by raising the stop flag that the fetch worker
    // thread polls for (connection_handle, subscribe_id).
    //
    // The flag is set unconditionally. Previously it was set only when no
    // entry existed, so a cancel for a fetch that was already running (whose
    // entry had been created with the value false) was silently ignored.
    // operator[] still creates the entry when none exists, which keeps the
    // earlier behavior for a cancel that arrives before the fetch starts.
    //
    // NOTE(review): the signature line is not visible in this view.
    // NOTE(review): the worker thread accesses this map without a lock;
    // confirm whether the map itself needs synchronization.
    SPDLOG_INFO("Canceling fetch for connection handle: {} subscribe_id: {}", connection_handle, subscribe_id);
    qserver_vars::stop_fetch[{ connection_handle, subscribe_id }] = true;
}
private:
// Minimum time in milliseconds between subscription updates sent toward an
// announcer that already has the subscription.
const int kSubscriptionDampenDurationMs_ = 1000;
// Time of the last subscribe or subscription update sent toward an announcer;
// empty until the first one. Shared across all tracks and connections.
std::optional<std::chrono::time_point<std::chrono::steady_clock>> last_subscription_time_;
};
InitConfig(cxxopts::ParseResult& cli_opts)
{
// Builds the server configuration from the parsed command-line options.
// NOTE(review): the return type, the declaration of `config` and most of the
// assignments into it are not visible in this view; only endpoint_id is set
// by visible code.
// NOTE(review): qlog_path is read from the options, but no visible statement
// uses it afterwards.
std::string qlog_path;
if (cli_opts.count("qlog")) {
qlog_path = cli_opts["qlog"].as<std::string>();
}
if (cli_opts.count("debug") && cli_opts["debug"].as<bool>() == true) {
SPDLOG_INFO("setting debug level");
spdlog::default_logger()->set_level(spdlog::level::debug);
}
// --version prints the library version and terminates the process.
if (cli_opts.count("version") && cli_opts["version"].as<bool>() == true) {
SPDLOG_INFO("QuicR library version: {}", QUICR_VERSION);
exit(0);
}
// NOTE(review): only a log line is visible for ssl_keylog; the statement that
// enables it is presumably missing from this view.
if (cli_opts.count("ssl_keylog") && cli_opts["ssl_keylog"].as<bool>() == true) {
SPDLOG_INFO("SSL Keylog enabled");
}
config.
endpoint_id = cli_opts[
"endpoint_id"].as<std::string>();
return config;
}
// Entry point: parses the command line, installs signal handlers, creates the
// server and blocks until moq_example::terminate becomes true.
// Returns EXIT_SUCCESS, or EXIT_FAILURE when an exception is caught.
int
main(int argc, char* argv[])
{
int result_code = EXIT_SUCCESS;
// NOTE(review): the program name passed to cxxopts is "qclient" although the
// description says this is the example server — confirm whether intended.
cxxopts::Options options("qclient",
std::string("MOQ Example Server using QuicR Version: ") + std::string(QUICR_VERSION));
options.set_width(75).set_tab_expansion().allow_unrecognised_options().add_options()("h,help", "Print help")(
"d,debug", "Enable debugging")
("v,version", "QuicR Version")
("b,bind_ip", "Bind IP", cxxopts::value<std::string>()->default_value("127.0.0.1"))(
"p,port", "Listening port", cxxopts::value<uint16_t>()->default_value("1234"))(
"e,endpoint_id", "This relay/server endpoint ID", cxxopts::value<std::string>()->default_value("moq-server"))(
"c,cert", "Certificate file", cxxopts::value<std::string>()->default_value("./server-cert.pem"))(
"k,key", "Certificate key file", cxxopts::value<std::string>()->default_value("./server-key.pem"))(
"q,qlog", "Enable qlog using path", cxxopts::value<std::string>())(
"s,ssl_keylog", "Enable SSL Keylog for transport debugging");
auto result = options.parse(argc, argv);
if (result.count("help")) {
std::cout << options.help({ "" }) << std::endl;
return EXIT_SUCCESS;
}
installSignalHandlers();
// The lock is held so the condition variable can wait on it below.
std::unique_lock<std::mutex> lock(moq_example::main_mutex);
try {
// NOTE(review): `config` is not declared in the visible code, and the lines
// after the server is created look like the body of a start-up check whose
// `if (...) {` line is missing from this view.
auto server = std::make_shared<MyServer>(config);
SPDLOG_ERROR("Server failed to start");
exit(-2);
}
// Block until the terminate flag is set.
moq_example::cv.wait(lock, [&]() { return moq_example::terminate; });
lock.unlock();
} catch (const std::invalid_argument& e) {
std::cerr << "Invalid argument: " << e.what() << std::endl;
result_code = EXIT_FAILURE;
} catch (const std::exception& e) {
std::cerr << "Unexpected exception: " << e.what() << std::endl;
result_code = EXIT_FAILURE;
} catch (...) {
std::cerr << "Unexpected exception" << std::endl;
result_code = EXIT_FAILURE;
}
return result_code;
}
Definition fetch_track_handler.h:12
static std::shared_ptr< FetchTrackHandler > Create(const FullTrackName &full_track_name, messages::SubscriberPriority priority, messages::GroupOrder group_order, messages::GroupId start_group, messages::GroupId end_group, messages::GroupId start_object, messages::GroupId end_object)
Create shared Fetch track handler.
Definition fetch_track_handler.h:54
static std::shared_ptr< PublishFetchHandler > Create(const FullTrackName &full_track_name, uint8_t priority, uint64_t subscribe_id, messages::GroupOrder group_order, uint32_t ttl)
Definition publish_fetch_handler.h:25
MOQ track handler for published track.
Definition publish_track_handler.h:23
virtual void StatusChanged(Status status)
Notification of publish track status change.
virtual void MetricsSampled(const PublishTrackMetrics &metrics)
Notification callback to provide sampled metrics.
std::optional< uint64_t > GetTrackAlias() const noexcept
Get the track alias.
Definition publish_track_handler.h:248
MoQ Server.
Definition server.h:23
virtual std::vector< ConnectionHandle > UnannounceReceived(ConnectionHandle connection_handle, const TrackNamespace &track_namespace)=0
Callback notification for unannounce received.
virtual bool OnFetchOk(ConnectionHandle connection_handle, uint64_t request_id, const FullTrackName &track_full_name, const messages::FetchAttributes &attributes)
Event to run on sending FetchOk.
virtual void UnsubscribeReceived(ConnectionHandle connection_handle, uint64_t request_id)=0
Callback notification on unsubscribe received.
void UnbindFetchTrack(ConnectionHandle conn_id, const std::shared_ptr< PublishFetchHandler > &track_handler)
Unbind a server fetch publisher track handler.
void MetricsSampled(ConnectionHandle connection_handle, const ConnectionMetrics &metrics) override
Notification callback to provide sampled metrics.
void ResolveAnnounce(ConnectionHandle connection_handle, uint64_t request_id, const TrackNamespace &track_namespace, const std::vector< ConnectionHandle > &subscribers, const AnnounceResponse &announce_response)
Accept or reject an announce that was received.
virtual void FetchCancelReceived(ConnectionHandle connection_handle, uint64_t request_id)=0
Callback notification on receiving a FetchCancel message.
void BindPublisherTrack(ConnectionHandle connection_handle, ConnectionHandle src_id, uint64_t request_id, const std::shared_ptr< PublishTrackHandler > &track_handler, bool ephemeral=false)
Bind a server publish track handler based on a subscribe.
virtual void SubscribeReceived(ConnectionHandle connection_handle, uint64_t request_id, messages::FilterType filter_type, const FullTrackName &track_full_name, const messages::SubscribeAttributes &subscribe_attributes)
Callback notification for new subscribe received.
virtual std::optional< messages::Location > GetLargestAvailable(const FullTrackName &track_name)
Get the largest available location for the given track, if any.
virtual void SubscribeDoneReceived(ConnectionHandle connection_handle, uint64_t request_id)=0
Callback notification on Subscribe Done received.
virtual void ResolveSubscribe(ConnectionHandle connection_handle, uint64_t request_id, uint64_t track_alias, const SubscribeResponse &subscribe_response)
Accept or reject a subscribe that was received.
void BindFetchTrack(TransportConnId conn_id, std::shared_ptr< PublishFetchHandler > track_handler)
Bind a server fetch publisher track handler.
virtual SubscribeAnnouncesResponse SubscribeAnnouncesReceived(ConnectionHandle connection_handle, const TrackNamespace &prefix_namespace, const PublishAnnounceAttributes &announce_attributes)
virtual void AnnounceReceived(ConnectionHandle connection_handle, const TrackNamespace &track_namespace, const PublishAnnounceAttributes &publish_announce_attributes)
Callback notification for new announce received that needs to be authorized.
virtual ClientSetupResponse ClientSetupReceived(ConnectionHandle connection_handle, const ClientSetupAttributes &client_setup_attributes)=0
Callback on client setup message.
virtual bool FetchReceived(ConnectionHandle connection_handle, uint64_t request_id, const FullTrackName &track_full_name, const quicr::messages::FetchAttributes &attributes) override
Event to run on receiving Fetch request.
void NewConnectionAccepted(ConnectionHandle connection_handle, const ConnectionRemoteInfo &remote) override
Callback notification on new connection.
void ConnectionStatusChanged(ConnectionHandle connection_handle, ConnectionStatus status) override
Callback notification for connection status/state change.
virtual void PublishReceived(ConnectionHandle connection_handle, uint64_t request_id, const FullTrackName &track_full_name, const messages::PublishAttributes &publish_attributes)=0
Callback notification for new publish received.
virtual void ResolvePublish(ConnectionHandle connection_handle, uint64_t request_id, bool forward, messages::SubscriberPriority priority, messages::GroupOrder group_order, const PublishResponse &publish_response)
Accept or reject a publish that was received.
virtual void UnsubscribeAnnouncesReceived(ConnectionHandle connection_handle, const TrackNamespace &prefix_namespace)=0
Callback notification for Unsubscribe announces received.
MOQ track handler for subscribed track.
Definition subscribe_track_handler.h:22
std::optional< uint64_t > GetTrackAlias() const noexcept
Get the track alias.
Definition subscribe_track_handler.h:166
void RequestNewGroup() noexcept
Generate a new group request for this subscription.
virtual void ObjectReceived(const ObjectHeaders &object_headers, BytesSpan data)
Notification of received [full] data object.
virtual void StatusChanged(Status status)
Notification of subscribe status.
Definition subscribe_track_handler.h:270
@ kReady
Definition transport.h:47
virtual void ResolveTrackStatus(ConnectionHandle connection_handle, uint64_t request_id, uint64_t track_alias, const SubscribeResponse &subscribe_response)
Accept or reject track status that was received.
void SubscribeTrack(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler)
Subscribe to a track.
virtual void TrackStatusReceived(ConnectionHandle connection_handle, uint64_t request_id, const FullTrackName &track_full_name, const messages::SubscribeAttributes &subscribe_attributes)
Callback notification for track status message received.
void FetchTrack(ConnectionHandle connection_handle, std::shared_ptr< FetchTrackHandler > track_handler)
Fetch track.
void UpdateTrackSubscription(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler, bool new_group_request=false)
Update Subscription to a track.
void UnsubscribeTrack(ConnectionHandle connection_handle, const std::shared_ptr< SubscribeTrackHandler > &track_handler)
Unsubscribe track.
std::vector< Byte > Bytes
Definition common.h:20
uint64_t ConnectionHandle
Definition common.h:22
std::span< const Byte > BytesSpan
Definition common.h:21
const std::string endpoint_id
Definition common.h:38
std::string endpoint_id
Definition config.h:14
quicr::TransportConfig transport_config
Definition config.h:17
QuicConnectionMetrics quic
QUIC connection metrics.
Definition metrics.h:17
MetricsTimeStampUs last_sample_time
Last sampled time in microseconds.
Definition metrics.h:15
TrackNamespace name_space
Definition track_name.h:261
uint64_t request_id
Definition common.h:30
MinMaxAvg tx_object_duration_us
TX object time in queue duration in microseconds.
Definition metrics.h:58
uint64_t tx_queue_discards
Count of objects discarded due to a clear and transition to a new stream.
Definition metrics.h:50
MinMaxAvg tx_queue_size
TX queue size in period.
Definition metrics.h:56
uint64_t bytes_published
Sum of payload bytes published.
Definition metrics.h:42
struct quicr::PublishTrackMetrics::Quic quic
uint64_t objects_published
Count of objects published.
Definition metrics.h:43
MetricsTimeStampUs last_sample_time
Last sampled time in microseconds.
Definition metrics.h:40
@ kOk
Definition server.h:43
std::string server_bind_ip
Definition config.h:30
uint16_t server_port
Listening port for server.
Definition config.h:32