# Source code for handwriting_features.features.implementation.conventional.utils.composite

import numpy
import functools
from copy import deepcopy
from handwriting_features.data.utils.math import derivation
from handwriting_features.data.utils.dsp import LowPassFilter, GaussianFilter
from handwriting_features.data.exceptions.dsp import FiltrationError
from handwriting_features.features.implementation.conventional.spatial import stroke_length
from handwriting_features.features.implementation.conventional.temporal import stroke_duration


class WritingTempoUtils(object):
    """Class implementing writing tempo utils"""

    def __init__(self, sample_wrapper, in_air):
        """
        Initializes the writing tempo utils object.

        :param sample_wrapper: sample wrapper forwarded to ``stroke_length``
            and ``stroke_duration``
        :param in_air: flag forwarded unchanged to both helpers
        """

        # Set the stroke lengths and durations
        self.stroke_lengths = stroke_length(sample_wrapper, in_air)
        self.stroke_durations = stroke_duration(sample_wrapper, in_air)

    def get_writing_tempo(self):
        """
        Extracts the writing tempo.

        :return: number of strokes divided by the summed stroke durations
        :rtype: float
        """

        # Machine epsilon keeps the division defined when the total duration is 0
        total_duration = sum(self.stroke_durations)
        return len(self.stroke_lengths) / (total_duration + numpy.finfo(float).eps)
class WritingStopsUtils(object):
    """Class implementing writing stops utils"""

    def __init__(self, sample_wrapper):
        """
        Initializes the writing stops utils object.

        :param sample_wrapper: sample wrapper exposing ``strokes``, an iterable
            of ``(pen_status, stroke)`` pairs
        """

        # Set the strokes and indices
        self.strokes = sample_wrapper.strokes
        self.indices = self._get_stroke_indexes([stroke for _, stroke in self.strokes])

    def get_writing_stops(self):
        """
        Extracts the stroke stops.

        For every stroke whose pen status is not ``"in_air"`` the velocity is
        thresholded, runs of zero velocity longer than the 15 ms sample count
        are kept, close runs are fused, and each remaining run is converted
        to a duration.

        Each stroke is converted with its own mean sample period, so the
        result does not depend on which stroke happened to be processed last.

        :return: durations of the writing stops, in the units of ``stroke.time``
            (the 0.015 threshold presumes seconds -- TODO confirm)
        :rtype: numpy.ndarray
        """

        # Prepare the stops
        stops = []

        # Extract the stops
        for pen_status, stroke in self.strokes:
            if pen_status == "in_air":
                continue

            # Compute the vector of length, time
            length = numpy.sqrt(numpy.power(derivation(stroke.x), 2) + numpy.power(derivation(stroke.y), 2))
            time = derivation(stroke.time)

            # Compute the vector of velocity (value <= 1 mm/s is set to 0)
            velocity = numpy.array([d / t for (d, t) in zip(length, time)], dtype=float)
            if velocity.size == 0:
                # A stroke too short to have a velocity cannot contain a stop
                continue
            velocity[velocity <= 1] = 0

            # Get the number of samples equaling to 15 ms
            sample_period = numpy.mean(time)
            num_samples = numpy.ceil(0.015 / sample_period)

            # Identify the stops
            border_left, border_right = self._get_borders(velocity)

            # Take only pauses lasting at least 15 ms
            long_enough = (border_right - border_left) > num_samples
            border_left = border_left[long_enough].astype(float)
            border_right = border_right[long_enough].astype(float)

            # Fuze the pauses
            border_left, border_right = self._fuze_pauses(border_left, border_right, num_samples)

            # Convert the borders to durations; the stroke's starting index
            # cancels out of the difference, leaving (right - 1) - left samples
            durations = (border_right - border_left - 1) * sample_period

            # Append the durations to the stops
            stops += durations.tolist()

        # Return the writing stops
        return numpy.array(stops)

    @classmethod
    def _get_stroke_indexes(cls, strokes):
        """
        Gets the indexes of the strokes.

        :param strokes: sequence of strokes, each exposing a sized ``time``
        :return: starting sample index of every stroke (cumulative lengths)
        :rtype: list
        """

        # Prepare the list of indexes
        indexes = []
        offset = 0

        # Get the indexes
        for stroke in strokes:
            indexes.append(offset)
            offset += len(stroke.time)

        # Return the indexes
        return indexes

    @classmethod
    def _get_borders(cls, array, border_value=0):
        """
        Gets borders of an array given a border value.

        :param array: one-dimensional signal
        :param border_value: value whose runs are delimited, defaults to 0
        :return: ``(border_l, border_r)``; a run starting at index 0 yields a
            left border of 0, a run reaching the end yields a right border of
            ``len(array)``; both are empty for an empty input
        :rtype: tuple
        """

        array = numpy.asarray(array)

        # An empty signal has no runs (and no first/last element to inspect)
        if array.size == 0:
            return numpy.array([], dtype=int), numpy.array([], dtype=int)

        # Mark the samples equal to the border value
        is_border = array == border_value

        # Get the borders: transitions into and out of a run
        border_l = numpy.flatnonzero(is_border[1:] & ~is_border[:-1])
        border_r = numpy.flatnonzero(~is_border[1:] & is_border[:-1])

        # Runs touching either end of the array have no transition sample
        if is_border[0]:
            border_l = numpy.concatenate(([0], border_l))
        if is_border[-1]:
            border_r = numpy.concatenate((border_r, [len(array)]))

        # Return the borders
        return border_l, border_r

    @classmethod
    def _fuze_pauses(cls, border_left, border_right, num_samples):
        """
        Fuzes the pauses.

        Consecutive pauses separated by fewer than ``2 * num_samples`` samples
        are merged into one pause spanning from the first left border to the
        last right border of the group. The inputs are not modified.

        :param border_left: left borders of the pauses
        :param border_right: right borders of the pauses
        :param num_samples: number of samples used as the fusing threshold
        :return: ``(border_left, border_right)`` of the fused pauses as new
            float arrays
        :rtype: tuple
        """

        # Work on float copies so the caller's arrays stay untouched
        border_left = numpy.array(border_left, dtype=float)
        border_right = numpy.array(border_right, dtype=float)

        # Nothing to fuze with fewer than two pauses
        if len(border_left) < 2:
            return border_left, border_right

        # Find the neighbouring pauses that are close enough to fuze
        gaps = border_left[1:] - border_right[:-1]
        fuzed = gaps < (2 * num_samples)

        # A fuzed pair drops the right border of its first pause and the
        # left border of its second one
        keep_left = numpy.concatenate(([True], ~fuzed))
        keep_right = numpy.concatenate((~fuzed, [True]))

        # Return the fused pauses
        return border_left[keep_left], border_right[keep_right]
class WritingNumberOfChangesUtils(object):
    """Class implementing writing number of changes utils"""

    # Default computational arguments
    fc = 17.5
    n = 50

    def __init__(self, sample_wrapper, fs, fc=None, n=None, subset=None):
        """
        Initializes the writing number of changes utils object.

        The sample wrapper is deep-copied before its data are low-pass
        filtered, so the caller's object is left untouched.

        :param sample_wrapper: sample wrapper exposing ``sample``,
            ``sample_time`` and ``on_surface_strokes``
        :param fs: sampling frequency
        :type fs: float
        :param fc: cutoff frequency for the low-pass filter, defaults to 17.5
        :type fc: float, optional
        :param n: number of samples of a Gaussian filter, defaults to 50
        :type n: int, optional
        :param subset: subset of composite features to return, defaults to None
        :type subset: list, optional
        """

        # Set the sample wrapper
        self.sample_wrapper = deepcopy(sample_wrapper)

        # Set the DSP arguments (falsy values fall back to the class defaults)
        self.fs = fs
        self.fc = fc or WritingNumberOfChangesUtils.fc
        self.n = n or WritingNumberOfChangesUtils.n

        # Set the filter instances
        self.low_pass_filter = LowPassFilter(self.fs, self.fc)
        self.gaussian_filter = GaussianFilter(self.fs, self.n)

        # Get the duration
        self.duration = self.sample_wrapper.sample_time[-1] - self.sample_wrapper.sample_time[0]

        # Filter x, y, azimuth, tilt, and pressure by a low-pass filter
        self.sample_wrapper.sample = self._filter_data_with_low_pass_filter(self.sample_wrapper.sample)

        # Get the on-surface strokes
        self.strokes = self.sample_wrapper.on_surface_strokes

        # Set the subset of the features to return
        self.subset = subset

        # Per-instance cache of the already computed numbers of changes
        self._changes_cache = {}

    def get_number_of_changes_in_x_profile(self):
        """Gets the number of changes in x profile"""
        return self._count_profile_changes("x")

    def get_number_of_changes_in_y_profile(self):
        """Gets the number of changes in y profile"""
        return self._count_profile_changes("y")

    def get_number_of_changes_in_azimuth(self):
        """Gets the number of changes in azimuth"""
        return self._count_profile_changes("azimuth")

    def get_number_of_changes_in_tilt(self):
        """Gets the number of changes in tilt"""
        return self._count_profile_changes("tilt")

    def get_number_of_changes_in_pressure(self):
        """Gets the number of changes in pressure"""
        return self._count_profile_changes("pressure")

    def get_number_of_changes_in_velocity_profile(self):
        """
        Gets the number of changes in velocity profile.

        Strokes whose velocity cannot be filtered (``FiltrationError``) are
        reported on standard output and skipped.
        """
        return self._cached("velocity", self._count_velocity_changes)

    def get_relative_number_of_changes_in_x_profile(self):
        """Gets the relative number of changes in x profile"""
        return self.get_number_of_changes_in_x_profile() / self.duration

    def get_relative_number_of_changes_in_y_profile(self):
        """Gets the relative number of changes in y profile"""
        return self.get_number_of_changes_in_y_profile() / self.duration

    def get_relative_number_of_changes_in_azimuth(self):
        """Gets the relative number of changes in azimuth"""
        return self.get_number_of_changes_in_azimuth() / self.duration

    def get_relative_number_of_changes_in_tilt(self):
        """Gets the relative number of changes in tilt"""
        return self.get_number_of_changes_in_tilt() / self.duration

    def get_relative_number_of_changes_in_pressure(self):
        """Gets the relative number of changes in pressure"""
        return self.get_number_of_changes_in_pressure() / self.duration

    def get_relative_number_of_changes_in_velocity_profile(self):
        """Gets the relative number of changes in velocity profile"""
        return self.get_number_of_changes_in_velocity_profile() / self.duration

    def _cached(self, key, compute):
        """
        Returns the value cached under ``key``, computing it on first use.

        The cache lives on the instance, so instances neither share results
        nor are kept alive by a class-level cache.
        """

        # setdefault keeps this working for instances created without __init__
        cache = self.__dict__.setdefault("_changes_cache", {})
        if key not in cache:
            cache[key] = compute()
        return cache[key]

    def _count_profile_changes(self, channel):
        """
        Counts the changes in one Gaussian-filtered channel over all strokes.

        Only the requested channel is filtered and the stroke itself is not
        modified, so the result does not depend on which getters ran before.

        :param channel: name of the stroke attribute (e.g. ``"x"``)
        :type channel: str
        :return: number of changes summed over the strokes
        """

        def compute():
            return sum(
                self._get_changes(self.gaussian_filter.filter(getattr(stroke, channel)))
                for stroke in self.strokes
            )

        return self._cached(channel, compute)

    def _count_velocity_changes(self):
        """Counts the changes in the Gaussian-filtered velocity over all strokes"""

        # Prepare the number of changes
        num_changes = 0

        # Compute the number of changes
        for stroke in self.strokes:

            # Compute the vector of length, time, and velocity
            length = numpy.sqrt(numpy.power(derivation(stroke.x), 2) + numpy.power(derivation(stroke.y), 2))
            time = derivation(stroke.time)
            velocity = numpy.array([d / t for (d, t) in zip(length, time)])

            # Filter the velocity by a Gaussian filter
            try:
                velocity = self._filter_velocity_with_gaussian_filter(velocity)
            except FiltrationError:
                print(f"Filtration error: skipping stroke {stroke}")
                continue

            # Compute the number of changes
            num_changes += self._get_changes(velocity)

        # Return the number of changes
        return num_changes

    @classmethod
    def _get_changes(cls, signal):
        """
        Gets the changes.

        A change is an interior sample strictly greater than both neighbours
        or strictly smaller than both neighbours.

        :param signal: one-dimensional signal
        :return: number of such samples (0 for fewer than three samples)
        """

        signal = numpy.asarray(signal)
        previous, current, following = signal[:-2], signal[1:-1], signal[2:]

        # Get the left/right side changes
        changes_left = numpy.sum(numpy.logical_and(current > previous, current > following))
        changes_right = numpy.sum(numpy.logical_and(current < previous, current < following))

        # Return the changes
        return changes_left + changes_right

    def _filter_data_with_low_pass_filter(self, signal):
        """
        Filters an input sample/stroke data by a low-pass filter.

        The five channels are overwritten on ``signal`` itself, which is also
        returned.
        """

        # Filter the signal
        signal.x = self.low_pass_filter.filter(signal.x)
        signal.y = self.low_pass_filter.filter(signal.y)
        signal.azimuth = self.low_pass_filter.filter(signal.azimuth)
        signal.tilt = self.low_pass_filter.filter(signal.tilt)
        signal.pressure = self.low_pass_filter.filter(signal.pressure)

        # Return the filtered signal
        return signal

    def _filter_data_with_gaussian_filter(self, signal):
        """
        Filters an input sample/stroke data by a Gaussian filter.

        The five channels are overwritten on ``signal`` itself, which is also
        returned; calling this twice on the same object filters it twice.
        """

        # Filter the signal
        signal.x = self.gaussian_filter.filter(signal.x)
        signal.y = self.gaussian_filter.filter(signal.y)
        signal.azimuth = self.gaussian_filter.filter(signal.azimuth)
        signal.tilt = self.gaussian_filter.filter(signal.tilt)
        signal.pressure = self.gaussian_filter.filter(signal.pressure)

        # Return the filtered signal
        return signal

    def _filter_velocity_with_low_pass_filter(self, velocity):
        """Filters an input velocity by a low-pass filter"""
        return self.low_pass_filter.filter(velocity)

    def _filter_velocity_with_gaussian_filter(self, velocity):
        """Filters an input velocity by a Gaussian filter"""
        return self.gaussian_filter.filter(velocity)