Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions flixopt/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,6 @@ def with_startup_limit(self) -> list[str]:
"""IDs with startup limit."""
return self._categorize(lambda p: p.startup_limit is not None)

@cached_property
def with_effects_per_active_hour(self) -> list[str]:
"""IDs with effects_per_active_hour defined."""
return self._categorize(lambda p: p.effects_per_active_hour)

@cached_property
def with_effects_per_startup(self) -> list[str]:
"""IDs with effects_per_startup defined."""
return self._categorize(lambda p: p.effects_per_startup)

# === Bounds (combined min/max in single pass) ===

def _build_bounds(self, ids: list[str], min_attr: str, max_attr: str) -> tuple[xr.DataArray, xr.DataArray] | None:
Expand Down
83 changes: 23 additions & 60 deletions flixopt/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
if TYPE_CHECKING:
import linopy

from .batched import InvestmentData, StoragesData
from .batched import StoragesData
from .types import Numeric_PS, Numeric_TPS

logger = logging.getLogger('flixopt')
Expand Down Expand Up @@ -826,47 +826,10 @@ def storage(self, label: str) -> Storage:
"""Get a storage by its label_full."""
return self.elements[label]

# === Storage Categorization Properties (delegate to self.data) ===

@property
def with_investment(self) -> list[str]:
return self.data.with_investment

@property
def with_optional_investment(self) -> list[str]:
return self.data.with_optional_investment

@property
def with_mandatory_investment(self) -> list[str]:
return self.data.with_mandatory_investment

@property
def storages_with_investment(self) -> list[Storage]:
return [self.storage(sid) for sid in self.with_investment]

@property
def storages_with_optional_investment(self) -> list[Storage]:
return [self.storage(sid) for sid in self.with_optional_investment]

@property
def investment_ids(self) -> list[str]:
return self.with_investment

@property
def optional_investment_ids(self) -> list[str]:
return self.with_optional_investment

@property
def mandatory_investment_ids(self) -> list[str]:
return self.with_mandatory_investment

@property
def invest_params(self) -> dict[str, InvestParameters]:
return self.data.invest_params

@property
def _investment_data(self) -> InvestmentData | None:
return self.data.investment_data
"""IDs of storages with investment parameters. Used by external code (optimization.py)."""
return self.data.with_investment

def add_effect_contributions(self, effects_model) -> None:
"""Push ALL effect contributions from storages to EffectsModel.
Expand All @@ -879,7 +842,7 @@ def add_effect_contributions(self, effects_model) -> None:
Args:
effects_model: The EffectsModel to register contributions with.
"""
inv = self._investment_data
inv = self.data.investment_data
if inv is None:
return

Expand Down Expand Up @@ -917,21 +880,21 @@ def add_effect_contributions(self, effects_model) -> None:
@functools.cached_property
def _size_lower(self) -> xr.DataArray:
"""(storage,) - minimum size for investment storages."""
element_ids = self.with_investment
element_ids = self.data.with_investment
values = [self.storage(sid).capacity_in_flow_hours.minimum_or_fixed_size for sid in element_ids]
return stack_along_dim(values, self.dim_name, element_ids)

@functools.cached_property
def _size_upper(self) -> xr.DataArray:
"""(storage,) - maximum size for investment storages."""
element_ids = self.with_investment
element_ids = self.data.with_investment
values = [self.storage(sid).capacity_in_flow_hours.maximum_or_fixed_size for sid in element_ids]
return stack_along_dim(values, self.dim_name, element_ids)

@functools.cached_property
def _linked_periods_mask(self) -> xr.DataArray | None:
"""(storage, period) - linked periods for investment storages. None if no linking."""
element_ids = self.with_investment
element_ids = self.data.with_investment
linked_list = [self.storage(sid).capacity_in_flow_hours.linked_periods for sid in element_ids]
if not any(lp is not None for lp in linked_list):
return None
Expand All @@ -942,27 +905,27 @@ def _linked_periods_mask(self) -> xr.DataArray | None:
@functools.cached_property
def _mandatory_mask(self) -> xr.DataArray:
"""(storage,) bool - True if mandatory, False if optional."""
element_ids = self.with_investment
element_ids = self.data.with_investment
values = [self.storage(sid).capacity_in_flow_hours.mandatory for sid in element_ids]
return xr.DataArray(values, dims=[self.dim_name], coords={self.dim_name: element_ids})

@functools.cached_property
def _optional_lower(self) -> xr.DataArray | None:
"""(storage,) - minimum size for optional investment storages."""
if not self.with_optional_investment:
if not self.data.with_optional_investment:
return None

element_ids = self.with_optional_investment
element_ids = self.data.with_optional_investment
values = [self.storage(sid).capacity_in_flow_hours.minimum_or_fixed_size for sid in element_ids]
return stack_along_dim(values, self.dim_name, element_ids)

@functools.cached_property
def _optional_upper(self) -> xr.DataArray | None:
"""(storage,) - maximum size for optional investment storages."""
if not self.with_optional_investment:
if not self.data.with_optional_investment:
return None

element_ids = self.with_optional_investment
element_ids = self.data.with_optional_investment
values = [self.storage(sid).capacity_in_flow_hours.maximum_or_fixed_size for sid in element_ids]
return stack_along_dim(values, self.dim_name, element_ids)

Expand Down Expand Up @@ -1015,7 +978,7 @@ def create_variables(self) -> None:

logger.debug(
f'StoragesModel created variables: {len(self.elements)} storages, '
f'{len(self.storages_with_investment)} with investment'
f'{len(self.data.with_investment)} with investment'
)

def create_constraints(self) -> None:
Expand Down Expand Up @@ -1211,7 +1174,7 @@ def _add_batched_cluster_cyclic_constraints(self, charge_state) -> None:
@functools.cached_property
def size(self) -> linopy.Variable | None:
"""(storage, period, scenario) - size variable for storages with investment."""
if not self.storages_with_investment:
if not self.data.with_investment:
return None

size_min = self._size_lower
Expand All @@ -1238,12 +1201,12 @@ def size(self) -> linopy.Variable | None:
@functools.cached_property
def invested(self) -> linopy.Variable | None:
"""(storage, period, scenario) - binary invested variable for optional investment."""
if not self.optional_investment_ids:
if not self.data.with_optional_investment:
return None
return self.add_variables(
StorageVarName.INVESTED,
dims=('period', 'scenario'),
element_ids=self.optional_investment_ids,
element_ids=self.data.with_optional_investment,
binary=True,
)

Expand All @@ -1252,15 +1215,15 @@ def create_investment_model(self) -> None:

Must be called BEFORE create_investment_constraints().
"""
if not self.storages_with_investment:
if not self.data.with_investment:
return

from .features import InvestmentBuilder

dim = self.dim_name
element_ids = self.investment_ids
non_mandatory_ids = self.optional_investment_ids
mandatory_ids = self.mandatory_investment_ids
non_mandatory_ids = self.data.with_optional_investment
mandatory_ids = self.data.with_mandatory_investment

# Trigger variable creation via cached properties
size_var = self.size
Expand All @@ -1283,7 +1246,7 @@ def create_investment_model(self) -> None:
InvestmentBuilder.add_linked_periods_constraints(
model=self.model,
size_var=size_var,
params=self.invest_params,
params=self.data.invest_params,
element_ids=element_ids,
dim_name=dim,
)
Expand All @@ -1307,7 +1270,7 @@ def create_investment_constraints(self) -> None:

Uses the batched size variable for true vectorized constraint creation.
"""
if not self.storages_with_investment or StorageVarName.SIZE not in self:
if not self.data.with_investment or StorageVarName.SIZE not in self:
return

charge_state = self.charge
Expand Down Expand Up @@ -1344,7 +1307,7 @@ def create_investment_constraints(self) -> None:
)

logger.debug(
f'StoragesModel created batched investment constraints for {len(self.storages_with_investment)} storages'
f'StoragesModel created batched investment constraints for {len(self.data.with_investment)} storages'
)

def _add_initial_final_constraints_legacy(self, storage, cs) -> None:
Expand Down Expand Up @@ -1404,7 +1367,7 @@ def _create_piecewise_effects(self) -> None:
if size_var is None:
return

inv = self._investment_data
inv = self.data.investment_data
if inv is None or not inv.piecewise_element_ids:
return

Expand Down
14 changes: 3 additions & 11 deletions flixopt/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,14 +1016,6 @@ def __init__(self, model: FlowSystemModel, data: FlowsData):
self.create_status_model()
self.create_constraints()

@property
def _previous_status(self) -> dict[str, xr.DataArray]:
"""Previous status for flows that have it, keyed by label_full.

Delegates to FlowsData.previous_states.
"""
return self.data.previous_states

def _build_constraint_mask(self, selected_ids: set[str], reference_var: linopy.Variable) -> xr.DataArray:
"""Build a mask for constraint creation from selected flow IDs.

Expand Down Expand Up @@ -1473,11 +1465,11 @@ def constraint_switch_initial(self) -> None:
if self.startup is None:
return
dim = self.dim_name
ids = [eid for eid in self.data.with_startup_tracking if eid in self._previous_status]
ids = [eid for eid in self.data.with_startup_tracking if eid in self.data.previous_states]
if not ids:
return

prev_arrays = [self._previous_status[eid].expand_dims({dim: [eid]}) for eid in ids]
prev_arrays = [self.data.previous_states[eid].expand_dims({dim: [eid]}) for eid in ids]
prev_state = xr.concat(prev_arrays, dim=dim).isel(time=-1)

StatusBuilder.add_switch_initial_constraint(
Expand Down Expand Up @@ -1595,7 +1587,7 @@ def get_previous_status(self, flow: Flow) -> xr.DataArray | None:
DataArray of previous status (time dimension), or None if no previous status.
"""
fid = flow.label_full
return self._previous_status.get(fid)
return self.data.previous_states.get(fid)


class BusesModel(TypeModel):
Expand Down