diff --git a/flixopt/batched.py b/flixopt/batched.py index 03b509d89..7f7d5882c 100644 --- a/flixopt/batched.py +++ b/flixopt/batched.py @@ -154,16 +154,6 @@ def with_startup_limit(self) -> list[str]: """IDs with startup limit.""" return self._categorize(lambda p: p.startup_limit is not None) - @cached_property - def with_effects_per_active_hour(self) -> list[str]: - """IDs with effects_per_active_hour defined.""" - return self._categorize(lambda p: p.effects_per_active_hour) - - @cached_property - def with_effects_per_startup(self) -> list[str]: - """IDs with effects_per_startup defined.""" - return self._categorize(lambda p: p.effects_per_startup) - # === Bounds (combined min/max in single pass) === def _build_bounds(self, ids: list[str], min_attr: str, max_attr: str) -> tuple[xr.DataArray, xr.DataArray] | None: diff --git a/flixopt/components.py b/flixopt/components.py index 3851197d7..08dba94fe 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -30,7 +30,7 @@ if TYPE_CHECKING: import linopy - from .batched import InvestmentData, StoragesData + from .batched import StoragesData from .types import Numeric_PS, Numeric_TPS logger = logging.getLogger('flixopt') @@ -826,47 +826,10 @@ def storage(self, label: str) -> Storage: """Get a storage by its label_full.""" return self.elements[label] - # === Storage Categorization Properties (delegate to self.data) === - - @property - def with_investment(self) -> list[str]: - return self.data.with_investment - - @property - def with_optional_investment(self) -> list[str]: - return self.data.with_optional_investment - - @property - def with_mandatory_investment(self) -> list[str]: - return self.data.with_mandatory_investment - - @property - def storages_with_investment(self) -> list[Storage]: - return [self.storage(sid) for sid in self.with_investment] - - @property - def storages_with_optional_investment(self) -> list[Storage]: - return [self.storage(sid) for sid in self.with_optional_investment] - @property def investment_ids(self) -> list[str]: - return self.with_investment - - @property - def optional_investment_ids(self) -> list[str]: - return self.with_optional_investment - - @property - def mandatory_investment_ids(self) -> list[str]: - return self.with_mandatory_investment - - @property - def invest_params(self) -> dict[str, InvestParameters]: - return self.data.invest_params - - @property - def _investment_data(self) -> InvestmentData | None: - return self.data.investment_data + """IDs of storages with investment parameters. Used by external code (optimization.py).""" + return self.data.with_investment def add_effect_contributions(self, effects_model) -> None: """Push ALL effect contributions from storages to EffectsModel. @@ -879,7 +842,7 @@ def add_effect_contributions(self, effects_model) -> None: Args: effects_model: The EffectsModel to register contributions with. """ - inv = self._investment_data + inv = self.data.investment_data if inv is None: return @@ -917,21 +880,21 @@ def add_effect_contributions(self, effects_model) -> None: @functools.cached_property def _size_lower(self) -> xr.DataArray: """(storage,) - minimum size for investment storages.""" - element_ids = self.with_investment + element_ids = self.data.with_investment values = [self.storage(sid).capacity_in_flow_hours.minimum_or_fixed_size for sid in element_ids] return stack_along_dim(values, self.dim_name, element_ids) @functools.cached_property def _size_upper(self) -> xr.DataArray: """(storage,) - maximum size for investment storages.""" - element_ids = self.with_investment + element_ids = self.data.with_investment values = [self.storage(sid).capacity_in_flow_hours.maximum_or_fixed_size for sid in element_ids] return stack_along_dim(values, self.dim_name, element_ids) @functools.cached_property def _linked_periods_mask(self) -> xr.DataArray | None: """(storage, period) - linked periods for investment storages. None if no linking.""" - element_ids = self.with_investment + element_ids = self.data.with_investment linked_list = [self.storage(sid).capacity_in_flow_hours.linked_periods for sid in element_ids] if not any(lp is not None for lp in linked_list): return None @@ -942,27 +905,27 @@ def _linked_periods_mask(self) -> xr.DataArray | None: @functools.cached_property def _mandatory_mask(self) -> xr.DataArray: """(storage,) bool - True if mandatory, False if optional.""" - element_ids = self.with_investment + element_ids = self.data.with_investment values = [self.storage(sid).capacity_in_flow_hours.mandatory for sid in element_ids] return xr.DataArray(values, dims=[self.dim_name], coords={self.dim_name: element_ids}) @functools.cached_property def _optional_lower(self) -> xr.DataArray | None: """(storage,) - minimum size for optional investment storages.""" - if not self.with_optional_investment: + if not self.data.with_optional_investment: return None - element_ids = self.with_optional_investment + element_ids = self.data.with_optional_investment values = [self.storage(sid).capacity_in_flow_hours.minimum_or_fixed_size for sid in element_ids] return stack_along_dim(values, self.dim_name, element_ids) @functools.cached_property def _optional_upper(self) -> xr.DataArray | None: """(storage,) - maximum size for optional investment storages.""" - if not self.with_optional_investment: + if not self.data.with_optional_investment: return None - element_ids = self.with_optional_investment + element_ids = self.data.with_optional_investment values = [self.storage(sid).capacity_in_flow_hours.maximum_or_fixed_size for sid in element_ids] return stack_along_dim(values, self.dim_name, element_ids) @@ -1015,7 +978,7 @@ def create_variables(self) -> None: logger.debug( f'StoragesModel created variables: {len(self.elements)} storages, ' - f'{len(self.storages_with_investment)} with investment' + f'{len(self.data.with_investment)} with investment' ) def create_constraints(self) -> None: @@ -1211,7 +1174,7 @@ def _add_batched_cluster_cyclic_constraints(self, charge_state) -> None: @functools.cached_property def size(self) -> linopy.Variable | None: """(storage, period, scenario) - size variable for storages with investment.""" - if not self.storages_with_investment: + if not self.data.with_investment: return None size_min = self._size_lower @@ -1238,12 +1201,12 @@ def size(self) -> linopy.Variable | None: @functools.cached_property def invested(self) -> linopy.Variable | None: """(storage, period, scenario) - binary invested variable for optional investment.""" - if not self.optional_investment_ids: + if not self.data.with_optional_investment: return None return self.add_variables( StorageVarName.INVESTED, dims=('period', 'scenario'), - element_ids=self.optional_investment_ids, + element_ids=self.data.with_optional_investment, binary=True, ) @@ -1252,15 +1215,15 @@ def create_investment_model(self) -> None: Must be called BEFORE create_investment_constraints(). """ - if not self.storages_with_investment: + if not self.data.with_investment: return from .features import InvestmentBuilder dim = self.dim_name element_ids = self.investment_ids - non_mandatory_ids = self.optional_investment_ids - mandatory_ids = self.mandatory_investment_ids + non_mandatory_ids = self.data.with_optional_investment + mandatory_ids = self.data.with_mandatory_investment # Trigger variable creation via cached properties size_var = self.size @@ -1283,7 +1246,7 @@ def create_investment_model(self) -> None: InvestmentBuilder.add_linked_periods_constraints( model=self.model, size_var=size_var, - params=self.invest_params, + params=self.data.invest_params, element_ids=element_ids, dim_name=dim, ) @@ -1307,7 +1270,7 @@ def create_investment_constraints(self) -> None: Uses the batched size variable for true vectorized constraint creation. """ - if not self.storages_with_investment or StorageVarName.SIZE not in self: + if not self.data.with_investment or StorageVarName.SIZE not in self: return charge_state = self.charge @@ -1344,7 +1307,7 @@ def create_investment_constraints(self) -> None: ) logger.debug( - f'StoragesModel created batched investment constraints for {len(self.storages_with_investment)} storages' + f'StoragesModel created batched investment constraints for {len(self.data.with_investment)} storages' ) def _add_initial_final_constraints_legacy(self, storage, cs) -> None: @@ -1404,7 +1367,7 @@ def _create_piecewise_effects(self) -> None: if size_var is None: return - inv = self._investment_data + inv = self.data.investment_data if inv is None or not inv.piecewise_element_ids: return diff --git a/flixopt/elements.py b/flixopt/elements.py index 5f529dcbd..df1dc4a99 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -1016,14 +1016,6 @@ def __init__(self, model: FlowSystemModel, data: FlowsData): self.create_status_model() self.create_constraints() - @property - def _previous_status(self) -> dict[str, xr.DataArray]: - """Previous status for flows that have it, keyed by label_full. - - Delegates to FlowsData.previous_states. - """ - return self.data.previous_states - def _build_constraint_mask(self, selected_ids: set[str], reference_var: linopy.Variable) -> xr.DataArray: """Build a mask for constraint creation from selected flow IDs. @@ -1473,11 +1465,11 @@ def constraint_switch_initial(self) -> None: if self.startup is None: return dim = self.dim_name - ids = [eid for eid in self.data.with_startup_tracking if eid in self._previous_status] + ids = [eid for eid in self.data.with_startup_tracking if eid in self.data.previous_states] if not ids: return - prev_arrays = [self._previous_status[eid].expand_dims({dim: [eid]}) for eid in ids] + prev_arrays = [self.data.previous_states[eid].expand_dims({dim: [eid]}) for eid in ids] prev_state = xr.concat(prev_arrays, dim=dim).isel(time=-1) StatusBuilder.add_switch_initial_constraint( @@ -1595,7 +1587,7 @@ def get_previous_status(self, flow: Flow) -> xr.DataArray | None: DataArray of previous status (time dimension), or None if no previous status. """ fid = flow.label_full - return self._previous_status.get(fid) + return self.data.previous_states.get(fid) class BusesModel(TypeModel):