1
0
Fork 0
mirror of https://gitlab.gnome.org/GNOME/calls.git synced 2025-01-07 04:15:32 +00:00

sip: media-pipeline: Keep track of pipeline state

This can be used by the media manager to dispose of pipelines which are done.
This commit is contained in:
Evangelos Ribeiro Tzaras 2022-02-28 11:18:36 +01:00
parent 53d6082d64
commit fe6951c938
3 changed files with 304 additions and 27 deletions

View file

@ -24,6 +24,7 @@
#define G_LOG_DOMAIN "CallsSipMediaPipeline"
#include "calls-media-pipeline-enums.h"
#include "calls-sip-media-pipeline.h"
#include "util.h"
@ -57,6 +58,39 @@
* Both pipelines are using RTCP.
*/
/* The following defines are used to set/reset bitmaps of playing/paused/stop state */
#define EL_SEND_PIPELINE (1<<0)
#define EL_SEND_AUDIO_SRC (1<<1)
#define EL_SEND_RTPBIN (1<<2)
#define EL_SEND_RTP_SINK (1<<3)
#define EL_SEND_RTCP_SINK (1<<4)
#define EL_SEND_RTCP_SRC (1<<5)
#define EL_SEND_PAYLOADER (1<<6)
#define EL_SEND_ENCODER (1<<7)
#define EL_SEND_ALL_RTP EL_SEND_PIPELINE | EL_SEND_AUDIO_SRC | \
EL_SEND_RTPBIN | EL_SEND_RTP_SINK | EL_SEND_RTCP_SRC | EL_SEND_RTCP_SINK | \
EL_SEND_PAYLOADER | EL_SEND_ENCODER
#define EL_SEND_SENDING EL_SEND_AUDIO_SRC | EL_SEND_RTPBIN | EL_SEND_RTP_SINK | \
EL_SEND_PAYLOADER | EL_SEND_ENCODER
/* leave some room for more elements to be added later */
#define EL_RECV_PIPELINE (1<<16)
#define EL_RECV_AUDIO_SINK (1<<17)
#define EL_RECV_RTPBIN (1<<18)
#define EL_RECV_RTP_SRC (1<<19)
#define EL_RECV_RTCP_SINK (1<<20)
#define EL_RECV_RTCP_SRC (1<<21)
#define EL_RECV_DEPAYLOADER (1<<22)
#define EL_RECV_DECODER (1<<23)
#define EL_RECV_ALL_RTP EL_RECV_PIPELINE | EL_RECV_AUDIO_SINK | \
EL_RECV_RTPBIN | EL_RECV_RTP_SRC | EL_RECV_RTCP_SRC | EL_RECV_RTCP_SINK | \
EL_RECV_DEPAYLOADER | EL_RECV_DECODER
enum {
PROP_0,
PROP_CODEC,
@ -66,15 +100,30 @@ enum {
PROP_LPORT_RTCP,
PROP_RPORT_RTCP,
PROP_DEBUG,
PROP_STATE,
PROP_LAST_PROP,
};
enum {
SENDING_STARTED,
N_SIGNALS
};
static GParamSpec *props[PROP_LAST_PROP];
static uint signals[N_SIGNALS];
struct _CallsSipMediaPipeline {
GObject parent;
MediaCodecInfo *codec;
gboolean debug;
CallsMediaPipelineState state;
uint element_map_playing;
uint element_map_paused;
uint element_map_stopped;
gboolean emitted_sending_signal;
/* Connection details */
char *remote;
@ -84,8 +133,6 @@ struct _CallsSipMediaPipeline {
gint rport_rtcp;
gint lport_rtcp;
gboolean is_running;
/* Gstreamer Elements (sending) */
GstElement *send_pipeline;
GstElement *audiosrc;
@ -119,6 +166,56 @@ static void initable_iface_init (GInitableIface *iface);
G_DEFINE_TYPE_WITH_CODE (CallsSipMediaPipeline, calls_sip_media_pipeline, G_TYPE_OBJECT,
G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, initable_iface_init));
static void
set_state (CallsSipMediaPipeline *self,
CallsMediaPipelineState state)
{
g_assert (CALLS_SIP_MEDIA_PIPELINE (self));
if (self->state == state)
return;
self->state = state;
g_object_notify_by_pspec (G_OBJECT (self), props[PROP_STATE]);
self->emitted_sending_signal = FALSE;
}
static void
check_element_maps (CallsSipMediaPipeline *self)
{
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
if (self->element_map_playing == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) {
g_debug ("All pipeline elements are playing");
set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAYING);
return;
}
if (self->element_map_paused == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) {
g_debug ("All pipeline elements are paused");
set_state (self, CALLS_MEDIA_PIPELINE_STATE_PAUSED);
return;
}
if (self->element_map_stopped == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) {
g_debug ("All pipeline elements are stopped");
set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOPPED);
return;
}
if ((self->element_map_playing & (EL_SEND_SENDING)) == (EL_SEND_SENDING) &&
!self->emitted_sending_signal) {
g_debug ("Sender pipeline is sending data to %s RTP/RTCP %d/%d",
self->remote, self->rport_rtp, self->rport_rtcp);
g_signal_emit (self, signals[SENDING_STARTED], 0);
self->emitted_sending_signal = TRUE;
}
}
/* rtpbin adds a pad once the payload is verified */
static void
on_pad_added (GstElement *rtpbin,
@ -144,7 +241,7 @@ on_bus_message (GstBus *bus,
GstMessage *message,
gpointer data)
{
CallsSipMediaPipeline *pipeline = CALLS_SIP_MEDIA_PIPELINE (data);
CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (data);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:
@ -169,24 +266,79 @@ on_bus_message (GstBus *bus,
case GST_MESSAGE_EOS:
g_debug ("Received end of stream");
calls_sip_media_pipeline_stop (pipeline);
calls_sip_media_pipeline_stop (self);
break;
case GST_MESSAGE_STATE_CHANGED:
{
GstState oldstate;
GstState newstate;
uint element_id = 0;
uint unset_element_id;
gst_message_parse_state_changed (message, &oldstate, &newstate, NULL);
g_debug ("Element %s has changed state from %s to %s",
GST_OBJECT_NAME (message->src),
gst_element_state_get_name (oldstate),
gst_element_state_get_name (newstate));
/* Sender pipeline elements */
if (message->src == GST_OBJECT (self->send_pipeline))
element_id = EL_SEND_PIPELINE;
else if (message->src == GST_OBJECT (self->audiosrc))
element_id = EL_SEND_AUDIO_SRC;
else if (message->src == GST_OBJECT (self->send_rtpbin))
element_id = EL_SEND_RTPBIN;
else if (message->src == GST_OBJECT (self->rtp_sink))
element_id = EL_SEND_RTP_SINK;
else if (message->src == GST_OBJECT (self->rtcp_send_sink))
element_id = EL_SEND_RTCP_SINK;
else if (message->src == GST_OBJECT (self->rtcp_send_src))
element_id = EL_SEND_RTCP_SRC;
else if (message->src == GST_OBJECT (self->payloader))
element_id = EL_SEND_PAYLOADER;
else if (message->src == GST_OBJECT (self->encoder))
element_id = EL_SEND_ENCODER;
/* Receiver pipeline elements */
else if (message->src == GST_OBJECT (self->recv_pipeline))
element_id = EL_RECV_PIPELINE;
else if (message->src == GST_OBJECT (self->audiosink))
element_id = EL_RECV_AUDIO_SINK;
else if (message->src == GST_OBJECT (self->recv_rtpbin))
element_id = EL_RECV_RTPBIN;
else if (message->src == GST_OBJECT (self->rtp_src))
element_id = EL_RECV_RTP_SRC;
else if (message->src == GST_OBJECT (self->rtcp_recv_sink))
element_id = EL_RECV_RTCP_SINK;
else if (message->src == GST_OBJECT (self->rtcp_recv_src))
element_id = EL_RECV_RTCP_SRC;
else if (message->src == GST_OBJECT (self->depayloader))
element_id = EL_RECV_DEPAYLOADER;
else if (message->src == GST_OBJECT (self->decoder))
element_id = EL_RECV_DECODER;
unset_element_id = G_MAXUINT ^ element_id;
if (newstate == GST_STATE_PLAYING) {
self->element_map_playing |= element_id;
self->element_map_paused &= unset_element_id;
self->element_map_stopped &= unset_element_id;
} else if (newstate == GST_STATE_PAUSED) {
self->element_map_paused |= element_id;
self->element_map_playing &= unset_element_id;
self->element_map_stopped &= unset_element_id;
} else if (newstate == GST_STATE_NULL) {
self->element_map_stopped |= element_id;
self->element_map_playing &= unset_element_id;
self->element_map_paused &= unset_element_id;
}
check_element_maps (self);
break;
}
default:
if (pipeline->debug)
if (self->debug)
g_debug ("Got unhandled %s message", GST_MESSAGE_TYPE_NAME (message));
break;
}
@ -610,6 +762,10 @@ calls_sip_media_pipeline_get_property (GObject *object,
g_value_set_boolean (value, self->debug);
break;
case PROP_STATE:
g_value_set_enum (value, calls_sip_media_pipeline_get_state (self));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@ -733,7 +889,21 @@ calls_sip_media_pipeline_class_init (CallsSipMediaPipelineClass *klass)
FALSE,
G_PARAM_READWRITE);
props[PROP_STATE] = g_param_spec_enum ("state",
"State",
"The state of the media pipeline",
CALLS_TYPE_MEDIA_PIPELINE_STATE,
CALLS_MEDIA_PIPELINE_STATE_UNKNOWN,
G_PARAM_READABLE);
g_object_class_install_properties (object_class, PROP_LAST_PROP, props);
signals[SENDING_STARTED] =
g_signal_new ("sending-started",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL,
G_TYPE_NONE, 0);
}
@ -752,13 +922,20 @@ pipelines_initable_init (GInitable *initable,
{
CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (initable);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_INITIALIZING);
if (!recv_pipeline_init (self, cancellable, error))
return FALSE;
goto err;
if (!send_pipeline_init (self, cancellable, error))
return FALSE;
goto err;
set_state (self, CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC);
return TRUE;
err:
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return FALSE;
}
@ -815,17 +992,21 @@ calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
if (!recv_pipeline_setup_codecs (self, codec, &error)) {
g_warning ("Error trying to setup codec for receive pipeline: %s",
error->message);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return;
}
if (!send_pipeline_setup_codecs (self, codec, &error)) {
g_warning ("Error trying to setup codec for send pipeline: %s",
error->message);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return;
}
self->codec = codec;
g_object_notify_by_pspec (G_OBJECT (self), props[PROP_CODEC]);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_READY);
}
static void
@ -867,7 +1048,12 @@ diagnose_ports_in_use (CallsSipMediaPipeline *self)
gboolean same_socket = FALSE;
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
g_assert (self->is_running);
if (self->state != CALLS_MEDIA_PIPELINE_STATE_PLAYING &&
self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSED) {
g_warning ("Cannot diagnose ports when pipeline is not active");
return;
}
g_object_get (self->rtp_src, "used-socket", &socket_in, NULL);
g_object_get (self->rtp_sink, "used-socket", &socket_out, NULL);
@ -897,13 +1083,12 @@ calls_sip_media_pipeline_start (CallsSipMediaPipeline *self)
GSocket *socket;
g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self));
if (!self->codec) {
g_warning ("Codec not set for this pipeline. Cannot start");
if (self->state != CALLS_MEDIA_PIPELINE_STATE_READY) {
g_warning ("Cannot start pipeline because it's not ready");
return;
}
g_debug ("Starting media pipeline");
self->is_running = TRUE;
/* First start the receiver pipeline so that
we may reuse the socket in the sender pipeline */
@ -924,9 +1109,10 @@ calls_sip_media_pipeline_start (CallsSipMediaPipeline *self)
/* Now start the sender pipeline */
gst_element_set_state (self->send_pipeline, GST_STATE_PLAYING);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING);
if (self->debug)
diagnose_ports_in_use (self);
}
@ -936,11 +1122,12 @@ calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self)
g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self));
g_debug ("Stopping media pipeline");
self->is_running = FALSE;
/* Stop the pipelines in reverse order (compared to the starting) */
gst_element_set_state (self->send_pipeline, GST_STATE_NULL);
gst_element_set_state (self->recv_pipeline, GST_STATE_NULL);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING);
}
@ -950,20 +1137,38 @@ calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
{
g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self));
if (self->is_running != pause)
if (pause &&
(self->state == CALLS_MEDIA_PIPELINE_STATE_PAUSED ||
self->state == CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING))
return;
g_debug ("%s media pipeline", self->is_running ?
if (!pause &&
(self->state == CALLS_MEDIA_PIPELINE_STATE_PLAYING ||
self->state == CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING))
return;
if (self->state != CALLS_MEDIA_PIPELINE_STATE_PLAYING &&
self->state != CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING &&
self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSED &&
self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING) {
g_warning ("Cannot pause or unpause pipeline because it's not currently active");
return;
}
g_debug ("%s media pipeline", pause ?
"Pausing" :
"Unpausing");
gst_element_set_state (self->recv_pipeline, self->is_running ?
gst_element_set_state (self->recv_pipeline, pause ?
GST_STATE_PAUSED :
GST_STATE_PLAYING);
gst_element_set_state (self->send_pipeline, self->is_running ?
gst_element_set_state (self->send_pipeline, pause ?
GST_STATE_PAUSED :
GST_STATE_PLAYING);
self->is_running = !self->is_running;
set_state (self, pause ?
CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING :
CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING);
}
@ -992,4 +1197,38 @@ calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self)
return port;
}
CallsMediaPipelineState
calls_sip_media_pipeline_get_state (CallsSipMediaPipeline *self)
{
g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self),
CALLS_MEDIA_PIPELINE_STATE_UNKNOWN);
return self->state;
}
#undef MAKE_ELEMENT
#undef EL_SEND_PIPELINE
#undef EL_SEND_AUDIO_SRC
#undef EL_SEND_RTPBIN
#undef EL_SEND_RTP_SINK
#undef EL_SEND_RTCP_SINK
#undef EL_SEND_RTCP_SRC
#undef EL_SEND_PAYLOADER
#undef EL_SEND_ENCODER
#undef EL_SEND_ALL_RTP
#undef EL_SEND_SENDING
#undef EL_RECV_PIPELINE
#undef EL_RECV_AUDIO_SINK
#undef EL_RECV_RTPBIN
#undef EL_RECV_RTP_SRC
#undef EL_RECV_RTCP_SINK
#undef EL_RECV_RTCP_SRC
#undef EL_RECV_DEPAYLOADER
#undef EL_RECV_DECODER
#undef EL_RECV_ALL_RTP

View file

@ -30,19 +30,49 @@
G_BEGIN_DECLS
/**
* CallsMediaPipelineState:
* @CALLS_MEDIA_PIPELINE_STATE_UNKNOWN: Default state for new pipelines
* @CALLS_MEDIA_PIPELINE_STATE_ERROR: Pipeline is in an error state
* @CALLS_MEDIA_PIPELINE_STATE_INITIALIZING: Pipeline is initializing
* @CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC: Pipeline was initialized and needs a codec set
* @CALLS_MEDIA_PIPELINE_STATE_READY: Pipeline is ready to be set into playing state
* @CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING: Request to start pipeline pending
* @CALLS_MEDIA_PIPELINE_STATE_PLAYING: Pipeline is currently playing
* @CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING: Request to pause pipeline pending
* @CALLS_MEDIA_PIPELINE_STATE_PAUSED: Pipeline is currently paused
* @CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING: Request to stop pipeline pending
* @CALLS_MEDIA_PIPELINE_STATE_STOPPED: Pipeline has stopped playing (f.e. received BYE packet)
*/
typedef enum {
CALLS_MEDIA_PIPELINE_STATE_UNKNOWN = 0,
CALLS_MEDIA_PIPELINE_STATE_ERROR,
CALLS_MEDIA_PIPELINE_STATE_INITIALIZING,
CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC,
CALLS_MEDIA_PIPELINE_STATE_READY,
CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING,
CALLS_MEDIA_PIPELINE_STATE_PLAYING,
CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING,
CALLS_MEDIA_PIPELINE_STATE_PAUSED,
CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING,
CALLS_MEDIA_PIPELINE_STATE_STOPPED
} CallsMediaPipelineState;
#define CALLS_TYPE_SIP_MEDIA_PIPELINE (calls_sip_media_pipeline_get_type ())
G_DECLARE_FINAL_TYPE (CallsSipMediaPipeline, calls_sip_media_pipeline, CALLS, SIP_MEDIA_PIPELINE, GObject)
CallsSipMediaPipeline* calls_sip_media_pipeline_new (MediaCodecInfo *codec);
void calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
MediaCodecInfo *info);
void calls_sip_media_pipeline_start (CallsSipMediaPipeline *self);
void calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self);
void calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
gboolean pause);
int calls_sip_media_pipeline_get_rtp_port (CallsSipMediaPipeline *self);
int calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self);
CallsSipMediaPipeline* calls_sip_media_pipeline_new (MediaCodecInfo *codec);
void calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
MediaCodecInfo *info);
void calls_sip_media_pipeline_start (CallsSipMediaPipeline *self);
void calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self);
void calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
gboolean pause);
int calls_sip_media_pipeline_get_rtp_port (CallsSipMediaPipeline *self);
int calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self);
CallsMediaPipelineState calls_sip_media_pipeline_get_state (CallsSipMediaPipeline *self);
G_END_DECLS

View file

@ -55,6 +55,14 @@ sip_sources = files(
]
)
pipeline_enum_headers = [
'calls-sip-media-pipeline.h',
]
pipeline_enums = gnome.mkenums_simple('calls-media-pipeline-enums',
sources: pipeline_enum_headers)
sip_sources += pipeline_enums
sip_enum_headers = [
'calls-sip-util.h',
]