33 class Transport :
public ITransport::TransportDelegate
/// Accessor for the tick service held by this object.
/// @return Const reference to the stored `tick_service_` shared_ptr. Returning by
///         reference does not copy the pointer; the reference is only usable
///         while this object is alive.
112 const std::shared_ptr<TickService>&
GetTickService() const noexcept {
return tick_service_; }
133 const std::shared_ptr<SubscribeTrackHandler>& track_handler);
143 std::shared_ptr<SubscribeTrackHandler> track_handler);
161 const std::shared_ptr<PublishTrackHandler>& track_handler);
/// Override of a base-class virtual; by its name, invoked when a new data
/// context appears on a connection (implementation not in this declaration).
/// Both parameters are marked [[maybe_unused]] here.
/// @param connection_handle  Connection the notification refers to.
/// @param data_ctx_id        Identifier of the data context.
212 void OnNewDataContext([[maybe_unused]]
const ConnectionHandle& connection_handle,
213 [[maybe_unused]]
const DataContextId& data_ctx_id)
override;
/// Override of a base-class virtual; by its name, reports a status change for
/// a connection.
/// @param connection_handle  Connection the status applies to.
/// @param status             Transport status value, passed by value.
215 void OnConnectionStatus(
const ConnectionHandle& connection_handle,
const TransportStatus status)
override;
/// Override of a base-class virtual; by its name, reports a newly established
/// connection.
/// @param connection_handle  Handle of the new connection.
/// @param remote             Description of the remote peer (TransportRemote).
216 void OnNewConnection(
const ConnectionHandle& connection_handle,
const TransportRemote& remote)
override;
219 std::optional<DataContextId> data_ctx_id,
220 const bool is_bidir =
false)
override;
/// Override of a base-class virtual; by its name, signals datagram receipt on
/// a connection.
/// @param connection_handle  Connection the datagram arrived on.
/// @param data_ctx_id        Optional data context id; may be empty.
221 void OnRecvDgram(
const ConnectionHandle& connection_handle, std::optional<DataContextId> data_ctx_id)
override;
/// Override of a base-class virtual; receives a connection-level metrics sample.
/// @param sample_time              Timestamp associated with the sample.
/// @param conn_id                  Connection the metrics belong to.
/// @param quic_connection_metrics  The sampled QUIC connection metrics.
223 void OnConnectionMetricsSampled(MetricsTimeStamp sample_time,
224 TransportConnId conn_id,
225 const QuicConnectionMetrics& quic_connection_metrics)
override;
/// Override of a base-class virtual; receives a per-data-context metrics sample.
/// NOTE(review): the name is spelled "Stampled" while the sibling callback is
/// "OnConnectionMetricsSampled". Because this is an `override`, the spelling
/// must match the base declaration; any rename has to start there.
/// @param sample_time                Timestamp associated with the sample.
/// @param conn_id                    Connection the data context belongs to.
/// @param data_ctx_id                Data context the metrics belong to.
/// @param quic_data_context_metrics  The sampled data-context metrics.
227 void OnDataMetricsStampled(MetricsTimeStamp sample_time,
228 TransportConnId conn_id,
229 DataContextId data_ctx_id,
230 const QuicDataContextMetrics& quic_data_context_metrics)
override;
/// Capacity (in elements) that the ConnectionContext constructor reserves for
/// its `ctrl_msg_buffer` vector.
236 static constexpr std::size_t kControlMessageBufferSize = 4096;
/// Per-connection state record; instances are stored by value in the
/// `connections_` map, keyed by ConnectionHandle.
237 struct ConnectionContext
/// Optional 64-bit id; empty until a value is assigned.
/// NOTE(review): presumably the data context id of the control stream — confirm.
240 std::optional<uint64_t> ctrl_data_ctx_id;
/// Starts false. NOTE(review): presumably set once the setup exchange finishes — confirm.
241 bool setup_complete{
false };
/// Starts at 0.
242 uint64_t client_version{ 0 };
/// Optional control message type; empty when none is recorded.
243 std::optional<messages::ControlMessageType>
244 ctrl_msg_type_received;
/// Byte buffer; the constructor reserves kControlMessageBufferSize capacity for it.
246 std::vector<uint8_t> ctrl_msg_buffer;
/// Starts at 0. NOTE(review): looks like a counter for allocating subscribe ids — confirm.
248 uint64_t current_subscribe_id{ 0 };
/// Subscribe id -> (track namespace hash, track name hash).
252 std::map<messages::SubscribeId, std::pair<TrackNamespaceHash, TrackNameHash>> recv_sub_id;
/// Subscribe id -> subscribe track handler.
255 std::map<messages::SubscribeId, std::shared_ptr<SubscribeTrackHandler>> tracks_by_sub_id;
/// Track alias -> subscribe track handler.
258 std::map<messages::TrackAlias, std::shared_ptr<SubscribeTrackHandler>> sub_by_track_alias;
/// Two-level map: track namespace hash -> (track name hash -> publish track handler).
261 std::map<TrackNamespaceHash, std::map<TrackNameHash, std::shared_ptr<PublishTrackHandler>>>
264 std::map<messages::TrackAlias, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_track_alias;
/// Data context id -> publish track handler.
267 std::map<DataContextId, std::shared_ptr<PublishTrackHandler>> pub_tracks_by_data_ctx_id;
/// Value-initialized metrics record for this connection.
269 ConnectionMetrics metrics{};
/// Pre-reserves kControlMessageBufferSize elements of capacity in
/// ctrl_msg_buffer; the buffer's size stays 0.
271 ConnectionContext() { ctrl_msg_buffer.reserve(kControlMessageBufferSize); }
283 bool stream_header_needed,
284 std::shared_ptr<
const std::vector<uint8_t>> data);
289 bool stream_header_needed,
291 uint64_t subgroup_id,
293 std::optional<Extensions> extensions,
/// By its name, sends a control message (declaration only; implementation
/// not shown here).
/// @param conn_ctx  Connection context, taken by const reference.
/// @param data      Span over the bytes to send.
296 void SendCtrlMsg(
const ConnectionContext& conn_ctx,
BytesSpan data);
/// By its name, sends the client setup message. Takes no connection argument,
/// unlike SendServerSetup(). NOTE(review): which connection it targets is
/// decided in the implementation — confirm.
297 void SendClientSetup();
/// By its name, sends the server setup message.
/// @param conn_ctx  Connection context, taken by mutable reference.
298 void SendServerSetup(ConnectionContext& conn_ctx);
/// By its name, sends an announce for a track namespace.
/// @param conn_ctx         Connection context, taken by mutable reference.
/// @param track_namespace  Namespace the message refers to.
299 void SendAnnounce(ConnectionContext& conn_ctx,
const TrackNamespace& track_namespace);
/// By its name, sends an announce-OK for a track namespace.
/// @param conn_ctx         Connection context, taken by mutable reference.
/// @param track_namespace  Namespace the message refers to.
300 void SendAnnounceOk(ConnectionContext& conn_ctx,
const TrackNamespace& track_namespace);
/// By its name, sends an unannounce for a track namespace.
/// @param conn_ctx         Connection context, taken by mutable reference.
/// @param track_namespace  Namespace the message refers to.
301 void SendUnannounce(ConnectionContext& conn_ctx,
const TrackNamespace& track_namespace);
/// By its name, sends a subscribe request for a track.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id carried by the message.
/// @param tfn           Full track name being subscribed to.
/// @param priority      Object priority value.
/// @param group_order   Group order value.
/// @param filter_type   Filter type value.
302 void SendSubscribe(ConnectionContext& conn_ctx,
303 uint64_t subscribe_id,
304 const FullTrackName& tfn,
306 messages::ObjectPriority priority,
307 messages::GroupOrder group_order,
308 messages::FilterType filter_type);
/// By its name, sends a subscribe-update for an existing subscribe id.
/// @param conn_ctx         Connection context, taken by mutable reference.
/// @param subscribe_id     Subscribe id being updated.
/// @param start_group_id   Start group id.
/// @param start_object_id  Start object id.
/// @param end_group_id     End group id.
/// @param priority         Object priority value.
309 void SendSubscribeUpdate(ConnectionContext& conn_ctx,
310 uint64_t subscribe_id,
312 messages::GroupId start_group_id,
313 messages::ObjectId start_object_id,
314 messages::GroupId end_group_id,
315 messages::ObjectPriority priority);
/// By its name, sends a subscribe-OK for a subscribe id.
/// @param conn_ctx           Connection context, taken by mutable reference.
/// @param subscribe_id       Subscribe id being acknowledged.
/// @param largest_group_id   Largest group id value.
/// @param largest_object_id  Largest object id value.
317 void SendSubscribeOk(ConnectionContext& conn_ctx,
318 uint64_t subscribe_id,
321 messages::GroupId largest_group_id,
322 messages::ObjectId largest_object_id);
/// By its name, sends an unsubscribe for the given subscribe id.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id to unsubscribe.
323 void SendUnsubscribe(ConnectionContext& conn_ctx, uint64_t subscribe_id);
/// By its name, sends a subscribe-done for the given subscribe id.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id that is done.
/// @param reason        Reason text.
324 void SendSubscribeDone(ConnectionContext& conn_ctx, uint64_t subscribe_id,
const std::string& reason);
/// By its name, sends a subscribe-error for the given subscribe id.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id the error refers to.
/// @param track_alias   Track alias value.
/// @param error         Subscribe error code.
/// @param reason        Reason text.
325 void SendSubscribeError(ConnectionContext& conn_ctx,
326 uint64_t subscribe_id,
327 uint64_t track_alias,
328 messages::SubscribeErrorCode error,
329 const std::string& reason);
/// By its name, sends a subscribe-announces for a namespace prefix.
/// Takes a ConnectionHandle by value rather than a ConnectionContext reference.
/// @param conn_handle       Handle of the target connection.
/// @param prefix_namespace  Namespace prefix the message refers to.
331 void SendSubscribeAnnounces(
ConnectionHandle conn_handle,
const TrackNamespace& prefix_namespace);
/// By its name, sends an unsubscribe-announces for a namespace prefix.
/// Takes a ConnectionHandle by value rather than a ConnectionContext reference.
/// @param conn_handle       Handle of the target connection.
/// @param prefix_namespace  Namespace prefix the message refers to.
332 void SendUnsubscribeAnnounces(
ConnectionHandle conn_handle,
const TrackNamespace& prefix_namespace);
/// By its name, sends a subscribe-announces-OK for a namespace prefix.
/// @param conn_ctx          Connection context, taken by mutable reference.
/// @param prefix_namespace  Namespace prefix the message refers to.
333 void SendSubscribeAnnouncesOk(ConnectionContext& conn_ctx,
const TrackNamespace& prefix_namespace);
/// By its name, sends a subscribe-announces-error for a namespace prefix.
/// @param conn_ctx          Connection context, taken by mutable reference.
/// @param prefix_namespace  Namespace prefix the error refers to.
/// @param err_code          Subscribe-announces error code.
/// @param reason            Reason phrase.
334 void SendSubscribeAnnouncesError(ConnectionContext& conn_ctx,
335 const TrackNamespace& prefix_namespace,
336 messages::SubscribeAnnouncesErrorCode err_code,
337 const messages::ReasonPhrase& reason);
/// By its name, sends a fetch request for a range of a track.
/// NOTE(review): start_object and end_object are typed messages::GroupId,
/// whereas SendSubscribeUpdate() types its object position as
/// messages::ObjectId — confirm whether ObjectId was intended here.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id carried by the message.
/// @param tfn           Full track name to fetch from.
/// @param priority      Object priority value.
/// @param group_order   Group order value.
/// @param start_group   Start group of the range.
/// @param start_object  Start object of the range.
/// @param end_group     End group of the range.
/// @param end_object    End object of the range.
339 void SendFetch(ConnectionContext& conn_ctx,
340 uint64_t subscribe_id,
341 const FullTrackName& tfn,
342 messages::ObjectPriority priority,
343 messages::GroupOrder group_order,
344 messages::GroupId start_group,
345 messages::GroupId start_object,
346 messages::GroupId end_group,
347 messages::GroupId end_object);
/// By its name, sends a fetch-cancel for the given subscribe id.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id of the fetch to cancel.
348 void SendFetchCancel(ConnectionContext& conn_ctx, uint64_t subscribe_id);
/// By its name, sends a fetch-OK for the given subscribe id.
/// NOTE(review): largest_object_id is typed messages::GroupId, whereas
/// SendSubscribeOk() types the same-named parameter as messages::ObjectId —
/// confirm which is intended.
/// @param conn_ctx           Connection context, taken by mutable reference.
/// @param subscribe_id       Subscribe id being acknowledged.
/// @param group_order        Group order value.
/// @param largest_group_id   Largest group id value.
/// @param largest_object_id  Largest object id value.
349 void SendFetchOk(ConnectionContext& conn_ctx,
350 uint64_t subscribe_id,
351 messages::GroupOrder group_order,
353 messages::GroupId largest_group_id,
354 messages::GroupId largest_object_id);
/// By its name, sends a fetch-error for the given subscribe id.
/// @param conn_ctx      Connection context, taken by mutable reference.
/// @param subscribe_id  Subscribe id the error refers to.
/// @param error         Fetch error code.
/// @param reason        Reason text.
355 void SendFetchError(ConnectionContext& conn_ctx,
356 uint64_t subscribe_id,
357 messages::FetchErrorCode error,
358 const std::string& reason);
360 messages::TerminationReason reason,
361 const std::string& reason_str);
/// By its name, removes a subscribe track from a connection's state.
/// @param conn_ctx        Connection context, taken by mutable reference.
/// @param handler         Subscribe track handler to remove.
/// @param remove_handler  Defaults to true. NOTE(review): exact effect of
///                        passing false is defined in the implementation — confirm.
363 void RemoveSubscribeTrack(ConnectionContext& conn_ctx,
364 SubscribeTrackHandler& handler,
365 bool remove_handler =
true);
/// By its name, sends a new-group request for a subscription.
/// @param conn_id       Connection handle, passed by value.
/// @param subscribe_id  Subscribe id the request refers to.
/// @param track_alias   Track alias the request refers to.
367 void SendNewGroupRequest(
ConnectionHandle conn_id, uint64_t subscribe_id, uint64_t track_alias);
/// Looks up a publish track handler for a track hash within a connection.
/// @param conn_ctx  Connection context, taken by mutable reference.
/// @param th        Track hash, taken by non-const reference.
///                  NOTE(review): whether `th` is modified is not visible here.
/// @return Shared pointer to the handler. NOTE(review): presumably null when
///         no handler matches — confirm against the implementation.
369 std::shared_ptr<PublishTrackHandler> GetPubTrackHandler(ConnectionContext& conn_ctx, TrackHash& th);
/// By its name, removes every track associated with a connection that is closing.
/// @param conn_ctx  Connection context, taken by mutable reference.
371 void RemoveAllTracksForConnectionClose(ConnectionContext& conn_ctx);
/// Overridable hook with an empty default body: the base version does nothing
/// and ignores both (unnamed) arguments. Derived classes may override it.
376 virtual void NewConnectionAccepted(
ConnectionHandle,
const ConnectionRemoteInfo&) {}
/// Overridable hook with an empty default body: the base version does nothing
/// and ignores its (unnamed) ConnectionMetrics argument.
387 virtual void MetricsSampled(
const ConnectionMetrics&) {}
/// Pure virtual: every concrete derived class must implement control message
/// processing.
/// @param conn_ctx   Connection context, taken by mutable reference.
/// @param msg_bytes  Span over the message bytes to process.
/// @return bool. NOTE(review): the meaning of true/false is defined by the
///         implementations and callers, not visible here — confirm.
396 virtual bool ProcessCtrlMessage(ConnectionContext& conn_ctx,
BytesSpan msg_bytes) = 0;
/// Mutex member. NOTE(review): which state it guards is decided by the
/// implementation — confirm before relying on it.
402 std::mutex state_mutex_;
/// Fixed at construction (const). NOTE(review): presumably true when acting as a client — confirm.
403 const bool client_mode_;
/// spdlog logger held by shared ownership.
404 std::shared_ptr<spdlog::logger> logger_;
/// Server configuration, held by value and immutable after construction.
406 const ServerConfig server_config_;
/// Client configuration, held by value and immutable after construction.
407 const ClientConfig client_config_;
/// Per-connection state, keyed by connection handle.
409 std::map<ConnectionHandle, ConnectionContext> connections_;
/// Tick service; exposed by reference through GetTickService().
413 std::shared_ptr<TickService> tick_service_;
/// Underlying transport instance, held by shared ownership.
414 std::shared_ptr<ITransport> quic_transport_;