Minor fixes to GStreamer backend
[octopus.git] / src / octopus-backend-gst.c
1 /*   Octopus Media Engine
2  *   Copyright 2008 Movial Creative Technologies Inc
3  *
4  *   Authors: Mikko Rasa, <mikko.rasa@movial.fi>
5  *
6  *   This program is free software; you can redistribute it and/or
7  *   modify it under the terms of the GNU Lesser General Public
8  *   License as published by the Free Software Foundation; either
9  *   version 2.1 of the License, or (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *   Lesser General Public License for more details.
15  *
16  *   You should have received a copy of the GNU Lesser General Public
17  *   License along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
19  */
20 #include <string.h>
21 #include <gst/gst.h>
22 #include <gst/interfaces/xoverlay.h>
23 #include "octopus-backend-gst.h"
24 #include "octopus-module.h"
25 #include "octopus-module-manager.h"
26 #include "octopus-route.h"
27
28 typedef struct _RouteDataGst RouteDataGst;
29
30 struct _RouteDataGst
31 {
32   GstElement *pipeline;
33   GstElement *fakesink;
34   gboolean   ready;
35   guint      timeout_tag;
36 };
37
38 static gboolean
39 validate_component(OctopusBackend   *backend,
40                    OctopusComponent *comp);
41 static gboolean
42 can_link_components(OctopusBackend *backend,
43                     OctopusComponent *comp1,
44                     OctopusComponent *comp2);
45
46 static gboolean
47 init_route (OctopusBackend *backend,
48             OctopusRoute   *route);
49 static void
50 destroy_route(OctopusBackend *backend,
51               OctopusRoute   *route);
52 static gboolean
53 play_route(OctopusBackend *backend,
54            OctopusRoute   *route);
55 static gboolean
56 pause_route(OctopusBackend *backend,
57             OctopusRoute   *route);
58 static gboolean
59 stop_route(OctopusBackend *backend,
60            OctopusRoute   *route);
61
62 static void
63 route_mapping_changed(OctopusRoute *route,
64                       gpointer user_data);
65 static void
66 route_endpoint_changed(OctopusRoute    *route,
67                        OctopusEndpoint *endpoint,
68                        gpointer        user_data);
69 static void
70 have_type(GstTypeFind *typefind,
71           guint       probability,
72           GstCaps     *caps,
73           gpointer    user_data);
74 static void
75 pad_added(GstElement *elem,
76           GstPad     *pad,
77           gpointer   *user_data);
78 static void
79 pad_removed(GstElement *elem,
80             GstPad     *pad,
81             gpointer   *user_data);
82 static void
83 no_more_pads(GstElement *elem,
84              gpointer   user_data);
85 static gboolean
86 bus_watch(GstBus     *bus,
87           GstMessage *msg,
88           gpointer   user_data);
89 static gboolean
90 position_timer(gpointer);
91
92 G_DEFINE_TYPE(OctopusBackendGst, octopus_backend_gst, OCTOPUS_TYPE_BACKEND)
93
94 static void
95 octopus_backend_gst_class_init(OctopusBackendGstClass *klass)
96 {
97   OctopusBackendClass *backend;
98
99   backend = OCTOPUS_BACKEND_CLASS(klass);
100
101   backend->validate_component = validate_component;
102   backend->can_link_components = can_link_components;
103   backend->init_route = init_route;
104   backend->destroy_route = destroy_route;
105   backend->play_route = play_route;
106   backend->pause_route = pause_route;
107   backend->stop_route = stop_route;
108 }
109
110 static void
111 octopus_backend_gst_init(OctopusBackendGst *backend)
112 {
113   OctopusBackend *base;
114
115   base = OCTOPUS_BACKEND(backend);
116
117   base->name = "gstreamer";
118
119   g_debug("GStreamer backend initialized");
120 }
121
122 /*
123  * Helper functions
124  */
125
126 static gboolean
127 can_maybe_sink_caps(GstElementFactory *factory,
128                     GstCaps *caps)
129 {
130   const GList *pads;
131   gboolean    result = FALSE;
132
133   pads = gst_element_factory_get_static_pad_templates(factory);
134   for(; pads; pads = pads->next) {
135     GstStaticPadTemplate *tmpl = pads->data;
136     if(tmpl->direction == GST_PAD_SINK && tmpl->presence == GST_PAD_ALWAYS) {
137       caps = gst_caps_intersect(caps, gst_static_pad_template_get_caps(tmpl));
138       result = !gst_caps_is_empty(caps);
139       gst_caps_unref(caps);
140       break;
141     }
142   }
143
144   return result;
145 }
146
147 /*
148  * OctopusBackend implementation
149  */
150
151 static gboolean
152 validate_component(OctopusBackend   *backend,
153                    OctopusComponent *comp)
154 {
155   GSList            *iter;
156
157   for(iter = comp->elements; iter; iter = iter->next) {
158     GstElementFactory *factory;
159     
160     factory = gst_element_factory_find(((OctopusElement *)iter->data)->name);
161     if(!factory)
162       return FALSE;
163     gst_object_unref(factory);
164   }
165
166   return TRUE;
167 }
168
169 static gboolean
170 can_link_components(OctopusBackend   *backend,
171                     OctopusComponent *comp1,
172                     OctopusComponent *comp2)
173 {
174   OctopusElement    *elem;
175   GstElementFactory *factory;
176   const GList       *pads;
177   GstCaps           *caps = NULL;
178   gboolean          result = FALSE;
179
180   elem = (OctopusElement *)g_slist_last(comp1->elements)->data;
181   factory = gst_element_factory_find(elem->name);
182
183   pads = gst_element_factory_get_static_pad_templates(factory);
184   for(; pads; pads = pads->next) {
185     GstStaticPadTemplate *tmpl = pads->data;
186     if(tmpl->direction == GST_PAD_SRC && tmpl->presence == GST_PAD_ALWAYS) {
187       caps = gst_static_pad_template_get_caps(tmpl);
188       break;
189     }
190   }
191   gst_object_unref(factory);
192
193   if(caps) {
194     if(gst_caps_is_any(caps))
195       return FALSE;
196
197     elem = (OctopusElement *)comp2->elements->data;
198     factory = gst_element_factory_find(elem->name);
199     result = can_maybe_sink_caps(factory, caps);
200     gst_object_unref(factory);
201   }
202
203   return result;
204 }
205
206 static gboolean
207 init_route(OctopusBackend *backend,
208            OctopusRoute   *route)
209 {
210   RouteDataGst *data;
211   GstBus       *bus;
212
213   data = g_new0(RouteDataGst, 1);
214   data->pipeline = gst_element_factory_make("pipeline", NULL);
215   g_object_set_data(G_OBJECT(route), "data", data);
216
217   g_signal_connect(route, "mapping-changed", G_CALLBACK(route_mapping_changed), 0);
218   g_signal_connect(route, "endpoint-changed", G_CALLBACK(route_endpoint_changed), 0);
219
220   bus = gst_pipeline_get_bus(GST_PIPELINE(data->pipeline));
221   gst_bus_add_watch(bus, bus_watch, route);
222
223   return TRUE;
224 }
225
226 static void
227 destroy_route(OctopusBackend *backend,
228               OctopusRoute   *route)
229 {
230   RouteDataGst *data;
231
232   stop_route(backend, route);
233   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
234   gst_object_unref(GST_OBJECT(data->pipeline));
235   g_free(data);
236 }
237
238 static gboolean
239 play_route(OctopusBackend *backend,
240            OctopusRoute   *route)
241 {
242   RouteDataGst         *data;
243   GstStateChangeReturn ret;
244
245   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
246   if(!data->ready && !data->fakesink) {
247     /* Keep the pipeline in an async state change while building it */
248     data->fakesink = gst_element_factory_make("fakesink", NULL);
249     gst_bin_add(GST_BIN(data->pipeline), data->fakesink);
250   }
251
252   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PLAYING);
253   if(ret == GST_STATE_CHANGE_FAILURE) {
254     g_debug("Pipeline failed to start playback");
255     return FALSE;
256   } else if(ret != GST_STATE_CHANGE_ASYNC) {
257     octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
258   }
259
260   if(!data->timeout_tag)
261     data->timeout_tag = g_timeout_add(1000, position_timer, route);
262
263   return TRUE;
264 }
265
266 static gboolean
267 pause_route(OctopusBackend *backend,
268             OctopusRoute   *route)
269 {
270   RouteDataGst         *data;
271   GstStateChangeReturn ret;
272
273   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
274
275   if(data->timeout_tag) {
276     g_source_remove(data->timeout_tag);
277     data->timeout_tag = 0;
278   }
279
280   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PAUSED);
281   if(ret == GST_STATE_CHANGE_FAILURE) {
282     g_debug("Pipeline failed to pause playback");
283     return FALSE;
284   } else if(ret != GST_STATE_CHANGE_ASYNC) {
285     octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
286   }
287
288   return TRUE;
289 }
290
291 static gboolean
292 stop_route(OctopusBackend *backend,
293            OctopusRoute   *route)
294 {
295   RouteDataGst         *data;
296   GstStateChangeReturn ret;
297
298   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
299
300   if(data->timeout_tag) {
301     g_source_remove(data->timeout_tag);
302     data->timeout_tag = 0;
303   }
304
305   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_NULL);
306   if(ret == GST_STATE_CHANGE_FAILURE) {
307     g_debug("Pipeline failed to stop playback");
308     return FALSE;
309   } else if(ret != GST_STATE_CHANGE_ASYNC) {
310     octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
311   }
312
313   return TRUE;
314 }
315
316 /*
317  * Private functions
318  */
319
320 static void
321 configure_endpoint(GstElement            *elem,
322                    const OctopusEndpoint *endpoint)
323 {
324   if(endpoint->uri) {
325     g_debug("Setting URI '%s'", endpoint->uri);
326     if(GST_IS_URI_HANDLER(elem)) {
327       GstURIHandler *uri_handler;
328
329       uri_handler = GST_URI_HANDLER(elem);
330       gst_uri_handler_set_uri(uri_handler, endpoint->uri);
331     } else {
332       gchar *loc;
333       
334       loc=gst_uri_get_location(endpoint->uri);
335       
336       if(g_object_class_find_property(G_OBJECT_GET_CLASS(elem), "location"))
337         g_object_set(G_OBJECT(elem), "location", loc, NULL);
338       else if(g_object_class_find_property(G_OBJECT_GET_CLASS(elem), "file-name"))
339         g_object_set(G_OBJECT(elem), "file-name", loc, NULL);
340       else
341         g_warning("Don't know how to set URI");
342     }
343   }
344
345   if(endpoint->window) {
346     g_debug("Setting X11 display ('%s', %lx)", endpoint->display, endpoint->window);
347     g_object_set(G_OBJECT(elem), "display", endpoint->display, NULL);
348     gst_x_overlay_set_xwindow_id(GST_X_OVERLAY(elem), endpoint->window);
349   }
350 }
351
352 static GstElement *
353 realize_element(OctopusRoute   *route,
354                 OctopusElement *oct_elem)
355 {
356   GstElement *gst_elem;
357   GList      *iter;
358   GSList     *iter2;
359
360   g_debug("Realizing element '%s'", oct_elem->name);
361   gst_elem = gst_element_factory_make(oct_elem->name, oct_elem->endpoint);
362
363   if(oct_elem->endpoint) {
364     const OctopusEndpoint *endpoint;
365     if((endpoint = octopus_route_get_endpoint(route, oct_elem->endpoint)))
366       configure_endpoint(gst_elem, endpoint);
367   }
368
369   for(iter2 = oct_elem->parameters; iter2; iter2 = iter2->next) {
370     OctopusParameter *param = (OctopusParameter *)iter2->data;
371     GParamSpec       *pspec;
372
373     pspec = g_object_class_find_property(G_OBJECT_GET_CLASS(gst_elem), param->name);
374     if(pspec) {
375       if(pspec->value_type == G_TYPE_INT) {
376         g_object_set(G_OBJECT(gst_elem), param->name, strtol(param->value, NULL, 0), NULL);
377       } else if(pspec->value_type == G_TYPE_UINT) {
378         g_object_set(G_OBJECT(gst_elem), param->name, strtoul(param->value, NULL, 0), NULL);
379       } else if(pspec->value_type == G_TYPE_INT64) {
380         g_object_set(G_OBJECT(gst_elem), param->name, strtoll(param->value, NULL, 0), NULL);
381       } else if(pspec->value_type == G_TYPE_UINT64) {
382         g_object_set(G_OBJECT(gst_elem), param->name, strtoull(param->value, NULL, 0), NULL);
383       } else if(pspec->value_type == G_TYPE_BOOLEAN) {
384         if(!strcmp(param->value, "true"))
385           g_object_set(G_OBJECT(gst_elem), param->name, TRUE, NULL);
386         else if(!strcmp(param->value, "false"))
387           g_object_set(G_OBJECT(gst_elem), param->name, FALSE, NULL);
388       } else if(pspec->value_type == G_TYPE_STRING) {
389         g_object_set(G_OBJECT(gst_elem), param->name, param->value, NULL);
390       }
391     } else {
392       g_debug("Unknown parameter '%s' specified for element '%s'", param->name, oct_elem->name);
393     }
394   }
395
396   /* XXX better way to figure out if it's a typefind? */
397   if(g_str_has_prefix(oct_elem->name, "typefind")) {
398     g_debug("Connecting to 'have-type' signal");
399     g_signal_connect(G_OBJECT(gst_elem), "have-type", G_CALLBACK(have_type), route);
400   }
401
402   iter = gst_element_class_get_pad_template_list(GST_ELEMENT_GET_CLASS(gst_elem));
403   for(; iter; iter = iter->next) {
404     GstPadTemplate *tmpl = (GstPadTemplate *)iter->data;
405     if(GST_PAD_TEMPLATE_DIRECTION(tmpl) == GST_PAD_SRC
406        && GST_PAD_TEMPLATE_PRESENCE(tmpl) == GST_PAD_SOMETIMES)
407     {
408       g_debug("Connecting to dynamic pad signals");
409       g_signal_connect(G_OBJECT(gst_elem), "pad-added", G_CALLBACK(pad_added), route);
410       g_signal_connect(G_OBJECT(gst_elem), "pad-removed", G_CALLBACK(pad_removed), route);
411       g_signal_connect(G_OBJECT(gst_elem), "no-more-pads", G_CALLBACK(no_more_pads), route);
412       break;
413     }
414   }
415
416   return gst_elem;
417 }
418
419 static GstPad *
420 realize_component(OctopusBackend   *backend,
421                   OctopusRoute     *route,
422                   OctopusComponent *component,
423                   GstPad           *srcpad)
424 {
425   RouteDataGst *data;
426   GSList       *iter;
427   GstState     state;
428
429   g_debug("Realizing component '%s'", component->name);
430
431   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
432
433   state = GST_STATE(data->pipeline);
434   if(state != GST_STATE_NULL)
435     state = GST_STATE_PAUSED;
436
437   for(iter = component->elements; iter; iter = iter->next) {
438     OctopusElement *oct_elem = (OctopusElement *)iter->data;
439     GstElement     *gst_elem;
440
441     gst_elem = realize_element(route, oct_elem);
442     gst_bin_add(GST_BIN(data->pipeline), gst_elem);
443
444     if(srcpad) {
445       GstPad *sinkpad;
446
447       if((sinkpad = gst_element_get_static_pad(gst_elem, "sink"))) {
448         if(GST_PAD_LINK_FAILED(gst_pad_link(srcpad, sinkpad)))
449           g_debug("Linking pads failed - component '%s' may be broken", component->name);
450         gst_object_unref(sinkpad);
451       }
452
453       gst_object_unref(srcpad);
454     }
455     gst_element_set_state(gst_elem, state);
456
457     srcpad = gst_element_get_static_pad(gst_elem, "src");
458   }
459
460   return srcpad;
461 }
462
463 static void
464 realize_component_chain(OctopusBackend *backend,
465                         OctopusRoute   *route,
466                         GSList         *chain,
467                         GstPad         *srcpad)
468 {
469   GSList *iter;
470
471   if(srcpad)
472     gst_object_ref(srcpad);
473   for(iter = chain; iter; iter = iter->next)
474     srcpad = realize_component(backend, route, (OctopusComponent *)iter->data, srcpad);
475   if(srcpad)
476     gst_object_unref(srcpad);
477 }
478
479 static void
480 continue_route_from_pad(OctopusBackend *backend,
481                         OctopusRoute   *route,
482                         GstPad         *srcpad)
483 {
484   GstCaps *caps;
485   GSList  *iter;
486   gchar   **sinks;
487   GSList  *chain = NULL;
488
489   caps = gst_pad_get_caps(srcpad);
490   g_object_get(G_OBJECT(route), "sinks", &sinks, NULL);
491
492   for(iter = backend->components; (!chain && iter); iter = iter->next) {
493     OctopusComponent  *comp = (OctopusComponent *)iter->data;
494     OctopusElement    *elem = (OctopusElement *)comp->elements->data;
495     GstElementFactory *factory;
496
497     factory = gst_element_factory_find(elem->name);
498     if(can_maybe_sink_caps(factory, caps)) {
499       unsigned j;
500
501       g_debug("%s[%s] can sink this", comp->name, elem->name);
502
503       for(j = 0; (!chain && sinks[j]); ++j)
504         chain = octopus_backend_build_component_chain(backend, comp, sinks[j]);
505     }
506     gst_object_unref(factory);
507   }
508   gst_caps_unref(caps);
509
510   if(chain) {
511     realize_component_chain(backend, route, chain, srcpad);
512     g_slist_free(chain);
513   } else {
514     g_debug("Unable to continue route");
515     octopus_route_playback_error(route, "Unable to build route: unsupported media type");
516   }
517 }
518
519 static void
520 route_mapping_changed(OctopusRoute *route,
521                       gpointer user_data)
522 {
523   OctopusBackend   *backend;
524   OctopusComponent *component;
525   gchar            **sources;
526   gchar            **sinks;
527   gchar            *pipeline_desc;
528   unsigned         i;
529
530   g_debug("Route mapping changed");
531
532   g_object_get(G_OBJECT(route), "backend", &backend,
533                                 "sources", &sources,
534                                 "sinks", &sinks,
535                                 "pipeline-description", &pipeline_desc, NULL);
536   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
537
538   if(pipeline_desc) {
539     RouteDataGst *data;
540     GError       *err;
541
542     g_debug("Using forced pipeline description: %s", pipeline_desc);
543
544     data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
545     if(data->pipeline) {
546       gst_element_set_state(data->pipeline, GST_STATE_NULL);
547       gst_object_unref(data->pipeline);
548     }
549
550     data->pipeline = gst_parse_launch(pipeline_desc, &err);
551     if(data->pipeline)
552       data->ready = TRUE;
553     else
554       g_warning("Pipeline is invalid: %s", err->message);
555
556   } else {
557     for(i = 0; sources[i]; ++i) {
558       GSList   *chain = NULL;
559       unsigned j;
560
561       component = octopus_backend_get_component_by_endpoint(backend, sources[i], OCTOPUS_ENDPOINT_SOURCE);
562       if(!component)
563         continue;
564
565       for(j = 0; sinks[j]; ++j) {
566         if((chain = octopus_backend_build_component_chain(backend, component, sinks[j]))) {
567           realize_component_chain(backend, route, chain, NULL);
568           g_slist_free(chain);
569           break;
570         }
571       }
572     }
573   }
574 }
575
576 static void
577 route_endpoint_changed(OctopusRoute    *route,
578                        OctopusEndpoint *endpoint,
579                        gpointer        user_data)
580 {
581   RouteDataGst *data;
582   GstIterator  *iter;
583   gpointer     element;
584   gboolean     found = FALSE;
585
586   g_debug("Route endpoint '%s' changed", endpoint->name);
587
588   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
589
590   iter = gst_bin_iterate_recurse(GST_BIN(data->pipeline));
591   while(!found && gst_iterator_next(iter, &element) == GST_ITERATOR_OK) {
592     gchar *elem_name;
593
594     g_object_get(G_OBJECT(element), "name", &elem_name, NULL);
595     if(g_str_has_prefix(elem_name, endpoint->name)) {
596       configure_endpoint(element, endpoint);
597       found = TRUE;
598     }
599
600     g_free(elem_name);
601     gst_object_unref(element);
602   }
603   gst_iterator_free(iter);
604 }
605
606 static void
607 have_type(GstTypeFind *typefind,
608           guint       probability,
609           GstCaps     *caps,
610           gpointer    user_data)
611 {
612   OctopusRoute   *route;
613   OctopusBackend *backend;
614   GstPad         *pad;
615   gchar          *caps_str;
616
617   route = OCTOPUS_ROUTE(user_data);
618   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
619   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
620
621   caps_str = gst_caps_to_string(caps);
622   g_debug("Have type: %s", caps_str);
623   g_free(caps_str);
624
625   pad = gst_element_get_static_pad(GST_ELEMENT(typefind), "src");
626   continue_route_from_pad(backend, route, pad);
627   g_object_unref(pad);
628 }
629
630 static void
631 pad_added(GstElement *elem,
632           GstPad     *pad,
633           gpointer   *user_data)
634 {
635   OctopusRoute   *route;
636   OctopusBackend *backend;
637   RouteDataGst   *data;
638   gchar          *elem_name;
639   gchar          *pad_name;
640   GstCaps        *caps;
641   gchar          *caps_str;
642   GstElement     *queue;
643   GstPad         *qpad;
644
645   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
646   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
647   caps = gst_pad_get_caps(pad);
648   caps_str = gst_caps_to_string(caps);
649   g_debug("Pad '%s' added to '%s', caps: %s", pad_name, elem_name, caps_str);
650   gst_caps_unref(caps);
651   g_free(caps_str);
652   g_free(elem_name);
653   g_free(pad_name);
654
655   route = OCTOPUS_ROUTE(user_data);
656   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
657   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
658
659   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
660
661   queue = gst_element_factory_make("queue", NULL);
662   gst_bin_add(GST_BIN(data->pipeline), queue);
663   gst_element_set_state(queue, GST_STATE_PAUSED);
664   qpad = gst_element_get_static_pad(queue, "sink");
665   gst_pad_link(pad, qpad);
666   gst_object_unref(qpad);
667
668   qpad = gst_element_get_static_pad(queue, "src");
669   continue_route_from_pad(backend, route, qpad);
670   gst_object_unref(qpad);
671 }
672
673 static void
674 pad_removed(GstElement *elem,
675             GstPad     *pad,
676             gpointer   *user_data)
677 {
678   OctopusRoute *route;
679   RouteDataGst *data;
680   gchar        *elem_name;
681   gchar        *pad_name;
682
683   if(gst_pad_get_direction(pad) != GST_PAD_SRC)
684     return;
685
686   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
687   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
688   g_debug("Pad '%s' removed from '%s'", pad_name, elem_name);
689   g_free(elem_name);
690   g_free(pad_name);
691
692   route = OCTOPUS_ROUTE(user_data);
693   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
694   data->ready = FALSE;
695
696   while(1) {
697     GstIterator *iter;
698     gpointer    elem;
699     GSList      *stale = NULL;
700     GSList      *ptr;
701
702     /* Find any elements that have a sink pad with nothing connected */
703     iter = gst_bin_iterate_elements(GST_BIN(data->pipeline));
704     while(gst_iterator_next(iter, &elem) != GST_ITERATOR_DONE) {
705       GstPad *pad = gst_element_get_static_pad(GST_ELEMENT(elem), "sink");
706       if(pad) {
707         GstPad *peer = gst_pad_get_peer(pad);
708         if(peer)
709           gst_object_unref(peer);
710         else
711           stale = g_slist_prepend(stale, elem);
712         gst_object_unref(pad);
713       }
714       gst_object_unref(GST_OBJECT(elem));
715     }
716     gst_iterator_free(iter);
717
718     /* No stale elements found - we're done */
719     if(!stale)
720       break;
721
722     for(ptr = stale; ptr; ptr = ptr->next) {
723       GstElement *elem;
724       gchar      *name;
725
726       elem = GST_ELEMENT(ptr->data);
727       g_object_get(G_OBJECT(elem), "name", &name, NULL);
728       g_debug("Removing element '%s'", name);
729       gst_element_set_state(elem, GST_STATE_NULL);
730       gst_bin_remove(GST_BIN(data->pipeline), elem);
731     }
732   }
733 }
734
735 static void
736 no_more_pads(GstElement *elem,
737              gpointer   user_data)
738 {
739   OctopusRoute *route;
740   RouteDataGst *data;
741   gchar        *elem_name;
742
743   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
744   g_debug("No more pads from '%s'", elem_name);
745   g_free(elem_name);
746
747   route = OCTOPUS_ROUTE(user_data);
748   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
749   if(data->fakesink) {
750     gst_element_set_state(data->fakesink, GST_STATE_NULL);
751     gst_bin_remove(GST_BIN(data->pipeline), data->fakesink);
752     data->fakesink = NULL;
753     data->ready = TRUE;
754   }
755 }
756
757 static gboolean
758 bus_watch(GstBus     *bus,
759           GstMessage *msg,
760           gpointer   user_data)
761 {
762   OctopusRoute   *route;
763   RouteDataGst   *data;
764   OctopusBackend *backend;
765   
766   route = OCTOPUS_ROUTE(user_data);
767   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
768   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
769   
770   switch(GST_MESSAGE_TYPE(msg)) {
771   case GST_MESSAGE_ERROR: {
772     GError *error = NULL;
773
774     gst_message_parse_error(msg, &error, NULL);
775     g_debug("GST ERROR: %s", error->message);
776     octopus_route_playback_error(route, error->message);
777     g_error_free(error);
778     stop_route(backend, route);
779     break; }
780
781   case GST_MESSAGE_EOS:
782     g_debug("GST EOS");
783     stop_route(backend, route);
784     break;
785
786   case GST_MESSAGE_STATE_CHANGED: {
787     GstElement *elem;
788     gchar      *name;
789
790     GstState previous, state, pending;
791     gst_message_parse_state_changed (msg, &previous, &state, &pending);
792     elem = GST_ELEMENT(GST_MESSAGE_SRC(msg));
793     name = gst_element_get_name(elem);
794     g_debug("GST STATE CHANGED: %s %s->%s, pending %s", name, gst_element_state_get_name(previous), gst_element_state_get_name(state), gst_element_state_get_name(pending));
795     g_free(name);
796
797     if(elem == data->pipeline && pending == GST_STATE_VOID_PENDING) {
798       if(state == GST_STATE_NULL)
799         octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
800       else if(state == GST_STATE_PAUSED)
801         octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
802       else if(state == GST_STATE_PLAYING)
803         octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
804     }
805     break; }
806
807   default:;
808     g_debug("Unhandled GST message of type %d", GST_MESSAGE_TYPE(msg));
809   }
810
811   return TRUE;
812 }
813
814 static gboolean
815 position_timer(gpointer user_data)
816 {
817   OctopusRoute *route = (OctopusRoute *)user_data;
818   RouteDataGst *data;
819   GstFormat    fmt = GST_FORMAT_TIME;
820   gint64       pos = 0;
821   gint64       dur = 0;
822
823   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
824   gst_element_query_position(data->pipeline, &fmt, &pos);
825   gst_element_query_duration(data->pipeline, &fmt, &dur);
826
827   octopus_route_stream_position(route, (guint)(pos/1000000000), (guint)(dur/1000000000));
828
829   return TRUE;
830 }
831
832 // vim: filetype=c:expandtab:shiftwidth=2:tabstop=2:softtabstop=2