Examples
This section provides practical examples of using PyP6Xer for common project management tasks.
File Operations
Loading Multiple XER Files
from xerparser.reader import Reader
import glob
# Load all XER files in a directory
xer_files = []
for filename in glob.glob("*.xer"):
try:
xer = Reader(filename)
xer_files.append(xer)
print(f"Loaded: {filename}")
except Exception as e:
print(f"Failed to load {filename}: {e}")
Writing XER Files
from xerparser.reader import Reader
# Load and modify data
xer = Reader("input.xer")
# Make modifications to the data...
# (See modification examples below)
# Write to a new file
xer.write("output.xer")
print("XER file written successfully")
Project Analysis
Project Summary Report
def project_summary(xer_file):
"""Generate a comprehensive project summary."""
for project in xer_file.projects:
print(f"\n{'='*50}")
print(f"PROJECT SUMMARY: {project.proj_short_name}")
print(f"{'='*50}")
# Basic info
print(f"Project Name: {project.proj_name}")
print(f"Project Manager: {project.proj_mgr}")
print(f"Status: {project.status_code}")
# Dates
print(f"\nSCHEDULE:")
print(f"Data Date: {project.last_recalc_date}")
print(f"Planned Start: {project.plan_start_date}")
print(f"Planned Finish: {project.plan_end_date}")
# Activity statistics
activities = project.activities
total_activities = len(activities)
not_started = len([a for a in activities if a.status_code == "TK_NotStart"])
in_progress = len([a for a in activities if a.status_code == "TK_Active"])
completed = len([a for a in activities if a.status_code == "TK_Complete"])
print(f"\nACTIVITIES:")
print(f"Total: {total_activities}")
print(f"Not Started: {not_started}")
print(f"In Progress: {in_progress}")
print(f"Completed: {completed}")
# Progress calculation
if total_activities > 0:
completion_pct = (completed / total_activities) * 100
print(f"Completion: {completion_pct:.1f}%")
# Usage
xer = Reader("project.xer")
project_summary(xer)
Critical Path Analysis
def critical_path_analysis(project):
"""Analyze the critical path of a project."""
activities = project.activities
critical_activities = [a for a in activities if a.driving_path_flag == "Y"]
print(f"CRITICAL PATH ANALYSIS")
print(f"{'='*30}")
print(f"Total Activities: {len(activities)}")
print(f"Critical Activities: {len(critical_activities)}")
if critical_activities:
print(f"\nCRITICAL PATH ACTIVITIES:")
print(f"{'ID':<15} {'Name':<40} {'Duration':<12} {'Float'}")
print(f"{'-'*80}")
for activity in critical_activities:
print(f"{activity.task_code:<15} "
f"{activity.task_name[:40]:<40} "
f"{activity.target_drtn_hr_cnt or 0:<12} "
f"{activity.total_float_hr_cnt or 0}")
# Usage
project = xer.projects[0]
critical_path_analysis(project)
Filtering Data
Activity Filtering
def filter_activities(activities, **filters):
"""Filter activities based on various criteria."""
filtered = activities
# Filter by status
if 'status' in filters:
filtered = [a for a in filtered if a.status_code == filters['status']]
# Filter by date range
if 'start_after' in filters:
start_date = filters['start_after']
filtered = [a for a in filtered
if a.act_start_date and a.act_start_date >= start_date]
# Filter by duration
if 'min_duration' in filters:
min_dur = filters['min_duration']
filtered = [a for a in filtered
if a.target_drtn_hr_cnt and a.target_drtn_hr_cnt >= min_dur]
# Filter by WBS
if 'wbs_name' in filters:
wbs_name = filters['wbs_name']
filtered = [a for a in filtered
if wbs_name.lower() in (a.wbs_name or "").lower()]
return filtered
# Usage examples
project = xer.projects[0]
activities = project.activities
# Get all active activities
active = filter_activities(activities, status="TK_Active")
# Get long-duration activities (> 40 hours)
long_activities = filter_activities(activities, min_duration=40)
# Get activities in specific WBS
design_activities = filter_activities(activities, wbs_name="Design")
Resource Utilization
def resource_utilization_report(xer_file):
"""Generate a resource utilization report."""
resources = xer_file.resources
assignments = xer_file.activityresources
print(f"RESOURCE UTILIZATION REPORT")
print(f"{'='*50}")
for resource in resources:
# Find all assignments for this resource
resource_assignments = [a for a in assignments
if a.rsrc_id == resource.rsrc_id]
total_budget = sum(a.target_qty or 0 for a in resource_assignments)
total_actual = sum(a.act_reg_qty or 0 for a in resource_assignments)
print(f"\nResource: {resource.rsrc_name}")
print(f" Type: {resource.rsrc_type}")
print(f" Assignments: {len(resource_assignments)}")
print(f" Budgeted Hours: {total_budget}")
print(f" Actual Hours: {total_actual}")
if total_budget > 0:
utilization = (total_actual / total_budget) * 100
print(f" Utilization: {utilization:.1f}%")
# Usage
resource_utilization_report(xer)
Data Export
Export to CSV
import csv
def export_activities_to_csv(activities, filename):
"""Export activities to CSV format."""
fieldnames = [
'task_id', 'task_code', 'task_name', 'status_code',
'target_drtn_hr_cnt', 'act_start_date', 'act_end_date',
'phys_complete_pct', 'total_float_hr_cnt'
]
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for activity in activities:
row = {}
for field in fieldnames:
row[field] = getattr(activity, field, None)
writer.writerow(row)
print(f"Exported {len(activities)} activities to {filename}")
# Usage
project = xer.projects[0]
export_activities_to_csv(project.activities, "activities.csv")
Export to JSON
import json
from datetime import datetime
def activity_to_dict(activity):
"""Convert activity to dictionary for JSON export."""
def serialize_date(obj):
if isinstance(obj, datetime):
return obj.isoformat()
return obj
return {
'task_id': activity.task_id,
'task_code': activity.task_code,
'task_name': activity.task_name,
'status_code': activity.status_code,
'duration_hours': activity.target_drtn_hr_cnt,
'start_date': serialize_date(activity.act_start_date),
'end_date': serialize_date(activity.act_end_date),
'percent_complete': activity.phys_complete_pct,
'total_float': activity.total_float_hr_cnt,
'wbs_id': activity.wbs_id
}
def export_project_to_json(project, filename):
"""Export project data to JSON."""
project_data = {
'project_info': {
'id': project.proj_id,
'short_name': project.proj_short_name,
'name': project.proj_name,
'status': project.status_code
},
'activities': [activity_to_dict(a) for a in project.activities]
}
with open(filename, 'w', encoding='utf-8') as jsonfile:
json.dump(project_data, jsonfile, indent=2, ensure_ascii=False)
print(f"Exported project to {filename}")
# Usage
project = xer.projects[0]
export_project_to_json(project, "project_data.json")
Schedule Analysis
Float Analysis
def float_analysis(activities):
"""Analyze float (slack) in project activities."""
# Filter activities with float data
float_activities = [a for a in activities if a.total_float_hr_cnt is not None]
if not float_activities:
print("No float data available")
return
# Calculate statistics
floats = [a.total_float_hr_cnt for a in float_activities]
critical = [a for a in float_activities if a.total_float_hr_cnt <= 0]
near_critical = [a for a in float_activities if 0 < a.total_float_hr_cnt <= 8]
print(f"FLOAT ANALYSIS")
print(f"{'='*30}")
print(f"Total Activities: {len(float_activities)}")
print(f"Critical (0 float): {len(critical)}")
print(f"Near Critical (1-8 hrs): {len(near_critical)}")
print(f"Average Float: {sum(floats) / len(floats):.1f} hours")
print(f"Maximum Float: {max(floats):.1f} hours")
# List critical activities
if critical:
print(f"\nCRITICAL ACTIVITIES:")
for activity in critical[:10]: # Show first 10
print(f" {activity.task_code}: {activity.task_name}")
# Usage
project = xer.projects[0]
float_analysis(project.activities)
Calendar Analysis
def calendar_analysis(xer_file):
"""Analyze calendar usage and working time."""
calendars = xer_file.calendars
print(f"CALENDAR ANALYSIS")
print(f"{'='*30}")
for calendar in calendars:
print(f"\nCalendar: {calendar.clndr_name}")
print(f" Type: {calendar.clndr_type}")
print(f" Default: {'Yes' if calendar.default_flag == 'Y' else 'No'}")
print(f" Hours per Day: {calendar.day_hr_cnt}")
print(f" Hours per Week: {calendar.week_hr_cnt}")
# Count activities using this calendar
activities_count = len([a for a in xer_file.activities.get_list()
if a.clndr_id == calendar.clndr_id])
print(f" Used by {activities_count} activities")
# Usage
calendar_analysis(xer)
Advanced Examples
Earned Value Analysis
def earned_value_analysis(project):
"""Perform basic earned value analysis."""
activities = project.activities
# Calculate totals
total_budget = sum(a.target_cost or 0 for a in activities)
total_actual = sum(a.act_reg_cost or 0 for a in activities)
# Calculate earned value (simplified)
earned_value = 0
for activity in activities:
if activity.phys_complete_pct and activity.target_cost:
earned_value += (activity.phys_complete_pct / 100) * activity.target_cost
# Calculate variances
cost_variance = earned_value - total_actual
schedule_variance = earned_value - total_budget
print(f"EARNED VALUE ANALYSIS")
print(f"{'='*30}")
print(f"Planned Value (PV): ${total_budget:,.2f}")
print(f"Earned Value (EV): ${earned_value:,.2f}")
print(f"Actual Cost (AC): ${total_actual:,.2f}")
print(f"Cost Variance (CV): ${cost_variance:,.2f}")
print(f"Schedule Variance (SV): ${schedule_variance:,.2f}")
if total_actual > 0:
cpi = earned_value / total_actual
print(f"Cost Performance Index (CPI): {cpi:.2f}")
if total_budget > 0:
spi = earned_value / total_budget
print(f"Schedule Performance Index (SPI): {spi:.2f}")
# Usage
project = xer.projects[0]
earned_value_analysis(project)
Network Analysis
def network_analysis(xer_file):
"""Analyze the project network structure."""
activities = xer_file.activities.get_list()
relationships = xer_file.relations.get_list()
# Count relationship types
relationship_types = {}
for rel in relationships:
rel_type = rel.pred_type
relationship_types[rel_type] = relationship_types.get(rel_type, 0) + 1
# Find activities with no predecessors (start activities)
activity_ids = {a.task_id for a in activities}
successor_ids = {r.task_id for r in relationships}
start_activities = activity_ids - successor_ids
# Find activities with no successors (end activities)
predecessor_ids = {r.pred_task_id for r in relationships}
end_activities = activity_ids - predecessor_ids
print(f"NETWORK ANALYSIS")
print(f"{'='*30}")
print(f"Total Activities: {len(activities)}")
print(f"Total Relationships: {len(relationships)}")
print(f"Start Activities: {len(start_activities)}")
print(f"End Activities: {len(end_activities)}")
print(f"\nRelationship Types:")
for rel_type, count in relationship_types.items():
print(f" {rel_type}: {count}")
# Usage
network_analysis(xer)
These examples demonstrate the flexibility and power of PyP6Xer for project analysis and data manipulation. You can combine and modify these examples to suit your specific needs.