Rework the whole plugin<->player communication, now it is two-way so we can restart...
authorIlpo Ruotsalainen <ilpo.ruotsalainen@movial.fi>
Wed, 12 Nov 2008 14:53:06 +0000 (16:53 +0200)
committerIlpo Ruotsalainen <ilpo.ruotsalainen@movial.fi>
Wed, 12 Nov 2008 14:53:06 +0000 (16:53 +0200)
common/isatis_protocol.c [new file with mode: 0644]
common/isatis_protocol.h [new file with mode: 0644]
isatis-player/build.mk
isatis-player/isatissrc.c
isatis-player/isatissrc.h
isatis-plugin/build.mk
isatis-plugin/plugin.c

diff --git a/common/isatis_protocol.c b/common/isatis_protocol.c
new file mode 100644 (file)
index 0000000..abc2008
--- /dev/null
@@ -0,0 +1,201 @@
+/*   Isatis Media Player Plugin
+ *   Copyright 2008 Movial Creative Technologies Inc
+ *
+ *   Authors: Ilpo Ruotsalainen, <ilpo.ruotsalainen@movial.fi>
+ *
+ *   This program is free software; you can redistribute it and/or
+ *   modify it under the terms of the GNU Lesser General Public
+ *   License as published by the Free Software Foundation; either
+ *   version 2.1 of the License, or (at your option) any later version.
+ *
+ *   This program is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ *   Lesser General Public License for more details.
+ *
+ *   You should have received a copy of the GNU Lesser General Public
+ *   License along with this program; if not, write to the Free Software
+ *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
+ */
+
+#include "../common/isatis_protocol.h"
+
+#include <assert.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <poll.h>
+#include <sys/uio.h>
+
+int isatis_socket_push(isatis_socket *socket)
+{
+       int r;
+
+       if (!socket->pending_data)
+               return 0;
+
+       r = write(socket->fd, socket->pending_ptr, socket->pending_length);
+       if (r == 0)
+       {
+               errno = ENOTCONN;
+               return -1;
+       }
+       else if (r < 0)
+               return -1;
+
+       socket->pending_ptr += r;
+       socket->pending_length -= r;
+
+       if (socket->pending_length == 0)
+       {
+               free(socket->pending_data);
+               socket->pending_data = NULL;
+
+               return 0;
+       }
+
+       errno = EAGAIN;
+
+       return -1;
+}
+
+static int isatis_socket_queue(isatis_socket *socket, uint8_t msg, void *data, size_t datalen, ssize_t offset)
+{
+       assert(socket->pending_data == NULL);
+       assert(offset >= 0);
+
+       socket->pending_data = malloc(sizeof(msg) + sizeof(datalen) + datalen);
+       socket->pending_ptr = socket->pending_data + offset;
+       socket->pending_length = datalen - offset;
+
+       return 0;
+}
+
+int isatis_socket_send(isatis_socket *socket, uint8_t msg, void *data, size_t datalen)
+{
+       int r;
+       ssize_t offset;
+       struct iovec v[3];
+
+       r = isatis_socket_push(socket);
+       if (r < 0)
+               return r;
+
+       v[0].iov_base = &msg;
+       v[0].iov_len = sizeof(msg);
+       v[1].iov_base = &datalen;
+       v[1].iov_len = sizeof(datalen);
+       v[2].iov_base = data;
+       v[2].iov_len = datalen;
+
+       offset = writev(socket->fd, v, 3);
+
+       if (offset < 0 && errno == EAGAIN)
+               offset = 0;
+       else if (offset < 0)
+               return -1;
+       
+       if (offset < sizeof(msg) + sizeof(datalen) + datalen)
+               return isatis_socket_queue(socket, msg, data, datalen, offset);
+
+       return 0;
+}
+
+int isatis_socket_can_receive(isatis_socket *socket)
+{
+       struct pollfd p =
+       {
+               .fd = socket->fd,
+               .events = POLLIN | POLLERR | POLLHUP
+       };
+
+       int r = poll(&p, 1, 0);
+
+       return r != 0;
+}
+
+static int isatis_socket_receive_full(isatis_socket *socket, char *ptr, size_t len)
+{
+       int r;
+
+       while (len > 0)
+       {
+               r = read(socket->fd, ptr, len);
+               if (r == 0)
+               {
+                       errno = ENOTCONN;
+                       return -1;
+               }
+               else if (r < 0)
+                       return -1;
+
+               ptr += r;
+               len -= r;
+       }
+
+       return 0;
+}
+
+int isatis_socket_receive_header(isatis_socket *socket, uint8_t *msg, size_t *datalen)
+{
+       int r;
+
+       r = read(socket->fd, msg, sizeof(*msg));
+       if (r == 0)
+       {
+               errno = ENOTCONN;
+               return -1;
+       }
+       else if (r < 0)
+               return -1;
+
+       return isatis_socket_receive_full(socket, (char *)datalen, sizeof(*datalen));
+}
+
+int isatis_socket_receive_data(isatis_socket *socket, void *data, size_t datalen)
+{
+       return isatis_socket_receive_full(socket, data, datalen);
+}
+
+int isatis_socket_receive_discard(isatis_socket *socket, size_t datalen)
+{
+       char tmp[1024];
+
+       while (datalen > 0)
+       {
+               int len = datalen > 1024 ? 1024 : datalen;
+
+               if (isatis_socket_receive_full(socket, tmp, len) < 0)
+                       return -1;
+
+               datalen -= len;
+       }
+
+       return 0;
+}
+
+isatis_socket *isatis_socket_create(int fd)
+{
+       isatis_socket *ret;
+
+       ret = malloc(sizeof(isatis_socket));
+
+       ret->fd = fd;
+       ret->pending_data = ret->pending_ptr = NULL;
+       ret->pending_length = 0;
+
+       return ret;
+}
+
+void isatis_socket_free(isatis_socket *socket)
+{
+       if (socket->pending_data)
+               free(socket->pending_data);
+
+       free(socket);
+}
+
+void isatis_socket_close(isatis_socket *socket)
+{
+       close(socket->fd);
+}
diff --git a/common/isatis_protocol.h b/common/isatis_protocol.h
new file mode 100644 (file)
index 0000000..9b25add
--- /dev/null
@@ -0,0 +1,55 @@
+/*   Isatis Media Player Plugin
+ *   Copyright 2008 Movial Creative Technologies Inc
+ *
+ *   Authors: Ilpo Ruotsalainen, <ilpo.ruotsalainen@movial.fi>
+ *
+ *   This program is free software; you can redistribute it and/or
+ *   modify it under the terms of the GNU Lesser General Public
+ *   License as published by the Free Software Foundation; either
+ *   version 2.1 of the License, or (at your option) any later version.
+ *
+ *   This program is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ *   Lesser General Public License for more details.
+ *
+ *   You should have received a copy of the GNU Lesser General Public
+ *   License along with this program; if not, write to the Free Software
+ *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
+ */
+
+#ifndef ISATIS_PROTOCOL_H
+#define ISATIS_PROTOCOL_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+/* player->plugin */
+#define ISATIS_CMD_START   '1'
+#define ISATIS_CMD_STOP    '0'
+
+/* plugin->player */
+#define ISATIS_CMD_DATA    'D'
+#define ISATIS_CMD_EOS     'E'
+#define ISATIS_CMD_STOPPED 'S'
+
+typedef struct
+{
+       int fd;
+
+       char *pending_data;
+       char *pending_ptr;
+       size_t pending_length;
+} isatis_socket;
+
+int isatis_socket_push(isatis_socket *socket);
+int isatis_socket_send(isatis_socket *socket, uint8_t msg, void *data, size_t datalen);
+int isatis_socket_can_receive(isatis_socket *socket);
+int isatis_socket_receive_header(isatis_socket *socket, uint8_t *msg, size_t *datalen);
+int isatis_socket_receive_data(isatis_socket *socket, void *data, size_t datalen);
+int isatis_socket_receive_discard(isatis_socket *socket, size_t datalen);
+isatis_socket *isatis_socket_create(int fd);
+void isatis_socket_free(isatis_socket *socket);
+void isatis_socket_close(isatis_socket *socket);
+
+#endif
index dc7b34a..2ad05bf 100644 (file)
@@ -1,6 +1,6 @@
 NAME           := isatis-player
 VERSION                := 0.0.1
-SOURCES                := $(wildcard isatis-player/*.c)
+SOURCES                := $(wildcard isatis-player/*.c common/*.c)
 CFLAGS         += -pthread -DDATADIR=\"$(DATADIR)/$(NAME)\" -DVERSION=\"$(VERSION)\"
 PKGS           := glib-2.0 gstreamer-0.10 gstreamer-plugins-base-0.10 gtk+-2.0 gstreamer-base-0.10
 LIBS           := -lgstinterfaces-0.10
index ebf7b13..81f857b 100644 (file)
  */
 
 #include "isatissrc.h"
+#include "../common/isatis_protocol.h"
 
 #include <unistd.h>
 #include <string.h>
 #include <fcntl.h>
 
-#define CACHE_BUFFER_SIZE 4096
-
 GST_DEBUG_CATEGORY_STATIC(isatissrc_debug);
 #define GST_CAT_DEFAULT isatissrc_debug
 
-static GStaticMutex isatis_mutex = G_STATIC_MUTEX_INIT;
-/* These are protected by the mutex and accessed from all threads */
-static gchar *isatis_uri = NULL;
-static gboolean isatis_cache_done = FALSE;
-
-/* These never change once set and don't need mutex protection */
-static gint isatis_input_fd = -1;
-static gint isatis_cache_write_fd = -1;
-static gint isatis_cache_read_fd = -1;
-
-static int gst_isatis_src_make_cache_file(void)
-{
-       char tmp_file[] = "/tmp/isatis.XXXXXX";
-
-       mktemp(tmp_file);
-
-       isatis_cache_write_fd = open(tmp_file, O_CREAT | O_EXCL | O_WRONLY, S_IRWXU);
-       if (isatis_cache_write_fd < 0)
-       {
-               GST_ERROR("failed to open cache file for writing: %s", g_strerror(errno));
-               return -1;
-       }
-
-       isatis_cache_read_fd = open(tmp_file, O_RDONLY);
-       if (isatis_cache_read_fd < 0)
-       {
-               unlink(tmp_file);
-               close(isatis_cache_write_fd);
-               isatis_cache_write_fd = -1;
-               GST_ERROR("failed to open cache file for reading: %s", g_strerror(errno));
-               return -1;
-       }
-
-       unlink(tmp_file);
-
-       return 0;
-}
-
-static gpointer gst_isatis_src_cache_thread_func(gpointer unused)
-{
-       int r;
-       char buffer[CACHE_BUFFER_SIZE];
-
-       GST_DEBUG("cache thread started");
-
-       do
-       {
-               r = read(isatis_input_fd, buffer, CACHE_BUFFER_SIZE);
-
-               if (r > 0)
-                       write(isatis_cache_write_fd, buffer, r); /* XXX handle partials and failures XXX */
-       }
-       while (r > 0);
-
-       g_static_mutex_lock(&isatis_mutex);
-       isatis_cache_done = TRUE;
-       g_static_mutex_unlock(&isatis_mutex);
-
-       GST_DEBUG("cache thread exiting");
-
-       return NULL;
-}
-
 static const GstElementDetails gst_isatis_src_details =
        GST_ELEMENT_DETAILS("Isatis plugin source",
                        "Source/File",
@@ -118,15 +54,14 @@ static gchar **gst_isatis_src_uri_get_protocols(void)
 
 static const gchar *gst_isatis_src_uri_get_uri(GstURIHandler *handler)
 {
-       /* XXX This isn't really threadsafe, and relies on the URI never changing once set XXX */
+       GstIsatisSrc *src = GST_ISATIS_SRC(handler);
 
-       return isatis_uri;
+       return src->uri;
 }
 
 static gboolean gst_isatis_src_uri_set_uri(GstURIHandler *handler, const gchar *uri)
 {
        GstIsatisSrc *src = GST_ISATIS_SRC(handler);
-       gboolean ret = TRUE;
        gint fd;
 
        GST_DEBUG_OBJECT(src, "setting URI to %s", uri);
@@ -134,34 +69,12 @@ static gboolean gst_isatis_src_uri_set_uri(GstURIHandler *handler, const gchar *
        if (sscanf(uri, "isatis://%d", &fd) != 1 || fd < 0)
                return FALSE;
 
-       g_static_mutex_lock(&isatis_mutex);
-
-       if (!isatis_uri)
-       {
-               GError *error = NULL;
-
-               isatis_uri = g_strdup(uri);
-               isatis_input_fd = fd;
-
-               if (gst_isatis_src_make_cache_file() < 0)
-               {
-                       GST_ELEMENT_ERROR(src, RESOURCE, OPEN_WRITE,
-                                       ("Could not open cache file: %s", g_strerror(errno)), NULL);
-               }
-               else if (!g_thread_create(gst_isatis_src_cache_thread_func, NULL, FALSE, &error))
-               {
-                       GST_ELEMENT_ERROR(src, CORE, THREAD,
-                                       ("Could not create cache thread: %s", error->message), NULL);
-
-                       g_error_free(error);
-               }
-       }
-       else if (strcmp(isatis_uri, uri))
-               ret = FALSE; /* Cannot change URI once it has been set */
+       g_free(src->uri);
 
-       g_static_mutex_unlock(&isatis_mutex);
+       src->socket = isatis_socket_create(fd);
+       src->uri = g_strdup(uri);
 
-       return ret;
+       return TRUE;
 }
 
 static void gst_isatis_src_uri_handler_init(gpointer g_iface, gpointer iface_data)
@@ -209,90 +122,171 @@ static gboolean gst_isatis_src_start(GstBaseSrc *bsrc)
 
        GST_DEBUG_OBJECT(src, "starting");
 
-       if (isatis_cache_read_fd != -1)
+       if (isatis_socket_send(src->socket, ISATIS_CMD_START, NULL, 0) < 0)
        {
-               if (lseek(isatis_cache_read_fd, 0, SEEK_SET) < 0)
-               {
-                       GST_ELEMENT_ERROR(src, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
-                       return FALSE;
-               }
+               GST_ELEMENT_ERROR(src, RESOURCE, WRITE, ("Error communicating with plugin"), GST_ERROR_SYSTEM);
+               return FALSE;
        }
 
+       src->eos = FALSE;
+
        return TRUE;
 }
 
-static GstFlowReturn gst_isatis_src_create(GstPushSrc *psrc, GstBuffer **outbuf)
+static gboolean gst_isatis_src_stop(GstBaseSrc *bsrc)
 {
-       GstIsatisSrc *src = GST_ISATIS_SRC(psrc);
-       GstBuffer *buf;
-       guint blocksize = GST_BASE_SRC(src)->blocksize;
-       gssize readbytes;
-       gboolean cache_done;
-
-       if (isatis_cache_read_fd == -1)
-               return GST_FLOW_WRONG_STATE;
+       GstIsatisSrc *src = GST_ISATIS_SRC(bsrc);
+       uint8_t msg;
+       size_t datalen;
 
-       buf = gst_buffer_new_and_alloc(blocksize);
+       GST_DEBUG_OBJECT(src, "stopping");
 
-retry:
-       g_static_mutex_lock(&isatis_mutex);
-       cache_done = isatis_cache_done;
-       g_static_mutex_unlock(&isatis_mutex);
+       if (isatis_socket_send(src->socket, ISATIS_CMD_STOP, NULL, 0) < 0)
+       {
+               GST_ELEMENT_ERROR(src, RESOURCE, WRITE, ("Error communicating with plugin"), GST_ERROR_SYSTEM);
+               return FALSE;
+       }
 
-       readbytes = read(isatis_cache_read_fd, GST_BUFFER_DATA(buf), blocksize);
+       /* Empty the buffer queue */
+       while (!g_queue_is_empty(src->buffer_queue))
+               gst_buffer_unref(g_queue_pop_head(src->buffer_queue));
 
-       if (readbytes < 0)
+       /* Drain messages until stopped */
+       do
        {
-               GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), ("reading cache file failed: %s", g_strerror(errno)));
-               gst_buffer_unref(buf);
-               return GST_FLOW_ERROR;
+               if (isatis_socket_receive_header(src->socket, &msg, &datalen) < 0)
+                       break;
+
+               isatis_socket_receive_discard(src->socket, datalen);
        }
-       else if (readbytes == 0)
+       while (msg != ISATIS_CMD_STOPPED);
+
+       return TRUE;
+}
+
+static GstFlowReturn gst_isatis_src_handle_messages(GstIsatisSrc *src)
+{
+       uint8_t msg;
+       size_t datalen;
+       int i = 0;
+       gboolean got_data = FALSE;
+
+       do
        {
-               if (cache_done)
-               {
-                       /* Actual EOS */
-                       GST_DEBUG_OBJECT(src, "No more data, EOS");
+               if (isatis_socket_receive_header(src->socket, &msg, &datalen) < 0)
+                       goto error;
 
-                       gst_buffer_unref(buf);
-                       return GST_FLOW_UNEXPECTED;
-               }
-               else
+#ifdef SPAMMY_DEBUG
+               GST_DEBUG_OBJECT(src, "received message '%c' with %d bytes of data", msg, datalen);
+#endif
+
+               switch (msg)
                {
-                       /* Need to pull more data from stream */
-                       GST_DEBUG_OBJECT(src, "Waiting for more data");
+                       case ISATIS_CMD_DATA:
+                               {
+                                       GstBuffer *buf = gst_buffer_new_and_alloc(datalen);
 
-                       sleep(1);
-                       goto retry;
+                                       if (isatis_socket_receive_data(src->socket, GST_BUFFER_DATA(buf), datalen) < 0)
+                                       {
+                                               gst_buffer_unref(buf);
+                                               goto error;
+                                       }
+
+                                       g_queue_push_head(src->buffer_queue, buf);
+
+                                       got_data = TRUE;
+                               }
+                               break;
+
+                       case ISATIS_CMD_EOS:
+                               src->eos = TRUE;
+
+                               if (got_data)
+                                       return GST_FLOW_OK;
+                               else
+                                       return GST_FLOW_UNEXPECTED;
+
+                       default:
+                               isatis_socket_receive_discard(src->socket, datalen);
+
+                               GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Unexpected command from plugin"), (NULL));
+                               return GST_FLOW_ERROR;
                }
        }
+       while (isatis_socket_can_receive(src->socket) && i++ < 100);
 
-       /* Got some data */
+       return GST_FLOW_OK;
 
-       GST_DEBUG_OBJECT(src, "read %" G_GSSIZE_FORMAT " bytes", readbytes);
+error:
+       GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Error communicating with plugin"), GST_ERROR_SYSTEM);
 
-       GST_BUFFER_SIZE(buf) = readbytes;
-       GST_BUFFER_OFFSET(buf) = GST_BUFFER_OFFSET_NONE; /* XXX */
-       GST_BUFFER_TIMESTAMP(buf) = GST_CLOCK_TIME_NONE;
+       return GST_FLOW_ERROR;
+}
 
-       *outbuf = buf;
+static GstFlowReturn gst_isatis_src_create(GstPushSrc *psrc, GstBuffer **outbuf)
+{
+       GstIsatisSrc *src = GST_ISATIS_SRC(psrc);
+       GstFlowReturn ret = GST_FLOW_OK;
+
+       while (g_queue_is_empty(src->buffer_queue))
+       {
+               if (src->eos)
+                       return GST_FLOW_UNEXPECTED;
+
+               GST_DEBUG_OBJECT(src, "buffer queue is empty, trying to get more");
+
+               ret = gst_isatis_src_handle_messages(src);
+               if (ret != GST_FLOW_OK)
+                       return ret;
+       }
+
+       *outbuf = g_queue_pop_tail(src->buffer_queue);
 
        return GST_FLOW_OK;
 }
 
+static void gst_isatis_src_init(GstIsatisSrc *src, GstIsatisSrcClass *klass)
+{
+       GST_DEBUG_OBJECT(src, "initializing");
+
+       src->socket = NULL;
+       src->uri = g_strdup("isatis://0");
+       src->eos = FALSE;
+       src->buffer_queue = g_queue_new();
+}
+
+static void gst_isatis_src_dispose(GObject *obj)
+{
+       GstIsatisSrc *src = GST_ISATIS_SRC(obj);
+
+       GST_DEBUG_OBJECT(src, "disposing");
+
+       g_free(src->uri);
+       src->uri = NULL;
+
+       g_assert(g_queue_is_empty(src->buffer_queue));
+       g_queue_free(src->buffer_queue);
+       src->buffer_queue = NULL;
+
+       if (src->socket)
+               isatis_socket_free(src->socket);
+       src->socket = NULL;
+
+       G_OBJECT_CLASS(parent_class)->dispose(obj);
+}
+
 static void gst_isatis_src_class_init(GstIsatisSrcClass *klass)
 {
        GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS(klass);
        GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS(klass);
+       GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
 
        gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_isatis_src_start);
+       gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_isatis_src_stop);
 
        gstpushsrc_class->create = GST_DEBUG_FUNCPTR(gst_isatis_src_create);
-}
 
-static void gst_isatis_src_init(GstIsatisSrc *src, GstIsatisSrcClass *klass)
-{
-       GST_DEBUG_OBJECT(src, "initializing");
+       gobject_class->dispose = gst_isatis_src_dispose;
 }
 
 static gboolean gst_isatis_plugin_init(GstPlugin *plugin)
index b9f08dc..94c20ae 100644 (file)
@@ -21,6 +21,9 @@
 #ifndef __GST_ISATIS_SRC_H__
 #define __GST_ISATIS_SRC_H__
 
+#include "../common/isatis_protocol.h"
+
+#include <glib.h>
 #include <gst/base/gstpushsrc.h>
 
 G_BEGIN_DECLS
@@ -42,6 +45,12 @@ typedef struct _GstIsatisSrcClass GstIsatisSrcClass;
 struct _GstIsatisSrc
 {
        GstPushSrc element;
+
+       isatis_socket *socket;
+       gchar *uri;
+       gboolean eos;
+
+       GQueue *buffer_queue;
 };
 
 struct _GstIsatisSrcClass
index 872a19e..0d472ae 100644 (file)
@@ -1,7 +1,8 @@
 NAME           := libisatisplugin
 VERSION                := 0.0.1
-SOURCES                := $(wildcard isatis-plugin/*.c)
+SOURCES                := $(wildcard isatis-plugin/*.c) $(wildcard common/*.c)
 CFLAGS         += -pthread -DBINDIR=\"$(BINDIR)\" -DVERSION=\"$(VERSION)\"
+PKGS           := glib-2.0
 
 # XXX Disgusting kludge XXX
 CFLAGS         += -I/usr/include/nspr -I/usr/include/xulrunner
index fae21e3..9a48d2a 100644 (file)
  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
  */
 
+#include "../common/isatis_protocol.h"
+
 #include <npapi.h>
 #include <npupp.h>
 
+#include <glib.h>
+
 #include <string.h>
 #include <unistd.h>
 #include <errno.h>
 #include <poll.h>
 #include <fcntl.h>
 #include <sys/wait.h>
+#include <sys/socket.h>
 
 #ifdef DEBUG
 #include <stdio.h>
 
 struct plugin_private
 {
-       int data_fd;
+       isatis_socket *socket;
        pid_t player_pid;
        int has_stream;
        char *uri;
        unsigned long xid;
+       guint watch;
+       NPStream *stream;
 };
 
 #define GET_PLUGIN_PRIVATE(i, x)           \
@@ -63,19 +70,62 @@ struct plugin_private
 
 static NPNetscapeFuncs host_funcs;
 
+static gboolean NPP_OnDataReady(GIOChannel *channel, GIOCondition cond, NPP instance)
+{
+       struct plugin_private *priv;
+       uint8_t msg;
+       size_t datalen;
+
+       DBG(("NPP_OnDataReady(%p)\n", instance));
+
+       GET_PLUGIN_PRIVATE(instance, priv);
+
+       if (isatis_socket_receive_header(priv->socket, &msg, &datalen) < 0)
+       {
+               DBG(("error reading data from player: %s\n", strerror(errno)));
+               return FALSE;
+       }
+
+       DBG(("  command from client: '%c' with %d bytes of data\n", msg, datalen));
+
+       switch (msg)
+       {
+               case ISATIS_CMD_START:
+                       if (!priv->stream)
+                               CallNPN_GetURLProc(host_funcs.geturl, instance, priv->uri, NULL);
+                       break;
+
+               case ISATIS_CMD_STOP:
+                       if (priv->stream)
+                               CallNPN_DestroyStreamProc(host_funcs.destroystream, instance, priv->stream, NPRES_USER_BREAK);
+
+                       isatis_socket_send(priv->socket, ISATIS_CMD_STOPPED, NULL, 0);
+                       break;
+
+               default:
+                       isatis_socket_receive_discard(priv->socket, datalen);
+
+                       DBG(("unexpected command from client\n"));
+                       break;
+       }
+
+       return TRUE;
+}
+
 static NPError NPP_SpawnPlayer(NPP instance)
 {
        int r;
-       int pipe_fds[2];
+       int socket_fds[2];
+       GIOChannel *channel;
        struct plugin_private *priv;
 
        DBG(("NPP_SpawnPlayer(%p)\n", instance));
 
        GET_PLUGIN_PRIVATE(instance, priv);
 
-       if (pipe(pipe_fds) < 0)
+       if (socketpair(PF_LOCAL, SOCK_STREAM, 0, socket_fds) < 0)
        {
-               DBG(("  pipe() failed: %s\n", strerror(errno)));
+               DBG(("  socketpair() failed: %s\n", strerror(errno)));
                return NPERR_GENERIC_ERROR;
        }
 
@@ -91,15 +141,16 @@ static NPError NPP_SpawnPlayer(NPP instance)
                                char *uri;
 
                                /* Close the input side of the pipe in the child process */
-                               close(pipe_fds[1]);
+                               close(socket_fds[1]);
 
                                snprintf(xid_buf, sizeof(xid_buf), "0x%lx", priv->xid);
 
-                               if (priv->uri)
+                               /* Shortcut file:// URIs for direct file access */
+                               if (!strncmp("file://", priv->uri, 7))
                                        uri = priv->uri;
                                else
                                {
-                                       snprintf(uri_buf, sizeof(uri_buf), "isatis://%d", pipe_fds[0]);
+                                       snprintf(uri_buf, sizeof(uri_buf), "isatis://%d", socket_fds[0]);
 
                                        uri = uri_buf;
                                }
@@ -128,12 +179,17 @@ static NPError NPP_SpawnPlayer(NPP instance)
        DBG(("  child pid is %d\n", r));
 
        /* Close the output side of the pipe in the parent process */
-       close(pipe_fds[0]);
+       close(socket_fds[0]);
 
-       fcntl(pipe_fds[1], F_SETFL, O_NONBLOCK);
+       fcntl(socket_fds[1], F_SETFL, O_NONBLOCK);
 
        priv->player_pid = r;
-       priv->data_fd = pipe_fds[1];
+       priv->socket = isatis_socket_create(socket_fds[1]);
+
+       channel = g_io_channel_unix_new(socket_fds[1]);
+       g_io_channel_set_encoding(channel, NULL, NULL);
+       g_io_channel_set_buffered(channel, FALSE);
+       priv->watch = g_io_add_watch(channel, G_IO_IN | G_IO_HUP | G_IO_ERR, (GIOFunc)NPP_OnDataReady, instance);
 
        return NPERR_NO_ERROR;
 }
@@ -151,11 +207,13 @@ NPError NPP_New(NPMIMEType type, NPP instance, uint16 mode, int16 argc, char *ar
 
        GET_PLUGIN_PRIVATE(instance, priv);
 
-       priv->data_fd = -1;
+       priv->socket = NULL;
        priv->player_pid = 0;
        priv->has_stream = 0;
        priv->uri = NULL;
        priv->xid = 0;
+       priv->watch = 0;
+       priv->stream = NULL;
 
        return NPERR_NO_ERROR;
 }
@@ -173,8 +231,14 @@ NPError NPP_Destroy(NPP instance, NPSavedData **save)
                if (priv->uri)
                        free(priv->uri);
 
-               if (priv->data_fd != -1)
-                       close(priv->data_fd);
+               if (priv->socket)
+               {
+                       isatis_socket_close(priv->socket);
+                       isatis_socket_free(priv->socket);
+               }
+
+               if (priv->watch)
+                       g_source_remove(priv->watch);
 
                if (priv->player_pid)
                {
@@ -231,29 +295,26 @@ NPError NPP_NewStream(NPP instance, NPMIMEType type, NPStream *stream, NPBool se
 
        GET_PLUGIN_PRIVATE(instance, priv);
 
-       if (priv->has_stream)
-       {
-               DBG(("  browser is trying to send extra streams\n"));
-               return NPERR_GENERIC_ERROR;
-       }
-
-       priv->has_stream = 1;
-
        DBG(("  stream->url = %s\n", stream->url));
 
-       /* Shortcut file:// URIs to direct file access */
-       if (!strncmp("file://", stream->url, 7))
+       *stype = NP_NORMAL;
+
+       if (!priv->has_stream)
        {
+               priv->has_stream = 1;
+
+               /* Save stream URI for restarts */
                priv->uri = strdup(stream->url);
 
+               /* Stop the stream, it will be restarted when user presses play */
                CallNPN_DestroyStreamProc(host_funcs.destroystream, instance, stream, NPRES_DONE);
-       }
 
-       *stype = NP_NORMAL;
+               /* If we already have the XID we can launch the player process now */
+               if (priv->xid)
+                       return NPP_SpawnPlayer(instance);
+       }
 
-       /* If we already have the XID we can launch the player process now */
-       if (priv->xid)
-               return NPP_SpawnPlayer(instance);
+       priv->stream = stream;
 
        return NPERR_NO_ERROR;
 }
@@ -266,13 +327,9 @@ NPError NPP_DestroyStream(NPP instance, NPStream *stream, NPReason reason)
 
        GET_PLUGIN_PRIVATE(instance, priv);
 
-       /* Unfortunately we cannot pass the reason why the stream was closed to the player, oh well... */
+       priv->stream = NULL;
 
-       if (priv->data_fd != -1)
-       {
-               close(priv->data_fd);
-               priv->data_fd = -1;
-       }
+       isatis_socket_send(priv->socket, ISATIS_CMD_EOS, NULL, 0);
 
        return NPERR_NO_ERROR;
 }
@@ -282,11 +339,15 @@ int32_t NPP_WriteReady(NPP instance, NPStream *stream)
        struct plugin_private *priv;
        struct pollfd fds;
 
-       /* DBG(("NPP_WriteReady(%p, %p)\n", instance, stream)); */
+#ifdef SPAMMY_DEBUG
+       DBG(("NPP_WriteReady(%p, %p)\n", instance, stream));
+#endif
 
        GET_PLUGIN_PRIVATE(instance, priv);
 
-       fds.fd = priv->data_fd;
+       isatis_socket_push(priv->socket);
+
+       fds.fd = priv->socket->fd;
        fds.events = POLLOUT | POLLERR | POLLHUP;
 
        switch (poll(&fds, 1, 0))
@@ -297,7 +358,7 @@ int32_t NPP_WriteReady(NPP instance, NPStream *stream)
                case 1:
                        /* Note that this function isn't documented as being allowed to return -1 for failures;
                         * thus we return success even if the child has died and fail in NPP_Write() instead */
-                       return 1024;
+                       return 4096;
 
                default:
                        DBG(("poll() failed: %s\n", strerror(errno)));
@@ -310,19 +371,22 @@ int32_t NPP_WriteReady(NPP instance, NPStream *stream)
 int32_t NPP_Write(NPP instance, NPStream *stream, int32_t offset, int32_t len, void *buffer)
 {
        struct plugin_private *priv;
+       int r;
 
-       /* DBG(("NPP_Write(%p, %p, %d, %d, %p)\n", instance, stream, offset, len, buffer)); */
+#ifdef SPAMMY_DEBUG
+       DBG(("NPP_Write(%p, %p, %d, %d, %p)\n", instance, stream, offset, len, buffer));
+#endif
 
        GET_PLUGIN_PRIVATE(instance, priv);
 
-       int r = write(priv->data_fd, buffer, len);
-
-       /* XXX What about EAGAIN? It might happen if PIPE_BUF is ridiculously small for some reason XXX */
-
+       r = isatis_socket_send(priv->socket, ISATIS_CMD_DATA, buffer, len);
        if (r < 0)
-               DBG(("write() failed: %s\n", strerror(errno)));
+       {
+               DBG(("isatis_socket_send() failed: %s\n", strerror(errno)));
+               return -1;
+       }
 
-       return r;
+       return len;
 }
 
 NPError NPP_GetValue(NPP instance, NPPVariable variable, void *value)
@@ -357,6 +421,7 @@ NPError NP_Initialize(NPNetscapeFuncs *host_vtable, NPPluginFuncs *plugin_vtable
 {
        NPError err;
        NPBool hasxembed = FALSE;
+       NPNToolkitType toolkittype;
 
        DBG(("NP_Initialize\n"));
 
@@ -394,6 +459,13 @@ NPError NP_Initialize(NPNetscapeFuncs *host_vtable, NPPluginFuncs *plugin_vtable
                return NPERR_INCOMPATIBLE_VERSION_ERROR;
        }
 
+       err = CallNPN_GetValueProc(host_vtable->getvalue, NULL, NPNVToolkit, &toolkittype);
+       if (err != NPERR_NO_ERROR || toolkittype != NPNVGtk2)
+       {
+               DBG(("Unsupported windowing toolkit\n"));
+               return NPERR_INCOMPATIBLE_VERSION_ERROR;
+       }
+
        return NPERR_NO_ERROR;
 }