| | import pandas as pd |
| | import src.preprocess.transform as transformed_data |
| | import datetime |
| | from datetime import timedelta |
| | import src.preprocess.extract as extract |
| | from src.config.constants import ShiftType, LineType, KitLevel, DefaultConfig |
| |
|
| | |
| | import importlib |
| |
|
| | |
| | |
| | |
| |
|
| |
|
def get_date_span():
    """Return (date_span, start_date, end_date) for the planning horizon.

    Tries the Streamlit session state first (keys 'start_date' and
    'planning_days'); falls back to a fixed 5-day default window when
    Streamlit is unavailable or the keys are not set.

    Returns:
        tuple: (list[int] 1-based day indices, datetime start, datetime end)
    """
    try:
        import streamlit as st
        if hasattr(st, 'session_state'):
            if 'start_date' in st.session_state and 'planning_days' in st.session_state:
                from datetime import datetime, timedelta
                # Session stores a date; promote it to a midnight datetime.
                start_date = datetime.combine(st.session_state.start_date, datetime.min.time())
                planning_days = st.session_state.planning_days
                end_date = start_date + timedelta(days=planning_days - 1)
                date_span = list(range(1, planning_days + 1))
                return date_span, start_date, end_date
    except Exception:
        # Was a bare `except:`; narrowed so Ctrl-C / SystemExit still propagate.
        # Streamlit missing or session unreadable -> use the default below.
        pass

    # Default: 5 working days starting Monday 2025-07-07.
    from datetime import datetime
    return list(range(1, 6)), datetime(2025, 7, 7), datetime(2025, 7, 11)
| |
|
| |
|
| | |
| | |
# Module-level planning-horizon placeholders. They are intentionally left as
# None and resolved dynamically via get_date_span() so the module reflects the
# current Streamlit configuration instead of import-time values.
DATE_SPAN = None
start_date = None
end_date = None
| |
|
def get_product_list():
    """Return the filtered product list.

    Prefers the DemandFilter pipeline (with a forced reload so current
    Streamlit configs are reflected); falls back to the raw released-product
    list for the planning start date when filtering is unavailable.
    """
    try:
        from src.demand_filtering import DemandFilter
        filter_instance = DemandFilter()
        filter_instance.load_data(force_reload=True)
        return filter_instance.get_filtered_product_list()
    except Exception:
        # Was a bare `except:`; narrowed to Exception. Fallback: unfiltered
        # released products for the planning start date (span/end unused).
        _, start_date, _ = get_date_span()
        return transformed_data.get_released_product_list(start_date)
| |
|
| |
|
def get_employee_type_list():
    """Return employee type names from session state, or from the data files.

    Returns:
        list[str]: distinct employment types.
    """
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state:
            return st.session_state.selected_employee_types
    except Exception:
        # Was a bare `except:`; narrowed so interpreter-exit signals propagate.
        pass

    # Default: distinct employment types from the employee data file.
    employee_df = extract.read_employee_data()
    return employee_df["employment_type"].unique().tolist()
| |
|
| | |
def get_shift_list():
    """Return shift ids from session state, or from the shift data file.

    Returns:
        list: distinct shift ids.
    """
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state:
            return st.session_state.selected_shifts
    except Exception:
        # Was a bare `except:`; narrowed to Exception like the sibling getters.
        pass

    # Default: distinct shift ids from the shift info data.
    shift_df = extract.get_shift_info()
    return shift_df["id"].unique().tolist()
| |
|
| | |
| | |
| | |
| | |
| | |
# Evening-shift behaviour switch consumed by get_active_shift_list():
#   "normal"           - regular + overtime shifts only (no evening shift)
#   "activate_evening" - all shifts, evening included
#   "always_available" - all shifts, evening always usable
# Any other value falls back to "normal" behaviour.
EVENING_SHIFT_MODE = "normal"

# Demand-utilisation ratio related to enabling the evening shift; the
# session-state override lives in get_evening_shift_demand_threshold().
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9
| |
|
| | |
def get_active_shift_list():
    """
    Get the list of active shifts based on EVENING_SHIFT_MODE setting.
    """
    all_shifts = get_shift_list()
    mode = EVENING_SHIFT_MODE

    # Both evening modes use every shift; they differ only in the log line.
    if mode == "activate_evening":
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Evening activated: Using all shifts {active_shifts}")
        return active_shifts

    if mode == "always_available":
        active_shifts = list(all_shifts)
        print(f"[SHIFT MODE] Always available: Using all shifts {active_shifts}")
        return active_shifts

    # "normal" and any unrecognised mode both restrict to regular + overtime.
    active_shifts = [s for s in all_shifts if s in ShiftType.REGULAR_AND_OVERTIME]
    if mode == "normal":
        print(f"[SHIFT MODE] Normal mode: Using shifts {active_shifts} (Regular + Overtime only, NO evening)")
    else:
        print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {active_shifts}")
    return active_shifts
| |
|
| | |
| | |
| |
|
| | |
def get_line_list():
    """Get line list - try from streamlit session state first, then from data files"""
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'selected_lines' in st.session_state:
            print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}")
            return st.session_state.selected_lines
    except Exception as exc:
        print(f"Could not get lines from streamlit session: {exc}")

    # Fall back to the packaging-line data file.
    print(f"Loading line list from data files")
    packaging_lines = extract.read_packaging_line_data()
    return packaging_lines["id"].unique().tolist()
| |
|
| | |
| | |
| |
|
| | |
def get_kit_line_match():
    """Build {kit_name: line_type_id} from the kit/line match data.

    String line-type names are translated through an alias table; numeric
    values are coerced to int; NaN/empty entries are dropped. Unknown string
    names default to the long line (with a warning).
    """
    raw_match = extract.read_kit_line_match_data().set_index("kit_name")["line_type"].to_dict()

    # Accepted spellings of line-type names mapped to their numeric ids.
    alias_to_id = {
        "long line": LineType.LONG_LINE,
        "mini load": LineType.MINI_LOAD,
        "miniload": LineType.MINI_LOAD,
        "Long_line": LineType.LONG_LINE,
        "Mini_load": LineType.MINI_LOAD,
    }

    result = {}
    for kit, raw_value in raw_match.items():
        if isinstance(raw_value, str) and raw_value.strip():
            mapped_id = alias_to_id.get(raw_value.strip())
            if mapped_id is None:
                print(f"Warning: Unknown line type '{raw_value}' for kit {kit}")
                result[kit] = LineType.LONG_LINE
            else:
                result[kit] = mapped_id
        elif isinstance(raw_value, (int, float)) and not pd.isna(raw_value):
            result[kit] = int(raw_value)
        # NaN / empty values: kit is intentionally skipped.

    return result
| |
|
| | KIT_LINE_MATCH_DICT = get_kit_line_match() |
| |
|
| |
|
def get_line_cnt_per_type():
    """Return {line_type_id: line_count}, preferring the Streamlit config page."""
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'line_counts' in st.session_state:
            print(f"Using line counts from config page: {st.session_state.line_counts}")
            return st.session_state.line_counts
    except Exception as exc:
        print(f"Could not get line counts from streamlit session: {exc}")

    # Fall back to the counts stored in the packaging-line data file.
    print(f"Loading default line count values from data files")
    packaging_lines = extract.read_packaging_line_data()
    counts_by_type = packaging_lines.set_index("id")["line_count"].to_dict()
    print("line cnt per type", counts_by_type)
    return counts_by_type
| |
|
| | |
| | |
| |
|
| | |
def get_demand_dictionary(force_reload=False):
    """
    Get filtered demand dictionary.

    IMPORTANT: This dynamically loads data to reflect current Streamlit
    configs/dates. The underlying data is ALWAYS force-reloaded regardless of
    *force_reload*; the parameter is kept only for backward compatibility.

    Returns:
        dict: {product: demand} after filtering.

    Raises:
        Exception: when the demand data cannot be loaded (original error chained).
    """
    try:
        from src.demand_filtering import DemandFilter
        filter_instance = DemandFilter()

        # Always reload so stale cached data never leaks into the plan.
        filter_instance.load_data(force_reload=True)

        demand_dictionary = filter_instance.get_filtered_demand_dictionary()
        print(f"📈 FRESH FILTERED DEMAND: {len(demand_dictionary)} products with total demand {sum(demand_dictionary.values())}")
        print(f"🔄 LOADED DYNAMICALLY: Reflects current Streamlit configs")
        return demand_dictionary
    except Exception as e:
        print(f"Error loading dynamic demand dictionary: {e}")
        # Chain the cause (`from e`) so the root failure shows in tracebacks.
        raise Exception("Demand dictionary not found with error:" + str(e)) from e
| |
|
| | |
| | |
| |
|
| | |
def get_cost_list_per_emp_shift():
    """Return cost rates per employee type and shift, preferring the config page."""
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'cost_list_per_emp_shift' in st.session_state:
            print(f"Using cost list from config page: {st.session_state.cost_list_per_emp_shift}")
            return st.session_state.cost_list_per_emp_shift
    except Exception as exc:
        print(f"Could not get cost list from streamlit session: {exc}")

    # No session override available - use the module defaults.
    print(f"Loading default cost values")
    return DefaultConfig.DEFAULT_COST_RATES
| |
|
def shift_code_to_name():
    """Convert shift type IDs to readable names."""
    names = ShiftType.get_all_names()
    return names
| |
|
def line_code_to_name():
    """Convert line type IDs to readable names"""
    names = LineType.get_all_names()
    return names
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | |
def get_team_requirements(product_list=None):
    """
    Extract team requirements from Kits Calculation CSV.

    Args:
        product_list: products to look up; defaults to get_product_list().

    Returns:
        dict: {employee_type: {product: required_headcount}} for the
        "UNICEF Fixed term" and "Humanizer" employee types. Products missing
        from the CSV get a requirement of 0 (so downstream lookups never
        raise KeyError).
    """
    if product_list is None:
        product_list = get_product_list()

    kits_df = extract.read_personnel_requirement_data()

    team_req_dict = {
        "UNICEF Fixed term": {},
        "Humanizer": {}
    }

    for product in product_list:
        product_data = kits_df[kits_df['Kit'] == product]
        if not product_data.empty:
            humanizer_req = product_data["Humanizer"].iloc[0]
            unicef_req = product_data["UNICEF staff"].iloc[0]

            team_req_dict["Humanizer"][product] = int(humanizer_req)
            team_req_dict["UNICEF Fixed term"][product] = int(unicef_req)
        else:
            print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0")
            # BUG FIX: the warning promised zeros but nothing was stored,
            # leaving missing products absent from the dict. Store the zeros.
            team_req_dict["Humanizer"][product] = 0
            team_req_dict["UNICEF Fixed term"][product] = 0

    return team_req_dict
| |
|
| |
|
| |
|
def get_max_employee_per_type_on_day():
    """Return {employee_type: {day: max_headcount}}, preferring session state."""
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'max_employee_per_type_on_day' in st.session_state:
            print(f"Using max employee counts from config page: {st.session_state.max_employee_per_type_on_day}")
            return st.session_state.max_employee_per_type_on_day
    except Exception as exc:
        print(f"Could not get max employee counts from streamlit session: {exc}")

    print(f"Loading default max employee values")

    # Resolve the planning days (module-level DATE_SPAN is normally None).
    if DATE_SPAN is None:
        date_span, _, _ = get_date_span()
    else:
        date_span = DATE_SPAN

    # Hard-coded daily caps: 8 UNICEF fixed-term, 10 Humanizer workers.
    daily_caps = {"UNICEF Fixed term": 8, "Humanizer": 10}
    return {
        emp_type: {day: cap for day in date_span}
        for emp_type, cap in daily_caps.items()
    }
| |
|
| |
|
| | |
| | MAX_HOUR_PER_PERSON_PER_DAY = 14 |
def get_max_hour_per_shift_per_person():
    """Get max hours per shift per person from session state or default"""
    try:
        import streamlit as st
        if hasattr(st, 'session_state'):
            defaults = DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON
            # Map each shift type to its session-state key; fall back per shift.
            session_keys = {
                ShiftType.REGULAR: 'max_hours_shift_1',
                ShiftType.EVENING: 'max_hours_shift_2',
                ShiftType.OVERTIME: 'max_hours_shift_3',
            }
            return {
                shift: st.session_state.get(key, defaults[shift])
                for shift, key in session_keys.items()
            }
    except Exception as exc:
        print(f"Could not get max hours per shift from session: {exc}")

    # No session available - use the configured defaults.
    return DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON
| |
|
| |
|
| |
|
| | |
def get_evening_shift_demand_threshold():
    """Get evening shift demand threshold from session state or default"""
    try:
        import streamlit as st
        if hasattr(st, 'session_state'):
            return st.session_state.get(
                'evening_shift_threshold',
                DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD,
            )
    except Exception as exc:
        print(f"Could not get evening shift threshold from session: {exc}")

    # No session available - use the configured default.
    return DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD
| |
|
| |
|
| | |
def get_kit_hierarchy_data():
    """Return (kit_levels, dependencies, priority_order) for the kit hierarchy."""
    levels, deps, order = extract.get_production_order_data()
    return levels, deps, order
| |
|
# Loaded once at import time; the lazy accessors below re-read on each call.
KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data()
print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items")
| |
|
def get_kit_levels():
    """Get kit levels lazily - returns {kit_id: level} where 0=prepack, 1=subkit, 2=master"""
    levels, _deps, _order = get_kit_hierarchy_data()
    return levels
| |
|
def get_kit_dependencies():
    """Get kit dependencies lazily - returns {kit_id: [dependency_list]}"""
    _levels, deps, _order = get_kit_hierarchy_data()
    return deps
| |
|
def get_max_parallel_workers():
    """Get max parallel workers from session state or default"""
    try:
        import streamlit as st
        if hasattr(st, 'session_state'):
            # Per line type: session-state key plus the configured fallback.
            per_line = {
                LineType.LONG_LINE: ('max_parallel_workers_long_line',
                                     DefaultConfig.MAX_PARALLEL_WORKERS_LONG_LINE),
                LineType.MINI_LOAD: ('max_parallel_workers_mini_load',
                                     DefaultConfig.MAX_PARALLEL_WORKERS_MINI_LOAD),
            }
            return {
                line: st.session_state.get(key, default)
                for line, (key, default) in per_line.items()
            }
    except Exception as exc:
        print(f"Could not get max parallel workers from session: {exc}")

    # No session available - use the configured defaults.
    return {
        LineType.LONG_LINE: DefaultConfig.MAX_PARALLEL_WORKERS_LONG_LINE,
        LineType.MINI_LOAD: DefaultConfig.MAX_PARALLEL_WORKERS_MINI_LOAD
    }
| |
|
| |
|
| |
|
def get_fixed_min_unicef_per_day():
    """
    Get fixed minimum UNICEF employees per day - try from streamlit session state first, then default
    This ensures a minimum number of UNICEF fixed-term staff are present every working day
    """
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'fixed_min_unicef_per_day' in st.session_state:
            print(f"Using fixed minimum UNICEF per day from config page: {st.session_state.fixed_min_unicef_per_day}")
            return st.session_state.fixed_min_unicef_per_day
    except Exception:
        # CONSISTENCY FIX: was `except ImportError`, unlike every sibling
        # getter; a session-state access error would have crashed this helper
        # instead of degrading to the default.
        pass

    return DefaultConfig.FIXED_MIN_UNICEF_PER_DAY
| |
|
| |
|
def get_payment_mode_config():
    """
    Get payment mode configuration - try from streamlit session state first, then default values
    Payment modes:
    - "bulk": If employee works any hours in shift, pay for full shift hours
    - "partial": Pay only for actual hours worked
    """
    try:
        import streamlit as st
        if hasattr(st, 'session_state') and 'payment_mode_config' in st.session_state:
            print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}")
            return st.session_state.payment_mode_config
    except Exception as exc:
        print(f"Could not get payment mode config from streamlit session: {exc}")

    # No session override - fall back to the configured default modes.
    print(f"Loading default payment mode configuration")
    return DefaultConfig.PAYMENT_MODE_CONFIG
| |
|
| |
|
| | print("✅ Module-level configuration functions defined (variables initialized dynamically)") |
| |
|