59a24091f4abb9443d7324c386802e3f1edc1b0e
[octopus.git] / src / octopus-backend-gst.c
1 /*   Octopus Media Engine
2  *   Copyright 2008 Movial Creative Technologies Inc
3  *
4  *   Authors: Mikko Rasa, <mikko.rasa@movial.fi>
5  *
6  *   This program is free software; you can redistribute it and/or
7  *   modify it under the terms of the GNU Lesser General Public
8  *   License as published by the Free Software Foundation; either
9  *   version 2.1 of the License, or (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *   Lesser General Public License for more details.
15  *
16  *   You should have received a copy of the GNU Lesser General Public
17  *   License along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
19  */
20 #include <string.h>
21 #include <gst/gst.h>
22 #include <gst/interfaces/xoverlay.h>
23 #include "octopus-backend-gst.h"
24 #include "octopus-module.h"
25 #include "octopus-module-manager.h"
26 #include "octopus-route.h"
27
28 typedef struct _RouteDataGst RouteDataGst;
29
30 struct _RouteDataGst
31 {
32   GstElement *pipeline;
33   GstElement *fakesink;
34   gboolean   ready;
35   guint      timeout_tag;
36 };
37
38 static gboolean
39 validate_component(OctopusBackend   *backend,
40                    OctopusComponent *comp);
41 static gboolean
42 can_link_components(OctopusBackend *backend,
43                     OctopusComponent *comp1,
44                     OctopusComponent *comp2);
45
46 gboolean
47 init_route (OctopusBackend *backend,
48             OctopusRoute   *route);
49 static void
50 destroy_route(OctopusBackend *backend,
51               OctopusRoute   *route);
52 static gboolean
53 play_route(OctopusBackend *backend,
54            OctopusRoute   *route);
55 static gboolean
56 pause_route(OctopusBackend *backend,
57             OctopusRoute   *route);
58 static gboolean
59 stop_route(OctopusBackend *backend,
60            OctopusRoute   *route);
61
62 static void
63 route_mapping_changed(OctopusRoute *route,
64                       gpointer user_data);
65 static void
66 route_endpoint_changed(OctopusRoute    *route,
67                        OctopusEndpoint *endpoint,
68                        gpointer        user_data);
69 static void
70 have_type(GstTypeFind *typefind,
71           guint       probability,
72           GstCaps     *caps,
73           gpointer    user_data);
74 static void
75 pad_added(GstElement *elem,
76           GstPad     *pad,
77           gpointer   *user_data);
78 static void
79 pad_removed(GstElement *elem,
80             GstPad     *pad,
81             gpointer   *user_data);
82 static void
83 no_more_pads(GstElement *elem,
84              gpointer   user_data);
85 static gboolean
86 bus_watch(GstBus     *bus,
87           GstMessage *msg,
88           gpointer   user_data);
89 static gboolean
90 position_timer(gpointer);
91
92 G_DEFINE_TYPE(OctopusBackendGst, octopus_backend_gst, OCTOPUS_TYPE_BACKEND)
93
94 static void
95 octopus_backend_gst_class_init(OctopusBackendGstClass *klass)
96 {
97   OctopusBackendClass *backend;
98
99   backend = OCTOPUS_BACKEND_CLASS(klass);
100
101   backend->validate_component = validate_component;
102   backend->can_link_components = can_link_components;
103   backend->init_route = init_route;
104   backend->destroy_route = destroy_route;
105   backend->play_route = play_route;
106   backend->pause_route = pause_route;
107   backend->stop_route = stop_route;
108 }
109
110 static void
111 octopus_backend_gst_init(OctopusBackendGst *backend)
112 {
113   OctopusBackend *base;
114
115   base = OCTOPUS_BACKEND(backend);
116
117   base->name = "gstreamer";
118 }
119
120 /*
121  * Helper functions
122  */
123
124 static gboolean
125 can_maybe_sink_caps(GstElementFactory *factory,
126                     GstCaps *caps)
127 {
128   const GList *pads;
129   gboolean    result = FALSE;
130
131   pads = gst_element_factory_get_static_pad_templates(factory);
132   for(; pads; pads = pads->next) {
133     GstStaticPadTemplate *tmpl = pads->data;
134     if(tmpl->direction == GST_PAD_SINK && tmpl->presence == GST_PAD_ALWAYS) {
135       caps = gst_caps_intersect(caps, gst_static_pad_template_get_caps(tmpl));
136       result = !gst_caps_is_empty(caps);
137       gst_caps_unref(caps);
138       break;
139     }
140   }
141
142   return result;
143 }
144
145 /*
146  * OctopusBackend implementation
147  */
148
149 static gboolean
150 validate_component(OctopusBackend   *backend,
151                    OctopusComponent *comp)
152 {
153   GSList            *iter;
154
155   for(iter = comp->elements; iter; iter = iter->next) {
156     GstElementFactory *factory;
157     
158     factory = gst_element_factory_find(((OctopusElement *)iter->data)->name);
159     if(!factory)
160       return FALSE;
161     gst_object_unref(factory);
162   }
163
164   return TRUE;
165 }
166
167 static gboolean
168 can_link_components(OctopusBackend   *backend,
169                     OctopusComponent *comp1,
170                     OctopusComponent *comp2)
171 {
172   OctopusElement    *elem;
173   GstElementFactory *factory;
174   const GList       *pads;
175   GstCaps           *caps = NULL;
176   gboolean          result = FALSE;
177
178   elem = (OctopusElement *)g_slist_last(comp1->elements)->data;
179   factory = gst_element_factory_find(elem->name);
180
181   pads = gst_element_factory_get_static_pad_templates(factory);
182   for(; pads; pads = pads->next) {
183     GstStaticPadTemplate *tmpl = pads->data;
184     if(tmpl->direction == GST_PAD_SRC && tmpl->presence == GST_PAD_ALWAYS) {
185       caps = gst_static_pad_template_get_caps(tmpl);
186       break;
187     }
188   }
189   gst_object_unref(factory);
190
191   if(caps) {
192     if(gst_caps_is_any(caps))
193       return FALSE;
194
195     elem = (OctopusElement *)comp2->elements->data;
196     factory = gst_element_factory_find(elem->name);
197     result = can_maybe_sink_caps(factory, caps);
198     gst_object_unref(factory);
199   }
200
201   return result;
202 }
203
204 gboolean
205 init_route(OctopusBackend *backend,
206            OctopusRoute   *route)
207 {
208   RouteDataGst *data;
209   GstBus       *bus;
210
211   data = g_new0(RouteDataGst, 1);
212   data->pipeline = gst_element_factory_make("pipeline", NULL);
213   g_object_set_data(G_OBJECT(route), "data", data);
214
215   g_signal_connect(route, "mapping-changed", G_CALLBACK(route_mapping_changed), 0);
216   g_signal_connect(route, "endpoint-changed", G_CALLBACK(route_endpoint_changed), 0);
217
218   bus = gst_pipeline_get_bus(GST_PIPELINE(data->pipeline));
219   gst_bus_add_watch(bus, bus_watch, route);
220
221   return TRUE;
222 }
223
224 static void
225 destroy_route(OctopusBackend *backend,
226               OctopusRoute   *route)
227 {
228   RouteDataGst *data;
229
230   stop_route(backend, route);
231   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
232   gst_object_unref(GST_OBJECT(data->pipeline));
233 }
234
235 static gboolean
236 play_route(OctopusBackend *backend,
237            OctopusRoute   *route)
238 {
239   RouteDataGst         *data;
240   GstStateChangeReturn ret;
241
242   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
243   if(!data->ready && !data->fakesink) {
244     /* Keep the pipeline in an async state change while building it */
245     data->fakesink = gst_element_factory_make("fakesink", NULL);
246     gst_bin_add(GST_BIN(data->pipeline), data->fakesink);
247   }
248
249   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PLAYING);
250   if(ret == GST_STATE_CHANGE_FAILURE) {
251     g_debug("Pipeline failed to start playback");
252     return FALSE;
253   } else if(ret != GST_STATE_CHANGE_ASYNC) {
254     octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
255   }
256
257   if(!data->timeout_tag)
258     data->timeout_tag = g_timeout_add(1000, position_timer, route);
259
260   return TRUE;
261 }
262
263 static gboolean
264 pause_route(OctopusBackend *backend,
265             OctopusRoute   *route)
266 {
267   RouteDataGst         *data;
268   GstStateChangeReturn ret;
269
270   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
271
272   if(data->timeout_tag) {
273     g_source_remove(data->timeout_tag);
274     data->timeout_tag = 0;
275   }
276
277   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PAUSED);
278   if(ret == GST_STATE_CHANGE_FAILURE) {
279     g_debug("Pipeline failed to pause playback");
280     return FALSE;
281   } else if(ret != GST_STATE_CHANGE_ASYNC) {
282     octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
283   }
284
285   return TRUE;
286 }
287
288 static gboolean
289 stop_route(OctopusBackend *backend,
290            OctopusRoute   *route)
291 {
292   RouteDataGst         *data;
293   GstStateChangeReturn ret;
294
295   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
296
297   if(data->timeout_tag) {
298     g_source_remove(data->timeout_tag);
299     data->timeout_tag = 0;
300   }
301
302   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_NULL);
303   if(ret == GST_STATE_CHANGE_FAILURE) {
304     g_debug("Pipeline failed to stop playback");
305     return FALSE;
306   } else if(ret != GST_STATE_CHANGE_ASYNC) {
307     octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
308   }
309
310   return TRUE;
311 }
312
313 /*
314  * Private functions
315  */
316
317 static void
318 configure_endpoint(GstElement            *elem,
319                    const OctopusEndpoint *endpoint)
320 {
321   if(endpoint->uri) {
322     g_debug("Setting URI '%s'", endpoint->uri);
323     if(GST_IS_URI_HANDLER(elem)) {
324       GstURIHandler *uri_handler;
325
326       uri_handler = GST_URI_HANDLER(elem);
327       gst_uri_handler_set_uri(uri_handler, endpoint->uri);
328     } else {
329       gchar *loc;
330       
331       loc=gst_uri_get_location(endpoint->uri);
332       
333       if(g_object_class_find_property(G_OBJECT_GET_CLASS(elem), "location"))
334         g_object_set(G_OBJECT(elem), "location", loc, NULL);
335       else if(g_object_class_find_property(G_OBJECT_GET_CLASS(elem), "file-name"))
336         g_object_set(G_OBJECT(elem), "file-name", loc, NULL);
337       else
338         g_warning("Don't know how to set URI");
339     }
340   }
341
342   if(endpoint->window) {
343     g_debug("Setting X11 display ('%s', %lx)", endpoint->display, endpoint->window);
344     g_object_set(G_OBJECT(elem), "display", endpoint->display, NULL);
345     gst_x_overlay_set_xwindow_id(GST_X_OVERLAY(elem), endpoint->window);
346   }
347 }
348
349 static GstElement *
350 realize_element(OctopusRoute   *route,
351                 OctopusElement *oct_elem)
352 {
353   GstElement *gst_elem;
354   GList      *iter;
355   GSList     *iter2;
356
357   g_debug("Realizing element '%s'", oct_elem->name);
358   gst_elem = gst_element_factory_make(oct_elem->name, oct_elem->endpoint);
359
360   if(oct_elem->endpoint) {
361     const OctopusEndpoint *endpoint;
362     if((endpoint = octopus_route_get_endpoint(route, oct_elem->endpoint)))
363       configure_endpoint(gst_elem, endpoint);
364   }
365
366   for(iter2 = oct_elem->parameters; iter2; iter2 = iter2->next) {
367     OctopusParameter *param = (OctopusParameter *)iter2->data;
368     GParamSpec       *pspec;
369
370     pspec = g_object_class_find_property(G_OBJECT_GET_CLASS(gst_elem), param->name);
371     if(pspec) {
372       if(pspec->value_type == G_TYPE_INT) {
373         g_object_set(G_OBJECT(gst_elem), param->name, strtol(param->value, NULL, 0), NULL);
374       } else if(pspec->value_type == G_TYPE_UINT) {
375         g_object_set(G_OBJECT(gst_elem), param->name, strtoul(param->value, NULL, 0), NULL);
376       } else if(pspec->value_type == G_TYPE_INT64) {
377         g_object_set(G_OBJECT(gst_elem), param->name, strtoll(param->value, NULL, 0), NULL);
378       } else if(pspec->value_type == G_TYPE_UINT64) {
379         g_object_set(G_OBJECT(gst_elem), param->name, strtoull(param->value, NULL, 0), NULL);
380       } else if(pspec->value_type == G_TYPE_BOOLEAN) {
381         if(!strcmp(param->value, "true"))
382           g_object_set(G_OBJECT(gst_elem), param->name, TRUE, NULL);
383         else if(!strcmp(param->value, "false"))
384           g_object_set(G_OBJECT(gst_elem), param->name, FALSE, NULL);
385       } else if(pspec->value_type == G_TYPE_STRING) {
386         g_object_set(G_OBJECT(gst_elem), param->name, param->value, NULL);
387       }
388     } else {
389       g_debug("Unknown parameter '%s' specified for element '%s'", param->name, oct_elem->name);
390     }
391   }
392
393   /* XXX better way to figure out if it's a typefind? */
394   if(g_str_has_prefix(oct_elem->name, "typefind")) {
395     g_debug("Connecting to 'have-type' signal");
396     g_signal_connect(G_OBJECT(gst_elem), "have-type", G_CALLBACK(have_type), route);
397   }
398
399   iter = gst_element_class_get_pad_template_list(GST_ELEMENT_GET_CLASS(gst_elem));
400   for(; iter; iter = iter->next) {
401     GstPadTemplate *tmpl = (GstPadTemplate *)iter->data;
402     if(GST_PAD_TEMPLATE_DIRECTION(tmpl) == GST_PAD_SRC
403        && GST_PAD_TEMPLATE_PRESENCE(tmpl) == GST_PAD_SOMETIMES)
404     {
405       g_debug("Connecting to dynamic pad signals");
406       g_signal_connect(G_OBJECT(gst_elem), "pad-added", G_CALLBACK(pad_added), route);
407       g_signal_connect(G_OBJECT(gst_elem), "pad-removed", G_CALLBACK(pad_removed), route);
408       g_signal_connect(G_OBJECT(gst_elem), "no-more-pads", G_CALLBACK(no_more_pads), route);
409       break;
410     }
411   }
412
413   return gst_elem;
414 }
415
416 static GstPad *
417 realize_component(OctopusBackend   *backend,
418                   OctopusRoute     *route,
419                   OctopusComponent *component,
420                   GstPad           *srcpad)
421 {
422   RouteDataGst *data;
423   GSList       *iter;
424   GstState     state;
425
426   g_debug("Realizing component '%s'", component->name);
427
428   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
429
430   state = GST_STATE(data->pipeline);
431   if(state != GST_STATE_NULL)
432     state = GST_STATE_PAUSED;
433
434   for(iter = component->elements; iter; iter = iter->next) {
435     OctopusElement *oct_elem = (OctopusElement *)iter->data;
436     GstElement     *gst_elem;
437
438     gst_elem = realize_element(route, oct_elem);
439     gst_bin_add(GST_BIN(data->pipeline), gst_elem);
440
441     if(srcpad) {
442       GstPad *sinkpad;
443
444       if((sinkpad = gst_element_get_static_pad(gst_elem, "sink"))) {
445         if(GST_PAD_LINK_FAILED(gst_pad_link(srcpad, sinkpad)))
446           g_debug("Linking pads failed - component '%s' may be broken", component->name);
447         gst_object_unref(sinkpad);
448       }
449
450       gst_object_unref(srcpad);
451     }
452     gst_element_set_state(gst_elem, state);
453
454     srcpad = gst_element_get_static_pad(gst_elem, "src");
455   }
456
457   return srcpad;
458 }
459
460 static void
461 realize_component_chain(OctopusBackend *backend,
462                         OctopusRoute   *route,
463                         GSList         *chain,
464                         GstPad         *srcpad)
465 {
466   GSList *iter;
467
468   if(srcpad)
469     gst_object_ref(srcpad);
470   for(iter = chain; iter; iter = iter->next)
471     srcpad = realize_component(backend, route, (OctopusComponent *)iter->data, srcpad);
472   if(srcpad)
473     gst_object_unref(srcpad);
474 }
475
476 static void
477 continue_route_from_pad(OctopusBackend *backend,
478                         OctopusRoute   *route,
479                         GstPad         *srcpad)
480 {
481   GstCaps *caps;
482   GSList  *iter;
483   gchar   **sinks;
484   GSList  *chain = NULL;
485
486   caps = gst_pad_get_caps(srcpad);
487   g_object_get(G_OBJECT(route), "sinks", &sinks, NULL);
488
489   for(iter = backend->components; (!chain && iter); iter = iter->next) {
490     OctopusComponent  *comp = (OctopusComponent *)iter->data;
491     OctopusElement    *elem = (OctopusElement *)comp->elements->data;
492     GstElementFactory *factory;
493
494     factory = gst_element_factory_find(elem->name);
495     if(can_maybe_sink_caps(factory, caps)) {
496       unsigned j;
497
498       g_debug("%s[%s] can sink this", comp->name, elem->name);
499
500       for(j = 0; (!chain && sinks[j]); ++j)
501         chain = octopus_backend_build_component_chain(backend, comp, sinks[j]);
502     }
503     gst_object_unref(factory);
504   }
505   gst_caps_unref(caps);
506
507   if(chain) {
508     realize_component_chain(backend, route, chain, srcpad);
509     g_slist_free(chain);
510   } else {
511     g_debug("Unable to continue route");
512     octopus_route_playback_error(route, "Unable to build route: unsupported media type");
513   }
514 }
515
516 static void
517 route_mapping_changed(OctopusRoute *route,
518                       gpointer user_data)
519 {
520   OctopusBackend   *backend;
521   OctopusComponent *component;
522   gchar            **sources;
523   gchar            **sinks;
524   gchar            *pipeline_desc;
525   unsigned         i;
526
527   g_debug("Route mapping changed");
528
529   g_object_get(G_OBJECT(route), "backend", &backend,
530                                 "sources", &sources,
531                                 "sinks", &sinks,
532                                 "pipeline-description", &pipeline_desc, NULL);
533   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
534
535   if(pipeline_desc) {
536     RouteDataGst *data;
537     GError       *err;
538
539     g_debug("Using forced pipeline description: %s", pipeline_desc);
540
541     data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
542     if(data->pipeline) {
543       gst_element_set_state(data->pipeline, GST_STATE_NULL);
544       gst_object_unref(data->pipeline);
545     }
546
547     data->pipeline = gst_parse_launch(pipeline_desc, &err);
548     if(data->pipeline)
549       data->ready = TRUE;
550     else
551       g_warning("Pipeline is invalid: %s", err->message);
552
553   } else {
554     for(i = 0; sources[i]; ++i) {
555       GSList   *chain = NULL;
556       unsigned j;
557
558       component = octopus_backend_get_component_by_endpoint(backend, sources[i], OCTOPUS_ENDPOINT_SOURCE);
559       if(!component)
560         continue;
561
562       for(j = 0; sinks[j]; ++j) {
563         if((chain = octopus_backend_build_component_chain(backend, component, sinks[j]))) {
564           realize_component_chain(backend, route, chain, NULL);
565           g_slist_free(chain);
566           break;
567         }
568       }
569     }
570   }
571 }
572
573 static void
574 route_endpoint_changed(OctopusRoute    *route,
575                        OctopusEndpoint *endpoint,
576                        gpointer        user_data)
577 {
578   RouteDataGst *data;
579   GstIterator  *iter;
580   gpointer     element;
581   gboolean     found = FALSE;
582
583   g_debug("Route endpoint '%s' changed", endpoint->name);
584
585   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
586
587   iter = gst_bin_iterate_recurse(GST_BIN(data->pipeline));
588   while(!found && gst_iterator_next(iter, &element) == GST_ITERATOR_OK) {
589     gchar *elem_name;
590
591     g_object_get(G_OBJECT(element), "name", &elem_name, NULL);
592     if(g_str_has_prefix(elem_name, endpoint->name)) {
593       configure_endpoint(element, endpoint);
594       found = TRUE;
595     }
596
597     g_free(elem_name);
598     gst_object_unref(element);
599   }
600   gst_iterator_free(iter);
601 }
602
603 static void
604 have_type(GstTypeFind *typefind,
605           guint       probability,
606           GstCaps     *caps,
607           gpointer    user_data)
608 {
609   OctopusRoute   *route;
610   OctopusBackend *backend;
611   GstPad         *pad;
612   gchar          *caps_str;
613
614   route = OCTOPUS_ROUTE(user_data);
615   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
616   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
617
618   caps_str = gst_caps_to_string(caps);
619   g_debug("Have type: %s", caps_str);
620   g_free(caps_str);
621
622   pad = gst_element_get_static_pad(GST_ELEMENT(typefind), "src");
623   continue_route_from_pad(backend, route, pad);
624   g_object_unref(pad);
625 }
626
627 static void
628 pad_added(GstElement *elem,
629           GstPad     *pad,
630           gpointer   *user_data)
631 {
632   OctopusRoute   *route;
633   OctopusBackend *backend;
634   RouteDataGst   *data;
635   gchar          *elem_name;
636   gchar          *pad_name;
637   GstCaps        *caps;
638   gchar          *caps_str;
639   GstElement     *queue;
640   GstPad         *qpad;
641
642   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
643   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
644   caps = gst_pad_get_caps(pad);
645   caps_str = gst_caps_to_string(caps);
646   g_debug("Pad '%s' added to '%s', caps: %s", pad_name, elem_name, caps_str);
647   gst_caps_unref(caps);
648   g_free(caps_str);
649   g_free(elem_name);
650   g_free(pad_name);
651
652   route = OCTOPUS_ROUTE(user_data);
653   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
654   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
655
656   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
657
658   queue = gst_element_factory_make("queue", NULL);
659   gst_bin_add(GST_BIN(data->pipeline), queue);
660   gst_element_set_state(queue, GST_STATE_PAUSED);
661   qpad = gst_element_get_static_pad(queue, "sink");
662   gst_pad_link(pad, qpad);
663   gst_object_unref(qpad);
664
665   qpad = gst_element_get_static_pad(queue, "src");
666   continue_route_from_pad(backend, route, qpad);
667   gst_object_unref(qpad);
668 }
669
670 static void
671 pad_removed(GstElement *elem,
672             GstPad     *pad,
673             gpointer   *user_data)
674 {
675   OctopusRoute *route;
676   RouteDataGst *data;
677   gchar        *elem_name;
678   gchar        *pad_name;
679
680   if(gst_pad_get_direction(pad) != GST_PAD_SRC)
681     return;
682
683   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
684   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
685   g_debug("Pad '%s' removed from '%s'", pad_name, elem_name);
686   g_free(elem_name);
687   g_free(pad_name);
688
689   route = OCTOPUS_ROUTE(user_data);
690   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
691   data->ready = FALSE;
692
693   while(1) {
694     GstIterator *iter;
695     gpointer    elem;
696     GSList      *stale = NULL;
697     GSList      *ptr;
698
699     /* Find any elements that have a sink pad with nothing connected */
700     iter = gst_bin_iterate_elements(GST_BIN(data->pipeline));
701     while(gst_iterator_next(iter, &elem) != GST_ITERATOR_DONE) {
702       GstPad *pad = gst_element_get_static_pad(GST_ELEMENT(elem), "sink");
703       if(pad) {
704         GstPad *peer = gst_pad_get_peer(pad);
705         if(peer)
706           gst_object_unref(peer);
707         else
708           stale = g_slist_prepend(stale, elem);
709         gst_object_unref(pad);
710       }
711       gst_object_unref(GST_OBJECT(elem));
712     }
713     gst_iterator_free(iter);
714
715     /* No stale elements found - we're done */
716     if(!stale)
717       break;
718
719     for(ptr = stale; ptr; ptr = ptr->next) {
720       GstElement *elem;
721       gchar      *name;
722
723       elem = GST_ELEMENT(ptr->data);
724       g_object_get(G_OBJECT(elem), "name", &name, NULL);
725       g_debug("Removing element '%s'", name);
726       gst_element_set_state(elem, GST_STATE_NULL);
727       gst_bin_remove(GST_BIN(data->pipeline), elem);
728     }
729   }
730 }
731
732 static void
733 no_more_pads(GstElement *elem,
734              gpointer   user_data)
735 {
736   OctopusRoute *route;
737   RouteDataGst *data;
738   gchar        *elem_name;
739
740   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
741   g_debug("No more pads from '%s'", elem_name);
742   g_free(elem_name);
743
744   route = OCTOPUS_ROUTE(user_data);
745   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
746   if(data->fakesink) {
747     gst_element_set_state(data->fakesink, GST_STATE_NULL);
748     gst_bin_remove(GST_BIN(data->pipeline), data->fakesink);
749     data->fakesink = NULL;
750     data->ready = TRUE;
751   }
752 }
753
754 static gboolean
755 bus_watch(GstBus     *bus,
756           GstMessage *msg,
757           gpointer   user_data)
758 {
759   OctopusRoute   *route;
760   RouteDataGst   *data;
761   OctopusBackend *backend;
762   
763   route = OCTOPUS_ROUTE(user_data);
764   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
765   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
766   
767   switch(GST_MESSAGE_TYPE(msg)) {
768   case GST_MESSAGE_ERROR: {
769     GError *error = NULL;
770
771     gst_message_parse_error(msg, &error, NULL);
772     g_debug("GST ERROR: %s", error->message);
773     octopus_route_playback_error(route, error->message);
774     g_error_free(error);
775     stop_route(backend, route);
776     break; }
777
778   case GST_MESSAGE_EOS:
779     g_debug("GST EOS");
780     stop_route(backend, route);
781     break;
782
783   case GST_MESSAGE_STATE_CHANGED: {
784     GstElement *elem;
785     gchar      *name;
786
787     GstState previous, state, pending;
788     gst_message_parse_state_changed (msg, &previous, &state, &pending);
789     elem = GST_ELEMENT(GST_MESSAGE_SRC(msg));
790     name = gst_element_get_name(elem);
791     g_debug("GST STATE CHANGED: %s %s->%s, pending %s", name, gst_element_state_get_name(previous), gst_element_state_get_name(state), gst_element_state_get_name(pending));
792     g_free(name);
793
794     if(elem == data->pipeline && pending == GST_STATE_VOID_PENDING) {
795       if(state == GST_STATE_NULL)
796         octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
797       else if(state == GST_STATE_PAUSED)
798         octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
799       else if(state == GST_STATE_PLAYING)
800         octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
801     }
802     break; }
803
804   default:;
805     g_debug("Unhandled GST message of type %d", GST_MESSAGE_TYPE(msg));
806   }
807
808   return TRUE;
809 }
810
811 static gboolean
812 position_timer(gpointer user_data)
813 {
814   OctopusRoute *route = (OctopusRoute *)user_data;
815   RouteDataGst *data;
816   GstFormat    fmt = GST_FORMAT_TIME;
817   gint64       pos = 0;
818   gint64       dur = 0;
819
820   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
821   gst_element_query_position(data->pipeline, &fmt, &pos);
822   gst_element_query_duration(data->pipeline, &fmt, &dur);
823
824   octopus_route_stream_position(route, (guint)(pos/1000000000), (guint)(dur/1000000000));
825
826   return TRUE;
827 }
828
829 // vim: filetype=c:expandtab:shiftwidth=2:tabstop=2:softtabstop=2