]> www.infradead.org Git - pidgin-chime.git/commitdiff
Add a FsAppTransmitter
authorDavid Woodhouse <dwmw@amazon.co.uk>
Sat, 16 Sep 2017 14:24:02 +0000 (07:24 -0700)
committerDavid Woodhouse <dwmw@amazon.co.uk>
Tue, 31 Oct 2017 13:14:04 +0000 (13:14 +0000)
Makefile.am
chat.c
chime.c
configure.ac
fs-app-stream-transmitter.c [new file with mode: 0644]
fs-app-stream-transmitter.h [new file with mode: 0644]
fs-app-transmitter.c [new file with mode: 0644]
fs-app-transmitter.h [new file with mode: 0644]

index 46583792746b7191f539709f0e9992b256f395a5..8dd7e42ffd7a36374e9f9b39a8959cb3c4dbd05e 100644 (file)
@@ -19,10 +19,12 @@ libchime_la_SOURCES = chime.c buddy.c rooms.c chat.c messages.c conversations.c
        chime-call.c chime-call.h chime-call-audio.c \
        protobuf/auth_message.pb-c.c protobuf/auth_message.pb-c.h \
        protobuf/data_message.pb-c.c protobuf/data_message.pb-c.h \
-       protobuf/rt_message.pb-c.c protobuf/rt_message.pb-c.h
+       protobuf/rt_message.pb-c.c protobuf/rt_message.pb-c.h \
+       fs-app-transmitter.c fs-app-transmitter.h \
+       fs-app-stream-transmitter.c fs-app-stream-transmitter.h
 
-libchime_la_CFLAGS = $(PURPLE_CFLAGS) $(SOUP_CFLAGS) $(JSON_CFLAGS) $(LIBXML_CFLAGS) $(PROTOBUF_CFLAGS) $(OPUS_CFLAGS)
-libchime_la_LIBADD = $(PURPLE_LIBS) $(SOUP_LIBS) $(JSON_LIBS) $(LIBXML_LIBS) $(PROTOBUF_LIBS) $(OPUS_LIBS)
+libchime_la_CFLAGS = $(PURPLE_CFLAGS) $(SOUP_CFLAGS) $(JSON_CFLAGS) $(LIBXML_CFLAGS) $(PROTOBUF_CFLAGS) $(OPUS_CFLAGS) $(FARSTREAM_CFLAGS)
+libchime_la_LIBADD = $(PURPLE_LIBS) $(SOUP_LIBS) $(JSON_LIBS) $(LIBXML_LIBS) $(PROTOBUF_LIBS) $(OPUS_LIBS) $(FARSTREAM_LIBS)
 libchime_la_LDFLAGS = -module -avoid-version -no-undefined
 
 POTFILES = $(libchime_la_SOURCES)
diff --git a/chat.c b/chat.c
index 4c4d3f09e4ca6a5384c997a7b365963697885fa9..571f36a26f8b6936c4b398ab531a576b8a2ce83d 100644 (file)
--- a/chat.c
+++ b/chat.c
@@ -310,21 +310,21 @@ static void audio_joined(GObject *source, GAsyncResult *result, gpointer _chat)
        struct chime_chat *chat = _chat;
 
        chat->audio = chime_connection_join_call_audio_finish(cxn, result, NULL);
-
        if (!chat->audio)
                return;
 
 #if 0 /* FIXME make this work... */
-       const gchar *uuid = chime_call_get_uuid(chat->call);
-       chat->media = purple_media_manager_create_media(purple_media_manager_get(),
+       const gchar *name = chime_call_get_alert_body(chat->call);
+       PurpleMedia *media = purple_media_manager_create_media(purple_media_manager_get(),
                                                               chat->conv->account,
                                                               "fsrawconference",
-                                                              uuid,
+                                                              name,
                                                               TRUE);
-       printf("media for %s %p\n", uuid, chat->media);
-       if (chat->media) {
-               gboolean r = purple_media_add_stream(chat->media, "chime", uuid,
-                                                    PURPLE_MEDIA_AUDIO, TRUE,  "nice", 0, NULL);
+       printf("media for %s %p\n", name, media);
+       if (media) {
+               gboolean r = purple_media_add_stream(media, "chime", name,
+                                                    PURPLE_MEDIA_AUDIO, TRUE,
+                                                    "app", 0, NULL);
                printf("Add stream %s\n", r ? "succeeded" : "failed");
 
        }
diff --git a/chime.c b/chime.c
index c74d778213912055a3fab59e1d925369a20326d2..e47cba79e862bcc254b51ad16c99cf07ce9f9da5 100644 (file)
--- a/chime.c
+++ b/chime.c
@@ -441,11 +441,15 @@ static PurplePluginInfo chime_plugin_info = {
        .actions = chime_purple_plugin_actions,
 };
 
+#include "fs-app-transmitter.h"
+#include <farstream/fs-plugin.h>
+extern GType fs_app_transmitter_register_type(void *);
 static void chime_purple_init_plugin(PurplePlugin *plugin)
 {
        PurpleAccountOption *opt;
        GList *opts = NULL;
 
+       fs_plugin_register_static("app", "transmitter", fs_app_transmitter_register_type(NULL));
        opt = purple_account_option_string_new(_("Signin URL"),
                                               "server", NULL);
        opts = g_list_append(opts, opt);
index 8e6a682288aab4c9c36408792f31d3be5263e23a..eaa5e93796672e58091ff7551d7c0d036e9501bd 100644 (file)
@@ -47,6 +47,7 @@ if test "$purple_datadir" = "" ; then
 fi
 AC_SUBST(pixmapdir, ${purple_datadir}/pixmaps/pidgin/protocols)
 
+PKG_CHECK_MODULES(FARSTREAM, [farstream-0.2])
 PKG_CHECK_MODULES(OPUS, [opus])
 PKG_CHECK_MODULES(PROTOBUF, [libprotobuf-c])
 PKG_CHECK_MODULES(JSON, [json-glib-1.0])
diff --git a/fs-app-stream-transmitter.c b/fs-app-stream-transmitter.c
new file mode 100644 (file)
index 0000000..986376e
--- /dev/null
@@ -0,0 +1,597 @@
+/*
+ * Farstream - Farstream Shared Memory Stream Transmitter
+ *
+ * Copyright 2009 Collabora Ltd.
+ *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
+ * Copyright 2009 Nokia Corp.
+ *
+ * fs-app-stream-transmitter.c - A Farstream Shared memory stream transmitter
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+
+/**
+ * SECTION:fs-app-stream-transmitter
+ * @short_description: A stream transmitter object for Shared Memory
+ *
+ * The name of this transmitter is "shm".
+ *
+ * This transmitter is meant to send and received the data from another process
+ * on the same system while minimizing the memory pressure associated with the
+ * use of sockets.
+ *
+ * Two sockets are used to control the shared memory areas. One is used to
+ * send data and one to receive data. The receiver always connects to the
+ * sender. The sender socket must exist before the receiver connects to it.
+ *
+ * Negotiating the paths of the sockets can happen in two ways. If the
+ * create-local-candidates is True then the transmitter will generate the
+ * path of the local candidate and us it as the ip filed in #FsCandidate. The
+ * transmitter will expect the path of the applications sender socket to be in
+ * the "ip" field of the remote candidates #FsCandidate as well.
+ *
+ * Or alternatively, if create-local-candidates is false then
+ * the sender socket can be created by giving the transmitter a candidate
+ * with the path of the socket in the "ip" field of the #FsCandidate. This
+ * #FsCandidate can be given to the #FsStreamTransmitter in two ways, either
+ * by setting the #FsStreamTransmitter:preferred-local-candidates property
+ * or by calling the fs_stream_transmitter_force_remote_candidates() function.
+ * There can be only one single send socket per stream. When the send socket
+ * is ready to be connected to, #FsStreamTransmitter::new-local-candidate signal
+ * will be emitted.
+ *
+ * To connect the receive side to the other application, one must create a
+ * #FsCandidate with the path of the sender's socket in the "username" field.
+ * If the receiver can not connect to the sender,
+ * the fs_stream_transmitter_force_remote_candidates() call will fail.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "fs-app-stream-transmitter.h"
+#include "fs-app-transmitter.h"
+
+#include <farstream/fs-candidate.h>
+#include <farstream/fs-conference.h>
+
+#include <gst/gst.h>
+
+#include <glib/gstdio.h>
+
+#include <string.h>
+#include <sys/types.h>
+
+#ifdef HAVE_UNISTD_H
+# include <unistd.h>
+#endif
+
+#include <stdlib.h>
+
+GST_DEBUG_CATEGORY_EXTERN (fs_app_transmitter_debug);
+#define GST_CAT_DEFAULT fs_app_transmitter_debug
+
+/* Signals */
+enum
+{
+  LAST_SIGNAL
+};
+
+/* props */
+enum
+{
+  PROP_0,
+  PROP_SENDING,
+  PROP_PREFERRED_LOCAL_CANDIDATES,
+  PROP_CREATE_LOCAL_CANDIDATES,
+};
+
+struct _FsAppStreamTransmitterPrivate
+{
+  /* We don't actually hold a ref to this,
+   * But since our parent FsStream can not exist without its parent
+   * FsSession, we should be safe
+   */
+  FsAppTransmitter *transmitter;
+
+  GList *preferred_local_candidates;
+
+  GMutex mutex;
+
+  /* Protected by the mutex */
+  gboolean sending;
+
+  /* Protected by the mutex */
+  FsCandidate **candidates;
+
+  /* Whether we create the local candidate ourselves or rely on the remote end
+   * to pass them to us as part of the candidate */
+  gboolean create_local_candidates;
+
+  /* temporary socket directy in case we made one */
+  gchar *socket_dir;
+
+  ShmSrc **shm_src;
+  ShmSink **shm_sink;
+};
+
+#define FS_APP_STREAM_TRANSMITTER_GET_PRIVATE(o)  \
+  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_APP_STREAM_TRANSMITTER, \
+                                FsAppStreamTransmitterPrivate))
+
+#define FS_APP_STREAM_TRANSMITTER_LOCK(s) \
+  g_mutex_lock (&(s)->priv->mutex)
+#define FS_APP_STREAM_TRANSMITTER_UNLOCK(s) \
+  g_mutex_unlock (&(s)->priv->mutex)
+
+static void fs_app_stream_transmitter_class_init (FsAppStreamTransmitterClass *klass);
+static void fs_app_stream_transmitter_init (FsAppStreamTransmitter *self);
+static void fs_app_stream_transmitter_dispose (GObject *object);
+static void fs_app_stream_transmitter_finalize (GObject *object);
+
+static void fs_app_stream_transmitter_get_property (GObject *object,
+                                                guint prop_id,
+                                                GValue *value,
+                                                GParamSpec *pspec);
+static void fs_app_stream_transmitter_set_property (GObject *object,
+                                                guint prop_id,
+                                                const GValue *value,
+                                                GParamSpec *pspec);
+
+static gboolean fs_app_stream_transmitter_force_remote_candidates (
+    FsStreamTransmitter *streamtransmitter, GList *candidates,
+    GError **error);
+static gboolean fs_app_stream_transmitter_gather_local_candidates (
+    FsStreamTransmitter *streamtransmitter,
+    GError **error);
+
+static gboolean
+fs_app_stream_transmitter_add_sink (FsAppStreamTransmitter *self,
+    FsCandidate *candidate, GError **error);
+
+
+static GObjectClass *parent_class = NULL;
+// static guint signals[LAST_SIGNAL] = { 0 };
+
+static GType type = 0;
+
+GType
+fs_app_stream_transmitter_get_type (void)
+{
+  return type;
+}
+
+GType
+fs_app_stream_transmitter_register_type (FsPlugin *module G_GNUC_UNUSED)
+{
+  static const GTypeInfo info = {
+    sizeof (FsAppStreamTransmitterClass),
+    NULL,
+    NULL,
+    (GClassInitFunc) fs_app_stream_transmitter_class_init,
+    NULL,
+    NULL,
+    sizeof (FsAppStreamTransmitter),
+    0,
+    (GInstanceInitFunc) fs_app_stream_transmitter_init
+  };
+
+  type = g_type_register_static (FS_TYPE_STREAM_TRANSMITTER,
+      "FsAppStreamTransmitter", &info, 0);
+
+  return type;
+}
+
+static void
+fs_app_stream_transmitter_class_init (FsAppStreamTransmitterClass *klass)
+{
+  GObjectClass *gobject_class = (GObjectClass *) klass;
+  FsStreamTransmitterClass *streamtransmitterclass =
+    FS_STREAM_TRANSMITTER_CLASS (klass);
+  GParamSpec *pspec;
+
+  parent_class = g_type_class_peek_parent (klass);
+
+  gobject_class->set_property = fs_app_stream_transmitter_set_property;
+  gobject_class->get_property = fs_app_stream_transmitter_get_property;
+
+  streamtransmitterclass->force_remote_candidates =
+    fs_app_stream_transmitter_force_remote_candidates;
+  streamtransmitterclass->gather_local_candidates =
+    fs_app_stream_transmitter_gather_local_candidates;
+
+  g_object_class_override_property (gobject_class, PROP_SENDING, "sending");
+  g_object_class_override_property (gobject_class,
+      PROP_PREFERRED_LOCAL_CANDIDATES, "preferred-local-candidates");
+
+  pspec = g_param_spec_boolean ("create-local-candidates",
+    "CreateLocalCandidates",
+    "Whether the transmitter should automatically create local candidates",
+    FALSE,
+    G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS);
+  g_object_class_install_property (gobject_class,
+    PROP_CREATE_LOCAL_CANDIDATES,
+    pspec);
+
+
+  gobject_class->dispose = fs_app_stream_transmitter_dispose;
+  gobject_class->finalize = fs_app_stream_transmitter_finalize;
+
+  g_type_class_add_private (klass, sizeof (FsAppStreamTransmitterPrivate));
+}
+
+static void
+fs_app_stream_transmitter_init (FsAppStreamTransmitter *self)
+{
+  /* member init */
+  self->priv = FS_APP_STREAM_TRANSMITTER_GET_PRIVATE (self);
+
+  self->priv->sending = TRUE;
+
+  g_mutex_init (&self->priv->mutex);
+}
+
+static void
+fs_app_stream_transmitter_dispose (GObject *object)
+{
+  FsAppStreamTransmitter *self = FS_APP_STREAM_TRANSMITTER (object);
+  gint c; /* component_id */
+
+  for (c = 1; c <= self->priv->transmitter->components; c++)
+  {
+    if (self->priv->shm_src[c])
+    {
+      fs_app_transmitter_check_shm_src (self->priv->transmitter,
+          self->priv->shm_src[c], NULL);
+    }
+    self->priv->shm_src[c] = NULL;
+
+    if (self->priv->shm_sink[c])
+    {
+      fs_app_transmitter_check_shm_sink (self->priv->transmitter,
+          self->priv->shm_sink[c], NULL);
+    }
+    self->priv->shm_sink[c] = NULL;
+  }
+
+  if (self->priv->socket_dir != NULL)
+    g_rmdir (self->priv->socket_dir);
+  g_free (self->priv->socket_dir);
+  self->priv->socket_dir = NULL;
+
+  parent_class->dispose (object);
+}
+
+static void
+fs_app_stream_transmitter_finalize (GObject *object)
+{
+  FsAppStreamTransmitter *self = FS_APP_STREAM_TRANSMITTER (object);
+
+  fs_candidate_list_destroy (self->priv->preferred_local_candidates);
+
+  g_free (self->priv->shm_src);
+  g_free (self->priv->shm_sink);
+  g_mutex_clear (&self->priv->mutex);
+
+  parent_class->finalize (object);
+}
+
+static void
+fs_app_stream_transmitter_get_property (GObject *object,
+                                           guint prop_id,
+                                           GValue *value,
+                                           GParamSpec *pspec)
+{
+  FsAppStreamTransmitter *self = FS_APP_STREAM_TRANSMITTER (object);
+
+  switch (prop_id)
+  {
+    case PROP_SENDING:
+      FS_APP_STREAM_TRANSMITTER_LOCK (self);
+      g_value_set_boolean (value, self->priv->sending);
+      FS_APP_STREAM_TRANSMITTER_UNLOCK (self);
+      break;
+    case PROP_PREFERRED_LOCAL_CANDIDATES:
+      g_value_set_boxed (value, self->priv->preferred_local_candidates);
+      break;
+    case PROP_CREATE_LOCAL_CANDIDATES:
+      g_value_set_boolean (value, self->priv->create_local_candidates);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+fs_app_stream_transmitter_set_property (GObject *object,
+                                           guint prop_id,
+                                           const GValue *value,
+                                           GParamSpec *pspec)
+{
+  FsAppStreamTransmitter *self = FS_APP_STREAM_TRANSMITTER (object);
+
+  switch (prop_id) {
+    case PROP_SENDING:
+      FS_APP_STREAM_TRANSMITTER_LOCK (self);
+      self->priv->sending = g_value_get_boolean (value);
+      if (self->priv->shm_sink[1])
+        fs_app_transmitter_sink_set_sending (self->priv->transmitter,
+            self->priv->shm_sink[1], self->priv->sending);
+      FS_APP_STREAM_TRANSMITTER_UNLOCK (self);
+      break;
+    case PROP_PREFERRED_LOCAL_CANDIDATES:
+      self->priv->preferred_local_candidates = g_value_dup_boxed (value);
+      break;
+    case PROP_CREATE_LOCAL_CANDIDATES:
+      self->priv->create_local_candidates = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static gboolean
+fs_app_stream_transmitter_build (FsAppStreamTransmitter *self,
+  GError **error)
+{
+  self->priv->shm_src = g_new0 (ShmSrc *,
+      self->priv->transmitter->components + 1);
+  self->priv->shm_sink = g_new0 (ShmSink *,
+      self->priv->transmitter->components + 1);
+
+  return TRUE;
+}
+
+static void
+got_buffer_func (GstBuffer *buffer, guint component, gpointer data)
+{
+  FsAppStreamTransmitter *self = FS_APP_STREAM_TRANSMITTER_CAST (data);
+
+  g_signal_emit_by_name (self, "known-source-packet-received", component,
+      buffer);
+}
+
+static void ready_cb (guint component, gchar *path, gpointer data)
+{
+  FsAppStreamTransmitter *self = FS_APP_STREAM_TRANSMITTER_CAST (data);
+  FsCandidate *candidate = fs_candidate_new (NULL, component,
+      FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, path, 0);
+
+  GST_DEBUG ("Emitting new local candidate with path %s", path);
+
+  g_signal_emit_by_name (self, "new-local-candidate", candidate);
+  g_signal_emit_by_name (self, "local-candidates-prepared");
+
+  fs_candidate_destroy (candidate);
+}
+
+static void
+connected_cb (guint component, gint id, gpointer data)
+{
+  FsAppStreamTransmitter *self = data;
+
+  g_signal_emit_by_name (self, "state-changed", component,
+      FS_STREAM_STATE_READY);
+}
+
+static gboolean
+fs_app_stream_transmitter_add_sink (FsAppStreamTransmitter *self,
+    FsCandidate *candidate, GError **error)
+{
+  if (self->priv->create_local_candidates)
+    return TRUE;
+
+  if (!candidate->ip || !candidate->ip[0])
+    return TRUE;
+
+  if (self->priv->shm_sink[candidate->component_id])
+  {
+    if (fs_app_transmitter_check_shm_sink (self->priv->transmitter,
+            self->priv->shm_sink[candidate->component_id], candidate->ip))
+      return TRUE;
+    self->priv->shm_sink[candidate->component_id] = NULL;
+  }
+
+  self->priv->shm_sink[candidate->component_id] =
+    fs_app_transmitter_get_shm_sink (self->priv->transmitter,
+        candidate->component_id, candidate->ip, ready_cb, connected_cb,
+        self, error);
+
+  if (self->priv->shm_sink[candidate->component_id] == NULL)
+    return FALSE;
+
+  if (candidate->component_id == 1)
+    fs_app_transmitter_sink_set_sending (self->priv->transmitter,
+        self->priv->shm_sink[candidate->component_id], self->priv->sending);
+
+  return TRUE;
+}
+
+
+static void
+disconnected_cb (guint component, gint id, gpointer data)
+{
+  FsAppStreamTransmitter *self = data;
+
+  g_signal_emit_by_name (self, "state-changed", component,
+      FS_STREAM_STATE_FAILED);
+}
+
+static gboolean
+fs_app_stream_transmitter_force_remote_candidate (
+    FsAppStreamTransmitter *self, FsCandidate *candidate,
+    GError **error)
+{
+  const gchar *path;
+  if (!fs_app_stream_transmitter_add_sink (self, candidate, error))
+    return FALSE;
+
+  if (self->priv->create_local_candidates)
+    path = candidate->ip;
+  else
+    path = candidate->username;
+
+  if (path && path[0])
+  {
+    if (self->priv->shm_src[candidate->component_id])
+    {
+      if (fs_app_transmitter_check_shm_src (self->priv->transmitter,
+              self->priv->shm_src[candidate->component_id], path))
+        return TRUE;
+      self->priv->shm_src[candidate->component_id] = NULL;
+    }
+
+    self->priv->shm_src[candidate->component_id] =
+      fs_app_transmitter_get_shm_src (self->priv->transmitter,
+          candidate->component_id, path, got_buffer_func, disconnected_cb,
+          self, error);
+
+    if (self->priv->shm_src[candidate->component_id] == NULL)
+      return FALSE;
+  }
+
+  return TRUE;
+}
+
+/**
+ * fs_app_stream_transmitter_force_remote_candidates
+ */
+
+static gboolean
+fs_app_stream_transmitter_force_remote_candidates (
+    FsStreamTransmitter *streamtransmitter, GList *candidates,
+    GError **error)
+{
+  GList *item = NULL;
+  FsAppStreamTransmitter *self =
+    FS_APP_STREAM_TRANSMITTER (streamtransmitter);
+
+  for (item = candidates; item; item = g_list_next (item))
+  {
+    FsCandidate *candidate = item->data;
+
+    if (candidate->component_id == 0 ||
+        candidate->component_id > self->priv->transmitter->components) {
+      g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
+          "The candidate passed has an invalid component id %u (not in [1,%u])",
+          candidate->component_id, self->priv->transmitter->components);
+      return FALSE;
+    }
+
+    if ((!candidate->ip || !candidate->ip[0]) &&
+        (!candidate->username || !candidate->username[0]))
+    {
+      g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
+          "The candidate does not have a SINK shm segment in its ip"
+          " or a SRC shm segment in its username");
+      return FALSE;
+    }
+  }
+
+  for (item = candidates; item; item = g_list_next (item))
+    if (!fs_app_stream_transmitter_force_remote_candidate (self,
+            item->data, error))
+      return FALSE;
+
+
+  return TRUE;
+}
+
+
+FsAppStreamTransmitter *
+fs_app_stream_transmitter_newv (FsAppTransmitter *transmitter,
+  guint n_parameters, GParameter *parameters, GError **error)
+{
+  FsAppStreamTransmitter *streamtransmitter = NULL;
+
+  streamtransmitter = g_object_newv (FS_TYPE_APP_STREAM_TRANSMITTER,
+    n_parameters, parameters);
+
+  if (!streamtransmitter) {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+      "Could not build the stream transmitter");
+    return NULL;
+  }
+
+  streamtransmitter->priv->transmitter = transmitter;
+
+  if (!fs_app_stream_transmitter_build (streamtransmitter, error)) {
+    g_object_unref (streamtransmitter);
+    return NULL;
+  }
+
+  return streamtransmitter;
+}
+
+
+static gboolean
+fs_app_stream_transmitter_gather_local_candidates (
+    FsStreamTransmitter *streamtransmitter,
+    GError **error)
+{
+  FsAppStreamTransmitter *self =
+    FS_APP_STREAM_TRANSMITTER (streamtransmitter);
+  GList *item;
+
+  if (self->priv->create_local_candidates)
+  {
+    guint c;
+    gchar *socket_dir;
+
+    socket_dir = g_build_filename (g_get_tmp_dir (),
+      "farstream-shm-XXXXXX", NULL);
+
+    if (g_mkdtemp (socket_dir) == NULL)
+      return FALSE;
+
+    self->priv->socket_dir = socket_dir;
+
+    for (c = 1; c <= self->priv->transmitter->components; c++)
+    {
+      gchar *path = g_strdup_printf ("%s/shm-sink-socket-%d", socket_dir, c);
+
+      self->priv->shm_sink[c] =
+        fs_app_transmitter_get_shm_sink (self->priv->transmitter,
+          c, path, ready_cb, connected_cb, self, error);
+      g_free (path);
+
+      if (self->priv->shm_sink[c] == NULL)
+        return FALSE;
+
+      if (c == 1)
+        fs_app_transmitter_sink_set_sending (self->priv->transmitter,
+            self->priv->shm_sink[c], self->priv->sending);
+    }
+
+    return TRUE;
+  }
+
+  for (item = self->priv->preferred_local_candidates;
+       item;
+       item = g_list_next (item))
+  {
+    FsCandidate *candidate = item->data;
+
+    if (candidate->ip && candidate->ip[0])
+      if (!fs_app_stream_transmitter_add_sink (self, candidate, error))
+        return FALSE;
+  }
+
+  return TRUE;
+}
diff --git a/fs-app-stream-transmitter.h b/fs-app-stream-transmitter.h
new file mode 100644 (file)
index 0000000..049f453
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Farstream - Farstream Shared Memory Stream Transmitter
+ *
+ * Copyright 2007-2008 Collabora Ltd.
+ *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
+ * Copyright 2007-2008 Nokia Corp.
+ *
+ * fs-app-stream-transmitter.h - A Farstream Shared Memory stream transmitter
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+#ifndef __FS_APP_STREAM_TRANSMITTER_H__
+#define __FS_APP_STREAM_TRANSMITTER_H__
+
+#include <glib.h>
+#include <glib-object.h>
+
+#include <farstream/fs-stream-transmitter.h>
+#include <farstream/fs-plugin.h>
+#include "fs-app-transmitter.h"
+
+G_BEGIN_DECLS
+
+/* TYPE MACROS */
+#define FS_TYPE_APP_STREAM_TRANSMITTER \
+  (fs_app_stream_transmitter_get_type ())
+#define FS_APP_STREAM_TRANSMITTER(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), FS_TYPE_APP_STREAM_TRANSMITTER, \
+                              FsAppStreamTransmitter))
+#define FS_APP_STREAM_TRANSMITTER_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), FS_TYPE_APP_STREAM_TRANSMITTER, \
+                           FsAppStreamTransmitterClass))
+#define FS_IS_APP_STREAM_TRANSMITTER(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), FS_TYPE_APP_STREAM_TRANSMITTER))
+#define FS_IS_APP_STREAM_TRANSMITTER_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), FS_TYPE_APP_STREAM_TRANSMITTER))
+#define FS_APP_STREAM_TRANSMITTER_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj), FS_TYPE_APP_STREAM_TRANSMITTER, \
+                              FsAppStreamTransmitterClass))
+#define FS_APP_STREAM_TRANSMITTER_CAST(obj) ((FsAppStreamTransmitter *) (obj))
+
+typedef struct _FsAppStreamTransmitter FsAppStreamTransmitter;
+typedef struct _FsAppStreamTransmitterClass FsAppStreamTransmitterClass;
+typedef struct _FsAppStreamTransmitterPrivate FsAppStreamTransmitterPrivate;
+
+/**
+ * FsAppStreamTransmitterClass:
+ * @parent_class: Our parent
+ *
+ * The Shared Memory stream transmitter class
+ */
+
+struct _FsAppStreamTransmitterClass
+{
+  FsStreamTransmitterClass parent_class;
+
+  /*virtual functions */
+  /*< private >*/
+};
+
+/**
+ * FsAppStreamTransmitter:
+ * @parent: Parent object
+ *
+ * All members are private, access them using methods and properties
+ */
+struct _FsAppStreamTransmitter
+{
+  FsStreamTransmitter parent;
+
+  /*< private >*/
+  FsAppStreamTransmitterPrivate *priv;
+};
+
+GType fs_app_stream_transmitter_register_type (FsPlugin *module);
+
+GType fs_app_stream_transmitter_get_type (void);
+
+FsAppStreamTransmitter *
+fs_app_stream_transmitter_newv (FsAppTransmitter *transmitter,
+  guint n_parameters, GParameter *parameters, GError **error);
+
+G_END_DECLS
+
+#endif /* __FS_APP_STREAM_TRANSMITTER_H__ */
diff --git a/fs-app-transmitter.c b/fs-app-transmitter.c
new file mode 100644 (file)
index 0000000..1c49fbf
--- /dev/null
@@ -0,0 +1,941 @@
+/*
+ * Farstream - Farstream Shm UDP Transmitter
+ *
+ * Copyright 2007-2008 Collabora Ltd.
+ *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
+ * Copyright 2007-2008 Nokia Corp.
+ *
+ * fs-app-transmitter.c - A Farstream shm UDP transmitter
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+/**
+ * SECTION:fs-app-transmitter
+ * @short_description: A transmitter for shm UDP
+ *
+ * This transmitter provides shm udp
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "fs-app-transmitter.h"
+#include "fs-app-stream-transmitter.h"
+
+#include <farstream/fs-conference.h>
+#include <farstream/fs-plugin.h>
+
+#include <string.h>
+#include <stdio.h>
+
+GST_DEBUG_CATEGORY (fs_app_transmitter_debug);
+#define GST_CAT_DEFAULT fs_app_transmitter_debug
+
+/* Signals */
+enum
+{
+  LAST_SIGNAL
+};
+
+/* props */
+enum
+{
+  PROP_0,
+  PROP_GST_SINK,
+  PROP_GST_SRC,
+  PROP_COMPONENTS,
+  PROP_DO_TIMESTAMP,
+};
+
+struct _FsAppTransmitterPrivate
+{
+  /* We hold references to this element */
+  GstElement *gst_sink;
+  GstElement *gst_src;
+
+  /* We don't hold a reference to these elements, they are owned
+     by the bins */
+  /* They are tables of pointers, one per component */
+  GstElement **funnels;
+  GstElement **tees;
+
+  gboolean do_timestamp;
+};
+
+#define FS_APP_TRANSMITTER_GET_PRIVATE(o)  \
+  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_APP_TRANSMITTER,   \
+      FsAppTransmitterPrivate))
+
+static void fs_app_transmitter_class_init (
+    FsAppTransmitterClass *klass);
+static void fs_app_transmitter_init (FsAppTransmitter *self);
+static void fs_app_transmitter_constructed (GObject *object);
+static void fs_app_transmitter_dispose (GObject *object);
+static void fs_app_transmitter_finalize (GObject *object);
+
+static void fs_app_transmitter_get_property (GObject *object,
+                                                guint prop_id,
+                                                GValue *value,
+                                                GParamSpec *pspec);
+static void fs_app_transmitter_set_property (GObject *object,
+                                                guint prop_id,
+                                                const GValue *value,
+                                                GParamSpec *pspec);
+
+static FsStreamTransmitter *fs_app_transmitter_new_stream_transmitter (
+    FsTransmitter *transmitter, FsParticipant *participant,
+    guint n_parameters, GParameter *parameters, GError **error);
+static GType fs_app_transmitter_get_stream_transmitter_type (
+    FsTransmitter *transmitter);
+
+
+static GObjectClass *parent_class = NULL;
+//static guint signals[LAST_SIGNAL] = { 0 };
+
+/*
+ * Private bin subclass
+ */
+
+enum {
+  BIN_SIGNAL_READY,
+  BIN_SIGNAL_DISCONNECTED,
+  BIN_LAST_SIGNAL
+};
+
+static guint bin_signals[BIN_LAST_SIGNAL] = { 0 };
+static GType shm_bin_type = 0;
+gpointer shm_bin_parent_class = NULL;
+
+typedef struct _FsShmBin
+{
+  GstBin parent;
+} FsShmBin;
+
+typedef struct _FsShmBinClass
+{
+  GstBinClass parent_class;
+} FsShmBinClass;
+
+static void fs_shm_bin_init (FsShmBin *self)
+{
+}
+
+static GstElement *
+fs_shm_bin_new (void)
+{
+  return g_object_new (shm_bin_type, NULL);
+}
+
+static void
+fs_shm_bin_handle_message (GstBin *bin, GstMessage *message)
+{
+  GstState old, new, pending;
+  GError *gerror;
+  gchar *msg;
+
+  switch (GST_MESSAGE_TYPE (message))
+  {
+    case GST_MESSAGE_STATE_CHANGED:
+      gst_message_parse_state_changed (message, &old, &new, &pending);
+
+      if (old == GST_STATE_PAUSED && new == GST_STATE_PLAYING)
+        g_signal_emit (bin, bin_signals[BIN_SIGNAL_READY], 0,
+            GST_MESSAGE_SRC (message));
+      break;
+    case GST_MESSAGE_ERROR:
+      gst_message_parse_error (message, &gerror, &msg);
+
+      if (g_error_matches (gerror, GST_RESOURCE_ERROR,
+              GST_RESOURCE_ERROR_READ))
+      {
+        g_signal_emit (bin, bin_signals[BIN_SIGNAL_DISCONNECTED], 0,
+            GST_MESSAGE_SRC (message));
+        gst_message_unref (message);
+        return;
+      }
+      break;
+    default:
+      break;
+  }
+
+  GST_BIN_CLASS (shm_bin_parent_class)->handle_message (bin, message);
+}
+
+static void fs_shm_bin_class_init (FsShmBinClass *klass)
+{
+  GstBinClass *bin_class = GST_BIN_CLASS (klass);
+
+  shm_bin_parent_class = g_type_class_peek_parent (klass);
+
+  bin_signals[BIN_SIGNAL_READY] =
+    g_signal_new ("ready", G_TYPE_FROM_CLASS (klass),
+        G_SIGNAL_RUN_LAST, 0, NULL, NULL,
+        g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
+
+  bin_signals[BIN_SIGNAL_DISCONNECTED] =
+    g_signal_new ("disconnected", G_TYPE_FROM_CLASS (klass),
+        G_SIGNAL_RUN_LAST, 0, NULL, NULL,
+        g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
+
+  bin_class->handle_message = GST_DEBUG_FUNCPTR (fs_shm_bin_handle_message);
+}
+
+/*
+ * Lets register the plugin
+ */
+
+static GType type = 0;
+
+GType
+fs_app_transmitter_get_type (void)
+{
+  g_assert (type);
+  return type;
+}
+
+GType
+fs_app_transmitter_register_type (FsPlugin *module);
+GType
+fs_app_transmitter_register_type (FsPlugin *module)
+{
+  static const GTypeInfo info = {
+    sizeof (FsAppTransmitterClass),
+    NULL,
+    NULL,
+    (GClassInitFunc) fs_app_transmitter_class_init,
+    NULL,
+    NULL,
+    sizeof (FsAppTransmitter),
+    0,
+    (GInstanceInitFunc) fs_app_transmitter_init
+  };
+
+  static const GTypeInfo bin_info = {
+    sizeof (FsShmBinClass),
+    NULL,
+    NULL,
+    (GClassInitFunc) fs_shm_bin_class_init,
+    NULL,
+    NULL,
+    sizeof (FsShmBin),
+    0,
+    (GInstanceInitFunc) fs_shm_bin_init
+  };
+
+
+  GST_DEBUG_CATEGORY_INIT (fs_app_transmitter_debug,
+      "fsshmtransmitter", 0,
+      "Farstream shm UDP transmitter");
+
+  fs_app_stream_transmitter_register_type (module);
+
+  type = g_type_register_static (FS_TYPE_TRANSMITTER, "FsAppTransmitter",
+      &info, 0);
+
+  shm_bin_type = g_type_register_static (
+    GST_TYPE_BIN, "FsShmBin", &bin_info, 0);
+
+  return type;
+}
+
+static void
+fs_app_transmitter_class_init (FsAppTransmitterClass *klass)
+{
+  GObjectClass *gobject_class = (GObjectClass *) klass;
+  FsTransmitterClass *transmitter_class = FS_TRANSMITTER_CLASS (klass);
+
+  parent_class = g_type_class_peek_parent (klass);
+
+  gobject_class->set_property = fs_app_transmitter_set_property;
+  gobject_class->get_property = fs_app_transmitter_get_property;
+
+  gobject_class->constructed = fs_app_transmitter_constructed;
+
+  g_object_class_override_property (gobject_class, PROP_GST_SRC, "gst-src");
+  g_object_class_override_property (gobject_class, PROP_GST_SINK, "gst-sink");
+  g_object_class_override_property (gobject_class, PROP_COMPONENTS,
+    "components");
+  g_object_class_override_property (gobject_class, PROP_DO_TIMESTAMP,
+    "do-timestamp");
+
+  transmitter_class->new_stream_transmitter =
+    fs_app_transmitter_new_stream_transmitter;
+  transmitter_class->get_stream_transmitter_type =
+    fs_app_transmitter_get_stream_transmitter_type;
+
+  gobject_class->dispose = fs_app_transmitter_dispose;
+  gobject_class->finalize = fs_app_transmitter_finalize;
+
+  g_type_class_add_private (klass, sizeof (FsAppTransmitterPrivate));
+}
+
+static void
+fs_app_transmitter_init (FsAppTransmitter *self)
+{
+
+  /* member init */
+  self->priv = FS_APP_TRANSMITTER_GET_PRIVATE (self);
+
+  self->components = 2;
+  self->priv->do_timestamp = TRUE;
+}
+
+static void
+fs_app_transmitter_constructed (GObject *object)
+{
+  FsAppTransmitter *self = FS_APP_TRANSMITTER_CAST (object);
+  FsTransmitter *trans = FS_TRANSMITTER_CAST (self);
+  GstPad *pad = NULL, *pad2 = NULL;
+  GstPad *ghostpad = NULL;
+  gchar *padname;
+  GstPadLinkReturn ret;
+  int c; /* component_id */
+
+
+  /* We waste one space in order to have the index be the component_id */
+  self->priv->funnels = g_new0 (GstElement *, self->components+1);
+  self->priv->tees = g_new0 (GstElement *, self->components+1);
+
+  /* First we need the src elemnet */
+
+  self->priv->gst_src = fs_shm_bin_new ();
+
+  if (!self->priv->gst_src) {
+    trans->construction_error = g_error_new (FS_ERROR,
+      FS_ERROR_CONSTRUCTION,
+      "Could not build the transmitter src bin");
+    return;
+  }
+
+  gst_object_ref (self->priv->gst_src);
+
+
+  /* Second, we do the sink element */
+
+  self->priv->gst_sink = fs_shm_bin_new ();
+
+  if (!self->priv->gst_sink) {
+    trans->construction_error = g_error_new (FS_ERROR,
+      FS_ERROR_CONSTRUCTION,
+      "Could not build the transmitter sink bin");
+    return;
+  }
+
+  g_object_set (G_OBJECT (self->priv->gst_sink),
+      "async-handling", TRUE,
+      NULL);
+
+  gst_object_ref (self->priv->gst_sink);
+
+  for (c = 1; c <= self->components; c++) {
+    GstElement *fakesink = NULL;
+
+    /* Lets create the RTP source funnel */
+
+    self->priv->funnels[c] = gst_element_factory_make ("funnel", NULL);
+
+    if (!self->priv->funnels[c]) {
+      trans->construction_error = g_error_new (FS_ERROR,
+        FS_ERROR_CONSTRUCTION,
+        "Could not make the funnel element");
+      return;
+    }
+
+    if (!gst_bin_add (GST_BIN (self->priv->gst_src),
+        self->priv->funnels[c])) {
+      trans->construction_error = g_error_new (FS_ERROR,
+        FS_ERROR_CONSTRUCTION,
+        "Could not add the funnel element to the transmitter src bin");
+    }
+
+    pad = gst_element_get_static_pad (self->priv->funnels[c], "src");
+    padname = g_strdup_printf ("src_%u", c);
+    ghostpad = gst_ghost_pad_new (padname, pad);
+    g_free (padname);
+    gst_object_unref (pad);
+
+    gst_pad_set_active (ghostpad, TRUE);
+    gst_element_add_pad (self->priv->gst_src, ghostpad);
+
+
+    /* Lets create the RTP sink tee */
+
+    self->priv->tees[c] = gst_element_factory_make ("tee", NULL);
+
+    if (!self->priv->tees[c]) {
+      trans->construction_error = g_error_new (FS_ERROR,
+        FS_ERROR_CONSTRUCTION,
+        "Could not make the tee element");
+      return;
+    }
+
+    if (!gst_bin_add (GST_BIN (self->priv->gst_sink),
+        self->priv->tees[c])) {
+      trans->construction_error = g_error_new (FS_ERROR,
+        FS_ERROR_CONSTRUCTION,
+        "Could not add the tee element to the transmitter sink bin");
+    }
+
+    pad = gst_element_get_static_pad (self->priv->tees[c], "sink");
+    padname = g_strdup_printf ("sink_%u", c);
+    ghostpad = gst_ghost_pad_new (padname, pad);
+    g_free (padname);
+    gst_object_unref (pad);
+
+    gst_pad_set_active (ghostpad, TRUE);
+    gst_element_add_pad (self->priv->gst_sink, ghostpad);
+
+    fakesink = gst_element_factory_make ("fakesink", NULL);
+
+    if (!fakesink) {
+      trans->construction_error = g_error_new (FS_ERROR,
+        FS_ERROR_CONSTRUCTION,
+        "Could not make the fakesink element");
+      return;
+    }
+
+    g_object_set (fakesink,
+        "async", FALSE,
+        "sync" , FALSE,
+        NULL);
+
+    if (!gst_bin_add (GST_BIN (self->priv->gst_sink), fakesink))
+    {
+      gst_object_unref (fakesink);
+      trans->construction_error = g_error_new (FS_ERROR,
+          FS_ERROR_CONSTRUCTION,
+          "Could not add the fakesink element to the transmitter sink bin");
+      return;
+    }
+
+    pad = gst_element_get_request_pad (self->priv->tees[c], "src_%u");
+    pad2 = gst_element_get_static_pad (fakesink, "sink");
+
+    ret = gst_pad_link (pad, pad2);
+
+    gst_object_unref (pad2);
+    gst_object_unref (pad);
+
+    if (GST_PAD_LINK_FAILED(ret)) {
+      trans->construction_error = g_error_new (FS_ERROR,
+          FS_ERROR_CONSTRUCTION,
+          "Could not link the tee to the fakesink");
+      return;
+    }
+  }
+
+  GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
+}
+
+static void
+fs_app_transmitter_dispose (GObject *object)
+{
+  FsAppTransmitter *self = FS_APP_TRANSMITTER (object);
+
+  if (self->priv->gst_src) {
+    gst_object_unref (self->priv->gst_src);
+    self->priv->gst_src = NULL;
+  }
+
+  if (self->priv->gst_sink) {
+    gst_object_unref (self->priv->gst_sink);
+    self->priv->gst_sink = NULL;
+  }
+
+  parent_class->dispose (object);
+}
+
+static void
+fs_app_transmitter_finalize (GObject *object)
+{
+  FsAppTransmitter *self = FS_APP_TRANSMITTER (object);
+
+  if (self->priv->funnels) {
+    g_free (self->priv->funnels);
+    self->priv->funnels = NULL;
+  }
+
+  if (self->priv->tees) {
+    g_free (self->priv->tees);
+    self->priv->tees = NULL;
+  }
+
+  parent_class->finalize (object);
+}
+
+static void
+fs_app_transmitter_get_property (GObject *object,
+                             guint prop_id,
+                             GValue *value,
+                             GParamSpec *pspec)
+{
+  FsAppTransmitter *self = FS_APP_TRANSMITTER (object);
+
+  switch (prop_id) {
+    case PROP_GST_SINK:
+      g_value_set_object (value, self->priv->gst_sink);
+      break;
+    case PROP_GST_SRC:
+      g_value_set_object (value, self->priv->gst_src);
+      break;
+    case PROP_COMPONENTS:
+      g_value_set_uint (value, self->components);
+      break;
+    case PROP_DO_TIMESTAMP:
+      g_value_set_boolean (value, self->priv->do_timestamp);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+fs_app_transmitter_set_property (GObject *object,
+                                    guint prop_id,
+                                    const GValue *value,
+                                    GParamSpec *pspec)
+{
+  FsAppTransmitter *self = FS_APP_TRANSMITTER (object);
+
+  switch (prop_id) {
+    case PROP_COMPONENTS:
+      self->components = g_value_get_uint (value);
+      break;
+    case PROP_DO_TIMESTAMP:
+      self->priv->do_timestamp = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+
+/**
+ * fs_app_transmitter_new_stream_app_transmitter:
+ * @transmitter: a #FsTranmitter
+ * @participant: the #FsParticipant for which the #FsStream using this
+ * new #FsStreamTransmitter is created
+ *
+ * This function will create a new #FsStreamTransmitter element for a
+ * specific participant for this #FsAppTransmitter
+ *
+ * Returns: a new #FsStreamTransmitter
+ */
+
+static FsStreamTransmitter *
+fs_app_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
+  FsParticipant *participant, guint n_parameters, GParameter *parameters,
+  GError **error)
+{
+  FsAppTransmitter *self = FS_APP_TRANSMITTER (transmitter);
+
+  printf("New App transmitter\n");
+  return FS_STREAM_TRANSMITTER (fs_app_stream_transmitter_newv (
+        self, n_parameters, parameters, error));
+}
+
+static GType
+fs_app_transmitter_get_stream_transmitter_type (
+    FsTransmitter *transmitter)
+{
+  return FS_TYPE_APP_STREAM_TRANSMITTER;
+}
+
+
+struct _ShmSrc {
+  guint component;
+  gchar *path;
+  GstElement *src;
+  GstPad *funnelpad;
+
+  got_buffer got_buffer_func;
+  connection disconnected_func;
+  gpointer cb_data;
+  gulong buffer_probe;
+};
+
+
+static GstPadProbeReturn
+src_buffer_probe_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
+{
+  ShmSrc *shm = user_data;
+  GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
+
+  shm->got_buffer_func (buffer, shm->component, shm->cb_data);
+
+  return TRUE;
+}
+
+
+static void
+disconnected_cb (GstBin *bin, GstElement *elem, ShmSrc *shm)
+{
+  if (elem != shm->src)
+    return;
+
+  shm->disconnected_func (shm->component, 0, shm->cb_data);
+}
+
+
+ShmSrc *
+fs_app_transmitter_get_shm_src (FsAppTransmitter *self,
+    guint component,
+    const gchar *path,
+    got_buffer got_buffer_func,
+    connection disconnected_func,
+    gpointer cb_data,
+    GError **error)
+{
+  ShmSrc *shm = g_slice_new0 (ShmSrc);
+  GstElement *elem;
+  GstPad *pad;
+
+  shm->component = component;
+  shm->got_buffer_func = got_buffer_func;
+  shm->disconnected_func = disconnected_func;
+  shm->cb_data = cb_data;
+
+  shm->path = g_strdup (path);
+
+  elem = gst_element_factory_make ("shmsrc", NULL);
+  if (!elem)
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not make shmsrc");
+    goto error;
+  }
+
+  g_object_set (elem,
+      "socket-path", path,
+      "do-timestamp", self->priv->do_timestamp,
+      "is-live", TRUE,
+      NULL);
+
+  if (shm->disconnected_func)
+    g_signal_connect (self->priv->gst_src, "disconnected",
+        G_CALLBACK (disconnected_cb), shm);
+
+  if (!gst_bin_add (GST_BIN (self->priv->gst_src), elem))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not add recvonly filter to bin");
+    gst_object_unref (elem);
+    goto error;
+  }
+
+  shm->src = elem;
+
+  shm->funnelpad = gst_element_get_request_pad (self->priv->funnels[component],
+      "sink_%u");
+
+  if (!shm->funnelpad)
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not get funnelpad");
+    goto error;
+  }
+
+  pad = gst_element_get_static_pad (shm->src, "src");
+  if (GST_PAD_LINK_FAILED (gst_pad_link (pad, shm->funnelpad)))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link tee"
+        " and valve");
+    gst_object_unref (pad);
+    goto error;
+  }
+
+  gst_object_unref (pad);
+
+  if (got_buffer_func)
+    shm->buffer_probe = gst_pad_add_probe (shm->funnelpad,
+        GST_PAD_PROBE_TYPE_BUFFER,
+        src_buffer_probe_cb, shm, NULL);
+
+  if (!gst_element_sync_state_with_parent (shm->src))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not sync the state of the new shmsrc with its parent");
+    goto error;
+  }
+
+  return shm;
+
+ error:
+  fs_app_transmitter_check_shm_src (self, shm, NULL);
+  return NULL;
+}
+
+/*
+ * Returns: %TRUE if the path is the same, other %FALSE and freeds the ShmSrc
+ */
+
+gboolean
+fs_app_transmitter_check_shm_src (FsAppTransmitter *self, ShmSrc *shm,
+    const gchar *path)
+{
+  if (path && !strcmp (path, shm->path))
+    return TRUE;
+
+  if (shm->buffer_probe)
+    gst_pad_remove_probe (shm->funnelpad, shm->buffer_probe);
+  shm->buffer_probe = 0;
+
+  if (shm->funnelpad) {
+    gst_element_release_request_pad (self->priv->funnels[shm->component],
+        shm->funnelpad);
+    gst_object_unref (shm->funnelpad);
+  }
+  shm->funnelpad = NULL;
+
+  if (shm->src)
+  {
+    gst_element_set_locked_state (shm->src, TRUE);
+    gst_element_set_state (shm->src, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN (self->priv->gst_src), shm->src);
+  }
+  shm->src = NULL;
+
+  g_free (shm->path);
+  g_slice_free (ShmSrc, shm);
+
+  return FALSE;
+}
+
+
+
+struct _ShmSink {
+  guint component;
+  gchar *path;
+  GstElement *sink;
+  GstElement *recvonly_filter;
+  GstPad *teepad;
+
+  ready ready_func;
+  connection connected_func;
+  gpointer cb_data;
+};
+
+
+static void
+ready_cb (GstBin *bin, GstElement *elem, ShmSink *shm)
+{
+  gchar *path = NULL;
+
+  if (elem != shm->sink)
+    return;
+
+  g_object_get (elem, "socket-path", &path, NULL);
+  shm->ready_func (shm->component, path, shm->cb_data);
+  g_free (path);
+}
+
+
+static void
+connected_cb (GstBin *bin, gint id, ShmSink *shm)
+{
+  shm->connected_func (shm->component, id, shm->cb_data);
+}
+
+ShmSink *
+fs_app_transmitter_get_shm_sink (FsAppTransmitter *self,
+    guint component,
+    const gchar *path,
+    ready ready_func,
+    connection connected_func,
+    gpointer cb_data,
+    GError **error)
+{
+  ShmSink *shm = g_slice_new0 (ShmSink);
+  GstElement *elem;
+  GstPad *pad;
+
+  GST_DEBUG ("Trying to add shm sink for c:%u path %s", component, path);
+
+  shm->component = component;
+
+  shm->path = g_strdup (path);
+
+  shm->ready_func = ready_func;
+  shm->connected_func = connected_func;
+  shm->cb_data = cb_data;
+
+  /* First add the sink */
+
+  elem = gst_element_factory_make ("shmsink", NULL);
+  if (!elem)
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not make shmsink");
+    goto error;
+  }
+  g_object_set (elem,
+      "socket-path", path,
+      "wait-for-connection", FALSE,
+      "async", FALSE,
+      "sync" , FALSE,
+      NULL);
+
+  if (ready_func)
+    g_signal_connect (self->priv->gst_sink, "ready", G_CALLBACK (ready_cb),
+        shm);
+
+  if (connected_func)
+    g_signal_connect (elem, "client-connected", G_CALLBACK (connected_cb), shm);
+
+  if (!gst_bin_add (GST_BIN (self->priv->gst_sink), elem))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not add shmsink to bin");
+    gst_object_unref (elem);
+    goto error;
+  }
+
+  shm->sink = elem;
+
+  /* Second add the recvonly filter */
+
+  elem = gst_element_factory_make ("valve", NULL);
+  if (!elem)
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not make valve");
+    goto error;
+  }
+
+  if (!gst_bin_add (GST_BIN (self->priv->gst_sink), elem))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not add recvonly filter to bin");
+    gst_object_unref (elem);
+    goto error;
+  }
+
+  shm->recvonly_filter = elem;
+
+  /* Third connect these */
+
+  if (!gst_element_link (shm->recvonly_filter, shm->sink))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not link recvonly filter and shmsink");
+    goto error;
+  }
+
+  if (!gst_element_sync_state_with_parent (shm->sink))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not sync the state of the new shmsink with its parent");
+    goto error;
+  }
+
+  if (!gst_element_sync_state_with_parent (shm->recvonly_filter))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not sync the state of the new recvonly filter  with its parent");
+    goto error;
+  }
+
+  shm->teepad = gst_element_get_request_pad (self->priv->tees[component],
+      "src_%u");
+
+  if (!shm->teepad)
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+        "Could not get teepad");
+    goto error;
+  }
+
+  pad = gst_element_get_static_pad (shm->recvonly_filter, "sink");
+  if (GST_PAD_LINK_FAILED (gst_pad_link (shm->teepad, pad)))
+  {
+    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link tee"
+        " and valve");
+    gst_object_unref (pad);
+    goto error;
+  }
+  gst_object_unref (pad);
+
+  return shm;
+
+ error:
+  fs_app_transmitter_check_shm_sink (self, shm, NULL);
+
+  return NULL;
+}
+
+gboolean
+fs_app_transmitter_check_shm_sink (FsAppTransmitter *self, ShmSink *shm,
+    const gchar *path)
+{
+  if (path && !strcmp (path, shm->path))
+    return TRUE;
+
+  if (path)
+    GST_DEBUG ("Replacing shm socket %s with %s", shm->path, path);
+  else
+    GST_DEBUG ("Freeing shm socket %s", shm->path);
+
+  if (shm->teepad)
+  {
+    gst_element_release_request_pad (self->priv->tees[shm->component],
+        shm->teepad);
+    gst_object_unref (shm->teepad);
+  }
+  shm->teepad = NULL;
+
+  if (shm->sink)
+  {
+    gst_element_set_locked_state (shm->sink, TRUE);
+    gst_element_set_state (shm->sink, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN (self->priv->gst_sink), shm->sink);
+  }
+  shm->sink = NULL;
+
+  if (shm->recvonly_filter)
+  {
+    gst_element_set_locked_state (shm->recvonly_filter, TRUE);
+    gst_element_set_state (shm->recvonly_filter, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN (self->priv->gst_sink), shm->recvonly_filter);
+  }
+  shm->recvonly_filter = NULL;
+
+  g_free (shm->path);
+  g_slice_free (ShmSink, shm);
+
+  return FALSE;
+}
+
+
+void
+fs_app_transmitter_sink_set_sending (FsAppTransmitter *self, ShmSink *shm,
+    gboolean sending)
+{
+  GObjectClass *klass = G_OBJECT_GET_CLASS (shm->recvonly_filter);
+
+  if (g_object_class_find_property (klass, "drop"))
+    g_object_set (shm->recvonly_filter, "drop", !sending, NULL);
+
+  if (sending)
+    gst_element_send_event (shm->sink,
+        gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+            gst_structure_new ("GstForceKeyUnit",
+              "all-headers", G_TYPE_BOOLEAN, TRUE,
+              NULL)));
+}
diff --git a/fs-app-transmitter.h b/fs-app-transmitter.h
new file mode 100644 (file)
index 0000000..cc1be62
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Farstream - Farstream Shared Memory Transmitter
+ *
+ * Copyright 2007-2008 Collabora Ltd.
+ *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
+ * Copyright 2007-2008 Nokia Corp.
+ *
+ * fs-app-transmitter.h - A Farstream Shared Memory transmitter
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+#ifndef __FS_APP_TRANSMITTER_H__
+#define __FS_APP_TRANSMITTER_H__
+
+#include <farstream/fs-transmitter.h>
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+/* TYPE MACROS */
+#define FS_TYPE_APP_TRANSMITTER \
+  (fs_app_transmitter_get_type ())
+#define FS_APP_TRANSMITTER(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), FS_TYPE_APP_TRANSMITTER, \
+    FsAppTransmitter))
+#define FS_APP_TRANSMITTER_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), FS_TYPE_APP_TRANSMITTER, \
+    FsAppTransmitterClass))
+#define FS_IS_APP_TRANSMITTER(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj), FS_TYPE_APP_TRANSMITTER))
+#define FS_IS_APP_TRANSMITTER_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass), FS_TYPE_APP_TRANSMITTER))
+#define FS_APP_TRANSMITTER_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj), FS_TYPE_APP_TRANSMITTER, \
+    FsAppTransmitterClass))
+#define FS_APP_TRANSMITTER_CAST(obj) ((FsAppTransmitter *) (obj))
+
+typedef struct _FsAppTransmitter FsAppTransmitter;
+typedef struct _FsAppTransmitterClass FsAppTransmitterClass;
+typedef struct _FsAppTransmitterPrivate FsAppTransmitterPrivate;
+
+/**
+ * FsAppTransmitterClass:
+ * @parent_class: Our parent
+ *
+ * The Shared Memory transmitter class
+ */
+
+struct _FsAppTransmitterClass
+{
+  FsTransmitterClass parent_class;
+};
+
+/**
+ * FsAppTransmitter:
+ * @parent: Parent object
+ *
+ * All members are private, access them using methods and properties
+ */
+struct _FsAppTransmitter
+{
+  FsTransmitter parent;
+
+  /* The number of components (READONLY) */
+  gint components;
+
+  /*< private >*/
+  FsAppTransmitterPrivate *priv;
+};
+
+GType fs_app_transmitter_get_type (void);
+
+typedef struct _ShmSrc ShmSrc;
+typedef struct _ShmSink ShmSink;
+
+typedef void (*got_buffer) (GstBuffer *buffer, guint component, gpointer data);
+typedef void (*ready) (guint component, gchar *path, gpointer data);
+typedef void (*connection) (guint component, gint id, gpointer data);
+
+ShmSrc *fs_app_transmitter_get_shm_src (FsAppTransmitter *self,
+    guint component,
+    const gchar *path,
+    got_buffer got_buffer_func,
+    connection disconnected_func,
+    gpointer cb_data,
+    GError **error);
+
+gboolean fs_app_transmitter_check_shm_src (FsAppTransmitter *self,
+    ShmSrc *shm,
+    const gchar *path);
+
+ShmSink *fs_app_transmitter_get_shm_sink (FsAppTransmitter *self,
+    guint component,
+    const gchar *path,
+    ready ready_func,
+    connection connected_fubnc,
+    gpointer cb_data,
+    GError **error);
+
+gboolean fs_app_transmitter_check_shm_sink (FsAppTransmitter *self,
+    ShmSink *shm,
+    const gchar *path);
+
+void fs_app_transmitter_sink_set_sending (FsAppTransmitter *self,
+    ShmSink *shm, gboolean sending);
+
+
+
+G_END_DECLS
+
+#endif /* __FS_APP_TRANSMITTER_H__ */