Source code for timeside.core.preprocessors

# -*- coding: utf-8 -*-
#
# Copyright (c) 2009-2013 Parisson SARL
#
# This file is part of TimeSide.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Thomas Fillon <thomas@parisson.com>
'''
    Collection of preprocessors to use as decorators for the analyzers' process
    methods

    Preprocessors process the `(frames, eod)` arguments in order to handle various
    preprocessing steps such as:

    - Downmixing to mono
    - Adapting the frames to match the input_blocksize and input_stepsize of the analyzer
'''
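
The two decorators below are meant to wrap an analyzer's `process(frames, eod)`
method so that channel downmixing and block re-framing happen before the
analyzer's own code runs. The snippet below is only a rough, self-contained
sketch of that usage: the `MyAnalyzer` class, its block sizes and the chosen
decorator order are illustrative assumptions, not part of this module (real
TimeSide analyzers derive from the core analyzer base class).

    import numpy as np

    from timeside.core.preprocessors import downmix_to_mono, frames_adapter


    class MyAnalyzer(object):
        # Hypothetical analyzer-like class, for illustration only
        def __init__(self):
            self.input_blocksize = 1024
            self.input_stepsize = 512
            self.blocks = []

        @staticmethod
        def id():
            return 'my_analyzer'

        @downmix_to_mono   # outermost: average the channels first
        @frames_adapter    # then re-cut the mono stream into analysis blocks
        def process(self, frames, eod=False):
            self.blocks.append(frames)
            return frames, eod


    analyzer = MyAnalyzer()
    stereo_frames = np.random.randn(4096, 2)  # fake stereo buffer
    analyzer.process(stereo_frames, eod=True)
    print(len(analyzer.blocks), analyzer.blocks[0].shape)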


def downmix_to_mono(process_func):
    '''
    Pre-processing decorator that downmixes frames from multi-channel to mono

    Downmix is achieved by averaging all channels

    >>> from timeside.core.preprocessors import downmix_to_mono
    >>> @downmix_to_mono
    ... def process(analyzer, frames, eod):
    ...     print('Frames, eod inside process :')
    ...     print(frames, eod)
    ...     return frames, eod
    ...
    >>> import numpy as np
    >>> frames = np.asarray([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])
    >>> eod = False
    >>> frames_, eod_ = process(object(), frames, eod)
    Frames, eod inside process :
    [1.5 3.5 5.5 7.5 9.5] False

    Outside the process, frames and eod are preserved:

    >>> frames_
    array([[ 1,  2],
           [ 3,  4],
           [ 5,  6],
           [ 7,  8],
           [ 9, 10]])
    >>> eod_
    False
    '''
    import functools

    @functools.wraps(process_func)
    def wrapper(analyzer, frames, eod):
        # Pre-processing: average the channels if the input is multi-channel
        if frames.ndim > 1:
            downmix_frames = frames.mean(axis=-1)
        else:
            downmix_frames = frames
        # Processing
        process_func(analyzer, downmix_frames, eod)
        return frames, eod

    return wrapper

def frames_adapter(process_func):
    '''
    Pre-processing decorator that adapts frames to match the input_blocksize
    and input_stepsize of the decorated analyzer

    >>> from timeside.core.preprocessors import frames_adapter
    >>> @frames_adapter
    ... def process(analyzer, frames, eod):
    ...     analyzer.frames.append(frames)
    ...     return frames, eod
    >>> class Fake_Analyzer(object):
    ...     def __init__(self):
    ...         self.input_blocksize = 4
    ...         self.input_stepsize = 3
    ...         self.frames = []  # Container for the frames as viewed by process
    ...     @staticmethod
    ...     def id():
    ...         return 'fake_analyzer'
    >>> import numpy as np
    >>> analyzer = Fake_Analyzer()
    >>> frames = np.asarray(range(0, 12))
    >>> eod = False
    >>> frames_, eod_ = process(analyzer, frames, eod)

    Inside the process, the frames have been adapted to match input_blocksize
    and input_stepsize:

    >>> analyzer.frames
    [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9])]

    Outside the process, the original frames and eod are preserved:

    >>> frames_
    array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11])
    >>> eod_
    False

    Releasing the process with eod=True will zero-pad the last frame if necessary:

    >>> frames = np.asarray(range(12, 14))
    >>> eod = True
    >>> frames_, eod_ = process(analyzer, frames, eod)
    >>> analyzer.frames
    [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9]), array([ 9, 10, 11, 12]), array([12, 13,  0,  0])]
    '''
    import functools
    import numpy as np

    class framesBuffer(object):

        def __init__(self, blocksize, stepsize):
            self.blocksize = blocksize
            self.stepsize = stepsize
            self.buffer = None

        def frames(self, frames, eod):
            # Prepend whatever was left over from the previous call
            if self.buffer is not None:
                stack = np.concatenate([self.buffer, frames])
            else:
                stack = frames.copy()

            stack_length = len(stack)

            # Number of complete blocks available in the stacked samples
            nb_frames = (
                stack_length - self.blocksize + self.stepsize) // self.stepsize
            nb_frames = max(nb_frames, 0)
            frames_length = nb_frames * self.stepsize + \
                self.blocksize - self.stepsize
            last_block_size = stack_length - frames_length

            if eod:
                # Final zero-padding so the last block reaches blocksize
                pad_shape = tuple(
                    int(self.blocksize - last_block_size) if i == 0 else x
                    for i, x in enumerate(frames.shape))
                stack = np.concatenate([stack, np.zeros(pad_shape,
                                                        dtype=frames.dtype)])
                nb_frames += 1

            # Keep the unconsumed tail for the next call
            self.buffer = stack[int(nb_frames * self.stepsize):]

            eod_list = np.repeat(False, nb_frames)
            if eod and len(eod_list):
                eod_list[-1] = eod

            for index, eod in zip(range(0, int(nb_frames * self.stepsize),
                                        int(self.stepsize)), eod_list):
                yield (stack[index:index + self.blocksize], eod)

    aubio_analyzers = ['aubio_melenergy', 'aubio_mfcc', 'aubio_pitch',
                       'aubio_specdesc', 'aubio_temporal']

    @functools.wraps(process_func)
    def wrapper(analyzer, frames, eod):
        # Pre-processing
        if not hasattr(analyzer, 'frames_buffer'):
            if analyzer.id() in aubio_analyzers:
                # Aubio analyzers expect stepsize-length blocks and rebuild
                # blocksize-length frames internally, so frames_adapter has to
                # provide blocks of stepsize length (blocksize = stepsize)
                analyzer.frames_buffer = framesBuffer(analyzer.input_stepsize,
                                                      analyzer.input_stepsize)
            else:
                analyzer.frames_buffer = framesBuffer(analyzer.input_blocksize,
                                                      analyzer.input_stepsize)

        # Processing
        for adapted_frames, adapted_eod in analyzer.frames_buffer.frames(frames,
                                                                         eod):
            process_func(analyzer, adapted_frames, adapted_eod)

        return frames, eod

    return wrapper

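Because `framesBuffer` keeps the unconsumed tail of the stream between calls,
the blocks delivered to the decorated process should not depend on how the
incoming stream is chunked. A minimal sketch of that behaviour, using a
stand-in analyzer modelled on the `Fake_Analyzer` of the doctest above (the
class itself is illustrative only):

    import numpy as np

    from timeside.core.preprocessors import frames_adapter


    class ChunkedFakeAnalyzer(object):
        # Illustrative stand-in for an analyzer, as in the doctest above
        def __init__(self):
            self.input_blocksize = 4
            self.input_stepsize = 3
            self.frames = []

        @staticmethod
        def id():
            return 'chunked_fake_analyzer'

        @frames_adapter
        def process(self, frames, eod=False):
            self.frames.append(frames)
            return frames, eod


    analyzer = ChunkedFakeAnalyzer()
    # Feed the same 12 samples as in the doctest, but in two 6-sample chunks
    analyzer.process(np.arange(0, 6), eod=False)
    analyzer.process(np.arange(6, 12), eod=False)
    # The adapted blocks match the single-call case:
    # [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9])]
    print(analyzer.frames)
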
if __name__ == "__main__":
    import doctest
    doctest.testmod()