LibQuicR
Loading...
Searching...
No Matches
transport.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Copyright (c) 2024 Cisco Systems
2// SPDX-License-Identifier: BSD-2-Clause
3
4#pragma once
5
6#include "messages.h"
7#include "tick_service.h"
8
9#include "quic_transport.h"
10
11#include "span.h"
12#include <chrono>
13#include <quicr/common.h>
14#include <quicr/config.h>
16#include <quicr/metrics.h>
19#include <spdlog/sinks/stdout_color_sinks.h>
20#include <spdlog/spdlog.h>
21
22#include <map>
23#include <string>
24#include <string_view>
25
26namespace quicr {
27
33 class Transport : public ITransport::TransportDelegate
34 {
35 public:
36 Transport() = delete;
37
41 enum class Status : uint8_t
42 {
43 kReady = 0,
45
47
49
55 };
56
68
69 enum class StreamDataMessageStatus : uint8_t
70 {
71 };
72
76 enum class ConnectionStatus : uint8_t
77 {
78 kNotConnected = 0,
83 };
84
89 {
90 std::string ip;
91 uint16_t port;
92 };
93
100 Transport(const ClientConfig& cfg, std::shared_ptr<TickService> tick_service);
101
108 Transport(const ServerConfig& cfg, std::shared_ptr<TickService> tick_service);
109
111
112 const std::shared_ptr<TickService>& GetTickService() const noexcept { return tick_service_; }
113
114 // -------------------------------------------------------------------------------------------------
115 // Public API MoQ Instance API methods
116 // -------------------------------------------------------------------------------------------------
124 void SubscribeTrack(ConnectionHandle connection_handle, std::shared_ptr<SubscribeTrackHandler> track_handler);
125
132 void UnsubscribeTrack(ConnectionHandle connection_handle,
133 const std::shared_ptr<SubscribeTrackHandler>& track_handler);
134
143 std::shared_ptr<SubscribeTrackHandler> track_handler);
144
152 void PublishTrack(ConnectionHandle connection_handle, std::shared_ptr<PublishTrackHandler> track_handler);
153
160 void UnpublishTrack(ConnectionHandle connection_handle,
161 const std::shared_ptr<PublishTrackHandler>& track_handler);
162
169 void FetchTrack(ConnectionHandle connection_handle, std::shared_ptr<FetchTrackHandler> track_handler);
170
177 void CancelFetchTrack(ConnectionHandle connection_handle, std::shared_ptr<FetchTrackHandler> track_handler);
178
184 Status GetStatus() const noexcept { return status_; }
185
190
196 virtual void StatusChanged([[maybe_unused]] Status status) {}
198
200 protected:
201 Status Start();
202
203 Status Stop();
204
206
207 private:
208 // -------------------------------------------------------------------------------------------------
209 // Transport Delegate/callback functions
210 // -------------------------------------------------------------------------------------------------
211
212 void OnNewDataContext([[maybe_unused]] const ConnectionHandle& connection_handle,
213 [[maybe_unused]] const DataContextId& data_ctx_id) override;
214
215 void OnConnectionStatus(const ConnectionHandle& connection_handle, const TransportStatus status) override;
216 void OnNewConnection(const ConnectionHandle& connection_handle, const TransportRemote& remote) override;
217 void OnRecvStream(const ConnectionHandle& connection_handle,
218 uint64_t stream_id,
219 std::optional<DataContextId> data_ctx_id,
220 const bool is_bidir = false) override;
221 void OnRecvDgram(const ConnectionHandle& connection_handle, std::optional<DataContextId> data_ctx_id) override;
222
223 void OnConnectionMetricsSampled(MetricsTimeStamp sample_time,
224 TransportConnId conn_id,
225 const QuicConnectionMetrics& quic_connection_metrics) override;
226
227 void OnDataMetricsStampled(MetricsTimeStamp sample_time,
228 TransportConnId conn_id,
229 DataContextId data_ctx_id,
230 const QuicDataContextMetrics& quic_data_context_metrics) override;
231
232 // -------------------------------------------------------------------------------------------------
233 // End of transport handler/callback functions
234 // -------------------------------------------------------------------------------------------------
235
236 static constexpr std::size_t kControlMessageBufferSize = 4096;
237 struct ConnectionContext
238 {
239 ConnectionHandle connection_handle{ 0 };
240 std::optional<uint64_t> ctrl_data_ctx_id;
241 bool setup_complete{ false };
242 uint64_t client_version{ 0 };
243 std::optional<messages::ControlMessageType>
244 ctrl_msg_type_received;
245
246 std::vector<uint8_t> ctrl_msg_buffer;
247
248 uint64_t current_subscribe_id{ 0 };
249
252 std::map<messages::SubscribeId, std::pair<TrackNamespaceHash, TrackNameHash>> recv_sub_id;
253
255 std::map<messages::SubscribeId, std::shared_ptr<SubscribeTrackHandler>> tracks_by_sub_id;
256
258 std::map<messages::TrackAlias, std::shared_ptr<SubscribeTrackHandler>> sub_by_track_alias;
259
261 std::map<TrackNamespaceHash, std::map<TrackNameHash, std::shared_ptr<PublishTrackHandler>>>
262 pub_tracks_by_name;
263
264 std::map<messages::TrackAlias, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_track_alias;
265
267 std::map<DataContextId, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_data_ctx_id;
268
269 ConnectionMetrics metrics{};
270
271 ConnectionContext() { ctrl_msg_buffer.reserve(kControlMessageBufferSize); }
272 };
273
274 // -------------------------------------------------------------------------------------------------
275 // Private methods
276 // -------------------------------------------------------------------------------------------------
277
278 void Init();
279
280 PublishTrackHandler::PublishObjectStatus SendData(PublishTrackHandler& track_handler,
281 uint8_t priority,
282 uint32_t ttl,
283 bool stream_header_needed,
284 std::shared_ptr<const std::vector<uint8_t>> data);
285
286 PublishTrackHandler::PublishObjectStatus SendObject(PublishTrackHandler& track_handler,
287 uint8_t priority,
288 uint32_t ttl,
289 bool stream_header_needed,
290 uint64_t group_id,
291 uint64_t subgroup_id,
292 uint64_t object_id,
293 std::optional<Extensions> extensions,
294 BytesSpan data);
295
296 void SendCtrlMsg(const ConnectionContext& conn_ctx, BytesSpan data);
297 void SendClientSetup();
298 void SendServerSetup(ConnectionContext& conn_ctx);
299 void SendAnnounce(ConnectionContext& conn_ctx, const TrackNamespace& track_namespace);
300 void SendAnnounceOk(ConnectionContext& conn_ctx, const TrackNamespace& track_namespace);
301 void SendUnannounce(ConnectionContext& conn_ctx, const TrackNamespace& track_namespace);
302 void SendSubscribe(ConnectionContext& conn_ctx,
303 uint64_t subscribe_id,
304 const FullTrackName& tfn,
305 TrackHash th,
306 messages::ObjectPriority priority,
307 messages::GroupOrder group_order,
308 messages::FilterType filter_type);
309 void SendSubscribeUpdate(ConnectionContext& conn_ctx,
310 uint64_t subscribe_id,
311 TrackHash th,
312 messages::GroupId start_group_id,
313 messages::ObjectId start_object_id,
314 messages::GroupId end_group_id,
315 messages::ObjectPriority priority);
316
317 void SendSubscribeOk(ConnectionContext& conn_ctx,
318 uint64_t subscribe_id,
319 uint64_t expires,
320 bool content_exists,
321 messages::GroupId largest_group_id,
322 messages::ObjectId largest_object_id);
323 void SendUnsubscribe(ConnectionContext& conn_ctx, uint64_t subscribe_id);
324 void SendSubscribeDone(ConnectionContext& conn_ctx, uint64_t subscribe_id, const std::string& reason);
325 void SendSubscribeError(ConnectionContext& conn_ctx,
326 uint64_t subscribe_id,
327 uint64_t track_alias,
328 messages::SubscribeErrorCode error,
329 const std::string& reason);
330
331 void SendSubscribeAnnounces(ConnectionHandle conn_handle, const TrackNamespace& prefix_namespace);
332 void SendUnsubscribeAnnounces(ConnectionHandle conn_handle, const TrackNamespace& prefix_namespace);
333 void SendSubscribeAnnouncesOk(ConnectionContext& conn_ctx, const TrackNamespace& prefix_namespace);
334 void SendSubscribeAnnouncesError(ConnectionContext& conn_ctx,
335 const TrackNamespace& prefix_namespace,
336 messages::SubscribeAnnouncesErrorCode err_code,
337 const messages::ReasonPhrase& reason);
338
339 void SendFetch(ConnectionContext& conn_ctx,
340 uint64_t subscribe_id,
341 const FullTrackName& tfn,
342 messages::ObjectPriority priority,
343 messages::GroupOrder group_order,
344 messages::GroupId start_group,
345 messages::GroupId start_object,
346 messages::GroupId end_group,
347 messages::GroupId end_object);
348 void SendFetchCancel(ConnectionContext& conn_ctx, uint64_t subscribe_id);
349 void SendFetchOk(ConnectionContext& conn_ctx,
350 uint64_t subscribe_id,
351 messages::GroupOrder group_order,
352 bool end_of_track,
353 messages::GroupId largest_group_id,
354 messages::GroupId largest_object_id);
355 void SendFetchError(ConnectionContext& conn_ctx,
356 uint64_t subscribe_id,
357 messages::FetchErrorCode error,
358 const std::string& reason);
359 void CloseConnection(ConnectionHandle connection_handle,
360 messages::TerminationReason reason,
361 const std::string& reason_str);
362
363 void RemoveSubscribeTrack(ConnectionContext& conn_ctx,
364 SubscribeTrackHandler& handler,
365 bool remove_handler = true);
366
367 void SendNewGroupRequest(ConnectionHandle conn_id, uint64_t subscribe_id, uint64_t track_alias);
368
369 std::shared_ptr<PublishTrackHandler> GetPubTrackHandler(ConnectionContext& conn_ctx, TrackHash& th);
370
371 void RemoveAllTracksForConnectionClose(ConnectionContext& conn_ctx);
372
373 // -------------------------------------------------------------------------------------------------
374 // Private member functions that will be implemented by Server class
375 // -------------------------------------------------------------------------------------------------
376 virtual void NewConnectionAccepted(ConnectionHandle, const ConnectionRemoteInfo&) {}
377
378 virtual void ConnectionStatusChanged(ConnectionHandle, ConnectionStatus) {}
379
380 virtual void SetConnectionHandle(ConnectionHandle) {}
381
382 virtual void MetricsSampled(ConnectionHandle, const ConnectionMetrics&) {}
383
384 // -------------------------------------------------------------------------------------------------
385 // Private member functions that will be implemented by Client class
386 // -------------------------------------------------------------------------------------------------
387 virtual void MetricsSampled(const ConnectionMetrics&) {}
388
389 // -------------------------------------------------------------------------------------------------
390
391 private:
392 // -------------------------------------------------------------------------------------------------
393 // Private member functions that will be implemented by both Server and Client
394 // ------------------------------------------------------------------------------------------------
395
396 virtual bool ProcessCtrlMessage(ConnectionContext& conn_ctx, BytesSpan msg_bytes) = 0;
397
398 private:
399 // -------------------------------------------------------------------------------------------------
400 // Private member variables
401 // -------------------------------------------------------------------------------------------------
402 std::mutex state_mutex_;
403 const bool client_mode_;
404 std::shared_ptr<spdlog::logger> logger_;
405 bool stop_{ false };
406 const ServerConfig server_config_;
407 const ClientConfig client_config_;
408
409 std::map<ConnectionHandle, ConnectionContext> connections_;
410
411 Status status_{ Status::kNotReady };
412
413 std::shared_ptr<TickService> tick_service_;
414 std::shared_ptr<ITransport> quic_transport_; // **MUST** be last for proper order of destruction
415
416 friend class Client;
417 friend class Server;
418 };
419
420} // namespace quicr
MoQ Client.
Definition client.h:21
PublishObjectStatus
Publish status codes.
Definition publish_track_handler.h:28
MoQ Server.
Definition server.h:21
MOQ Implementation supporting both client and server modes.
Definition transport.h:34
Status GetStatus() const noexcept
Get the status of the Client.
Definition transport.h:184
void UnpublishTrack(ConnectionHandle connection_handle, const std::shared_ptr< PublishTrackHandler > &track_handler)
Unpublish track.
Status
Status of the transport.
Definition transport.h:42
void UpdateTrackSubscription(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler)
Update Subscription to a track.
void CancelFetchTrack(ConnectionHandle connection_handle, std::shared_ptr< FetchTrackHandler > track_handler)
Cancel Fetch track.
Transport(const ClientConfig &cfg, std::shared_ptr< TickService > tick_service)
Client mode Constructor to create the MOQ instance.
void SubscribeTrack(ConnectionHandle connection_handle, std::shared_ptr< SubscribeTrackHandler > track_handler)
Subscribe to a track.
Transport(const ServerConfig &cfg, std::shared_ptr< TickService > tick_service)
Server mode Constructor to create the MOQ instance.
ControlMessageStatus
Control message status codes.
Definition transport.h:61
@ kMessageIncomplete
control message is incomplete and more data is needed
@ kUnsupportedMessageType
Unsupported MOQT message type.
@ kStreamBufferCannotBeZero
stream buffer cannot be zero when parsing message type
@ kMessageComplete
control message is complete and stream buffer get any has complete message
@ kStreamBufferMissingType
connection context is missing message type
void FetchTrack(ConnectionHandle connection_handle, std::shared_ptr< FetchTrackHandler > track_handler)
Fetch track.
void PublishTrack(ConnectionHandle connection_handle, std::shared_ptr< PublishTrackHandler > track_handler)
Publish to a track.
Transport()=delete
const std::shared_ptr< TickService > & GetTickService() const noexcept
Definition transport.h:112
ConnectionStatus
Connection status codes.
Definition transport.h:77
StreamDataMessageStatus
Definition transport.h:70
void UnsubscribeTrack(ConnectionHandle connection_handle, const std::shared_ptr< SubscribeTrackHandler > &track_handler)
Unsubscribe track.
virtual void StatusChanged(Status status)
Callback notification for status/state change.
Definition transport.h:196
Definition transport.h:26
Span< const Byte > BytesSpan
Definition common.h:23
uint64_t ConnectionHandle
Definition common.h:24
Definition config.h:22
Definition config.h:27
Connection remote information.
Definition transport.h:89
std::string ip
remote IPv4/v6 address
Definition transport.h:90
uint16_t port
remote port
Definition transport.h:91