4558014f98cd03a37a53983106ecdbd76f68c096
[octopus.git] / src / octopus-backend-gst.c
1 /*   Octopus Media Engine
2  *   Copyright 2008 Movial Creative Technologies Inc
3  *
4  *   Authors: Mikko Rasa, <mikko.rasa@movial.fi>
5  *
6  *   This program is free software; you can redistribute it and/or
7  *   modify it under the terms of the GNU Lesser General Public
8  *   License as published by the Free Software Foundation; either
9  *   version 2.1 of the License, or (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *   Lesser General Public License for more details.
15  *
16  *   You should have received a copy of the GNU Lesser General Public
17  *   License along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
19  */
#include <stdlib.h>
#include <string.h>
#include <gst/gst.h>
#include <gst/interfaces/xoverlay.h>
#include "octopus-backend-gst.h"
#include "octopus-module.h"
#include "octopus-module-manager.h"
#include "octopus-route.h"
27
28 typedef struct _RouteDataGst RouteDataGst;
29
30 struct _RouteDataGst
31 {
32   GstElement *pipeline;
33   GstElement *fakesink;
34   gboolean   ready;
35   guint      timeout_tag;
36 };
37
38 static gboolean
39 validate_component(OctopusBackend   *backend,
40                    OctopusComponent *comp);
41 static gboolean
42 can_link_components(OctopusBackend *backend,
43                     OctopusComponent *comp1,
44                     OctopusComponent *comp2);
45
46 gboolean
47 init_route (OctopusBackend *backend,
48             OctopusRoute   *route);
49 static void
50 destroy_route(OctopusBackend *backend,
51               OctopusRoute   *route);
52 static gboolean
53 play_route(OctopusBackend *backend,
54            OctopusRoute   *route);
55 static gboolean
56 pause_route(OctopusBackend *backend,
57             OctopusRoute   *route);
58 static gboolean
59 stop_route(OctopusBackend *backend,
60            OctopusRoute   *route);
61
62 static void
63 route_mapping_changed(OctopusRoute *route,
64                       gpointer user_data);
65 static void
66 route_endpoint_changed(OctopusRoute    *route,
67                        OctopusEndpoint *endpoint,
68                        gpointer        user_data);
69 static void
70 have_type(GstTypeFind *typefind,
71           guint       probability,
72           GstCaps     *caps,
73           gpointer    user_data);
74 static void
75 pad_added(GstElement *elem,
76           GstPad     *pad,
77           gpointer   *user_data);
78 static void
79 pad_removed(GstElement *elem,
80             GstPad     *pad,
81             gpointer   *user_data);
82 static void
83 no_more_pads(GstElement *elem,
84              gpointer   user_data);
85 static gboolean
86 bus_watch(GstBus     *bus,
87           GstMessage *msg,
88           gpointer   user_data);
89 static gboolean
90 position_timer(gpointer);
91
92 G_DEFINE_TYPE(OctopusBackendGst, octopus_backend_gst, OCTOPUS_TYPE_BACKEND)
93
94 static void
95 octopus_backend_gst_class_init(OctopusBackendGstClass *klass)
96 {
97   OctopusBackendClass *backend;
98
99   backend = OCTOPUS_BACKEND_CLASS(klass);
100
101   backend->validate_component = validate_component;
102   backend->can_link_components = can_link_components;
103   backend->init_route = init_route;
104   backend->destroy_route = destroy_route;
105   backend->play_route = play_route;
106   backend->pause_route = pause_route;
107   backend->stop_route = stop_route;
108 }
109
110 static void
111 octopus_backend_gst_init(OctopusBackendGst *backend)
112 {
113   OctopusBackend *base;
114
115   base = OCTOPUS_BACKEND(backend);
116
117   base->name = "gstreamer";
118 }
119
120 /*
121  * Helper functions
122  */
123
124 static gboolean
125 can_maybe_sink_caps(GstElementFactory *factory,
126                     GstCaps *caps)
127 {
128   const GList *pads;
129   gboolean    result = FALSE;
130
131   pads = gst_element_factory_get_static_pad_templates(factory);
132   for(; pads; pads = pads->next) {
133     GstStaticPadTemplate *tmpl = pads->data;
134     if(tmpl->direction == GST_PAD_SINK && tmpl->presence == GST_PAD_ALWAYS) {
135       caps = gst_caps_intersect(caps, gst_static_pad_template_get_caps(tmpl));
136       result = !gst_caps_is_empty(caps);
137       gst_caps_unref(caps);
138       break;
139     }
140   }
141
142   return result;
143 }
144
145 /*
146  * OctopusBackend implementation
147  */
148
149 static gboolean
150 validate_component(OctopusBackend   *backend,
151                    OctopusComponent *comp)
152 {
153   GSList            *iter;
154
155   for(iter = comp->elements; iter; iter = iter->next) {
156     GstElementFactory *factory;
157     
158     factory = gst_element_factory_find(((OctopusElement *)iter->data)->name);
159     if(!factory)
160       return FALSE;
161     gst_object_unref(factory);
162   }
163
164   return TRUE;
165 }
166
167 static gboolean
168 can_link_components(OctopusBackend   *backend,
169                     OctopusComponent *comp1,
170                     OctopusComponent *comp2)
171 {
172   OctopusElement    *elem;
173   GstElementFactory *factory;
174   const GList       *pads;
175   GstCaps           *caps = NULL;
176   gboolean          result = FALSE;
177
178   elem = (OctopusElement *)g_slist_last(comp1->elements)->data;
179   factory = gst_element_factory_find(elem->name);
180
181   pads = gst_element_factory_get_static_pad_templates(factory);
182   for(; pads; pads = pads->next) {
183     GstStaticPadTemplate *tmpl = pads->data;
184     if(tmpl->direction == GST_PAD_SRC && tmpl->presence == GST_PAD_ALWAYS) {
185       caps = gst_static_pad_template_get_caps(tmpl);
186       break;
187     }
188   }
189   gst_object_unref(factory);
190
191   if(caps) {
192     if(gst_caps_is_any(caps))
193       return FALSE;
194
195     elem = (OctopusElement *)comp2->elements->data;
196     factory = gst_element_factory_find(elem->name);
197     result = can_maybe_sink_caps(factory, caps);
198     gst_object_unref(factory);
199   }
200
201   return result;
202 }
203
204 gboolean
205 init_route(OctopusBackend *backend,
206            OctopusRoute   *route)
207 {
208   RouteDataGst *data;
209   GstBus       *bus;
210
211   data = g_new0(RouteDataGst, 1);
212   data->pipeline = gst_element_factory_make("pipeline", NULL);
213   g_object_set_data(G_OBJECT(route), "data", data);
214
215   g_signal_connect(route, "mapping-changed", G_CALLBACK(route_mapping_changed), 0);
216   g_signal_connect(route, "endpoint-changed", G_CALLBACK(route_endpoint_changed), 0);
217
218   bus = gst_pipeline_get_bus(GST_PIPELINE(data->pipeline));
219   gst_bus_add_watch(bus, bus_watch, route);
220
221   return TRUE;
222 }
223
224 static void
225 destroy_route(OctopusBackend *backend,
226               OctopusRoute   *route)
227 {
228   RouteDataGst *data;
229
230   stop_route(backend, route);
231   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
232   gst_object_unref(GST_OBJECT(data->pipeline));
233 }
234
235 static gboolean
236 play_route(OctopusBackend *backend,
237            OctopusRoute   *route)
238 {
239   RouteDataGst         *data;
240   GstStateChangeReturn ret;
241
242   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
243   if(!data->ready && !data->fakesink) {
244     /* Keep the pipeline in an async state change while building it */
245     data->fakesink = gst_element_factory_make("fakesink", NULL);
246     gst_bin_add(GST_BIN(data->pipeline), data->fakesink);
247   }
248
249   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PLAYING);
250   if(ret == GST_STATE_CHANGE_FAILURE) {
251     g_debug("Pipeline failed to start playback");
252     return FALSE;
253   } else if(ret != GST_STATE_CHANGE_ASYNC) {
254     octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
255   }
256
257   if(!data->timeout_tag)
258     data->timeout_tag = g_timeout_add(1000, position_timer, route);
259
260   return TRUE;
261 }
262
263 static gboolean
264 pause_route(OctopusBackend *backend,
265             OctopusRoute   *route)
266 {
267   RouteDataGst         *data;
268   GstStateChangeReturn ret;
269
270   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
271
272   if(data->timeout_tag) {
273     g_source_remove(data->timeout_tag);
274     data->timeout_tag = 0;
275   }
276
277   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_PAUSED);
278   if(ret == GST_STATE_CHANGE_FAILURE) {
279     g_debug("Pipeline failed to pause playback");
280     return FALSE;
281   } else if(ret != GST_STATE_CHANGE_ASYNC) {
282     octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
283   }
284
285   return TRUE;
286 }
287
288 static gboolean
289 stop_route(OctopusBackend *backend,
290            OctopusRoute   *route)
291 {
292   RouteDataGst         *data;
293   GstStateChangeReturn ret;
294
295   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
296
297   if(data->timeout_tag) {
298     g_source_remove(data->timeout_tag);
299     data->timeout_tag = 0;
300   }
301
302   ret = gst_element_set_state(GST_ELEMENT(data->pipeline), GST_STATE_NULL);
303   if(ret == GST_STATE_CHANGE_FAILURE) {
304     g_debug("Pipeline failed to stop playback");
305     return FALSE;
306   } else if(ret != GST_STATE_CHANGE_ASYNC) {
307     octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
308   }
309
310   return TRUE;
311 }
312
313 /*
314  * Private functions
315  */
316
317 static void
318 configure_endpoint(GstElement            *elem,
319                    const OctopusEndpoint *endpoint)
320 {
321   if(endpoint->uri) {
322     g_debug("Setting URI '%s'", endpoint->uri);
323     g_object_set(G_OBJECT(elem), "location", endpoint->uri, NULL);
324   }
325
326   if(endpoint->window) {
327     g_debug("Setting X11 display ('%s', %lx)", endpoint->display, endpoint->window);
328     g_object_set(G_OBJECT(elem), "display", endpoint->display, NULL);
329     gst_x_overlay_set_xwindow_id(GST_X_OVERLAY(elem), endpoint->window);
330   }
331 }
332
333 static GstElement *
334 realize_element(OctopusRoute   *route,
335                 OctopusElement *oct_elem)
336 {
337   GstElement *gst_elem;
338   GList      *iter;
339   GSList     *iter2;
340
341   g_debug("Realizing element '%s'", oct_elem->name);
342   gst_elem = gst_element_factory_make(oct_elem->name, oct_elem->endpoint);
343
344   if(oct_elem->endpoint) {
345     const OctopusEndpoint *endpoint;
346     if((endpoint = octopus_route_get_endpoint(route, oct_elem->endpoint)))
347       configure_endpoint(gst_elem, endpoint);
348   }
349
350   for(iter2 = oct_elem->parameters; iter2; iter2 = iter2->next) {
351     OctopusParameter *param = (OctopusParameter *)iter2->data;
352     GParamSpec       *pspec;
353
354     pspec = g_object_class_find_property(G_OBJECT_GET_CLASS(gst_elem), param->name);
355     if(pspec) {
356       if(pspec->value_type == G_TYPE_INT) {
357         g_object_set(G_OBJECT(gst_elem), param->name, strtol(param->value, NULL, 0), NULL);
358       } else if(pspec->value_type == G_TYPE_UINT) {
359         g_object_set(G_OBJECT(gst_elem), param->name, strtoul(param->value, NULL, 0), NULL);
360       } else if(pspec->value_type == G_TYPE_INT64) {
361         g_object_set(G_OBJECT(gst_elem), param->name, strtoll(param->value, NULL, 0), NULL);
362       } else if(pspec->value_type == G_TYPE_UINT64) {
363         g_object_set(G_OBJECT(gst_elem), param->name, strtoull(param->value, NULL, 0), NULL);
364       } else if(pspec->value_type == G_TYPE_BOOLEAN) {
365         if(!strcmp(param->value, "true"))
366           g_object_set(G_OBJECT(gst_elem), param->name, TRUE, NULL);
367         else if(!strcmp(param->value, "false"))
368           g_object_set(G_OBJECT(gst_elem), param->name, FALSE, NULL);
369       } else if(pspec->value_type == G_TYPE_STRING) {
370         g_object_set(G_OBJECT(gst_elem), param->name, param->value, NULL);
371       }
372     } else {
373       g_debug("Unknown parameter '%s' specified for element '%s'", param->name, oct_elem->name);
374     }
375   }
376
377   /* XXX better way to figure out if it's a typefind? */
378   if(g_str_has_prefix(oct_elem->name, "typefind")) {
379     g_debug("Connecting to 'have-type' signal");
380     g_signal_connect(G_OBJECT(gst_elem), "have-type", G_CALLBACK(have_type), route);
381   }
382
383   iter = gst_element_class_get_pad_template_list(GST_ELEMENT_GET_CLASS(gst_elem));
384   for(; iter; iter = iter->next) {
385     GstPadTemplate *tmpl = (GstPadTemplate *)iter->data;
386     if(GST_PAD_TEMPLATE_DIRECTION(tmpl) == GST_PAD_SRC
387        && GST_PAD_TEMPLATE_PRESENCE(tmpl) == GST_PAD_SOMETIMES)
388     {
389       g_debug("Connecting to dynamic pad signals");
390       g_signal_connect(G_OBJECT(gst_elem), "pad-added", G_CALLBACK(pad_added), route);
391       g_signal_connect(G_OBJECT(gst_elem), "pad-removed", G_CALLBACK(pad_removed), route);
392       g_signal_connect(G_OBJECT(gst_elem), "no-more-pads", G_CALLBACK(no_more_pads), route);
393       break;
394     }
395   }
396
397   return gst_elem;
398 }
399
400 static GstPad *
401 realize_component(OctopusBackend   *backend,
402                   OctopusRoute     *route,
403                   OctopusComponent *component,
404                   GstPad           *srcpad)
405 {
406   RouteDataGst *data;
407   GSList       *iter;
408   GstState     state;
409
410   g_debug("Realizing component '%s'", component->name);
411
412   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
413
414   state = GST_STATE(data->pipeline);
415   if(state != GST_STATE_NULL)
416     state = GST_STATE_PAUSED;
417
418   for(iter = component->elements; iter; iter = iter->next) {
419     OctopusElement *oct_elem = (OctopusElement *)iter->data;
420     GstElement     *gst_elem;
421
422     gst_elem = realize_element(route, oct_elem);
423     gst_bin_add(GST_BIN(data->pipeline), gst_elem);
424
425     if(srcpad) {
426       GstPad *sinkpad;
427
428       if((sinkpad = gst_element_get_static_pad(gst_elem, "sink"))) {
429         if(GST_PAD_LINK_FAILED(gst_pad_link(srcpad, sinkpad)))
430           g_debug("Linking pads failed - component '%s' may be broken", component->name);
431         gst_object_unref(sinkpad);
432       }
433
434       gst_object_unref(srcpad);
435     }
436     gst_element_set_state(gst_elem, state);
437
438     srcpad = gst_element_get_static_pad(gst_elem, "src");
439   }
440
441   return srcpad;
442 }
443
444 static void
445 realize_component_chain(OctopusBackend *backend,
446                         OctopusRoute   *route,
447                         GSList         *chain,
448                         GstPad         *srcpad)
449 {
450   GSList *iter;
451
452   if(srcpad)
453     gst_object_ref(srcpad);
454   for(iter = chain; iter; iter = iter->next)
455     srcpad = realize_component(backend, route, (OctopusComponent *)iter->data, srcpad);
456   if(srcpad)
457     gst_object_unref(srcpad);
458 }
459
460 static void
461 continue_route_from_pad(OctopusBackend *backend,
462                         OctopusRoute   *route,
463                         GstPad         *srcpad)
464 {
465   GstCaps *caps;
466   GSList  *iter;
467   gchar   **sinks;
468   GSList  *chain = NULL;
469
470   caps = gst_pad_get_caps(srcpad);
471   g_object_get(G_OBJECT(route), "sinks", &sinks, NULL);
472
473   for(iter = backend->components; (!chain && iter); iter = iter->next) {
474     OctopusComponent  *comp = (OctopusComponent *)iter->data;
475     OctopusElement    *elem = (OctopusElement *)comp->elements->data;
476     GstElementFactory *factory;
477
478     factory = gst_element_factory_find(elem->name);
479     if(can_maybe_sink_caps(factory, caps)) {
480       unsigned j;
481
482       g_debug("%s[%s] can sink this", comp->name, elem->name);
483
484       for(j = 0; (!chain && sinks[j]); ++j)
485         chain = octopus_backend_build_component_chain(backend, comp, sinks[j]);
486     }
487     gst_object_unref(factory);
488   }
489   gst_caps_unref(caps);
490
491   if(chain) {
492     realize_component_chain(backend, route, chain, srcpad);
493     g_slist_free(chain);
494   } else {
495     g_debug("Unable to continue route");
496     octopus_route_playback_error(route, "Unable to build route: unsupported media type");
497   }
498 }
499
500 static void
501 route_mapping_changed(OctopusRoute *route,
502                       gpointer user_data)
503 {
504   OctopusBackend   *backend;
505   OctopusComponent *component;
506   gchar            **sources;
507   gchar            **sinks;
508   unsigned         i;
509
510   g_debug("Route mapping changed");
511
512   g_object_get(G_OBJECT(route), "backend", &backend,
513                                 "sources", &sources,
514                                 "sinks", &sinks, NULL);
515   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
516
517   for(i = 0; sources[i]; ++i) {
518     GSList   *chain = NULL;
519     unsigned j;
520
521     component = octopus_backend_get_component_by_endpoint(backend, sources[i], OCTOPUS_ENDPOINT_SOURCE);
522     if(!component)
523       continue;
524
525     for(j = 0; sinks[j]; ++j) {
526       if((chain = octopus_backend_build_component_chain(backend, component, sinks[j]))) {
527         realize_component_chain(backend, route, chain, NULL);
528         g_slist_free(chain);
529         break;
530       }
531     }
532   }
533
534   /* TODO */
535 }
536
537 static void
538 route_endpoint_changed(OctopusRoute    *route,
539                        OctopusEndpoint *endpoint,
540                        gpointer        user_data)
541 {
542   RouteDataGst *data;
543   GstIterator  *iter;
544   gpointer     element;
545   gboolean     found = FALSE;
546
547   g_debug("Route endpoint '%s' changed", endpoint->name);
548
549   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
550
551   iter = gst_bin_iterate_recurse(GST_BIN(data->pipeline));
552   while(!found && gst_iterator_next(iter, &element) == GST_ITERATOR_OK) {
553     gchar *elem_name;
554
555     g_object_get(G_OBJECT(element), "name", &elem_name, NULL);
556     if(g_str_has_prefix(elem_name, endpoint->name)) {
557       configure_endpoint(element, endpoint);
558       found = TRUE;
559     }
560
561     g_free(elem_name);
562     gst_object_unref(element);
563   }
564   gst_iterator_free(iter);
565 }
566
567 static void
568 have_type(GstTypeFind *typefind,
569           guint       probability,
570           GstCaps     *caps,
571           gpointer    user_data)
572 {
573   OctopusRoute   *route;
574   OctopusBackend *backend;
575   GstPad         *pad;
576   gchar          *caps_str;
577
578   route = OCTOPUS_ROUTE(user_data);
579   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
580   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
581
582   caps_str = gst_caps_to_string(caps);
583   g_debug("Have type: %s", caps_str);
584   g_free(caps_str);
585
586   pad = gst_element_get_static_pad(GST_ELEMENT(typefind), "src");
587   continue_route_from_pad(backend, route, pad);
588   g_object_unref(pad);
589 }
590
591 static void
592 pad_added(GstElement *elem,
593           GstPad     *pad,
594           gpointer   *user_data)
595 {
596   OctopusRoute   *route;
597   OctopusBackend *backend;
598   RouteDataGst   *data;
599   gchar          *elem_name;
600   gchar          *pad_name;
601   GstCaps        *caps;
602   gchar          *caps_str;
603   GstElement     *queue;
604   GstPad         *qpad;
605
606   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
607   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
608   caps = gst_pad_get_caps(pad);
609   caps_str = gst_caps_to_string(caps);
610   g_debug("Pad '%s' added to '%s', caps: %s", pad_name, elem_name, caps_str);
611   gst_caps_unref(caps);
612   g_free(caps_str);
613   g_free(elem_name);
614   g_free(pad_name);
615
616   route = OCTOPUS_ROUTE(user_data);
617   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
618   g_assert(OCTOPUS_IS_BACKEND_GST(backend));
619
620   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
621
622   queue = gst_element_factory_make("queue", NULL);
623   gst_bin_add(GST_BIN(data->pipeline), queue);
624   gst_element_set_state(queue, GST_STATE_PAUSED);
625   qpad = gst_element_get_static_pad(queue, "sink");
626   gst_pad_link(pad, qpad);
627   gst_object_unref(qpad);
628
629   qpad = gst_element_get_static_pad(queue, "src");
630   continue_route_from_pad(backend, route, qpad);
631   gst_object_unref(qpad);
632 }
633
634 static void
635 pad_removed(GstElement *elem,
636             GstPad     *pad,
637             gpointer   *user_data)
638 {
639   OctopusRoute *route;
640   RouteDataGst *data;
641   gchar        *elem_name;
642   gchar        *pad_name;
643
644   if(gst_pad_get_direction(pad) != GST_PAD_SRC)
645     return;
646
647   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
648   g_object_get(G_OBJECT(pad), "name", &pad_name, NULL);
649   g_debug("Pad '%s' removed from '%s'", pad_name, elem_name);
650   g_free(elem_name);
651   g_free(pad_name);
652
653   route = OCTOPUS_ROUTE(user_data);
654   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
655   data->ready = FALSE;
656
657   while(1) {
658     GstIterator *iter;
659     gpointer    elem;
660     GSList      *stale = NULL;
661     GSList      *ptr;
662
663     /* Find any elements that have a sink pad with nothing connected */
664     iter = gst_bin_iterate_elements(GST_BIN(data->pipeline));
665     while(gst_iterator_next(iter, &elem) != GST_ITERATOR_DONE) {
666       GstPad *pad = gst_element_get_static_pad(GST_ELEMENT(elem), "sink");
667       if(pad) {
668         GstPad *peer = gst_pad_get_peer(pad);
669         if(peer)
670           gst_object_unref(peer);
671         else
672           stale = g_slist_prepend(stale, elem);
673         gst_object_unref(pad);
674       }
675       gst_object_unref(GST_OBJECT(elem));
676     }
677     gst_iterator_free(iter);
678
679     /* No stale elements found - we're done */
680     if(!stale)
681       break;
682
683     for(ptr = stale; ptr; ptr = ptr->next) {
684       GstElement *elem;
685       gchar      *name;
686
687       elem = GST_ELEMENT(ptr->data);
688       g_object_get(G_OBJECT(elem), "name", &name, NULL);
689       g_debug("Removing element '%s'", name);
690       gst_element_set_state(elem, GST_STATE_NULL);
691       gst_bin_remove(GST_BIN(data->pipeline), elem);
692     }
693   }
694 }
695
696 static void
697 no_more_pads(GstElement *elem,
698              gpointer   user_data)
699 {
700   OctopusRoute *route;
701   RouteDataGst *data;
702   gchar        *elem_name;
703
704   g_object_get(G_OBJECT(elem), "name", &elem_name, NULL);
705   g_debug("No more pads from '%s'", elem_name);
706   g_free(elem_name);
707
708   route = OCTOPUS_ROUTE(user_data);
709   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
710   if(data->fakesink) {
711     gst_element_set_state(data->fakesink, GST_STATE_NULL);
712     gst_bin_remove(GST_BIN(data->pipeline), data->fakesink);
713     data->fakesink = NULL;
714     data->ready = TRUE;
715   }
716 }
717
718 static gboolean
719 bus_watch(GstBus     *bus,
720           GstMessage *msg,
721           gpointer   user_data)
722 {
723   OctopusRoute   *route;
724   RouteDataGst   *data;
725   OctopusBackend *backend;
726   
727   route = OCTOPUS_ROUTE(user_data);
728   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
729   g_object_get(G_OBJECT(route), "backend", &backend, NULL);
730   
731   switch(GST_MESSAGE_TYPE(msg)) {
732   case GST_MESSAGE_ERROR: {
733     GError *error = NULL;
734
735     gst_message_parse_error(msg, &error, NULL);
736     g_debug("GST ERROR: %s", error->message);
737     octopus_route_playback_error(route, error->message);
738     g_error_free(error);
739     stop_route(backend, route);
740     break; }
741
742   case GST_MESSAGE_EOS:
743     g_debug("GST EOS");
744     stop_route(backend, route);
745     break;
746
747   case GST_MESSAGE_STATE_CHANGED: {
748     GstElement *elem;
749     gchar      *name;
750
751     GstState previous, state, pending;
752     gst_message_parse_state_changed (msg, &previous, &state, &pending);
753     elem = GST_ELEMENT(GST_MESSAGE_SRC(msg));
754     name = gst_element_get_name(elem);
755     g_debug("GST STATE CHANGED: %s %s->%s, pending %s", name, gst_element_state_get_name(previous), gst_element_state_get_name(state), gst_element_state_get_name(pending));
756     g_free(name);
757
758     if(elem == data->pipeline && pending == GST_STATE_VOID_PENDING) {
759       if(state == GST_STATE_NULL)
760         octopus_route_state_changed(route, OCTOPUS_ROUTE_STOPPED);
761       else if(state == GST_STATE_PAUSED)
762         octopus_route_state_changed(route, OCTOPUS_ROUTE_PAUSED);
763       else if(state == GST_STATE_PLAYING)
764         octopus_route_state_changed(route, OCTOPUS_ROUTE_PLAYING);
765     }
766     break; }
767
768   default:;
769     g_debug("Unhandled GST message of type %d", GST_MESSAGE_TYPE(msg));
770   }
771
772   return TRUE;
773 }
774
775 static gboolean
776 position_timer(gpointer user_data)
777 {
778   OctopusRoute *route = (OctopusRoute *)user_data;
779   RouteDataGst *data;
780   GstFormat    fmt = GST_FORMAT_TIME;
781   gint64       pos = 0;
782   gint64       dur = 0;
783
784   data = (RouteDataGst *)g_object_get_data(G_OBJECT(route), "data");
785   gst_element_query_position(data->pipeline, &fmt, &pos);
786   gst_element_query_duration(data->pipeline, &fmt, &dur);
787
788   octopus_route_stream_position(route, (guint)(pos/1000000000), (guint)(dur/1000000000));
789
790   return TRUE;
791 }
792
793 // vim: filetype=c:expandtab:shiftwidth=2:tabstop=2:softtabstop=2