Examples

This section provides practical examples of using PyP6Xer for common project management tasks.

File Operations

Loading Multiple XER Files

from xerparser.reader import Reader
import glob

# Load all XER files in a directory
xer_files = []
for filename in glob.glob("*.xer"):
    try:
        xer = Reader(filename)
        xer_files.append(xer)
        print(f"Loaded: {filename}")
    except Exception as e:
        print(f"Failed to load {filename}: {e}")

Writing XER Files

from xerparser.reader import Reader

# Load and modify data
xer = Reader("input.xer")

# Make modifications to the data...
# (See modification examples below)

# Write to a new file
xer.write("output.xer")
print("XER file written successfully")

Project Analysis

Project Summary Report

def project_summary(xer_file):
    """Generate a comprehensive project summary."""
    
    for project in xer_file.projects:
        print(f"\n{'='*50}")
        print(f"PROJECT SUMMARY: {project.proj_short_name}")
        print(f"{'='*50}")
        
        # Basic info
        print(f"Project Name: {project.proj_name}")
        print(f"Project Manager: {project.proj_mgr}")
        print(f"Status: {project.status_code}")
        
        # Dates
        print(f"\nSCHEDULE:")
        print(f"Data Date: {project.last_recalc_date}")
        print(f"Planned Start: {project.plan_start_date}")
        print(f"Planned Finish: {project.plan_end_date}")
        
        # Activity statistics
        activities = project.activities
        total_activities = len(activities)
        
        not_started = len([a for a in activities if a.status_code == "TK_NotStart"])
        in_progress = len([a for a in activities if a.status_code == "TK_Active"])
        completed = len([a for a in activities if a.status_code == "TK_Complete"])
        
        print(f"\nACTIVITIES:")
        print(f"Total: {total_activities}")
        print(f"Not Started: {not_started}")
        print(f"In Progress: {in_progress}")
        print(f"Completed: {completed}")
        
        # Progress calculation
        if total_activities > 0:
            completion_pct = (completed / total_activities) * 100
            print(f"Completion: {completion_pct:.1f}%")

# Usage
xer = Reader("project.xer")
project_summary(xer)

Critical Path Analysis

def critical_path_analysis(project):
    """Analyze the critical path of a project."""
    
    activities = project.activities
    critical_activities = [a for a in activities if a.driving_path_flag == "Y"]
    
    print(f"CRITICAL PATH ANALYSIS")
    print(f"{'='*30}")
    print(f"Total Activities: {len(activities)}")
    print(f"Critical Activities: {len(critical_activities)}")
    
    if critical_activities:
        print(f"\nCRITICAL PATH ACTIVITIES:")
        print(f"{'ID':<15} {'Name':<40} {'Duration':<12} {'Float'}")
        print(f"{'-'*80}")
        
        for activity in critical_activities:
            print(f"{activity.task_code:<15} "
                  f"{activity.task_name[:40]:<40} "
                  f"{activity.target_drtn_hr_cnt or 0:<12} "
                  f"{activity.total_float_hr_cnt or 0}")

# Usage
project = xer.projects[0]
critical_path_analysis(project)

Filtering Data

Activity Filtering

def filter_activities(activities, **filters):
    """Filter activities based on various criteria."""
    
    filtered = activities
    
    # Filter by status
    if 'status' in filters:
        filtered = [a for a in filtered if a.status_code == filters['status']]
    
    # Filter by date range
    if 'start_after' in filters:
        start_date = filters['start_after']
        filtered = [a for a in filtered 
                   if a.act_start_date and a.act_start_date >= start_date]
    
    # Filter by duration
    if 'min_duration' in filters:
        min_dur = filters['min_duration']
        filtered = [a for a in filtered 
                   if a.target_drtn_hr_cnt and a.target_drtn_hr_cnt >= min_dur]
    
    # Filter by WBS
    if 'wbs_name' in filters:
        wbs_name = filters['wbs_name']
        filtered = [a for a in filtered 
                   if wbs_name.lower() in (a.wbs_name or "").lower()]
    
    return filtered

# Usage examples
project = xer.projects[0]
activities = project.activities

# Get all active activities
active = filter_activities(activities, status="TK_Active")

# Get long-duration activities (> 40 hours)
long_activities = filter_activities(activities, min_duration=40)

# Get activities in specific WBS
design_activities = filter_activities(activities, wbs_name="Design")

Resource Utilization

def resource_utilization_report(xer_file):
    """Generate a resource utilization report."""
    
    resources = xer_file.resources
    assignments = xer_file.activityresources
    
    print(f"RESOURCE UTILIZATION REPORT")
    print(f"{'='*50}")
    
    for resource in resources:
        # Find all assignments for this resource
        resource_assignments = [a for a in assignments 
                              if a.rsrc_id == resource.rsrc_id]
        
        total_budget = sum(a.target_qty or 0 for a in resource_assignments)
        total_actual = sum(a.act_reg_qty or 0 for a in resource_assignments)
        
        print(f"\nResource: {resource.rsrc_name}")
        print(f"  Type: {resource.rsrc_type}")
        print(f"  Assignments: {len(resource_assignments)}")
        print(f"  Budgeted Hours: {total_budget}")
        print(f"  Actual Hours: {total_actual}")
        
        if total_budget > 0:
            utilization = (total_actual / total_budget) * 100
            print(f"  Utilization: {utilization:.1f}%")

# Usage
resource_utilization_report(xer)

Data Export

Export to CSV

import csv

def export_activities_to_csv(activities, filename):
    """Export activities to CSV format."""
    
    fieldnames = [
        'task_id', 'task_code', 'task_name', 'status_code',
        'target_drtn_hr_cnt', 'act_start_date', 'act_end_date',
        'phys_complete_pct', 'total_float_hr_cnt'
    ]
    
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        
        for activity in activities:
            row = {}
            for field in fieldnames:
                row[field] = getattr(activity, field, None)
            writer.writerow(row)
    
    print(f"Exported {len(activities)} activities to {filename}")

# Usage
project = xer.projects[0]
export_activities_to_csv(project.activities, "activities.csv")

Export to JSON

import json
from datetime import datetime

def activity_to_dict(activity):
    """Convert activity to dictionary for JSON export."""
    
    def serialize_date(obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return obj
    
    return {
        'task_id': activity.task_id,
        'task_code': activity.task_code,
        'task_name': activity.task_name,
        'status_code': activity.status_code,
        'duration_hours': activity.target_drtn_hr_cnt,
        'start_date': serialize_date(activity.act_start_date),
        'end_date': serialize_date(activity.act_end_date),
        'percent_complete': activity.phys_complete_pct,
        'total_float': activity.total_float_hr_cnt,
        'wbs_id': activity.wbs_id
    }

def export_project_to_json(project, filename):
    """Export project data to JSON."""
    
    project_data = {
        'project_info': {
            'id': project.proj_id,
            'short_name': project.proj_short_name,
            'name': project.proj_name,
            'status': project.status_code
        },
        'activities': [activity_to_dict(a) for a in project.activities]
    }
    
    with open(filename, 'w', encoding='utf-8') as jsonfile:
        json.dump(project_data, jsonfile, indent=2, ensure_ascii=False)
    
    print(f"Exported project to {filename}")

# Usage
project = xer.projects[0]
export_project_to_json(project, "project_data.json")

Schedule Analysis

Float Analysis

def float_analysis(activities):
    """Analyze float (slack) in project activities."""
    
    # Filter activities with float data
    float_activities = [a for a in activities if a.total_float_hr_cnt is not None]
    
    if not float_activities:
        print("No float data available")
        return
    
    # Calculate statistics
    floats = [a.total_float_hr_cnt for a in float_activities]
    critical = [a for a in float_activities if a.total_float_hr_cnt <= 0]
    near_critical = [a for a in float_activities if 0 < a.total_float_hr_cnt <= 8]
    
    print(f"FLOAT ANALYSIS")
    print(f"{'='*30}")
    print(f"Total Activities: {len(float_activities)}")
    print(f"Critical (0 float): {len(critical)}")
    print(f"Near Critical (1-8 hrs): {len(near_critical)}")
    print(f"Average Float: {sum(floats) / len(floats):.1f} hours")
    print(f"Maximum Float: {max(floats):.1f} hours")
    
    # List critical activities
    if critical:
        print(f"\nCRITICAL ACTIVITIES:")
        for activity in critical[:10]:  # Show first 10
            print(f"  {activity.task_code}: {activity.task_name}")

# Usage
project = xer.projects[0]
float_analysis(project.activities)

Calendar Analysis

def calendar_analysis(xer_file):
    """Analyze calendar usage and working time."""
    
    calendars = xer_file.calendars
    
    print(f"CALENDAR ANALYSIS")
    print(f"{'='*30}")
    
    for calendar in calendars:
        print(f"\nCalendar: {calendar.clndr_name}")
        print(f"  Type: {calendar.clndr_type}")
        print(f"  Default: {'Yes' if calendar.default_flag == 'Y' else 'No'}")
        print(f"  Hours per Day: {calendar.day_hr_cnt}")
        print(f"  Hours per Week: {calendar.week_hr_cnt}")
        
        # Count activities using this calendar
        activities_count = len([a for a in xer_file.activities.get_list() 
                              if a.clndr_id == calendar.clndr_id])
        print(f"  Used by {activities_count} activities")

# Usage
calendar_analysis(xer)

Advanced Examples

Earned Value Analysis

def earned_value_analysis(project):
    """Perform basic earned value analysis."""
    
    activities = project.activities
    
    # Calculate totals
    total_budget = sum(a.target_cost or 0 for a in activities)
    total_actual = sum(a.act_reg_cost or 0 for a in activities)
    
    # Calculate earned value (simplified)
    earned_value = 0
    for activity in activities:
        if activity.phys_complete_pct and activity.target_cost:
            earned_value += (activity.phys_complete_pct / 100) * activity.target_cost
    
    # Calculate variances
    cost_variance = earned_value - total_actual
    schedule_variance = earned_value - total_budget
    
    print(f"EARNED VALUE ANALYSIS")
    print(f"{'='*30}")
    print(f"Planned Value (PV): ${total_budget:,.2f}")
    print(f"Earned Value (EV): ${earned_value:,.2f}")
    print(f"Actual Cost (AC): ${total_actual:,.2f}")
    print(f"Cost Variance (CV): ${cost_variance:,.2f}")
    print(f"Schedule Variance (SV): ${schedule_variance:,.2f}")
    
    if total_actual > 0:
        cpi = earned_value / total_actual
        print(f"Cost Performance Index (CPI): {cpi:.2f}")
    
    if total_budget > 0:
        spi = earned_value / total_budget
        print(f"Schedule Performance Index (SPI): {spi:.2f}")

# Usage
project = xer.projects[0]
earned_value_analysis(project)

Network Analysis

def network_analysis(xer_file):
    """Analyze the project network structure."""
    
    activities = xer_file.activities.get_list()
    relationships = xer_file.relations.get_list()
    
    # Count relationship types
    relationship_types = {}
    for rel in relationships:
        rel_type = rel.pred_type
        relationship_types[rel_type] = relationship_types.get(rel_type, 0) + 1
    
    # Find activities with no predecessors (start activities)
    activity_ids = {a.task_id for a in activities}
    successor_ids = {r.task_id for r in relationships}
    start_activities = activity_ids - successor_ids
    
    # Find activities with no successors (end activities)
    predecessor_ids = {r.pred_task_id for r in relationships}
    end_activities = activity_ids - predecessor_ids
    
    print(f"NETWORK ANALYSIS")
    print(f"{'='*30}")
    print(f"Total Activities: {len(activities)}")
    print(f"Total Relationships: {len(relationships)}")
    print(f"Start Activities: {len(start_activities)}")
    print(f"End Activities: {len(end_activities)}")
    
    print(f"\nRelationship Types:")
    for rel_type, count in relationship_types.items():
        print(f"  {rel_type}: {count}")

# Usage
network_analysis(xer)

These examples demonstrate the flexibility and power of PyP6Xer for project analysis and data manipulation. You can combine and modify these examples to suit your specific needs.