36 :
public ITransport::TransportDelegate
37 ,
public std::enable_shared_from_this<Transport>
/// Accessor for the tick service held by this object.
/// @returns A const reference to the `tick_service_` member. Declared
///          `noexcept`; no copy of the shared_ptr is made.
116 const std::shared_ptr<TickService>&
GetTickService() const noexcept {
return tick_service_; }
137 const std::shared_ptr<SubscribeTrackHandler>& track_handler);
148 std::shared_ptr<SubscribeTrackHandler> track_handler,
149 bool new_group_request =
false);
176 const std::shared_ptr<PublishTrackHandler>& track_handler);
191 const quicr::messages::FetchAttributes& attributes);
220 const messages::SubscribeAttributes& subscribe_attributes);
237 uint64_t track_alias,
238 const SubscribeResponse& subscribe_response);
273 const messages::SubscribeAttributes& subscribe_attributes);
287 const SubscribeResponse& response);
/// Override of a base-class callback; presumably invoked when a new data
/// context is created on a connection (inferred from the name — confirm
/// against the base interface).
/// Both parameters are tagged [[maybe_unused]], so the definition (not
/// visible here) may ignore them.
/// @param connection_handle  Handle of the connection concerned.
/// @param data_ctx_id        Id of the data context concerned.
304 void OnNewDataContext([[maybe_unused]]
const ConnectionHandle& connection_handle,
305 [[maybe_unused]]
const DataContextId& data_ctx_id)
override;
/// Override of a base-class callback that receives a connection handle and
/// a TransportStatus value; presumably reports a status change for that
/// connection (inferred from the name — definition not visible here).
/// @param connection_handle  Handle of the connection concerned.
/// @param status             Status value delivered by the caller.
307 void OnConnectionStatus(
const ConnectionHandle& connection_handle,
const TransportStatus status)
override;
/// Override of a base-class callback; presumably invoked when a new
/// connection is accepted/established (inferred from the name — definition
/// not visible here).
/// @param connection_handle  Handle of the new connection.
/// @param remote             Remote endpoint description supplied by the caller.
308 void OnNewConnection(
const ConnectionHandle& connection_handle,
const TransportRemote& remote)
override;
311 std::optional<DataContextId> data_ctx_id,
312 const bool is_bidir =
false)
override;
/// Override of a base-class callback; presumably signals that datagram data
/// was received on the connection (inferred from the name — definition not
/// visible here).
/// @param connection_handle  Handle of the connection concerned.
/// @param data_ctx_id        Optional data context id; may be std::nullopt.
313 void OnRecvDgram(
const ConnectionHandle& connection_handle, std::optional<DataContextId> data_ctx_id)
override;
/// Override of a base-class callback that delivers a sample of
/// connection-level QUIC metrics. How the sample is consumed is defined
/// elsewhere (definition not visible here).
/// @param sample_time              Timestamp associated with the sample.
/// @param conn_id                  Connection the metrics belong to.
/// @param quic_connection_metrics  The sampled metrics (read-only).
315 void OnConnectionMetricsSampled(MetricsTimeStamp sample_time,
316 TransportConnId conn_id,
317 const QuicConnectionMetrics& quic_connection_metrics)
override;
/// Override of a base-class callback that delivers a sample of
/// data-context-level QUIC metrics (definition not visible here).
/// NOTE(review): the name is spelled "Stampled" while the sibling callback is
/// "...MetricsSampled". Because this is an `override`, the spelling must match
/// the base-class declaration; any rename has to start there.
/// @param sample_time                Timestamp associated with the sample.
/// @param conn_id                    Connection the metrics belong to.
/// @param data_ctx_id                Data context the metrics belong to.
/// @param quic_data_context_metrics  The sampled metrics (read-only).
319 void OnDataMetricsStampled(MetricsTimeStamp sample_time,
320 TransportConnId conn_id,
321 DataContextId data_ctx_id,
322 const QuicDataContextMetrics& quic_data_context_metrics)
override;
/// Capacity, in bytes, reserved up front for ConnectionContext::ctrl_msg_buffer
/// by the ConnectionContext default constructor.
328 static constexpr std::size_t kControlMessageBufferSize = 4096;
/// Per-connection state. Instances are stored by value in the `connections_`
/// map, keyed by ConnectionHandle.
329 struct ConnectionContext
/// Copy constructor. std::atomic is not copyable, so the request-id counter
/// is copied through an explicit seq_cst load.
/// NOTE(review): only the `next_request_id` initializer is visible here —
/// confirm whether the remaining members are intentionally left
/// default-initialized in the copy.
331 ConnectionContext(
const ConnectionContext& other)
332 : next_request_id(other.next_request_id.load(std::memory_order_seq_cst))
/// Optional data context id; presumably that of the control channel
/// (inferred from the name — TODO confirm). Empty until assigned.
337 std::optional<uint64_t> ctrl_data_ctx_id;
/// Defaults to false; presumably set once setup messages have been exchanged — TODO confirm.
338 bool setup_complete{
false };
/// Defaults to false; presumably marks the connection as closed — TODO confirm.
339 bool closed{
false };
/// Defaults to 0; presumably the protocol version offered by the client — TODO confirm.
340 uint64_t client_version{ 0 };
/// Optional control message type; empty until assigned. Presumably the type
/// of the control message currently being received — TODO confirm.
341 std::optional<messages::ControlMessageType>
342 ctrl_msg_type_received;
/// Byte buffer for control messages; its capacity is reserved to
/// kControlMessageBufferSize in the default constructor.
344 std::vector<uint8_t> ctrl_msg_buffer;
/// Atomic request-id counter, starting at 0 and advanced by 2 per call to
/// GetNextRequestId().
349 std::atomic<uint64_t> next_request_id{ 0 };
/// SubscribeContext entries keyed by request id.
360 std::map<messages::RequestID, SubscribeContext> recv_req_id;
/// Subscribe track handlers keyed by request id.
363 std::map<messages::RequestID, std::shared_ptr<SubscribeTrackHandler>> sub_tracks_by_request_id;
/// Subscribe track handlers keyed by track alias.
371 std::map<messages::TrackAlias, std::shared_ptr<SubscribeTrackHandler>> sub_by_recv_track_alias;
377 std::map<TrackNamespaceHash, std::map<TrackNameHash, std::shared_ptr<PublishTrackHandler>>>
381 std::map<messages::TrackAlias, std::map<uint64_t, std::shared_ptr<PublishTrackHandler>>>
382 pub_tracks_by_track_alias;
/// Track namespace hash keyed by request id.
387 std::map<messages::RequestID, TrackNamespaceHash> pub_tracks_ns_by_request_id;
/// Publish track handlers keyed by request id.
390 std::map<messages::RequestID, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_request_id;
/// Publish track handlers keyed by data context id.
393 std::map<DataContextId, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_data_ctx_id;
/// Publish track handlers keyed by request id; presumably those serving
/// fetch requests (inferred from the name — TODO confirm).
396 std::map<messages::RequestID, std::shared_ptr<PublishTrackHandler>> pub_fetch_tracks_by_sub_id;
/// Track namespaces keyed by request id; presumably subscribe-announces
/// prefixes (inferred from the name — TODO confirm).
399 std::map<messages::RequestID, TrackNamespace> sub_announces_by_request_id;
/// Default constructor: pre-reserves kControlMessageBufferSize bytes of
/// capacity in ctrl_msg_buffer (size stays 0).
403 ConnectionContext() { ctrl_msg_buffer.reserve(kControlMessageBufferSize); }
408 uint64_t GetNextRequestId()
410 uint64_t rid = next_request_id;
411 next_request_id += 2;
/// Presumably sends already-encoded control message bytes on the given
/// connection (inferred from the name — definition not visible here).
/// @param conn_ctx  Connection to send on; taken by const reference, so this
///                  call cannot modify the connection context.
/// @param data      Span over the bytes to send.
423 void SendCtrlMsg(
const ConnectionContext& conn_ctx,
BytesSpan data);
/// Presumably sends the client-side setup message (inferred from the name).
/// Takes no connection argument — which connection is used is decided by the
/// definition, not visible here.
424 void SendClientSetup();
/// Presumably sends the server-side setup message on `conn_ctx` (inferred
/// from the name — definition not visible here).
425 void SendServerSetup(ConnectionContext& conn_ctx);
/// Presumably sends an announce message for `track_namespace` (inferred from
/// the name — definition not visible here).
/// @param conn_ctx         Connection context (mutable reference).
/// @param request_id       Request id associated with the announce.
/// @param track_namespace  Namespace being announced.
426 void SendAnnounce(ConnectionContext& conn_ctx,
427 messages::RequestID request_id,
428 const TrackNamespace& track_namespace);
/// Presumably acknowledges the announce identified by `request_id`
/// (inferred from the name).
429 void SendAnnounceOk(ConnectionContext& conn_ctx, messages::RequestID request_id);
/// Presumably withdraws a previous announce of `track_namespace` (inferred
/// from the name). Unlike SendAnnounce, this takes no request id.
430 void SendUnannounce(ConnectionContext& conn_ctx,
const TrackNamespace& track_namespace);
/// Presumably sends a subscribe request for the track `tfn` (inferred from
/// the name — definition not visible here).
/// @param conn_ctx          Connection context (mutable reference).
/// @param request_id        Request id for this subscribe.
/// @param tfn               Full track name being subscribed to.
/// @param priority          Subscriber priority value.
/// @param group_order       Requested group ordering.
/// @param filter_type       Subscribe filter type.
/// @param delivery_timeout  Delivery timeout, expressed in milliseconds.
431 void SendSubscribe(ConnectionContext& conn_ctx,
432 messages::RequestID request_id,
433 const FullTrackName& tfn,
435 messages::SubscriberPriority priority,
436 messages::GroupOrder group_order,
437 messages::FilterType filter_type,
438 std::chrono::milliseconds delivery_timeout);
/// Presumably sends an update for an existing subscription (inferred from the
/// name — definition not visible here).
/// @param conn_ctx              Connection context; const reference, so this
///                              call cannot modify it.
/// @param request_id            Request id of this update message.
/// @param subscribe_request_id  Request id of the subscription being updated
///                              (inferred from the name — TODO confirm).
/// @param start_location        Start location for the updated subscription.
/// @param end_group_id          End group id for the updated subscription.
/// @param priority              Subscriber priority value.
/// @param new_group_request     Defaults to false.
439 void SendSubscribeUpdate(
const ConnectionContext& conn_ctx,
440 messages::RequestID request_id,
441 messages::RequestID subscribe_request_id,
443 messages::Location start_location,
444 messages::GroupId end_group_id,
445 messages::SubscriberPriority priority,
447 bool new_group_request =
false);
/// Presumably sends a positive response to the subscribe identified by
/// `request_id` (inferred from the name — definition not visible here).
/// @param conn_ctx          Connection context (mutable reference).
/// @param request_id        Request id being answered.
/// @param track_alias       Track alias to report.
/// @param largest_location  Optional largest location; may be std::nullopt.
449 void SendSubscribeOk(ConnectionContext& conn_ctx,
450 messages::RequestID request_id,
451 uint64_t track_alias,
453 const std::optional<messages::Location>& largest_location);
/// Presumably sends an unsubscribe for the subscription identified by
/// `request_id` (inferred from the name — definition not visible here).
454 void SendUnsubscribe(ConnectionContext& conn_ctx, messages::RequestID request_id);
/// Presumably notifies the peer that the subscription identified by
/// `request_id` is finished, with a textual `reason` (inferred from the name).
455 void SendSubscribeDone(ConnectionContext& conn_ctx, messages::RequestID request_id,
const std::string& reason);
/// Presumably rejects the subscribe identified by `request_id` (inferred from
/// the name).
/// @param error   Error code to report.
/// @param reason  Textual reason to report.
456 void SendSubscribeError(ConnectionContext& conn_ctx,
457 messages::RequestID request_id,
458 messages::SubscribeErrorCode error,
459 const std::string& reason);
/// Presumably requests the status of track `tfn` (inferred from the name —
/// definition not visible here).
461 void SendTrackStatus(ConnectionContext& conn_ctx, messages::RequestID request_id,
const FullTrackName& tfn);
/// Presumably sends a positive track-status response for `request_id`
/// (inferred from the name).
/// @param track_alias       Track alias to report.
/// @param largest_location  Optional largest location; may be std::nullopt.
462 void SendTrackStatusOk(ConnectionContext& conn_ctx,
463 messages::RequestID request_id,
464 uint64_t track_alias,
466 const std::optional<messages::Location>& largest_location);
/// Presumably sends a negative track-status response for `request_id`
/// (inferred from the name).
/// NOTE(review): the error type here is `messages::SubscribeErrorErrorCode`,
/// whereas SendSubscribeError and SendPublishError use
/// `messages::SubscribeErrorCode` — confirm both types exist and that the
/// difference is intentional.
467 void SendTrackStatusError(ConnectionContext& conn_ctx,
468 messages::RequestID request_id,
469 messages::SubscribeErrorErrorCode error,
470 const std::string& reason);
472 void SendPublish(ConnectionContext& conn_ctx,
473 messages::RequestID request_id,
474 const FullTrackName& tfn,
475 uint64_t track_alias,
476 messages::GroupOrder group_order,
477 std::optional<messages::Location> largest_location,
/// Presumably sends a positive response to the publish identified by
/// `request_id` (inferred from the name — definition not visible here).
/// @param priority     Subscriber priority value.
/// @param group_order  Group ordering.
/// @param filter_type  Filter type.
480 void SendPublishOk(ConnectionContext& conn_ctx,
481 messages::RequestID request_id,
483 messages::SubscriberPriority priority,
484 messages::GroupOrder group_order,
485 messages::FilterType filter_type);
/// Presumably rejects the publish identified by `request_id` (inferred from
/// the name). Note the error code type is messages::SubscribeErrorCode, the
/// same type SendSubscribeError takes.
/// @param error   Error code to report.
/// @param reason  Textual reason to report.
487 void SendPublishError(ConnectionContext& conn_ctx,
488 messages::RequestID request_id,
489 messages::SubscribeErrorCode error,
490 const std::string& reason);
/// Presumably subscribes to announces under `prefix_namespace` (inferred from
/// the name — definition not visible here). Takes a ConnectionHandle by value
/// rather than a ConnectionContext reference, unlike most Send* siblings.
492 void SendSubscribeAnnounces(
ConnectionHandle conn_handle,
const TrackNamespace& prefix_namespace);
/// Presumably cancels a subscribe-announces for `prefix_namespace` (inferred
/// from the name). Also addressed by ConnectionHandle.
493 void SendUnsubscribeAnnounces(
ConnectionHandle conn_handle,
const TrackNamespace& prefix_namespace);
/// Presumably sends a positive response to the subscribe-announces request
/// identified by `request_id` (inferred from the name).
494 void SendSubscribeAnnouncesOk(ConnectionContext& conn_ctx, messages::RequestID request_id);
/// Presumably sends a negative response to the subscribe-announces request
/// identified by `request_id` (inferred from the name).
/// @param err_code  Error code to report.
/// @param reason    Reason phrase to report.
495 void SendSubscribeAnnouncesError(ConnectionContext& conn_ctx,
496 messages::RequestID request_id,
497 messages::SubscribeNamespaceErrorCode err_code,
498 const messages::ReasonPhrase& reason);
/// Presumably sends a fetch request for a range of track `tfn` (inferred from
/// the name — definition not visible here).
/// NOTE(review): `start_object` and `end_object` are typed messages::GroupId,
/// the same type as the group bounds — confirm whether an object-id type was
/// intended.
/// @param conn_ctx      Connection context (mutable reference).
/// @param request_id    Request id for this fetch.
/// @param tfn           Full track name to fetch from.
/// @param priority      Subscriber priority value.
/// @param group_order   Requested group ordering.
/// @param start_group   Start group of the range.
/// @param start_object  Start object of the range.
/// @param end_group     End group of the range.
/// @param end_object    End object of the range.
500 void SendFetch(ConnectionContext& conn_ctx,
501 messages::RequestID request_id,
502 const FullTrackName& tfn,
503 messages::SubscriberPriority priority,
504 messages::GroupOrder group_order,
505 messages::GroupId start_group,
506 messages::GroupId start_object,
507 messages::GroupId end_group,
508 messages::GroupId end_object);
/// Presumably sends a fetch that joins an existing request, identified by
/// `joining_request_id` (inferred from the names — definition not visible
/// here).
/// NOTE(review): `parameters` is taken by const value, so the argument is
/// copied on every call; a const reference would avoid the copy if the
/// definition only reads it — confirm before changing the signature.
/// @param conn_ctx            Connection context (mutable reference).
/// @param request_id          Request id for this fetch.
/// @param priority            Subscriber priority value.
/// @param group_order         Requested group ordering.
/// @param joining_request_id  Request id of the request being joined.
/// @param joining_start       Start value for the join, typed as a group id.
/// @param parameters          Message parameters.
509 void SendJoiningFetch(ConnectionContext& conn_ctx,
510 messages::RequestID request_id,
511 messages::SubscriberPriority priority,
512 messages::GroupOrder group_order,
513 messages::RequestID joining_request_id,
514 messages::GroupId joining_start,
516 const messages::Parameters parameters);
/// Presumably cancels the fetch identified by `request_id` (inferred from the
/// name — definition not visible here).
517 void SendFetchCancel(ConnectionContext& conn_ctx, messages::RequestID request_id);
/// Presumably sends a positive response to the fetch identified by
/// `request_id` (inferred from the name).
/// @param group_order   Group ordering to report.
/// @param end_location  End location to report.
518 void SendFetchOk(ConnectionContext& conn_ctx,
519 messages::RequestID request_id,
520 messages::GroupOrder group_order,
522 messages::Location end_location);
/// Presumably rejects the fetch identified by `request_id` (inferred from the
/// name).
/// @param error   Fetch error code to report.
/// @param reason  Textual reason to report.
523 void SendFetchError(ConnectionContext& conn_ctx,
524 messages::RequestID request_id,
525 messages::FetchErrorCode error,
526 const std::string& reason);
528 messages::TerminationReason reason,
529 const std::string& reason_str);
/// Presumably removes a subscribe track from `conn_ctx` (inferred from the
/// name — definition not visible here).
/// @param remove_handler  Defaults to true; presumably controls whether the
///                        handler is also dropped — TODO confirm.
531 void RemoveSubscribeTrack(ConnectionContext& conn_ctx,
533 bool remove_handler =
true);
/// Looks up a publish track handler for `th` on `conn_ctx` (inferred from the
/// name and return type). `th` is a non-const reference, so the definition
/// may modify it; whether a null pointer is returned on a miss is not visible
/// here.
535 std::shared_ptr<PublishTrackHandler> GetPubTrackHandler(ConnectionContext& conn_ctx, TrackHash& th);
/// Presumably removes every track associated with `conn_ctx` as part of
/// closing the connection (inferred from the name).
537 void RemoveAllTracksForConnectionClose(ConnectionContext& conn_ctx);
/// Transport-level request-id getter. Distinct from
/// ConnectionContext::GetNextRequestId(); how the two relate is defined in
/// the implementation, not visible here.
539 uint64_t GetNextRequestId();
/// Presumably handles received subgroup stream data (inferred from the name —
/// definition not visible here). Declared `const`, yet it receives `rx_ctx`
/// and `conn_ctx` by non-const reference, so those arguments may still be
/// modified.
/// @param type       Stream header type of the incoming data.
/// @param cursor_it  Iterator into a byte vector; presumably the current read
///                   position within `data` — TODO confirm.
/// @param rx_ctx     Stream receive context.
/// @param stream_id  Id of the stream.
/// @param conn_ctx   Connection context.
/// @param data       Shared, immutable byte buffer.
/// @returns bool; the meaning of true/false is not visible here.
541 bool OnRecvSubgroup(messages::StreamHeaderType type,
542 std::vector<uint8_t>::const_iterator cursor_it,
543 StreamRxContext& rx_ctx,
544 std::uint64_t stream_id,
545 ConnectionContext& conn_ctx,
546 std::shared_ptr<
const std::vector<uint8_t>> data)
const;
/// Presumably handles received fetch stream data (inferred from the name —
/// definition not visible here). Same parameter shape as OnRecvSubgroup minus
/// the stream header type; also a `const` member function.
/// @returns bool; the meaning of true/false is not visible here.
547 bool OnRecvFetch(std::vector<uint8_t>::const_iterator cursor_it,
548 StreamRxContext& rx_ctx,
549 std::uint64_t stream_id,
550 ConnectionContext& conn_ctx,
551 std::shared_ptr<
const std::vector<uint8_t>> data)
const;
/// Virtual hook that receives a ConnectionMetrics object. The default
/// implementation has an empty body (does nothing); derived classes may
/// override it.
567 virtual void MetricsSampled(
const ConnectionMetrics&) {}
/// Pure virtual: every concrete derived class must implement it. Presumably
/// processes one received control message (inferred from the name).
/// @param conn_ctx   Connection context the message belongs to.
/// @param msg_bytes  Span over the message bytes.
/// @returns bool; the meaning of true/false is defined by the implementers
///          and is not visible here.
581 virtual bool ProcessCtrlMessage(ConnectionContext& conn_ctx,
BytesSpan msg_bytes) = 0;
/// Presumably queues `bytes` for transmission on the given connection and
/// data context (inferred from the name — definition not visible here).
/// @param conn_id      Connection to enqueue on.
/// @param data_ctx_id  Data context to enqueue on.
/// @param group_id     Group id associated with the data.
/// @param bytes        Shared, immutable byte buffer to send.
/// @param priority     Priority value.
/// @param ttl_ms       Time-to-live; milliseconds per the name suffix.
/// @param delay_ms     Delay; milliseconds per the name suffix.
/// @param flags        Enqueue flags defined by ITransport.
/// @returns A TransportError value describing the outcome.
583 TransportError Enqueue(
const TransportConnId& conn_id,
584 const DataContextId& data_ctx_id,
585 std::uint64_t group_id,
586 std::shared_ptr<
const std::vector<uint8_t>> bytes,
587 const uint8_t priority,
588 const uint32_t ttl_ms,
589 const uint32_t delay_ms,
590 const ITransport::EnqueueFlags flags);
/// Mutex; presumably guards shared transport state — which members it
/// protects is not visible here (TODO confirm).
596 std::mutex state_mutex_;
/// Immutable after construction (const); presumably true when operating as a
/// client — TODO confirm.
597 const bool client_mode_;
/// spdlog logger instance shared via shared_ptr.
598 std::shared_ptr<spdlog::logger> logger_;
/// Per-connection state, keyed by connection handle.
603 std::map<ConnectionHandle, ConnectionContext> connections_;
/// Tick service; exposed read-only through GetTickService().
607 std::shared_ptr<TickService> tick_service_;
/// Underlying transport object, held through the ITransport interface.
608 std::shared_ptr<ITransport> quic_transport_;