15

there are plenty of examples in the gstreamer documentation on constructing and running static pipelines. however, there isn't much about changing/relinking elements in a live pipeline - while the media is actually flowing. it's definitely possible, so the question is:

  1. what gstreamer concepts/mechanics should i understand before attempting this?
  2. are there any pitfalls to watch out for?
  3. what is the basic procedure, or a good example?

accepted answer will be spoon fed, comprehensive, and with source code

5 Answers 5

15
  1. My favorite "concept" for understanding linking (and dynamic linking), is thinking about the pipeline as a real pipe with water streaming through it. Once you do this, some things will become very obvious. Like, "do you set the source to PLAYING before linking the element?", becomes "do you turn on the water before connecting the hose?", and it sort of answers itself. Even more so with dynamic linking, how would you ensure that no water "leaks" (that is bad, "leaks" in GStreamer is the equivalent of getting a GST_FLOW_NOT_LINKED, and will stop your source and the fun) or get clogged up (can cause dropping or congestion of packets).

  2. Yes. Many. With a little disclaimer that I still currently work with 0.10 and some of this might have been fixed with 1.0, there is unfortunately very, very hard to do dynamic linking and unlinking with GStreamer 0.10. Let me explain: Let's say you are using a Tee, and you want to unlink one branch. You would start by releasing the Tees srcpad (never mind unlinking it, that happens as part of the release of the pad), and now you should safely be able to tear down the elements downstream from that pad. (The water equivalent is that you close a valve after the tee, and should now be able to dismantle the pipes after the valve, you would not start dismantling the pipes without closing the valve first unless you wanted to get wet...) This will work most of the time, but there is a race here. Because after you have released the pad, there might still be a push or a pad-alloc on their way on that pad, and if you now in your code start tearing down the downstream elements, this might now crash because of the race that exists in some elements if they get a push or pad-alloc while tearing down, or you get a GST_FLOW_WRONG_STATE or GST_FLOW_NOT_LINKED and they will go back to the source stopping the stream for everyone...

  3. I did a lot of experiments with this, and found that if you need stability, and crashing/freezing occasionally is not an option you need an element that will serve as your dynamic safety-net. An element that will guarantee that absolutely no activity will happen on a pad after you release/unlink it. The only way to do this is to break another GStreamer paradigm of not pushing while holding a lock: you need to hold a lock while pushing / sending events / pad-allocing. I made such a thing a while back here. (test-case being the most important thing of course, as it allows you to test your own / other elements for their safeness) You could also imagine a lock-free element that would swallow all bad FlowReturns, and just paint a pretty picture for its upstream, but then you would need to be absolutely sure that all your downstream-elements would be "push or pad-alloc received while shutting down"-safe, since your element would not be able to guarantee that once "stop the flow" (release/unlink) has been executed, a little drop would not squeeze past.

Of course you have to put some of this in perspective. The window for these terrible race-conditions I am talking about is in fact very, very small, and might only happen every 1000th or 10.000th time you run your program. But for a professional application this is of course not acceptable. I did a talk where I covered some of this stuff here

8

I tend to use output-selector or input selector bins depending on the situation rather than pad blocking complexity (I have answered the pad blocking in another post http://gstreamer-devel.966125.n4.nabble.com/Dynamically-adding-and-removing-branches-of-a-tee-td973635.html#a4656812). And connect the selector to fakesrc or fakesink bins when not in use. In the example below if one is using GTK then one may replace the line g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel); with gtk_toggle_button and put all the code presently in the switch_cb function into toggle button callback function. In this code one may switch between the two imagesinks. I would substitute one image sink with fakesink to keep pipeline running, in case I want to add a tee in the future with a filesink where I want to record video yet provide player an option to turn on (selector on imagesink)/off (selector on fakesink) the display. This allows one to add/remove bins at runtime using selector.

#include <gst/gst.h>

#define SWITCH_TIMEOUT 1000
#define NUM_VIDEO_BUFFERS 500

static GMainLoop *loop;

/* Output selector src pads */
static GstPad *osel_src1 = NULL;
static GstPad *osel_src2 = NULL;

static gboolean
my_bus_callback (GstBus * bus, GstMessage * message, gpointer data)
{
  g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ERROR:{
      GError *err;
      gchar *debug;

      gst_message_parse_error (message, &err, &debug);
      g_print ("Error: %s\n", err->message);
      g_error_free (err);
      g_free (debug);

      g_main_loop_quit (loop);
      break;
    }
    case GST_MESSAGE_EOS:
      /* end-of-stream */
      g_main_loop_quit (loop);
      break;
    default:
      /* unhandled message */
      break;
  }
  /* we want to be notified again the next time there is a message
   * on the bus, so returning TRUE (FALSE means we want to stop watching
   * for messages on the bus and our callback should not be called again)
   */
  return TRUE;
}

static gboolean
switch_cb (gpointer user_data)
{
  GstElement *sel = GST_ELEMENT (user_data);
  GstPad *old_pad, *new_pad = NULL;

  g_object_get (G_OBJECT (sel), "active-pad", &old_pad, NULL);

  if (old_pad == osel_src1)
    new_pad = osel_src2;
  else
    new_pad = osel_src1;

  g_object_set (G_OBJECT (sel), "active-pad", new_pad, NULL);

  g_print ("switched from %s:%s to %s:%s\n", GST_DEBUG_PAD_NAME (old_pad),
      GST_DEBUG_PAD_NAME (new_pad));

  gst_object_unref (old_pad);

  return TRUE;

}

gint
main (gint argc, gchar * argv[])
{
  GstElement *pipeline, *src, *toverlay, *osel, *sink1, *sink2, *convert;
  GstPad *sinkpad1;
  GstPad *sinkpad2;
  GstBus *bus;

  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);

  /* create elements */
  pipeline = gst_element_factory_make ("pipeline", "pipeline");
  src = gst_element_factory_make ("videotestsrc", "src");
  toverlay = gst_element_factory_make ("timeoverlay", "timeoverlay");
  osel = gst_element_factory_make ("output-selector", "osel");
  convert = gst_element_factory_make ("ffmpegcolorspace", "convert");
  sink1 = gst_element_factory_make ("xvimagesink", "sink1");
  sink2 = gst_element_factory_make ("ximagesink", "sink2");

  if (!pipeline || !src || !toverlay || !osel || !convert || !sink1 || !sink2) {
    g_print ("missing element\n");
    return -1;
  }

  /* add them to bin */
  gst_bin_add_many (GST_BIN (pipeline), src, toverlay, osel, convert, sink1,
      sink2, NULL);

  /* set properties */
  g_object_set (G_OBJECT (src), "is-live", TRUE, NULL);
  g_object_set (G_OBJECT (src), "do-timestamp", TRUE, NULL);
  g_object_set (G_OBJECT (src), "num-buffers", NUM_VIDEO_BUFFERS, NULL);
  g_object_set (G_OBJECT (sink1), "sync", FALSE, "async", FALSE, NULL);
  g_object_set (G_OBJECT (sink2), "sync", FALSE, "async", FALSE, NULL);
  g_object_set (G_OBJECT (osel), "resend-latest", TRUE, NULL);

  /* link src ! timeoverlay ! osel */
  if (!gst_element_link_many (src, toverlay, osel, NULL)) {
    g_print ("linking failed\n");
    return -1;
  }

  /* link output 1 */
  sinkpad1 = gst_element_get_static_pad (sink1, "sink");
  osel_src1 = gst_element_get_request_pad (osel, "src%d");
  if (gst_pad_link (osel_src1, sinkpad1) != GST_PAD_LINK_OK) {
    g_print ("linking output 1 failed\n");
    return -1;
  }
  gst_object_unref (sinkpad1);

  /* link output 2 */
  sinkpad2 = gst_element_get_static_pad (convert, "sink");
  osel_src2 = gst_element_get_request_pad (osel, "src%d");
  if (gst_pad_link (osel_src2, sinkpad2) != GST_PAD_LINK_OK) {
    g_print ("linking output 2 failed\n");
    return -1;
  }
  gst_object_unref (sinkpad2);

  if (!gst_element_link (convert, sink2)) {
    g_print ("linking output 2 failed\n");
    return -1;
  }

  /* add switch callback */
  g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel);

  /* change to playing */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  gst_bus_add_watch (bus, my_bus_callback, loop);
  gst_object_unref (bus);

  gst_element_set_state (pipeline, GST_STATE_PLAYING);

  /* now run */
  g_main_loop_run (loop);

  /* also clean up */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_element_release_request_pad (osel, osel_src1);
  gst_element_release_request_pad (osel, osel_src2);
  gst_object_unref (GST_OBJECT (pipeline));

  return 0;
}
3

This post showed up first when I looked for dynamically modifying any gstreamer pipeline. Found some links, but it is now well documented in the manual: http://gstreamer.freedesktop.org/data/doc/gstreamer/head/manual/html/section-dynamic-pipelines.html

2

Actually I'm trying to do the same. Not too much luck yet :(

I got following link by asking on the #gstreamer IRC channel: http://cgit.freedesktop.org/gstreamer/gstreamer/tree/docs/design/part-dynamic.txt

Maybe a hint to the right direction.

Please let me know when you find other documentation...

1
0

I did not achieve to create full legible muxed files for Gstreamer 0.10 on top of multifilesink or output-selector.

After analysing lots of alternatives my solution takes as code base the example depicted in: http://gstreamer.freedesktop.org/data/doc/gstreamer/head/manual/html/section-dynamic-pipelines.html

The probes function API has been changed a bit from 0.10 to 1.0 but the solution below works to create every N seconds different MP4 files:

static GstElement *pipeline = NULL;

// Pipeline -> src                    -> dynamic pipeline
// Pipeline -> capsfilter(f264file)   -> mp4mux(mux0)                                 -> filesink(fsink0)
// Pipeline -> elem_before||blockpad| -> |elem_cur_sinkpad||elem_cur||elem_cur_srcpad -> |elem_after_sinkpad||elem_after
static gulong probe_id;             // probe ID
static GstElement *elem_before;     // SRC of dynamic pipeline
static GstElement *elem_after;      // SINK of dynamic pipeline
static GstElement *elem_cur;        // Main element of dynamic pipeline
static GstPad *blockpad;            // SRC pad to be blocked
static GstPad *elem_cur_srcpad;     // SRC pad where check EOS
static GstPad *elem_cur_sinkpad;    // SINK of dynamic pipeline
static GstPad *elem_after_sinkpad;  // SINK of SINK element

// Last Buffer Timestamp
static GstClockTime last_ts = 0;

typedef enum {
  NO_NEW_FILE,  // Keep current file destination
  NEW_FILE,     // Switch file destination
} NewFileStatus;
static NewFileStatus newfile = NO_NEW_FILE; // Switch File Flag

static int counter = 1; // Index filename

// EOS listener to switch to other file destination
static gboolean
event_probe_cb (GstPad * pad, GstEvent * event, gpointer user_data)
{
  g_print ("INSIDE event_probe_cb:%d type:%s\n",probe_id,
      GST_EVENT_TYPE (event)==GST_EVENT_EOS?"EOS":GST_EVENT_TYPE (event)==GST_EVENT_NEWSEGMENT?"NEWSEGMENT":"OTHER");

  if (GST_EVENT_TYPE (event) != GST_EVENT_EOS)
  {
    // Push the event in the pipe flow (false DROP)
    return TRUE;
  }

  // remove the probe first
  gst_pad_remove_event_probe (pad, probe_id);

  gst_object_unref (elem_cur_srcpad);
  gst_object_unref (elem_after_sinkpad);
  gst_element_release_request_pad(elem_cur, elem_cur_sinkpad);

  gst_element_set_state (elem_cur, GST_STATE_NULL);
  gst_element_set_state (elem_after, GST_STATE_NULL);

  // remove unlinks automatically
  GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, elem_cur);
  gst_bin_remove (GST_BIN (pipeline), elem_cur);
  GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, elem_after);
  gst_bin_remove (GST_BIN (pipeline), elem_after);

  GstElement * mux0 = gst_element_factory_make("mp4mux", "mux0");
  GstElement * fsink0 = gst_element_factory_make("filesink", "fsink0");
  elem_cur = mux0;
  elem_after = fsink0;

  if(!mux0 || !fsink0)
  {
    printf("mising elements\n");
  }

  GST_DEBUG_OBJECT (pipeline, "adding   %" GST_PTR_FORMAT, elem_cur);
  gst_bin_add (GST_BIN (pipeline), elem_cur);
  GST_DEBUG_OBJECT (pipeline, "adding   %" GST_PTR_FORMAT, elem_after);
  gst_bin_add (GST_BIN (pipeline), elem_after);

  char buffer[128];
  sprintf(buffer, "test_%d.mp4", counter++);
  g_print ("File Switching %s\n", buffer);
  g_object_set(G_OBJECT(elem_after), "location", buffer, NULL);

  GST_DEBUG_OBJECT (pipeline, "linking..");
  elem_cur_srcpad = gst_element_get_static_pad (elem_cur, "src");
  elem_cur_sinkpad = gst_element_get_request_pad (elem_cur, "video_%d");
  elem_after_sinkpad = gst_element_get_static_pad (elem_after, "sink");

  if(gst_pad_link(blockpad, elem_cur_sinkpad) != GST_PAD_LINK_OK)
  {
    printf("linking output 0 failed\n");
    return -1;
  }
  if(gst_pad_link(elem_cur_srcpad, elem_after_sinkpad) != GST_PAD_LINK_OK)
  {
    printf("linking output 1 failed\n");
    return -1;
  }

  g_print ("Moving to PLAYING\n");
  gst_element_set_state (elem_cur, GST_STATE_PLAYING);
  gst_element_set_state (elem_after, GST_STATE_PLAYING);

  GST_DEBUG_OBJECT (pipeline, "done");

  newfile = NO_NEW_FILE;
  // Push the event in the pipe flow (false DROP)
  return TRUE;
}

// Check if Buffer contains a KEY FRAME
static gboolean
is_sync_frame (GstBuffer * buffer)
{
  if (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) 
  {
    return FALSE;
  }
  else if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_IN_CAPS)) 
  {
    return TRUE;
  }
}

// Block source and launch EOS to MUXER to achieve a full muxed file
static gboolean
pad_probe_cb (GstPad * pad, GstBuffer * buffer, gpointer user_data)
{
  g_print ("\n\tINSIDE pad_probe_cb:%d %s %s\n",probe_id, (newfile?"newfile":"thesame"), 
      (is_sync_frame (buffer)?"KEYframe":"frame"));
  GST_DEBUG_OBJECT (pad, "pad is blocked now");

  last_ts = GST_BUFFER_TIMESTAMP(buffer);
  if(!GST_CLOCK_TIME_IS_VALID(last_ts))
      last_ts=0;

  if((newfile==NO_NEW_FILE) || !is_sync_frame (buffer))
    return TRUE;

  /* remove the probe first */
  gst_pad_remove_buffer_probe (pad, probe_id);

  /* install new probe for EOS */
  probe_id = gst_pad_add_event_probe (elem_after_sinkpad, G_CALLBACK(event_probe_cb), user_data);

  /* push EOS into the element, the probe will be fired when the
   * EOS leaves the effect and it has thus drained all of its data */
  gst_pad_send_event (elem_cur_sinkpad, gst_event_new_eos ());

  // Wait til the EOS have been processed the Buffer with the Key frame will be the FIRST
  while(newfile != NO_NEW_FILE)
      Sleep(1);

  // Push the buffer in the pipe flow (false DROP)
  return TRUE;
}

// this timeout is periodically run as part of the mainloop
static gboolean timeout (gpointer user_data)
{
  g_print ("TIMEOUT\n");
  if(!playing)
      return false;
  newfile = NEW_FILE;
  /* install new probe for Keyframe and New File */
  probe_id = gst_pad_add_buffer_probe (blockpad, G_CALLBACK(pad_probe_cb), pipeline);
  return true;
}

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Not the answer you're looking for? Browse other questions tagged or ask your own question.