Files
Fire-alarm-managment/import_data.py
2026-01-19 21:57:25 -05:00

266 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Import data from Excel spreadsheets into the Fire Alarm Management database.
"""
import pandas as pd
from datetime import datetime
import sys
import os
# Add the app directory to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import create_app
from app.models import db, Job, Phase, Material
def parse_date(value):
    """Convert a spreadsheet cell into a ``datetime.date``.

    Accepts ``datetime`` instances (and subclasses), plain ``date``
    instances, and strings formatted as ``YYYY-MM-DD`` or ``MM/DD/YYYY``.
    Surrounding whitespace in strings is ignored.

    Args:
        value: Raw cell value.

    Returns:
        The parsed date, or ``None`` when the cell is empty/NaN, has an
        unsupported type, or does not match a known format.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import date

    if value is None or pd.isna(value):
        return None
    # ``datetime`` must be tested before ``date`` because it is a subclass.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        text = value.strip()
        for fmt in ('%Y-%m-%d', '%m/%d/%Y'):
            try:
                return datetime.strptime(text, fmt).date()
            except ValueError:
                continue
    return None
def parse_float(value, default=0.0):
    """Convert a cell to ``float``, falling back to ``default``.

    ``default`` is returned for ``None``/NaN cells and for values that
    ``float()`` rejects.
    """
    if value is None or pd.isna(value):
        return default
    try:
        number = float(value)
    except (ValueError, TypeError):
        return default
    return number
def parse_int(value, default=None):
    """Convert a cell to ``int``, falling back to ``default``.

    The value goes through ``float()`` first so that inputs such as
    ``"12.0"`` or ``12.7`` are accepted (the fraction is truncated).

    Args:
        value: Raw cell value.
        default: Value returned when the cell is empty/NaN or not numeric.

    Returns:
        The integer value, or ``default``.
    """
    if value is None or pd.isna(value):
        return default
    try:
        return int(float(value))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int() cannot represent +/- infinity.
        return default
def parse_str(value):
    """Convert a cell to a stripped string, or ``None`` when it is blank.

    Whole-number floats are rendered without the trailing ``.0`` (``2301.0``
    becomes ``"2301"``), so numeric identifiers keep the same text whether
    the column was read as integers or as floats.

    Args:
        value: Raw cell value.

    Returns:
        The stripped text, or ``None`` for ``None``/NaN cells and for
        values that are empty after stripping.
    """
    if value is None or pd.isna(value):
        return None
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    text = str(value).strip()
    return text or None
def import_fire_alarm_jobs(app, filepath):
    """Import jobs from the Fire Alarm Information spreadsheet.

    Reads ``Sheet1`` of the workbook and adds one ``Job`` for every row that
    has a ``JOB NUMBER`` not already present in the database (or earlier in
    the same sheet). All new jobs are committed together; if anything fails
    while adding or committing, the session is rolled back and the error is
    printed, matching how read errors are reported.

    Args:
        app: Application object providing ``app_context()``.
        filepath: Path to the Excel workbook.
    """
    print(f"\nImporting jobs from: {filepath}")
    try:
        df = pd.read_excel(filepath, sheet_name='Sheet1')
        print(f"Found {len(df)} rows")
    except Exception as e:
        print(f"Error reading file: {e}")
        return

    # Job attribute -> spreadsheet column, grouped by the parser to apply.
    text_columns = {
        'location': 'LOCATION',
        'est_starting_qtr': 'Est. Starting Qtr',
        'pm_assigned': 'PM ASSIGNED',
        'aor': 'AOR',
        'fire_vendor': 'FIRE VENDOR',
        'install_partner': 'INSTALL PARTNER TEAM UP WITH RER',
        'ps_or_install': 'P/S OR INSTALL',
        'voip_or_phone': 'VOIP OR PHONE LINE',
        'plans': 'Plans',
        'notes': 'NOTES',
        'issues': 'ISSUES',
        'milestone_1': '1ST MILESTONE',
        'milestone_2': '2ND MILESTONE',
        'milestone_3': '3RD MILESTONE',
        'milestone_4': '4TH MILESTONE',
        'milestone_5': '5TH MILESTONE',
        'milestone_6': '6TH MILESTONE',
        'milestone_7': '7TH MILESTONE',
        'sep_club_house': 'SEP CLUB HOUSE - FIRE ALARM',
    }
    float_columns = {
        'percent_complete': '% COMPLETE',
        'fire_alarm_budget': 'FIRE ALARM BUDGET - VENDOR BID APPROVAL',
        'labor_estimate': 'LABOR ESTIMATE',
        'material_estimate': 'MATERIAL ESTIMATE',
        'amount_left_on_contract': 'AMOUNT LEFT ON CONTRACT',
    }
    date_columns = {
        'elevator_final': 'ELEVATOR FINAL',
        'pretest': 'PRETEST',
        'final_date': 'FINAL',
        'co_drop_dead_date': 'C/O DROP DEAD DATE',
    }

    with app.app_context():
        imported = 0
        # Tracks numbers added in this run so duplicate rows are skipped
        # even if the session does not autoflush before the lookup below.
        seen_job_numbers = set()
        try:
            for idx, row in df.iterrows():
                job_number = parse_str(row.get('JOB NUMBER'))
                if not job_number:
                    print(f" Skipping row {idx}: No job number")
                    continue

                # Check if job already exists
                already_known = (
                    job_number in seen_job_numbers
                    or Job.query.filter_by(job_number=job_number).first()
                )
                if already_known:
                    print(f" Skipping {job_number}: Already exists")
                    continue

                fields = {
                    attr: parse_str(row.get(col))
                    for attr, col in text_columns.items()
                }
                fields.update(
                    (attr, parse_float(row.get(col)))
                    for attr, col in float_columns.items()
                )
                fields.update(
                    (attr, parse_date(row.get(col)))
                    for attr, col in date_columns.items()
                )

                job = Job(
                    job_number=job_number,
                    job_name=parse_str(row.get('JOB NAME')) or f"Job {job_number}",
                    number_of_units=parse_int(row.get('NUMBER OF UNITS')),
                    **fields,
                )
                db.session.add(job)
                seen_job_numbers.add(job_number)
                imported += 1
                print(f" Imported: {job_number} - {job.job_name}")

            db.session.commit()
        except Exception as e:
            # Nothing from this run is kept if any row or the commit fails.
            db.session.rollback()
            print(f"Error importing jobs: {e}")
            return
        print(f"\nImported {imported} jobs")
def _is_completed(value):
    """Return True when a 'completed' cell holds True/1 or the text 'true'."""
    # ``== True`` (not ``is True``) is deliberate: it also accepts 1 / 1.0 and
    # NumPy booleans. bool() normalises the result to a plain Python bool.
    return bool(value == True) or str(value).lower() == 'true'  # noqa: E712


def _upsert_schedule_job(first_row, job_number):
    """Return the Job for ``job_number``, creating it or filling blank fields."""
    job = Job.query.filter_by(job_number=job_number).first()
    if not job:
        # Create the job if it doesn't exist
        job = Job(
            job_number=job_number,
            job_name=parse_str(first_row.get('Job Name')) or f"Job {job_number}",
            pm_assigned=parse_str(first_row.get('PM RAR')),
            subcontractor=parse_str(first_row.get('Subcontractor')),
            pci=parse_str(first_row.get('PCI')),
        )
        db.session.add(job)
        db.session.commit()
        print(f" Created job: {job_number}")
    else:
        # Only fill in fields that are still blank on the existing job.
        if not job.subcontractor:
            job.subcontractor = parse_str(first_row.get('Subcontractor'))
        if not job.pci:
            job.pci = parse_str(first_row.get('PCI'))
        db.session.commit()
        print(f" Updated job: {job_number}")
    return job


def _import_phases(schedule_df, job):
    """Add a Phase per '<Type> Phase <N>' column with positive points; return the count."""
    row_count = len(schedule_df)
    if row_count <= 1:
        # Row index 1 holds the points; without it no phase qualifies.
        return 0

    points_row = schedule_df.iloc[1]
    # Optional detail rows, looked up once instead of once per column.
    start_row = schedule_df.iloc[3] if row_count > 3 else None
    due_row = schedule_df.iloc[4] if row_count > 4 else None
    men_row = schedule_df.iloc[5] if row_count > 5 else None
    completed_row = schedule_df.iloc[6] if row_count > 6 else None

    # Column-name prefix -> stored phase_type (insertion order is the
    # processing order).
    phase_type_map = {
        'Rough-in': 'rough_in',
        'Trim': 'trim',
        'Commissioning': 'commissioning',
        'Final': 'final',
        'Turnover': 'turnover',
    }

    added = 0
    for label, phase_type in phase_type_map.items():
        for phase_num in range(1, 51):
            col_name = f"{label} Phase {phase_num}"
            if col_name not in schedule_df.columns:
                continue

            points = parse_int(points_row.get(col_name))
            if not points or points <= 0:
                continue

            # Check if phase already exists
            existing = Phase.query.filter_by(
                job_id=job.id,
                phase_type=phase_type,
                phase_number=phase_num,
            ).first()
            if existing:
                continue

            phase = Phase(
                job_id=job.id,
                phase_type=phase_type,
                phase_number=phase_num,
                points=points,
            )
            if start_row is not None:
                phase.start_date = parse_date(start_row.get(col_name))
            if due_row is not None:
                phase.due_date = parse_date(due_row.get(col_name))
            if men_row is not None:
                phase.men_on_site = parse_int(men_row.get(col_name))
            if completed_row is not None:
                phase.completed = _is_completed(completed_row.get(col_name))
            db.session.add(phase)
            added += 1
    return added


def import_schedule_data(app, filepath):
    """Import schedule/phase data from the schedule spreadsheet.

    The first data row of the schedule sheet identifies the job (``Job #``),
    which is created or updated. Each ``<Type> Phase <N>`` column with a
    positive points value in row index 1 then becomes a ``Phase``, unless one
    already exists for that job, type and number. Errors while processing the
    sheet are printed and the session is rolled back.

    Args:
        app: Application object providing ``app_context()``.
        filepath: Path to the Excel workbook.
    """
    print(f"\nImporting schedule data from: {filepath}")
    try:
        xl = pd.ExcelFile(filepath)
        print(f"Found sheets: {xl.sheet_names}")
    except Exception as e:
        print(f"Error reading file: {e}")
        return

    # ``with xl`` closes the workbook handle on every exit path.
    with xl, app.app_context():
        try:
            # The workbook's sheet is spelled 'Sechdule'; fall back to the
            # correct spelling only when the original name is absent.
            sheet_name = 'Sechdule'
            if sheet_name not in xl.sheet_names and 'Schedule' in xl.sheet_names:
                sheet_name = 'Schedule'

            schedule_df = pd.read_excel(xl, sheet_name=sheet_name)
            print(f"\nProcessing Schedule sheet with {len(schedule_df)} rows")
            if len(schedule_df) == 0:
                return

            # The first row contains job info, remaining rows contain phase details
            first_row = schedule_df.iloc[0]
            job_number = parse_str(first_row.get('Job #'))
            if not job_number:
                print(" No job number in first row; nothing imported")
                return

            job = _upsert_schedule_job(first_row, job_number)
            phases_imported = _import_phases(schedule_df, job)
            db.session.commit()
            print(f" Imported {phases_imported} phases for job {job_number}")
        except Exception as e:
            # Discard any partially added phases before reporting.
            db.session.rollback()
            print(f"Error processing Schedule sheet: {e}")
def main(fire_alarm_file=None, schedule_file=None):
    """Run the full import: the jobs workbook first, then the schedule workbook.

    Args:
        fire_alarm_file: Path to the jobs workbook. Defaults to the
            previously hard-coded location.
        schedule_file: Path to the schedule workbook. Defaults to the
            previously hard-coded location.
    """
    if fire_alarm_file is None:
        fire_alarm_file = '/root/code/romanoff/Raleigh jobs FIRE ALARM INFORMATION.xlsx'
    if schedule_file is None:
        schedule_file = '/root/code/romanoff/schedule_updated.xlsm'

    banner = "=" * 50
    print(banner)
    print("Fire Alarm Data Import")
    print(banner)

    app = create_app()

    # Order is kept from the original: jobs are imported before the schedule.
    steps = (
        (fire_alarm_file, import_fire_alarm_jobs),
        (schedule_file, import_schedule_data),
    )
    for path, importer in steps:
        if os.path.exists(path):
            importer(app, path)
        else:
            print(f"File not found: {path}")

    print("\n" + banner)
    print("Import complete!")
    print(banner)
# Run the import only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()