Source code for PAMI.extras.dbStats.UtilityDatabase

# UtilityDatabase computes descriptive statistics (size, transaction lengths, item utilities, sparsity) of a utility database.
#
# **Importing this algorithm into a python program**
# --------------------------------------------------------
#
#             from PAMI.extras.dbStats import UtilityDatabase as db
#
#             obj = db.UtilityDatabase(iFile, "\t")
#
#             obj.run()
#
#             obj.printStats()
#
#             obj.save(obj.getSortedUtilityValuesOfItem(), oFile)
#




__copyright__ = """
Copyright (C)  2021 Rage Uday Kiran

     This program is free software: you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published by
     the Free Software Foundation, either version 3 of the License, or
     (at your option) any later version.

     This program is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     GNU General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import sys
import statistics
from urllib.request import urlopen
import pandas as pd
from typing import Union
import PAMI.extras.graph.plotLineGraphFromDictionary as plt

class UtilityDatabase:
    """
    :Description: UtilityDatabase is a class that computes statistics of a utility database.

    :Attributes:

        :param inputFile: str or pandas.DataFrame :
            path or http(s) URL of the input file, or a DataFrame holding the data
        :param sep: str :
            separator between the tokens of a record. Default is the tab character.

    **Importing this algorithm into a python program**
    --------------------------------------------------------
    .. code-block:: python

            from PAMI.extras.dbStats import UtilityDatabase as db

            obj = db.UtilityDatabase(iFile, "\\t")

            obj.run()

            obj.printStats()

            obj.save(obj.getSortedUtilityValuesOfItem(), oFile)
    """

    def __init__(self, inputFile: Union[str, pd.DataFrame], sep: str='\t') -> None:
        """
        :param inputFile: input file name, path, URL, or a DataFrame
        :type inputFile: str or pandas.DataFrame
        :param sep: separator between the tokens of a record
        :type sep: str
        :return: None
        """
        self.inputFile = inputFile
        # transaction id (1-based) -> list of items; filled by readDatabase()
        self.database = {}
        # number of items of every transaction; filled by readDatabase()
        self.lengthList = []
        # item -> summed utility over all transactions; filled by readDatabase()
        self.utility = {}
        self.sep = sep
        # raw transactions and their per-item utilities; filled by creatingItemSets()
        self.Database, self.utilityValues = None, None

    def run(self) -> None:
        """
        Load the input and compute the per-transaction and per-item aggregates.

        :return: None
        """
        self.readDatabase()

    def _parseRecord(self, line: str) -> tuple:
        """
        Split one ':'-delimited record into its items and its utilities.

        The first field holds the items and the third field holds the utilities,
        both separated by ``self.sep``. The second field is ignored (presumably
        the total utility of the transaction -- confirm against the data format).

        :param line: one text line of the input
        :type line: str
        :return: (list of items, list of integer utilities)
        :rtype: tuple
        :raises ValueError: if the line has fewer than three ':'-separated fields
            or a utility token is not an integer
        """
        fields = [field.rstrip() for field in line.split(":")]
        if len(fields) < 3:
            raise ValueError(f"Malformed record (expected at least 3 ':'-separated fields): {line!r}")
        items = [token for token in fields[0].split(self.sep) if token]
        # Drop empty tokens BEFORE converting: filtering after int() would also
        # drop genuine 0 utilities and shift the remaining ones onto wrong items.
        utilities = [int(token) for token in fields[2].split(self.sep) if token]
        return items, utilities

    def _appendRecord(self, line: str) -> None:
        """
        Parse one input line and append it to the raw transaction/utility lists.
        Blank lines are skipped.
        """
        if not line.strip():
            return
        items, utilities = self._parseRecord(line)
        self.Database.append(items)
        self.utilityValues.append(utilities)

    def creatingItemSets(self) -> None:
        """
        Storing the complete transactions of the database/input file in a database variable

        A DataFrame input is read from its 'Transactions' (or 'Patterns') and
        'Utility' columns. A string input is read line by line from an http(s)
        URL or from a local UTF-8 file.

        :return: None
        :raises SystemExit: if the local file cannot be opened
        :raises ValueError: if a record is malformed
        """
        self.Database = []
        self.utilityValues = []
        if isinstance(self.inputFile, pd.DataFrame):
            if self.inputFile.empty:
                print("its empty..")
            columns = self.inputFile.columns.values.tolist()
            if 'Transactions' in columns:
                self.Database = self.inputFile['Transactions'].tolist()
            if 'Patterns' in columns:
                self.Database = self.inputFile['Patterns'].tolist()
            if 'Utility' in columns:
                self.utilityValues = self.inputFile['Utility'].tolist()
        if isinstance(self.inputFile, str):
            if self.inputFile.startswith(("http://", "https://")):
                # The context manager closes the HTTP response even if parsing fails.
                with urlopen(self.inputFile) as response:
                    for rawLine in response:
                        self._appendRecord(rawLine.decode("utf-8"))
            else:
                try:
                    with open(self.inputFile, 'r', encoding='utf-8') as f:
                        for line in f:
                            self._appendRecord(line)
                except IOError:
                    print("File Not Found")
                    # sys.exit() raises SystemExit exactly like quit(), without
                    # depending on the interactive helper installed by `site`.
                    sys.exit()

    def readDatabase(self) -> None:
        """
        read database from input file and store into database and size of each transaction.

        Also sums the utility of every item over all transactions and keeps the
        result sorted by utility in descending order.

        :return: None
        """
        self.creatingItemSets()
        # Start from a clean state so that calling run() again does not
        # accumulate the utilities of the previous run.
        self.database = {}
        self.utility = {}
        for tid, transaction in enumerate(self.Database, start=1):
            self.database[tid] = transaction
            # A transaction without a utility row (e.g. a DataFrame lacking the
            # 'Utility' column) still counts as a transaction but adds no utility.
            utilities = self.utilityValues[tid - 1] if tid <= len(self.utilityValues) else []
            # zip pairs each item with its utility and stops at the shorter list.
            for item, value in zip(transaction, utilities):
                self.utility[item] = self.utility.get(item, 0) + value
        self.lengthList = [len(transaction) for transaction in self.database.values()]
        self.utility = dict(sorted(self.utility.items(), key=lambda pair: pair[1], reverse=True))

    def getDatabaseSize(self) -> int:
        """
        get the size of database

        :return: number of transactions
        :rtype: int
        """
        return len(self.database)

    def getTotalNumberOfItems(self) -> int:
        """
        get the number of items in database.

        :return: number of distinct items
        :rtype: int
        """
        return len(self.getSortedListOfItemFrequencies())

    def getMinimumTransactionLength(self) -> int:
        """
        get the minimum transaction length

        :return: minimum transaction length
        :rtype: int
        """
        return min(self.lengthList)

    def getAverageTransactionLength(self) -> float:
        """
        get the average transaction length. It is sum of all transaction length divided by database length.

        :return: average transaction length
        :rtype: float
        """
        return sum(self.lengthList) / len(self.database)

    def getMaximumTransactionLength(self) -> int:
        """
        get the maximum transaction length

        :return: maximum transaction length
        :rtype: int
        """
        return max(self.lengthList)

    def getStandardDeviationTransactionLength(self) -> float:
        """
        get the (population) standard deviation of the transaction lengths

        :return: standard deviation transaction length
        :rtype: float
        """
        return statistics.pstdev(self.lengthList)

    def getVarianceTransactionLength(self) -> float:
        """
        get the (population) variance of the transaction lengths

        The population variance is used so that the value equals the square of
        getStandardDeviationTransactionLength() and is defined for a database
        that holds a single transaction.

        :return: variance transaction length
        :rtype: float
        """
        return statistics.pvariance(self.lengthList)

    def getNumberOfItems(self) -> int:
        """
        get the number of items in database.

        :return: number of distinct items
        :rtype: int
        """
        return self.getTotalNumberOfItems()

    def getSparsity(self) -> float:
        """
        get the sparsity of database: the fraction of empty cells in the
        (transactions x distinct items) occurrence matrix

        :return: sparsity of database in floating values
        :rtype: float
        :raises ZeroDivisionError: if the database is empty
        """
        # Computed once; the frequency table requires a full pass over the data.
        itemFrequencies = self.getSortedListOfItemFrequencies()
        matrixSize = self.getDatabaseSize() * len(itemFrequencies)
        return (matrixSize - sum(itemFrequencies.values())) / matrixSize

    def getSortedListOfItemFrequencies(self) -> dict:
        """
        get sorted list of item frequencies

        :return: item -> number of occurrences, sorted by occurrences in descending order
        :rtype: dict
        """
        itemFrequencies = {}
        for transaction in self.database.values():
            for item in transaction:
                itemFrequencies[item] = itemFrequencies.get(item, 0) + 1
        return dict(sorted(itemFrequencies.items(), key=lambda pair: pair[1], reverse=True))

    def getFrequenciesInRange(self) -> dict:
        """
        This function is used to get the Frequencies in range

        The frequency axis is cut at int(i * maximum / 6) for i = 1..5. For every
        cut, the number of items whose frequency lies strictly between the
        previous cut (0 for the first one) and this cut is counted.

        NOTE(review): the result is keyed by the item count and maps to the cut
        value, so intervals with the same count overwrite each other, and
        frequencies equal to a cut or above the last cut are not counted.
        Kept as is because plotGraphs() consumes the dictionary in this shape.

        :return: number of items in an interval -> upper bound of that interval;
            empty when the database has no items
        :rtype: dict
        """
        fre = self.getSortedListOfItemFrequencies()
        rangeFrequencies = {}
        if not fre:
            # max() of an empty sequence would raise; there is nothing to bin.
            return rangeFrequencies
        maximum = max(fre.values())
        lower = 0
        for upper in (int(i * maximum / 6) for i in range(1, 6)):
            count = sum(1 for val in fre.values() if lower < val < upper)
            rangeFrequencies[count] = upper
            lower = upper
        return rangeFrequencies

    def getTransanctionalLengthDistribution(self) -> dict:
        """
        get transaction length

        :return: transaction length -> number of transactions of that length,
            sorted by length in ascending order
        :rtype: dict
        """
        transactionLength = {}
        for length in self.lengthList:
            transactionLength[length] = transactionLength.get(length, 0) + 1
        return dict(sorted(transactionLength.items(), key=lambda pair: pair[0]))

    def save(self, data, outputFile) -> None:
        """
        store data into outputFile, one tab-separated "key<TAB>value" pair per line

        :param data: input data
        :type data: dict
        :param outputFile: output file name or path to store
        :type outputFile: str
        :return: None
        """
        # Written as UTF-8, the encoding the input file is read with.
        with open(outputFile, 'w', encoding='utf-8') as f:
            for key, value in data.items():
                f.write(f'{key}\t{value}\n')

    def getTotalUtility(self) -> int:
        """
        get sum of utility

        :return: total utility
        :rtype: int
        """
        return sum(self.utility.values())

    def getMinimumUtility(self) -> int:
        """
        get the minimum utility

        :return: integer value of minimum utility
        :rtype: int
        """
        return min(self.utility.values())

    def getAverageUtility(self) -> float:
        """
        get the average utility

        :return: total utility divided by the number of items that have a utility
        :rtype: float
        """
        return sum(self.utility.values()) / len(self.utility)

    def getMaximumUtility(self) -> int:
        """
        get the maximum utility

        :return: integer value of maximum utility
        :rtype: int
        """
        return max(self.utility.values())

    def getSortedUtilityValuesOfItem(self) -> dict:
        """
        get sorted utility value each item. key is item and value is utility of item

        :return: sorted dictionary utility value of item
        :rtype: dict
        """
        return self.utility

    def printStats(self) -> None:
        """
        This function is used to print the results

        :return: None
        """
        print(f'Database size : {self.getDatabaseSize()}')
        print(f'Number of items : {self.getTotalNumberOfItems()}')
        print(f'Minimum Transaction Size : {self.getMinimumTransactionLength()}')
        print(f'Average Transaction Size : {self.getAverageTransactionLength()}')
        print(f'Maximum Transaction Size : {self.getMaximumTransactionLength()}')
        print(f'Minimum utility : {self.getMinimumUtility()}')
        print(f'Average utility : {self.getAverageUtility()}')
        print(f'Maximum utility : {self.getMaximumUtility()}')
        print(f'Standard Deviation Transaction Size : {self.getStandardDeviationTransactionLength()}')
        print(f'Variance : {self.getVarianceTransactionLength()}')
        print(f'Sparsity : {self.getSparsity()}')

    def plotGraphs(self) -> None:
        """
        Plot the binned item frequencies and the transaction length distribution
        with plotLineGraphFromDictionary.

        NOTE(review): 100 and 0 are passed as the second and third positional
        arguments; presumably the end/start percentage of the data to plot --
        confirm against plotLineGraphFromDictionary.

        :return: None
        """
        itemFrequencies = self.getFrequenciesInRange()
        transactionLength = self.getTransanctionalLengthDistribution()
        plt.plotLineGraphFromDictionary(itemFrequencies, 100, 0, 'Frequency', 'no of items', 'frequency')
        plt.plotLineGraphFromDictionary(transactionLength, 100, 0, 'transaction length', 'transaction length', 'frequency')
if __name__ == '__main__':
    # Command-line entry point: python UtilityDatabase.py <fileName> [<separator>]
    try:
        # The separator is optional, as the usage message states; it falls
        # back to the tab character, the default of UtilityDatabase.
        if len(sys.argv) not in (2, 3):
            raise ValueError("Missing some of the input parameters. Format: python UtilityDatabase.py <fileName> <seperator (optional)>")
        iFile = sys.argv[1]
        separator = sys.argv[2] if len(sys.argv) == 3 else '\t'
        obj = UtilityDatabase(iFile, separator)
        obj.run()
        # The statistics and plots are only defined for a non-empty database.
        if obj.getDatabaseSize() > 0:
            obj.printStats()
            obj.plotGraphs()
        else:
            print("No data found in the database.")
    except ValueError as ve:
        print(f"ValueError: {ve}")