#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2007-2014 Parisson SARL
# Copyright (c) 2006-2014 Guillaume Pellerin <pellerin@parisson.com>
# Copyright (c) 2010-2014 Paul Brossier <piem@piem.org>
# Copyright (c) 2013-2014 Thomas Fillon <thomas@parisson.com>
# This file is part of TimeSide.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from timeside.core import Processor, implements, interfacedoc, abstract
from timeside.core.api import IEncoder
from .tools.gstutils import numpy_array_to_gst_buffer, MainloopThread
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import GLib, Gst
Gst.init(None)
import threading
# Streaming queue configuration
QUEUE_SIZE = 10
GST_APPSINK_MAX_BUFFERS = 10
[docs]class GstEncoder(Processor):
    implements(IEncoder)
    abstract()
    type = 'encoder'
    def __init__(self, output, streaming=False, overwrite=False):
        super(GstEncoder, self).__init__()
        if isinstance(output, str):
            import os.path
            if os.path.isdir(output):
                raise IOError("Encoder output must be a file, not a directory")
            elif os.path.isfile(output) and not overwrite:
                raise IOError(
                    "Encoder output %s exists, but overwrite set to False")
            self.filename = output
        else:
            self.filename = None
        self.streaming = streaming
        if not self.filename and not self.streaming:
            raise Exception('Must give an output')
        self.end_cond = threading.Condition(threading.Lock())
        self.eod = False
        self.metadata = None
        self.num_samples = 0
        self._chunk_len = 0
[docs]    @interfacedoc
    def release(self):
        if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'):
            self.end_cond.acquire()
            while not hasattr(self, 'end_reached'):
                self.end_cond.wait()
            self.end_cond.release()
        if hasattr(self, 'error_msg'):
            raise IOError(self.error_msg) 
    def __del__(self):
        self.release()
[docs]    def start_pipeline(self, channels, samplerate):
        self.pipeline = Gst.parse_launch(self.pipe)
        # store a pointer to appsrc in our encoder object
        self.src = self.pipeline.get_by_name('src')
        if self.streaming:
            try: # py3
                import queue
            except: # py2
                import Queue as queue
            self._streaming_queue = queue.Queue(QUEUE_SIZE)
            # store a pointer to appsink in our encoder object
            self.app = self.pipeline.get_by_name('app')
            self.app.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
            self.app.set_property("drop", False)
            self.app.set_property('emit-signals', True)
            self.app.connect("new-sample", self._on_new_sample_streaming)
            self.app.connect('new-preroll', self._on_new_preroll_streaming)
        srccaps = Gst.Caps("""audio/x-raw,
            format=F32LE,
            layout=interleaved,
            channels=(int)%s,
            rate=(int)%d""" % (int(channels), int(samplerate)))
        self.src.set_property("caps", srccaps)
        self.src.set_property('emit-signals', True)
        self.src.set_property('num-buffers', -1)
        self.src.set_property('block', False)
        #self.src.set_property('do-timestamp', True)
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self._on_message_cb)
        self.mainloop = GLib.MainLoop()
        self.mainloopthread = MainloopThread(self.mainloop)
        self.mainloopthread.start()
        # start pipeline
        self.pipeline.set_state(Gst.State.PLAYING) 
    def _on_message_cb(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            self.end_cond.acquire()
            if self.streaming:
                self._streaming_queue.put(Gst.MessageType.EOS)
            self.pipeline.set_state(Gst.State.NULL)
            self.mainloop.quit()
            self.end_reached = True
            self.end_cond.notify()
            self.end_cond.release()
        elif t == Gst.MessageType.ERROR:
            self.end_cond.acquire()
            self.pipeline.set_state(Gst.State.NULL)
            self.mainloop.quit()
            self.end_reached = True
            err, debug = message.parse_error()
            self.error_msg = "Error: %s" % err, debug
            self.end_cond.notify()
            self.end_cond.release()
    def _on_new_sample_streaming(self, appsink):
        # print('pull-sample')
        chunk = appsink.emit('pull-sample')
        self._streaming_queue.put(chunk)
    def _on_new_preroll_streaming(self, appsink):
        # print('preroll')
        chunk = appsink.emit('pull-preroll')
        self._streaming_queue.put(chunk)
[docs]    @interfacedoc
    def process(self, frames, eod=False):
        self.eod = eod
        if eod:
            self.num_samples += frames.shape[0]
        else:
            self.num_samples += self.blocksize()
        buf = numpy_array_to_gst_buffer(frames, frames.shape[0],
                                        self.num_samples, self.samplerate())
        self.src.emit('push-buffer', buf)
        if self.eod:
            self.src.emit('end-of-stream')
        return frames, eod 
[docs]    def get_stream_chunk(self):
        if self.streaming:
            chunk = self._streaming_queue.get(block=True)
            if chunk == Gst.MessageType.EOS:
                return None
            else:
                self._streaming_queue.task_done()
                return chunk
        else:
            raise TypeError('function only available in streaming mode')  
if __name__ == "__main__":
    import doctest
    doctest.testmod()