LibQuicR
Loading...
Searching...
No Matches
transport.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Copyright (c) 2024 Cisco Systems
2// SPDX-License-Identifier: BSD-2-Clause
3
4#pragma once
5
6#include "attributes.h"
7#include "messages.h"
8#include "tick_service.h"
9
10#include "quic_transport.h"
11
12#include <chrono>
13#include <quicr/common.h>
14#include <quicr/config.h>
16#include <quicr/metrics.h>
19#include <span>
20#include <spdlog/sinks/stdout_color_sinks.h>
21#include <spdlog/spdlog.h>
22
23#include <atomic>
24#include <map>
25#include <string>
26#include <string_view>
27
28namespace quicr {
29
36 : public ITransport::TransportDelegate
37 , public std::enable_shared_from_this<Transport>
38 {
39 public:
40 Transport() = delete;
41
60
72
/**
 * @brief Stream data message status.
 *
 * No enumerators are declared here; only the underlying type (uint8_t) is fixed.
 */
enum class StreamDataMessageStatus : uint8_t {};
76
88
/**
 * @brief Connection remote information.
 *
 * Plain aggregate; it can still be brace-initialized as `{ ip, port }`.
 */
struct ConnectionRemoteInfo
{
    std::string ip;     ///< remote IPv4/v6 address
    uint16_t port{ 0 }; ///< remote port; defaults to 0 so a default-constructed value is never indeterminate
};
97
104 Transport(const ClientConfig& cfg, std::shared_ptr<TickService> tick_service);
105
112 Transport(const ServerConfig& cfg, std::shared_ptr<TickService> tick_service);
113
115
116 const std::shared_ptr<TickService>& GetTickService() const noexcept { return tick_service_; }
117
118 // -------------------------------------------------------------------------------------------------
119 // Public API MoQ Instance API methods
120 // -------------------------------------------------------------------------------------------------
128 void SubscribeTrack(ConnectionHandle connection_handle, std::shared_ptr<SubscribeTrackHandler> track_handler);
129
136 void UnsubscribeTrack(ConnectionHandle connection_handle,
137 const std::shared_ptr<SubscribeTrackHandler>& track_handler);
138
148 std::shared_ptr<SubscribeTrackHandler> track_handler,
149 bool new_group_request = false);
150
158 void PublishTrack(ConnectionHandle connection_handle, std::shared_ptr<PublishTrackHandler> track_handler);
159
167 void PublishTrackSub(ConnectionHandle connection_handle, std::shared_ptr<PublishTrackHandler> track_handler);
168
175 void UnpublishTrack(ConnectionHandle connection_handle,
176 const std::shared_ptr<PublishTrackHandler>& track_handler);
177
188 virtual bool FetchReceived(ConnectionHandle connection_handle,
189 uint64_t request_id,
190 const FullTrackName& track_full_name,
191 const quicr::messages::FetchAttributes& attributes);
192
199 void FetchTrack(ConnectionHandle connection_handle, std::shared_ptr<FetchTrackHandler> track_handler);
200
207 void CancelFetchTrack(ConnectionHandle connection_handle, std::shared_ptr<FetchTrackHandler> track_handler);
208
218 uint64_t RequestTrackStatus(ConnectionHandle connection_handle,
219 const FullTrackName& track_full_name,
220 const messages::SubscribeAttributes& subscribe_attributes);
221
235 virtual void ResolveTrackStatus(ConnectionHandle connection_handle,
236 uint64_t request_id,
237 uint64_t track_alias,
238 const SubscribeResponse& subscribe_response);
239
245 Status GetStatus() const noexcept { return status_; }
246
251
257 virtual void StatusChanged([[maybe_unused]] Status status) {}
258
270 virtual void TrackStatusReceived(ConnectionHandle connection_handle,
271 uint64_t request_id,
272 const FullTrackName& track_full_name,
273 const messages::SubscribeAttributes& subscribe_attributes);
274
285 virtual void TrackStatusResponseReceived(ConnectionHandle connection_handle,
286 uint64_t request_id,
287 const SubscribeResponse& response);
288
290
292 protected:
293 Status Start();
294
295 Status Stop();
296
298
299 private:
300 // -------------------------------------------------------------------------------------------------
301 // Transport Delegate/callback functions
302 // -------------------------------------------------------------------------------------------------
303
304 void OnNewDataContext([[maybe_unused]] const ConnectionHandle& connection_handle,
305 [[maybe_unused]] const DataContextId& data_ctx_id) override;
306
307 void OnConnectionStatus(const ConnectionHandle& connection_handle, const TransportStatus status) override;
308 void OnNewConnection(const ConnectionHandle& connection_handle, const TransportRemote& remote) override;
309 void OnRecvStream(const ConnectionHandle& connection_handle,
310 uint64_t stream_id,
311 std::optional<DataContextId> data_ctx_id,
312 const bool is_bidir = false) override;
313 void OnRecvDgram(const ConnectionHandle& connection_handle, std::optional<DataContextId> data_ctx_id) override;
314
315 void OnConnectionMetricsSampled(MetricsTimeStamp sample_time,
316 TransportConnId conn_id,
317 const QuicConnectionMetrics& quic_connection_metrics) override;
318
319 void OnDataMetricsStampled(MetricsTimeStamp sample_time,
320 TransportConnId conn_id,
321 DataContextId data_ctx_id,
322 const QuicDataContextMetrics& quic_data_context_metrics) override;
323
324 // -------------------------------------------------------------------------------------------------
325 // End of transport handler/callback functions
326 // -------------------------------------------------------------------------------------------------
327
328 static constexpr std::size_t kControlMessageBufferSize = 4096;
329 struct ConnectionContext
330 {
331 ConnectionContext(const ConnectionContext& other)
332 : next_request_id(other.next_request_id.load(std::memory_order_seq_cst))
333 {
334 }
335
336 ConnectionHandle connection_handle{ 0 };
337 std::optional<uint64_t> ctrl_data_ctx_id;
338 bool setup_complete{ false };
339 bool closed{ false };
340 uint64_t client_version{ 0 };
341 std::optional<messages::ControlMessageType>
342 ctrl_msg_type_received;
343
344 std::vector<uint8_t> ctrl_msg_buffer;
345
349 std::atomic<uint64_t> next_request_id{ 0 };
350
354 {
357 std::optional<messages::Location> largest_location{ std::nullopt };
358 };
359
360 std::map<messages::RequestID, SubscribeContext> recv_req_id;
361
363 std::map<messages::RequestID, std::shared_ptr<SubscribeTrackHandler>> sub_tracks_by_request_id;
364
371 std::map<messages::TrackAlias, std::shared_ptr<SubscribeTrackHandler>> sub_by_recv_track_alias;
372
377 std::map<TrackNamespaceHash, std::map<TrackNameHash, std::shared_ptr<PublishTrackHandler>>>
378 pub_tracks_by_name;
379
381 std::map<messages::TrackAlias, std::map<uint64_t, std::shared_ptr<PublishTrackHandler>>>
382 pub_tracks_by_track_alias;
383
387 std::map<messages::RequestID, TrackNamespaceHash> pub_tracks_ns_by_request_id;
388
390 std::map<messages::RequestID, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_request_id;
391
393 std::map<DataContextId, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_data_ctx_id;
394
396 std::map<messages::RequestID, std::shared_ptr<PublishTrackHandler>> pub_fetch_tracks_by_sub_id;
397
399 std::map<messages::RequestID, TrackNamespace> sub_announces_by_request_id;
400
401 ConnectionMetrics metrics{};
402
403 ConnectionContext() { ctrl_msg_buffer.reserve(kControlMessageBufferSize); }
404
405 /*
406 * Get the next request Id to use
407 */
408 uint64_t GetNextRequestId()
409 {
410 uint64_t rid = next_request_id;
411 next_request_id += 2;
412
413 return rid;
414 }
415 };
416
417 // -------------------------------------------------------------------------------------------------
418 // Private methods
419 // -------------------------------------------------------------------------------------------------
420
421 void Init();
422
423 void SendCtrlMsg(const ConnectionContext& conn_ctx, BytesSpan data);
424 void SendClientSetup();
425 void SendServerSetup(ConnectionContext& conn_ctx);
426 void SendAnnounce(ConnectionContext& conn_ctx,
427 messages::RequestID request_id,
428 const TrackNamespace& track_namespace);
429 void SendAnnounceOk(ConnectionContext& conn_ctx, messages::RequestID request_id);
430 void SendUnannounce(ConnectionContext& conn_ctx, const TrackNamespace& track_namespace);
431 void SendSubscribe(ConnectionContext& conn_ctx,
432 messages::RequestID request_id,
433 const FullTrackName& tfn,
434 TrackHash th,
435 messages::SubscriberPriority priority,
436 messages::GroupOrder group_order,
437 messages::FilterType filter_type,
438 std::chrono::milliseconds delivery_timeout);
439 void SendSubscribeUpdate(const ConnectionContext& conn_ctx,
440 messages::RequestID request_id,
441 messages::RequestID subscribe_request_id,
442 TrackHash th,
443 messages::Location start_location,
444 messages::GroupId end_group_id,
445 messages::SubscriberPriority priority,
446 bool forward,
447 bool new_group_request = false);
448
449 void SendSubscribeOk(ConnectionContext& conn_ctx,
450 messages::RequestID request_id,
451 uint64_t track_alias,
452 uint64_t expires,
453 const std::optional<messages::Location>& largest_location);
454 void SendUnsubscribe(ConnectionContext& conn_ctx, messages::RequestID request_id);
455 void SendSubscribeDone(ConnectionContext& conn_ctx, messages::RequestID request_id, const std::string& reason);
456 void SendSubscribeError(ConnectionContext& conn_ctx,
457 messages::RequestID request_id,
458 messages::SubscribeErrorCode error,
459 const std::string& reason);
460
461 void SendTrackStatus(ConnectionContext& conn_ctx, messages::RequestID request_id, const FullTrackName& tfn);
462 void SendTrackStatusOk(ConnectionContext& conn_ctx,
463 messages::RequestID request_id,
464 uint64_t track_alias,
465 uint64_t expires,
466 const std::optional<messages::Location>& largest_location);
467 void SendTrackStatusError(ConnectionContext& conn_ctx,
468 messages::RequestID request_id,
469 messages::SubscribeErrorErrorCode error,
470 const std::string& reason);
471
472 void SendPublish(ConnectionContext& conn_ctx,
473 messages::RequestID request_id,
474 const FullTrackName& tfn,
475 uint64_t track_alias,
476 messages::GroupOrder group_order,
477 std::optional<messages::Location> largest_location,
478 bool forward);
479
480 void SendPublishOk(ConnectionContext& conn_ctx,
481 messages::RequestID request_id,
482 bool forward,
483 messages::SubscriberPriority priority,
484 messages::GroupOrder group_order,
485 messages::FilterType filter_type);
486
487 void SendPublishError(ConnectionContext& conn_ctx,
488 messages::RequestID request_id,
489 messages::SubscribeErrorCode error,
490 const std::string& reason);
491
492 void SendSubscribeAnnounces(ConnectionHandle conn_handle, const TrackNamespace& prefix_namespace);
493 void SendUnsubscribeAnnounces(ConnectionHandle conn_handle, const TrackNamespace& prefix_namespace);
494 void SendSubscribeAnnouncesOk(ConnectionContext& conn_ctx, messages::RequestID request_id);
495 void SendSubscribeAnnouncesError(ConnectionContext& conn_ctx,
496 messages::RequestID request_id,
497 messages::SubscribeNamespaceErrorCode err_code,
498 const messages::ReasonPhrase& reason);
499
500 void SendFetch(ConnectionContext& conn_ctx,
501 messages::RequestID request_id,
502 const FullTrackName& tfn,
503 messages::SubscriberPriority priority,
504 messages::GroupOrder group_order,
505 messages::GroupId start_group,
506 messages::GroupId start_object,
507 messages::GroupId end_group,
508 messages::GroupId end_object);
509 void SendJoiningFetch(ConnectionContext& conn_ctx,
510 messages::RequestID request_id,
511 messages::SubscriberPriority priority,
512 messages::GroupOrder group_order,
513 messages::RequestID joining_request_id,
514 messages::GroupId joining_start,
515 bool absolute,
516 const messages::Parameters parameters);
517 void SendFetchCancel(ConnectionContext& conn_ctx, messages::RequestID request_id);
518 void SendFetchOk(ConnectionContext& conn_ctx,
519 messages::RequestID request_id,
520 messages::GroupOrder group_order,
521 bool end_of_track,
522 messages::Location end_location);
523 void SendFetchError(ConnectionContext& conn_ctx,
524 messages::RequestID request_id,
525 messages::FetchErrorCode error,
526 const std::string& reason);
527 void CloseConnection(ConnectionHandle connection_handle,
528 messages::TerminationReason reason,
529 const std::string& reason_str);
530
531 void RemoveSubscribeTrack(ConnectionContext& conn_ctx,
532 SubscribeTrackHandler& handler,
533 bool remove_handler = true);
534
535 std::shared_ptr<PublishTrackHandler> GetPubTrackHandler(ConnectionContext& conn_ctx, TrackHash& th);
536
537 void RemoveAllTracksForConnectionClose(ConnectionContext& conn_ctx);
538
539 uint64_t GetNextRequestId();
540
541 bool OnRecvSubgroup(messages::StreamHeaderType type,
542 std::vector<uint8_t>::const_iterator cursor_it,
543 StreamRxContext& rx_ctx,
544 std::uint64_t stream_id,
545 ConnectionContext& conn_ctx,
546 std::shared_ptr<const std::vector<uint8_t>> data) const;
547 bool OnRecvFetch(std::vector<uint8_t>::const_iterator cursor_it,
548 StreamRxContext& rx_ctx,
549 std::uint64_t stream_id,
550 ConnectionContext& conn_ctx,
551 std::shared_ptr<const std::vector<uint8_t>> data) const;
552
553 // -------------------------------------------------------------------------------------------------
554 // Private member functions that will be implemented by Server class
555 // -------------------------------------------------------------------------------------------------
556 virtual void NewConnectionAccepted(ConnectionHandle, const ConnectionRemoteInfo&) {}
557
558 virtual void ConnectionStatusChanged(ConnectionHandle, ConnectionStatus) {}
559
560 virtual void SetConnectionHandle(ConnectionHandle) {}
561
562 virtual void MetricsSampled(ConnectionHandle, const ConnectionMetrics&) {}
563
564 // -------------------------------------------------------------------------------------------------
565 // Private member functions that will be implemented by Client class
566 // -------------------------------------------------------------------------------------------------
567 virtual void MetricsSampled(const ConnectionMetrics&) {}
568
569 protected:
570 std::shared_ptr<Transport> GetSharedPtr();
571
572 ConnectionContext& GetConnectionContext(ConnectionHandle conn);
573
574 // -------------------------------------------------------------------------------------------------
575
576 private:
577 // -------------------------------------------------------------------------------------------------
578 // Private member functions that will be implemented by both Server and Client
579 // ------------------------------------------------------------------------------------------------
580
581 virtual bool ProcessCtrlMessage(ConnectionContext& conn_ctx, BytesSpan msg_bytes) = 0;
582
583 TransportError Enqueue(const TransportConnId& conn_id,
584 const DataContextId& data_ctx_id,
585 std::uint64_t group_id,
586 std::shared_ptr<const std::vector<uint8_t>> bytes,
587 const uint8_t priority,
588 const uint32_t ttl_ms,
589 const uint32_t delay_ms,
590 const ITransport::EnqueueFlags flags);
591
592 private:
593 // -------------------------------------------------------------------------------------------------
594 // Private member variables
595 // -------------------------------------------------------------------------------------------------
596 std::mutex state_mutex_;
597 const bool client_mode_;
598 std::shared_ptr<spdlog::logger> logger_;
599 bool stop_{ false };
600 const ServerConfig server_config_;
601 const ClientConfig client_config_;
602
603 std::map<ConnectionHandle, ConnectionContext> connections_;
604
605 Status status_{ Status::kNotReady };
606
607 std::shared_ptr<TickService> tick_service_;
608 std::shared_ptr<ITransport> quic_transport_; // **MUST** be last for proper order of destruction
609
610 friend class Client;
611 friend class Server;
615 };
616
617} // namespace quicr
virtual bool FetchReceived(ConnectionHandle connection_handle, uint64_t request_id, const FullTrackName &track_full_name, const quicr::messages::FetchAttributes &attributes)
Event to run on receiving Fetch request.
void PublishTrackSub(ConnectionHandle connection_handle, std::shared_ptr< PublishTrackHandler > track_handler)
Publish to a track and force subscribe.
Status GetStatus() const noexcept
Get the status of the Client.
Definition transport.h:245
void UnpublishTrack(ConnectionHandle connection_handle, const std::shared_ptr< PublishTrackHandler > &track_handler)
Unpublish track.
Status
Status of the transport.
Definition transport.h:46
@ kReady
Definition transport.h:47
@ kNotConnected
Definition transport.h:56
@ kFailedToConnect
Definition transport.h:57
@ kInvalidParams
Definition transport.h:52
@ kDisconnecting
Definition transport.h:55
@ kNotReady
Definition transport.h:48
@ kConnecting
Definition transport.h:54
@ kPendingServerSetup
Definition transport.h:58
@ kInternalError
Definition transport.h:50
void CancelFetchTrack(ConnectionHandle connection_handle, std::shared_ptr< FetchTrackHandler > track_handler)
Cancel Fetch track.
virtual void ResolveTrackStatus(ConnectionHandle connection_handle, uint64_t request_id, uint64_t track_alias, const SubscribeResponse &subscribe_response)
Accept or reject track status that was received.
Transport(const ClientConfig &cfg, std::shared_ptr< TickService > tick_service)
Client mode Constructor to create the MOQ instance.
void SubscribeTrack(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler)
Subscribe to a track.
virtual void TrackStatusReceived(ConnectionHandle connection_handle, uint64_t request_id, const FullTrackName &track_full_name, const messages::SubscribeAttributes &subscribe_attributes)
Callback notification for track status message received.
ConnectionContext & GetConnectionContext(ConnectionHandle conn)
uint64_t RequestTrackStatus(ConnectionHandle connection_handle, const FullTrackName &track_full_name, const messages::SubscribeAttributes &subscribe_attributes)
Request track status.
friend class Client
Definition transport.h:610
friend class PublishTrackHandler
Definition transport.h:612
Transport(const ServerConfig &cfg, std::shared_ptr< TickService > tick_service)
Server mode Constructor to create the MOQ instance.
friend class SubscribeTrackHandler
Definition transport.h:614
std::shared_ptr< Transport > GetSharedPtr()
ControlMessageStatus
Control message status codes.
Definition transport.h:65
@ kMessageIncomplete
control message is incomplete and more data is needed
Definition transport.h:66
@ kUnsupportedMessageType
Unsupported MOQT message type.
Definition transport.h:70
@ kStreamBufferCannotBeZero
stream buffer cannot be zero when parsing message type
Definition transport.h:68
@ kMessageComplete
control message is complete and the stream buffer holds a complete message
Definition transport.h:67
@ kStreamBufferMissingType
connection context is missing message type
Definition transport.h:69
void FetchTrack(ConnectionHandle connection_handle, std::shared_ptr< FetchTrackHandler > track_handler)
Fetch track.
virtual void TrackStatusResponseReceived(ConnectionHandle connection_handle, uint64_t request_id, const SubscribeResponse &response)
Callback notification for track status OK received.
void PublishTrack(ConnectionHandle connection_handle, std::shared_ptr< PublishTrackHandler > track_handler)
Publish to a track.
Transport()=delete
void UpdateTrackSubscription(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler, bool new_group_request=false)
Update Subscription to a track.
friend class PublishFetchHandler
Definition transport.h:613
friend class Server
Definition transport.h:611
const std::shared_ptr< TickService > & GetTickService() const noexcept
Definition transport.h:116
ConnectionStatus
Connection status codes.
Definition transport.h:81
@ kIdleTimeout
Definition transport.h:85
@ kConnected
Definition transport.h:84
@ kClosedByRemote
Definition transport.h:86
StreamDataMessageStatus
Definition transport.h:74
void UnsubscribeTrack(ConnectionHandle connection_handle, const std::shared_ptr< SubscribeTrackHandler > &track_handler)
Unsubscribe track.
virtual void StatusChanged(Status status)
Callback notification for status/state change.
Definition transport.h:257
Definition transport.h:28
uint64_t ConnectionHandle
Definition common.h:22
std::span< const Byte > BytesSpan
Definition common.h:21
Definition config.h:22
Definition metrics.h:14
Full track name struct.
Definition track_name.h:260
Definition config.h:28
Definition track_name.h:283
FullTrackName track_full_name
Definition transport.h:355
std::optional< messages::Location > largest_location
Definition transport.h:357
TrackHash track_hash
Definition transport.h:356
Connection remote information.
Definition transport.h:93
std::string ip
remote IPv4/v6 address
Definition transport.h:94
uint16_t port
remote port
Definition transport.h:95