/* * Copyright (C) 2021-2022 Purism SPC * * This file is part of Calls. * * Calls is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Calls is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Calls. If not, see . * * Author: Evangelos Ribeiro Tzaras * * SPDX-License-Identifier: GPL-3.0-or-later * */ #define G_LOG_DOMAIN "CallsSipMediaPipeline" #include "calls-media-pipeline-enums.h" #include "calls-sip-media-pipeline.h" #include "util.h" #include #include #include #define MAKE_ELEMENT(var, element, name) \ self->var = gst_element_factory_make (element, name); \ if (!self->var) { \ if (error) \ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, \ "Could not create '%s' element of type %s", \ name ? : "unnamed", element); \ return FALSE; \ } /** * SECTION:sip-media-pipeline * @short_description: * @Title: * * #CallsSipMediaPipeline is responsible for building Gstreamer pipelines. * Usually a sender and receiver pipeline is employed. * * The sender pipeline records audio and uses RTP to send it out over the network * to the specified host. * The receiver pipeline receives RTP from the network and plays the audio * on the system. * * Both pipelines are using RTCP. */ /* The following defines are used to set/reset bitmaps of playing/paused/stop state */ #define EL_PIPELINE (1<<0) #define EL_RTPBIN (1<<1) #define EL_RTP_SRC (1<<2) #define EL_RTP_SINK (1<<3) #define EL_RTCP_SRC (1<<4) #define EL_RTCP_SINK (1<<5) #define EL_SRTP_ENCODER (1<<6) #define EL_SRTP_DECODER (1<<7) #define EL_AUDIO_SRC (1<<8) #define EL_AUDIO_SINK (1<<9) #define EL_PAYLOADER (1<<10) #define EL_DEPAYLOADER (1<<11) #define EL_ENCODER (1<<12) #define EL_DECODER (1<<13) #define EL_SENDING \ (EL_AUDIO_SRC | EL_ENCODER | EL_PAYLOADER | \ EL_RTPBIN | EL_RTP_SINK | EL_RTCP_SINK) #define EL_ALL_RTP \ (EL_PIPELINE | EL_RTPBIN | \ EL_RTP_SRC | EL_RTP_SINK | EL_RTCP_SRC | EL_RTCP_SINK | \ EL_AUDIO_SRC | EL_AUDIO_SINK | \ EL_ENCODER | EL_DECODER | EL_PAYLOADER | EL_DEPAYLOADER) #define EL_ALL_SRTP (EL_ALL_RTP | EL_SRTP_ENCODER | EL_SRTP_DECODER) enum { PROP_0, PROP_CODEC, PROP_REMOTE, PROP_RPORT_RTP, PROP_RPORT_RTCP, PROP_DEBUG, PROP_STATE, PROP_LAST_PROP, }; enum { SENDING_STARTED, N_SIGNALS }; static GParamSpec *props[PROP_LAST_PROP]; static uint signals[N_SIGNALS]; struct _CallsSipMediaPipeline { GObject parent; MediaCodecInfo *codec; gboolean debug; CallsMediaPipelineState state; uint element_map_playing; uint element_map_paused; uint element_map_stopped; gboolean emitted_sending_signal; /* Connection details */ char *remote; gint rport_rtp; gint rport_rtcp; GstElement *pipeline; GstElement *rtpbin; GstElement *rtp_src; GstElement *rtp_sink; GstElement *rtcp_sink; GstElement *rtcp_src; GstElement *audio_src; GstElement *payloader; GstElement *encoder; GstElement *audio_sink; GstElement *depayloader; GstElement *decoder; /* SRTP */ gboolean use_srtp; GstElement *srtpenc; GstElement *srtpdec; gulong request_rtpbin_rtp_decoder_id; gulong request_rtpbin_rtp_encoder_id; gulong request_rtpbin_rtcp_encoder_id; gulong request_rtpbin_rtcp_decoder_id; /* Gstreamer busses */ GstBus *bus; guint bus_watch_id; }; #if GLIB_CHECK_VERSION (2, 70, 0) G_DEFINE_FINAL_TYPE (CallsSipMediaPipeline, calls_sip_media_pipeline, G_TYPE_OBJECT) #else G_DEFINE_TYPE (CallsSipMediaPipeline, calls_sip_media_pipeline, G_TYPE_OBJECT) #endif static void set_state (CallsSipMediaPipeline *self, CallsMediaPipelineState state) { g_autoptr (GEnumClass) enum_class = NULL; GEnumValue *enum_val; g_autofree char *fname = NULL; g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); if (self->state == state) return; self->state = state; g_object_notify_by_pspec (G_OBJECT (self), props[PROP_STATE]); self->emitted_sending_signal = FALSE; if (state == CALLS_MEDIA_PIPELINE_STATE_INITIALIZING) return; enum_class = g_type_class_ref (CALLS_TYPE_MEDIA_PIPELINE_STATE); enum_val = g_enum_get_value (enum_class, state); fname = g_strdup_printf ("calls-%s", enum_val->value_nick); GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, fname); } static void check_element_maps (CallsSipMediaPipeline *self) { uint all_rtp_elements; g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); all_rtp_elements = self->use_srtp ? EL_ALL_SRTP : EL_ALL_RTP; if (self->element_map_playing == all_rtp_elements) { g_debug ("All pipeline elements are playing"); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAYING); return; } if (self->element_map_paused == all_rtp_elements) { g_debug ("All pipeline elements are paused"); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PAUSED); return; } if (self->element_map_stopped == all_rtp_elements) { g_debug ("All pipeline elements are stopped"); set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOPPED); return; } if ((self->element_map_playing & (EL_SENDING)) == (EL_SENDING) && !self->emitted_sending_signal) { g_debug ("Sender pipeline is sending data to %s RTP/RTCP %d/%d", self->remote, self->rport_rtp, self->rport_rtcp); g_signal_emit (self, signals[SENDING_STARTED], 0); self->emitted_sending_signal = TRUE; } } /* rtpbin adds a pad once the payload is verified */ static void on_pad_added (GstElement *rtpbin, GstPad *srcpad, GstElement *depayloader) { GstPad *sinkpad; g_debug ("pad added: %s", GST_PAD_NAME (srcpad)); sinkpad = gst_element_get_static_pad (depayloader, "sink"); g_debug ("linking to %s", GST_PAD_NAME (sinkpad)); if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) g_warning ("Failed to link rtpbin to depayloader"); gst_object_unref (sinkpad); } static gboolean on_bus_message (GstBus *bus, GstMessage *message, gpointer data) { CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (data); switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_ERROR: { g_autoptr (GError) error = NULL; g_autofree char *msg = NULL; gst_message_parse_error (message, &error, &msg); g_warning ("Error on the message bus: %s (%s)", error->message, msg); break; } case GST_MESSAGE_WARNING: { g_autoptr (GError) error = NULL; g_autofree char *msg = NULL; gst_message_parse_warning (message, &error, &msg); g_warning ("Warning on the message bus: %s (%s)", error->message, msg); break; } case GST_MESSAGE_EOS: g_debug ("Received end of stream"); calls_sip_media_pipeline_stop (self); break; case GST_MESSAGE_STATE_CHANGED: { GstState oldstate; GstState newstate; uint element_id = 0; uint unset_element_id; gst_message_parse_state_changed (message, &oldstate, &newstate, NULL); g_debug ("Element %s has changed state from %s to %s", GST_OBJECT_NAME (message->src), gst_element_state_get_name (oldstate), gst_element_state_get_name (newstate)); if (message->src == GST_OBJECT (self->pipeline)) element_id = EL_PIPELINE; else if (message->src == GST_OBJECT (self->rtpbin)) element_id = EL_RTPBIN; else if (message->src == GST_OBJECT (self->rtp_src)) element_id = EL_RTP_SRC; else if (message->src == GST_OBJECT (self->rtp_sink)) element_id = EL_RTP_SINK; else if (message->src == GST_OBJECT (self->rtcp_src)) element_id = EL_RTCP_SRC; else if (message->src == GST_OBJECT (self->rtcp_sink)) element_id = EL_RTCP_SINK; else if (message->src == GST_OBJECT (self->srtpenc)) element_id = EL_SRTP_ENCODER; else if (message->src == GST_OBJECT (self->srtpdec)) element_id = EL_SRTP_DECODER; else if (message->src == GST_OBJECT (self->audio_src)) element_id = EL_AUDIO_SRC; else if (message->src == GST_OBJECT (self->audio_sink)) element_id = EL_AUDIO_SINK; else if (message->src == GST_OBJECT (self->payloader)) element_id = EL_PAYLOADER; else if (message->src == GST_OBJECT (self->depayloader)) element_id = EL_DEPAYLOADER; else if (message->src == GST_OBJECT (self->encoder)) element_id = EL_ENCODER; else if (message->src == GST_OBJECT (self->decoder)) element_id = EL_DECODER; unset_element_id = G_MAXUINT ^ element_id; if (newstate == GST_STATE_PLAYING) { self->element_map_playing |= element_id; self->element_map_paused &= unset_element_id; self->element_map_stopped &= unset_element_id; } else if (newstate == GST_STATE_PAUSED) { self->element_map_paused |= element_id; self->element_map_playing &= unset_element_id; self->element_map_stopped &= unset_element_id; } else if (newstate == GST_STATE_NULL) { self->element_map_stopped |= element_id; self->element_map_playing &= unset_element_id; self->element_map_paused &= unset_element_id; } check_element_maps (self); break; } default: if (self->debug) g_debug ("Got unhandled %s message", GST_MESSAGE_TYPE_NAME (message)); break; } /* keep watching for messages on the bus */ return TRUE; } /* SRTP setup */ static GstCaps * on_srtpdec_request_key (GstElement *srtpdec, guint ssrc, gpointer user_data) { /* TODO get key */ return gst_caps_new_simple ("application/x-srtp", "srtp-cipher", G_TYPE_STRING, "null", "srtcp-cipher", G_TYPE_STRING, "null", "srtp-auth", G_TYPE_STRING, "null", "srtcp-auth", G_TYPE_STRING, "null", NULL); } static GstElement * on_rtpbin_request_decoder (GstElement *rtpbin, guint session_id, gpointer user_data) { CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (user_data); if (!self->use_srtp) return NULL; return gst_object_ref (self->srtpdec); } static GstElement * on_rtpbin_request_encoder (GstElement *rtpbin, guint session_id, gpointer user_data) { CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (user_data); if (!self->use_srtp) return NULL; return gst_object_ref (self->srtpenc); } /* Pipeline setup */ static gboolean setup_socket_reuse (CallsSipMediaPipeline *self, GError **error) { g_autoptr (GSocket) rtp_sock = NULL; g_autoptr (GSocket) rtcp_sock = NULL; /* set rtp element ready and lock it's state so it doesn't get stopped */ gst_element_set_locked_state (self->rtp_src, TRUE); gst_element_set_state (self->rtp_src, GST_STATE_READY); g_object_get (self->rtp_src, "used-socket", &rtp_sock, NULL); if (!rtp_sock) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Could not retrieve used socket from RTP udpsrc element"); return FALSE; } /* configure socket and don't close it, since it belongs to rtp_src */ g_object_set (self->rtp_sink, "socket", rtp_sock, "close-socket", FALSE, NULL); /* set rtcp element ready and lock it's state so it doesn't get stopped */ gst_element_set_locked_state (self->rtcp_src, TRUE); gst_element_set_state (self->rtcp_src, GST_STATE_READY); g_object_get (self->rtcp_src, "used-socket", &rtcp_sock, NULL); if (!rtcp_sock) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Could not retrieve used socket from RTCP udpsrc element"); return FALSE; } /* configure socket and don't close it, since it belongs to rtcp_src */ g_object_set (self->rtcp_sink, "socket", rtcp_sock, "close-socket", FALSE, NULL); return TRUE; } static gboolean pipeline_init (CallsSipMediaPipeline *self, GError **error) { GstPad *tmppad; const char *env_var; g_assert (CALLS_SIP_MEDIA_PIPELINE (self)); self->pipeline = gst_pipeline_new ("media-pipeline"); if (!self->pipeline) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Could not create media pipeline"); return FALSE; } gst_object_ref_sink (self->pipeline); /* Audio source*/ env_var = g_getenv ("CALLS_AUDIOSRC"); if (!STR_IS_NULL_OR_EMPTY (env_var)) { MAKE_ELEMENT (audio_src, env_var, "audiosource"); } else { g_autoptr (GstStructure) gst_props = NULL; MAKE_ELEMENT (audio_src, "pulsesrc", "audiosource"); /* enable echo cancellation and set buffer size to 40ms */ gst_props = gst_structure_new ("props", "media.role", G_TYPE_STRING, "phone", "filter.want", G_TYPE_STRING, "echo-cancel", NULL); g_object_set (self->audio_src, "buffer-time", (gint64) 40000, "stream-properties", gst_props, NULL); } /* Audio sink */ env_var = g_getenv ("CALLS_AUDIOSINK"); if (!STR_IS_NULL_OR_EMPTY (env_var)) { MAKE_ELEMENT (audio_sink, env_var, "audiosink"); } else { g_autoptr (GstStructure) gst_props = NULL; MAKE_ELEMENT (audio_sink, "pulsesink", "audiosink"); /* enable echo cancellation and set buffer size to 40ms */ gst_props = gst_structure_new ("props", "media.role", G_TYPE_STRING, "phone", "filter.want", G_TYPE_STRING, "echo-cancel", NULL); g_object_set (self->audio_sink, "buffer-time", (gint64) 40000, "stream-properties", gst_props, NULL); } /* rtpbin */ MAKE_ELEMENT (rtpbin, "rtpbin", "rtpbin"); /* srtp elements */ MAKE_ELEMENT (srtpdec, "srtpdec", "srtpdec"); g_signal_connect (self->srtpdec, "request-key", G_CALLBACK (on_srtpdec_request_key), self); MAKE_ELEMENT (srtpenc, "srtpenc", "srtpenc"); g_object_set (self->srtpenc, "rtp-cipher", 0, "rtp-auth", 0, "rtcp-cipher", 0, "rtcp-auth", 0, NULL); #if GST_CHECK_VERSION (1, 20, 0) tmppad = gst_element_request_pad_simple (self->srtpenc, "rtp_sink_0"); #else tmppad = gst_element_get_request_pad (self->srtpenc, "rtp_sink_0"); #endif gst_object_unref (tmppad); #if GST_CHECK_VERSION (1, 20, 0) tmppad = gst_element_request_pad_simple (self->srtpenc, "rtcp_sink_0"); #else tmppad = gst_element_get_request_pad (self->srtpenc, "rtcp_sink_0"); #endif gst_object_unref (tmppad); self->request_rtpbin_rtp_encoder_id = g_signal_connect (self->rtpbin, "request-rtp-encoder", G_CALLBACK (on_rtpbin_request_encoder), self); self->request_rtpbin_rtp_decoder_id = g_signal_connect (self->rtpbin, "request-rtp-decoder", G_CALLBACK (on_rtpbin_request_decoder), self); self->request_rtpbin_rtcp_encoder_id = g_signal_connect (self->rtpbin, "request-rtcp-encoder", G_CALLBACK (on_rtpbin_request_encoder), self); self->request_rtpbin_rtcp_decoder_id = g_signal_connect (self->rtpbin, "request-rtcp-decoder", G_CALLBACK (on_rtpbin_request_decoder), self); /* UDP sources and sinks for RTP and RTCP */ MAKE_ELEMENT (rtp_src, "udpsrc", "rtp-udp-src"); MAKE_ELEMENT (rtp_sink, "udpsink", "rtp-udp-sink"); MAKE_ELEMENT (rtcp_src, "udpsrc", "rtcp-udp-src"); MAKE_ELEMENT (rtcp_sink, "udpsink", "rtcp-udp-sink"); /* port 0 means letting the OS allocate */ g_object_set (self->rtp_src, "port", 0, NULL); g_object_set (self->rtcp_src, "port", 0, NULL); g_object_set (self->rtp_sink, "async", FALSE, "sync", FALSE, NULL); g_object_set (self->rtcp_sink, "async", FALSE, "sync", FALSE, NULL); g_object_bind_property (self, "rport-rtp", self->rtp_sink, "port", G_BINDING_BIDIRECTIONAL); g_object_bind_property (self, "remote", self->rtp_sink, "host", G_BINDING_BIDIRECTIONAL); g_object_bind_property (self, "rport-rtcp", self->rtcp_sink, "port", G_BINDING_BIDIRECTIONAL); g_object_bind_property (self, "remote", self->rtcp_sink, "host", G_BINDING_BIDIRECTIONAL); /* Add all elements to the pipeline */ gst_bin_add_many (GST_BIN (self->pipeline), self->audio_src, self->audio_sink, self->rtpbin, self->rtp_src, self->rtp_sink, self->rtcp_src, self->rtcp_sink, NULL); /* Setup bus watch */ self->bus = gst_pipeline_get_bus (GST_PIPELINE (self->pipeline)); self->bus_watch_id = gst_bus_add_watch (self->bus, on_bus_message, self); if (!setup_socket_reuse (self, error)) return FALSE; return TRUE; } static gboolean pipeline_link_elements (CallsSipMediaPipeline *self, GError **error) { g_autoptr (GstPad) srcpad = NULL; g_autoptr (GstPad) sinkpad = NULL; GstPadLinkReturn ret; g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); /* link to payloader */ #if GST_CHECK_VERSION (1, 20, 0) sinkpad = gst_element_request_pad_simple (self->rtpbin, "send_rtp_sink_0"); #else sinkpad = gst_element_get_request_pad (self->rtpbin, "send_rtp_sink_0"); #endif srcpad = gst_element_get_static_pad (self->payloader, "src"); if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link payloader to rtpbin"); return FALSE; } /* Transmitter pads */ srcpad = gst_element_get_static_pad (self->rtp_src, "src"); #if GST_CHECK_VERSION (1, 20, 0) sinkpad = gst_element_request_pad_simple (self->rtpbin, "recv_rtp_sink_0"); #else sinkpad = gst_element_get_request_pad (self->rtpbin, "recv_rtp_sink_0"); #endif ret = gst_pad_link (srcpad, sinkpad); if (ret != GST_PAD_LINK_OK) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link rtpsrc to rtpbin"); return FALSE; } gst_object_unref (srcpad); gst_object_unref (sinkpad); srcpad = gst_element_get_static_pad (self->rtpbin, "send_rtp_src_0"); sinkpad = gst_element_get_static_pad (self->rtp_sink, "sink"); if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link rtpbin to rtpsink"); return FALSE; } gst_object_unref (srcpad); gst_object_unref (sinkpad); srcpad = gst_element_get_static_pad (self->rtcp_src, "src"); #if GST_CHECK_VERSION (1, 20, 0) sinkpad = gst_element_request_pad_simple (self->rtpbin, "recv_rtcp_sink_0"); #else sinkpad = gst_element_get_request_pad (self->rtpbin, "recv_rtcp_sink_0"); #endif if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link rtcpsrc to rtpbin"); return FALSE; } gst_object_unref (srcpad); gst_object_unref (sinkpad); #if GST_CHECK_VERSION (1, 20, 0) srcpad = gst_element_request_pad_simple (self->rtpbin, "send_rtcp_src_0"); #else srcpad = gst_element_get_request_pad (self->rtpbin, "send_rtcp_src_0"); #endif sinkpad = gst_element_get_static_pad (self->rtcp_sink, "sink"); if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link rtpbin to rtcpsink"); return FALSE; } /* can only link to depayloader after RTP payload has been verified */ g_signal_connect (self->rtpbin, "pad-added", G_CALLBACK (on_pad_added), self->depayloader); /* request-encoder and request-decoder signals have been emitted after linking pads from rtpbin */ if (self->request_rtpbin_rtp_decoder_id) g_signal_handler_disconnect (self->rtpbin, self->request_rtpbin_rtp_decoder_id); if (self->request_rtpbin_rtp_encoder_id) g_signal_handler_disconnect (self->rtpbin, self->request_rtpbin_rtp_encoder_id); if (self->request_rtpbin_rtcp_decoder_id) g_signal_handler_disconnect (self->rtpbin, self->request_rtpbin_rtcp_decoder_id); if (self->request_rtpbin_rtcp_encoder_id) g_signal_handler_disconnect (self->rtpbin, self->request_rtpbin_rtcp_encoder_id); return TRUE; } static gboolean pipeline_setup_codecs (CallsSipMediaPipeline *self, MediaCodecInfo *codec, GError **error) { g_autoptr (GstCaps) caps = NULL; g_autofree char *caps_string = NULL; g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); g_assert (codec); MAKE_ELEMENT (decoder, codec->gst_decoder_name, "decoder"); MAKE_ELEMENT (depayloader, codec->gst_depayloader_name, "depayloader"); MAKE_ELEMENT (encoder, codec->gst_encoder_name, "encoder"); MAKE_ELEMENT (payloader, codec->gst_payloader_name, "payloader"); gst_bin_add_many (GST_BIN (self->pipeline), self->depayloader, self->decoder, self->payloader, self->encoder, NULL); if (!gst_element_link_many (self->audio_src, self->encoder, self->payloader, NULL)) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link audiosrc encoder and payloader"); return FALSE; } if (!gst_element_link_many (self->depayloader, self->decoder, self->audio_sink, NULL)) { if (error) g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Failed to link depayloader decoder and audiosink"); return FALSE; } /* UDP src capabilities */ caps_string = media_codec_get_gst_capabilities (codec, self->use_srtp); g_debug ("Capabilities:\n%s", caps_string); caps = gst_caps_from_string (caps_string); /* set udp sinks and sources for RTP and RTCP */ g_object_set (self->rtp_src, "caps", caps, NULL); return TRUE; } static void calls_sip_media_pipeline_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec) { CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (object); switch (property_id) { case PROP_CODEC: g_value_set_pointer (value, self->codec); break; case PROP_REMOTE: g_value_set_string (value, self->remote); break; case PROP_RPORT_RTP: g_value_set_uint (value, self->rport_rtp); break; case PROP_RPORT_RTCP: g_value_set_uint (value, self->rport_rtcp); break; case PROP_DEBUG: g_value_set_boolean (value, self->debug); break; case PROP_STATE: g_value_set_enum (value, calls_sip_media_pipeline_get_state (self)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void calls_sip_media_pipeline_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec) { CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (object); switch (property_id) { case PROP_CODEC: calls_sip_media_pipeline_set_codec (self, g_value_get_pointer (value)); break; case PROP_REMOTE: g_free (self->remote); self->remote = g_value_dup_string (value); break; case PROP_RPORT_RTP: self->rport_rtp = g_value_get_uint (value); break; case PROP_RPORT_RTCP: self->rport_rtcp = g_value_get_uint (value); break; case PROP_DEBUG: self->debug = g_value_get_boolean (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void calls_sip_media_pipeline_constructed (GObject *object) { g_autoptr (GError) error = NULL; CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (object); G_OBJECT_CLASS (calls_sip_media_pipeline_parent_class)->constructed (object); set_state (self, CALLS_MEDIA_PIPELINE_STATE_INITIALIZING); if (!pipeline_init (self, &error)) { g_warning ("Could not create pipeline: %s", error->message); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); return; } set_state (self, CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC); } static void calls_sip_media_pipeline_finalize (GObject *object) { CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (object); calls_sip_media_pipeline_stop (self); gst_object_unref (self->pipeline); gst_bus_remove_watch (self->bus); gst_object_unref (self->bus); gst_object_unref (self->srtpenc); gst_object_unref (self->srtpdec); g_free (self->remote); G_OBJECT_CLASS (calls_sip_media_pipeline_parent_class)->finalize (object); } static void calls_sip_media_pipeline_class_init (CallsSipMediaPipelineClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); object_class->set_property = calls_sip_media_pipeline_set_property; object_class->constructed = calls_sip_media_pipeline_constructed; object_class->get_property = calls_sip_media_pipeline_get_property; object_class->finalize = calls_sip_media_pipeline_finalize; /* Maybe we want to turn Codec into a GObject later */ props[PROP_CODEC] = g_param_spec_pointer ("codec", "Codec", "Media codec", G_PARAM_READWRITE); props[PROP_REMOTE] = g_param_spec_string ("remote", "Remote", "Remote host", NULL, G_PARAM_READWRITE); props[PROP_RPORT_RTP] = g_param_spec_uint ("rport-rtp", "rport-rtp", "remote rtp port", 1025, 65535, 5002, G_PARAM_READWRITE); props[PROP_RPORT_RTCP] = g_param_spec_uint ("rport-rtcp", "rport-rtcp", "remote rtcp port", 1025, 65535, 5003, G_PARAM_READWRITE); props[PROP_DEBUG] = g_param_spec_boolean ("debug", "Debug", "Enable debugging information", FALSE, G_PARAM_READWRITE); props[PROP_STATE] = g_param_spec_enum ("state", "State", "The state of the media pipeline", CALLS_TYPE_MEDIA_PIPELINE_STATE, CALLS_MEDIA_PIPELINE_STATE_UNKNOWN, G_PARAM_READABLE); g_object_class_install_properties (object_class, PROP_LAST_PROP, props); signals[SENDING_STARTED] = g_signal_new ("sending-started", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0); } static gboolean usr2_handler (CallsSipMediaPipeline *self) { g_print ("playing: %d\n" "paused: %d\n" "stopped: %d\n" "target map: %d\n" "current state: %d\n", self->element_map_playing, self->element_map_paused, self->element_map_stopped, self->use_srtp ? EL_ALL_SRTP : EL_ALL_RTP, self->state); GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (self->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "usr2-debug"); return G_SOURCE_CONTINUE; } static void calls_sip_media_pipeline_init (CallsSipMediaPipeline *self) { if (!gst_is_initialized ()) gst_init (NULL, NULL); /* Pipeline debugging */ g_unix_signal_add (SIGUSR2, (GSourceFunc) usr2_handler, self); } CallsSipMediaPipeline* calls_sip_media_pipeline_new (MediaCodecInfo *codec) { CallsSipMediaPipeline *pipeline; pipeline = g_object_new (CALLS_TYPE_SIP_MEDIA_PIPELINE, NULL); if (codec) g_object_set (pipeline, "codec", codec, NULL); return pipeline; } void calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self, MediaCodecInfo *codec) { g_autoptr (GError) error = NULL; g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self)); g_return_if_fail (codec); if (self->codec == codec) return; if (self->codec) { g_warning ("Cannot change codec of a pipeline. Use a new pipeline instead."); return; } if (!media_codec_available_in_gst (codec)) { g_warning ("Cannot setup pipeline with codec '%s' because it's not available in GStreamer", codec->name); return; } if (!pipeline_setup_codecs (self, codec, &error)) { g_warning ("Error trying to setup codecs for pipeline: %s", error->message); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); return; } if (!pipeline_link_elements (self, &error)) { g_warning ("Not all pads could be linked: %s", error->message); set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR); return; } self->codec = codec; g_object_notify_by_pspec (G_OBJECT (self), props[PROP_CODEC]); set_state (self, CALLS_MEDIA_PIPELINE_STATE_READY); } static void diagnose_used_ports_in_socket (GSocket *socket) { g_autoptr (GSocketAddress) local_addr = NULL; g_autoptr (GSocketAddress) remote_addr = NULL; guint16 local_port; guint16 remote_port; local_addr = g_socket_get_local_address (socket, NULL); remote_addr = g_socket_get_remote_address (socket, NULL); if (!local_addr) { g_warning ("Could not get local address of socket"); return; } g_assert (G_IS_INET_SOCKET_ADDRESS (local_addr)); local_port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (local_addr)); g_debug ("Using local port %d", local_port); if (!remote_addr) { g_warning ("Could not get remote address of socket"); return; } g_assert (G_IS_INET_SOCKET_ADDRESS (remote_addr)); remote_port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (remote_addr)); g_debug ("Using remote port %d", remote_port); } static void diagnose_ports_in_use (CallsSipMediaPipeline *self) { GSocket *socket_in; GSocket *socket_out; gboolean same_socket = FALSE; g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self)); if (self->state != CALLS_MEDIA_PIPELINE_STATE_PLAYING && self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSED) { g_warning ("Cannot diagnose ports when pipeline is not active"); return; } g_object_get (self->rtp_src, "used-socket", &socket_in, NULL); g_object_get (self->rtp_sink, "used-socket", &socket_out, NULL); if (socket_in == NULL || socket_out == NULL) { g_warning ("Could not get used socket"); return; } same_socket = socket_in == socket_out; if (same_socket) { g_debug ("Diagnosing bidirectional socket..."); diagnose_used_ports_in_socket (socket_in); } else { g_debug ("Diagnosing server socket..."); diagnose_used_ports_in_socket (socket_in); g_debug ("Diagnosing client socket..."); diagnose_used_ports_in_socket (socket_out); } } void calls_sip_media_pipeline_start (CallsSipMediaPipeline *self) { g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self)); if (self->state != CALLS_MEDIA_PIPELINE_STATE_READY) { g_warning ("Cannot start pipeline because it's not ready"); return; } g_debug ("Starting media pipeline"); g_debug ("RTP/RTCP port before starting pipeline: %d/%d", calls_sip_media_pipeline_get_rtp_port (self), calls_sip_media_pipeline_get_rtcp_port (self)); /* unlock the state of our udp sources, see setup_socket_reuse() */ gst_element_set_locked_state (self->rtp_src, FALSE); gst_element_set_locked_state (self->rtcp_src, FALSE); gst_element_set_state (self->pipeline, GST_STATE_PLAYING); g_debug ("RTP/RTCP port after starting pipeline: %d/%d", calls_sip_media_pipeline_get_rtp_port (self), calls_sip_media_pipeline_get_rtcp_port (self)); set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING); if (self->debug) diagnose_ports_in_use (self); } void calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self) { g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self)); g_debug ("Stopping media pipeline"); gst_element_set_locked_state (self->rtp_src, FALSE); gst_element_set_locked_state (self->rtcp_src, FALSE); gst_element_set_locked_state (self->rtp_sink, FALSE); gst_element_set_locked_state (self->rtcp_sink, FALSE); gst_element_set_state (self->pipeline, GST_STATE_NULL); set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING); } void calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self, gboolean pause) { g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self)); if (pause && (self->state == CALLS_MEDIA_PIPELINE_STATE_PAUSED || self->state == CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING)) return; if (!pause && (self->state == CALLS_MEDIA_PIPELINE_STATE_PLAYING || self->state == CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING)) return; if (self->state != CALLS_MEDIA_PIPELINE_STATE_PLAYING && self->state != CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING && self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSED && self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING) { g_warning ("Cannot pause or unpause pipeline because it's not currently active"); return; } g_debug ("%s media pipeline", pause ? "Pausing" : "Unpausing"); /* leave udpsrc running to prevent timeouts */ gst_element_set_locked_state (self->rtp_src, pause); gst_element_set_locked_state (self->rtcp_src, pause); gst_element_set_locked_state (self->rtp_sink, pause); gst_element_set_locked_state (self->rtcp_sink, pause); gst_element_set_state (self->pipeline, pause ? GST_STATE_PAUSED : GST_STATE_PLAYING); set_state (self, pause ? CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING : CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING); } int calls_sip_media_pipeline_get_rtp_port (CallsSipMediaPipeline *self) { int port; g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), 0); g_object_get (self->rtp_src, "port", &port, NULL); return port; } int calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self) { int port; g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), 0); g_object_get (self->rtcp_src, "port", &port, NULL); return port; } CallsMediaPipelineState calls_sip_media_pipeline_get_state (CallsSipMediaPipeline *self) { g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), CALLS_MEDIA_PIPELINE_STATE_UNKNOWN); return self->state; }