Module dryft.signal

Removes non-linear ground reaction force signal drift in a stepwise manner. It is intended for running ground reaction force data commonly analyzed in the field of Biomechanics. The aerial phase before and after a given stance phase are used to tare the signal instead of assuming an overall linear trend or signal offset.

Licensed under an MIT License (c) Ryan Alcantara 2019

Distributed here: https://github.com/alcantarar/dryft

Expand source code
"""

Removes non-linear ground reaction force signal drift in a stepwise manner. It is intended
for running ground reaction force data commonly analyzed in the field of Biomechanics. The aerial phase before and after
a given stance phase are used to tare the signal instead of assuming an overall linear trend or signal offset.

Licensed under an MIT License (c) Ryan Alcantara 2019

Distributed here: https://github.com/alcantarar/dryft


"""
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd


def detrend(force_f, aerial, aerial_loc):
    """Remove drift from running ground reaction force signal based on aerial phases.

    Parameters
    ----------
    force_f : `ndarray`
        Filtered ground reaction force signal [n,]. Using unfiltered signal may cause unreliable results.
    aerial : `ndarray`
        Array of force signal measured at middle of each aerial phase.
    aerial_loc : `ndarray`
        Array of frame indexes for values in aerial. output from `aerialforce()`

    Returns
    -------
    force_fd : `ndarray`
        Array with shape of force_f, but with drift removed (detrended).

    Examples
    --------
        from dryft import signal

        force_fd = signal.detrend(GRF_filt, aerial_vals, aerial_loc)
    """

    force_f = force_f.flatten()
    # Create NaN array with aerial values at respective frame locations

    drift_signal = np.full(force_f.shape, np.nan)
    drift_signal[aerial_loc] = aerial
    # Use 3rd order spline to fill NaNs, creating the underlying drift of the signal.
    drift_signal_p = pd.Series(drift_signal)
    drift_signal_p = drift_signal_p.interpolate(method = 'spline', order = 3, s = 0, limit_direction= 'both')
    drift_signal = drift_signal_p.to_numpy()
    # Subtract drift from force signal
    force_fd = force_f - drift_signal

    return force_fd


def findgoodaerial(stance_begin, stance_end, good_stances):
    """Locate good aerial phases when bad stance phases are present.

        Parameters
        ----------
        stance_begin : `ndarray`
            Array of frame indexes for start of each stance phase.
        stance_end : `ndarray`
            Array of frame indexes for end of each stance phase. Same size as `begin`.
        good_stances : `ndarray`
            Boolean array of which stance phases meet min_tc & max_tc requirements.

        Returns
        -------
        good_aerial_begin : `ndarray`
            Array of frame indexes for aerial phase beginnings not connected to bad steps (per min/max_tc requirements).
        good_aerial_end : `ndarray`
            Array of frame indexes for aerial phase ends not connected to bad steps (per min/max_tc requirements).
        """
    bs = np.where(good_stances == False)
    aerial_start = np.ones((len(good_stances),), dtype=bool)
    aerial_end = np.ones((len(good_stances),), dtype=bool)

    aerial_end[bs[0]] = False
    aerial_end[bs[0] + 1] = False

    aerial_start[bs[0]] = False
    aerial_start[bs[0] - 1] = False

    good_aerial_begin = stance_end[aerial_start][:-1]
    good_aerial_end = stance_begin[aerial_end][1:]

    return good_aerial_begin, good_aerial_end


def aerialforce(force, stance_begin, stance_end, good_stances):
    """Calculate force signal at middle of aerial phase of running.

    Parameters
    ----------
    force : `ndarray`
        Filtered vertical ground reaction force (vGRF) signal [n,]. Using unfiltered signal will cause unreliable results.
    stance_begin : `ndarray`
        Array of frame indexes for start of each stance phase.
    stance_end : `ndarray`
        Array of frame indexes for end of each stance phase. Same size as `stance_begin`.
    good_stances : `ndarray`
        Boolean array of which stance phases meet min_tc & max_tc requirements.

    Returns
    -------
    aerial : `ndarray`
        Array containing force measured at middle of aerial phase force signal.
    aerial_loc : `ndarray`
        Array of frame indexes for values in aerial. output from `aerialforce()`

    """
    if False in good_stances:
        aerial_begin, aerial_end = findgoodaerial(stance_begin, stance_end, good_stances)
    else:
        aerial_begin = stance_end[good_stances][:-1]
        aerial_end = stance_begin[good_stances][1:]

    # calculate force at middle of aerial phase (foot not on ground, should be zero)
    if force.ndim == 2:  # one axis only
        force = force.flatten()
        aerial = np.full([aerial_begin.shape[0], ], np.nan)
        aerial_middle = np.full([aerial_begin.shape[0], ], np.nan)
    elif force.ndim == 1:
        aerial = np.full([aerial_begin.shape[0], ], np.nan)
        aerial_middle = np.full([aerial_begin.shape[0], ], np.nan)
    else: raise IndexError('Can only handle shape (n,) or (n,1)')

    for i in range(aerial.shape[0]):
        aerial_len = aerial_end-aerial_begin
        aerial_middle[i,] = round(aerial_len[i]/2)
        aerial[i,] = force[aerial_begin[i]+aerial_middle.astype(int)[i]]
    aerial_loc = aerial_begin + aerial_middle.astype(int)

    return aerial, aerial_loc


def splitsteps(vGRF, threshold, Fs, min_tc, max_tc, plot=False):
    """Read in filtered vertical ground reaction force (vGRF) signal and ID stance phases based on a threshold.

    Designed for running, hopping, or activity where 1 foot is on the force plate at a time.
    Identified stance phases are compared to min/max contact time (tc) to eliminate ones that are too short/long. 
    Update these parameters and threshold if little-no stance phases are identified. Setting plots=True can aid in 
    troubleshooting.

    Parameters
    ----------
    vGRF : `ndarray`
        Filtered vertical ground reaction force (vGRF) signal [n,]. Using unfiltered signal will cause unreliable results.
    threshold : `number`
        Determines when initial contact and final contact are defined. In the same units as vGRF signal. Please be
        responsible and set to < 50N for running data.
    Fs : `number`
        Sampling frequency of signal.
    min_tc : `number`
        Minimum contact time, in seconds, to consider as a real stance phase. Jogging > 0.2s
    max_tc : number
        Maximum contact time, in seconds, to consider as a real stance phase. Jogging > 0.4s
    plot : `bool`
        If true, return plot showing vGRF signal with initial and final contact points identified. Helpful for
        determining appropriate threshold, min_tc, and max_tc values.

    Returns
    -------
    stance_begin_all : `ndarray`
        Array of frame indexes for every start of stance phase found in trial. Use `good_stances` to index which ones
        pass the min/max_tc requirements.
    stance_end_all : `ndarray`
        Array of frame indexes for every end of stance phase found in trial. Use `good_stances` to index which ones
        pass the min/max_tc requirements.
    good_stances : `ndarray`
        Boolean array of which stance phases meet min_tc & max_tc requirements.

    Examples
    --------
        from dryft import signal

        stance_begin_all, stance_end_all, good_stances = signal.splitsteps(vGRF=GRF_filt,
                                                                           threshold=140,
                                                                           Fs=300,
                                                                           min_tc=0.2,
                                                                           max_tc=0.4,
                                                                           plot=True)
    """

    if min_tc < max_tc:
        # Identification. Forces over threshold register as stance phase (foot on ground).
        compare = (vGRF.flatten() > threshold).astype(int)

        events = np.diff(compare)
        stance_begin_all = np.squeeze(np.asarray(np.nonzero(events == 1)).transpose())
        stance_end_all = np.squeeze(np.asarray(np.nonzero(events == -1)).transpose())

        if plot:
            plt.plot(vGRF)
            plt.plot(events*500)
            plt.show(block = False)

        # if trial starts with end of stance phase, ignore
        stance_end_all = stance_end_all[stance_end_all > stance_begin_all[0]]
        # trim end of stance_begin_all to match stance_end_all.
        stance_begin_all = stance_begin_all[0:stance_end_all.shape[0]]

        stance_len = stance_end_all - stance_begin_all
        good_stances = np.logical_and(stance_len >= min_tc*Fs, stance_len <= max_tc*Fs)

        # ID suspicious stance phases (too long or short)
        if np.any(stance_len < min_tc*Fs):
            print('Out of', stance_len.shape[0], 'stance phases,', sum(stance_len < min_tc*Fs), ' < ',
                  min_tc, 'seconds.')
        if np.any(stance_len > max_tc*Fs):
            print('Out of', stance_len.shape[0], 'stance_phases,', sum(stance_len > max_tc*Fs), ' > ',
                  max_tc, 'seconds.')
        # print sizes
        print('Total number of contact time begin/end:', stance_begin_all.shape[0], stance_end_all.shape[0])

        return stance_begin_all, stance_end_all, good_stances

    else:
        raise IndexError('Did not ID stance phases: min_tc > max_tc.')

Functions

def aerialforce(force, stance_begin, stance_end, good_stances)

Calculate force signal at middle of aerial phase of running.

Parameters

force : ndarray
Filtered vertical ground reaction force (vGRF) signal [n,]. Using unfiltered signal will cause unreliable results.
stance_begin : ndarray
Array of frame indexes for start of each stance phase.
stance_end : ndarray
Array of frame indexes for end of each stance phase. Same size as stance_begin.
good_stances : ndarray
Boolean array of which stance phases meet min_tc & max_tc requirements.

Returns

aerial : ndarray
Array containing force measured at middle of aerial phase force signal.
aerial_loc : ndarray
Array of frame indexes for values in aerial. output from aerialforce()
Expand source code
def aerialforce(force, stance_begin, stance_end, good_stances):
    """Calculate force signal at middle of aerial phase of running.

    Parameters
    ----------
    force : `ndarray`
        Filtered vertical ground reaction force (vGRF) signal [n,]. Using unfiltered signal will cause unreliable results.
    stance_begin : `ndarray`
        Array of frame indexes for start of each stance phase.
    stance_end : `ndarray`
        Array of frame indexes for end of each stance phase. Same size as `stance_begin`.
    good_stances : `ndarray`
        Boolean array of which stance phases meet min_tc & max_tc requirements.

    Returns
    -------
    aerial : `ndarray`
        Array containing force measured at middle of aerial phase force signal.
    aerial_loc : `ndarray`
        Array of frame indexes for values in aerial. output from `aerialforce()`

    """
    if False in good_stances:
        aerial_begin, aerial_end = findgoodaerial(stance_begin, stance_end, good_stances)
    else:
        aerial_begin = stance_end[good_stances][:-1]
        aerial_end = stance_begin[good_stances][1:]

    # calculate force at middle of aerial phase (foot not on ground, should be zero)
    if force.ndim == 2:  # one axis only
        force = force.flatten()
        aerial = np.full([aerial_begin.shape[0], ], np.nan)
        aerial_middle = np.full([aerial_begin.shape[0], ], np.nan)
    elif force.ndim == 1:
        aerial = np.full([aerial_begin.shape[0], ], np.nan)
        aerial_middle = np.full([aerial_begin.shape[0], ], np.nan)
    else: raise IndexError('Can only handle shape (n,) or (n,1)')

    for i in range(aerial.shape[0]):
        aerial_len = aerial_end-aerial_begin
        aerial_middle[i,] = round(aerial_len[i]/2)
        aerial[i,] = force[aerial_begin[i]+aerial_middle.astype(int)[i]]
    aerial_loc = aerial_begin + aerial_middle.astype(int)

    return aerial, aerial_loc
def detrend(force_f, aerial, aerial_loc)

Remove drift from running ground reaction force signal based on aerial phases.

Parameters

force_f : ndarray
Filtered ground reaction force signal [n,]. Using unfiltered signal may cause unreliable results.
aerial : ndarray
Array of force signal measured at middle of each aerial phase.
aerial_loc : ndarray
Array of frame indexes for values in aerial. output from aerialforce()

Returns

force_fd : ndarray
Array with shape of force_f, but with drift removed (detrended).

Examples

from dryft import signal

force_fd = signal.detrend(GRF_filt, aerial_vals, aerial_loc)
Expand source code
def detrend(force_f, aerial, aerial_loc):
    """Remove drift from running ground reaction force signal based on aerial phases.

    Parameters
    ----------
    force_f : `ndarray`
        Filtered ground reaction force signal [n,]. Using unfiltered signal may cause unreliable results.
    aerial : `ndarray`
        Array of force signal measured at middle of each aerial phase.
    aerial_loc : `ndarray`
        Array of frame indexes for values in aerial. output from `aerialforce()`

    Returns
    -------
    force_fd : `ndarray`
        Array with shape of force_f, but with drift removed (detrended).

    Examples
    --------
        from dryft import signal

        force_fd = signal.detrend(GRF_filt, aerial_vals, aerial_loc)
    """

    force_f = force_f.flatten()
    # Create NaN array with aerial values at respective frame locations

    drift_signal = np.full(force_f.shape, np.nan)
    drift_signal[aerial_loc] = aerial
    # Use 3rd order spline to fill NaNs, creating the underlying drift of the signal.
    drift_signal_p = pd.Series(drift_signal)
    drift_signal_p = drift_signal_p.interpolate(method = 'spline', order = 3, s = 0, limit_direction= 'both')
    drift_signal = drift_signal_p.to_numpy()
    # Subtract drift from force signal
    force_fd = force_f - drift_signal

    return force_fd
def findgoodaerial(stance_begin, stance_end, good_stances)

Locate good aerial phases when bad stance phases are present.

Parameters

stance_begin : ndarray
Array of frame indexes for start of each stance phase.
stance_end : ndarray
Array of frame indexes for end of each stance phase. Same size as begin.
good_stances : ndarray
Boolean array of which stance phases meet min_tc & max_tc requirements.

Returns

good_aerial_begin : ndarray
Array of frame indexes for aerial phase beginnings not connected to bad steps (per min/max_tc requirements).
good_aerial_end : ndarray
Array of frame indexes for aerial phase ends not connected to bad steps (per min/max_tc requirements).
Expand source code
def findgoodaerial(stance_begin, stance_end, good_stances):
    """Locate good aerial phases when bad stance phases are present.

        Parameters
        ----------
        stance_begin : `ndarray`
            Array of frame indexes for start of each stance phase.
        stance_end : `ndarray`
            Array of frame indexes for end of each stance phase. Same size as `begin`.
        good_stances : `ndarray`
            Boolean array of which stance phases meet min_tc & max_tc requirements.

        Returns
        -------
        good_aerial_begin : `ndarray`
            Array of frame indexes for aerial phase beginnings not connected to bad steps (per min/max_tc requirements).
        good_aerial_end : `ndarray`
            Array of frame indexes for aerial phase ends not connected to bad steps (per min/max_tc requirements).
        """
    bs = np.where(good_stances == False)
    aerial_start = np.ones((len(good_stances),), dtype=bool)
    aerial_end = np.ones((len(good_stances),), dtype=bool)

    aerial_end[bs[0]] = False
    aerial_end[bs[0] + 1] = False

    aerial_start[bs[0]] = False
    aerial_start[bs[0] - 1] = False

    good_aerial_begin = stance_end[aerial_start][:-1]
    good_aerial_end = stance_begin[aerial_end][1:]

    return good_aerial_begin, good_aerial_end
def splitsteps(vGRF, threshold, Fs, min_tc, max_tc, plot=False)

Read in filtered vertical ground reaction force (vGRF) signal and ID stance phases based on a threshold.

Designed for running, hopping, or activity where 1 foot is on the force plate at a time. Identified stance phases are compared to min/max contact time (tc) to eliminate ones that are too short/long. Update these parameters and threshold if little-no stance phases are identified. Setting plots=True can aid in troubleshooting.

Parameters

vGRF : ndarray
Filtered vertical ground reaction force (vGRF) signal [n,]. Using unfiltered signal will cause unreliable results.
threshold : number
Determines when initial contact and final contact are defined. In the same units as vGRF signal. Please be responsible and set to < 50N for running data.
Fs : number
Sampling frequency of signal.
min_tc : number
Minimum contact time, in seconds, to consider as a real stance phase. Jogging > 0.2s
max_tc : number
Maximum contact time, in seconds, to consider as a real stance phase. Jogging > 0.4s
plot : bool
If true, return plot showing vGRF signal with initial and final contact points identified. Helpful for determining appropriate threshold, min_tc, and max_tc values.

Returns

stance_begin_all : ndarray
Array of frame indexes for every start of stance phase found in trial. Use good_stances to index which ones pass the min/max_tc requirements.
stance_end_all : ndarray
Array of frame indexes for every end of stance phase found in trial. Use good_stances to index which ones pass the min/max_tc requirements.
good_stances : ndarray
Boolean array of which stance phases meet min_tc & max_tc requirements.

Examples

from dryft import signal

stance_begin_all, stance_end_all, good_stances = signal.splitsteps(vGRF=GRF_filt,
                                                                   threshold=140,
                                                                   Fs=300,
                                                                   min_tc=0.2,
                                                                   max_tc=0.4,
                                                                   plot=True)
Expand source code
def splitsteps(vGRF, threshold, Fs, min_tc, max_tc, plot=False):
    """Read in filtered vertical ground reaction force (vGRF) signal and ID stance phases based on a threshold.

    Designed for running, hopping, or activity where 1 foot is on the force plate at a time.
    Identified stance phases are compared to min/max contact time (tc) to eliminate ones that are too short/long. 
    Update these parameters and threshold if little-no stance phases are identified. Setting plots=True can aid in 
    troubleshooting.

    Parameters
    ----------
    vGRF : `ndarray`
        Filtered vertical ground reaction force (vGRF) signal [n,]. Using unfiltered signal will cause unreliable results.
    threshold : `number`
        Determines when initial contact and final contact are defined. In the same units as vGRF signal. Please be
        responsible and set to < 50N for running data.
    Fs : `number`
        Sampling frequency of signal.
    min_tc : `number`
        Minimum contact time, in seconds, to consider as a real stance phase. Jogging > 0.2s
    max_tc : number
        Maximum contact time, in seconds, to consider as a real stance phase. Jogging > 0.4s
    plot : `bool`
        If true, return plot showing vGRF signal with initial and final contact points identified. Helpful for
        determining appropriate threshold, min_tc, and max_tc values.

    Returns
    -------
    stance_begin_all : `ndarray`
        Array of frame indexes for every start of stance phase found in trial. Use `good_stances` to index which ones
        pass the min/max_tc requirements.
    stance_end_all : `ndarray`
        Array of frame indexes for every end of stance phase found in trial. Use `good_stances` to index which ones
        pass the min/max_tc requirements.
    good_stances : `ndarray`
        Boolean array of which stance phases meet min_tc & max_tc requirements.

    Examples
    --------
        from dryft import signal

        stance_begin_all, stance_end_all, good_stances = signal.splitsteps(vGRF=GRF_filt,
                                                                           threshold=140,
                                                                           Fs=300,
                                                                           min_tc=0.2,
                                                                           max_tc=0.4,
                                                                           plot=True)
    """

    if min_tc < max_tc:
        # Identification. Forces over threshold register as stance phase (foot on ground).
        compare = (vGRF.flatten() > threshold).astype(int)

        events = np.diff(compare)
        stance_begin_all = np.squeeze(np.asarray(np.nonzero(events == 1)).transpose())
        stance_end_all = np.squeeze(np.asarray(np.nonzero(events == -1)).transpose())

        if plot:
            plt.plot(vGRF)
            plt.plot(events*500)
            plt.show(block = False)

        # if trial starts with end of stance phase, ignore
        stance_end_all = stance_end_all[stance_end_all > stance_begin_all[0]]
        # trim end of stance_begin_all to match stance_end_all.
        stance_begin_all = stance_begin_all[0:stance_end_all.shape[0]]

        stance_len = stance_end_all - stance_begin_all
        good_stances = np.logical_and(stance_len >= min_tc*Fs, stance_len <= max_tc*Fs)

        # ID suspicious stance phases (too long or short)
        if np.any(stance_len < min_tc*Fs):
            print('Out of', stance_len.shape[0], 'stance phases,', sum(stance_len < min_tc*Fs), ' < ',
                  min_tc, 'seconds.')
        if np.any(stance_len > max_tc*Fs):
            print('Out of', stance_len.shape[0], 'stance_phases,', sum(stance_len > max_tc*Fs), ' > ',
                  max_tc, 'seconds.')
        # print sizes
        print('Total number of contact time begin/end:', stance_begin_all.shape[0], stance_end_all.shape[0])

        return stance_begin_all, stance_end_all, good_stances

    else:
        raise IndexError('Did not ID stance phases: min_tc > max_tc.')