import numpy
import functools
from copy import deepcopy
from handwriting_features.data.utils.math import intersection
from handwriting_features.data.utils.math import derivation
from handwriting_features.data.utils.dsp import GaussianFilter, segment
class IntersectionUtils(object):
    """
    Class implementing intersection utils.

    Counts the self-intersections of every on-surface stroke (intra-stroke)
    and the intersections between different strokes (inter-stroke). All
    counts are computed once, when the object is constructed.
    """

    def __init__(self, sample_wrapper):
        """
        Initializes writing intersection object.

        :param sample_wrapper: handwriting sample; the code reads its
            ``sample_time`` and ``on_surface_strokes`` attributes, and the
            ``x``, ``y`` and ``time`` attributes of every stroke
        """

        # Work on a private copy so the caller's sample is never touched
        self.sample_wrapper = deepcopy(sample_wrapper)

        # Duration of the whole sample (last minus first time-stamp)
        self.duration = self.sample_wrapper.sample_time[-1] - self.sample_wrapper.sample_time[0]

        # Get the on-surface strokes
        self.on_surface_strokes = self.sample_wrapper.on_surface_strokes

        # Per-stroke intra-stroke counters, one row per stroke. The builtin
        # int/float are used as dtypes: the numpy.int/numpy.float aliases
        # were removed from NumPy and raise AttributeError there.
        num_strokes = len(self.on_surface_strokes)
        self.abs_num_intra = numpy.zeros((num_strokes, 1), dtype=int)
        self.rel_num_intra = numpy.zeros((num_strokes, 1), dtype=float)

        # Inter-stroke counters for the whole sample
        self.abs_num_inter = 0
        self.rel_num_inter = 0.

        # Concatenated stroke coordinates and the intra-stroke intersections
        self.all_x = []
        self.all_y = []
        self.intra_all = []

        # Compute the intra-stroke and inter-stroke intersections
        self._compute_intersections()

    def get_number_of_intra_stroke_intersections(self):
        """
        Extracts the number of intra-stroke intersections.

        :return: row vector with one count per stroke, NaN without strokes
        """
        return numpy.nan if not self.on_surface_strokes else self.abs_num_intra.T

    def get_relative_number_of_intra_stroke_intersections(self):
        """
        Extracts the relative number of intra-stroke intersections.

        :return: row vector with one count-per-stroke-duration value per
            stroke, NaN without strokes
        """
        return numpy.nan if not self.on_surface_strokes else self.rel_num_intra.T

    def get_total_number_of_intra_stroke_intersections(self):
        """
        Extracts the total number of intra-stroke intersections.

        :return: sum of the per-stroke counts, 0 without strokes
        :rtype: int
        """
        return 0 if not self.on_surface_strokes else int(numpy.sum(self.abs_num_intra))

    def get_relative_total_number_of_intra_stroke_intersections(self):
        """
        Extracts the relative total number of intra-stroke intersections.

        :return: total count divided by the sample duration, rounded to
            6 decimals, 0. without strokes
        :rtype: float
        """
        if not self.on_surface_strokes:
            return 0.
        return round(float(numpy.sum(self.abs_num_intra) / self.duration), 6)

    def get_number_of_inter_stroke_intersections(self):
        """
        Extracts the number of inter-stroke intersections.

        :return: inter-stroke count, 0 without strokes
        """
        return 0 if not self.on_surface_strokes else self.abs_num_inter

    def get_relative_number_of_inter_stroke_intersections(self):
        """
        Extracts the relative number of inter-stroke intersections.

        :return: inter-stroke count divided by the sample duration, rounded
            to 6 decimals, 0. without strokes
        :rtype: float
        """
        return 0. if not self.on_surface_strokes else round(float(self.rel_num_inter), 6)

    @staticmethod
    def _valid_intersections(x, y):
        """Returns the intersections of a path, dropping pairs holding None"""
        return [(px, py) for px, py in intersection(x, y) if px is not None and py is not None]

    def _compute_intersections(self):
        """
        Computes the intra-stroke and inter-stroke intersections.

        The results are accumulated in local containers and assigned at the
        end, so the method can be re-run without corrupting the state (no
        memoisation decorator is needed, it is called once per object).
        """
        all_x, all_y, intra_all = [], [], []

        # Start from clean per-stroke counters
        self.abs_num_intra.fill(0)
        self.rel_num_intra.fill(0.)

        # Compute the intra-stroke intersections
        for i, stroke in enumerate(self.on_surface_strokes):
            intersections = self._valid_intersections(stroke.x, stroke.y)

            # Handle the intra-stroke intersections
            if intersections:
                self.abs_num_intra[i] = len(intersections)
                self.rel_num_intra[i] = len(intersections) / (stroke.time[-1] - stroke.time[0])
                intra_all.extend(intersections)

            # Add the stroke to the concatenated path; the trailing NaN
            # separates it from the next stroke
            all_x.extend(list(stroke.x) + [numpy.nan])
            all_y.extend(list(stroke.y) + [numpy.nan])

        self.all_x = numpy.array(all_x)
        self.all_y = numpy.array(all_y)
        self.intra_all = intra_all

        # Without strokes there is nothing to intersect; the helper is not
        # called on empty input
        if not self.on_surface_strokes:
            self.abs_num_inter = 0
            self.rel_num_inter = 0.
            return

        # Intersections of the concatenated path
        intersections = self._valid_intersections(self.all_x, self.all_y)

        # Whatever was already found inside a single stroke is not an
        # inter-stroke intersection
        if intra_all:
            intersections = [point for point in intersections if point not in intra_all]

        # Handle the inter-stroke intersections
        if intersections:
            self.abs_num_inter = len(intersections)
            self.rel_num_inter = len(intersections) / self.duration
        else:
            self.abs_num_inter = 0
            self.rel_num_inter = 0.
class ProjectionUtils(object):
    """
    Class implementing projection utils.

    Locates the peaks and valleys of the vertical (y) on-surface movement
    and exposes their indices, values, velocities, horizontal distances
    and durations.
    """

    # Default computational arguments
    n = 50

    # Per-instance memo slots. They are declared on the class so that every
    # instance starts with an empty cache; the cached values themselves are
    # stored on the instance (a class-wide lru_cache keyed on ``self`` would
    # be shared by all instances and keep them alive).
    _peaks_indices_cache = None
    _valleys_indices_cache = None
    _temporal_velocity_cache = None

    def __init__(self, sample_wrapper, fs, n=None):
        """
        Initializes writing projection object.

        :param sample_wrapper: handwriting sample; the code reads its
            ``on_surface_data`` attribute (with ``x``, ``y`` and ``time``)
        :param fs: sampling frequency
        :type fs: float
        :param n: number of samples of a Gaussian filter, defaults to 50
        :type n: int
        """

        # Work on a private copy so the caller's sample is never touched
        self.sample_wrapper = deepcopy(sample_wrapper)

        # Set the on-surface data
        self.on_surface_data = self.sample_wrapper.on_surface_data

        # Set the DSP arguments
        self.fs = fs
        self.n = n if n else ProjectionUtils.n

        # Non-overlapping windows of ceil(fs * 0.1) samples
        # (presumably 100 ms with fs in Hz -- confirm the unit of fs)
        self.window_size = int(numpy.ceil(self.fs * 0.100))
        self.window_step = self.window_size

        # Set the filter instances
        self.gaussian_filter = GaussianFilter(self.fs, self.n)

        # Prepare time, peaks and valleys
        self.time = None
        self.peaks = None
        self.valleys = None

        # Compute the projections
        self._compute_projections()

    def _compute_projections(self):
        """
        Computes the projections.

        Fills ``self.time`` (time-stamps of the retained samples) and
        ``self.peaks`` / ``self.valleys`` (integer positions into that
        vector), then invalidates the memoised results derived from them.
        """

        # Take time and vertical movement only
        t = self.on_surface_data.time
        y = self.on_surface_data.y

        # Segment time and vertical movement
        t_segmented = segment(t, self.window_size, self.window_step)
        y_segmented = segment(y, self.window_size, self.window_step)

        # Reshape the segments to (window samples, windows)
        t_segmented = numpy.reshape(t_segmented, (t_segmented.shape[0], t_segmented.shape[-1])).T
        y_segmented = numpy.reshape(y_segmented, (y_segmented.shape[0], y_segmented.shape[-1])).T

        # Keep only the windows whose coefficient of variation exceeds 0.0001
        # NOTE(review): the mean is used with its sign, so windows with a
        # non-positive mean never pass -- confirm that y is always positive.
        selection = (numpy.std(y_segmented, axis=0) / numpy.mean(y_segmented, axis=0)) > 0.0001
        t_segmented = t_segmented[:, selection]
        y_segmented = y_segmented[:, selection]

        # Flatten the segments to vectors (window after window)
        t = t_segmented.flatten("F")
        y = y_segmented.flatten("F")

        # Filter vertical movement; signals too short for the filter are
        # used unfiltered (this also covers an empty signal)
        if len(y) > 3 * (self.n - 1):
            y_filtered = numpy.asarray(self.gaussian_filter.filter(y))
        else:
            y_filtered = y.copy()

        # Compare every candidate sample with its two neighbours
        # NOTE(review): these slices never test the last interior sample
        # (index len - 2); kept as-is to preserve the extracted values.
        previous = y_filtered[0:-3]
        current = y_filtered[1:-2]
        following = y_filtered[2:-1]
        peaks = (current > previous) & (current > following)
        valleys = (current < previous) & (current < following)

        # Set the time, peaks and valleys. flatnonzero always yields integer
        # positions, so an empty result is still usable as an index
        # (+1 because the candidates start at the second sample).
        self.time = t
        self.peaks = numpy.flatnonzero(peaks) + 1
        self.valleys = numpy.flatnonzero(valleys) + 1

        # Adjust the peaks: a peak of the filtered signal sitting on a local
        # minimum of the raw signal is moved to the larger raw neighbour
        for i in range(len(self.peaks)):
            while y[self.peaks[i]] < y[self.peaks[i] - 1] and y[self.peaks[i]] < y[self.peaks[i] + 1]:
                if y[self.peaks[i] - 1] > y[self.peaks[i] + 1]:
                    self.peaks[i] -= 1
                else:
                    self.peaks[i] += 1

        # Adjust the valleys: the mirror image of the peak adjustment
        for i in range(len(self.valleys)):
            while y[self.valleys[i]] > y[self.valleys[i] - 1] and y[self.valleys[i]] > y[self.valleys[i] + 1]:
                if y[self.valleys[i] - 1] < y[self.valleys[i] + 1]:
                    self.valleys[i] -= 1
                else:
                    self.valleys[i] += 1

        # Anything derived from the previous peaks/valleys is stale now
        self._peaks_indices_cache = None
        self._valleys_indices_cache = None
        self._temporal_velocity_cache = None

    def _compute_temporal_velocity(self):
        """
        Computes the temporal velocity.

        :return: derivation of the travelled distance divided by the
            derivation of time (computed once per instance)
        """
        if self._temporal_velocity_cache is None:
            # Prepare the trajectory and time
            d = numpy.sqrt(derivation(self.on_surface_data.x) ** 2 + derivation(self.on_surface_data.y) ** 2)
            t = derivation(self.on_surface_data.time)
            self._temporal_velocity_cache = d / t
        return self._temporal_velocity_cache

    def _locate_in_on_surface_data(self, positions):
        """
        Maps positions in ``self.time`` to indices of the on-surface data.

        :param positions: positions into ``self.time`` (peaks or valleys)
        :return: integer array with the first on-surface index whose
            time-stamp equals each selected time, NaN if not computed yet
        """
        if self.time is None or positions is None:
            return numpy.nan

        # An empty selection may arrive as a float array; indexing requires
        # an integer one
        positions = numpy.asarray(positions, dtype=int)
        reference = self.on_surface_data.time
        return numpy.array([numpy.where(reference == e)[0][0] for e in self.time[positions]], dtype=int)

    @staticmethod
    def _has_indices(indices):
        """Tells whether an indices getter returned indices (not NaN)"""
        return isinstance(indices, numpy.ndarray)

    def _values_at(self, values, indices):
        """Returns the values at the indices, NaN if there are no indices"""
        if not self._has_indices(indices):
            return numpy.nan
        return numpy.array([values[e] for e in indices])

    def _differences_at(self, values, indices):
        """Returns the derivation of the values at the indices, NaN for fewer than two"""
        if not self._has_indices(indices) or len(indices) < 2:
            return numpy.nan
        return derivation(self._values_at(values, indices))

    def vertical_peaks_indices(self):
        """
        Extracts the vertical peaks indices.

        :return: indices of the peaks in the on-surface data, NaN if the
            projections are not available
        """
        if self._peaks_indices_cache is None:
            self._peaks_indices_cache = self._locate_in_on_surface_data(self.peaks)
        return self._peaks_indices_cache

    def vertical_valleys_indices(self):
        """
        Extracts the vertical valleys indices.

        :return: indices of the valleys in the on-surface data, NaN if the
            projections are not available
        """
        if self._valleys_indices_cache is None:
            self._valleys_indices_cache = self._locate_in_on_surface_data(self.valleys)
        return self._valleys_indices_cache

    def vertical_peaks_values(self):
        """Extracts the vertical peaks values (y at the peaks), NaN if unavailable"""
        return self._values_at(self.on_surface_data.y, self.vertical_peaks_indices())

    def vertical_valleys_values(self):
        """Extracts the vertical valleys values (y at the valleys), NaN if unavailable"""
        return self._values_at(self.on_surface_data.y, self.vertical_valleys_indices())

    def vertical_peaks_velocity(self):
        """Extracts the vertical peaks velocity, NaN if unavailable"""
        indices = self.vertical_peaks_indices()
        if not self._has_indices(indices):
            return numpy.nan
        # The velocity is looked up lazily: nothing is computed without peaks
        return numpy.array([self._compute_temporal_velocity()[e] for e in indices])

    def vertical_valleys_velocity(self):
        """Extracts the vertical valleys velocity, NaN if unavailable"""
        indices = self.vertical_valleys_indices()
        if not self._has_indices(indices):
            return numpy.nan
        # The velocity is looked up lazily: nothing is computed without valleys
        return numpy.array([self._compute_temporal_velocity()[e] for e in indices])

    def vertical_peaks_distance(self):
        """Extracts the vertical peaks distance (derivation of x at the peaks), NaN for fewer than two peaks"""
        return self._differences_at(self.on_surface_data.x, self.vertical_peaks_indices())

    def vertical_valleys_distance(self):
        """Extracts the vertical valleys distance (derivation of x at the valleys), NaN for fewer than two valleys"""
        return self._differences_at(self.on_surface_data.x, self.vertical_valleys_indices())

    def vertical_peaks_duration(self):
        """Extracts the vertical peaks duration (derivation of time at the peaks), NaN for fewer than two peaks"""
        return self._differences_at(self.on_surface_data.time, self.vertical_peaks_indices())

    def vertical_valleys_duration(self):
        """Extracts the vertical valleys duration (derivation of time at the valleys), NaN for fewer than two valleys"""
        return self._differences_at(self.on_surface_data.time, self.vertical_valleys_indices())