# Source code for xerparser.reader

# PyP6XER
# Copyright (C) 2020, 2021 Hassan Emam <hassan@constology.com>
#
# This file is part of PyP6XER.
#
# PyP6XER library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License v2.1 as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyP6XER is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyP6XER.  If not, see <https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html>.


'''
This file starts the process of reading and parsing xer files

'''
import csv
import mmap
import codecs
from xerparser import *
from typing import List
from xerparser.model.classes.data import Data
from xerparser.write import writeXER

class Reader:
    """
    Reader class is the main parser for xer files.

    It takes a file path as a parameter. While the file is read, every
    ``%R`` record is turned into a dictionary keyed by the field names of
    the preceding ``%F`` row and handed to the collection registered for the
    table named by the preceding ``%T`` row (activities, wbs, etc.).

    :param filename: path of the xer file that needs to be parsed
    :return: Reader object that contains the collections of parsed records

    :Example:
        from xerparser.reader import Reader
        file = Reader('file.xer')
    """

    # Class-level defaults kept for backward compatibility. The parsing loop
    # tracks the active table and headers in local variables and does not
    # update these attributes.
    current_table = ''
    current_headers = []

    # XER table name -> (attribute holding the target collection,
    #                    True when its ``add`` also receives ``self._data``).
    _TABLE_COLLECTIONS = {
        "CURRTYPE": ("_currencies", False),
        "ROLES": ("_roles", False),
        "ACCOUNT": ("_accounts", False),
        "ROLERATE": ("_rolerates", False),
        "OBS": ("_obss", False),
        "RCATTYPE": ("_rcattypes", False),
        "UDFTYPE": ("_udftypes", False),
        "RCATVAL": ("_rcatvals", False),
        "PROJECT": ("_projects", True),
        "CALENDAR": ("_calendars", False),
        "SCHEDOPTIONS": ("_schedoptions", False),
        "PROJWBS": ("_wbss", True),
        "RSRC": ("_resources", False),
        "RSRCCURVDATA": ("_rsrcurves", False),
        "ACTVTYPE": ("_acttypes", False),
        "PCATTYPE": ("_pcattypes", False),
        "PROJPCAT": ("_projpcats", False),
        "PCATVAL": ("_pcatvals", False),
        "RSRCRATE": ("_rsrcrates", False),
        "RSRCRCAT": ("_rsrccats", False),
        "TASK": ("_tasks", True),
        "ACTVCODE": ("_actvcodes", False),
        "TASKPRED": ("_predecessors", False),
        "TASKRSRC": ("_activityresources", True),
        "TASKPROC": ("_taskprocs", False),
        "TASKACTV": ("_activitycodes", True),
        "UDFVALUE": ("_udfvalues", False),
        "FINTMPL": ("_fintmpls", False),
        "NONWORK": ("_nonworks", False),
    }

    def __init__(self, filename):
        """
        Create the empty record collections and parse ``filename`` into them.

        Args:
            filename: path of the xer file to read
        """
        self.file = filename

        # The collections are instantiated in the original order.
        self._tasks = Tasks()
        self._predecessors = Predecessors()
        self._projects = Projects()
        self._wbss = WBSs()
        self._resources = Resources()
        self._accounts = Accounts()
        self._actvcodes = ActivityCodes()
        self._activitycodes = TaskActvs()
        self._acttypes = ActTypes()
        self._calendars = Calendars()
        self._currencies = Currencies()
        self._obss = OBSs()
        self._rcatvals = RCatVals()
        self._rcattypes = RCatTypes()
        self._rolerates = RoleRates()
        self._roles = Roles()
        self._rsrcurves = ResourceCurves()
        self._rsrcrates = ResourceRates()
        self._rsrccats = ResourceCategories()
        self._schedoptions = SchedOptions()
        self._activityresources = ActivityResources()
        self._udftypes = UDFTypes()
        self._udfvalues = UDFValues()
        self._pcattypes = PCatTypes()
        self._pcatvals = PCatVals()
        self._projpcats = ProjCats()
        self._taskprocs = TaskProcs()
        self._fintmpls = FinTmpls()
        self._nonworks = NonWorks()

        # Shared container passed to the collections whose ``add`` takes a
        # second argument (see _TABLE_COLLECTIONS).
        self._data = Data()
        self._data.projects = self._projects
        self._data.wbss = self._wbss
        self._data.tasks = self._tasks
        self._data.resources = self._resources
        self._data.taskresource = self._activityresources
        self._data.taskactvcodes = self._activitycodes
        self._data.predecessors = self._predecessors

        self._read_records(filename)

    def _read_records(self, filename):
        """Read the tab-separated rows of ``filename`` and dispatch every ``%R`` record."""
        # Start with empty state so a record that appears before any %T / %F
        # row is ignored instead of raising UnboundLocalError.
        current_table = ''
        current_headers = []
        # newline='' leaves line splitting to the csv module, so a record is
        # not broken on Unicode line-boundary characters inside a field.
        with open(filename, encoding='utf-8', errors='ignore', newline='') as tsvfile:
            for row in csv.reader(tsvfile, delimiter='\t'):
                if not row:
                    # csv.reader yields [] for blank lines
                    continue
                marker = row[0]
                if marker == "%T":
                    current_table = row[1]
                elif marker == "%F":
                    current_headers = [name.strip() for name in row[1:]]
                elif marker == "%R":
                    record = dict(zip(current_headers, row[1:]))
                    self.create_object(current_table, record)

    def write(self, filename=None):
        """
        Write the parsed content by calling ``writeXER(self, filename)``.

        Args:
            filename: target file name; required

        Raises:
            ValueError: when ``filename`` is None
        """
        if filename is None:
            raise ValueError("You have to provide the filename")
        writeXER(self, filename)

    def create_object(self, object_type, params):
        """
        Add one parsed record to the collection registered for its table.

        Args:
            object_type: table name taken from the ``%T`` row; surrounding
                whitespace is ignored
            params: dictionary mapping field names to the record values

        Returns:
            None. Records of tables that are not registered are ignored.
        """
        target = self._TABLE_COLLECTIONS.get(object_type.strip())
        if target is None:
            return
        attribute, needs_data = target
        collection = getattr(self, attribute)
        if needs_data:
            collection.add(params, self._data)
        else:
            collection.add(params)

    def summary(self):
        """Print the activity count and the number of relationships."""
        # The class defines no ``tasks`` attribute; the activities
        # collection is stored in ``_tasks``.
        print('Number of activities: ', self._tasks.count)
        print('Number of relationships: ', len(TaskPred.obj_list))

    # Return annotations below are string forward references: the collection
    # types are supplied by the package-level star import and are only
    # resolved when a tool asks for them.

    @property
    def projects(self) -> "Projects":
        """
        Projects

        :return: collection filled from ``PROJECT`` records
        """
        return self._projects

    @property
    def activities(self) -> "Tasks":
        """
        Property to retrieve the tasks

        Returns:
            collection filled from ``TASK`` records
        """
        return self._tasks

    @property
    def wbss(self) -> "WBSs":
        """
        Property to return all wbs elements in the parsed file

        Returns:
            collection filled from ``PROJWBS`` records
        """
        return self._wbss

    @property
    def relations(self) -> "Predecessors":
        """
        Property to retrieve relationships from the parsed file

        Returns:
            collection filled from ``TASKPRED`` records
        """
        return self._predecessors

    @property
    def resources(self) -> "Resources":
        """
        Property to return the resources from the parsed file

        Returns:
            collection filled from ``RSRC`` records
        """
        return self._resources

    @property
    def accounts(self) -> "Accounts":
        """
        Property to return the accounts from the parsed file

        Returns:
            collection filled from ``ACCOUNT`` records
        """
        return self._accounts

    @property
    def activitycodes(self) -> "ActivityCodes":
        """
        Property to return the collection filled from ``TASKACTV`` records

        Returns:
            the object stored in ``_activitycodes``
        """
        # NOTE(review): __init__ stores a TaskActvs instance in
        # _activitycodes, which does not match the ActivityCodes annotation
        # here - confirm which of the two is intended.
        return self._activitycodes

    @property
    def actvcodes(self) -> "TaskActvs":
        """Collection filled from ``ACTVCODE`` records."""
        # NOTE(review): __init__ stores an ActivityCodes instance in
        # _actvcodes, which does not match the TaskActvs annotation here -
        # confirm which of the two is intended.
        return self._actvcodes

    @property
    def acttypes(self) -> "ActTypes":
        """Collection filled from ``ACTVTYPE`` records."""
        return self._acttypes

    @property
    def calendars(self) -> "Calendars":
        """Collection filled from ``CALENDAR`` records."""
        return self._calendars

    @property
    def currencies(self) -> "Currencies":
        """Collection filled from ``CURRTYPE`` records."""
        return self._currencies

    @property
    def obss(self) -> "OBSs":
        """Collection filled from ``OBS`` records."""
        return self._obss

    @property
    def rcattypes(self) -> "RCatTypes":
        """Collection filled from ``RCATTYPE`` records."""
        return self._rcattypes

    @property
    def rcatvals(self) -> "RCatVals":
        """Collection filled from ``RCATVAL`` records."""
        return self._rcatvals

    @property
    def rolerates(self) -> "RoleRates":
        """Collection filled from ``ROLERATE`` records."""
        return self._rolerates

    @property
    def roles(self) -> "Roles":
        """Collection filled from ``ROLES`` records."""
        return self._roles

    @property
    def resourcecurves(self) -> "ResourceCurves":
        """Collection filled from ``RSRCCURVDATA`` records."""
        return self._rsrcurves

    @property
    def resourcerates(self) -> "ResourceRates":
        """Collection filled from ``RSRCRATE`` records."""
        return self._rsrcrates

    @property
    def resourcecategories(self) -> "ResourceCategories":
        """Collection filled from ``RSRCRCAT`` records."""
        return self._rsrccats

    @property
    def scheduleoptions(self) -> "SchedOptions":
        """Collection filled from ``SCHEDOPTIONS`` records."""
        return self._schedoptions

    @property
    def activityresources(self) -> "ActivityResources":
        """Collection filled from ``TASKRSRC`` records."""
        return self._activityresources

    @property
    def udfvalues(self) -> "UDFValues":
        """Collection filled from ``UDFVALUE`` records."""
        return self._udfvalues

    @property
    def udftypes(self) -> "UDFTypes":
        """Collection filled from ``UDFTYPE`` records."""
        return self._udftypes

    @property
    def pcattypes(self) -> "List[PCatTypes]":
        """Collection filled from ``PCATTYPE`` records."""
        return self._pcattypes

    @property
    def pcatvals(self) -> "List[PCatVals]":
        """Collection filled from ``PCATVAL`` records."""
        return self._pcatvals

    @property
    def projpcats(self) -> "List[ProjCat]":
        """Collection filled from ``PROJPCAT`` records."""
        return self._projpcats

    @property
    def taskprocs(self) -> "List[TaskProc]":
        """Collection filled from ``TASKPROC`` records."""
        return self._taskprocs

    @property
    def fintmpls(self) -> "List[FinTmpl]":
        """Collection filled from ``FINTMPL`` records."""
        return self._fintmpls

    @property
    def nonworks(self) -> "List[NonWork]":
        """Collection filled from ``NONWORK`` records."""
        return self._nonworks

    def get_num_lines(self, file_path):
        """
        Count the lines of a file.

        Args:
            file_path: path of the file to inspect

        Returns:
            number of lines; a final line without a trailing newline is
            counted, and an empty file yields 0
        """
        # Read-only binary access: no write permission is required, the
        # handle is always closed, and lines are split on b'\n' only.
        with open(file_path, "rb") as fp:
            return sum(1 for _ in fp)