Source code for timeside.core.grapher

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (C) 2008 MUSIC TECHNOLOGY GROUP (MTG)
#                    UNIVERSITAT POMPEU FABRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#   Bram de Jong <bram.dejong at domain.com where domain in gmail>
#   Guillaume Pellerin <yomguy@parisson.com>


import math
import numpy

try:
    from PIL import Image, ImageDraw
except ImportError:
    import Image
    import ImageDraw

from timeside.core import Processor, implements, interfacedoc, abstract
from timeside.core.processor import FixedSizeInputAdapter
from timeside.core.api import IGrapher
from timeside.plugins.grapher.utils import smooth, im_watermark, normalize

import timeside.plugins.analyzer


[docs]class Spectrum(object): """ FFT based frequency analysis of audio frames.""" def __init__(self, fft_size, samplerate, blocksize, totalframes, lower, higher, window_function=None): self.fft_size = fft_size self.window = window_function(self.fft_size) self.window_function = window_function self.spectrum_range = None self.lower = lower self.higher = higher self.blocksize = blocksize self.lower_log = math.log10(self.lower) self.higher_log = math.log10(self.higher) self.clip = lambda val, low, high: min(high, max(low, val)) self.totalframes = totalframes self.samplerate = samplerate self.window_function = window_function self.window = self.window_function(self.blocksize) # Hanning window by default if self.window_function: self.window = self.window_function(self.blocksize) else: self.window_function = numpy.hanning self.window = self.window_function(self.blocksize)
[docs] def process(self, frames, eod, spec_range=120.0): """ Returns a tuple containing the spectral centroid and the spectrum (dB scales) of the input audio frames. FFT window sizes are adatable to the input frame size.""" samples = frames[:, 0] nsamples = len(frames[:, 0]) if nsamples != self.blocksize: self.window = self.window_function(nsamples) samples *= self.window while nsamples > self.fft_size: self.fft_size = 2 * self.fft_size zeros_p = numpy.zeros(int(self.fft_size / 2) - int(nsamples / 2)) if nsamples % 2: zeros_n = numpy.zeros( int(self.fft_size / 2) - int(nsamples / 2) - 1 ) else: zeros_n = numpy.zeros(int(self.fft_size / 2) - int(nsamples / 2)) samples = numpy.concatenate((zeros_p, samples, zeros_n), axis=0) fft = numpy.fft.fft(samples) # normalized abs(FFT) between 0 and 1 spectrum = numpy.abs(fft[:int(fft.shape[0] / 2 + 1)]) / float(nsamples) length = numpy.float64(spectrum.shape[0]) # scale the db spectrum from [- spec_range db ... 0 db] > [0..1] db_spectrum = ((20 * (numpy.log10(spectrum + 1e-30))) .clip(-spec_range, 0.0) + spec_range) / spec_range energy = spectrum.sum() spectral_centroid = 0 if energy > 1e-20: # calculate the spectral centroid if self.spectrum_range is None: self.spectrum_range = numpy.arange(length) spectral_centroid = (spectrum * self.spectrum_range).sum() / \ (energy * (length - 1)) * \ self.samplerate * 0.5 # clip > log10 > scale between 0 and 1 spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log) return (spectral_centroid, db_spectrum)
[docs]class Grapher(Processor): ''' Generic abstract class for the graphers ''' type = 'grapher' fft_size = 0x1000 frame_cursor = 0 pixel_cursor = 0 lower_freq = 20 implements(IGrapher) abstract() def __init__(self, width=1024, height=256, bg_color=None, color_scheme='default'): super(Grapher, self).__init__() self.bg_color = bg_color self.color_scheme = color_scheme self.graph = None self.image_width = width self.image_height = height self.bg_color = bg_color self.color_scheme = color_scheme self.previous_x, self.previous_y = None, None @staticmethod def id(): return "generic_grapher" @staticmethod def name(): return "Generic grapher" def set_colors(self, bg_color, color_scheme): self.bg_color = bg_color self.color_color_scheme = color_scheme
[docs] def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): super(Grapher, self).setup( channels, samplerate, blocksize, totalframes) self.sample_rate = samplerate self.higher_freq = self.sample_rate / 2 self.block_size = blocksize self.total_frames = totalframes self.image = Image.new( "RGBA", (self.image_width, self.image_height), self.bg_color) self.samples_per_pixel = self.total_frames / float(self.image_width) self.buffer_size = int(round(self.samples_per_pixel, 0)) self.pixels_adapter = FixedSizeInputAdapter( self.buffer_size, 1, pad=False) self.pixels_adapter_totalframes = self.pixels_adapter.blocksize( self.total_frames) self.spectrum = Spectrum( self.fft_size, self.sample_rate, self.block_size, self.total_frames, self.lower_freq, self.higher_freq, numpy.hanning) self.pixel = self.image.load() self.draw = ImageDraw.Draw(self.image)
[docs] @interfacedoc def render(self, output=None): if output: try: self.image.save(output) except AttributeError: print("Pixel %s x %d" % (self.image_width, self.image_height)) self.image.savefig(output, dpi=341) return return self.image
def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5, 5)): self.image = im_watermark( self.image, text, color=color, opacity=opacity, margin=margin)
[docs] def draw_peaks(self, x, peaks, line_color): """Draw 2 peaks at x""" y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5 y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5 if self.previous_y: self.draw.line( [self.previous_x, self.previous_y, x, y1, x, y2], line_color) else: self.draw.line([x, y1, x, y2], line_color) self.draw_anti_aliased_pixels(x, y1, y2, line_color) self.previous_x, self.previous_y = x, y2
[docs] def draw_peaks_inverted(self, x, peaks, line_color): """Draw 2 inverted peaks at x""" y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5 y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5 if self.previous_y and x < self.image_width - 1: if y1 < y2: self.draw.line((x, 0, x, y1), line_color) self.draw.line((x, self.image_height, x, y2), line_color) else: self.draw.line((x, 0, x, y2), line_color) self.draw.line((x, self.image_height, x, y1), line_color) else: self.draw.line((x, 0, x, self.image_height), line_color) self.draw_anti_aliased_pixels(x, y1, y2, line_color) self.previous_x, self.previous_y = x, y1
[docs] def draw_anti_aliased_pixels(self, x, y1, y2, color): """ vertical anti-aliasing at y1 and y2 """ y_max = max(y1, y2) y_max_int = int(y_max) alpha = y_max - y_max_int if alpha > 0.0 and alpha < 1.0 and y_max_int + 1 < self.image_height: current_pix = self.pixel[int(x), y_max_int + 1] r = int((1 - alpha) * current_pix[0] + alpha * color[0]) g = int((1 - alpha) * current_pix[1] + alpha * color[1]) b = int((1 - alpha) * current_pix[2] + alpha * color[2]) self.pixel[x, y_max_int + 1] = (r, g, b) y_min = min(y1, y2) y_min_int = int(y_min) alpha = 1.0 - (y_min - y_min_int) if alpha > 0.0 and alpha < 1.0 and y_min_int - 1 >= 0: current_pix = self.pixel[x, y_min_int - 1] r = int((1 - alpha) * current_pix[0] + alpha * color[0]) g = int((1 - alpha) * current_pix[1] + alpha * color[1]) b = int((1 - alpha) * current_pix[2] + alpha * color[2]) self.pixel[x, y_min_int - 1] = (r, g, b)
def draw_peaks_contour(self): contour = self.contour.copy() contour = smooth(contour, window_len=16) contour = normalize(contour) # Scaling #ratio = numpy.mean(contour)/numpy.sqrt(2) ratio = 1 contour = normalize(numpy.expm1(contour / ratio)) * (1 - 10 ** -6) # Spline #contour = cspline1d(contour) #contour = cspline1d_eval(contour, self.x, dx=self.dx1, x0=self.x[0]) if self.symetry: height = int(self.image_height / 2) else: height = self.image_height # Multicurve rotating for i in range(0, self.ndiv): self.previous_x, self.previous_y = None, None bright_color = int(255 * (1 - float(i) / (self.ndiv * 2))) bright_color = 255 - bright_color + self.color_offset #line_color = self.color_lookup[int(self.centroids[j]*255.0)] line_color = (bright_color, bright_color, bright_color) # Linear #contour = contour*(1.0-float(i)/self.ndiv) #contour = contour*(1-float(i)/self.ndiv) # Cosinus contour = contour * \ numpy.arccos(float(i) / self.ndiv) * 2 / numpy.pi #contour = self.contour*(1-float(i)*numpy.arccos(float(i)/self.ndiv)*2/numpy.pi/self.ndiv) #contour = contour + ((1-contour)*2/numpy.pi*numpy.arcsin(float(i)/self.ndiv)) curve = (height - 1) * contour #curve = contour*(height-2)/2+height/2 for x in self.x: x = int(x) y = curve[x] if not x == 0: if not self.symetry: self.draw.line( [self.previous_x, self.previous_y, x, y], line_color) self.draw_anti_aliased_pixels(x, y, y, line_color) else: self.draw.line( [self.previous_x, self.previous_y + height, x, y + height], line_color) self.draw_anti_aliased_pixels( x, y + height, y + height, line_color) self.draw.line( [self.previous_x, -self.previous_y + height, x, -y + height], line_color) self.draw_anti_aliased_pixels( x, -y + height, -y + height, line_color) else: if not self.symetry: self.draw.point((x, y), line_color) else: self.draw.point((x, y + height), line_color) self.previous_x, self.previous_y = x, y
[docs]class DisplayAnalyzer(Grapher): """ image from analyzer result This is an Abstract base class """ dpi = 72 # Web default value for Telemeta implements(IGrapher) abstract() @interfacedoc def __init__(self, width=1024, height=256, bg_color=(0, 0, 0), color_scheme='default'): super(DisplayAnalyzer, self).__init__(width, height, bg_color, color_scheme) self._result_id = None self._id = NotImplemented self._name = NotImplemented self.image = None self._background = None self._bg_id = ''
[docs] @interfacedoc def process(self, frames, eod=False): return frames, eod
[docs] @interfacedoc def post_process(self): pipe_result = self.process_pipe.results analyzer_uuid = self.parents['analyzer'].uuid() analyzer_result = pipe_result[analyzer_uuid][self._result_id] fg_image = analyzer_result._render_PIL((self.image_width, self.image_height), self.dpi) if self._background: bg_uuid = self.parents['bg_analyzer'].uuid() bg_result = pipe_result[bg_uuid][self._bg_id] bg_image = bg_result._render_PIL((self.image_width, self.image_height), self.dpi) # convert image to grayscale bg_image = bg_image.convert('LA').convert('RGBA') # Merge background and foreground images from PIL.Image import blend fg_image = blend(fg_image, bg_image, 0.15) self.image = fg_image
@classmethod def create(cls, analyzer, analyzer_parameters=None, result_id=None, grapher_id=None, grapher_name=None, grapher_version=None, background=None, staging=False): if analyzer_parameters is None: analyzer_parameters = {} class NewGrapher(cls): _id = grapher_id _staging = staging _from_analyzer = True _analyzer = analyzer _analyzer_parameters = analyzer_parameters _result_id = result_id _grapher_name = grapher_name implements(IGrapher) @interfacedoc def __init__(self, width=1024, height=256, bg_color=(0, 0, 0), color_scheme='default'): super(NewGrapher, self).__init__(width, height, bg_color, color_scheme) # Add a parent waveform analyzer if background == 'waveform': self._background = True bg_analyzer = timeside.plugins.analyzer.waveform.Waveform() self._bg_id = bg_analyzer.id() self.parents['bg_analyzer'] = bg_analyzer elif background == 'spectrogram': self._background = True bg_analyzer = timeside.plugins.analyzer.spectrogram.Spectrogram() self._bg_id = bg_analyzer.id() self.parents['bg_analyzer'] = bg_analyzer else: self._background = None parent_analyzer = analyzer(**analyzer_parameters) self.parents['analyzer'] = parent_analyzer self._result_id = result_id @staticmethod @interfacedoc def id(): return grapher_id @staticmethod @interfacedoc def version(): return grapher_version @staticmethod @interfacedoc def name(): return grapher_name __doc__ = """Image representing """ + grapher_name NewGrapher.__name__ = grapher_name return NewGrapher
if __name__ == "__main__": import doctest doctest.testmod()