Implement working caching.
[isatis.git] / isatis-player / isatissrc.c
1 #include "isatissrc.h"
2
3 #include <unistd.h>
4 #include <string.h>
5 #include <fcntl.h>
6
7 #define CACHE_BUFFER_SIZE 4096
8
9 GST_DEBUG_CATEGORY_STATIC(isatissrc_debug);
10 #define GST_CAT_DEFAULT isatissrc_debug
11
12 static GStaticMutex isatis_mutex = G_STATIC_MUTEX_INIT;
13 /* These are protected by the mutex and accessed from all threads */
14 static gchar *isatis_uri = NULL;
15 static gboolean isatis_cache_done = FALSE;
16
17 /* These never change once set and don't need mutex protection */
18 static gint isatis_input_fd = -1;
19 static gint isatis_cache_write_fd = -1;
20 static gint isatis_cache_read_fd = -1;
21
22 static int gst_isatis_src_make_cache_file(void)
23 {
24         char tmp_file[] = "/tmp/isatis.XXXXXX";
25
26         mktemp(tmp_file);
27
28         isatis_cache_write_fd = open(tmp_file, O_CREAT | O_EXCL | O_WRONLY, S_IRWXU);
29         if (isatis_cache_write_fd < 0)
30         {
31                 GST_ERROR("failed to open cache file for writing: %s", g_strerror(errno));
32                 return -1;
33         }
34
35         isatis_cache_read_fd = open(tmp_file, O_RDONLY);
36         if (isatis_cache_read_fd < 0)
37         {
38                 unlink(tmp_file);
39                 close(isatis_cache_write_fd);
40                 isatis_cache_write_fd = -1;
41                 GST_ERROR("failed to open cache file for reading: %s", g_strerror(errno));
42                 return -1;
43         }
44
45         unlink(tmp_file);
46
47         return 0;
48 }
49
50 static gpointer gst_isatis_src_cache_thread_func(gpointer unused)
51 {
52         int r;
53         char buffer[CACHE_BUFFER_SIZE];
54
55         GST_DEBUG("cache thread started");
56
57         do
58         {
59                 r = read(isatis_input_fd, buffer, CACHE_BUFFER_SIZE);
60
61                 if (r > 0)
62                         write(isatis_cache_write_fd, buffer, r); /* XXX handle partials and failures XXX */
63         }
64         while (r > 0);
65
66         g_static_mutex_lock(&isatis_mutex);
67         isatis_cache_done = TRUE;
68         g_static_mutex_unlock(&isatis_mutex);
69
70         GST_DEBUG("cache thread exiting");
71
72         return NULL;
73 }
74
75 static const GstElementDetails gst_isatis_src_details =
76         GST_ELEMENT_DETAILS("Isatis plugin source",
77                         "Source/File",
78                         "Read from Isatis browser plugin",
79                         "Ilpo Ruotsalainen <ilpo.ruotsalainen@movial.fi>");
80
81 static GstStaticPadTemplate srctemplate =
82         GST_STATIC_PAD_TEMPLATE("src",
83                         GST_PAD_SRC,
84                         GST_PAD_ALWAYS,
85                         GST_STATIC_CAPS_ANY);
86
87 static GstURIType gst_isatis_src_uri_get_type(void)
88 {
89         return GST_URI_SRC;
90 }
91
92 static gchar **gst_isatis_src_uri_get_protocols(void)
93 {
94         static gchar *protocols[] = { "isatis", NULL };
95
96         return protocols;
97 }
98
99 static const gchar *gst_isatis_src_uri_get_uri(GstURIHandler *handler)
100 {
101         /* XXX This isn't really threadsafe, and relies on the URI never changing once set XXX */
102
103         return isatis_uri;
104 }
105
106 static gboolean gst_isatis_src_uri_set_uri(GstURIHandler *handler, const gchar *uri)
107 {
108         GstIsatisSrc *src = GST_ISATIS_SRC(handler);
109         gboolean ret = TRUE;
110         gint fd;
111
112         GST_DEBUG_OBJECT(src, "setting URI to %s", uri);
113
114         if (sscanf(uri, "isatis://%d", &fd) != 1 || fd < 0)
115                 return FALSE;
116
117         g_static_mutex_lock(&isatis_mutex);
118
119         if (!isatis_uri)
120         {
121                 GError *error = NULL;
122
123                 isatis_uri = g_strdup(uri);
124                 isatis_input_fd = fd;
125
126                 if (gst_isatis_src_make_cache_file() < 0)
127                 {
128                         GST_ELEMENT_ERROR(src, RESOURCE, OPEN_WRITE,
129                                         ("Could not open cache file: %s", g_strerror(errno)), NULL);
130                 }
131                 else if (!g_thread_create(gst_isatis_src_cache_thread_func, NULL, FALSE, &error))
132                 {
133                         GST_ELEMENT_ERROR(src, CORE, THREAD,
134                                         ("Could not create cache thread: %s", error->message), NULL);
135
136                         g_error_free(error);
137                 }
138         }
139         else if (strcmp(isatis_uri, uri))
140                 ret = FALSE; /* Cannot change URI once it has been set */
141
142         g_static_mutex_unlock(&isatis_mutex);
143
144         return ret;
145 }
146
147 static void gst_isatis_src_uri_handler_init(gpointer g_iface, gpointer iface_data)
148 {
149         GstURIHandlerInterface *iface = (GstURIHandlerInterface *)g_iface;
150
151         iface->get_type = gst_isatis_src_uri_get_type;
152         iface->get_protocols = gst_isatis_src_uri_get_protocols;
153         iface->get_uri = gst_isatis_src_uri_get_uri;
154         iface->set_uri = gst_isatis_src_uri_set_uri;
155 }
156
157 static void _do_init(GType isatissrc_type)
158 {
159         static const GInterfaceInfo urihandler_info =
160         {
161                 gst_isatis_src_uri_handler_init,
162                 NULL,
163                 NULL
164         };
165
166         g_type_add_interface_static(isatissrc_type, GST_TYPE_URI_HANDLER, &urihandler_info);
167         GST_DEBUG_CATEGORY_INIT(isatissrc_debug, "isatissrc", 0, "Isatis Source");
168 }
169
170 GST_BOILERPLATE_FULL(GstIsatisSrc, gst_isatis_src, GstPushSrc, GST_TYPE_PUSH_SRC, _do_init);
171
172 static void gst_isatis_src_base_init(gpointer g_class)
173 {
174         GstElementClass *element_class = GST_ELEMENT_CLASS(g_class);
175         static const GstElementDetails details =
176                 GST_ELEMENT_DETAILS(
177                         "Isatis plugin source",
178                         "Source",
179                         "Read from Isatis plugin",
180                         "Ilpo Ruotsalainen <ilpo.ruotsalainen@movial.fi>");
181
182         gst_element_class_set_details(element_class, &details);
183         gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&srctemplate));
184 }
185
186 static gboolean gst_isatis_src_start(GstBaseSrc *bsrc)
187 {
188         GstIsatisSrc *src = GST_ISATIS_SRC(bsrc);
189
190         GST_DEBUG_OBJECT(src, "starting");
191
192         if (isatis_cache_read_fd != -1)
193         {
194                 if (lseek(isatis_cache_read_fd, 0, SEEK_SET) < 0)
195                 {
196                         GST_ELEMENT_ERROR(src, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
197                         return FALSE;
198                 }
199         }
200
201         return TRUE;
202 }
203
204 static GstFlowReturn gst_isatis_src_create(GstPushSrc *psrc, GstBuffer **outbuf)
205 {
206         GstIsatisSrc *src = GST_ISATIS_SRC(psrc);
207         GstBuffer *buf;
208         guint blocksize = GST_BASE_SRC(src)->blocksize;
209         gssize readbytes;
210         gboolean cache_done;
211
212         if (isatis_cache_read_fd == -1)
213                 return GST_FLOW_WRONG_STATE;
214
215         buf = gst_buffer_new_and_alloc(blocksize);
216
217 retry:
218         g_static_mutex_lock(&isatis_mutex);
219         cache_done = isatis_cache_done;
220         g_static_mutex_unlock(&isatis_mutex);
221
222         readbytes = read(isatis_cache_read_fd, GST_BUFFER_DATA(buf), blocksize);
223
224         if (readbytes < 0)
225         {
226                 GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), ("reading cache file failed: %s", g_strerror(errno)));
227                 gst_buffer_unref(buf);
228                 return GST_FLOW_ERROR;
229         }
230         else if (readbytes == 0)
231         {
232                 if (cache_done)
233                 {
234                         /* Actual EOS */
235                         GST_DEBUG_OBJECT(src, "No more data, EOS");
236
237                         gst_buffer_unref(buf);
238                         return GST_FLOW_UNEXPECTED;
239                 }
240                 else
241                 {
242                         /* Need to pull more data from stream */
243                         GST_DEBUG_OBJECT(src, "Waiting for more data");
244
245                         sleep(1);
246                         goto retry;
247                 }
248         }
249
250         /* Got some data */
251
252         GST_DEBUG_OBJECT(src, "read %" G_GSSIZE_FORMAT " bytes", readbytes);
253
254         GST_BUFFER_SIZE(buf) = readbytes;
255         GST_BUFFER_OFFSET(buf) = GST_BUFFER_OFFSET_NONE; /* XXX */
256         GST_BUFFER_TIMESTAMP(buf) = GST_CLOCK_TIME_NONE;
257
258         *outbuf = buf;
259
260         return GST_FLOW_OK;
261 }
262
263 static void gst_isatis_src_class_init(GstIsatisSrcClass *klass)
264 {
265         GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS(klass);
266         GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS(klass);
267
268         gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_isatis_src_start);
269
270         gstpushsrc_class->create = GST_DEBUG_FUNCPTR(gst_isatis_src_create);
271 }
272
273 static void gst_isatis_src_init(GstIsatisSrc *src, GstIsatisSrcClass *klass)
274 {
275         GST_DEBUG_OBJECT(src, "initializing");
276 }
277
278 static gboolean gst_isatis_plugin_init(GstPlugin *plugin)
279 {
280         return gst_element_register(plugin, "isatissrc", GST_RANK_PRIMARY, gst_isatis_src_get_type());
281 }
282
283 /* Sigh. */
284 #define PACKAGE "isatis"
285
286 GST_PLUGIN_DEFINE_STATIC(
287                 GST_VERSION_MAJOR,
288                 GST_VERSION_MINOR,
289                 "isatisplugin",
290                 "Isatis internal plugin",
291                 gst_isatis_plugin_init,
292                 VERSION,
293                 "GPL",
294                 "Isatis",
295                 "");