Source code for timeside.core.analyzer

# -*- coding: utf-8 -*-
#
# Copyright (c) 2007-2013 Parisson SARL

# This file is part of TimeSide.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Authors:
#   Guillaume Pellerin <yomguy at parisson.com>
#   Paul Brossier <piem@piem.org>
#   Thomas Fillon <thomas  at parisson.com>

from __future__ import division

from .processor import Processor
from .tools import hdf5

import timeside  # import __version__
from timeside.core import implements, abstract
from timeside.core.api import IAnalyzer

import numpy as np
from collections import OrderedDict
import h5py
import simplejson as json

import os

if 'DISPLAY' not in os.environ:
    import matplotlib
    matplotlib.use('Agg')

import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas

numpy_data_types = [
    #'float128',
    'float64',
    'float32',
    #'float16', Not supported by h5py for version < 2.2
    'int64',
    'int16',
    'int32',
    'int8',
    'uint64',
    'uint32',
    'uint16',
    'uint8',
    #'unicode_', Strings should be handled through label_metadata
    #'string_',
    'object_',
    'longlong',
    #'timedelta64',
    #'datetime64',
    #'complex128',
    #'complex64',
]
numpy_data_types = list(map(lambda x: getattr(np, x), numpy_data_types))
# numpy_data_types += [np.ndarray]


class Parameters(dict):

    def as_dict(self):
        return self

    def to_xml(self):
        import xml.etree.ElementTree as ET
        root = ET.Element('Metadata')

        for key in self.keys():
            child = ET.SubElement(root, key)
            child.text = repr(self[key])

        return ET.tostring(root, encoding="utf-8", method="xml")

    def from_xml(self, xml_string):
        import xml.etree.ElementTree as ET
        import ast
        root = ET.fromstring(xml_string)
        for child in root:
            key = child.tag
            if child.text:
                self[key] = ast.literal_eval(child.text)

    def to_hdf5(self, h5group):
        hdf5.dict_to_hdf5(self, h5group)

    def from_hdf5(self, h5group):
        hdf5.dict_from_hdf5(self, h5group)

    def from_dict(self, dict_obj):
        for key, value in dict_obj.items():
            try:
                self[key].from_dict(value)
            except AttributeError:
                self[key] = value


class MetadataObject(Parameters):

    """
    Object that contains a metadata structure
    stucture inspired by [1]
    [1] : http://www.saltycrane.com/blog/2012/08/python-data-object-motivated-desire-mutable-namedtuple-default-values/

    Metadata
    ----------


    Methods
    -------
    as_dict()
        Return a dictionnary representation of the MetadataObject
    """

    # Define default values as an OrderDict
    # in order to keep the order of the keys for display
    _default_value = OrderedDict()

    def __init__(self, **kwargs):
        '''
        Construct an Metadata object
        Abstract Class _default_value must be specified by

        Metadata()

        Parameters
        ----------

        Returns
        -------
        Metadata
        '''
        # Set Default values
        for key, value in self._default_value.items():
            setattr(self, key, value)

        # Set metadata passed in as arguments
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __setattr__(self, name, value):
        if name not in self._default_value.keys():
            raise AttributeError("%s is not a valid attribute in %s" %
                                 (name, self.__class__.__name__))
        try:
            super(MetadataObject, self).__setattr__(name, value)
        except AttributeError:
            raise

    def __delattr__(self, name):
        if name in self._default_value.keys():
            new_default_value = self._default_value.copy()
            del new_default_value[name]
            super(MetadataObject, self).__setattr__('_default_value',
                                                    new_default_value)
            super(MetadataObject, self).__delattr__(name)

    def as_dict(self):
        return dict((att, getattr(self, att))
                    for att in self.keys())

    def keys(self):
        return [attr for attr in self._default_value.keys()]

    def values(self):
        return [self[attr] for attr in self.keys()]

    def items(self):
        return [(attr, self[attr]) for attr in self.keys()]

    def __getitem__(self, key, default=None):
        try:
            return getattr(self, key)
        except AttributeError:
            return default

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __repr__(self):
        return '{}({})'.format(
            self.__class__.__name__,
            ', '.join('{}={}'.format(
                att, repr(getattr(self, att)))
                for att in self.keys()))

    def __str__(self):
        return self.as_dict().__str__()

    def __eq__(self, other):
        return (isinstance(other, self.__class__)
                and all([self[key] == other[key] for key in self.keys()]))

    def __ne__(self, other):
        return not(isinstance(other, self.__class__)
                   or self.as_dict() != other.as_dict())


class IdMetadata(MetadataObject):

    '''
    Metadata object to handle ID related Metadata

        Attributes
        ----------
        id : str
        name : str
        unit : str
        description : str
        date : str
            date and time in ISO  8601 format YYYY-MM-DDTHH:MM:SS
        version : str
        author : str
        proc_uuid : str
    '''

    # Define default values
    _default_value = OrderedDict([('id', None),
                                  ('name', None),
                                  ('unit', None),
                                  ('description', None),
                                  ('date', None),
                                  ('version', None),
                                  ('author', None),
                                  ('proc_uuid', None),
                                  ])

    def __setattr__(self, name, value):
        if value is None:
            value = ''

        super(IdMetadata, self).__setattr__(name, value)


class AudioMetadata(MetadataObject):

    '''
    Metadata object to handle Identification Metadata

        Attributes
        ----------
        uri : str
        start : float
            Start time of the segment in seconds
        duration : float
            Duration of the segment in seconds
        is_segment : boolean
            Is the media a segment of an audio source
        sha1 : str
            Sha1 hexadecimal digest of the audio source
        channels : int
            Number of channels
        channelsManagement : str
            A string that indicates how the channels are manage
            Examples :
                channelsManagement = '(L+R)/2'
                channelsManagement = 'R' keep only right channel
                channelsManagement = 'L' keep only left channel
                channelsManagement = 'stereo' keep both stereo channels
    '''

    # Define default values
    _default_value = OrderedDict([('uri', ''),
                                  ('start', 0),
                                  ('duration', None),
                                  ('is_segment', None),
                                  ('sha1', ''),
                                  ('channels', None),
                                  ('channelsManagement', '')])


class LabelMetadata(MetadataObject):

    '''
    Metadata object to handle Label Metadata

        Attributes
        ----------
        label : dict
            A dictionnary that contains :
                - label id has keys and
                - label names has values

        description : dict
            A dictionnary that contains :
                - label id has keys and
                - label descriptions has values

        label_type : str
            = 'mono' or 'multi'
            'mono' or 'multi' enable to specify the label mode :
            - 'mono'  : mono-label (only 1 label at a time)
            - 'multi' : multi-label (several labels can be observe
                        at the same time)


    '''

    # Define default values
    _default_value = OrderedDict([('label', None),
                                  ('description', None),
                                  ('label_type', 'mono')])

    def to_hdf5(self, h5group):
        """
        Save a dictionnary-like object inside a h5 file group
        """
        # Write attributes
        name = 'label_type'
        if self.__getattribute__(name) is not None:
            h5group.attrs[name] = self.__getattribute__(name)

        for name in ['label', 'description']:
            subgroup = h5group.create_group(name)
            if self.__getattribute__(name):
                hdf5.dict_to_hdf5(self.__getattribute__(name), subgroup)

    def from_hdf5(self, h5group):
        self.label = {}
        self.description = {}
        self['label_type'] = h5group.attrs['label_type']
        for subgroup_name, h5subgroup in h5group.items():
            hdf5.dict_from_hdf5(self[subgroup_name], h5subgroup)


class FrameMetadata(MetadataObject):

    '''
    Metadata object to handle Frame related Metadata

        Attributes
        ----------
        samplerate : int (or float?)
        blocksize : int
        stepsize : int
    '''
    # TODO : check is samplerate can support float

    # Define default values
    _default_value = OrderedDict([('samplerate', None),
                                  ('blocksize', None),
                                  ('stepsize', None)])


class DataObject(MetadataObject):
    '''
    Metadata object to handle data related Metadata

        Attributes
        ----------
        value : numpy array
        label : numpy array of int
        time : numpy array of float
        duration : numpy array of float

    '''

    # Define default values
    _default_value = OrderedDict([('value', None),
                                  ('label', None),
                                  ('time', None),
                                  ('duration', None)])

    def __setattr__(self, name, value):
        if value is None:
            value = []

        # Set Data with the proper type
        if name == 'value':
            value = np.asarray(value)
            if value.dtype.type not in numpy_data_types:
                raise TypeError(
                    'Result Data can not accept type %s for %s' %
                    (value.dtype.type, name))
            if value.shape == ():
                value.resize((1,))

        elif name == 'label':
            try:
                value = np.asarray(value, dtype='int')
            except ValueError:
                raise TypeError(
                    'Result Data can not accept type %s for %s' %
                    (value.dtype.type, name))

        elif name in ['time', 'duration', 'y_value']:
            try:
                value = np.asfarray(value)
            except ValueError:
                raise TypeError(
                    'Result Data can not accept type %s for %s' %
                    (value.dtype.type, name))
        elif name == 'dataType':
            return

        super(DataObject, self).__setattr__(name, value)

    def __eq__(self, other):
        # TODO fix this
        try:
            return (isinstance(other, self.__class__) and
                    all([np.array_equal(self[key], other[key])
                         for key in self.keys()]))
        except AttributeError:
            return (isinstance(other, self.__class__) and
                    all([bool(np.logical_and.reduce((self[key] == other[key]).ravel()))
                         for key in self.keys()]))

    def __ne__(self, other):
        return not(isinstance(other, self.__class__) or
                   any([np.array_equal(self[key], other[key])
                        for key in self.keys()]))

    def as_dict(self):
        as_dict = super(DataObject, self).as_dict()

        for key in ['frame_metadata', 'label_metadata']:
            #  TODO : check if its needed now
            if key in as_dict and isinstance(as_dict[key], MetadataObject):
                as_dict[key] = as_dict[key].as_dict()
        return as_dict

    def to_xml(self):
        import xml.etree.ElementTree as ET
        root = ET.Element('Metadata')

        for key in self.keys():
            child = ET.SubElement(root, key)
            value = getattr(self, key)
            if hasattr(value, 'to_xml'):
                child = value.to_xml()
            elif value.any():
                child.text = repr(value.tolist())
                child.set('dtype', value.dtype.__str__())

        return ET.tostring(root, encoding="utf-8", method="xml")

    def from_xml(self, xml_string):
        import xml.etree.ElementTree as ET
        import ast
        root = ET.fromstring(xml_string)
        for child in root:
            key = child.tag
            if child.text:
                self[key] = np.asarray(ast.literal_eval(child.text),
                                       dtype=child.get('dtype'))

    def to_hdf5(self, h5group):
        # Write Datasets
        for key in self.keys():
            if self.__getattribute__(key) is None:
                continue
            if hasattr(self.__getattribute__(key), 'to_hdf5'):
                subgroup = h5group.create_group(key)
                self.__getattribute__(key).to_hdf5(subgroup)
            elif self.__getattribute__(key).dtype == 'object':
                # Handle numpy type = object as vlen string
                h5group.create_dataset(key,
                                       data=self.__getattribute__(
                                           key).tolist().__repr__(),
                                       dtype=h5py.special_dtype(vlen=str))
            else:
                if np.prod(self.__getattribute__(key).shape):
                    maxshape = None
                else:
                    maxshape = (None,)
                h5group.create_dataset(
                    key, data=self.__getattribute__(key), maxshape=maxshape)

    def from_hdf5(self, h5group):
        for key, dataset in h5group.items():
            if isinstance(dataset, h5py.Group):
                self[key].from_hdf5(dataset)
                continue
            # Load value from the hdf5 dataset and store in data
            # FIXME : the following conditional statement is to prevent
            # reading an empty dataset.
            # see : https://github.com/h5py/h5py/issues/281
            # It should be fixed by the next h5py version
            if dataset.shape != (0,):
                if h5py.check_dtype(vlen=dataset.dtype):
                    # to deal with VLEN data used for list of
                    # list
                    self.__setattr__(key, eval(dataset[...].tolist()))
                else:
                    self.__setattr__(key, dataset[...])
            else:
                self.__setattr__(key, [])


def data_objet_class(data_mode='value', time_mode='framewise'):
    """
    Factory function for Analyzer result
    """
    classes_table = {('value', 'global'): GlobalValueObject,
                     ('value', 'event'): EventValueObject,
                     ('value', 'segment'): SegmentValueObject,
                     ('value', 'framewise'): FrameValueObject,
                     ('label', 'global'): GlobalLabelObject,
                     ('label', 'event'): EventLabelObject,
                     ('label', 'segment'): SegmentLabelObject,
                     ('label', 'framewise'): FrameLabelObject}

    try:
        return classes_table[(data_mode, time_mode)]
    except KeyError as e:
        raise ValueError('Wrong arguments')


[docs]class AnalyzerResult(MetadataObject): """ Object that contains the metadata and parameters of an analyzer process Parameters ---------- data_mode : str data_mode describes the type of data : - 'value' for values - 'label' for label data see LabelMetadata time_mode : str time_mode describes the correspondance between data values and time - 'framewise' - 'global' - 'segment' - 'event' Returns ------- A new MetadataObject with the following attributes : - data_object : :class:`DataObject` - id_metadata : :class:`IdMetadata` - audio_metadata : :class:`AudioMetadata` - frame_metadata : :class:`FrameMetadata` - label_metadata : :class:`LabelMetadata` - parameters : :class:`Parameters` Object """ # Define default values _default_value = OrderedDict([('id_metadata', None), ('data_object', None), ('audio_metadata', None), ('parameters', None) ]) def __init__(self, data_mode='value', time_mode='framewise'): super(AnalyzerResult, self).__init__() self._data_mode = data_mode self._time_mode = time_mode self.id_metadata = IdMetadata() self.audio_metadata = AudioMetadata() self.parameters = Parameters() self.data_object = data_objet_class(data_mode, time_mode)() # self.label_metadata = LabelMetadata() def __setattr__(self, name, value): if name in ['_data_mode', '_time_mode']: super(MetadataObject, self).__setattr__(name, value) return elif name in self.keys(): if isinstance(value, dict) and value: for (sub_name, sub_value) in value.items(): self[name][sub_name] = sub_value return super(AnalyzerResult, self).__setattr__(name, value) def __len__(self): if self.data_mode == 'value': return len(self.data_object.value) else: return len(self.data_object.label)
[docs] def as_dict(self): return dict([(key, self[key].as_dict()) for key in self.keys()] + # if hasattr(self[key], 'as_dict')] + [('data_mode', self.data_mode), ('time_mode', self.time_mode)])
# TODO : check if it can be simplified now
[docs] def to_xml(self): import xml.etree.ElementTree as ET root = ET.Element('result', metadata = {'name': self.id_metadata.name, 'id': self.id_metadata.id}) for name in ['data_mode', 'time_mode']: child = ET.SubElement(root, name) child.text = str(self.__getattribute__(name)) child.tag = name root.append(child) for key in self.keys(): child = ET.fromstring(self[key].to_xml()) child.tag = key root.append(child) return ET.tostring(root, encoding="utf-8", method="xml")
[docs] @staticmethod def from_xml(xml_string): import xml.etree.ElementTree as ET root = ET.fromstring(xml_string) data_mode_child = root.find('data_mode') time_mode_child = root.find('time_mode') result = AnalyzerResult(data_mode=data_mode_child.text, time_mode=time_mode_child.text) for child in root: key = child.tag if key not in ['data_mode', 'time_mode']: child_string = ET.tostring(child) result[key].from_xml(child_string) return result
[docs] def to_hdf5(self, h5_file): # Save results in HDF5 Dataset group = h5_file.create_group(self.id) group.attrs['data_mode'] = self.__getattribute__('data_mode') group.attrs['time_mode'] = self.__getattribute__('time_mode') for key in self.keys(): if key in ['data_mode', 'time_mode']: continue subgroup = group.create_group(key) self.__getattribute__(key).to_hdf5(subgroup)
[docs] @staticmethod def from_hdf5(h5group): # Read Sub-Group result = AnalyzerResult(data_mode=h5group.attrs['data_mode'], time_mode=h5group.attrs['time_mode']) for subgroup_name, h5subgroup in h5group.items(): result[subgroup_name].from_hdf5(h5subgroup) return result
[docs] def to_json(self, output_file=None): json_str = json.dumps(self.as_dict(), default=JSON_NumpyArrayEncoder) if output_file: open(output_file, 'w').write(json_str) else: return json_str
def _render_plot(self, ax, size=(1024, 256)): return NotImplemented
[docs] def render(self): '''Render a matplotlib figure from the analyzer result Return the figure, use fig.show() to display if neeeded ''' fig, ax = plt.subplots() self.data_object._render_plot(ax) return fig
def _render_PIL(self, size=(1024, 256), dpi=80, xlim=None): from .grapher import Image image_width, image_height = size xSize = image_width / dpi ySize = image_height / dpi fig = Figure(figsize=(xSize, ySize), dpi=dpi) ax = fig.add_axes([0, 0, 1, 1], frame_on=False) self.data_object._render_plot(ax, size) if xlim is not None: ax.set_xlim(xlim[0], xlim[1]) else: ax.autoscale(axis='x', tight=True) # Export to PIL image from io import BytesIO imgdata = BytesIO() canvas = FigureCanvas(fig) canvas.print_png(imgdata, dpi=dpi) imgdata.seek(0) # rewind the data return Image.open(imgdata) @property def data_mode(self): return self._data_mode @property def time_mode(self): return self._time_mode @property def data(self): return self.data_object.data @property def time(self): if self._time_mode == 'global': return self.audio_metadata.start else: return self.audio_metadata.start + self.data_object.time @property def duration(self): if self._time_mode == 'global': return self.audio_metadata.duration else: return self.data_object.duration @property def id(self): return self.id_metadata.id @property def name(self): return self.id_metadata.name @property def unit(self): return self.id_metadata.unit
class ValueObject(DataObject): @property def data(self): return self.value @property def properties(self): return dict(mean=np.mean(self.data, axis=0), std=np.std(self.data, axis=0, ddof=1), median=np.median(self.data, axis=0), max=np.max(self.data, axis=0), min=np.min(self.data, axis=0), shape=self.data.shape, ) class LabelObject(DataObject): def __init__(self): super(LabelObject, self).__init__() self.label_metadata = LabelMetadata() @property def data(self): return self.label class GlobalObject(DataObject): @property def time(self): return 0 # self.audio_metadata.start @property def duration(self): return None # self.audio_metadata.duration class FramewiseObject(DataObject): def __init__(self): super(FramewiseObject, self).__init__() self.frame_metadata = FrameMetadata() @property def time(self): return (np.arange(0, len(self.data) * self.frame_metadata.stepsize, self.frame_metadata.stepsize) / self.frame_metadata.samplerate) @property def duration(self): return (self.frame_metadata.blocksize / self.frame_metadata.samplerate * np.ones(len(self.data))) class EventObject(DataObject): @property def duration(self): return np.zeros(len(self.data)) def _render_plot(self, ax, size=(1024, 256)): ax.stem(self.time, self.data) class SegmentObject(DataObject): pass class GlobalValueObject(ValueObject, GlobalObject): # Define default values _default_value = OrderedDict([('value', None), ('y_value', None)]) class GlobalLabelObject(LabelObject, GlobalObject): # Define default values _default_value = OrderedDict([('label', None), ('label_metadata', None)]) def _render_plot(self, ax, size=(1024, 256)): # import itertools # colors = itertools.cycle(['b', 'g', 'r', 'c', 'm', 'y', 'k']) # ax_color = {} # artist = {} # for key, label in self.label_metadata.label.items(): # ax_color[key] = colors.next() # artist[key] = plt.axvspan(0, 0, color='b', alpha=0.3) # for time, duration, label in zip(self.time, self.duration, self.data): # ax.axvspan(time, time + duration, color='b', alpha=0.3) # Create legend ax.legend(self.label_metadata.label[int(self.label)]) class FrameValueObject(ValueObject, FramewiseObject): # Define default values _default_value = OrderedDict([('value', None), ('y_value', None), ('frame_metadata', None)]) def _render_plot(self, ax, size=(1024, 256)): if not self.y_value.size: # This was crashing if the data array is too large # workaround consists in downsampling the data # and plot center, min, max values # see http://stackoverflow.com/a/8881973 # TODO: mean may not be appropriate for waveform ... (mean~=0) nb_frames = self.data.shape[0] width = size[0] if nb_frames < 10 * width: ax.plot(self.time, self.data) return else: chunksize = nb_frames // width numchunks = nb_frames // chunksize if self.data.ndim <= 1: ychunks = self.data[:chunksize * numchunks].reshape((-1, chunksize)) else: # Take only first channel ychunks = self.data[:chunksize * numchunks, 0].reshape((-1, chunksize)) xchunks = self.time[:chunksize * numchunks].reshape((-1, chunksize)) # Calculate the max, min, and means of chunksize-element chunks... max_env = ychunks.max(axis=1) min_env = ychunks.min(axis=1) ycenters = ychunks.mean(axis=1) xcenters = xchunks.mean(axis=1) # Now plot the bounds and the mean... ax.fill_between(xcenters, min_env, max_env, color='blue', edgecolor='black', alpha=1) # ax.plot(xcenters, ycenters, color='gray', alpha=0.5) # ax.plot(self.time, self.data) else: ax.imshow(20 * np.log10(self.data.T), origin='lower', extent=[self.time[0], self.time[-1], self.y_value[0], self.y_value[-1]], aspect='auto') def to_sonic_visualiser(self, svenv_file, audio_file): # audio_file = os.path.basename(audio_file) # init a sonic visualiser environment file corresponding # to the analysis of media wavfname from py_sonicvisualiser import SVEnv sve = SVEnv.init_from_wave_file(audio_file) # append a spectrogram view specview = sve.add_spectrogram() sve.add_continuous_annotations(self.time, self.data, view=specview) # save the environment to a sonic visualiser environment file sve.save(svenv_file) class FrameLabelObject(LabelObject, FramewiseObject): # Define default values _default_value = OrderedDict([('label', None), ('label_metadata', None), ('frame_metadata', None)]) def _render_plot(self, ax, size=(1024, 256)): pass class EventValueObject(ValueObject, EventObject): # Define default values _default_value = OrderedDict([('value', None), ('y_value', None), ('time', None)]) class EventLabelObject(LabelObject, EventObject, DataObject): # Define default values _default_value = OrderedDict([('label', None), ('label_metadata', None), ('time', None)]) class SegmentValueObject(ValueObject, SegmentObject): # Define default values _default_value = OrderedDict([('value', None), ('y_value', None), ('time', None), ('duration', None)]) def _render_plot(self, ax, size=(1024, 256)): for time, value in (self.time, self.data): ax.axvline(time, ymin=0, ymax=value, color='r') # TODO : check value shape !!! class SegmentLabelObject(LabelObject, SegmentObject): # Define default values _default_value = OrderedDict([('label', None), ('label_metadata', None), ('time', None), ('duration', None)]) def _render_plot(self, ax, size=(1024, 256)): import matplotlib.patches as mpatches import itertools colors = itertools.cycle(['b', 'g', 'r', 'c', 'm', 'y', 'k']) ax_color = {} legend_patches = [] for key, label in self.label_metadata.label.items(): ax_color[int(key)] = next(colors) # Creating artists specifically for adding to the legend (aka. Proxy artists) legend_patches.append(mpatches.Patch(color=ax_color[int(key)], label=label)) for time, duration, key in zip(self.time, self.duration, self.data): ax.axvspan(time, time + duration, color=ax_color[int(key)], alpha=0.3) # Create legend from custom artist/label lists ax.legend(handles=legend_patches) # , self.label_metadata.label.values()) def merge_segment(self): # Merge adjacent segments if they share the same label if all(np.diff(self.label)): # Nothing to merge return # Merge adjacent segments label = self.label.tolist() time = self.time.tolist() duration = self.duration.tolist() start = 0 while True: try: if label[start] == label[start + 1]: del label[start + 1] del time[start + 1] duration[start] += duration[start + 1] del duration[start + 1] else: start = start + 1 except IndexError: break # Copy back data to data_object self.label = label self.time = time self.duration = duration def to_elan(self, elan_file=None, media_file=None, label_per_tier='ALL'): import pympi elan = pympi.Elan.Eaf(author='TimeSide') if media_file is not None: elan.add_linked_file(media_file) if label_per_tier == 'ONE': for label in self.label_metadata.label.values(): tier_id = unicode(label) elan.add_tier(tier_id) elif label_per_tier == 'ALL': tier_id = 'Analysis' elan.add_tier(tier_id) for n in range(len(self.label)): label_id = self.label_metadata.label[unicode(self.label[n])] if label_per_tier == 'ONE': tier_id = label_id # tier_id = self.label_metadata.label[unicode(label_id)] start = self.time[n] if start < 0: # TODO: check why start could be negative start = 0 end = start + self.duration[n] # Time has to be converted in millisecond integer values elan.add_annotation(id_tier=tier_id, start=int(start * 1000), end=int(end * 1000), value=label_id) elan.to_file(file_path=elan_file) def to_sonic_visualiser(self, svenv_file, audio_file): # audio_file = os.path.basename(audio_file) # init a sonic visualiser environment file corresponding # to the analysis of media wavfname from py_sonicvisualiser import SVEnv sve = SVEnv.init_from_wave_file(audio_file) # append a spectrogram view specview = sve.add_spectrogram() # append a labelled interval annotation layer on a new view labels = [self.label_metadata.label[label_id] for label_id in self.label] sve.add_interval_annotations(self.time, self.duration, labels, self.label) # save the environment to a sonic visualiser environment file sve.save(svenv_file) def JSON_NumpyArrayEncoder(obj): '''Define Specialize JSON encoder for numpy array''' if isinstance(obj, np.ndarray): return {'numpyArray': obj.tolist(), 'dtype': obj.dtype.__str__()} elif isinstance(obj, np.generic): return np.asscalar(obj) else: raise TypeError(repr(obj) + " is not JSON serializable")
[docs]class AnalyzerResultContainer(dict): ''' >>> import timeside >>> from timeside.core.analyzer import Analyzer >>> from timeside.core.tools.test_samples import samples >>> wav_file = samples['sweep.mp3'] >>> d = timeside.core.get_processor('file_decoder')(wav_file) >>> a = Analyzer() >>> (d|a).run() >>> a.new_result() #doctest: +ELLIPSIS AnalyzerResult(id_metadata=IdMetadata(id='analyzer', name='Generic analyzer', unit='', description='...', date='...', version='...', author='TimeSide', proc_uuid='...'), data_object=FrameValueObject(value=array([], dtype=float64), y_value=array([], dtype=float64), frame_metadata=FrameMetadata(samplerate=44100, blocksize=8192, stepsize=8192)), audio_metadata=AudioMetadata(uri='.../sweep.mp3', start=0.0, duration=8.0..., is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={}) >>> resContainer = timeside.core.analyzer.AnalyzerResultContainer() ''' def __init__(self, analyzer_results=None): super(AnalyzerResultContainer, self).__init__() if analyzer_results is not None: self.add(analyzer_results)
[docs] def add(self, analyzer_result, overwrite=False): if isinstance(analyzer_result, list): for res in analyzer_result: self.add(res) return # Check result if not isinstance(analyzer_result, AnalyzerResult): raise TypeError('Only AnalyzerResult can be added') if (not analyzer_result.id in self) or overwrite: self[analyzer_result.id] = analyzer_result else: raise ValueError(('Duplicated id in AnalyzerResultContainer: %s ' 'Please supply a unique id') % analyzer_result.id)
[docs] def get_result_by_id(self, result_id): if self.list_id().count(result_id) > 1: raise ValueError('Result id shared by several procesors in the pipe. Get result from the processor instead') for res in self.values(): if res.id_metadata.id == result_id: return res raise KeyError('No such result id: %s' % result_id)
[docs] def list_id(self): return [res.id for res in self.values()]
[docs] def to_xml(self, output_file=None): import xml.etree.ElementTree as ET # TODO : cf. telemeta util root = ET.Element('timeside') for result in self.values(): if result is not None: root.append(ET.fromstring(result.to_xml())) xml_str = ET.tostring(root, encoding="utf-8", method="xml") if output_file: open(output_file, 'w').write(xml_str) else: return xml_str
[docs] def from_xml(self, xml_string): import xml.etree.ElementTree as ET # TODO : from file # tree = ET.parse(xml_file) # root = tree.getroot() root = ET.fromstring(xml_string) for child in root.iter('result'): self.add(AnalyzerResult.from_xml(ET.tostring(child)), overwrite=True)
[docs] def to_json(self, output_file=None): json_str = json.dumps([res.as_dict() for res in self.values()], default=JSON_NumpyArrayEncoder) if output_file: open(output_file, 'w').write(json_str) else: return json_str
[docs] def from_json(self, json_str): # Define Specialize JSON decoder for numpy array def NumpyArrayDecoder(obj): if isinstance(obj, dict) and 'numpyArray' in obj: numpy_obj = np.asarray(obj['numpyArray'], dtype=obj['dtype']) return numpy_obj else: return obj results_json = json.loads(json_str, object_hook=NumpyArrayDecoder) for res_json in results_json: res = AnalyzerResult(data_mode=res_json['data_mode'], time_mode=res_json['time_mode']) for key in res_json.keys(): if key not in ['data_mode', 'time_mode']: res[key].from_dict(res_json[key]) self.add(res, overwrite=True)
[docs] def to_yaml(self, output_file=None): # if data_list == None: data_list = self.results import yaml # Define Specialize Yaml encoder for numpy array def numpyArray_representer(dumper, obj): return dumper.represent_mapping(u'!numpyArray', {'dtype': obj.dtype.__str__(), 'array': obj.tolist()}) yaml.add_representer(np.ndarray, numpyArray_representer) yaml_str = yaml.dump([res.as_dict() for res in self.values()]) if output_file: open(output_file, 'w').write(yaml_str) else: return yaml_str
[docs] def from_yaml(self, yaml_str): import yaml # Define Specialize Yaml encoder for numpy array def numpyArray_constructor(loader, node): mapping = loader.construct_mapping(node, deep=True) return np.asarray(mapping['array'], dtype=mapping['dtype']) yaml.add_constructor(u'!numpyArray', numpyArray_constructor) results_yaml = yaml.load(yaml_str, Loader=yaml.Loader) for res_yaml in results_yaml: res = AnalyzerResult(data_mode=res_yaml['data_mode'], time_mode=res_yaml['time_mode']) for key in res_yaml.keys(): if key not in ['data_mode', 'time_mode']: res[key].from_dict(res_yaml[key]) self.add(res, overwrite=True)
[docs] def to_numpy(self, output_file=None): if output_file: np.save(output_file, self) else: return self
[docs] def from_numpy(self, input_file): return np.load(input_file,allow_pickle=True)
[docs] def to_hdf5(self, output_file): # Open HDF5 file and save dataset (overwrite any existing file) with h5py.File(output_file, 'w') as h5_file: for res in self.values(): res.to_hdf5(h5_file)
[docs] def from_hdf5(self, input_file): import h5py # TODO : enable import for yaafe hdf5 format # Open HDF5 file for reading and get results h5_file = h5py.File(input_file, 'r') try: for group in h5_file.values(): result = AnalyzerResult.from_hdf5(group) self.add(result, overwrite=True) except TypeError: raise #('TypeError for HDF5 serialization') finally: h5_file.close() # Close the HDF5 file
class Analyzer(Processor): ''' Generic class for the analyzers ''' type = 'analyzer' implements(IAnalyzer) abstract() def __init__(self): super(Analyzer, self).__init__() def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): super(Analyzer, self).setup(channels, samplerate, blocksize, totalframes) # Set default values for result_* attributes # may be overwritten by the analyzer self.result_channels = self.input_channels self.result_samplerate = self.input_samplerate self.result_blocksize = self.input_blocksize self.result_stepsize = self.input_stepsize def add_result(self, result): if not self.uuid() in self.process_pipe.results: self.process_pipe.results[self.uuid()] = AnalyzerResultContainer() self.process_pipe.results[self.uuid()].add(result) @property def results(self): return self.process_pipe.results[self.uuid()] @staticmethod def id(): return "analyzer" @staticmethod def name(): return "Generic analyzer" @staticmethod def unit(): return "" def new_result(self, data_mode='value', time_mode='framewise'): ''' Create a new result Attributes ---------- data_object : MetadataObject id_metadata : MetadataObject audio_metadata : MetadataObject frame_metadata : MetadataObject label_metadata : MetadataObject parameters : dict ''' from datetime import datetime result = AnalyzerResult(data_mode=data_mode, time_mode=time_mode) # Automatically write known metadata result.id_metadata.date = datetime.now().replace( microsecond=0).isoformat(' ') result.id_metadata.version = timeside.core.__version__ result.id_metadata.author = 'TimeSide' result.id_metadata.id = self.id() result.id_metadata.name = self.name() result.id_metadata.description = self.description() result.id_metadata.unit = self.unit() result.id_metadata.proc_uuid = self.uuid() result.audio_metadata.uri = self.mediainfo()['uri'] result.audio_metadata.sha1 = self.mediainfo()['sha1'] result.audio_metadata.start = self.mediainfo()['start'] result.audio_metadata.duration = self.mediainfo()['duration'] result.audio_metadata.is_segment = self.mediainfo()['is_segment'] result.audio_metadata.channels = self.channels() result.parameters = Parameters(self.get_parameters()) if time_mode == 'framewise': result.data_object.frame_metadata.samplerate = self.result_samplerate result.data_object.frame_metadata.blocksize = self.result_blocksize result.data_object.frame_metadata.stepsize = self.result_stepsize return result if __name__ == "__main__": import doctest doctest.testmod(extraglobs=DOCTEST_ALIAS)