# Source code for bioden.processor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  Copyright 2010, 2011, 2015 GiMaRIS <info@gimaris.com>
#
#  This file is part of BioDen - A data normalizer and transponer for
#  files containing taxon biomass/density data for ecotopes.
#
#  BioDen is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  BioDen is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys
import os
import time
import threading
from sqlite3 import dbapi2 as sqlite
import csv

from gi.repository import GObject
import xlrd

import bioden.std
import bioden.exporter

class DataProcessor(threading.Thread):
    """Worker thread that loads taxon biomass/density data into a local
    SQLite database, builds sample groups per ecotope, and exports the
    results through a CSV or XLS exporter.

    Format-specific subclasses are expected to provide `set_reader` and
    `load_data` (see the CSV and XLS processors in this module).
    """

    def __init__(self):
        super(DataProcessor, self).__init__()
        # Event used to request that this thread stops; see stop()/stopped().
        self._stop = threading.Event()
        # Tuple (filename, type); set via set_input_file().
        self._input_file = None
        # Format-specific reader; set by the subclass via set_reader().
        self._reader = None
        # Folder where the exporter writes its output files.
        self._output_folder = None
        # Either 'density' or 'biomass'; selects which column to process.
        self._property = None
        # Path to the SQLite database file; set by set_directives() below.
        self._dbfile = None
        # Number of decimals to round to; set via set_round(). Presumably
        # consumed by the exporter — not used in this class.
        self._do_round = None
        # Surface area to which sample groups are normalized.
        self._target_sample_surface = 0.2
        # Output format for the exporter: 'csv' (default) or 'xls'.
        self._output_format = 'csv'
        # Progress dialog and the handler that reports progress to it.
        self._pdialog = None
        self.pdialog_handler = bioden.std.ProgressDialogHandler()
        # Maps each ecotope to the ID of its most representative group;
        # filled by determine_representative_groups().
        self._representative_groups = {}
        # Maps a property name to the database column storing its sums.
        self._properties = {
            'density': 'sum_of_density',
            'biomass': 'sum_of_biomass'
        }
        # Distinct ecotopes (lower case) and taxa found in the input
        # data; filled by pre_process().
        self.ecotopes = []
        self.taxa = []
        # Dialect used when reading CSV input; see set_csv_dialect().
        self.csv_dialect = csv.excel

        # Set the path to the database file.
        self.set_directives()

    def stop(self):
        """Stop this thread."""
        self._stop.set()

    def stopped(self):
        """Return True if this thread needs to be stopped."""
        return self._stop.is_set()

    def set_input_file(self, filename, type):
        """Set the input file and type."""
        supported_types = ('csv','xls')
        if type not in supported_types:
            raise ValueError("Unknown file type '%s'." % type)
        self._input_file = (filename, type)

    def set_csv_dialect(self, delimiter, quotechar):
        self.csv_dialect = csv.excel
        self.csv_dialect.delimiter = delimiter
        self.csv_dialect.quotechar = quotechar

    def create_reader(self):
        """Create a reader for the configured input file and hand it to
        `set_reader` (implemented by the format-specific subclasses).
        """
        filename, filetype = self._input_file
        if filetype == "csv":
            reader = csv.DictReader(open(filename), dialect=self.csv_dialect,
                fieldnames=None)
            self.set_reader(reader)
        elif filetype == "xls":
            self.set_reader(xlrd.open_workbook(filename))

    def set_directives(self):
        """Set the path to the database file."""
        data_path = os.path.expanduser(os.path.join('~','.bioden'))

        # Check if the data folder exists. If not, create it.
        if not os.path.exists(data_path):
            os.makedirs(data_path)

        # Set the path to the database file.
        self._dbfile = os.path.join(data_path, 'data.sqlite')

    def set_progress_dialog(self, dialog):
        self._pdialog = dialog
        self.pdialog_handler.set_progress_dialog(dialog)

    def set_property(self, property):
        if property in self._properties:
            self._property = property
        else:
            raise ValueError("Property can be either 'density' or "
                "'biomass', not '%s'." % property)

    def set_output_folder(self, output_folder):
        if not os.path.exists(output_folder):
            raise ValueError("Output folder does not exist.")
        self._output_folder = output_folder

    def set_round(self, round_to):
        if not isinstance(round_to, int) or round_to < 0:
            raise ValueError("Argument 'round_to' must be an integer >= 0.")
        self._do_round = round_to

    def set_target_sample_surface(self, surface):
        if not isinstance(surface, float) or surface <= 0:
            raise ValueError("Argument 'surface' must be a float > 0.")
        self._target_sample_surface = surface

    def set_output_format(self, format):
        formats = ('csv', 'xls')
        if format not in formats:
            raise ValueError("Possible formats are 'csv' and 'xls', not '%s'." % format)
        self._output_format = format

    def run(self):
        """Entry point of the worker thread.

        Loads the input data into the database, groups and normalizes it
        per ecotope, and exports the results. Emits 'load-data-failed'
        via the GObject signal sender when loading fails, and
        'process-finished' on success. self.stopped() is checked between
        stages so the thread can be cancelled from outside.
        """
        # Create a CSV or XLS generator.
        if self._output_format == 'csv':
            generator = bioden.exporter.CSVExporter(self)
        elif self._output_format == 'xls':
            generator = bioden.exporter.XLSExporter(self)

        # Check if all required settings are set.
        self.check_settings()

        # Load the data.
        self.pdialog_handler.set_action("Loading data...")
        self.pdialog_handler.add_details("Loading data...")
        try:
            # Create and set the file reader.
            self.create_reader()

            # Load the data.
            self.load_data()
        except Exception as strerror:
            # Emit the signal that the process has failed. idle_add is
            # used so the emission happens on the GObject main loop, not
            # on this worker thread.
            GObject.idle_add(bioden.std.sender.emit, 'load-data-failed', strerror)
            return

        # Pre-process some data. This will populate self.ecotopes, which
        # is needed now by the progress dialog handler.
        self.pre_process()

        # Set the number of times we will call pdialog_handler.increase():
        # 7 top-level stages plus 4 per-ecotope increments.
        steps = 7 + (len(self.ecotopes) * 4)
        self.pdialog_handler.set_total_steps(steps)

        if not self.stopped():
            # Process data for the property 'self._property'.
            self.pdialog_handler.increase("Making sample groups for property '%s'..." % (self._property))
            # Here, pdialog_handler.increase will be called for each ecotope.
            self.process()

        if not self.stopped():
            # Export the results.
            self.pdialog_handler.increase("Exporting non-grouped ecotope data...")
            # Here, pdialog_handler.increase will be called for each ecotope.
            generator.export_ecotopes_raw()

        if not self.stopped():
            self.pdialog_handler.increase("Exporting raw ecotope groups...")
            # Here, pdialog_handler.increase will be called for each ecotope.
            generator.export_ecotopes_grouped('raw')

        if not self.stopped():
            self.pdialog_handler.increase("Exporting normalized ecotope groups...")
            # Here, pdialog_handler.increase will be called for each ecotope.
            generator.export_ecotopes_grouped('normalized')

        if not self.stopped():
            self.pdialog_handler.increase("Determining representative sample group for each ecotope...")
            self.determine_representative_groups()

        if not self.stopped():
            self.pdialog_handler.increase("Exporting representative sample groups...")
            generator.export_representatives()

            self.pdialog_handler.increase("")

            # Emit the signal that the process was successful.
            GObject.idle_add(bioden.std.sender.emit, 'process-finished')

    def check_settings(self):
        if not self._input_file:
            raise ValueError("Attribute 'input_file' has not been set.")
        if not self._dbfile:
            raise ValueError("Attribute 'dbfile' has not been set.")
        if not self._property:
            raise ValueError("Attribute 'property' has not been set.")
        if not self._output_folder:
            raise ValueError("Attribute 'output_folder' has not been set.")

        return True

    def make_db(self):
        """Create the database file with the necessary tables.

        Any existing database file is removed first. Tables created:
        'data' (raw input records), 'samples' (sample code + surface),
        'sums_of' (raw sample groups), 'normalized_sums_of' (groups
        scaled to the target surface) and 'biodiversity' (taxon counts
        per group).
        """

        # Delete the current database file.
        if os.path.isfile(self._dbfile):
            self.remove_db_file()

        # This will automatically create a new database file.
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        # Raw input records: one row per sample/taxon combination.
        cursor.execute("CREATE TABLE data ( \
            id INTEGER PRIMARY KEY, \
            sample_code INTEGER, \
            compiled_ecotope VARCHAR, \
            standardised_taxon VARCHAR, \
            sum_of_density REAL, \
            sum_of_biomass REAL \
        )")

        # One surface per sample code.
        cursor.execute("CREATE TABLE samples ( \
            sample_code INTEGER PRIMARY KEY, \
            sample_surface REAL \
        )")

        # Raw sample groups produced by process().
        cursor.execute("CREATE TABLE sums_of ( \
            id INTEGER PRIMARY KEY, \
            group_id INTEGER, \
            compiled_ecotope VARCHAR, \
            standardised_taxon VARCHAR, \
            sum_of REAL, \
            group_surface REAL \
        )")

        # Groups scaled to the target sample surface.
        cursor.execute("CREATE TABLE normalized_sums_of (\
            id INTEGER PRIMARY KEY, \
            group_id INTEGER, \
            compiled_ecotope VARCHAR, \
            standardised_taxon VARCHAR, \
            sum_of REAL, \
            group_surface REAL \
        )")

        # Taxon counts per (ecotope, group).
        cursor.execute("CREATE TABLE biodiversity ( \
            id INTEGER PRIMARY KEY, \
            compiled_ecotope VARCHAR, \
            group_id INTEGER, \
            diversity INTEGER \
        )")

        # Commit the transaction.
        connection.commit()

        # Close connection with the local database.
        cursor.close()
        connection.close()

    def remove_db_file(self, tries=0):
        """Remove the database file."""
        if tries > 2:
            raise EnvironmentError("Unable to remove the file %s. "
                "Please make sure it's not in use by a different "
                "process." % self._dbfile)
        try:
            os.remove(self._dbfile)
        except:
            tries += 1
            time.sleep(2)
            self.remove_db_file(tries)
        return True

    def pre_process(self):
        # This will automatically create a new database file.
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        # Compile a list of all taxa.
        cursor.execute("SELECT standardised_taxon FROM data")
        for taxon in cursor:
            if taxon[0] not in self.taxa:
                self.taxa.append(taxon[0])

        # Compile a list of all ecotopes.
        cursor.execute("SELECT compiled_ecotope FROM data")
        for ecotope in cursor:
            # Convert ecotopes to lower case to account for upper/lowercase
            # differences.
            ecotope = ecotope[0].lower()

            if ecotope not in self.ecotopes:
                self.ecotopes.append(ecotope)

        # Close connection with the local database.
        cursor.close()
        connection.close()

    def process(self):
        """Calculate the sample groups with a sample surface of
        'self._target_sample_surface' and save them to the database.

        For every ecotope in self.ecotopes the matching sample codes are
        grouped by make_groups(); the raw groups are written to the
        'sums_of' table and their normalized counterparts (see
        normalize_groups) to 'normalized_sums_of'.
        """
        log = "Processing data for property '%s'..." % self._property
        self.pdialog_handler.add_details(log)

        # Set the field to select from based on the property.
        self.select_field = self._properties[self._property]

        # Open a connection to the database file.
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        # Walk through each ecotope.
        for ecotope in self.ecotopes:
            # Update the progress dialog.
            self.pdialog_handler.increase()

            # Create a log message.
            log = "Processing ecotope '%s'..." % ecotope
            self.pdialog_handler.add_details(log)

            # Get all sample codes matching the current ecotope.
            cursor.execute("SELECT sample_code "
                "FROM data "
                "WHERE compiled_ecotope = ?",
                (ecotope,))

            # Collect distinct sample codes (as strings) in order of
            # first appearance.
            sample_codes_for_ecotope = []
            for sample_code in cursor:
                sample_code = str(sample_code[0])
                if sample_code not in sample_codes_for_ecotope:
                    sample_codes_for_ecotope.append(sample_code)

            # Group the sums into groups with a surface of
            # 'self._target_sample_surface' or higher.
            groups = self.make_groups(sample_codes_for_ecotope)

            # Get each group from the sample, and insert the data for
            # that group into the database. Group IDs restart at 1 for
            # every ecotope.
            for group_id, group in enumerate(groups, start=1):
                # Unpack each raw group.
                group_surface, group_data = group

                # Unpack group data and insert it into the database.
                for taxon, sum_of in group_data.iteritems():
                    cursor.execute("INSERT INTO sums_of \
                        VALUES (null,?,?,?,?,?)",
                        (group_id, ecotope, taxon,
                        sum_of, group_surface))

            # Make normalized groups out of the raw groups.
            # Make all group surfaces exactly 'self._target_sample_surface' and
            # transform the corresponding sums accordingly.
            normalized_groups = self.normalize_groups(groups)

            # Insert the normalized data into the database as well.
            for group_id, group in enumerate(normalized_groups, start=1):
                # Unpack each normalized group.
                group_surface, group_data = group

                # Unpack group data and insert it into the database.
                for taxon, sum_of in group_data.iteritems():
                    cursor.execute("INSERT INTO normalized_sums_of \
                        VALUES (null,?,?,?,?,?)",
                        (group_id, ecotope, taxon,
                        sum_of, group_surface))

        # Commit the transaction.
        connection.commit()

        # Close connection with the local database.
        cursor.close()
        connection.close()

    def make_groups(self, sample_codes):
        """Return sample groups with a sample surface of
        `self._target_sample_surface` or higher for the list of sample codes
        `sample_codes`.
        """
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()
        cursor2 = connection.cursor()

        # Select sample codes+surfaces for the sample codes in the list
        # 'sample_codes'.
        sample_codes = ",".join(sample_codes)
        cursor.execute("SELECT sample_code, sample_surface "
            "FROM samples "
            "WHERE sample_code "
            "IN (%s)" % sample_codes)

        # This list will contain the dictionaries with sums per taxon
        # for the supplied sample codes.
        groups = []

        # The surface of the current group.
        group_surface = 0.0

        # This dictionary contains the sum per taxon for the
        # current group.
        group_data = {}

        for sample_code,sample_surface in cursor:
            # A group surface is the sum of all sample surfaces that
            # makes up the group. So each time we process a new sample,
            # add that sample's surface to the group surface.
            group_surface += sample_surface

            # Get all taxon data for the current sample.
            cursor2.execute("SELECT standardised_taxon, %s "
                "FROM data "
                "WHERE sample_code = ?" % (self.select_field),
                (sample_code,))

            # Calculate the sums of all taxa we encounter.
            for taxon,sum_of in cursor2:
                if taxon in group_data:
                    group_data[taxon] += sum_of
                else:
                    group_data[taxon] = sum_of

            # Check if we reached the current group's maximum surface.
            if group_surface >= self._target_sample_surface:
                # When this group reached a group_surface of
                # 'self._target_sample_surface' or higher, add it to the
                # 'groups' list. Note that this means that if we don't reach
                # 'self._target_sample_surface', the group won't be processed.
                groups.append( [group_surface,group_data] )

                # Reset the current group so we can start a new group.
                group_data = {}
                group_surface = 0.0

        # Close connection with the local database.
        cursor.close()
        cursor2.close()
        connection.close()

        # Return the groups.
        return groups

    def normalize_groups(self, groups):
        """Return a normalized version of sample groups `groups`. It
        converts the sums of the groups to a sample surface of exactly
        `self._target_sample_surface`.
        """
        for i, group in enumerate(groups):
            # Unpack current group.
            group_surface, group_data = group

            # Skip the calculations for this group if the surface
            # is already equal to 'self._target_sample_surface'.
            if group_surface == self._target_sample_surface:
                continue

            # Calculate the factor needed to calculate the sum for a
            # surface of 'self._target_sample_surface' for each group.
            factor = self._target_sample_surface / group_surface

            # Unpack group data.
            for taxon, sum_of in group_data.iteritems():
                # Calculate the sum if the group surface would be
                # 'self._target_sample_surface'.
                new_sum_of = sum_of * factor

                # Set the new sum_of value for this taxon in the current
                # group.
                groups[i][1][taxon] = new_sum_of

            # When done with all taxa for this group, set the value for
            # group surface to 'self._target_sample_surface'.
            groups[i][0] = self._target_sample_surface

        # Return the normalized groups.
        return groups

    def __determine_biodiversities(self):
        """Calculate the biodiversity for each sample group."""
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        for ecotope in self.ecotopes:
            # Get the number of groups for this ecotope.
            cursor.execute("SELECT group_id \
                FROM sums_of \
                WHERE compiled_ecotope = ?",
                (ecotope,)
                )
            groups = []
            for id in cursor:
                if id[0] not in groups:
                    groups.append(id[0])

            for group_id in groups:
                # We define the biodiversity for each group by looking
                # up the number of taxa registered by that group ID and
                # ecotope.
                # Note: We exclude the taxa which have a 'sum of' of 0.
                cursor.execute("SELECT COUNT(standardised_taxon) \
                    FROM sums_of \
                    WHERE compiled_ecotope = ? \
                    AND group_id = ? \
                    AND sum_of > 0",
                    (ecotope, group_id)
                    )
                diversity = cursor.fetchone()[0]

                cursor.execute("INSERT INTO biodiversity \
                    VALUES (null,?,?,?)",
                    (ecotope, group_id, diversity)
                    )

        # Commit the transaction.
        connection.commit()

        # Close connection with the local database.
        cursor.close()
        connection.close()

    def determine_representative_groups(self):
        """Determine which sample group is the most representative
        for each ecotope.

        The representative group of an ecotope is the group whose
        biodiversity is closest to the median biodiversity of that
        ecotope. The result is stored in self._representative_groups,
        keyed on ecotope. On a tie, the first group encountered wins.
        """

        # Calculate and store the biodiversity of every group.
        self.__determine_biodiversities()

        # Connect to the database.
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        # The median of the biodiversities for each ecotope.
        medians = {}

        for ecotope in self.ecotopes:
            # Get all biodiversities for this ecotope.
            cursor.execute("SELECT diversity \
                FROM biodiversity \
                WHERE compiled_ecotope = ?",
                (ecotope,)
                )
            diversities = [row[0] for row in cursor]

            # Skip this ecotope if there were no diversities found, and
            # thus no groups.
            if not diversities:
                continue

            # Calculate the median.
            medians[ecotope] = bioden.std.median(diversities)

        for ecotope in self.ecotopes:
            cursor.execute("SELECT diversity, group_id \
                FROM biodiversity \
                WHERE compiled_ecotope = ?",
                (ecotope,)
                )

            # Save the group whose diversity differs least from this
            # ecotope's median. (Identity comparison 'is None' replaces
            # the previous '== None'; the two duplicate assignment
            # branches are merged.)
            smallest_difference = None
            for diversity, group_id in cursor:
                difference = abs(medians[ecotope] - diversity)

                if smallest_difference is None or difference < smallest_difference:
                    smallest_difference = difference
                    self._representative_groups[ecotope] = group_id

        # Close connection with the local database.
        cursor.close()
        connection.close()

class CSVProcessor(DataProcessor):
    """Process CSV data."""

    def set_reader(self, reader):
        """Set the CSV reader."""
        if isinstance(reader, csv.DictReader):
            self._reader = reader
        else:
            raise TypeError("Argument 'reader' must be an instance of 'csv.DictReader'.")

    def load_data(self):
        """Extract the required columns from the CSV data and insert
        these into the database.
        """
        # Create a new database file.
        self.make_db()

        # The required field names, each mapped to the matching CSV
        # column name once found (None = not found).
        fields = {'sample code': None,
            'compiled ecotope': None,
            'standardised taxon': None,
            'density': None,
            'biomass': None,
            'sample surface': None}

        # Update the values in the 'fields' dictionary to the name of
        # the first CSV column whose (lower-cased) header contains the
        # required field name as a substring.
        fieldnames = self._reader.fieldnames
        for f in fields:
            for name in fieldnames:
                if f in name.lower():
                    fields[f] = name
                    break

        # Connect with the database.
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        # List of sample codes. Used to check which sample codes have
        # already been inserted into the database.
        sample_codes = []

        # Insert CSV data into database.
        for row in self._reader:
            sample_code = int(row[fields['sample code']])

            # Insert the data into the 'data' table. Compare against
            # None explicitly: only a missing column should trigger the
            # error.
            if self._property == 'density':
                if fields['density'] is None:
                    raise ValueError("The data file is missing the 'density' column.")
                cursor.execute("INSERT INTO data VALUES (null,?,?,?,?,null)",
                    (sample_code,
                    row[fields['compiled ecotope']].lower(), # Save ecotopes in lower case.
                    row[fields['standardised taxon']],
                    bioden.std.to_float(row[fields['density']])
                    ))
            elif self._property == 'biomass':
                if fields['biomass'] is None:
                    raise ValueError("The data file is missing the 'biomass' column.")
                cursor.execute("INSERT INTO data VALUES (null,?,?,?,null,?)",
                    (sample_code,
                    row[fields['compiled ecotope']].lower(), # Save ecotopes in lower case.
                    row[fields['standardised taxon']],
                    bioden.std.to_float(row[fields['biomass']])
                    ))

            # Check if the current sample code is in the list of sample
            # codes.
            if sample_code not in sample_codes:
                # If not, add it to the list of sample codes, and
                # insert the sample code + surface into the 'samples'
                # table.
                sample_codes.append(sample_code)

                # Sample codes and sample surfaces are saved in a
                # separate table because each sample code is linked
                # to a single sample surface.
                cursor.execute("INSERT INTO samples VALUES (?,?)",
                    (sample_code,
                    bioden.std.to_float(row[fields['sample surface']])
                    ))

        # Commit the transaction.
        connection.commit()

        # Close connection with the local database.
        cursor.close()
        connection.close()
class XLSProcessor(DataProcessor):
    """Process XLS data."""

    def set_reader(self, book):
        """Set the XLS reader."""
        if isinstance(book, xlrd.Book):
            # By default, use the first sheet in the Excel file.
            self._reader = self.sheet = book.sheets()[0]
        else:
            raise TypeError("Argument 'book' must be an instance of 'xlrd.Book'.")

    def load_data(self):
        """Extract the required columns from the XLS data and insert
        these into the database.
        """
        # Create a new database file.
        self.make_db()

        # The required field names, each mapped to the matching column
        # index once found (None = not found).
        fields = {'sample code': None,
            'compiled ecotope': None,
            'standardised taxon': None,
            'density': None,
            'biomass': None,
            'sample surface': None}

        # Update the values in the 'fields' dictionary to the index
        # number of the first column whose (lower-cased) header contains
        # the required field name as a substring.
        fieldnames = self.sheet.row_values(0)
        for f in fields:
            for name in fieldnames:
                if f in name.lower():
                    fields[f] = fieldnames.index(name)
                    break

        # Connect with the database.
        connection = sqlite.connect(self._dbfile)
        cursor = connection.cursor()

        # List of sample codes. Used to check which sample codes have
        # already been inserted into the database.
        sample_codes = []

        # Insert the worksheet data into the database.
        for row_n in range(self.sheet.nrows):
            # Skip the first row, as this row contains the field names.
            if row_n == 0:
                continue

            # Get the values for the current row.
            row = self.sheet.row_values(row_n)

            # Get the sample code from the current row.
            sample_code = int(row[fields['sample code']])

            # Insert the data into the 'data' table. The missing-column
            # checks must compare against None: a column index of 0 is
            # falsy, so the previous "if not fields[...]" wrongly
            # reported the column as missing when it was the first
            # column in the sheet.
            if self._property == 'density':
                if fields['density'] is None:
                    raise ValueError("The data file is missing the 'density' column.")
                cursor.execute("INSERT INTO data VALUES (null,?,?,?,?,null)",
                    (sample_code,
                    row[fields['compiled ecotope']].lower(), # Save ecotopes in lower case.
                    row[fields['standardised taxon']],
                    bioden.std.to_float(row[fields['density']])
                    ))
            elif self._property == 'biomass':
                if fields['biomass'] is None:
                    raise ValueError("The data file is missing the 'biomass' column.")
                cursor.execute("INSERT INTO data VALUES (null,?,?,?,null,?)",
                    (sample_code,
                    row[fields['compiled ecotope']].lower(), # Save ecotopes in lower case.
                    row[fields['standardised taxon']],
                    bioden.std.to_float(row[fields['biomass']])
                    ))

            # Check if the current sample code is in the list of sample
            # codes.
            if sample_code not in sample_codes:
                # If not, add it to the list of sample codes, and
                # insert the sample code + surface into the 'samples'
                # table.
                sample_codes.append(sample_code)

                # Sample codes and sample surfaces are saved in a
                # separate table because each sample code is linked
                # to a single sample surface.
                cursor.execute("INSERT INTO samples VALUES (?,?)",
                    (sample_code,
                    bioden.std.to_float(row[fields['sample surface']])
                    ))

        # Commit the transaction.
        connection.commit()

        # Close connection with the local database.
        cursor.close()
        connection.close()