Source code for linescanning.utils

import csv
import fnmatch
import json
import math
import matplotlib.colors as mcolors
from matplotlib import cm
import nibabel as nb
from nilearn import signal
import numpy as np
import operator
import os
import pandas as pd
from PIL import ImageColor
import random
from scipy import io, interpolate
from shapely import geometry
import subprocess
import warnings
import itertools

opj = os.path.join
pd.options.mode.chained_assignment = None # disable warning thrown by string2float
warnings.filterwarnings("ignore")

# Define a function 'pairwise' that iterates over all pairs of consecutive items in a list
def pairwise(l1):
    # Create an empty list 'temp' to store the pairs
    temp = []

    # Iterate through the list elements up to the second-to-last element
    for i in range(len(l1) - 1):
        # Get the current element and the next element in the list
        current_element, next_element = l1[i], l1[i + 1]

        # Create a tuple 'x' containing the current and next elements
        x = (current_element, next_element)

        # Append the tuple 'x' to the 'temp' list
        temp.append(x)

    # Return the list of pairs
    return temp

def normalize(data):
    return (data-np.min(data))/(np.max(data)-np.min(data))

def verbose(msg, verbose, flush=True, **kwargs):
    if verbose:
        print(msg, flush=flush, **kwargs)

def calculate_tsnr(data,ax):
    mean_d = np.mean(data,axis=ax)
    std_d = np.std(data,axis=ax)
    tsnr = mean_d/std_d
    tsnr[np.where(np.isinf(tsnr))] = np.nan

    return tsnr
    
[docs] def copy_hdr(source_img,dest_img): """copy_hdr Similar functionality as fslcpgeom but than more rigorious using Nibabel. Copies the ENTIRE header, including affine, quaternion rotations, and dimensions. Parameters ---------- source_img: str, nibabel.Nifti1Image source image from which to derive the header information dest_img: str, nibabel.Nifti1Image destination image to which to copy the header from <source image> to Returns ---------- nibabel.Nifti1Image `source_img` with updated header information Example ---------- >>> new_img = copy_hdr(img1,img2) """ if isinstance(source_img, nb.Nifti1Image): src_img = source_img elif isinstance(source_img, str): src_img = nb.load(source_img) if isinstance(dest_img, nb.Nifti1Image): targ_img = dest_img elif isinstance(dest_img, str): targ_img = nb.load(dest_img) new = nb.Nifti1Image(targ_img.get_fdata(), affine=src_img.affine, header=src_img.header) return new
class color: # """color # Add some color to the terminal. # Example # ---------- # >>> print("set orientation to " + utils.color.BOLD + utils.color.RED + "SOME TEXT THAT'LL BE IN TED" + utils.color.END) # """ PURPLE = '\033[95m' CYAN = '\033[96m' DARKCYAN = '\033[36m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def str2operator(ops): if ops in ["and","&","&&"]: return operator.and_ elif ops in ["or","|","||"]: return operator.or_ elif ops in ["is not","!="]: return operator.ne elif ops in ["is","==","="]: return operator.eq elif ops in ["gt",">"]: return operator.gt elif ops in ["lt","<"]: return operator.lt elif ops in ["ge",">="]: return operator.ge elif ops in ["le","<="]: return operator.le elif ops in ["x","*"]: return operator.mul elif ops == "/": return operator.truediv else: raise NotImplementedError()
[docs] def find_nearest(array, value, return_nr=1): """find_nearest Find the index and value in an array given a value. You can either choose to have 1 item (the `closest`) returned, or the 5 nearest items (`return_nr=5`), or everything you're interested in (`return_nr="all"`) Parameters ---------- array: numpy.ndarray array to search in value: float value to search for in `array` return_nr: int, str, optional number of elements to return after searching for elements in `array` that are close to `value`. Can either be an integer or a string *all* Returns ---------- int integer representing the index of the element in `array` closest to `value`. list if `return_nr` > 1, a list of indices will be returned numpy.ndarray value in `array` at the index closest to `value` """ array = np.asarray(array) if return_nr == 1: idx = np.nanargmin((np.abs(array-value))) return idx, array[idx] else: # check nan indices nans = np.isnan(array) # initialize output idx = np.full_like(array, np.nan) # loop through values in array for qq,ii in enumerate(array): # don't do anything if value is nan if not nans[qq]: idx[qq] = np.abs(ii-value) # sort idx = np.argsort(idx) # return everything if return_nr == "all": idc_list = idx.copy() else: # return closest X values idc_list = idx[:return_nr] return idc_list, array[idc_list]
[docs] def replace_string(fn, str1, str2, fn_sep='_'): """replace_string Replace a string with another string given a filename Parameters ---------- fn: str filename in which we need to replace something str1: str string-to-be-replaced str2: str string-to-replace-str1-with fn_sep: str what type of element can we use to split the filename into chunks that we can replace Returns ---------- str filename with replaced substring """ if not isinstance(fn, str): raise ValueError(f"Input must be string, not {fn} of type {type(fn)}") split_name = fn.split(os.sep)[-1].split(fn_sep) idx = [(i, split_name.index(str1)) for i, split_name in enumerate(split_name) if str1 in split_name][0][0] split_name[idx] = split_name[idx].replace(split_name[idx], str2) new_filename = fn_sep.join(split_name) new_filename = opj(os.path.dirname(fn), new_filename) return new_filename
[docs] def convert2unit(v, method="np"): """convert vector to unit vector""" import numpy as np if method.lower() == "np": v_hat = v / np.linalg.norm(v) return v_hat elif method.lower() == "mesh": # https://sites.google.com/site/dlampetest/python/calculating-normals-of-a-triangle-mesh-using-numpy lens = np.sqrt( v[:,0]**2 + v[:,1]**2 + v[:,2]**2 ) v[:,0] /= lens v[:,1] /= lens v[:,2] /= lens return v
[docs] def string2list(string_array, make_float=False): """string2list This function converts a array in string representation to a list of string. This can happen, for instance, when you use bash to give a list of strings to python, where ast.literal_eval fails. Parameters ---------- string_array: str string to be converted to a valid numpy array with float values Returns ---------- numpy.ndarray array containing elements in float rather than in string representation Example ---------- >>> string2list('[tc,bgfs]') ['tc', 'bgfs'] """ if type(string_array) == str: new = string_array.split(',')[0:] new = list(filter(None, new)) if make_float: new = [float(ii) for ii in new] return new else: # array is already in non-string format return string_array
[docs] def string2float(string_array): """string2float This function converts a array in string representation to a regular float array. This can happen, for instance, when you've stored a numpy array in a pandas dataframe (such is the case with the 'normal' vector). It starts by splitting based on empty spaces, filter these, and convert any remaining elements to floats and returns these in an array. Parameters ---------- string_array: str string to be converted to a valid numpy array with float values Returns ---------- numpy.ndarray array containing elements in float rather than in string representation Example ---------- >>> string2float('[ -7.42 -92.97 -15.28]') array([ -7.42, -92.97, -15.28]) """ if type(string_array) == str: new = string_array[1:-1].split(' ')[0:] new = list(filter(None, new)) new = [float(i.strip(",")) for i in new] new = np.array(new) return new else: # array is already in non-string format return string_array
[docs] def get_module_nr(key_word): """get_module_nr Fetches the module number from the master script given an input string. It sends a command using sed and grep to the bash command line. Won't work on windows! See `call_bashhelper` for more information (that version is actually more accurate as it allows additions to the `master` usage, while that's hardcoded in this one..) Parameters ---------- key_word: str search string of the module your interested in. Should at least match otherwise the function will not find anything. For instance, if we want to know which module the creation of the sinus mask is, we can do: Example ---------- >>> get_module_nr('sinus') '12' """ cmd = "sed -n \'50,85p\' {master} | grep -A0 \"{key}\" | grep -Eo \"[0-9]{{1,2}}\" | head -n 1".format( master=opj(os.environ['DIR_SCRIPTS'], 'shell', 'master'), key=key_word) # print(cmd) mod = subprocess.getoutput(cmd) return mod
[docs] def bids_fullfile(bids_image): """get full path to a BIDS-image filename""" fullfile = opj(bids_image.dirname, bids_image.filename) return fullfile
[docs] def decode(obj): """decode an object""" if isinstance(obj, bytes): obj = obj.decode() return obj
[docs] def reverse_sign(x): """reverse_sign Inverts the sign given set of values. Can be either one value or an array of values that need to be inverted Parameters ---------- x: int,float,list,numpy.ndarray input that needs inverting, either one value or a list Returns ---------- the inverse of whatever the input `x` was Example ---------- >>> # input is integer >>> x = 5 >>> reverse_sign(x) -5 >>> # input is array >>> x = np.array([2, -2340, 2345,123342, 123]) >>> In [6]: reverse_sign(x) array([-2.00000e+00, 2.34000e+03, -2.34500e+03, -1.23342e+05,-1.23000e+02]) >>> # input is float >>> x = 5.0 >>> reverse_sign(x) -5.0 """ import numpy as np inverted = () if isinstance(x, int) or isinstance(x, float) or isinstance(x, np.float32): if x > 0: inverted = -x else: inverted = abs(x) elif isinstance(x, np.ndarray): for i in x: if float(i) > 0: val = -float(i) else: val = abs(float(i)) inverted = np.append(inverted, val) return inverted
[docs] def remove_files(path, string, ext=False): """remove_files Remove files from a given path that containg a string as extension (`ext=True`), or at the start of the file (`ext=False`) Parameters ---------- path: str path to the directory from which we need to remove files string: str tag for files we need to remove ext: str, optional only remove files containing `string` that end with `ext` """ files_in_directory = os.listdir(path) if ext: filtered_files = [file for file in files_in_directory if file.endswith(string)] else: filtered_files = [file for file in files_in_directory if file.startswith(string)] for file in filtered_files: path_to_file = os.path.join(path, file) os.remove(path_to_file)
[docs] def match_lists_on(ref_list, search_list, matcher="run"): """match_lists_on Match two list based on a BIDS-specifier such as 'sub', 'run', etc. Can be any key that is extracted using :func:`linescanning.utils.split_bids_components`. Parameters ---------- ref_list: list List to use as reference search_list: list List to search for items in `ref_list` matcher: str, optional BIDS-identifier, by default "run" Returns ---------- list new `search_list` filtered for items in `ref_list` Example ---------- >>> # Let's say I have functional files for 3 runs >>> func_file >>> ['sub-003_ses-3_task-SR_run-3_bold.mat', >>> 'sub-003_ses-3_task-SR_run-4_bold.mat', >>> 'sub-003_ses-3_task-SR_run-6_bold.mat'] >>> # and anatomical slices for 5 runs >>> anat_slices >>> ['sub-003_ses-3_acq-1slice_run-2_T1w.nii.gz', >>> 'sub-003_ses-3_acq-1slice_run-3_T1w.nii.gz', >>> 'sub-003_ses-3_acq-1slice_run-4_T1w.nii.gz', >>> 'sub-003_ses-3_acq-1slice_run-5_T1w.nii.gz', >>> 'sub-003_ses-3_acq-1slice_run-6_T1w.nii.gz'] >>> # I can then use `match_list_on` to find the anatomical slices corresponding to the functional files >>> from linescanning import utils >>> utils.match_lists_on(func_file, anat_slices, matcher='run') >>> ['sub-003_ses-3_acq-1slice_run-3_T1w.nii.gz', >>> 'sub-003_ses-3_acq-1slice_run-4_T1w.nii.gz', >>> 'sub-003_ses-3_acq-1slice_run-6_T1w.nii.gz'] """ if isinstance(matcher, str): matcher = [matcher] new_list = [] for ii in ref_list: comps = split_bids_components(ii) # loop through elements in 'matcher' list search_for = [f"{ii}-{comps[ii]}" for ii in matcher] ff = get_file_from_substring(search_for, search_list, return_msg="None") if ff != None: if ff == search_list: raise ValueError(f"Output list is equal to input list with identifier '{matcher}'. Please use unique identifier") new_list.append(ff) return new_list
def get_unique_ids(df, id=None, sort=True, as_int=False, drop_na=True): try: df = df.reset_index() except: pass if not isinstance(id, str): raise ValueError(f"Please specify a identifier from the dataframe") try: a = df[id].values if not sort: indexes = np.unique(a, return_index=True)[1] ret_list = [a[index] for index in sorted(indexes)] else: ret_list = list(np.unique(a)) # https://stackoverflow.com/a/50297200 if drop_na: ret_list = [x for x in ret_list if x == x] if as_int: ret_list = [int(i) for i in ret_list] return ret_list except Exception as e: raise RuntimeError(f"Could not find '{id}' in {list(df.columns)}")
[docs] def get_file_from_substring(filt, path, return_msg='error', exclude=None): """get_file_from_substring This function returns the file given a path and a substring. Avoids annoying stuff with glob. Now also allows multiple filters to be applied to the list of files in the directory. The idea here is to construct a binary matrix of shape (files_in_directory, nr_of_filters), and test for each filter if it exists in the filename. If all filters are present in a file, then the entire row should be 1. This is what we'll be looking for. If multiple files are found in this manner, a list of paths is returned. If only 1 file was found, the string representing the filepath will be returned. Parameters ---------- filt: str, list tag for files we need to select. Now also support a list of multiple filters. path: str path to the directory from which we need to remove files return_msg: str, optional whether to raise an error (*return_msg='error') or return None (*return_msg=None*). Default = 'error'. exclude: str, optional: Specify string to exclude from options. This criteria will be ensued after finding files that conform to `filt` as final filter. Returns ---------- str path to the files containing `string`. If no files could be found, `None` is returned list list of paths if multiple files were found Raises ---------- FileNotFoundError If no files usingn the specified filters could be found Example ---------- >>> get_file_from_substring("R2", "/path/to/prf") '/path/to/prf/r2.npy' >>> get_file_from_substring(['gauss', 'best_vertices'], "path/to/pycortex/sub-xxx") '/path/to/pycortex/sub-xxx/sub-xxx_model-gauss_desc-best_vertices.csv' >>> get_file_from_substring(['best_vertices'], "path/to/pycortex/sub-xxx") ['/path/to/pycortex/sub-xxx/sub-xxx_model-gauss_desc-best_vertices.csv', '/path/to/pycortex/sub-xxx/sub-xxx_model-norm_desc-best_vertices.csv'] """ input_is_list = False if isinstance(filt, str): filt = [filt] if isinstance(exclude, str): exclude = [exclude] if isinstance(filt, list): # list and sort all files in the directory if isinstance(path, str): files_in_directory = sorted(os.listdir(path)) elif isinstance(path, list): input_is_list = True files_in_directory = path.copy() else: raise ValueError("Unknown input type; should be string to path or list of files") # the idea is to create a binary matrix for the files in 'path', loop through the filters, and find the row where all values are 1 filt_array = np.zeros((len(files_in_directory), len(filt))) for ix,f in enumerate(files_in_directory): for filt_ix,filt_opt in enumerate(filt): filt_array[ix,filt_ix] = filt_opt in f # now we have a binary <number of files x number of filters> array. If all filters were available in a file, the entire row should be 1, # so we're going to look for those rows full_match = np.ones(len(filt)) full_match_idc = np.where(np.all(filt_array==full_match,axis=1))[0] if len(full_match_idc) == 1: fname = files_in_directory[full_match_idc[0]] if input_is_list: return fname else: f = opj(path, fname) if isinstance(exclude, list): if not any(x in f for x in exclude): return opj(path, fname) else: if return_msg == "error": raise FileNotFoundError(f"Could not find file with filters: {filt} and exclusion of [{exclude}] in '{path}'") else: return None else: return opj(path, fname) elif len(full_match_idc) > 1: match_list = [] for match in full_match_idc: fname = files_in_directory[match] if input_is_list: match_list.append(fname) else: match_list.append(opj(path, fname)) if isinstance(exclude, list): exl_list = [] for f in match_list: if not any(x in f for x in exclude): exl_list.append(f) # return the string if there's only 1 element if len(exl_list) == 1: return exl_list[0] else: return exl_list else: return match_list # return match_list else: if return_msg == "error": raise FileNotFoundError(f"Could not find file with filters: {filt} in {path}") else: return None
[docs] def get_matrixfromants(mat, invert=False): """get_matrixfromants This function greps the rotation and translation matrices from the matrix-file create by `antsRegistration`. It basically does the same as on of the ANTs functions, but still.. Parameters ---------- mat: str string pointing to a *.mat*-file containing the transformation. invert: bool Boolean for inverting the matrix (`invert=False`) or not (`invert=True`) Return ---------- numpy.ndarray (4,4) array representing the transformation matrix """ if mat.endswith(".mat"): genaff = io.loadmat(mat) key = list(genaff.keys())[0] matrix = np.hstack((genaff[key][0:9].reshape(3, 3), genaff[key][9:].reshape(3, 1))) matrix = np.vstack([matrix, [0, 0, 0, 1]]) elif mat.endswith(".txt"): matrix = np.loadtxt(mat) if invert == True: matrix = np.linalg.inv(matrix) return matrix
def ants_truncate_intensities( in_file, out_file, lower=0.01, upper=0.99, n_bins=256): import nibabel as nb import os if not isinstance(in_file, str): raise TypeError(f"Input must be a string pointing to a nifti file or a nb.Nifti1Image-object, not '{type(in_file)}'") else: dims = nb.load(in_file).header["dim"][0] if not isinstance(out_file, str): out_file = os.path.abspath(in_file) raise TypeError(f"out_file must be a string, not '{type(out_file)}'") else: out_file = os.path.abspath(out_file) cmd = f"ImageMath {dims} {out_file} TruncateImageIntensity {in_file} {lower} {upper} {n_bins}" # print command if verb = True print(cmd) os.system(cmd) return out_file
[docs] def ants_to_spm_moco(affine, deg=False, convention="SPM"): """SPM output = x [LR], y [AP], z [SI], rx, ry, rz. ANTs employs an LPS system, so y value should be switched""" dx, dy, dz = affine[9:] if convention == "SPM": dy = reverse_sign(dy) rot_x = np.arcsin(affine[6]) cos_rot_x = np.cos(rot_x) rot_y = np.arctan2(affine[7] / cos_rot_x, affine[8] / cos_rot_x) rot_z = np.arctan2(affine[3] / cos_rot_x, affine[0] / cos_rot_x) if deg: rx,ry,rz = np.degrees(rot_x),np.degrees(rot_y),np.degrees(rot_z) else: rx,ry,rz = rot_x,rot_y,rot_z moco_pars = np.array([dx,dy,dz,rx,ry,rz]) return moco_pars
[docs] def make_chicken_csv(coord, input="ras", output_file=None, vol=0.343): """make_chicken_csv This function creates a .csv-file like the chicken.csv example from ANTs to warp a coordinate using a transformation file. ANTs assumes the input coordinate to be LPS, but this function can deal with RAS-coordinates too. (see https://github.com/stnava/chicken for the reason of this function's name) Parameters ---------- coord: np.ndarray numpy array containing the three coordinates in x,y,z direction input: str specify whether your coordinates uses RAS or LPS convention (default is RAS, and will be converted to LPS to create the file) output_file: str path-like string pointing to an output file (.csv!) vol: float volume of voxels (pixdim_x*pixdim_y*pixdim_z). If you're using the standard 0.7 MP2RAGE, the default vol will be ok Returns ---------- str path pointing to the `csv`-file containing the coordinate Example ---------- >>> make_chicken_csv(np.array([-16.239,-67.23,-2.81]), output_file="sub-001_space-fs_desc-lpi.csv") "sub-001_space-fs_desc-lpi.csv" """ if len(coord) > 3: coord = coord[:3] if input.lower() == "ras": # ras2lps LPS = np.array([[-1,0,0], [0,-1,0], [0,0,1]]) coord = LPS @ coord # rows = ["x,y,z,t,label,mass,volume,count", f"{coord[0]},{coord[1]},{coord[2]},0,1,1,{vol},1"] with open(output_file, "w") as target: writer = csv.writer(target, delimiter=",") writer.writerow(["x","y","z","t","label","mass","volume","count"]) writer.writerow([coord[0],coord[1],coord[2],0,1,1,vol,1]) return output_file
[docs] def read_chicken_csv(chicken_file, return_type="lps"): """read_chicken_csv Function to get at least the coordinates from a csv file used with antsApplyTransformsToPoints. (see https://github.com/stnava/chicken for the reason of this function's name) Parameters ---------- chicken_file: str path-like string pointing to an input file (.csv!) return_type: str specify the coordinate system that the output should be in Returns ---------- numpy.ndarray (3,) array containing the coordinate in `chicken_file` Example ---------- >>> read_chicken_csv("sub-001_space-fs_desc-lpi.csv") array([-16.239,-67.23,-2.81]) """ contents = pd.read_csv(chicken_file) coord = np.squeeze(contents.iloc[:,0:3].values) if return_type.lower() == "lps": return coord elif return_type.lower() == "ras": # ras2lps LPS = np.array([[-1,0,0], [0,-1,0], [0,0,1]]) return LPS@coord
def get_vertex_nr(subject, as_list=False, debug=False, fs_dir=None): if not isinstance(fs_dir, str): fs_dir = os.environ.get("SUBJECTS_DIR") n_verts_fs = [] for i in ['lh', 'rh']: surf = opj(fs_dir, subject, 'surf', f'{i}.white') verbose(surf, debug) if not os.path.exists(surf): raise FileNotFoundError(f"Could not find file '{surf}'") try: verts = nb.freesurfer.io.read_geometry(surf)[0].shape[0] except: annot = opj(fs_dir, subject, 'label', f'{i}.aparc.a2009s.annot') verts = nb.freesurfer.io.read_annot(annot)[0].shape[0] n_verts_fs.append(verts) if as_list: return n_verts_fs else: return sum(n_verts_fs)
[docs] class VertexInfo: """ VertexInfo This object reads a .csv file containing relevant information about the angles, vertex position, and normal vector. Parameters ---------- infofile: str path to the information file containing `best_vertices` in the filename subject: str subject ID as used in `SUBJECTS_DIR` Returns ---------- attr sets attributes in the class """ def __init__(self, infofile=None, subject=None, hemi="lh"): self.infofile = infofile self.data = pd.read_csv(self.infofile) # try to set the index to hemi. It will throw an error if you want to set the index while there already is an index. # E.g., initially we will set the index to 'hemi'. If we then later on read in that file again, the index is already # set try: self.data = self.data.set_index('hemi') except: pass if hemi.lower() in ["lh","l","left"]: self.hemi = "L" elif hemi.lower() in ["rh","r","right"]: self.hemi = "R" else: self.hemi = "both" if self.hemi == "both": # check if arrays are in string format for hemi in ["L", "R"]: self.data['normal'][hemi] = string2float(self.data['normal'][hemi]) self.data['position'][hemi] = string2float(self.data['position'][hemi]) else: self.data['normal'][self.hemi] = string2float(self.data['normal'][self.hemi]) self.data['position'][self.hemi] = string2float(self.data['position'][self.hemi]) self.subject = subject
[docs] def get(self, keyword, hemi='both'): """return values from dataframe given keyword. Can be any column name or 'prf' for pRF-parameters""" keywords = np.array(self.data.columns) if keyword == "prf": if hemi == "both": return { "lh": [self.data[ii]['L'] for ii in ['x', 'y', 'size', 'beta', 'baseline', 'r2']], "rh": [self.data[ii]['R'] for ii in ['x', 'y', 'size', 'beta', 'baseline', 'r2']]} elif hemi.lower() == "right" or hemi.lower() == "r" or hemi.lower() == "rh": return [self.data[ii]['R'] for ii in ['x', 'y', 'size', 'beta', 'baseline', 'r2']] elif hemi.lower() == "left" or hemi.lower() == "l" or hemi.lower() == "lh": return [self.data[ii]['L'] for ii in ['x', 'y', 'size', 'beta', 'baseline', 'r2']] else: if keyword not in keywords: raise ValueError(f"{keyword} does not exist in {keywords}") if hemi == "both": return {"lh": self.data[keyword]['L'], "rh": self.data[keyword]['R']} elif hemi.lower() == "right" or hemi.lower() == "r" or hemi.lower() == "rh": return self.data[keyword]['R'] elif hemi.lower() == "left" or hemi.lower() == "l" or hemi.lower() == "lh": return self.data[keyword]['L']
def convert_to_rgb(color, as_integer=False): if isinstance(color, tuple): (R,G,B) = color elif isinstance(color, str): if len(color) == 1: color = mcolors.to_rgb(color) else: color = ImageColor.getcolor(color, "RGB") (R,G,B) = color if not as_integer: rgb = [] for v in [R,G,B]: if v>1: v /= 255 rgb.append(v) R,G,B = rgb else: rgb = [] for v in [R,G,B]: if v<=1: v = int(v*255) rgb.append(v) R,G,B = rgb return (R,G,B)
[docs] def make_binary_cm(color): """make_binary_cm This function creates a custom binary colormap using matplotlib based on the RGB code specified. Especially useful if you want to overlay in imshow, for instance. These RGB values will be converted to range between 0-1 so make sure you're specifying the actual RGB-values. I like `https://htmlcolorcodes.com` to look up RGB-values of my desired color. The snippet of code used here comes from https://kbkb-wx-python.blogspot.com/2015/12/python-transparent-colormap.html Parameters ---------- <color>: tuple, str either hex-code with (!!) '#' or a tuple consisting of: * <R> int | red-channel (0-255) * <G> int | green-channel (0-255) * <B> int | blue-channel (0-255) Returns ---------- matplotlib.colors.LinearSegmentedColormap object colormap to be used with `plt.imshow` Example ---------- >>> cm = make_binary_cm((232,255,0)) >>> cm <matplotlib.colors.LinearSegmentedColormap at 0x7f35f7154a30> >>> cm = make_binary_cm("#D01B47") >>> cm >>> <matplotlib.colors.LinearSegmentedColormap at 0x7f35f7154a30> """ # convert input to RGB R,G,B = convert_to_rgb(color) colors = [(R,G,B,c) for c in np.linspace(0,1,100)] cmap = mcolors.LinearSegmentedColormap.from_list('mycmap', colors, N=5) return cmap
def find_missing(lst): return [i for x, y in zip(lst, lst[1:]) for i in range(x + 1, y) if y - x > 1] def make_between_cm( col1, col2, as_list=False, **kwargs): input_list = [col1,col2] # scale to 0-1 col_list = [] for color in input_list: scaled_color = convert_to_rgb(color) col_list.append(scaled_color) cm = mcolors.LinearSegmentedColormap.from_list("", col_list, **kwargs) if as_list: return [mcolors.rgb2hex(cm(i)) for i in range(cm.N)] else: return cm def make_stats_cm( direction, lower_neg=(51,0,248), upper_neg=(151,253,253), lower_pos=(217,36,36), upper_pos=(251,255,72), invert=False, ): if direction not in ["pos","neg"]: raise ValueError(f"direction must be one of 'pos' or 'neg', not '{direction}'") if direction == "pos": input_list = [lower_pos,upper_pos] else: input_list = [lower_neg, upper_neg] if invert: input_list = input_list[::-1] # scale to 0-1 col_list = [] for color in input_list: scaled_color = convert_to_rgb(color) col_list.append(scaled_color) return mcolors.LinearSegmentedColormap.from_list("", col_list)
[docs] def percent_change( ts, ax, nilearn=False, baseline=20, prf=False, dm=None ): """percent_change Function to convert input data to percent signal change. Two options are current supported: the nilearn method (`nilearn=True`), where the mean of the entire timecourse if subtracted from the timecourse, and the baseline method (`nilearn=False`), where the median of `baseline` is subtracted from the timecourse. Parameters ---------- ts: numpy.ndarray Array representing the data to be converted to percent signal change. Should be of shape (n_voxels, n_timepoints) ax: int Axis over which to perform the conversion. If shape (n_voxels, n_timepoints), then ax=1. If shape (n_timepoints, n_voxels), then ax=0. nilearn: bool, optional Use nilearn method, by default False baseline: int, list, np.ndarray optional Use custom method where only the median of the baseline (instead of the full timecourse) is subtracted, by default 20. Length should be in `volumes`, not `seconds`. Can also be a list or numpy array (1d) of indices which are to be considered as baseline. The list of indices should be corrected for any deleted volumes at the beginning. Returns ---------- numpy.ndarray Array with the same size as `ts` (voxels,time), but with percent signal change. Raises ---------- ValueError If `ax` > 2 """ if ts.ndim == 1: ts = ts[:,np.newaxis] ax = 0 if prf: from linescanning import prf # format data if ts.ndim == 1: ts = ts[...,np.newaxis] # read design matrix if isinstance(dm, str): dm = prf.read_par_file(dm) # calculate mean avg = np.mean(ts, axis=ax) ts *= (100/avg) # find points with no stimulus timepoints_no_stim = prf.baseline_from_dm(dm) # find timecourses with no stimulus if ax == 0: med_bsl = ts[timepoints_no_stim,:] else: med_bsl = ts[:,timepoints_no_stim] # calculat median over baseline median_baseline = np.median(med_bsl, axis=ax) # shift to zero ts -= median_baseline return ts else: if nilearn: if ax == 0: psc = signal._standardize(ts, standardize='psc') else: psc = signal._standardize(ts.T, standardize='psc').T else: # first step of PSC; set NaNs to zero if dividing by 0 (in case of crappy timecourses) ts_m = ts*np.expand_dims(np.nan_to_num((100/np.mean(ts, axis=ax))), ax) # get median of baseline if isinstance(baseline, np.ndarray): baseline = list(baseline) if ax == 0: if isinstance(baseline, list): median_baseline = np.median(ts_m[baseline,:], axis=0) else: median_baseline = np.median(ts_m[:baseline,:], axis=0) elif ax == 1: if isinstance(baseline, list): median_baseline = np.median(ts_m[:,baseline], axis=1) else: median_baseline = np.median(ts_m[:,:baseline], axis=1) else: raise ValueError("ax must be 0 or 1") # subtract psc = ts_m-np.expand_dims(median_baseline,ax) return psc
[docs] def unique_combinations(elements, l=2): """ Precondition: `elements` does not contain duplicates. Postcondition: Returns unique combinations of length 2 from `elements`. >>> unique_combinations(["apple", "orange", "banana"]) [("apple", "orange"), ("apple", "banana"), ("orange", "banana")] """ return list(itertools.combinations(elements, l))
[docs] def select_from_df( df, expression=None, index=True, indices=None, match_exact=True ): """select_from_df Select a subset of a dataframe based on an expression. Dataframe should be indexed by the variable you want to select on or have the variable specified in the expression argument as column name. If index is True, the dataframe will be indexed by the selected variable. If indices is specified, the dataframe will be indexed by the indices specified through a list (only select the elements in the list) or a `range`-object (select within range). Parameters ---------- df: pandas.DataFrame input dataframe expression: str, optional what subject of the dataframe to select, by default None. The expression must consist of a variable name and an operator. The operator can be any of the following: '=', '>', '<', '>=', '<=', '!=', separated by spaces. You can also change 2 operations by specifying the `&`-operator between the two expressions. If you want to use `indices`, specify `expression="ribbon"`. index: bool, optional return output dataframe with the same indexing as `df`, by default True indices: list, range, numpy.ndarray, optional List, range, or numpy array of indices to select from `df`, by default None match_exact: bool, optional: When you insert a list of strings with `indices` to be filtered from the dataframe, you can either request that the items of `indices` should **match** exactly (`match_exact=True`, default) the column names of `df`, or whether the columns of `df` should **contain** the items of `indices` (`match_exact=False`). Returns ---------- pandas.DataFrame new dataframe where `expression` or `indices` were selected from `df` Raises ---------- TypeError If `indices` is not a tuple, list, or array Notes ---------- See https://linescanning.readthedocs.io/en/latest/examples/nideconv.html for an example of how to use this function (do ctrl+F and enter "select_from_df"). """ # not the biggest fan of a function within a function, but this allows easier translation of expressions/operators def sort_expressions(expression): expr_ = expression.split(" ") if len(expr_)>3: for ix,i in enumerate(expr_): try: _ = str2operator(i) break except: pass col1 = " ".join(expr_[:ix]) val1 = " ".join(expr_[(ix+1):]) operator1 = expr_[ix] else: col1,operator1,val1 = expr_ return col1,operator1,val1 if not isinstance(expression, (str,tuple,list)): raise ValueError(f"Please specify expressions to apply to the dataframe. Input is '{expression}' of type ({type(expression)})") if expression == "ribbon": if isinstance(indices, tuple): return df.iloc[:,indices[0]:indices[1]] elif isinstance(indices, list): if all(isinstance(item, str) for item in indices): if match_exact: return df[df.columns[df.columns.isin(indices)]] else: df_tmp = [] for item in indices: df_tmp.append(df[df.columns[df.columns.str.contains(item)]]) return pd.concat(df_tmp, axis=1) else: return df.iloc[:,indices] elif isinstance(indices, np.ndarray): return df.iloc[:,list(indices)] else: raise TypeError(f"Unknown type '{type(indices)}' for indices; must be a tuple of 2 values representing a range, or a list/array of indices to select") else: # fetch existing indices idc = list(df.index.names) if idc[0] != None: reindex = True else: reindex = False # sometimes throws an error if you're trying to reindex a non-indexed dataframe try: df = df.reset_index() except: pass sub_df = df.copy() if isinstance(expression, str): expression = [expression] if isinstance(expression, (tuple,list)): expressions = expression[::2] operators = expression[1::2] if len(expressions) == 1: # find operator index col1,operator1,val1 = sort_expressions(expressions[0]) # convert to operator function ops1 = str2operator(operator1) # use dtype of whatever dtype the colum is search_value = np.array([val1], dtype=type(sub_df[col1].values[0])) sub_df = sub_df.loc[ops1(sub_df[col1], search_value[0])] if len(expressions) == 2: col1,operator1,val1 = sort_expressions(expressions[0]) col2,operator2,val2 = sort_expressions(expressions[1]) main_ops = str2operator(operators[0]) ops1 = str2operator(operator1) ops2 = str2operator(operator2) # check if we should interpret values invididually as integers search_value1 = np.array([val1], dtype=type(sub_df[col1].values[0]))[0] search_value2 = np.array([val2], dtype=type(sub_df[col2].values[0]))[0] sub_df = sub_df.loc[main_ops(ops1(sub_df[col1], search_value1), ops2(sub_df[col2], search_value2))] # first check if we should do indexing if index != None: # then check if we actually have something to index if reindex: if idc[0] != None: sub_df = sub_df.set_index(idc) if sub_df.shape[0] == 0: raise ValueError(f"The following expression(s) resulted in an empty dataframe: {expression}") return sub_df
def multiselect_from_df(df, expression=[]): if not isinstance(expression, list): raise TypeError(f"expression must be list of tuples (see docs utils.select_from_df), not {type(expression)}") if len(expression) == 0: raise ValueError(f"List is empty") start_df = df.copy() for expr in expression: df = select_from_df(df, expression=expr) return df def split_bids_components(fname, entities=False): comp_list = fname.split('_') comps = {} ids = ['sub', 'ses', 'task', 'acq', 'rec', 'run', 'space', 'hemi', 'model', 'stage', 'desc', 'vox'] full_entities = [ "subject", "session", "task", "reconstruction", "acquisition", "run" ] for el in comp_list: for i in ids: if i in el: comp = el.split('-')[-1] if "." in comp: ic = comp.index(".") if ic > 0: ex = 0 else: ex = -1 comp = comp.split(".")[ex] # if i == "run": # comp = int(comp) comps[i] = comp if len(comps) != 0: if entities: return comps, full_entities else: return comps else: raise ValueError(f"Could not find any element of {ids} in {fname}") class BIDSFile(): def __init__(self, bids_file): self.bids_file = os.path.abspath(bids_file) def get_bids_basepath(self, *args): return self._get_bids_basepath(self.bids_file, *args) def get_bids_root(self, *args): return self._get_bids_root(self.bids_file, *args) def get_bids_workbase(self, *args): return self._get_bids_workbase(self.bids_file, *args) def get_bids_workflow(self, **kwargs): return assemble_fmriprep_wf(self.bids_file, **kwargs) # def get_bids_root(self): @staticmethod def _get_bids_basepath(file, pref="sub"): sp = file.split(os.sep) for i in sp: if i.startswith(pref) and not i.endswith('.nii.gz'): base_path = os.sep.join(sp[sp.index(i)+1:-1]) break return base_path # def get_bids_root(self): @staticmethod def _get_bids_workbase(file, pref="sub"): sp = file.split(os.sep) for i in sp: if i.startswith(pref) and not i.endswith('.nii.gz'): base_path = os.sep.join(sp[sp.index(i):-2]) break return base_path @staticmethod def _get_bids_root(file, pref="sub"): sp = file.split(os.sep) for i in sp: if i.startswith(pref) and not i.endswith('.nii.gz'): bids_root = os.sep.join(sp[:sp.index(i)]) break return bids_root def get_bids_ids(self, **kwargs): return split_bids_components(self.bids_file, **kwargs) def get_ids(func_list, bids="task"): ids = [] if isinstance(func_list, list): for ff in func_list: if isinstance(ff, str): bids_comps = split_bids_components(ff) if bids in list(bids_comps.keys()): ids.append(bids_comps[bids]) if len(ids) > 0: ids = list(np.unique(np.array(ids))) return ids else: return [] def subjects_in_list(input): subj = [] for ii in input: subj.append(split_bids_components(ii)["sub"]) return list(np.unique(np.array(subj)))
[docs] def filter_for_nans(array): """filter out NaNs from an array""" if np.isnan(array).any(): return np.nan_to_num(array) else: return array
[docs] def find_max_val(array): """find the index of maximum value given an array""" return np.where(array == np.amax(array))[0]
[docs] def read_fs_reg(dat_file): """read_fs_reg Read a `.dat`-formatted registration file from FreeSurfer Parameters ---------- dat_file: str path pointing to the registration file Returns ---------- nump.ndarray (4,4) numpy array containing the transformation """ with open(dat_file) as f: d = f.readlines()[4:-1] return np.array([[float(s) for s in dd.split() if s] for dd in d])
[docs] def random_timeseries(intercept, volatility, nr): """random_timeseries Create a random timecourse by multiplying an intercept with a random Gaussian distribution. Parameters ---------- intercept: float starting point of timecourse volatility: float this factor is multiplied with the Gaussian distribution before multiplied with the intercept nr: int length of timecourse Returns ---------- numpy.ndarray array of length `nr` Example ---------- >>> from linescanning import utils >>> ts = utils.random_timeseries(1.2, 0.5, 100) Notes ---------- Source: https://stackoverflow.com/questions/67977231/how-to-generate-random-time-series-data-with-noise-in-python-3 """ time_series = [intercept, ] for _ in range(nr): time_series.append(time_series[-1] + intercept * random.gauss(0,1) * volatility) return np.array(time_series[:-1])
[docs] def squeeze_generic(a, axes_to_keep): """squeeze_generic Numpy squeeze implementation keeping <axes_to_keep> dimensions. Parameters ---------- a: numpy.ndarray array to be squeezed axes_to_keep: tuple, range tuple of axes to keep from original input Returns ---------- numpy.ndarray `axes_to_keep` from `a` Example ---------- >>> a = np.random.rand(3,5,1) >>> squeeze_generic(a, axes_to_keep=range(2)).shape (3, 5) Notes ---------- From: https://stackoverflow.com/questions/57472104/is-it-possible-to-squeeze-all-but-n-dimensions-using-numpy """ out_s = [s for i, s in enumerate(a.shape) if i in axes_to_keep or s != 1] return a.reshape(out_s)
[docs] def find_intersection(xx, curve1, curve2): """find_intersection Find the intersection coordinates given two functions using `Shapely`. Parameters ---------- xx: numpy.ndarray array describing the x-axis values curve1: numpy.ndarray array describing the first curve curve2: numpy.ndarray array describing the first curve Returns ---------- tuple x,y coordinates where *curve1* and *curve2* intersect Raises ---------- ValueError if no intersection coordinates could be found Example ---------- See [refer to linescanning.prf.SizeResponse.find_stim_sizes] """ first_line = geometry.LineString(np.column_stack((xx, curve1))) second_line = geometry.LineString(np.column_stack((xx, curve2))) geom = first_line.intersection(second_line) try: if isinstance(geom, geometry.multipoint.MultiPoint): # multiple coordinates coords = [i.coords._coords for i in list(geom.geoms)] elif isinstance(geom, geometry.point.Point): # single coordinate coords = [geom.coords._coords] elif isinstance(geom, geometry.collection.GeometryCollection): # coordinates + line segments mapper = geometry.mapping(geom) coords = [] for el in mapper["geometries"]: coor = np.array(el["coordinates"]) if coor.ndim > 1: coor = coor[0] coords.append(coor[np.newaxis,...]) # to make indexing same as above else: raise ValueError(f"Can't deal with output of type {type(geom)}") # coords = geom except: raise ValueError("Could not find intersection between curves..") return coords
[docs] def disassemble_fmriprep_wf(wf_path, subj_ID, prefix="sub-"): """disassemble_fmriprep_wf Parses the workflow-folder from fMRIPrep into its constituents to recreate a filename. Searches for the following keys: `['ses', 'task', 'acq', 'run']`. Parameters ---------- wf_path: str Path to workflow-folder subj_ID: str Subject ID to append to `prefix` prefix: str, optional Forms together with `subj_ID` the beginning of the new filename. By default "sub-" Returns ---------- str filename based on constituent file parts Example ---------- >>> from linescanning.utils import disassemble_fmriprep_wf >>> wf_dir = "func_preproc_ses_2_task_pRF_run_1_acq_3DEPI_wf" >>> fname = disassemble_fmriprep_wf(wf_dir, "001") >>> fname 'sub-001_ses-2_task-pRF_acq-3DEPI_run-1' """ wf_name = [ii for ii in wf_path.split(os.sep) if "func_preproc" in ii][0] wf_elem = wf_name.split("_") fname = [f"{prefix}{subj_ID}"] for tag in ['ses', 'task', 'acq', 'run']: if tag in wf_elem: idx = wf_elem.index(tag)+1 fname.append(f"{tag}-{wf_elem[idx]}") fname = "_".join(fname) return fname
[docs] def assemble_fmriprep_wf(bold_path, wf_only=False): """assemble_fmriprep_wf Parses the bold file into a workflow name for fMRIPrep into its constituents to recreate a filename. Searches for the following keys: `['ses', 'task', 'acq', 'run']`. Parameters ---------- bold_path: str Path to bold-file wf_only: bool, optional If `sub` tag is found in `bold_path`, we can reconstruct the full workflow folder including preceding `single_subject_<sub_id>_wf`. If you do not want this, set `wf_only` to **False**. Returns ---------- str filename based on constituent file parts Example ---------- >>> from linescanning.utils import disassemble_fmriprep_wf >>> bold_file = "sub-008_ses-2_task-SRFi_acq-3DEPI_run-1_desc-preproc_bold.nii.gz" >>> wf_name = assemble_fmriprep_wf(bold_file) >>> wf_name >>> 'single_subject_008_wf/func_preproc_ses_2_task_SRFi_run_1_acq_3DEPI_wf' >>> # workflow name only >>> wf_name = assemble_fmriprep_wf(bold_file, wf_only=True) >>> wf_name >>> 'func_preproc_ses_2_task_SRFi_run_1_acq_3DEPI_wf' """ bids_comps = split_bids_components(os.path.basename(bold_path)) fname = ["func_preproc"] for tag in ['ses', 'task', 'run', 'acq']: if tag in list(bids_comps.keys()): fname.append(f"{tag}_{bids_comps[tag]}") base_dir = "" fname = "_".join(fname)+"_wf" if 'sub' in list(bids_comps.keys()): base_dir = f"single_subject_{bids_comps['sub']}_wf" if wf_only: return fname else: return opj(base_dir, fname) else: return fname
[docs] def resample2d(array:np.ndarray, new_size:int, kind='linear'): """resample2d Resamples a 2D (or 3D) array with :func:`scipy.interpolate.interp2d` to `new_size`. If input is 2D, we'll loop over the final axis. Parameters ---------- array: np.ndarray Array to be interpolated. Ideally axis have the same size. new_size: int New size of array kind: str, optional Interpolation method, by default 'linear' Returns ---------- np.ndarray If 2D: resampled array of shape `(new_size,new_size)` If 3D: resampled array of shape `(new_size,new_size, array.shape[-1])` """ # set up interpolater x = np.array(range(array.shape[0])) y = np.array(range(array.shape[1])) # define new grid xnew = np.linspace(0, x.shape[0], new_size) ynew = np.linspace(0, y.shape[0], new_size) if array.ndim > 2: new = np.zeros((new_size,new_size,array.shape[-1])) for dd in range(array.shape[-1]): f = interpolate.interp2d(x, y, array[...,dd], kind=kind) new[...,dd] = f(xnew,ynew) return new else: f = interpolate.interp2d(x, y, array, kind=kind) return f(xnew,ynew)
class FindFiles(): def __init__(self, directory, extension, exclude=None, maxdepth=None, filters=None): self.directory = directory self.extension = extension self.exclude = exclude self.maxdepth = maxdepth self.filters = filters self.files = [] for filename in self.find_files(self.directory, f'*{self.extension}', maxdepth=self.maxdepth): if not filename.startswith('._'): self.files.append(filename) self.files.sort() if isinstance(self.exclude, (str,list)) or isinstance(self.filters, (list, str)): if isinstance(self.filters, str): self.filters = [self.filters] elif isinstance(self.filters, list): pass else: self.filters = [] self.files = get_file_from_substring(self.filters, self.files, exclude=self.exclude) @staticmethod def find_files(directory, pattern, maxdepth=None): start = None if isinstance(maxdepth, int): start = 0 for root, dirs, files in os.walk(directory, followlinks=True): for basename in files: if fnmatch.fnmatch(basename, pattern): filename = os.path.join(root, basename) yield filename if isinstance(maxdepth, int): if start > maxdepth: break if isinstance(start, int): start += 1
[docs] def round_decimals_down(number:float, decimals:int=2): """ Returns a value rounded down to a specific number of decimal places. see: https://kodify.net/python/math/round-decimals/#round-decimal-places-up-and-down-round """ if not isinstance(decimals, int): raise TypeError("decimal places must be an integer") elif decimals < 0: raise ValueError("decimal places has to be 0 or more") elif decimals == 0: return math.floor(number) factor = 10 ** decimals return math.floor(number * factor) / factor
[docs] def round_decimals_up(number:float, decimals:int=2): """ Returns a value rounded up to a specific number of decimal places. see: https://kodify.net/python/math/round-decimals/#round-decimal-places-up-and-down-round """ if not isinstance(decimals, int): raise TypeError("decimal places must be an integer") elif decimals < 0: raise ValueError("decimal places has to be 0 or more") elif decimals == 0: return math.ceil(number) factor = 10 ** decimals return math.ceil(number * factor) / factor
def make_polar_cmap(): top = cm.get_cmap('hsv', 256) bottom = cm.get_cmap('hsv', 256) newcolors = np.vstack((top(np.linspace(0, 1, 256)), bottom(np.linspace(0, 1, 256)))) cmap = mcolors.ListedColormap(newcolors, name='hsvx2') return cmap # https://stackoverflow.com/a/50692782 def paste_slices(tup): pos, w, max_w = tup wall_min = max(pos, 0) wall_max = min(pos+w, max_w) block_min = -min(pos, 0) block_max = max_w-max(pos+w, max_w) block_max = block_max if block_max != 0 else None return slice(wall_min, wall_max), slice(block_min, block_max) def paste(large, small, loc=(0,0)): loc_zip = zip(loc, small.shape, large.shape) large_slices, small_slices = zip(*map(paste_slices, loc_zip)) large[large_slices] = small[small_slices] def SDT(hits, misses, fas, crs): from scipy import stats Z = stats.norm.ppf """ returns a dict with d-prime measures given hits, misses, false alarms, and correct rejections""" # Floors an ceilings are replaced by half hits and half FA's half_hit = 0.5 / (hits + misses) half_fa = 0.5 / (fas + crs) # Calculate hit_rate and avoid d' infinity hit_rate = hits / (hits + misses) if hit_rate == 1: hit_rate = 1 - half_hit if hit_rate == 0: hit_rate = half_hit # Calculate false alarm rate and avoid d' infinity fa_rate = fas / (fas + crs) if fa_rate == 1: fa_rate = 1 - half_fa if fa_rate == 0: fa_rate = half_fa # Return d', beta, c and Ad' out = {} out['d'] = Z(hit_rate) - Z(fa_rate) out['beta'] = math.exp((Z(fa_rate)**2 - Z(hit_rate)**2) / 2) out['c'] = -(Z(hit_rate) + Z(fa_rate)) / 2 out['Ad'] = stats.norm.cdf(out['d'] / math.sqrt(2)) out['hit'] = hit_rate out['fa'] = fa_rate return(out) def update_kwargs(kwargs, el, val, force=False): if not force: if not el in list(kwargs.keys()): kwargs[el] = val else: kwargs[el] = val return kwargs
[docs] def create_sinewave( amplitude=1, frequency=1, phase=0, sampling_rate=100, duration=1 ): """create_sinewave Parameters ---------- amplitude: int,float Amplitude of the wave. Default = 1 frequency: int,float Frequency of the wave in Hertz. Default = 1 phase: int,float Phase shift of the wave in radians sampling_rate: int Number of samples per second duration: int Duration of the wave in seconds Returns ---------- A tuple with first element the numpy array containing the sine wave. The second element is the time axis Example ---------- >>> from linescanning import utils >>> wave,time = utils.create_sinewave() >>> from linescanning import utils >>> wave,time = utils.create_sinewave( >>> frequency=4, >>> amplitude=0.2 >>> ) """ time = np.linspace(0, duration, int(duration * sampling_rate), endpoint=False) sine_wave = amplitude * np.sin(2*np.pi*frequency*time+phase) return sine_wave,time