1
0
Fork 0
mirror of https://gitlab.gnome.org/GNOME/calls.git synced 2025-01-23 20:15:32 +00:00

sip: pipeline: Unify send and receive pipeline

Using a single pipeline makes implementing encryption easier because we don't
need to duplicate srtpenc and srtpdec elements for each direction.

It also makes it easier to switch to using farstream down the line (see #426).
This commit is contained in:
Evangelos Ribeiro Tzaras 2022-04-06 10:30:34 +02:00
parent 85ee014cc9
commit aa446f8218

View file

@ -60,35 +60,36 @@
/* The following defines are used to set/reset bitmaps of playing/paused/stop state */ /* The following defines are used to set/reset bitmaps of playing/paused/stop state */
#define EL_SEND_PIPELINE (1<<0) #define EL_PIPELINE (1<<0)
#define EL_SEND_AUDIO_SRC (1<<1) #define EL_RTPBIN (1<<1)
#define EL_SEND_RTPBIN (1<<2) #define EL_RTP_SRC (1<<2)
#define EL_SEND_RTP_SINK (1<<3) #define EL_RTP_SINK (1<<3)
#define EL_SEND_RTCP_SINK (1<<4) #define EL_RTCP_SRC (1<<4)
#define EL_SEND_RTCP_SRC (1<<5) #define EL_RTCP_SINK (1<<5)
#define EL_SEND_PAYLOADER (1<<6)
#define EL_SEND_ENCODER (1<<7)
#define EL_SEND_ALL_RTP EL_SEND_PIPELINE | EL_SEND_AUDIO_SRC | \ #define EL_SRTP_ENCODER (1<<6)
EL_SEND_RTPBIN | EL_SEND_RTP_SINK | EL_SEND_RTCP_SRC | EL_SEND_RTCP_SINK | \ #define EL_SRTP_DECODER (1<<7)
EL_SEND_PAYLOADER | EL_SEND_ENCODER
#define EL_SEND_SENDING EL_SEND_AUDIO_SRC | EL_SEND_RTPBIN | EL_SEND_RTP_SINK | \
EL_SEND_PAYLOADER | EL_SEND_ENCODER
/* leave some room for more elements to be added later */ #define EL_AUDIO_SRC (1<<8)
#define EL_AUDIO_SINK (1<<9)
#define EL_RECV_PIPELINE (1<<16) #define EL_PAYLOADER (1<<10)
#define EL_RECV_AUDIO_SINK (1<<17) #define EL_DEPAYLOADER (1<<11)
#define EL_RECV_RTPBIN (1<<18)
#define EL_RECV_RTP_SRC (1<<19)
#define EL_RECV_RTCP_SINK (1<<20)
#define EL_RECV_RTCP_SRC (1<<21)
#define EL_RECV_DEPAYLOADER (1<<22)
#define EL_RECV_DECODER (1<<23)
#define EL_RECV_ALL_RTP EL_RECV_PIPELINE | EL_RECV_AUDIO_SINK | \ #define EL_ENCODER (1<<12)
EL_RECV_RTPBIN | EL_RECV_RTP_SRC | EL_RECV_RTCP_SRC | EL_RECV_RTCP_SINK | \ #define EL_DECODER (1<<13)
EL_RECV_DEPAYLOADER | EL_RECV_DECODER
#define EL_SENDING \
(EL_AUDIO_SRC | EL_ENCODER | EL_PAYLOADER | \
EL_RTPBIN | EL_RTP_SINK | EL_RTCP_SINK)
#define EL_ALL_RTP \
(EL_PIPELINE | EL_RTPBIN | \
EL_RTP_SRC | EL_RTP_SINK | EL_RTCP_SRC | EL_RTCP_SINK | \
EL_AUDIO_SRC | EL_AUDIO_SINK | \
EL_ENCODER | EL_DECODER | EL_PAYLOADER | EL_DEPAYLOADER)
#define EL_ALL_SRTP (EL_ALL_RTP | EL_SRTP_ENCODER | EL_SRTP_DECODER)
enum { enum {
@ -122,37 +123,31 @@ struct _CallsSipMediaPipeline {
uint element_map_paused; uint element_map_paused;
uint element_map_stopped; uint element_map_stopped;
gboolean emitted_sending_signal; gboolean emitted_sending_signal;
/* Connection details */ /* Connection details */
char *remote; char *remote;
gint rport_rtp; gint rport_rtp;
gint rport_rtcp; gint rport_rtcp;
/* Gstreamer Elements (sending) */ GstElement *pipeline;
GstElement *send_pipeline; GstElement *rtpbin;
GstElement *audiosrc;
GstElement *send_rtpbin; GstElement *rtp_src;
GstElement *rtp_sink; /* UDP out */ GstElement *rtp_sink;
GstElement *rtcp_sink;
GstElement *rtcp_src;
GstElement *audio_src;
GstElement *payloader; GstElement *payloader;
GstElement *encoder; GstElement *encoder;
GstElement *rtcp_send_sink;
GstElement *rtcp_send_src; GstElement *audio_sink;
/* Gstreamer elements (receiving) */
GstElement *recv_pipeline;
GstElement *audiosink;
GstElement *recv_rtpbin;
GstElement *rtp_src; /* UDP in */
GstElement *depayloader; GstElement *depayloader;
GstElement *decoder; GstElement *decoder;
GstElement *rtcp_recv_sink;
GstElement *rtcp_recv_src;
/* Gstreamer busses */ /* Gstreamer busses */
GstBus *bus_send; GstBus *bus;
GstBus *bus_recv; guint bus_watch_id;
guint bus_watch_send;
guint bus_watch_recv;
}; };
#if GLIB_CHECK_VERSION(2, 70, 0) #if GLIB_CHECK_VERSION(2, 70, 0)
@ -170,8 +165,7 @@ set_state (CallsSipMediaPipeline *self,
g_autoptr (GEnumClass) enum_class = NULL; g_autoptr (GEnumClass) enum_class = NULL;
GEnumValue *enum_val; GEnumValue *enum_val;
g_autofree char *recv_fname = NULL; g_autofree char *fname = NULL;
g_autofree char *send_fname = NULL;
g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); g_assert (CALLS_SIP_MEDIA_PIPELINE (self));
@ -189,15 +183,11 @@ set_state (CallsSipMediaPipeline *self,
enum_class = g_type_class_ref (CALLS_TYPE_MEDIA_PIPELINE_STATE); enum_class = g_type_class_ref (CALLS_TYPE_MEDIA_PIPELINE_STATE);
enum_val = g_enum_get_value (enum_class, state); enum_val = g_enum_get_value (enum_class, state);
recv_fname = g_strdup_printf ("recv-%s", enum_val->value_nick); fname = g_strdup_printf ("calls-%s", enum_val->value_nick);
send_fname = g_strdup_printf ("send-%s", enum_val->value_nick);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->recv_pipeline), GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, GST_DEBUG_GRAPH_SHOW_ALL,
recv_fname); fname);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->send_pipeline),
GST_DEBUG_GRAPH_SHOW_ALL,
send_fname);
} }
@ -206,25 +196,26 @@ check_element_maps (CallsSipMediaPipeline *self)
{ {
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
if (self->element_map_playing == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) { /* TODO take encryption into account */
if (self->element_map_playing == EL_ALL_RTP) {
g_debug ("All pipeline elements are playing"); g_debug ("All pipeline elements are playing");
set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAYING); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAYING);
return; return;
} }
if (self->element_map_paused == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) { if (self->element_map_paused == EL_ALL_RTP) {
g_debug ("All pipeline elements are paused"); g_debug ("All pipeline elements are paused");
set_state (self, CALLS_MEDIA_PIPELINE_STATE_PAUSED); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PAUSED);
return; return;
} }
if (self->element_map_stopped == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) { if (self->element_map_stopped == EL_ALL_RTP) {
g_debug ("All pipeline elements are stopped"); g_debug ("All pipeline elements are stopped");
set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOPPED); set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOPPED);
return; return;
} }
if ((self->element_map_playing & (EL_SEND_SENDING)) == (EL_SEND_SENDING) && if ((self->element_map_playing & (EL_SENDING)) == (EL_SENDING) &&
!self->emitted_sending_signal) { !self->emitted_sending_signal) {
g_debug ("Sender pipeline is sending data to %s RTP/RTCP %d/%d", g_debug ("Sender pipeline is sending data to %s RTP/RTCP %d/%d",
self->remote, self->rport_rtp, self->rport_rtcp); self->remote, self->rport_rtp, self->rport_rtcp);
@ -301,42 +292,46 @@ on_bus_message (GstBus *bus,
gst_element_state_get_name (oldstate), gst_element_state_get_name (oldstate),
gst_element_state_get_name (newstate)); gst_element_state_get_name (newstate));
/* Sender pipeline elements */ if (message->src == GST_OBJECT (self->pipeline))
if (message->src == GST_OBJECT (self->send_pipeline)) element_id = EL_PIPELINE;
element_id = EL_SEND_PIPELINE; else if (message->src == GST_OBJECT (self->rtpbin))
else if (message->src == GST_OBJECT (self->audiosrc)) element_id = EL_RTPBIN;
element_id = EL_SEND_AUDIO_SRC;
else if (message->src == GST_OBJECT (self->send_rtpbin))
element_id = EL_SEND_RTPBIN;
else if (message->src == GST_OBJECT (self->rtp_sink))
element_id = EL_SEND_RTP_SINK;
else if (message->src == GST_OBJECT (self->rtcp_send_sink))
element_id = EL_SEND_RTCP_SINK;
else if (message->src == GST_OBJECT (self->rtcp_send_src))
element_id = EL_SEND_RTCP_SRC;
else if (message->src == GST_OBJECT (self->payloader))
element_id = EL_SEND_PAYLOADER;
else if (message->src == GST_OBJECT (self->encoder))
element_id = EL_SEND_ENCODER;
/* Receiver pipeline elements */
else if (message->src == GST_OBJECT (self->recv_pipeline))
element_id = EL_RECV_PIPELINE;
else if (message->src == GST_OBJECT (self->audiosink))
element_id = EL_RECV_AUDIO_SINK;
else if (message->src == GST_OBJECT (self->recv_rtpbin))
element_id = EL_RECV_RTPBIN;
else if (message->src == GST_OBJECT (self->rtp_src)) else if (message->src == GST_OBJECT (self->rtp_src))
element_id = EL_RECV_RTP_SRC; element_id = EL_RTP_SRC;
else if (message->src == GST_OBJECT (self->rtcp_recv_sink)) else if (message->src == GST_OBJECT (self->rtp_sink))
element_id = EL_RECV_RTCP_SINK; element_id = EL_RTP_SINK;
else if (message->src == GST_OBJECT (self->rtcp_recv_src))
element_id = EL_RECV_RTCP_SRC; else if (message->src == GST_OBJECT (self->rtcp_src))
element_id = EL_RTCP_SRC;
else if (message->src == GST_OBJECT (self->rtcp_sink))
element_id = EL_RTCP_SINK;
/* TODO srtp encryption
else if (message->src == GST_OBJECT (self->srtpenc))
element_id = EL_SRTP_ENCODER;
else if (message->src == GST_OBJECT (self->srtpdec))
element_id = EL_SRTP_DECODER;
*/
else if (message->src == GST_OBJECT (self->audio_src))
element_id = EL_AUDIO_SRC;
else if (message->src == GST_OBJECT (self->audio_sink))
element_id = EL_AUDIO_SINK;
else if (message->src == GST_OBJECT (self->payloader))
element_id = EL_PAYLOADER;
else if (message->src == GST_OBJECT (self->depayloader)) else if (message->src == GST_OBJECT (self->depayloader))
element_id = EL_RECV_DEPAYLOADER; element_id = EL_DEPAYLOADER;
else if (message->src == GST_OBJECT (self->encoder))
element_id = EL_ENCODER;
else if (message->src == GST_OBJECT (self->decoder)) else if (message->src == GST_OBJECT (self->decoder))
element_id = EL_RECV_DECODER; element_id = EL_DECODER;
unset_element_id = G_MAXUINT ^ element_id; unset_element_id = G_MAXUINT ^ element_id;
if (newstate == GST_STATE_PLAYING) { if (newstate == GST_STATE_PLAYING) {
self->element_map_playing |= element_id; self->element_map_playing |= element_id;
self->element_map_paused &= unset_element_id; self->element_map_paused &= unset_element_id;
@ -366,118 +361,52 @@ on_bus_message (GstBus *bus,
} }
/* Setting up pipelines */ /* Pipeline setup */
static gboolean static gboolean
send_pipeline_link_elements (CallsSipMediaPipeline *self, setup_socket_reuse (CallsSipMediaPipeline *self,
GError **error) GError **error)
{ {
g_autoptr (GstPad) srcpad = NULL; g_autoptr (GSocket) rtp_sock = NULL;
g_autoptr (GstPad) sinkpad = NULL; g_autoptr (GSocket) rtcp_sock = NULL;
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); /* Set udp sources to ready to get ports allocated */
gst_element_set_state (self->pipeline, GST_STATE_READY);
#if GST_CHECK_VERSION (1, 20, 0) g_object_get (self->rtp_src, "used-socket", &rtp_sock, NULL);
sinkpad = gst_element_request_pad_simple (self->send_rtpbin, "send_rtp_sink_0"); if (!rtp_sock) {
#else
sinkpad = gst_element_get_request_pad (self->send_rtpbin, "send_rtp_sink_0");
#endif
srcpad = gst_element_get_static_pad (self->payloader, "src");
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error) if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link payloader to rtpbin"); "Could not retrieve used socket from RTP udpsrc element");
return FALSE; return FALSE;
} }
gst_object_unref (srcpad); g_object_get (self->rtcp_src, "used-socket", &rtcp_sock, NULL);
gst_object_unref (sinkpad); if (!rtcp_sock) {
/* link RTP srcpad to udpsink */
srcpad = gst_element_get_static_pad (self->send_rtpbin, "send_rtp_src_0");
sinkpad = gst_element_get_static_pad (self->rtp_sink, "sink");
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error) if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link rtpbin to rtpsink"); "Could not retrieve used socket from RTCP udpsrc element");
return FALSE; return FALSE;
} }
gst_object_unref (srcpad); /* Ports are allocated. Let's reuse the socket for rtcp source in the sink for NAT traversal*/
gst_object_unref (sinkpad); g_object_set (self->rtp_sink,
"socket", rtp_sock,
"close-socket", FALSE,
NULL);
/* RTCP srcpad to udpsink */ g_object_set (self->rtcp_sink,
#if GST_CHECK_VERSION (1, 20, 0) "socket", rtcp_sock,
srcpad = gst_element_request_pad_simple (self->send_rtpbin, "send_rtcp_src_0"); "close-socket", FALSE,
#else NULL);
srcpad = gst_element_get_request_pad (self->send_rtpbin, "send_rtcp_src_0");
#endif
sinkpad = gst_element_get_static_pad (self->rtcp_send_sink, "sink");
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link rtpbin to rtcpsink");
return FALSE;
}
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
/* receive RTCP */
srcpad = gst_element_get_static_pad (self->rtcp_send_src, "src");
#if GST_CHECK_VERSION (1, 20, 0)
sinkpad = gst_element_request_pad_simple (self->send_rtpbin, "recv_rtcp_sink_0");
#else
sinkpad = gst_element_get_request_pad (self->send_rtpbin, "recv_rtcp_sink_0");
#endif
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link rtcpsrc to rtpbin");
return FALSE;
}
return TRUE; return TRUE;
} }
static gboolean static gboolean
send_pipeline_setup_codecs (CallsSipMediaPipeline *self, pipeline_init (CallsSipMediaPipeline *self,
MediaCodecInfo *codec, GError **error)
GError **error)
{
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
g_assert (codec);
MAKE_ELEMENT (encoder, codec->gst_encoder_name, "encoder");
MAKE_ELEMENT (payloader, codec->gst_payloader_name, "payloader");
gst_bin_add_many (GST_BIN (self->send_pipeline), self->payloader, self->encoder,
self->audiosrc, NULL);
if (!gst_element_link_many (self->audiosrc, self->encoder, self->payloader, NULL)) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link audiosrc encoder and payloader");
return FALSE;
}
return send_pipeline_link_elements (self, error);
}
/**
* Prepare a skeleton send pipeline where we can later
* plug the codec specific elements into.
*
* In contrast to the receiver pipeline there is no need to start the
* pipeline until we actually want to establish a media session.
*
* The receiver pipeline should have been initialized at this point
* allowing us to reuse GSockets.
*/
static gboolean
send_pipeline_init (CallsSipMediaPipeline *self,
GError **error)
{ {
g_autoptr (GSocket) rtp_sock = NULL; g_autoptr (GSocket) rtp_sock = NULL;
g_autoptr (GSocket) rtcp_sock = NULL; g_autoptr (GSocket) rtcp_sock = NULL;
@ -485,24 +414,25 @@ send_pipeline_init (CallsSipMediaPipeline *self,
g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); g_assert (CALLS_SIP_MEDIA_PIPELINE (self));
self->send_pipeline = gst_pipeline_new ("rtp-send-pipeline"); self->pipeline = gst_pipeline_new ("media-pipeline");
if (!self->send_pipeline) { if (!self->pipeline) {
if (error) if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Could not create send pipeline"); "Could not create media pipeline");
return FALSE; return FALSE;
} }
gst_object_ref_sink (self->send_pipeline); gst_object_ref_sink (self->pipeline);
/* Audio source*/
env_var = g_getenv ("CALLS_AUDIOSRC"); env_var = g_getenv ("CALLS_AUDIOSRC");
if (!STR_IS_NULL_OR_EMPTY (env_var)) { if (!STR_IS_NULL_OR_EMPTY (env_var)) {
MAKE_ELEMENT (audiosrc, env_var, "audiosource"); MAKE_ELEMENT (audio_src, env_var, "audiosource");
} else { } else {
g_autoptr (GstStructure) gst_props = NULL; g_autoptr (GstStructure) gst_props = NULL;
MAKE_ELEMENT (audiosrc, "pulsesrc", "audiosource"); MAKE_ELEMENT (audio_src, "pulsesrc", "audiosource");
/* enable echo cancellation and set buffer size to 40ms */ /* enable echo cancellation and set buffer size to 40ms */
gst_props = gst_structure_new ("props", gst_props = gst_structure_new ("props",
@ -510,22 +440,61 @@ send_pipeline_init (CallsSipMediaPipeline *self,
"filter.want", G_TYPE_STRING, "echo-cancel", "filter.want", G_TYPE_STRING, "echo-cancel",
NULL); NULL);
g_object_set (self->audiosrc, g_object_set (self->audio_src,
"buffer-time", (gint64) 40000, "buffer-time", (gint64) 40000,
"stream-properties", gst_props, "stream-properties", gst_props,
NULL); NULL);
} }
MAKE_ELEMENT (send_rtpbin, "rtpbin", "send-rtpbin"); /* Audio sink */
MAKE_ELEMENT (rtp_sink, "udpsink", "rtp-udp-sink"); env_var = g_getenv ("CALLS_AUDIOSINK");
MAKE_ELEMENT (rtcp_send_src, "udpsrc", "rtcp-udp-send-src"); if (!STR_IS_NULL_OR_EMPTY (env_var)) {
MAKE_ELEMENT (rtcp_send_sink, "udpsink", "rtcp-udp-send-sink"); MAKE_ELEMENT (audio_sink, env_var, "audiosink");
} else {
g_autoptr (GstStructure) gst_props = NULL;
g_object_set (self->rtcp_send_sink, MAKE_ELEMENT (audio_sink, "pulsesink", "audiosink");
"async", FALSE,
"sync", FALSE, /* enable echo cancellation and set buffer size to 40ms */
gst_props = gst_structure_new ("props",
"media.role", G_TYPE_STRING, "phone",
"filter.want", G_TYPE_STRING, "echo-cancel",
NULL);
g_object_set (self->audio_sink,
"buffer-time", (gint64) 40000,
"stream-properties", gst_props,
NULL);
}
/* rtpbin */
MAKE_ELEMENT (rtpbin, "rtpbin", "rtpbin");
/* UDP sources and sinks for RTP and RTCP */
MAKE_ELEMENT (rtp_src, "udpsrc", "rtp-udp-src");
MAKE_ELEMENT (rtp_sink, "udpsink", "rtp-udp-sink");
MAKE_ELEMENT (rtcp_src, "udpsrc", "rtcp-udp-src");
MAKE_ELEMENT (rtcp_sink, "udpsink", "rtcp-udp-sink");
/* port 0 means letting the OS allocate */
g_object_set (self->rtp_src,
"port", 0,
"close-socket", FALSE,
"reuse", TRUE,
NULL); NULL);
g_object_set (self->rtcp_src,
"port", 0,
"close-socket", FALSE,
"reuse", TRUE,
NULL);
g_object_set (self->rtp_sink, "async", FALSE, "sync", FALSE, NULL);
g_object_set (self->rtcp_sink, "async", FALSE, "sync", FALSE, NULL);
g_object_bind_property (self, "rport-rtp", g_object_bind_property (self, "rport-rtp",
self->rtp_sink, "port", self->rtp_sink, "port",
G_BINDING_BIDIRECTIONAL); G_BINDING_BIDIRECTIONAL);
@ -535,69 +504,65 @@ send_pipeline_init (CallsSipMediaPipeline *self,
G_BINDING_BIDIRECTIONAL); G_BINDING_BIDIRECTIONAL);
g_object_bind_property (self, "rport-rtcp", g_object_bind_property (self, "rport-rtcp",
self->rtcp_send_sink, "port", self->rtcp_sink, "port",
G_BINDING_BIDIRECTIONAL); G_BINDING_BIDIRECTIONAL);
g_object_bind_property (self, "remote", g_object_bind_property (self, "remote",
self->rtcp_send_sink, "host", self->rtcp_sink, "host",
G_BINDING_BIDIRECTIONAL); G_BINDING_BIDIRECTIONAL);
/* bind sockets for hole punching scheme */
g_object_get (self->rtp_src, "used-socket", &rtp_sock, NULL); /* Add all elements to the pipeline */
if (!rtp_sock) { gst_bin_add_many (GST_BIN (self->pipeline),
if (error) self->audio_src, self->audio_sink,
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, self->rtpbin,
"Could not retrieve used socket from RTP udpsrc element"); self->rtp_src, self->rtp_sink,
self->rtcp_src, self->rtcp_sink,
NULL);
/* Setup bus watch */
self->bus = gst_pipeline_get_bus (GST_PIPELINE (self->pipeline));
self->bus_watch_id = gst_bus_add_watch (self->bus, on_bus_message, self);
if (!setup_socket_reuse (self, error))
return FALSE; return FALSE;
}
g_object_set (self->rtp_sink,
"socket", rtp_sock,
"close-socket", FALSE,
NULL);
g_object_get (self->rtcp_recv_src, "used-socket", &rtcp_sock, NULL);
if (!rtcp_sock) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Could not retrieve used socket from RTCP udpsrc element");
return FALSE;
}
g_object_set (self->rtcp_send_sink,
"socket", rtcp_sock,
"close-socket", FALSE,
NULL);
g_object_set (self->rtcp_send_src,
"socket", rtcp_sock,
"close-socket", FALSE,
NULL);
gst_bin_add (GST_BIN (self->send_pipeline), self->send_rtpbin);
gst_bin_add_many (GST_BIN (self->send_pipeline), self->rtp_sink,
self->rtcp_send_src, self->rtcp_send_sink, NULL);
self->bus_send = gst_pipeline_get_bus (GST_PIPELINE (self->send_pipeline));
self->bus_watch_send = gst_bus_add_watch (self->bus_send, on_bus_message, self);
return TRUE; return TRUE;
} }
static gboolean static gboolean
recv_pipeline_link_elements (CallsSipMediaPipeline *self, pipeline_link_elements (CallsSipMediaPipeline *self,
GError **error) GError **error)
{ {
g_autoptr (GstPad) srcpad = NULL; g_autoptr (GstPad) srcpad = NULL;
g_autoptr (GstPad) sinkpad = NULL; g_autoptr (GstPad) sinkpad = NULL;
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
/* link to payloader */
#if GST_CHECK_VERSION (1, 20, 0)
sinkpad = gst_element_request_pad_simple (self->rtpbin, "send_rtp_sink_0");
#else
sinkpad = gst_element_get_request_pad (self->rtpbin, "send_rtp_sink_0");
#endif
srcpad = gst_element_get_static_pad (self->payloader, "src");
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link payloader to rtpbin");
return FALSE;
}
/* Transmitter pads */
srcpad = gst_element_get_static_pad (self->rtp_src, "src"); srcpad = gst_element_get_static_pad (self->rtp_src, "src");
#if GST_CHECK_VERSION (1, 20, 0) #if GST_CHECK_VERSION (1, 20, 0)
sinkpad = gst_element_request_pad_simple (self->recv_rtpbin, "recv_rtp_sink_0"); sinkpad = gst_element_request_pad_simple (self->rtpbin, "recv_rtp_sink_0");
#else #else
sinkpad = gst_element_get_request_pad (self->recv_rtpbin, "recv_rtp_sink_0"); sinkpad = gst_element_get_request_pad (self->rtpbin, "recv_rtp_sink_0");
#endif #endif
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error) if (error)
@ -609,11 +574,23 @@ recv_pipeline_link_elements (CallsSipMediaPipeline *self,
gst_object_unref (srcpad); gst_object_unref (srcpad);
gst_object_unref (sinkpad); gst_object_unref (sinkpad);
srcpad = gst_element_get_static_pad (self->rtcp_recv_src, "src"); srcpad = gst_element_get_static_pad (self->rtpbin, "send_rtp_src_0");
sinkpad = gst_element_get_static_pad (self->rtp_sink, "sink");
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link rtpbin to rtpsink");
return FALSE;
}
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
srcpad = gst_element_get_static_pad (self->rtcp_src, "src");
#if GST_CHECK_VERSION (1, 20 , 0) #if GST_CHECK_VERSION (1, 20 , 0)
sinkpad = gst_element_request_pad_simple (self->recv_rtpbin, "recv_rtcp_sink_0"); sinkpad = gst_element_request_pad_simple (self->rtpbin, "recv_rtcp_sink_0");
#else #else
sinkpad = gst_element_get_request_pad (self->recv_rtpbin, "recv_rtcp_sink_0"); sinkpad = gst_element_get_request_pad (self->rtpbin, "recv_rtcp_sink_0");
#endif #endif
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error) if (error)
@ -622,32 +599,33 @@ recv_pipeline_link_elements (CallsSipMediaPipeline *self,
return FALSE; return FALSE;
} }
gst_object_unref (srcpad); gst_object_unref (srcpad);
gst_object_unref (sinkpad); gst_object_unref (sinkpad);
#if GST_CHECK_VERSION (1, 20, 0) #if GST_CHECK_VERSION (1, 20, 0)
srcpad = gst_element_request_pad_simple (self->recv_rtpbin, "send_rtcp_src_0"); srcpad = gst_element_request_pad_simple (self->rtpbin, "send_rtcp_src_0");
#else #else
srcpad = gst_element_get_request_pad (self->recv_rtpbin, "send_rtcp_src_0"); srcpad = gst_element_get_request_pad (self->rtpbin, "send_rtcp_src_0");
#endif #endif
sinkpad = gst_element_get_static_pad (self->rtcp_recv_sink, "sink"); sinkpad = gst_element_get_static_pad (self->rtcp_sink, "sink");
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
if (error) if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link rtpbin to rtcpsink"); "Failed to link rtpbin to rtcpsink");
return FALSE; return FALSE;
} }
g_signal_connect (self->recv_rtpbin, "pad-added", G_CALLBACK (on_pad_added), self->depayloader); /* can only link to depayloader after RTP payload has been verified */
g_signal_connect (self->rtpbin, "pad-added", G_CALLBACK (on_pad_added), self->depayloader);
return TRUE; return TRUE;
} }
static gboolean static gboolean
recv_pipeline_setup_codecs (CallsSipMediaPipeline *self, pipeline_setup_codecs (CallsSipMediaPipeline *self,
MediaCodecInfo *codec, MediaCodecInfo *codec,
GError **error) GError **error)
{ {
g_autoptr (GstCaps) caps = NULL; g_autoptr (GstCaps) caps = NULL;
g_autofree char *caps_string = NULL; g_autofree char *caps_string = NULL;
@ -658,10 +636,22 @@ recv_pipeline_setup_codecs (CallsSipMediaPipeline *self,
MAKE_ELEMENT (decoder, codec->gst_decoder_name, "decoder"); MAKE_ELEMENT (decoder, codec->gst_decoder_name, "decoder");
MAKE_ELEMENT (depayloader, codec->gst_depayloader_name, "depayloader"); MAKE_ELEMENT (depayloader, codec->gst_depayloader_name, "depayloader");
gst_bin_add_many (GST_BIN (self->recv_pipeline), self->depayloader, self->decoder, MAKE_ELEMENT (encoder, codec->gst_encoder_name, "encoder");
self->audiosink, NULL); MAKE_ELEMENT (payloader, codec->gst_payloader_name, "payloader");
if (!gst_element_link_many (self->depayloader, self->decoder, self->audiosink, NULL)) { gst_bin_add_many (GST_BIN (self->pipeline),
self->depayloader, self->decoder,
self->payloader, self->encoder,
NULL);
if (!gst_element_link_many (self->audio_src, self->encoder, self->payloader, NULL)) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link audiosrc encoder and payloader");
return FALSE;
}
if (!gst_element_link_many (self->depayloader, self->decoder, self->audio_sink, NULL)) {
if (error) if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Failed to link depayloader decoder and audiosink"); "Failed to link depayloader decoder and audiosink");
@ -679,119 +669,6 @@ recv_pipeline_setup_codecs (CallsSipMediaPipeline *self,
"caps", caps, "caps", caps,
NULL); NULL);
return recv_pipeline_link_elements (self, error);
}
/**
* Prepares a skeleton receiver pipeline which can later be
* used to plug codec specific element in.
* This pipeline just consists of (minimally linked) rtpbin
* audio sink and two udpsrc elements, one for RTP and one for RTCP.
*
* The pipeline will be set ready to let the OS allocate sockets
* for us instead of building and providing GSockets ourselves
* by hand. These GSockets are reused for any outgoing traffic in our
* hole punching scheme as a simple NAT traversal technique.
*/
static gboolean
recv_pipeline_init (CallsSipMediaPipeline *self,
GError **error)
{
g_autoptr (GSocket) rtcp_sock = NULL;
const char *env_var;
g_assert (CALLS_SIP_MEDIA_PIPELINE (self));
self->recv_pipeline = gst_pipeline_new ("rtp-recv-pipeline");
if (!self->recv_pipeline) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Could not create receiver pipeline");
return FALSE;
}
gst_object_ref_sink (self->recv_pipeline);
env_var = g_getenv ("CALLS_AUDIOSINK");
if (!STR_IS_NULL_OR_EMPTY (env_var)) {
MAKE_ELEMENT (audiosink, env_var, "audiosink");
} else {
g_autoptr (GstStructure) gst_props = NULL;
MAKE_ELEMENT (audiosink, "pulsesink", "audiosink");
/* enable echo cancellation and set buffer size to 40ms */
gst_props = gst_structure_new ("props",
"media.role", G_TYPE_STRING, "phone",
"filter.want", G_TYPE_STRING, "echo-cancel",
NULL);
g_object_set (self->audiosink,
"buffer-time", (gint64) 40000,
"stream-properties", gst_props,
NULL);
}
MAKE_ELEMENT (recv_rtpbin, "rtpbin", "recv-rtpbin")
MAKE_ELEMENT (rtp_src, "udpsrc", "rtp-udp-src");
MAKE_ELEMENT (rtcp_recv_src, "udpsrc", "rtcp-udp-recv-src");
MAKE_ELEMENT (rtcp_recv_sink, "udpsink", "rtcp-udp-recv-sink");
g_object_set (self->rtcp_recv_sink,
"async", FALSE,
"sync", FALSE,
NULL);
/* port 0 means allocate */
g_object_set (self->rtp_src, "port", 0, NULL);
g_object_set (self->rtcp_recv_src, "port", 0, NULL);
g_object_bind_property (self, "rport-rtcp",
self->rtcp_recv_sink, "port",
G_BINDING_BIDIRECTIONAL);
g_object_bind_property (self, "remote",
self->rtcp_recv_sink, "host",
G_BINDING_BIDIRECTIONAL);
gst_bin_add (GST_BIN (self->recv_pipeline), self->recv_rtpbin);
gst_bin_add_many (GST_BIN (self->recv_pipeline), self->rtp_src,
self->rtcp_recv_src, self->rtcp_recv_sink, NULL);
self->bus_recv = gst_pipeline_get_bus (GST_PIPELINE (self->recv_pipeline));
self->bus_watch_recv = gst_bus_add_watch (self->bus_recv, on_bus_message, self);
g_object_set (self->rtp_src,
"close-socket", FALSE,
"reuse", TRUE,
NULL);
g_object_set (self->rtcp_recv_src,
"close-socket", FALSE,
"reuse", TRUE,
NULL);
/* Set pipeline to ready to get ports allocated */
gst_element_set_state (self->recv_pipeline, GST_STATE_READY);
g_object_get (self->rtcp_recv_src, "used-socket", &rtcp_sock, NULL);
if (!rtcp_sock) {
if (error)
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Could not retrieve used socket from RTCP udpsrc element");
return FALSE;
}
/* Ports are allocated. Let's reuse the socket for rtcp source in the sink for NAT traversal*/
g_object_set (self->rtcp_recv_sink,
"socket", rtcp_sock,
"close-socket", FALSE,
NULL);
return TRUE; return TRUE;
} }
@ -883,14 +760,8 @@ calls_sip_media_pipeline_constructed (GObject *object)
set_state (self, CALLS_MEDIA_PIPELINE_STATE_INITIALIZING); set_state (self, CALLS_MEDIA_PIPELINE_STATE_INITIALIZING);
if (!recv_pipeline_init (self, &error)) { if (!pipeline_init (self, &error)) {
g_warning ("Could not create receive pipeline: %s", error->message); g_warning ("Could not create pipeline: %s", error->message);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return;
}
if (!send_pipeline_init (self, &error)) {
g_warning ("Could not create send pipeline: %s", error->message);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return; return;
} }
@ -906,12 +777,9 @@ calls_sip_media_pipeline_finalize (GObject *object)
calls_sip_media_pipeline_stop (self); calls_sip_media_pipeline_stop (self);
gst_object_unref (self->send_pipeline); gst_object_unref (self->pipeline);
gst_object_unref (self->recv_pipeline); gst_bus_remove_watch (self->bus);
gst_bus_remove_watch (self->bus_send); gst_object_unref (self->bus);
gst_object_unref (self->bus_send);
gst_bus_remove_watch (self->bus_recv);
gst_object_unref (self->bus_recv);
g_free (self->remote); g_free (self->remote);
@ -1022,15 +890,15 @@ calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
return; return;
} }
if (!recv_pipeline_setup_codecs (self, codec, &error)) { if (!pipeline_setup_codecs (self, codec, &error)) {
g_warning ("Error trying to setup codec for receive pipeline: %s", g_warning ("Error trying to setup codecs for pipeline: %s",
error->message); error->message);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return; return;
} }
if (!send_pipeline_setup_codecs (self, codec, &error)) { if (!pipeline_link_elements (self, &error)) {
g_warning ("Error trying to setup codec for send pipeline: %s", g_warning ("Not all pads could be linked: %s",
error->message); error->message);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return; return;
@ -1042,6 +910,7 @@ calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
set_state (self, CALLS_MEDIA_PIPELINE_STATE_READY); set_state (self, CALLS_MEDIA_PIPELINE_STATE_READY);
} }
static void static void
diagnose_used_ports_in_socket (GSocket *socket) diagnose_used_ports_in_socket (GSocket *socket)
{ {
@ -1122,8 +991,11 @@ calls_sip_media_pipeline_start (CallsSipMediaPipeline *self)
g_debug ("Starting media pipeline"); g_debug ("Starting media pipeline");
gst_element_set_state (self->recv_pipeline, GST_STATE_PLAYING); g_debug ("RTP/RTCP port before starting pipeline: %d/%d",
gst_element_set_state (self->send_pipeline, GST_STATE_PLAYING); calls_sip_media_pipeline_get_rtp_port (self),
calls_sip_media_pipeline_get_rtcp_port (self));
gst_element_set_state (self->pipeline, GST_STATE_PLAYING);
g_debug ("RTP/RTCP port after starting pipeline: %d/%d", g_debug ("RTP/RTCP port after starting pipeline: %d/%d",
calls_sip_media_pipeline_get_rtp_port (self), calls_sip_media_pipeline_get_rtp_port (self),
@ -1143,9 +1015,7 @@ calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self)
g_debug ("Stopping media pipeline"); g_debug ("Stopping media pipeline");
/* Stop the pipelines in reverse order (compared to the starting) */ gst_element_set_state (self->pipeline, GST_STATE_NULL);
gst_element_set_state (self->send_pipeline, GST_STATE_NULL);
gst_element_set_state (self->recv_pipeline, GST_STATE_NULL);
set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING); set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING);
} }
@ -1179,10 +1049,8 @@ calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
"Pausing" : "Pausing" :
"Unpausing"); "Unpausing");
gst_element_set_state (self->recv_pipeline, pause ?
GST_STATE_PAUSED : gst_element_set_state (self->pipeline, pause ?
GST_STATE_PLAYING);
gst_element_set_state (self->send_pipeline, pause ?
GST_STATE_PAUSED : GST_STATE_PAUSED :
GST_STATE_PLAYING); GST_STATE_PLAYING);
@ -1212,7 +1080,7 @@ calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self)
g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), 0); g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), 0);
g_object_get (self->rtcp_recv_src, "port", &port, NULL); g_object_get (self->rtcp_src, "port", &port, NULL);
return port; return port;
} }