Source code for spikes.utility
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2016 Akshay Raj Gollahalli
"""
Reads the data from folders and json files.
"""
from __future__ import print_function
import csv
import logging
import os
import re
import sys
import numpy as np
from sklearn.model_selection import train_test_split
from .utils.errors import LengthMismatchError, NotEnoughDataError, SplitDataError
from .utils.utils import log_it
__all__ = ['ReadCSV']
[docs]class ReadCSV(object):
"""Reads the CSV files in a given folder.
If no folder names is give, this program searches for folder ``Data``; If
no location is given.
:param str path: Needed.
Location of the directory containing the data files.
:param bool log: Default False.
Verbose mode.
"""
def __init__(self, path, log=False):
self.yes_no = log
self.data_folder = os.path.abspath(path) + os.sep
log_it(self.yes_no)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.DEBUG)
self.logger.debug("ReadCSV class called")
try:
if os.path.isdir(self.data_folder):
self.prefixed = [filename for filename in os.listdir(self.data_folder) if
filename.startswith("sam")]
self.logger.debug("Reading the data")
else:
self.logger.debug("%s does not exist", self.data_folder)
except IOError as err:
self.logger.exception("Data files not found - %s", err)
sys.exit(1)
if len(self.prefixed) is 1:
raise NotEnoughDataError("There should be more than one sample to continue.")
[docs] def get_samples(self):
"""
Reads all the sample starting with sam*.csv and returns a Pandas DataFrame.
:rtype: dict
:return samples:
A dictionary of ``samples`` and ``labels`` (this depends if you have labels file).
>>> data = ReadCSV('Data')
>>> data.get_samples()
{
'labels': [...],
'samples: array(...)
}
"""
self.prefixed.sort(key=natural_keys) # Sorted with filename and sample number
abs_path_of_samples = [self.data_folder + self.prefixed for self.prefixed in self.prefixed]
raw_samples_list = np.asarray(
[np.genfromtxt(sample, delimiter=',') for sample in abs_path_of_samples])
labels = self._get_class_labels()
if labels[0] is not False:
if len(raw_samples_list) != len(labels):
LengthMismatchError(
"Number of samples ({0}) is not equal to number of labels ({1}).".format(
len(raw_samples_list),
len(labels)))
samples = {'samples': raw_samples_list, 'labels': labels}
else:
samples = {'samples': raw_samples_list}
return samples
[docs] def get_split_data(self, split_to=0.5):
"""Split samples into training and testing data.
The default test size is 0.5 (50%)
:param float split_to:
Percentage split training and testing. Defaults to ``0.5``.
:rtype: dict
:return:
Dictionary of ``train_samples``, ``train_labels``, ``test_samples``,
``test_labels`` and ``split_percentage``.
"""
samples = self.get_samples()
if split_to > 1:
raise SplitDataError("Split data should be less that 1.0")
train_samples, train_labels, test_samples, test_labels = train_test_split(
samples['samples'],
samples['labels'],
test_size=split_to)
self.logger.debug("Data has been split into Training and Testing data")
return {'train_samples': train_samples, 'train_labels': train_labels,
'test_samples': test_samples,
'test_labels': test_labels,
'split_percentage': split_to}
[docs] def sample_size(self):
"""Returns the length of the sample size.
:rtype: int
:return size:
Length on samples.
"""
size = len(self.prefixed)
self.logger.debug("Sample size returned")
return size
[docs] def time_feature_length(self):
"""
Returns the time length of a file by counting it's number of columns.
:rtype: dict
:return data_dict:
Dictionary of ``time_length`` and ``feature_length``.
"""
_file = self.prefixed[0]
sample = []
with open(self.data_folder + _file) as csv_file:
reader = csv.reader(csv_file, delimiter=' ', quotechar='|')
for col in reader:
sample.append(col)
data_dict = {'time_length': len(sample), 'feature_length': len(sample[0][0].split(','))}
self.logger.debug("Feature length returned")
return data_dict
[docs] def get_feature_names(self):
"""
Reads feature names from ``feature_names_eeg.txt`` if it is present else creates a
feature names automatically.
:rtype: dict
:return data_dict:
A dictionary of Python list, which contains ``number_of_features`` and
``name_features``.
"""
names = []
number_of_features = 0
if os.path.isfile(self.data_folder + 'feature_names_eeg.txt'):
try:
with open(self.data_folder + 'feature_names_eeg.txt', 'r') as _file:
data = _file.read()
names = data.split('\n')
number_of_features = len(names)
except IOError as err:
self.logger.exception("File not found - %s", err)
else:
with open(self.data_folder + self.prefixed[0]) as _file:
number_of_features = len(_file.readline().split(','))
for num in range(1, number_of_features + 1):
names.append("feature {}".format(num))
return {'number_of_features': number_of_features, 'name_features': names}
def _get_class_labels(self):
_labels = []
if os.path.isfile(self.data_folder + 'tar_class_labels.csv'):
with open(self.data_folder + 'tar_class_labels.csv', 'r') as csv_file:
reader = csv.reader(csv_file, delimiter=' ', quotechar='|')
for row in reader:
_labels.append(int(row[0]))
else:
_labels.append(False)
self.logger.debug("Class labels returned")
return _labels
[docs]def atoi(text):
"""Checks if the file names contain numbers.
:param str text:
File name.
:rtype: bool
:return: True or false based on the digits in text.
"""
logging.debug("File names checked")
return int(text) if text.isdigit() else text
[docs]def natural_keys(text):
"""
Splits the number from the file name.
:param str text:
File name.
:rtype: list
:return:
Splits the ``text`` if number is present in it.
"""
logging.debug("File names split")
return [atoi(num) for num in re.split(r'(\d+)', text)]