Handle multiple ways of setting an URI
[octopus.git] / src / octopus-backend-gst.c
1 /*   Octopus Media Engine
2  *   Copyright 2008 Movial Creative Technologies Inc
3  *
4  *   Authors: Mikko Rasa, <mikko.rasa@movial.fi>
5  *
6  *   This program is free software; you can redistribute it and/or
7  *   modify it under the terms of the GNU Lesser General Public
8  *   License as published by the Free Software Foundation; either
9  *   version 2.1 of the License, or (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *   Lesser General Public License for more details.
15  *
16  *   You should have received a copy of the GNU Lesser General Public
17  *   License along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
19  */
20 #include <string.h>
21 #include <gst/gst.h>
22 #include <gst/interfaces/xoverlay.h>
23 #include "octopus-backend-gst.h"
24 #include "octopus-module.h"
25 #include "octopus-module-manager.h"
26 #include "octopus-route.h"
27
28 typedef struct _RouteDataGst RouteDataGst;
29
30 struct _RouteDataGst
31 {
32   GstElement *pipeline;
33   GstElement *fakesink;
34   gboolean   ready;
35   guint      timeout_tag;
36 };
37
38 static gboolean
39 validate_component(OctopusBackend   *backend,
40                    OctopusComponent *comp);
41 static gboolean
42 can_link_components(OctopusBackend *backend,
43                     OctopusComponent *comp1,
44                     OctopusComponent *comp2);
45
46 gboolean
47 init_route (OctopusBackend *backend,
48             OctopusRoute   *route);
49 static void
50 destroy_route(OctopusBackend *backend,
51               OctopusRoute   *route);
52 static gboolean
53 play_route(OctopusBackend *backend,
54            OctopusRoute   *route);
55 static gboolean
56 pause_route(OctopusBackend *backend,
57             OctopusRoute   *route);
58 static gboolean
59 stop_route(OctopusBackend *backend,
60            OctopusRoute   *route);
61
62 static void
63 route_mapping_changed(OctopusRoute *route,
64                       gpointer user_data);
65 static void
66 route_endpoint_changed(OctopusRoute    *route,
67                        OctopusEndpoint *endpoint,
68                        gpointer        user_data);
69 static void
70 have_type(GstTypeFind *typefind,
71           guint       probability,
72           GstCaps     *caps,
73           gpointer    user_data);
74 static void
75 pad_added(GstElement *elem,
76           GstPad     *pad,
77           gpointer   *user_data);
78 static void
79 pad_removed(GstElement *elem,
80             GstPad     *pad,
81             gpointer   *user_data);
82 static void
83 no_more_pads(GstElement *elem,
84              gpointer   user_data);
85 static gboolean
86 bus_watch(GstBus     *bus,
87           GstMessage *msg,
88           gpointer   user_data);
89 static gboolean
90 position_timer(gpointer);
91
92 G_DEFINE_TYPE(OctopusBackendGst, octopus_backend_gst, OCTOPUS_TYPE_BACKEND)
93
94 static void
95 octopus_backend_gst_class_init(OctopusBackendGstClass *klass)
96 {
97   OctopusBackendClass *backend;
98
99   backend = OCTOPUS_BACKEND_CLASS(klass);
100
101   backend->validate_component = validate_component;
102   backend->can_link_components = can_link_components;
103   backend->init_route = init_route;
104   backend->destroy_route = destroy_route;
105   backend->play_route = play_route;
106   backend->pause_route = pause_route;
107   backend->stop_route = stop_route;
108 }
109
110 static void
111 octopus_backend_gst_init(OctopusBackendGst *backend)
112 {
113   OctopusBackend *base;
114
115   base = OCTOPUS_BACKEND(backend);
116
117   base->name = "gstreamer";
118 }
119
120 /*
121  * Helper functions
122  */
123
124 static gboolean
125 can_maybe_sink_caps(GstElementFactory *factory,
126                     GstCaps *caps)
127 {
128   const GList *pads;
129   gboolean    result = FALSE;
130
131   pads = gst_element_factory_get_static_pad_templates(factory);
132   for(; pads; pads = pads->next) {
133     GstStaticPadTemplate *tmpl = pads->data;
134     if(tmpl->direction == GST_PAD_SINK && tmpl->presence == GST_PAD_ALWAYS) {
135       caps = gst_caps_intersect(caps, gst_static_pad_template_get_caps(tmpl));
136       result = !gst_caps_is_empty(caps);
137       gst_caps_unref(caps);
138       break;
139     }
140   }
141
142   return result;
143 }
144
145 /*
146  * OctopusBackend implementation
147  */
148
149 static gboolean
150 validate_component(OctopusBackend   *backend,
151                    OctopusComponent *comp)
152 {
153   GSList            *iter;
154
155   for(iter = comp->elements; iter; iter = iter->next) {
156     GstElementFactory *factory;
157     
158     factory = gst_element_factory_find(((OctopusElement *)iter->data)->name);
159     if(!factory)
160       return FALSE;
161     gst_object_unref(factory);
162   }
163
164   return TRUE;
165 }
166
167 static gboolean
168 can_link_components(OctopusBackend   *backend,
169                     OctopusComponent *comp1,
170                     OctopusComponent *comp2)
171 {
172   OctopusElement    *elem;
173   GstElementFactory *factory;
174   const GList       *pads;
175   GstCaps           *caps = NULL;
176   gboolean          result = FALSE;
177
178   elem = (OctopusElement *)g_slist_last(comp1->elements)->data;
179   factory = gst_element_factory_find(elem->name);
180
181   pads = gst_element_factory_get_static_pad_templates(factory);
182   for(; pads; pads = pads->next) {
183     GstStaticPadTemplate *tmpl = pads->data;
184     if(tmpl->direction == GST_PAD_SRC && tmpl->presence == GST_PAD_ALWAYS) {
185       caps = gst_static_pad_template_get_caps(tmpl);
186       break;
187     }
188   }
189   gst_object_unref(factory);
190
191   if(caps) {
192     if(gst_caps_is_any(caps))
193       return FALSE;
194
195     elem = (OctopusElement *)comp2->elements->data;
196     factory = gst_element_factory_find(elem->name);
197     result = can_maybe_sink_caps(factory, caps);
198     gst_object_unref(factory);
199   }
200
201   return result;
202 }
203
204 gboolean
205 init_route(OctopusBackend *backend,
206            OctopusRoute   *route)
207 {
208   RouteDataGst *data;
209   GstBus       *bus;
210
211   data = g_new0(RouteDataGst, 1);
212   data->pipeline = gst_element_factory_make("pipeline", NULL);
213   g_object_set_data(G_OBJECT(route), "data", data);
214
215   g_signal_connect(route, "mapping-changed", G_CALLBACK(route_mapping_changed), 0);
216   g_signal_connect(route, "endpoint-changed", G_CALLBACK(route_endpoint_changed), 0);
217
218   bus = gst_pipeline_get_bus(GST_PIPELINE(data->pipeline));
219   gst_bus_add_watch(bus, bus_watch, route);
220
221   return TRUE;
222 }
223
224 static void
225 destroy_route(OctopusBackend *backend,
226               OctopusRoute   *route)
227 {
228   RouteDataGst *data;
229
230   stop_route(backend, route);
231   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
232   gst_object_unref(GST_OBJECT(data->pipeline));
233 }
234
235 static gboolean
236 play_route(OctopusBackend *backend,
237            OctopusRoute   *route)
238 {
239   RouteDataGst         *data;
240   GstStateChangeReturn ret;
241
242   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
243   if(!data->ready && !data->fakesink) {
244     /* Keep the pipeline in an async state change while building it */
245     data->fakesink = gst_element_factory_make("fakesink", NULL);
246     gst_bin_add(GST_BIN(data->pipeline), data->fakesink);
247   }
248
249   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PLAYING);
250   if(ret == GST_STATE_CHANGE_FAILURE) {
251     g_debug("Pipeline failed to start playback");
252     return FALSE;
253   } else if(ret != GST_STATE_CHANGE_ASYNC) {
254     octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
255   }
256
257   if(!data->timeout_tag)
258     data->timeout_tag = g_timeout_add(1000, position_timer, route);
259
260   return TRUE;
261 }
262
263 static gboolean
264 pause_route(OctopusBackend *backend,
265             OctopusRoute   *route)
266 {
267   RouteDataGst         *data;
268   GstStateChangeReturn ret;
269
270   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
271
272   if(data->timeout_tag) {
273     g_source_remove(data->timeout_tag);
274     data->timeout_tag = 0;
275   }
276
277   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PAUSED);
278   if(ret == GST_STATE_CHANGE_FAILURE) {
279     g_debug("Pipeline failed to pause playback");
280     return FALSE;
281   } else if(ret != GST_STATE_CHANGE_ASYNC) {
282     octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
283   }
284
285   return TRUE;
286 }
287
288 static gboolean
289 stop_route(OctopusBackend *backend,
290            OctopusRoute   *route)
291 {
292   RouteDataGst         *data;
293   GstStateChangeReturn ret;
294
295   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
296
297   if(data->timeout_tag) {
298     g_source_remove(data->timeout_tag);
299     data->timeout_tag = 0;
300   }
301
302   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_NULL);
303   if(ret == GST_STATE_CHANGE_FAILURE) {
304     g_debug("Pipeline failed to stop playback");
305     return FALSE;
306   } else if(ret != GST_STATE_CHANGE_ASYNC) {
307     octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
308   }
309
310   return TRUE;
311 }
312
313 /*
314  * Private functions
315  */
316
317 static void
318 configure_endpoint(GstElement            *elem,
319                    const OctopusEndpoint *endpoint)
320 {
321   if(endpoint->uri) {
322     g_debug("Setting URI '%s'", endpoint->uri);
323     if(GST_IS_URI_HANDLER(elem)) {
324       GstURIHandler *uri_handler;
325
326       uri_handler = GST_URI_HANDLER(elem);
327       gst_uri_handler_set_uri(uri_handler, endpoint->uri);
328     } else {
329       gchar *loc;
330       
331       loc=gst_uri_get_location(endpoint->uri);
332       
333       if(g_object_class_find_property(G_OBJECT_GET_CLASS(elem), "location"))
334         g_object_set(G_OBJECT(elem), "location", loc, NULL);
335       else if(g_object_class_find_property(G_OBJECT_GET_CLASS(elem), "file-name"))
336         g_object_set(G_OBJECT(elem), "file-name", loc, NULL);
337       else
338         g_warning("Don't know how to set URI");
339     }
340   }
341
342   if(endpoint->window) {
343     g_debug("Setting X11 display ('%s', %lx)", endpoint->display, endpoint->window);
344     g_object_set(G_OBJECT(elem), "display", endpoint->display, NULL);
345     gst_x_overlay_set_xwindow_id(GST_X_OVERLAY(elem), endpoint->window);
346   }
347 }
348
349 static GstElement *
350 realize_element(OctopusRoute   *route,
351                 OctopusElement *oct_elem)
352 {
353   GstElement *gst_elem;
354   GList      *iter;
355   GSList     *iter2;
356
357   g_debug("Realizing element '%s'", oct_elem->name);
358   gst_elem = gst_element_factory_make(oct_elem->name, oct_elem->endpoint);
359
360   if(oct_elem->endpoint) {
361     const OctopusEndpoint *endpoint;
362     if((endpoint = octopus_route_get_endpoint(route, oct_elem->endpoint)))
363       configure_endpoint(gst_elem, endpoint);
364   }
365
366   for(iter2 = oct_elem->parameters; iter2; iter2 = iter2->next) {
367     OctopusParameter *param = (OctopusParameter *)iter2->data;
368     GParamSpec       *pspec;
369
370     pspec = g_object_class_find_property(G_OBJECT_GET_CLASS(gst_elem), param->name);
371     if(pspec) {
372       if(pspec->value_type == G_TYPE_INT) {
373         g_object_set(G_OBJECT(gst_elem), param->name, strtol(param->value, NULL, 0), NULL);
374       } else if(pspec->value_type == G_TYPE_UINT) {
375         g_object_set(G_OBJECT(gst_elem), param->name, strtoul(param->value, NULL, 0), NULL);
376       } else if(pspec->value_type == G_TYPE_INT64) {
377         g_object_set(G_OBJECT(gst_elem), param->name, strtoll(param->value, NULL, 0), NULL);
378       } else if(pspec->value_type == G_TYPE_UINT64) {
379         g_object_set(G_OBJECT(gst_elem), param->name, strtoull(param->value, NULL, 0), NULL);
380       } else if(pspec->value_type == G_TYPE_BOOLEAN) {
381         if(!strcmp(param->value, "true"))
382           g_object_set(G_OBJECT(gst_elem), param->name, TRUE, NULL);
383         else if(!strcmp(param->value, "false"))
384           g_object_set(G_OBJECT(gst_elem), param->name, FALSE, NULL);
385       } else if(pspec->value_type == G_TYPE_STRING) {
386         g_object_set(G_OBJECT(gst_elem), param->name, param->value, NULL);
387       }
388     } else {
389       g_debug("Unknown parameter '%s' specified for element '%s'", param->name, oct_elem->name);
390     }
391   }
392
393   /* XXX better way to figure out if it's a typefind? */
394   if(g_str_has_prefix(oct_elem->name, "typefind")) {
395     g_debug("Connecting to 'have-type' signal");
396     g_signal_connect(G_OBJECT(gst_elem), "have-type", G_CALLBACK(have_type), route);
397   }
398
399   iter = gst_element_class_get_pad_template_list(GST_ELEMENT_GET_CLASS(gst_elem));
400   for(; iter; iter = iter->next) {
401     GstPadTemplate *tmpl = (GstPadTemplate *)iter->data;
402     if(GST_PAD_TEMPLATE_DIRECTION(tmpl) == GST_PAD_SRC
403        && GST_PAD_TEMPLATE_PRESENCE(tmpl) == GST_PAD_SOMETIMES)
404     {
405       g_debug("Connecting to dynamic pad signals");
406       g_signal_connect(G_OBJECT(gst_elem), "pad-added", G_CALLBACK(pad_added), route);
407       g_signal_connect(G_OBJECT(gst_elem), "pad-removed", G_CALLBACK(pad_removed), route);
408       g_signal_connect(G_OBJECT(gst_elem), "no-more-pads", G_CALLBACK(no_more_pads), route);
409       break;
410     }
411   }
412
413   return gst_elem;
414 }
415
416 static GstPad *
417 realize_component(OctopusBackend   *backend,
418                   OctopusRoute     *route,
419                   OctopusComponent *component,
420                   GstPad           *srcpad)
421 {
422   RouteDataGst *data;
423   GSList       *iter;
424   GstState     state;
425
426   g_debug("Realizing component '%s'", component->name);
427
428   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
429
430   state = GST_STATE(data->pipeline);
431   if(state != GST_STATE_NULL)
432     state = GST_STATE_PAUSED;
433
434   for(iter = component->elements; iter; iter = iter->next) {
435     OctopusElement *oct_elem = (OctopusElement *)iter->data;
436     GstElement     *gst_elem;
437
438     gst_elem = realize_element(route, oct_elem);
439     gst_bin_add(GST_BIN(data->pipeline), gst_elem);
440
441     if(srcpad) {
442       GstPad *sinkpad;
443
444       if((sinkpad = gst_element_get_static_pad(gst_elem, "sink"))) {
445         if(GST_PAD_LINK_FAILED(gst_pad_link(srcpad, sinkpad)))
446           g_debug("Linking pads failed - component '%s' may be broken", component->name);
447         gst_object_unref(sinkpad);
448       }
449
450       gst_object_unref(srcpad);
451     }
452     gst_element_set_state(gst_elem, state);
453
454     srcpad = gst_element_get_static_pad(gst_elem, "src");
455   }
456
457   return srcpad;
458 }
459
460 static void
461 realize_component_chain(OctopusBackend *backend,
462                         OctopusRoute   *route,
463                         GSList         *chain,
464                         GstPad         *srcpad)
465 {
466   GSList *iter;
467
468   if(srcpad)
469     gst_object_ref(srcpad);
470   for(iter = chain; iter; iter = iter->next)
471     srcpad = realize_component(backend, route, (OctopusComponent *)iter->data, srcpad);
472   if(srcpad)
473     gst_object_unref(srcpad);
474 }
475
476 static void
477 continue_route_from_pad(OctopusBackend *backend,
478                         OctopusRoute   *route,
479                         GstPad         *srcpad)
480 {
481   GstCaps *caps;
482   GSList  *iter;
483   gchar   **sinks;
484   GSList  *chain = NULL;
485
486   caps = gst_pad_get_caps(srcpad);
487   g_object_get(G_OBJECT(route), "sinks", &sinks, NULL);
488
489   for(iter = backend->components; (!chain && iter); iter = iter->next) {
490     OctopusComponent  *comp = (OctopusComponent *)iter->data;
491     OctopusElement    *elem = (OctopusElement *)comp->elements->data;
492     GstElementFactory *factory;
493
494     factory = gst_element_factory_find(elem->name);
495     if(can_maybe_sink_caps(factory, caps)) {
496       unsigned j;
497
498       g_debug("%s[%s] can sink this", comp->name, elem->name);
499
500       for(j = 0; (!chain && sinks[j]); ++j)
501         chain = octopus_backend_build_component_chain(backend, comp, sinks[j]);
502     }
503     gst_object_unref(factory);
504   }
505   gst_caps_unref(caps);
506
507   if(chain) {
508     realize_component_chain(backend, route, chain, srcpad);
509     g_slist_free(chain);
510   } else {
511     g_debug("Unable to continue route");
512     octopus_route_playback_error(route, "Unable to build route: unsupported media type");
513   }
514 }
515
516 static void
517 route_mapping_changed(OctopusRoute *route,
518                       gpointer user_data)
519 {
520   OctopusBackend   *backend;
521   OctopusComponent *component;
522   gchar            **sources;
523   gchar            **sinks;
524   unsigned         i;
525
526   g_debug("Route mapping changed");
527
528   g_object_get(G_OBJECT(route), "backend", &backend,
529                                 "sources", &sources,
530                                 "sinks", &sinks, NULL);
531   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
532
533   for(i = 0; sources[i]; ++i) {
534     GSList   *chain = NULL;
535     unsigned j;
536
537     component = octopus_backend_get_component_by_endpoint(backend, sources[i], OCTOPUS_ENDPOINT_SOURCE);
538     if(!component)
539       continue;
540
541     for(j = 0; sinks[j]; ++j) {
542       if((chain = octopus_backend_build_component_chain(backend, component, sinks[j]))) {
543         realize_component_chain(backend, route, chain, NULL);
544         g_slist_free(chain);
545         break;
546       }
547     }
548   }
549
550   /* TODO */
551 }
552
553 static void
554 route_endpoint_changed(OctopusRoute    *route,
555                        OctopusEndpoint *endpoint,
556                        gpointer        user_data)
557 {
558   RouteDataGst *data;
559   GstIterator  *iter;
560   gpointer     element;
561   gboolean     found = FALSE;
562
563   g_debug("Route endpoint '%s' changed", endpoint->name);
564
565   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
566
567   iter = gst_bin_iterate_recurse(GST_BIN(data->pipeline));
568   while(!found && gst_iterator_next(iter, &element) == GST_ITERATOR_OK) {
569     gchar *elem_name;
570
571     g_object_get(G_OBJECT(element), "name", &elem_name, NULL);
572     if(g_str_has_prefix(elem_name, endpoint->name)) {
573       configure_endpoint(element, endpoint);
574       found = TRUE;
575     }
576
577     g_free(elem_name);
578     gst_object_unref(element);
579   }
580   gst_iterator_free(iter);
581 }
582
583 static void
584 have_type(GstTypeFind *typefind,
585           guint       probability,
586           GstCaps     *caps,
587           gpointer    user_data)
588 {
589   OctopusRoute   *route;
590   OctopusBackend *backend;
591   GstPad         *pad;
592   gchar          *caps_str;
593
594   route = OCTOPUS_ROUTE(user_data);
595   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
596   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
597
598   caps_str = gst_caps_to_string(caps);
599   g_debug("Have type: %s", caps_str);
600   g_free(caps_str);
601
602   pad = gst_element_get_static_pad(GST_ELEMENT(typefind), "src");
603   continue_route_from_pad(backend, route, pad);
604   g_object_unref(pad);
605 }
606
607 static void
608 pad_added(GstElement *elem,
609           GstPad     *pad,
610           gpointer   *user_data)
611 {
612   OctopusRoute   *route;
613   OctopusBackend *backend;
614   RouteDataGst   *data;
615   gchar          *elem_name;
616   gchar          *pad_name;
617   GstCaps        *caps;
618   gchar          *caps_str;
619   GstElement     *queue;
620   GstPad         *qpad;
621
622   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
623   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
624   caps = gst_pad_get_caps(pad);
625   caps_str = gst_caps_to_string(caps);
626   g_debug("Pad '%s' added to '%s', caps: %s", pad_name, elem_name, caps_str);
627   gst_caps_unref(caps);
628   g_free(caps_str);
629   g_free(elem_name);
630   g_free(pad_name);
631
632   route = OCTOPUS_ROUTE(user_data);
633   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
634   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
635
636   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
637
638   queue = gst_element_factory_make("queue", NULL);
639   gst_bin_add(GST_BIN(data->pipeline), queue);
640   gst_element_set_state(queue, GST_STATE_PAUSED);
641   qpad = gst_element_get_static_pad(queue, "sink");
642   gst_pad_link(pad, qpad);
643   gst_object_unref(qpad);
644
645   qpad = gst_element_get_static_pad(queue, "src");
646   continue_route_from_pad(backend, route, qpad);
647   gst_object_unref(qpad);
648 }
649
650 static void
651 pad_removed(GstElement *elem,
652             GstPad     *pad,
653             gpointer   *user_data)
654 {
655   OctopusRoute *route;
656   RouteDataGst *data;
657   gchar        *elem_name;
658   gchar        *pad_name;
659
660   if(gst_pad_get_direction(pad) != GST_PAD_SRC)
661     return;
662
663   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
664   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
665   g_debug("Pad '%s' removed from '%s'", pad_name, elem_name);
666   g_free(elem_name);
667   g_free(pad_name);
668
669   route = OCTOPUS_ROUTE(user_data);
670   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
671   data->ready = FALSE;
672
673   while(1) {
674     GstIterator *iter;
675     gpointer    elem;
676     GSList      *stale = NULL;
677     GSList      *ptr;
678
679     /* Find any elements that have a sink pad with nothing connected */
680     iter = gst_bin_iterate_elements(GST_BIN(data->pipeline));
681     while(gst_iterator_next(iter, &elem) != GST_ITERATOR_DONE) {
682       GstPad *pad = gst_element_get_static_pad(GST_ELEMENT(elem), "sink");
683       if(pad) {
684         GstPad *peer = gst_pad_get_peer(pad);
685         if(peer)
686           gst_object_unref(peer);
687         else
688           stale = g_slist_prepend(stale, elem);
689         gst_object_unref(pad);
690       }
691       gst_object_unref(GST_OBJECT(elem));
692     }
693     gst_iterator_free(iter);
694
695     /* No stale elements found - we're done */
696     if(!stale)
697       break;
698
699     for(ptr = stale; ptr; ptr = ptr->next) {
700       GstElement *elem;
701       gchar      *name;
702
703       elem = GST_ELEMENT(ptr->data);
704       g_object_get(G_OBJECT(elem), "name", &name, NULL);
705       g_debug("Removing element '%s'", name);
706       gst_element_set_state(elem, GST_STATE_NULL);
707       gst_bin_remove(GST_BIN(data->pipeline), elem);
708     }
709   }
710 }
711
712 static void
713 no_more_pads(GstElement *elem,
714              gpointer   user_data)
715 {
716   OctopusRoute *route;
717   RouteDataGst *data;
718   gchar        *elem_name;
719
720   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
721   g_debug("No more pads from '%s'", elem_name);
722   g_free(elem_name);
723
724   route = OCTOPUS_ROUTE(user_data);
725   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
726   if(data->fakesink) {
727     gst_element_set_state(data->fakesink, GST_STATE_NULL);
728     gst_bin_remove(GST_BIN(data->pipeline), data->fakesink);
729     data->fakesink = NULL;
730     data->ready = TRUE;
731   }
732 }
733
734 static gboolean
735 bus_watch(GstBus     *bus,
736           GstMessage *msg,
737           gpointer   user_data)
738 {
739   OctopusRoute   *route;
740   RouteDataGst   *data;
741   OctopusBackend *backend;
742   
743   route = OCTOPUS_ROUTE(user_data);
744   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
745   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
746   
747   switch(GST_MESSAGE_TYPE(msg)) {
748   case GST_MESSAGE_ERROR: {
749     GError *error = NULL;
750
751     gst_message_parse_error(msg, &error, NULL);
752     g_debug("GST ERROR: %s", error->message);
753     octopus_route_playback_error(route, error->message);
754     g_error_free(error);
755     stop_route(backend, route);
756     break; }
757
758   case GST_MESSAGE_EOS:
759     g_debug("GST EOS");
760     stop_route(backend, route);
761     break;
762
763   case GST_MESSAGE_STATE_CHANGED: {
764     GstElement *elem;
765     gchar      *name;
766
767     GstState previous, state, pending;
768     gst_message_parse_state_changed (msg, &previous, &state, &pending);
769     elem = GST_ELEMENT(GST_MESSAGE_SRC(msg));
770     name = gst_element_get_name(elem);
771     g_debug("GST STATE CHANGED: %s %s->%s, pending %s", name, gst_element_state_get_name(previous), gst_element_state_get_name(state), gst_element_state_get_name(pending));
772     g_free(name);
773
774     if(elem == data->pipeline && pending == GST_STATE_VOID_PENDING) {
775       if(state == GST_STATE_NULL)
776         octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
777       else if(state == GST_STATE_PAUSED)
778         octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
779       else if(state == GST_STATE_PLAYING)
780         octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
781     }
782     break; }
783
784   default:;
785     g_debug("Unhandled GST message of type %d", GST_MESSAGE_TYPE(msg));
786   }
787
788   return TRUE;
789 }
790
791 static gboolean
792 position_timer(gpointer user_data)
793 {
794   OctopusRoute *route = (OctopusRoute *)user_data;
795   RouteDataGst *data;
796   GstFormat    fmt = GST_FORMAT_TIME;
797   gint64       pos = 0;
798   gint64       dur = 0;
799
800   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
801   gst_element_query_position(data->pipeline, &fmt, &pos);
802   gst_element_query_duration(data->pipeline, &fmt, &dur);
803
804   octopus_route_stream_position(route, (guint)(pos/1000000000), (guint)(dur/1000000000));
805
806   return TRUE;
807 }
808
809 // vim: filetype=c:expandtab:shiftwidth=2:tabstop=2:softtabstop=2