From aa446f821835ee02271ad9e4083e85b2f4754558 Mon Sep 17 00:00:00 2001 From: Evangelos Ribeiro Tzaras Date: Wed, 6 Apr 2022 10:30:34 +0200 Subject: [PATCH] sip: pipeline: Unify send and receive pipeline Using a single pipeline makes implementing encryption easier because we don't need to duplicate srtpenc and srtpdec elements for each direction. It also makes it easier to switch to using farstream down the line (see #426). --- plugins/sip/calls-sip-media-pipeline.c | 668 ++++++++++--------------- 1 file changed, 268 insertions(+), 400 deletions(-) diff --git a/plugins/sip/calls-sip-media-pipeline.c b/plugins/sip/calls-sip-media-pipeline.c index fa7050a..bcb3387 100644 --- a/plugins/sip/calls-sip-media-pipeline.c +++ b/plugins/sip/calls-sip-media-pipeline.c @@ -60,35 +60,36 @@ /* The following defines are used to set/reset bitmaps of playing/paused/stop state */ -#define EL_SEND_PIPELINE (1<<0) -#define EL_SEND_AUDIO_SRC (1<<1) -#define EL_SEND_RTPBIN (1<<2) -#define EL_SEND_RTP_SINK (1<<3) -#define EL_SEND_RTCP_SINK (1<<4) -#define EL_SEND_RTCP_SRC (1<<5) -#define EL_SEND_PAYLOADER (1<<6) -#define EL_SEND_ENCODER (1<<7) +#define EL_PIPELINE (1<<0) +#define EL_RTPBIN (1<<1) +#define EL_RTP_SRC (1<<2) +#define EL_RTP_SINK (1<<3) +#define EL_RTCP_SRC (1<<4) +#define EL_RTCP_SINK (1<<5) -#define EL_SEND_ALL_RTP EL_SEND_PIPELINE | EL_SEND_AUDIO_SRC | \ - EL_SEND_RTPBIN | EL_SEND_RTP_SINK | EL_SEND_RTCP_SRC | EL_SEND_RTCP_SINK | \ - EL_SEND_PAYLOADER | EL_SEND_ENCODER -#define EL_SEND_SENDING EL_SEND_AUDIO_SRC | EL_SEND_RTPBIN | EL_SEND_RTP_SINK | \ - EL_SEND_PAYLOADER | EL_SEND_ENCODER +#define EL_SRTP_ENCODER (1<<6) +#define EL_SRTP_DECODER (1<<7) -/* leave some room for more elements to be added later */ +#define EL_AUDIO_SRC (1<<8) +#define EL_AUDIO_SINK (1<<9) -#define EL_RECV_PIPELINE (1<<16) -#define EL_RECV_AUDIO_SINK (1<<17) -#define EL_RECV_RTPBIN (1<<18) -#define EL_RECV_RTP_SRC (1<<19) -#define EL_RECV_RTCP_SINK (1<<20) -#define EL_RECV_RTCP_SRC (1<<21) -#define EL_RECV_DEPAYLOADER (1<<22) -#define EL_RECV_DECODER (1<<23) +#define EL_PAYLOADER (1<<10) +#define EL_DEPAYLOADER (1<<11) -#define EL_RECV_ALL_RTP EL_RECV_PIPELINE | EL_RECV_AUDIO_SINK | \ - EL_RECV_RTPBIN | EL_RECV_RTP_SRC | EL_RECV_RTCP_SRC | EL_RECV_RTCP_SINK | \ - EL_RECV_DEPAYLOADER | EL_RECV_DECODER +#define EL_ENCODER (1<<12) +#define EL_DECODER (1<<13) + +#define EL_SENDING \ + (EL_AUDIO_SRC | EL_ENCODER | EL_PAYLOADER | \ + EL_RTPBIN | EL_RTP_SINK | EL_RTCP_SINK) + +#define EL_ALL_RTP \ + (EL_PIPELINE | EL_RTPBIN | \ + EL_RTP_SRC | EL_RTP_SINK | EL_RTCP_SRC | EL_RTCP_SINK | \ + EL_AUDIO_SRC | EL_AUDIO_SINK | \ + EL_ENCODER | EL_DECODER | EL_PAYLOADER | EL_DEPAYLOADER) + +#define EL_ALL_SRTP (EL_ALL_RTP | EL_SRTP_ENCODER | EL_SRTP_DECODER) enum { @@ -122,37 +123,31 @@ struct _CallsSipMediaPipeline { uint element_map_paused; uint element_map_stopped; gboolean emitted_sending_signal; + /* Connection details */ char *remote; - gint rport_rtp; - gint rport_rtcp; - /* Gstreamer Elements (sending) */ - GstElement *send_pipeline; - GstElement *audiosrc; - GstElement *send_rtpbin; - GstElement *rtp_sink; /* UDP out */ + GstElement *pipeline; + GstElement *rtpbin; + + GstElement *rtp_src; + GstElement *rtp_sink; + GstElement *rtcp_sink; + GstElement *rtcp_src; + + GstElement *audio_src; GstElement *payloader; GstElement *encoder; - GstElement *rtcp_send_sink; - GstElement *rtcp_send_src; - /* Gstreamer elements (receiving) */ - GstElement *recv_pipeline; - GstElement *audiosink; - GstElement *recv_rtpbin; - GstElement *rtp_src; /* UDP in */ + + GstElement *audio_sink; GstElement *depayloader; GstElement *decoder; - GstElement *rtcp_recv_sink; - GstElement *rtcp_recv_src; /* Gstreamer busses */ - GstBus *bus_send; - GstBus *bus_recv; - guint bus_watch_send; - guint bus_watch_recv; + GstBus *bus; + guint bus_watch_id; }; #if GLIB_CHECK_VERSION(2, 70, 0) @@ -170,8 +165,7 @@ set_state (CallsSipMediaPipeline *self, g_autoptr (GEnumClass) enum_class = NULL; GEnumValue *enum_val; - g_autofree char *recv_fname = NULL; - g_autofree char *send_fname = NULL; + g_autofree char *fname = NULL; g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); @@ -189,15 +183,11 @@ set_state (CallsSipMediaPipeline *self, enum_class = g_type_class_ref (CALLS_TYPE_MEDIA_PIPELINE_STATE); enum_val = g_enum_get_value (enum_class, state); - recv_fname = g_strdup_printf ("recv-%s", enum_val->value_nick); - send_fname = g_strdup_printf ("send-%s", enum_val->value_nick); + fname = g_strdup_printf ("calls-%s", enum_val->value_nick); - GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->recv_pipeline), + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, - recv_fname); - GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->send_pipeline), - GST_DEBUG_GRAPH_SHOW_ALL, - send_fname); + fname); } @@ -206,25 +196,26 @@ check_element_maps (CallsSipMediaPipeline *self) { g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); - if (self->element_map_playing == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) { + /* TODO take encryption into account */ + if (self->element_map_playing == EL_ALL_RTP) { g_debug ("All pipeline elements are playing"); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAYING); return; } - if (self->element_map_paused == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) { + if (self->element_map_paused == EL_ALL_RTP) { g_debug ("All pipeline elements are paused"); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PAUSED); return; } - if (self->element_map_stopped == (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP)) { + if (self->element_map_stopped == EL_ALL_RTP) { g_debug ("All pipeline elements are stopped"); set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOPPED); return; } - if ((self->element_map_playing & (EL_SEND_SENDING)) == (EL_SEND_SENDING) && + if ((self->element_map_playing & (EL_SENDING)) == (EL_SENDING) && !self->emitted_sending_signal) { g_debug ("Sender pipeline is sending data to %s RTP/RTCP %d/%d", self->remote, self->rport_rtp, self->rport_rtcp); @@ -301,42 +292,46 @@ on_bus_message (GstBus *bus, gst_element_state_get_name (oldstate), gst_element_state_get_name (newstate)); - /* Sender pipeline elements */ - if (message->src == GST_OBJECT (self->send_pipeline)) - element_id = EL_SEND_PIPELINE; - else if (message->src == GST_OBJECT (self->audiosrc)) - element_id = EL_SEND_AUDIO_SRC; - else if (message->src == GST_OBJECT (self->send_rtpbin)) - element_id = EL_SEND_RTPBIN; - else if (message->src == GST_OBJECT (self->rtp_sink)) - element_id = EL_SEND_RTP_SINK; - else if (message->src == GST_OBJECT (self->rtcp_send_sink)) - element_id = EL_SEND_RTCP_SINK; - else if (message->src == GST_OBJECT (self->rtcp_send_src)) - element_id = EL_SEND_RTCP_SRC; - else if (message->src == GST_OBJECT (self->payloader)) - element_id = EL_SEND_PAYLOADER; - else if (message->src == GST_OBJECT (self->encoder)) - element_id = EL_SEND_ENCODER; - /* Receiver pipeline elements */ - else if (message->src == GST_OBJECT (self->recv_pipeline)) - element_id = EL_RECV_PIPELINE; - else if (message->src == GST_OBJECT (self->audiosink)) - element_id = EL_RECV_AUDIO_SINK; - else if (message->src == GST_OBJECT (self->recv_rtpbin)) - element_id = EL_RECV_RTPBIN; + if (message->src == GST_OBJECT (self->pipeline)) + element_id = EL_PIPELINE; + else if (message->src == GST_OBJECT (self->rtpbin)) + element_id = EL_RTPBIN; + else if (message->src == GST_OBJECT (self->rtp_src)) - element_id = EL_RECV_RTP_SRC; - else if (message->src == GST_OBJECT (self->rtcp_recv_sink)) - element_id = EL_RECV_RTCP_SINK; - else if (message->src == GST_OBJECT (self->rtcp_recv_src)) - element_id = EL_RECV_RTCP_SRC; + element_id = EL_RTP_SRC; + else if (message->src == GST_OBJECT (self->rtp_sink)) + element_id = EL_RTP_SINK; + + else if (message->src == GST_OBJECT (self->rtcp_src)) + element_id = EL_RTCP_SRC; + else if (message->src == GST_OBJECT (self->rtcp_sink)) + element_id = EL_RTCP_SINK; + + /* TODO srtp encryption + else if (message->src == GST_OBJECT (self->srtpenc)) + element_id = EL_SRTP_ENCODER; + else if (message->src == GST_OBJECT (self->srtpdec)) + element_id = EL_SRTP_DECODER; + */ + + + else if (message->src == GST_OBJECT (self->audio_src)) + element_id = EL_AUDIO_SRC; + else if (message->src == GST_OBJECT (self->audio_sink)) + element_id = EL_AUDIO_SINK; + + else if (message->src == GST_OBJECT (self->payloader)) + element_id = EL_PAYLOADER; else if (message->src == GST_OBJECT (self->depayloader)) - element_id = EL_RECV_DEPAYLOADER; + element_id = EL_DEPAYLOADER; + + else if (message->src == GST_OBJECT (self->encoder)) + element_id = EL_ENCODER; else if (message->src == GST_OBJECT (self->decoder)) - element_id = EL_RECV_DECODER; + element_id = EL_DECODER; unset_element_id = G_MAXUINT ^ element_id; + if (newstate == GST_STATE_PLAYING) { self->element_map_playing |= element_id; self->element_map_paused &= unset_element_id; @@ -366,118 +361,52 @@ on_bus_message (GstBus *bus, } -/* Setting up pipelines */ +/* Pipeline setup */ static gboolean -send_pipeline_link_elements (CallsSipMediaPipeline *self, - GError **error) +setup_socket_reuse (CallsSipMediaPipeline *self, + GError **error) { - g_autoptr (GstPad) srcpad = NULL; - g_autoptr (GstPad) sinkpad = NULL; + g_autoptr (GSocket) rtp_sock = NULL; + g_autoptr (GSocket) rtcp_sock = NULL; - g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); + /* Set udp sources to ready to get ports allocated */ + gst_element_set_state (self->pipeline, GST_STATE_READY); -#if GST_CHECK_VERSION (1, 20, 0) - sinkpad = gst_element_request_pad_simple (self->send_rtpbin, "send_rtp_sink_0"); -#else - sinkpad = gst_element_get_request_pad (self->send_rtpbin, "send_rtp_sink_0"); -#endif - srcpad = gst_element_get_static_pad (self->payloader, "src"); - if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { + g_object_get (self->rtp_src, "used-socket", &rtp_sock, NULL); + if (!rtp_sock) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to link payloader to rtpbin"); + "Could not retrieve used socket from RTP udpsrc element"); return FALSE; } - gst_object_unref (srcpad); - gst_object_unref (sinkpad); - - /* link RTP srcpad to udpsink */ - srcpad = gst_element_get_static_pad (self->send_rtpbin, "send_rtp_src_0"); - sinkpad = gst_element_get_static_pad (self->rtp_sink, "sink"); - if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { + g_object_get (self->rtcp_src, "used-socket", &rtcp_sock, NULL); + if (!rtcp_sock) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to link rtpbin to rtpsink"); + "Could not retrieve used socket from RTCP udpsrc element"); return FALSE; } - gst_object_unref (srcpad); - gst_object_unref (sinkpad); + /* Ports are allocated. Let's reuse the socket for rtcp source in the sink for NAT traversal*/ + g_object_set (self->rtp_sink, + "socket", rtp_sock, + "close-socket", FALSE, + NULL); - /* RTCP srcpad to udpsink */ -#if GST_CHECK_VERSION (1, 20, 0) - srcpad = gst_element_request_pad_simple (self->send_rtpbin, "send_rtcp_src_0"); -#else - srcpad = gst_element_get_request_pad (self->send_rtpbin, "send_rtcp_src_0"); -#endif - sinkpad = gst_element_get_static_pad (self->rtcp_send_sink, "sink"); - if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to link rtpbin to rtcpsink"); - return FALSE; - } - - gst_object_unref (srcpad); - gst_object_unref (sinkpad); - - /* receive RTCP */ - srcpad = gst_element_get_static_pad (self->rtcp_send_src, "src"); -#if GST_CHECK_VERSION (1, 20, 0) - sinkpad = gst_element_request_pad_simple (self->send_rtpbin, "recv_rtcp_sink_0"); -#else - sinkpad = gst_element_get_request_pad (self->send_rtpbin, "recv_rtcp_sink_0"); -#endif - if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to link rtcpsrc to rtpbin"); - return FALSE; - } + g_object_set (self->rtcp_sink, + "socket", rtcp_sock, + "close-socket", FALSE, + NULL); return TRUE; } static gboolean -send_pipeline_setup_codecs (CallsSipMediaPipeline *self, - MediaCodecInfo *codec, - GError **error) -{ - g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); - g_assert (codec); - - MAKE_ELEMENT (encoder, codec->gst_encoder_name, "encoder"); - MAKE_ELEMENT (payloader, codec->gst_payloader_name, "payloader"); - - gst_bin_add_many (GST_BIN (self->send_pipeline), self->payloader, self->encoder, - self->audiosrc, NULL); - - if (!gst_element_link_many (self->audiosrc, self->encoder, self->payloader, NULL)) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to link audiosrc encoder and payloader"); - return FALSE; - } - - return send_pipeline_link_elements (self, error); -} - -/** - * Prepare a skeleton send pipeline where we can later - * plug the codec specific elements into. - * - * In contrast to the receiver pipeline there is no need to start the - * pipeline until we actually want to establish a media session. - * - * The receiver pipeline should have been initialized at this point - * allowing us to reuse GSockets. - */ -static gboolean -send_pipeline_init (CallsSipMediaPipeline *self, - GError **error) +pipeline_init (CallsSipMediaPipeline *self, + GError **error) { g_autoptr (GSocket) rtp_sock = NULL; g_autoptr (GSocket) rtcp_sock = NULL; @@ -485,24 +414,25 @@ send_pipeline_init (CallsSipMediaPipeline *self, g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); - self->send_pipeline = gst_pipeline_new ("rtp-send-pipeline"); + self->pipeline = gst_pipeline_new ("media-pipeline"); - if (!self->send_pipeline) { + if (!self->pipeline) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Could not create send pipeline"); + "Could not create media pipeline"); return FALSE; } - gst_object_ref_sink (self->send_pipeline); + gst_object_ref_sink (self->pipeline); + /* Audio source*/ env_var = g_getenv ("CALLS_AUDIOSRC"); if (!STR_IS_NULL_OR_EMPTY (env_var)) { - MAKE_ELEMENT (audiosrc, env_var, "audiosource"); + MAKE_ELEMENT (audio_src, env_var, "audiosource"); } else { g_autoptr (GstStructure) gst_props = NULL; - MAKE_ELEMENT (audiosrc, "pulsesrc", "audiosource"); + MAKE_ELEMENT (audio_src, "pulsesrc", "audiosource"); /* enable echo cancellation and set buffer size to 40ms */ gst_props = gst_structure_new ("props", @@ -510,22 +440,61 @@ send_pipeline_init (CallsSipMediaPipeline *self, "filter.want", G_TYPE_STRING, "echo-cancel", NULL); - g_object_set (self->audiosrc, + g_object_set (self->audio_src, "buffer-time", (gint64) 40000, "stream-properties", gst_props, NULL); } - MAKE_ELEMENT (send_rtpbin, "rtpbin", "send-rtpbin"); - MAKE_ELEMENT (rtp_sink, "udpsink", "rtp-udp-sink"); - MAKE_ELEMENT (rtcp_send_src, "udpsrc", "rtcp-udp-send-src"); - MAKE_ELEMENT (rtcp_send_sink, "udpsink", "rtcp-udp-send-sink"); + /* Audio sink */ + env_var = g_getenv ("CALLS_AUDIOSINK"); + if (!STR_IS_NULL_OR_EMPTY (env_var)) { + MAKE_ELEMENT (audio_sink, env_var, "audiosink"); + } else { + g_autoptr (GstStructure) gst_props = NULL; - g_object_set (self->rtcp_send_sink, - "async", FALSE, - "sync", FALSE, + MAKE_ELEMENT (audio_sink, "pulsesink", "audiosink"); + + /* enable echo cancellation and set buffer size to 40ms */ + gst_props = gst_structure_new ("props", + "media.role", G_TYPE_STRING, "phone", + "filter.want", G_TYPE_STRING, "echo-cancel", + NULL); + + g_object_set (self->audio_sink, + "buffer-time", (gint64) 40000, + "stream-properties", gst_props, + NULL); + + } + + + /* rtpbin */ + MAKE_ELEMENT (rtpbin, "rtpbin", "rtpbin"); + + /* UDP sources and sinks for RTP and RTCP */ + MAKE_ELEMENT (rtp_src, "udpsrc", "rtp-udp-src"); + MAKE_ELEMENT (rtp_sink, "udpsink", "rtp-udp-sink"); + + MAKE_ELEMENT (rtcp_src, "udpsrc", "rtcp-udp-src"); + MAKE_ELEMENT (rtcp_sink, "udpsink", "rtcp-udp-sink"); + + /* port 0 means letting the OS allocate */ + g_object_set (self->rtp_src, + "port", 0, + "close-socket", FALSE, + "reuse", TRUE, NULL); + g_object_set (self->rtcp_src, + "port", 0, + "close-socket", FALSE, + "reuse", TRUE, + NULL); + + g_object_set (self->rtp_sink, "async", FALSE, "sync", FALSE, NULL); + g_object_set (self->rtcp_sink, "async", FALSE, "sync", FALSE, NULL); + g_object_bind_property (self, "rport-rtp", self->rtp_sink, "port", G_BINDING_BIDIRECTIONAL); @@ -535,69 +504,65 @@ send_pipeline_init (CallsSipMediaPipeline *self, G_BINDING_BIDIRECTIONAL); g_object_bind_property (self, "rport-rtcp", - self->rtcp_send_sink, "port", + self->rtcp_sink, "port", G_BINDING_BIDIRECTIONAL); g_object_bind_property (self, "remote", - self->rtcp_send_sink, "host", + self->rtcp_sink, "host", G_BINDING_BIDIRECTIONAL); - /* bind sockets for hole punching scheme */ - g_object_get (self->rtp_src, "used-socket", &rtp_sock, NULL); - if (!rtp_sock) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Could not retrieve used socket from RTP udpsrc element"); + + /* Add all elements to the pipeline */ + gst_bin_add_many (GST_BIN (self->pipeline), + self->audio_src, self->audio_sink, + self->rtpbin, + self->rtp_src, self->rtp_sink, + self->rtcp_src, self->rtcp_sink, + NULL); + + /* Setup bus watch */ + self->bus = gst_pipeline_get_bus (GST_PIPELINE (self->pipeline)); + self->bus_watch_id = gst_bus_add_watch (self->bus, on_bus_message, self); + + if (!setup_socket_reuse (self, error)) return FALSE; - } - - g_object_set (self->rtp_sink, - "socket", rtp_sock, - "close-socket", FALSE, - NULL); - - - g_object_get (self->rtcp_recv_src, "used-socket", &rtcp_sock, NULL); - if (!rtcp_sock) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Could not retrieve used socket from RTCP udpsrc element"); - return FALSE; - } - g_object_set (self->rtcp_send_sink, - "socket", rtcp_sock, - "close-socket", FALSE, - NULL); - g_object_set (self->rtcp_send_src, - "socket", rtcp_sock, - "close-socket", FALSE, - NULL); - - gst_bin_add (GST_BIN (self->send_pipeline), self->send_rtpbin); - gst_bin_add_many (GST_BIN (self->send_pipeline), self->rtp_sink, - self->rtcp_send_src, self->rtcp_send_sink, NULL); - - self->bus_send = gst_pipeline_get_bus (GST_PIPELINE (self->send_pipeline)); - self->bus_watch_send = gst_bus_add_watch (self->bus_send, on_bus_message, self); return TRUE; } static gboolean -recv_pipeline_link_elements (CallsSipMediaPipeline *self, - GError **error) +pipeline_link_elements (CallsSipMediaPipeline *self, + GError **error) { g_autoptr (GstPad) srcpad = NULL; g_autoptr (GstPad) sinkpad = NULL; g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); + /* link to payloader */ + +#if GST_CHECK_VERSION (1, 20, 0) + sinkpad = gst_element_request_pad_simple (self->rtpbin, "send_rtp_sink_0"); +#else + sinkpad = gst_element_get_request_pad (self->rtpbin, "send_rtp_sink_0"); +#endif + srcpad = gst_element_get_static_pad (self->payloader, "src"); + if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { + if (error) + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Failed to link payloader to rtpbin"); + return FALSE; + } + + + /* Transmitter pads */ + srcpad = gst_element_get_static_pad (self->rtp_src, "src"); #if GST_CHECK_VERSION (1, 20, 0) - sinkpad = gst_element_request_pad_simple (self->recv_rtpbin, "recv_rtp_sink_0"); + sinkpad = gst_element_request_pad_simple (self->rtpbin, "recv_rtp_sink_0"); #else - sinkpad = gst_element_get_request_pad (self->recv_rtpbin, "recv_rtp_sink_0"); + sinkpad = gst_element_get_request_pad (self->rtpbin, "recv_rtp_sink_0"); #endif if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (error) @@ -609,11 +574,23 @@ recv_pipeline_link_elements (CallsSipMediaPipeline *self, gst_object_unref (srcpad); gst_object_unref (sinkpad); - srcpad = gst_element_get_static_pad (self->rtcp_recv_src, "src"); + srcpad = gst_element_get_static_pad (self->rtpbin, "send_rtp_src_0"); + sinkpad = gst_element_get_static_pad (self->rtp_sink, "sink"); + if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { + if (error) + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Failed to link rtpbin to rtpsink"); + return FALSE; + } + + gst_object_unref (srcpad); + gst_object_unref (sinkpad); + + srcpad = gst_element_get_static_pad (self->rtcp_src, "src"); #if GST_CHECK_VERSION (1, 20 , 0) - sinkpad = gst_element_request_pad_simple (self->recv_rtpbin, "recv_rtcp_sink_0"); + sinkpad = gst_element_request_pad_simple (self->rtpbin, "recv_rtcp_sink_0"); #else - sinkpad = gst_element_get_request_pad (self->recv_rtpbin, "recv_rtcp_sink_0"); + sinkpad = gst_element_get_request_pad (self->rtpbin, "recv_rtcp_sink_0"); #endif if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (error) @@ -622,32 +599,33 @@ recv_pipeline_link_elements (CallsSipMediaPipeline *self, return FALSE; } - gst_object_unref (srcpad); - gst_object_unref (sinkpad); + gst_object_unref (srcpad); + gst_object_unref (sinkpad); -#if GST_CHECK_VERSION (1, 20, 0) - srcpad = gst_element_request_pad_simple (self->recv_rtpbin, "send_rtcp_src_0"); -#else - srcpad = gst_element_get_request_pad (self->recv_rtpbin, "send_rtcp_src_0"); -#endif - sinkpad = gst_element_get_static_pad (self->rtcp_recv_sink, "sink"); - if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to link rtpbin to rtcpsink"); - return FALSE; - } + #if GST_CHECK_VERSION (1, 20, 0) + srcpad = gst_element_request_pad_simple (self->rtpbin, "send_rtcp_src_0"); + #else + srcpad = gst_element_get_request_pad (self->rtpbin, "send_rtcp_src_0"); + #endif + sinkpad = gst_element_get_static_pad (self->rtcp_sink, "sink"); + if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { + if (error) + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Failed to link rtpbin to rtcpsink"); + return FALSE; + } - g_signal_connect (self->recv_rtpbin, "pad-added", G_CALLBACK (on_pad_added), self->depayloader); + /* can only link to depayloader after RTP payload has been verified */ + g_signal_connect (self->rtpbin, "pad-added", G_CALLBACK (on_pad_added), self->depayloader); return TRUE; } static gboolean -recv_pipeline_setup_codecs (CallsSipMediaPipeline *self, - MediaCodecInfo *codec, - GError **error) +pipeline_setup_codecs (CallsSipMediaPipeline *self, + MediaCodecInfo *codec, + GError **error) { g_autoptr (GstCaps) caps = NULL; g_autofree char *caps_string = NULL; @@ -658,10 +636,22 @@ recv_pipeline_setup_codecs (CallsSipMediaPipeline *self, MAKE_ELEMENT (decoder, codec->gst_decoder_name, "decoder"); MAKE_ELEMENT (depayloader, codec->gst_depayloader_name, "depayloader"); - gst_bin_add_many (GST_BIN (self->recv_pipeline), self->depayloader, self->decoder, - self->audiosink, NULL); + MAKE_ELEMENT (encoder, codec->gst_encoder_name, "encoder"); + MAKE_ELEMENT (payloader, codec->gst_payloader_name, "payloader"); - if (!gst_element_link_many (self->depayloader, self->decoder, self->audiosink, NULL)) { + gst_bin_add_many (GST_BIN (self->pipeline), + self->depayloader, self->decoder, + self->payloader, self->encoder, + NULL); + + if (!gst_element_link_many (self->audio_src, self->encoder, self->payloader, NULL)) { + if (error) + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Failed to link audiosrc encoder and payloader"); + return FALSE; + } + + if (!gst_element_link_many (self->depayloader, self->decoder, self->audio_sink, NULL)) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link depayloader decoder and audiosink"); @@ -679,119 +669,6 @@ recv_pipeline_setup_codecs (CallsSipMediaPipeline *self, "caps", caps, NULL); - return recv_pipeline_link_elements (self, error); -} - - -/** - * Prepares a skeleton receiver pipeline which can later be - * used to plug codec specific element in. - * This pipeline just consists of (minimally linked) rtpbin - * audio sink and two udpsrc elements, one for RTP and one for RTCP. - * - * The pipeline will be set ready to let the OS allocate sockets - * for us instead of building and providing GSockets ourselves - * by hand. These GSockets are reused for any outgoing traffic in our - * hole punching scheme as a simple NAT traversal technique. - */ -static gboolean -recv_pipeline_init (CallsSipMediaPipeline *self, - GError **error) -{ - g_autoptr (GSocket) rtcp_sock = NULL; - const char *env_var; - - g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); - - self->recv_pipeline = gst_pipeline_new ("rtp-recv-pipeline"); - - if (!self->recv_pipeline) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Could not create receiver pipeline"); - return FALSE; - } - - gst_object_ref_sink (self->recv_pipeline); - - env_var = g_getenv ("CALLS_AUDIOSINK"); - if (!STR_IS_NULL_OR_EMPTY (env_var)) { - MAKE_ELEMENT (audiosink, env_var, "audiosink"); - } else { - g_autoptr (GstStructure) gst_props = NULL; - - MAKE_ELEMENT (audiosink, "pulsesink", "audiosink"); - - /* enable echo cancellation and set buffer size to 40ms */ - gst_props = gst_structure_new ("props", - "media.role", G_TYPE_STRING, "phone", - "filter.want", G_TYPE_STRING, "echo-cancel", - NULL); - - g_object_set (self->audiosink, - "buffer-time", (gint64) 40000, - "stream-properties", gst_props, - NULL); - - } - - MAKE_ELEMENT (recv_rtpbin, "rtpbin", "recv-rtpbin") - MAKE_ELEMENT (rtp_src, "udpsrc", "rtp-udp-src"); - MAKE_ELEMENT (rtcp_recv_src, "udpsrc", "rtcp-udp-recv-src"); - MAKE_ELEMENT (rtcp_recv_sink, "udpsink", "rtcp-udp-recv-sink"); - - - g_object_set (self->rtcp_recv_sink, - "async", FALSE, - "sync", FALSE, - NULL); - - - /* port 0 means allocate */ - g_object_set (self->rtp_src, "port", 0, NULL); - g_object_set (self->rtcp_recv_src, "port", 0, NULL); - - g_object_bind_property (self, "rport-rtcp", - self->rtcp_recv_sink, "port", - G_BINDING_BIDIRECTIONAL); - - g_object_bind_property (self, "remote", - self->rtcp_recv_sink, "host", - G_BINDING_BIDIRECTIONAL); - - gst_bin_add (GST_BIN (self->recv_pipeline), self->recv_rtpbin); - gst_bin_add_many (GST_BIN (self->recv_pipeline), self->rtp_src, - self->rtcp_recv_src, self->rtcp_recv_sink, NULL); - - self->bus_recv = gst_pipeline_get_bus (GST_PIPELINE (self->recv_pipeline)); - self->bus_watch_recv = gst_bus_add_watch (self->bus_recv, on_bus_message, self); - - g_object_set (self->rtp_src, - "close-socket", FALSE, - "reuse", TRUE, - NULL); - g_object_set (self->rtcp_recv_src, - "close-socket", FALSE, - "reuse", TRUE, - NULL); - - /* Set pipeline to ready to get ports allocated */ - gst_element_set_state (self->recv_pipeline, GST_STATE_READY); - - g_object_get (self->rtcp_recv_src, "used-socket", &rtcp_sock, NULL); - if (!rtcp_sock) { - if (error) - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Could not retrieve used socket from RTCP udpsrc element"); - return FALSE; - } - - /* Ports are allocated. Let's reuse the socket for rtcp source in the sink for NAT traversal*/ - g_object_set (self->rtcp_recv_sink, - "socket", rtcp_sock, - "close-socket", FALSE, - NULL); - return TRUE; } @@ -883,14 +760,8 @@ calls_sip_media_pipeline_constructed (GObject *object) set_state (self, CALLS_MEDIA_PIPELINE_STATE_INITIALIZING); - if (!recv_pipeline_init (self, &error)) { - g_warning ("Could not create receive pipeline: %s", error->message); - set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); - return; - } - - if (!send_pipeline_init (self, &error)) { - g_warning ("Could not create send pipeline: %s", error->message); + if (!pipeline_init (self, &error)) { + g_warning ("Could not create pipeline: %s", error->message); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); return; } @@ -906,12 +777,9 @@ calls_sip_media_pipeline_finalize (GObject *object) calls_sip_media_pipeline_stop (self); - gst_object_unref (self->send_pipeline); - gst_object_unref (self->recv_pipeline); - gst_bus_remove_watch (self->bus_send); - gst_object_unref (self->bus_send); - gst_bus_remove_watch (self->bus_recv); - gst_object_unref (self->bus_recv); + gst_object_unref (self->pipeline); + gst_bus_remove_watch (self->bus); + gst_object_unref (self->bus); g_free (self->remote); @@ -1022,15 +890,15 @@ calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self, return; } - if (!recv_pipeline_setup_codecs (self, codec, &error)) { - g_warning ("Error trying to setup codec for receive pipeline: %s", + if (!pipeline_setup_codecs (self, codec, &error)) { + g_warning ("Error trying to setup codecs for pipeline: %s", error->message); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); return; } - if (!send_pipeline_setup_codecs (self, codec, &error)) { - g_warning ("Error trying to setup codec for send pipeline: %s", + if (!pipeline_link_elements (self, &error)) { + g_warning ("Not all pads could be linked: %s", error->message); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); return; @@ -1042,6 +910,7 @@ calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self, set_state (self, CALLS_MEDIA_PIPELINE_STATE_READY); } + static void diagnose_used_ports_in_socket (GSocket *socket) { @@ -1122,8 +991,11 @@ calls_sip_media_pipeline_start (CallsSipMediaPipeline *self) g_debug ("Starting media pipeline"); - gst_element_set_state (self->recv_pipeline, GST_STATE_PLAYING); - gst_element_set_state (self->send_pipeline, GST_STATE_PLAYING); + g_debug ("RTP/RTCP port before starting pipeline: %d/%d", + calls_sip_media_pipeline_get_rtp_port (self), + calls_sip_media_pipeline_get_rtcp_port (self)); + + gst_element_set_state (self->pipeline, GST_STATE_PLAYING); g_debug ("RTP/RTCP port after starting pipeline: %d/%d", calls_sip_media_pipeline_get_rtp_port (self), @@ -1143,9 +1015,7 @@ calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self) g_debug ("Stopping media pipeline"); - /* Stop the pipelines in reverse order (compared to the starting) */ - gst_element_set_state (self->send_pipeline, GST_STATE_NULL); - gst_element_set_state (self->recv_pipeline, GST_STATE_NULL); + gst_element_set_state (self->pipeline, GST_STATE_NULL); set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING); } @@ -1179,10 +1049,8 @@ calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self, "Pausing" : "Unpausing"); - gst_element_set_state (self->recv_pipeline, pause ? - GST_STATE_PAUSED : - GST_STATE_PLAYING); - gst_element_set_state (self->send_pipeline, pause ? + + gst_element_set_state (self->pipeline, pause ? GST_STATE_PAUSED : GST_STATE_PLAYING); @@ -1212,7 +1080,7 @@ calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self) g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), 0); - g_object_get (self->rtcp_recv_src, "port", &port, NULL); + g_object_get (self->rtcp_src, "port", &port, NULL); return port; }