diff --git a/docs/geos_mesh_docs/processing.rst b/docs/geos_mesh_docs/processing.rst index fec7f4b70..844c071af 100644 --- a/docs/geos_mesh_docs/processing.rst +++ b/docs/geos_mesh_docs/processing.rst @@ -3,6 +3,13 @@ Processing filters The `processing` module of `geos-mesh` package contains filters to process meshes. +geos.mesh.processing.CreateConstantAttributePerRegion filter +------------------------------------------------------------- + +.. automodule:: geos.mesh.processing.CreateConstantAttributePerRegion + :members: + :undoc-members: + :show-inheritance: geos.mesh.processing.FillPartialArrays filter ---------------------------------------------- diff --git a/docs/geos_posp_docs/PVplugins.rst b/docs/geos_posp_docs/PVplugins.rst index 91c859b68..293e0a65d 100644 --- a/docs/geos_posp_docs/PVplugins.rst +++ b/docs/geos_posp_docs/PVplugins.rst @@ -16,12 +16,6 @@ PVAttributeMapping plugin .. automodule:: PVplugins.PVAttributeMapping -PVCreateConstantAttributePerRegion plugin ---------------------------------------------------- - -.. automodule:: PVplugins.PVCreateConstantAttributePerRegion - - PVExtractMergeBlocksVolume plugin ------------------------------------------- diff --git a/docs/geos_pv_docs/processing.rst b/docs/geos_pv_docs/processing.rst index 439200789..f5fcaf243 100644 --- a/docs/geos_pv_docs/processing.rst +++ b/docs/geos_pv_docs/processing.rst @@ -1,8 +1,16 @@ Post-/Pre-processing ========================= + +PVCreateConstantAttributePerRegion +----------------------------------- + +.. automodule:: geos.pv.plugins.PVCreateConstantAttributePerRegion + + PVFillPartialArrays -------------------- + .. automodule:: geos.pv.plugins.PVFillPartialArrays diff --git a/geos-mesh/src/geos/mesh/processing/CreateConstantAttributePerRegion.py b/geos-mesh/src/geos/mesh/processing/CreateConstantAttributePerRegion.py new file mode 100644 index 000000000..70a90d26d --- /dev/null +++ b/geos-mesh/src/geos/mesh/processing/CreateConstantAttributePerRegion.py @@ -0,0 +1,419 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Romain Baville +import numpy as np +import numpy.typing as npt + +from typing import Union, Any +from typing_extensions import Self + +import vtkmodules.util.numpy_support as vnp +from vtkmodules.vtkCommonDataModel import ( + vtkMultiBlockDataSet, + vtkDataSet, +) + +from geos.utils.Logger import ( getLogger, Logger, logging, CountWarningHandler ) +from geos.mesh.utils.arrayHelpers import ( getArrayInObject, getComponentNames, getNumberOfComponents, + getVtkDataTypeInObject, isAttributeGlobal, getAttributePieceInfo, + checkValidValuesInDataSet, checkValidValuesInMultiBlock ) +from geos.mesh.utils.arrayModifiers import ( createAttribute, createConstantAttributeDataSet, + createConstantAttributeMultiBlock ) +from geos.mesh.utils.multiblockHelpers import getBlockElementIndexesFlatten + +__doc__ = """ +CreateConstantAttributePerRegion is a vtk filter that allows to create an attribute +with constant values per components for each chosen indexes of a reference/region attribute. +If other region indexes exist values are set to nan for float type, -1 for int type or 0 for uint type. + +Input mesh is either vtkMultiBlockDataSet or vtkDataSet and the region attribute must have one component. +The relation index/values is given by a dictionary. Its keys are the indexes and its items are the list of values for each component. +To use a handler of yours, set the variable 'speHandler' to True and add it using the member function addLoggerHandler. + +By default, the value type is set to float32, their is one component and no name and the logger use an intern handler. + +To use it: + +.. code-block:: python + + from geos.mesh.processing.CreateConstantAttributePerRegion import CreateConstantAttributePerRegion + + # Filter inputs. + mesh: Union[vtkMultiBlockDataSet, vtkDataSet] + regionName: str + dictRegionValues: dict[ Any, Any ] + newAttributeName: str + + # Optional inputs. + valueNpType: type + nbComponents: int + componentNames: tuple[ str, ... ] + speHandler: bool + + # Instantiate the filter + filter: CreateConstantAttributePerRegion = CreateConstantAttributePerRegion( mesh, + regionName, + dictRegionValues, + newAttributeName, + valueNpType, + nbComponents, + componentNames, + speHandler, + ) + + # Set your handler (only if speHandler is True). + yourHandler: logging.Handler + filter.addLoggerHandler( yourHandler ) + + # Do calculations. + filter.applyFilter() +""" + +loggerTitle: str = "Create Constant Attribute Per Region" + + +class CreateConstantAttributePerRegion: + + def __init__( + self: Self, + mesh: Union[ vtkDataSet, vtkMultiBlockDataSet ], + regionName: str, + dictRegionValues: dict[ Any, Any ], + newAttributeName: str, + valueNpType: type = np.float32, + nbComponents: int = 1, + componentNames: tuple[ str, ...] = (), # noqa: C408 + speHandler: bool = False, + ) -> None: + """Create an attribute with constant value per region. + + Args: + mesh (Union[ vtkDataSet, vtkMultiBlockDataSet ]): The mesh where to create the constant attribute per region. + regionName (str): The name of the attribute with the region indexes. + dictRegionValues (dict[ Any, Any ]): The dictionary with the region indexes as keys and the list of values as items. + newAttributeName (str): The name of the new attribute to create. + valueNpType (type, optional): The numpy scalar type for values. + Defaults to numpy.float32. + nbComponents (int, optional): Number of components for the new attribute. + Defaults to 1. + componentNames (tuple[str,...], optional): Name of the components for vectorial attributes. If one component, gives an empty tuple. + Defaults to an empty tuple. + speHandler (bool, optional): True to use a specific handler, False to use the internal handler. + Defaults to False. + """ + self.mesh: Union[ vtkDataSet, vtkMultiBlockDataSet ] = mesh + + # New attribute settings. + self.newAttributeName: str = newAttributeName + self.valueNpType: type = valueNpType + self.nbComponents: int = nbComponents + self.componentNames: tuple[ str, ...] = componentNames + + # Region attribute settings. + self.regionName: str = regionName + self.dictRegionValues: dict[ Any, Any ] = dictRegionValues + self.onPoints: Union[ None, bool ] + self.onBoth: bool + self.onPoints, self.onBoth = getAttributePieceInfo( self.mesh, self.regionName ) + + self.useDefaultValue: bool = False # Check if the new component have default values (information for the output message). + + # Warnings counter. + self.counter: CountWarningHandler = CountWarningHandler() + self.counter.setLevel( logging.INFO ) + + # Logger. + self.logger: Logger + if not speHandler: + self.logger = getLogger( loggerTitle, True ) + else: + self.logger = logging.getLogger( loggerTitle ) + self.logger.setLevel( logging.INFO ) + + def setLoggerHandler( self: Self, handler: logging.Handler ) -> None: + """Set a specific handler for the filter logger. + + In this filter 4 log levels are use, .info, .error, .warning and .critical, + be sure to have at least the same 4 levels. + + Args: + handler (logging.Handler): The handler to add. + """ + if not self.logger.hasHandlers(): + self.logger.addHandler( handler ) + else: + # This warning does not count for the number of warning created during the application of the filter. + self.logger.warning( + "The logger already has an handler, to use yours set the argument 'speHandler' to True during the filter initialization." + ) + + def applyFilter( self: Self ) -> bool: + """Create a constant attribute per region in the mesh. + + Returns: + boolean (bool): True if calculation successfully ended, False otherwise. + """ + self.logger.info( f"Apply filter { self.logger.name }." ) + + # Add the handler to count warnings messages. + self.logger.addHandler( self.counter ) + + # Check the validity of the attribute region. + if self.onPoints is None: + self.logger.error( f"{ self.regionName } is not in the mesh." ) + self.logger.error( f"The new attribute { self.newAttributeName } has not been added." ) + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + if self.onBoth: + self.logger.error( + f"There are two attributes named { self.regionName }, one on points and the other on cells. The region attribute must be unique." + ) + self.logger.error( f"The new attribute { self.newAttributeName } has not been added." ) + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + nbComponentsRegion: int = getNumberOfComponents( self.mesh, self.regionName, self.onPoints ) + if nbComponentsRegion != 1: + self.logger.error( f"The region attribute { self.regionName } has to many components, one is requires." ) + self.logger.error( f"The new attribute { self.newAttributeName } has not been added." ) + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + self._setInfoRegion() + # Check if the number of components and number of values for the region indexes are coherent. + for index in self.dictRegionValues: + if len( self.dictRegionValues[ index ] ) != self.nbComponents: + self.logger.error( + f"The number of value given for the region index { index } is not correct. You must set a value for each component, in this case { self.nbComponents }." + ) + return False + + listIndexes: list[ Any ] = list( self.dictRegionValues.keys() ) + validIndexes: list[ Any ] = [] + invalidIndexes: list[ Any ] = [] + regionArray: npt.NDArray[ Any ] + newArray: npt.NDArray[ Any ] + if isinstance( self.mesh, vtkMultiBlockDataSet ): + # Check if the attribute region is global. + if not isAttributeGlobal( self.mesh, self.regionName, self.onPoints ): + self.logger.error( f"The region attribute { self.regionName } has to be global." ) + self.logger.error( f"The new attribute { self.newAttributeName } has not been added." ) + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + validIndexes, invalidIndexes = checkValidValuesInMultiBlock( self.mesh, self.regionName, listIndexes, + self.onPoints ) + if len( validIndexes ) == 0: + if len( self.dictRegionValues ) == 0: + self.logger.warning( "No region indexes entered." ) + else: + self.logger.warning( + f"The region indexes entered are not in the region attribute { self.regionName }." ) + + if not createConstantAttributeMultiBlock( self.mesh, + self.defaultValue, + self.newAttributeName, + componentNames=self.componentNames, + onPoints=self.onPoints, + logger=self.logger ): + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + else: + if len( invalidIndexes ) > 0: + self.logger.warning( + f"The region indexes { invalidIndexes } are not in the region attribute { self.regionName }." ) + + # Parse the mesh to add the attribute on each dataset. + listFlatIdDataSet: list[ int ] = getBlockElementIndexesFlatten( self.mesh ) + for flatIdDataSet in listFlatIdDataSet: + dataSet: vtkDataSet = vtkDataSet.SafeDownCast( self.mesh.GetDataSet( flatIdDataSet ) ) + + regionArray = getArrayInObject( dataSet, self.regionName, self.onPoints ) + newArray = self._createArrayFromRegionArrayWithValueMap( regionArray ) + if not createAttribute( dataSet, + newArray, + self.newAttributeName, + componentNames=self.componentNames, + onPoints=self.onPoints, + logger=self.logger ): + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + else: + validIndexes, invalidIndexes = checkValidValuesInDataSet( self.mesh, self.regionName, listIndexes, + self.onPoints ) + if len( validIndexes ) == 0: + if len( self.dictRegionValues ) == 0: + self.logger.warning( "No region indexes entered." ) + else: + self.logger.warning( + f"The region indexes entered are not in the region attribute { self.regionName }." ) + + if not createConstantAttributeDataSet( self.mesh, + self.defaultValue, + self.newAttributeName, + componentNames=self.componentNames, + onPoints=self.onPoints, + logger=self.logger ): + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + else: + if len( invalidIndexes ) > 0: + self.logger.warning( + f"The region indexes { invalidIndexes } are not in the region attribute { self.regionName }." ) + + regionArray = getArrayInObject( self.mesh, self.regionName, self.onPoints ) + newArray = self._createArrayFromRegionArrayWithValueMap( regionArray ) + if not createAttribute( self.mesh, + newArray, + self.newAttributeName, + componentNames=self.componentNames, + onPoints=self.onPoints, + logger=self.logger ): + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + # Log the output message. + self._logOutputMessage( validIndexes ) + + return True + + def _setInfoRegion( self: Self ) -> None: + """Update self.dictRegionValues and set self.defaultValue. + + Values and default value type are set with the numpy type given by self.valueNpType. + Default value is set to nan for float data, -1 for int data and 0 for uint data. + """ + # Get the numpy type from the vtk typecode. + dictType: dict[ int, Any ] = vnp.get_vtk_to_numpy_typemap() + regionVtkType: int = getVtkDataTypeInObject( self.mesh, self.regionName, self.onPoints ) + regionNpType: type = dictType[ regionVtkType ] + + # Set the correct type of values and region index. + dictRegionValuesUpdateType: dict[ Any, Any ] = {} + for idRegion in self.dictRegionValues: + dictRegionValuesUpdateType[ regionNpType( idRegion ) ] = [ + self.valueNpType( value ) for value in self.dictRegionValues[ idRegion ] + ] + self.dictRegionValues = dictRegionValuesUpdateType + + # Set the list of default value for each component depending of the type. + self.defaultValue: list[ Any ] + ## Default value for float types is nan. + if self.valueNpType in ( np.float32, np.float64 ): + self.defaultValue = [ self.valueNpType( np.nan ) for _ in range( self.nbComponents ) ] + ## Default value for int types is -1. + elif self.valueNpType in ( np.int8, np.int16, np.int32, np.int64 ): + self.defaultValue = [ self.valueNpType( -1 ) for _ in range( self.nbComponents ) ] + ## Default value for uint types is 0. + elif self.valueNpType in ( np.uint8, np.uint16, np.uint32, np.uint64 ): + self.defaultValue = [ self.valueNpType( 0 ) for _ in range( self.nbComponents ) ] + + def _createArrayFromRegionArrayWithValueMap( self: Self, regionArray: npt.NDArray[ Any ] ) -> npt.NDArray[ Any ]: + """Create the array from the regionArray and the valueMap (self.valueMap) giving the relation between the values of the regionArray and the new one. + + For each element (idElement) of the regionArray: + - If the value (regionArray[idElement]) is mapped (a keys of the valueMap), valueArray[idElement] = self.valueMap[value]. + - If not, valueArray[idElement] = self.defaultValue. + + Args: + regionArray (npt.NDArray[Any]): The array with the region indexes. + + Returns: + npt.NDArray[Any]: The new array with values mapped from the regionArray. + """ + nbElements: int = len( regionArray ) + newArray: npt.NDArray[ Any ] + if self.nbComponents == 1: + newArray = np.ones( nbElements, self.valueNpType ) + else: + newArray = np.ones( ( nbElements, self.nbComponents ), self.valueNpType ) + + for idElement in range( nbElements ): + value: Any = regionArray[ idElement ] + if value in self.dictRegionValues: + if self.nbComponents == 1: + newArray[ idElement ] = self.dictRegionValues[ value ][ 0 ] + else: + newArray[ idElement ] = self.dictRegionValues[ value ] + else: + if self.nbComponents == 1: + newArray[ idElement ] = self.defaultValue[ 0 ] + self.useDefaultValue = True + else: + newArray[ idElement ] = self.defaultValue + self.useDefaultValue = True + + return newArray + + def _logOutputMessage( self: Self, trueIndexes: list[ Any ] ) -> None: + """Create and log result messages of the filter. + + Args: + trueIndexes (list[Any]): The list of the true region indexes use to create the attribute. + """ + # The Filter succeed. + self.logger.info( f"The filter { self.logger.name } succeeded." ) + + # Info about the created attribute. + ## The piece where the attribute is created. + piece: str = "points" if self.onPoints else "cells" + self.logger.info( f"The new attribute { self.newAttributeName } is created on { piece }." ) + + ## The number of component and they names if multiple. + componentNamesCreated: tuple[ str, ...] = getComponentNames( self.mesh, self.newAttributeName, self.onPoints ) + if self.nbComponents > 1: + messComponent: str = f"The new attribute { self.newAttributeName } has { self.nbComponents } components named { componentNamesCreated }." + if componentNamesCreated != self.componentNames: + ### Warn the user because other component names than those given have been used. + self.logger.warning( messComponent ) + else: + self.logger.info( messComponent ) + + ## The values of the attribute. + messValue: str = f"The new attribute { self.newAttributeName } is constant" + if len( trueIndexes ) == 0: + ### Create the message to have the value of each component. + messValue = f"{ messValue } with" + if self.nbComponents > 1: + for idComponent in range( self.nbComponents ): + messValue = f"{ messValue } the value { self.defaultValue[ idComponent ] } for the component { componentNamesCreated[ idComponent ] }," + messValue = f"{ messValue[:-1] }." + else: + messValue = f"{ messValue } the value { self.defaultValue[ 0 ] }." + ### Warn the user because no region index has been used. + self.logger.warning( messValue ) + + else: + ### Create the message to have for each component the value of the region index. + messValue = f"{ messValue } per region indexes with:\n" + for index in trueIndexes: + messValue = f"{ messValue }\tThe value { self.dictRegionValues[ index ][ 0 ] } for the" + if self.nbComponents > 1: + messValue = f"{ messValue } component { componentNamesCreated[ 0 ] }," + for idComponent in range( 1, self.nbComponents - 1 ): + messValue = f"{ messValue } the value { self.dictRegionValues[ index ][ idComponent ] } for the component { componentNamesCreated[ idComponent ] }," + messValue = f"{ messValue[ : -1 ] } and the value { self.dictRegionValues[ index ][ -1 ] } for the component { componentNamesCreated[ -1 ] } for the index { index }.\n" + else: + messValue = f"{ messValue } index { index }.\n" + + if self.useDefaultValue: + messValue = f"{ messValue }\tThe value { self.defaultValue[ 0 ] } for the" + if self.nbComponents > 1: + messValue = f"{ messValue } component { componentNamesCreated[ 0 ] }," + for idComponent in range( 1, self.nbComponents - 1 ): + messValue = f"{ messValue } the value { self.defaultValue[ idComponent ] } for the component { componentNamesCreated[ idComponent ] }," + messValue = f"{ messValue[ : -1 ] } and the value { self.defaultValue[ -1 ] } for the component { componentNamesCreated[ -1 ] } for the other indexes." + else: + messValue = f"{ messValue } other indexes." + ### Warn the user because a default value has been used. + self.logger.warning( messValue ) + else: + if self.counter.warningCount > 0: + ### Warn the user because other component names than those given have been used. + self.logger.warning( messValue ) + else: + self.logger.info( messValue ) diff --git a/geos-mesh/src/geos/mesh/processing/FillPartialArrays.py b/geos-mesh/src/geos/mesh/processing/FillPartialArrays.py index b89df6114..1810bc735 100644 --- a/geos-mesh/src/geos/mesh/processing/FillPartialArrays.py +++ b/geos-mesh/src/geos/mesh/processing/FillPartialArrays.py @@ -7,7 +7,7 @@ from geos.utils.Logger import logging, Logger, getLogger from geos.mesh.utils.arrayModifiers import fillPartialAttributes -from geos.mesh.utils.arrayHelpers import isAttributeInObject +from geos.mesh.utils.arrayHelpers import getAttributePieceInfo from vtkmodules.vtkCommonDataModel import vtkMultiBlockDataSet @@ -23,7 +23,7 @@ 0 for uint data, -1 for int data and nan for float data. To use a handler of yours for the logger, set the variable 'speHandler' to True and add it to the filter -with the member function addLoggerHandler. +with the member function setLoggerHandler. To use it: @@ -42,7 +42,7 @@ # Set the handler of yours (only if speHandler is True). yourHandler: logging.Handler - filter.addLoggerHandler( yourHandler ) + filter.setLoggerHandler( yourHandler ) # Do calculations. filter.applyFilter() @@ -106,15 +106,17 @@ def applyFilter( self: Self ) -> bool: """ self.logger.info( f"Apply filter { self.logger.name }." ) + onPoints: Union[ None, bool ] + onBoth: bool for attributeName in self.dictAttributesValues: - self._setPieceRegionAttribute( attributeName ) - if self.onPoints is None: + onPoints, onBoth = getAttributePieceInfo( self.multiBlockDataSet, attributeName ) + if onPoints is None: self.logger.error( f"{ attributeName } is not in the mesh." ) self.logger.error( f"The attribute { attributeName } has not been filled." ) self.logger.error( f"The filter { self.logger.name } failed." ) return False - if self.onBoth: + if onBoth: self.logger.error( f"Their is two attribute named { attributeName }, one on points and the other on cells. The attribute must be unique." ) @@ -124,7 +126,7 @@ def applyFilter( self: Self ) -> bool: if not fillPartialAttributes( self.multiBlockDataSet, attributeName, - onPoints=self.onPoints, + onPoints=onPoints, listValues=self.dictAttributesValues[ attributeName ], logger=self.logger ): self.logger.error( f"The filter { self.logger.name } failed." ) @@ -133,22 +135,3 @@ def applyFilter( self: Self ) -> bool: self.logger.info( f"The filter { self.logger.name } succeed." ) return True - - def _setPieceRegionAttribute( self: Self, attributeName: str ) -> None: - """Set the attribute self.onPoints and self.onBoth. - - self.onPoints is True if the region attribute is on points, False if it is on cells, None otherwise. - - self.onBoth is True if a region attribute is on points and on cells, False otherwise. - - Args: - attributeName (str): The name of the attribute to verify. - """ - self.onPoints: Union[ bool, None ] = None - self.onBoth: bool = False - if isAttributeInObject( self.multiBlockDataSet, attributeName, False ): - self.onPoints = False - if isAttributeInObject( self.multiBlockDataSet, attributeName, True ): - if self.onPoints is False: - self.onBoth = True - self.onPoints = True diff --git a/geos-mesh/src/geos/mesh/utils/arrayHelpers.py b/geos-mesh/src/geos/mesh/utils/arrayHelpers.py index 43f363714..802a552bf 100644 --- a/geos-mesh/src/geos/mesh/utils/arrayHelpers.py +++ b/geos-mesh/src/geos/mesh/utils/arrayHelpers.py @@ -47,6 +47,102 @@ def has_array( mesh: vtkUnstructuredGrid, array_names: list[ str ] ) -> bool: return False +def getAttributePieceInfo( + mesh: Union[ vtkDataSet, vtkMultiBlockDataSet ], + attributeName: str, +) -> tuple[ Union[ None, bool ], bool ]: + """Get the attribute piece information. + + Two information are given: + - onPoints (Union[None, bool]): True if the attribute is on points or on both pieces, False if it is on cells, None otherwise. + - onBoth (bool): True if the attribute is on points and on cells, False otherwise. + + Args: + mesh (Union[vtkDataSet, vtkMultiBlockDataSet]): The mesh with the attribute. + attributeName (str): The name of the attribute. + + Returns: + tuple[Union[None, bool], bool]: The piece information of the attribute. + """ + onPoints: Union[ bool, None ] = None + onBoth: bool = False + if isAttributeInObject( mesh, attributeName, False ): + onPoints = False + if isAttributeInObject( mesh, attributeName, True ): + if onPoints is False: + onBoth = True + onPoints = True + + return ( onPoints, onBoth ) + + +def checkValidValuesInMultiBlock( + multiBlockDataSet: vtkMultiBlockDataSet, + attributeName: str, + listValues: list[ Any ], + onPoints: bool, +) -> tuple[ list[ Any ], list[ Any ] ]: + """Check if each value is valid , ie if that value is a data of the attribute in at least one dataset of the multiblock. + + Args: + multiBlockDataSet (vtkMultiBlockDataSet): The multiblock dataset mesh to check. + attributeName (str): The name of the attribute with the data. + listValues (list[Any]): The list of values to check. + onPoints (bool): True if the attribute is on points, False if on cells. + + Returns: + tuple[list[Any], list[Any]]: Tuple containing the list of valid values and the list of the invalid ones. + """ + validValues: list[ Any ] = [] + invalidValues: list[ Any ] = [] + listFlatIdDataSet: list[ int ] = getBlockElementIndexesFlatten( multiBlockDataSet ) + for flatIdDataSet in listFlatIdDataSet: + dataSet: vtkDataSet = vtkDataSet.SafeDownCast( multiBlockDataSet.GetDataSet( flatIdDataSet ) ) + # Get the valid values of the dataset. + validValuesDataSet: list[ Any ] = checkValidValuesInDataSet( dataSet, attributeName, listValues, onPoints )[ 0 ] + + # Keep the new true values. + for value in validValuesDataSet: + if value not in validValues: + validValues.append( value ) + + # Get the false indexes. + for value in listValues: + if value not in validValues: + invalidValues.append( value ) + + return ( validValues, invalidValues ) + + +def checkValidValuesInDataSet( + dataSet: vtkDataSet, + attributeName: str, + listValues: list[ Any ], + onPoints: bool, +) -> tuple[ list[ Any ], list[ Any ] ]: + """Check if each value is valid , ie if that value is a data of the attribute in the dataset. + + Args: + dataSet (vtkDataSet): The dataset mesh to check. + attributeName (str): The name of the attribute with the data. + listValues (list[Any]): The list of values to check. + onPoints (bool): True if the attribute is on points, False if on cells. + + Returns: + tuple[list[Any], list[Any]]: Tuple containing the list of valid values and the list of the invalid ones. + """ + attributeNpArray = getArrayInObject( dataSet, attributeName, onPoints ) + validValues: list[ Any ] = [] + invalidValues: list[ Any ] = [] + for value in listValues: + if value in attributeNpArray: + validValues.append( value ) + else: + invalidValues.append( value ) + + return ( validValues, invalidValues ) + + def getFieldType( data: vtkFieldData ) -> str: """Returns whether the data is "vtkFieldData", "vtkCellData" or "vtkPointData". @@ -357,6 +453,24 @@ def getArrayInObject( dataSet: vtkDataSet, attributeName: str, onPoints: bool ) return npArray +def getVtkDataTypeInObject( multiBlockDataSet: Union[ vtkDataSet, vtkMultiBlockDataSet ], attributeName: str, + onPoints: bool ) -> int: + """Return VTK type of requested array from input mesh. + + Args: + multiBlockDataSet (Union[vtkDataSet, vtkMultiBlockDataSet]): Input multiBlockDataSet. + attributeName (str): Name of the attribute. + onPoints (bool): True if attributes are on points, False if they are on cells. + + Returns: + int: The type of the vtk array corresponding to input attribute name. + """ + if isinstance( multiBlockDataSet, vtkDataSet ): + return getVtkArrayTypeInObject( multiBlockDataSet, attributeName, onPoints ) + else: + return getVtkArrayTypeInMultiBlock( multiBlockDataSet, attributeName, onPoints ) + + def getVtkArrayTypeInObject( dataSet: vtkDataSet, attributeName: str, onPoints: bool ) -> int: """Return VTK type of requested array from dataset input. diff --git a/geos-mesh/tests/test_CreateConstantAttributePerRegion.py b/geos-mesh/tests/test_CreateConstantAttributePerRegion.py new file mode 100644 index 000000000..ca1c08409 --- /dev/null +++ b/geos-mesh/tests/test_CreateConstantAttributePerRegion.py @@ -0,0 +1,122 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Romain Baville +# SPDX-License-Identifier: Apache 2.0 +# ruff: noqa: E402 # disable Module level import not at top of file +# mypy: disable-error-code="operator" +import pytest +from typing import Union, Any +from vtkmodules.vtkCommonDataModel import ( vtkDataSet, vtkMultiBlockDataSet ) + +from geos.mesh.processing.CreateConstantAttributePerRegion import CreateConstantAttributePerRegion, np + + +@pytest.mark.parametrize( + "meshType, newAttributeName, regionName, dictRegionValues, componentNames, componentNamesTest, valueNpType, succeed", + [ + # Test the name of the new attribute (new on the mesh, one present on the other piece). + ## For vtkDataSet. + ( "dataset", "newAttribute", "GLOBAL_IDS_POINTS", {}, (), (), np.float32, True ), + ( "dataset", "CellAttribute", "GLOBAL_IDS_POINTS", {}, (), (), np.float32, True ), + ## For vtkMultiBlockDataSet. + ( "multiblock", "newAttribute", "GLOBAL_IDS_POINTS", {}, (), (), np.float32, True ), + ( "multiblock", "CellAttribute", "GLOBAL_IDS_POINTS", {}, (), (), np.float32, True ), + ( "multiblock", "GLOBAL_IDS_CELLS", "GLOBAL_IDS_POINTS", {}, (), (), np.float32, True ), + # Test if the region attribute is on cells or on points. + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.float32, True ), + # Test the component name. + ( "dataset", "newAttribute", "FAULT", {}, ( "X" ), (), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), ( "Component0", "Component1" ), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", {}, ( "X" ), ( "Component0", "Component1" ), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", {}, ( "X", "Y" ), ( "X", "Y" ), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", {}, ( "X", "Y", "Z" ), ( "X", "Y" ), np.float32, True ), + # Test the type of value. + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.int8, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.int16, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.int32, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.int64, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.uint8, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.uint16, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.uint32, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.uint64, True ), + ( "dataset", "newAttribute", "FAULT", {}, (), (), np.float64, True ), + # Test index/value. + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0 ], + 100: [ 1 ] + }, (), (), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0 ], + 100: [ 1 ], + 101: [ 2 ] + }, (), (), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0 ], + 100: [ 1 ], + 101: [ 2 ], + 2: [ 3 ] + }, (), (), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0, 0 ], + 100: [ 1, 1 ] + }, (), ( "Component0", "Component1" ), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0, 0 ], + 100: [ 1, 1 ], + 101: [ 2, 2 ] + }, (), ( "Component0", "Component1" ), np.float32, True ), + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0, 0 ], + 100: [ 1, 1 ], + 101: [ 2, 2 ], + 2: [ 3, 3 ] + }, (), ( "Component0", "Component1" ), np.float32, True ), + # Test common error. + ## Number of components. + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0 ], + 100: [ 1, 1 ] + }, (), (), np.float32, False ), # Number of value inconsistent. + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0, 0 ], + 100: [ 1, 1 ] + }, (), (), np.float32, False ), # More values than components. + ( "dataset", "newAttribute", "FAULT", { + 0: [ 0 ], + 100: [ 1 ] + }, ( "X", "Y" ), ( "X", "Y" ), np.float32, False ), # More components than value. + ## Attribute name. + ( "dataset", "PERM", "FAULT", {}, (), (), np.float32, False ), # The attribute name already exist. + ## Region attribute. + ( "dataset", "newAttribute", "PERM", {}, (), + (), np.float32, False ), # Region attribute has too many components. + ( "multiblock", "newAttribute", "FAULT", {}, (), (), np.float32, False ), # Region attribute is partial. + ] ) +def test_CreateConstantAttributePerRegion( + dataSetTest: Union[ vtkMultiBlockDataSet, vtkDataSet ], + meshType: str, + newAttributeName: str, + regionName: str, + dictRegionValues: dict[ Any, Any ], + componentNames: tuple[ str, ...], + componentNamesTest: tuple[ str, ...], + valueNpType: int, + succeed: bool, +) -> None: + """Test CreateConstantAttributePerRegion.""" + mesh: Union[ vtkMultiBlockDataSet, vtkDataSet ] = dataSetTest( meshType ) + nbComponents: int = len( componentNamesTest ) + if nbComponents == 0: # If the attribute has one component, the component has no name. + nbComponents += 1 + + filter: CreateConstantAttributePerRegion = CreateConstantAttributePerRegion( + mesh, + regionName, + dictRegionValues, + newAttributeName, + valueNpType=valueNpType, + nbComponents=nbComponents, + componentNames=componentNames, + ) + + assert filter.applyFilter() == succeed diff --git a/geos-mesh/tests/test_arrayHelpers.py b/geos-mesh/tests/test_arrayHelpers.py index 35951f74c..cbfaf50d4 100644 --- a/geos-mesh/tests/test_arrayHelpers.py +++ b/geos-mesh/tests/test_arrayHelpers.py @@ -5,7 +5,7 @@ # ruff: noqa: E402 # disable Module level import not at top of file # mypy: disable-error-code="operator, attr-defined" import pytest -from typing import Tuple +from typing import Tuple, Union, Any import numpy as np import numpy.typing as npt @@ -16,6 +16,7 @@ from vtkmodules.vtkCommonDataModel import vtkDataSet, vtkMultiBlockDataSet, vtkPolyData from geos.mesh.utils import arrayHelpers +from geos.mesh.utils.arrayModifiers import createConstantAttribute @pytest.mark.parametrize( "onpoints, expected", [ ( True, { @@ -39,6 +40,52 @@ def test_getAttributeFromMultiBlockDataSet( dataSetTest: vtkMultiBlockDataSet, o assert attributes == expected +@pytest.mark.parametrize( "attributeName, onPointsTest, onBothTest", [ + ( "CellAttribute", False, False ), + ( "PointAttribute", True, False ), + ( "NewAttribute", None, False ), + ( "NewAttribute", True, True ), +] ) +def test_getAttributePieceInfo( + dataSetTest: vtkDataSet, + attributeName: str, + onPointsTest: Union[ None, bool ], + onBothTest: bool, +) -> None: + """Test getting attribute piece information.""" + dataSet: vtkDataSet = dataSetTest( "dataset" ) + if onBothTest: # Create a case with an attribute with the same name on points and on cells. + createConstantAttribute( dataSet, [ 42. ], attributeName, onPoints=True ) + createConstantAttribute( dataSet, [ 42. ], attributeName, onPoints=False ) + onPoints: Union[ None, bool ] + onBoth: bool + onPoints, onBoth = arrayHelpers.getAttributePieceInfo( dataSet, attributeName ) + assert onPoints == onPointsTest + assert onBoth == onBothTest + + +@pytest.mark.parametrize( "attributeName, listValues, onPoints, validValuesTest, invalidValuesTest", [ + ( "PointAttribute", [ [ 12.4, 9.7, 10.5 ], [ 0, 0, 0 ] ], True, [ [ 12.4, 9.7, 10.5 ] ], [ [ 0, 0, 0 ] ] ), + ( "CellAttribute", [ [ 24.8, 19.4, 21 ], [ 0, 0, 0 ] ], False, [ [ 24.8, 19.4, 21 ] ], [ [ 0, 0, 0 ] ] ), + ( "FAULT", [ 0, 100, 101, 2 ], False, [ 0, 100, 101 ], [ 2 ] ), +] ) +def test_checkValidValuesInDataSet( + dataSetTest: vtkDataSet, + attributeName: str, + listValues: list[ Any ], + onPoints: bool, + validValuesTest: list[ Any ], + invalidValuesTest: list[ Any ], +) -> None: + """Test the function checkValidValuesInDataSet.""" + dataSet: vtkDataSet = dataSetTest( "dataset" ) + validValues: list[ Any ] + invalidValues: list[ Any ] + validValues, invalidValues = arrayHelpers.checkValidValuesInDataSet( dataSet, attributeName, listValues, onPoints ) + assert validValues == validValuesTest + assert invalidValues == invalidValuesTest + + @pytest.mark.parametrize( "onpoints, expected", [ ( True, { 'GLOBAL_IDS_POINTS': 1, 'PointAttribute': 3, diff --git a/geos-posp/src/PVplugins/PVCreateConstantAttributePerRegion.py b/geos-posp/src/PVplugins/PVCreateConstantAttributePerRegion.py deleted file mode 100644 index 8f25df08a..000000000 --- a/geos-posp/src/PVplugins/PVCreateConstantAttributePerRegion.py +++ /dev/null @@ -1,303 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. -# SPDX-FileContributor: Martin Lemay -# ruff: noqa: E402 # disable Module level import not at top of file -import os -import sys -from typing import Union - -import numpy as np -import numpy.typing as npt -from typing_extensions import Self - -dir_path = os.path.dirname( os.path.realpath( __file__ ) ) -parent_dir_path = os.path.dirname( dir_path ) -if parent_dir_path not in sys.path: - sys.path.append( parent_dir_path ) - -import PVplugins # noqa: F401 - -import vtkmodules.util.numpy_support as vnp -from geos.utils.Logger import Logger, getLogger -from geos.mesh.utils.multiblockHelpers import ( - getBlockElementIndexesFlatten, - getBlockFromFlatIndex, -) -from geos.mesh.utils.arrayHelpers import isAttributeInObject -from paraview.util.vtkAlgorithm import ( # type: ignore[import-not-found] - VTKPythonAlgorithmBase, smdomain, smhint, smproperty, smproxy, -) -from vtk import VTK_DOUBLE # type: ignore[import-untyped] -from vtkmodules.vtkCommonCore import ( - vtkDataArray, - vtkInformation, - vtkInformationVector, -) -from vtkmodules.vtkCommonDataModel import ( - vtkDataObject, - vtkMultiBlockDataSet, - vtkUnstructuredGrid, -) - -__doc__ = """ -PVCreateConstantAttributePerRegion is a Paraview plugin that allows to -create 2 attributes whom values are constant for each region index. - -Input and output are either vtkMultiBlockDataSet or vtkUnstructuredGrid. - -To use it: - -* Load the module in Paraview: Tools>Manage Plugins...>Load new>PVCreateConstantAttributePerRegion. -* Select the mesh you want to create the attributes and containing a region attribute. -* Search and Apply Create Constant Attribute Per Region Filter. - -""" - -SOURCE_NAME: str = "" -DEFAULT_REGION_ATTRIBUTE_NAME = "region" - - -@smproxy.filter( - name="PVCreateConstantAttributePerRegion", - label="Create Constant Attribute Per Region", -) -@smhint.xml( """""" ) -@smproperty.input( name="Input", port_index=0 ) -@smdomain.datatype( - dataTypes=[ "vtkMultiBlockDataSet", "vtkUnstructuredGrid" ], - composite_data_supported=True, -) -class PVCreateConstantAttributePerRegion( VTKPythonAlgorithmBase ): - - def __init__( self: Self ) -> None: - """Create an attribute with constant value per region.""" - super().__init__( nInputPorts=1, nOutputPorts=1, outputType="vtkDataSet" ) - - self.m_table: list[ tuple[ int, float ] ] = [] - self.m_regionAttributeName: str = DEFAULT_REGION_ATTRIBUTE_NAME - self.m_attributeName: str = "attribute" - - # logger - self.m_logger: Logger = getLogger( "Create Constant Attribute Per Region Filter" ) - - def SetLogger( self: Self, logger: Logger ) -> None: - """Set filter logger. - - Args: - logger (Logger): logger - """ - self.m_logger = logger - - @smproperty.xml( """ - - - - - - - - Select an attribute containing the indexes of the regions - - - """ ) - def a01SetRegionAttributeName( self: Self, name: str ) -> None: - """Set region attribute name.""" - self.m_regionAttributeName = name - self.Modified() - - @smproperty.xml( """ - - - Name of the new attribute - - - """ ) - def a02SetAttributeName( self: Self, value: str ) -> None: - """Set attribute name. - - Args: - value (str): attribute name. - """ - self.m_attributeName = value - self.Modified() - - @smproperty.xml( """ - - - - - - - - - Set new attributes values for each region index. - - - """ ) - def b01SetAttributeValues( self: Self, regionIndex: int, value: float ) -> None: - """Set attribute values per region. - - Args: - regionIndex (int): region index. - - value (float): attribute value. - """ - self.m_table.append( ( regionIndex, value ) ) - self.Modified() - - @smproperty.xml( """ - - """ ) - def b02GroupFlow( self: Self ) -> None: - """Organize groups.""" - self.Modified() - - def RequestDataObject( - self: Self, - request: vtkInformation, - inInfoVec: list[ vtkInformationVector ], - outInfoVec: vtkInformationVector, - ) -> int: - """Inherited from VTKPythonAlgorithmBase::RequestDataObject. - - Args: - request (vtkInformation): request - inInfoVec (list[vtkInformationVector]): input objects - outInfoVec (vtkInformationVector): output objects - - Returns: - int: 1 if calculation successfully ended, 0 otherwise. - """ - inData = self.GetInputData( inInfoVec, 0, 0 ) - outData = self.GetOutputData( outInfoVec, 0 ) - assert inData is not None - if outData is None or ( not outData.IsA( inData.GetClassName() ) ): - outData = inData.NewInstance() - outInfoVec.GetInformationObject( 0 ).Set( outData.DATA_OBJECT(), outData ) - return super().RequestDataObject( request, inInfoVec, outInfoVec ) # type: ignore[no-any-return] - - def RequestData( - self: Self, - request: vtkInformation, # noqa: F841 - inInfoVec: list[ vtkInformationVector ], # noqa: F841 - outInfoVec: vtkInformationVector, # noqa: F841 - ) -> int: - """Inherited from VTKPythonAlgorithmBase::RequestData. - - Args: - request (vtkInformation): request - inInfoVec (list[vtkInformationVector]): input objects - outInfoVec (vtkInformationVector): output objects - - Returns: - int: 1 if calculation successfully ended, 0 otherwise. - """ - self.m_logger.info( f"Apply filter {__name__}" ) - try: - input0: Union[ vtkUnstructuredGrid, vtkMultiBlockDataSet ] = ( self.GetInputData( inInfoVec, 0, 0 ) ) - output: Union[ vtkUnstructuredGrid, vtkMultiBlockDataSet ] = ( self.GetOutputData( outInfoVec, 0 ) ) - - assert input0 is not None, "Input Surface is null." - assert output is not None, "Output pipeline is null." - - output.ShallowCopy( input0 ) - - assert ( len( self.m_regionAttributeName ) - > 0 ), "Region attribute is undefined, please select an attribute." - if isinstance( output, vtkMultiBlockDataSet ): - self.createAttributesMultiBlock( output ) - else: - self.createAttributes( output ) - - mess: str = ( f"The new attribute {self.m_attributeName} was successfully added." ) - self.Modified() - self.m_logger.info( mess ) - except AssertionError as e: - mess1: str = "The new attribute was not added due to:" - self.m_logger.error( mess1 ) - self.m_logger.error( e, exc_info=True ) - return 0 - except Exception as e: - mess0: str = "The new attribute was not added due to:" - self.m_logger.critical( mess0 ) - self.m_logger.critical( e, exc_info=True ) - return 0 - self.m_compute = True - return 1 - - def createAttributesMultiBlock( self: Self, output: vtkMultiBlockDataSet ) -> None: - """Create attributes on vtkMultiBlockDataSet from input data. - - Args: - output (vtkMultiBlockDataSet): mesh where to create the attributes. - """ - # for each block - blockIndexes: list[ int ] = getBlockElementIndexesFlatten( output ) - for blockIndex in blockIndexes: - block0: vtkDataObject = getBlockFromFlatIndex( output, blockIndex ) - assert block0 is not None, "Block is undefined." - block: vtkUnstructuredGrid = vtkUnstructuredGrid.SafeDownCast( block0 ) - try: - self.createAttributes( block ) - except AssertionError as e: - self.m_logger.warning( f"Block {blockIndex}: {e}" ) - output.Modified() - - def createAttributes( self: Self, mesh: vtkUnstructuredGrid ) -> None: - """Create attributes on vtkUnstructuredGrid from input data. - - Args: - mesh (vtkUnstructuredGrid): mesh where to create the attributes. - """ - assert isAttributeInObject( mesh, self.m_regionAttributeName, - False ), f"{self.m_regionAttributeName} is not in the mesh." - regionAttr: vtkDataArray = mesh.GetCellData().GetArray( self.m_regionAttributeName ) - assert regionAttr is not None, "Region attribute is undefined" - npArray: npt.NDArray[ np.float64 ] = self.createNpArray( regionAttr ) - newAttr: vtkDataArray = vnp.numpy_to_vtk( npArray, True, VTK_DOUBLE ) - newAttr.SetName( self.m_attributeName ) - mesh.GetCellData().AddArray( newAttr ) - mesh.GetCellData().Modified() - mesh.Modified() - - def createNpArray( self: Self, regionAttr: vtkDataArray ) -> npt.NDArray[ np.float64 ]: - """Create numpy arrays from input data. - - Args: - regionAttr (vtkDataArray): Region attribute - - Returns: - npt.NDArray[np.float64]: numpy array of the new attribute. - """ - regionNpArray: npt.NDArray[ np.float64 ] = vnp.vtk_to_numpy( regionAttr ) - npArray: npt.NDArray[ np.float64 ] = np.full_like( regionNpArray, np.nan ) - # for each region - for regionIndex, value in self.m_table: - if regionIndex in np.unique( regionNpArray ): - mask: npt.NDArray[ np.bool_ ] = regionNpArray == regionIndex - npArray[ mask ] = value - else: - self.m_logger.warning( f"Index {regionIndex} is not in the values of the region" + - f" attribute '{regionAttr.GetName()}'" ) - return npArray diff --git a/geos-pv/src/geos/pv/plugins/PVCreateConstantAttributePerRegion.py b/geos-pv/src/geos/pv/plugins/PVCreateConstantAttributePerRegion.py new file mode 100644 index 000000000..fe6a61b41 --- /dev/null +++ b/geos-pv/src/geos/pv/plugins/PVCreateConstantAttributePerRegion.py @@ -0,0 +1,355 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Martin Lemay, Romain Baville +# ruff: noqa: E402 # disable Module level import not at top of file +import sys +from pathlib import Path + +from typing import Union, Any +from typing_extensions import Self + +from paraview.util.vtkAlgorithm import ( # type: ignore[import-not-found] + smdomain, smhint, smproperty, smproxy, +) # source: https://github.com/Kitware/ParaView/blob/master/Wrapping/Python/paraview/util/vtkAlgorithm.py +from paraview.detail.loghandler import ( # type: ignore[import-not-found] + VTKHandler, +) # source: https://github.com/Kitware/ParaView/blob/master/Wrapping/Python/paraview/detail/loghandler.py + +from vtkmodules.util.vtkAlgorithm import VTKPythonAlgorithmBase +from vtkmodules.vtkCommonCore import ( + vtkInformation, + vtkInformationVector, +) +from vtkmodules.vtkCommonDataModel import ( + vtkMultiBlockDataSet, + vtkDataSet, +) + +# update sys.path to load all GEOS Python Package dependencies +geos_pv_path: Path = Path( __file__ ).parent.parent.parent.parent.parent +sys.path.insert( 0, str( geos_pv_path / "src" ) ) +from geos.pv.utils.config import update_paths + +update_paths() + +from geos.mesh.processing.CreateConstantAttributePerRegion import CreateConstantAttributePerRegion, vnp, np + +__doc__ = """ +PVCreateConstantAttributePerRegion is a Paraview plugin that allows to create an attribute +with constant values per components for each chosen indexes of a reference/region attribute. +If other region indexes exist, values are set to nan for float type, -1 for int type or 0 for uint type. + +Input mesh is either vtkMultiBlockDataSet or vtkDataSet and the region attribute must have one component. +The relation index/values is given by a dictionary. Its keys are the indexes and its items are the list of values for each component. + +.. Warning:: + The input mesh should contain an attribute corresponding to the regions. + +To use it: + +* Load the module in Paraview: Tools>Manage Plugins...>Load new>PVCreateConstantAttributePerRegion. +* Select the mesh in which you want to create the attributes. +* Select the filter Create Constant Attribute Per Region in filter|0- Geos Pre-processing. +* Choose the region attribute, the relation index/values, the new attribute name, the type of the value, the number of components and their names. +* Apply. + +""" + + +@smproxy.filter( + name="PVCreateConstantAttributePerRegion", + label="Create Constant Attribute Per Region", +) +@smhint.xml( """""" ) +@smproperty.input( name="Input", port_index=0 ) +@smdomain.datatype( + dataTypes=[ "vtkMultiBlockDataSet", "vtkDataSet" ], + composite_data_supported=True, +) +class PVCreateConstantAttributePerRegion( VTKPythonAlgorithmBase ): + + def __init__( self: Self ) -> None: + """Create an attribute with constant value per region.""" + super().__init__( nInputPorts=1, nOutputPorts=1, inputType="vtkDataObject", outputType="vtkDataObject" ) + + self.clearDictRegionValues: bool = True + + # Region attribute settings. + self.regionName: str = "" + self.dictRegionValues: dict[ Any, Any ] = {} + + # New attribute settings. + self.newAttributeName: str = "newAttribute" + self.valueNpType: type = np.float32 + self.nbComponents: int = 1 + self.componentNames: tuple[ str, ...] = () + + # Use the handler of paraview for the log. + self.speHandler: bool = True + + # Settings of the attribute with the region indexes: + @smproperty.stringvector( + name="ChooseRegionAttribute", + label="Attribute with region indexes", + default_values="Choose an attribute", + number_of_elements="1", + element_types="2", + ) + @smdomain.xml( """ + + + + + + + Select the attribute to consider for regions indexes. + + + + + """ ) + def _setRegionAttributeName( self: Self, regionName: str ) -> None: + """Set region attribute name. + + Args: + regionName (str): The name of the attribute to consider as the region attribute. + """ + self.regionName = regionName + self.Modified() + + @smproperty.xml( """ + + + Set the value of the new attribute for each region indexes, use a comma between the value of each components:\n + valueRegionIndex : valueComponent1, valueComponent2 ...\n + If the region attribute has other indexes than those given, a default value is use:\n + 0 for uint type, -1 for int type and nan for float type. + + + + + + + + + + """ ) + def _setDictRegionValues( self: Self, regionIndex: str, value: str ) -> None: + """Set the dictionary with the region indexes and its corresponding list of values for each components. + + Args: + regionIndex (str): Region index of the region attribute to consider. + value (str): List of value to use for the regionIndex. If multiple components use a comma between the value of each component. + """ + if self.clearDictRegionValues: + self.dictRegionValues = {} + self.clearDictRegionValues = False + + if regionIndex is not None and value is not None: + self.dictRegionValues[ regionIndex ] = list( value.split( "," ) ) + + self.Modified() + + @smproperty.xml( """ + + + + + """ ) + def _groupRegionAttributeSettingsWidgets( self: Self ) -> None: + """Group the widgets to set the settings of the region attribute.""" + self.Modified() + + # Settings of the new attribute: + @smproperty.xml( """ + + + Name of the new attribute to create. + + + """ ) + def _setAttributeName( self: Self, newAttributeName: str ) -> None: + """Set attribute name. + + Args: + newAttributeName (str): Name of the new attribute to create. + """ + self.newAttributeName = newAttributeName + self.Modified() + + @smproperty.intvector( + name="ValueType", + label="The type of the values:", + number_of_elements=1, + default_values=10, + panel_visibility="default", + ) + @smdomain.xml( """ + + + + + + + + + + + + + + The requested numpy scalar type for values of the new attribute. + + """ ) + def _setValueType( self: Self, valueType: int ) -> None: + """Set the type for the value used to create the new attribute. + + Args: + valueType (int): The numpy scalar type encoding with int (vtk typecode). + """ + dictType: dict[ int, Any ] = vnp.get_vtk_to_numpy_typemap() + self.valueNpType = dictType[ valueType ] + self.Modified() + + @smproperty.intvector( + name="NumberOfComponents", + label="Number of components:", + number_of_elements=1, + default_values=1, + panel_visibility="default", + ) + @smdomain.xml( """ + + The number of components for the new attribute to create. + + """ ) + def _setNbComponent( self: Self, nbComponents: int ) -> None: + """Set the number of components of the attribute to create. + + Args: + nbComponents (int): Number of components of the new attribute. + """ + self.nbComponents = nbComponents + self.Modified() + + @smproperty.stringvector( + name="ComponentNames", + label="Names of components:", + number_of_elements=1, + default_values="Change if multiple components", + panel_visibility="default", + ) + @smdomain.xml( """ + + Names of components if multiple for the new attribute to create. + Use the coma and a coma between each component name:\n + Names of components: X, Y, Z + + """ ) + def _setComponentNames( self: Self, componentNames: str ) -> None: + """Set the names of the components of the attribute to create. + + Args: + componentNames (str): Names of component for the new attribute. Use a coma between each component names. + """ + if componentNames == "" or componentNames == "Change if multiple components" or self.nbComponents == 1: + self.componentNames = () + else: + self.componentNames = tuple( componentNames.split( "," ) ) + + self.Modified() + + @smproperty.xml( """ + + + + + + """ ) + def _groupNewAttributeSettingsWidgets( self: Self ) -> None: + """Group the widgets to set the settings of the new attribute.""" + self.Modified() + + def RequestDataObject( + self: Self, + request: vtkInformation, + inInfoVec: list[ vtkInformationVector ], + outInfoVec: vtkInformationVector, + ) -> int: + """Inherited from VTKPythonAlgorithmBase::RequestDataObject. + + Args: + request (vtkInformation): request + inInfoVec (list[vtkInformationVector]): input objects + outInfoVec (vtkInformationVector): output objects + + Returns: + int: 1 if calculation successfully ended, 0 otherwise. + """ + inData = self.GetInputData( inInfoVec, 0, 0 ) + outData = self.GetOutputData( outInfoVec, 0 ) + assert inData is not None + if outData is None or ( not outData.IsA( inData.GetClassName() ) ): + outData = inData.NewInstance() + outInfoVec.GetInformationObject( 0 ).Set( outData.DATA_OBJECT(), outData ) + return super().RequestDataObject( request, inInfoVec, outInfoVec ) # type: ignore[no-any-return] + + def RequestData( + self: Self, + request: vtkInformation, # noqa: F841 + inInfoVec: list[ vtkInformationVector ], # noqa: F841 + outInfoVec: vtkInformationVector, # noqa: F841 + ) -> int: + """Inherited from VTKPythonAlgorithmBase::RequestData. + + Args: + request (vtkInformation): Request. + inInfoVec (list[vtkInformationVector]): Input objects. + outInfoVec (vtkInformationVector): Output objects. + + Returns: + int: 1 if calculation successfully ended, 0 otherwise. + """ + inputMesh: Union[ vtkDataSet, vtkMultiBlockDataSet ] = self.GetInputData( inInfoVec, 0, 0 ) + outputMesh: Union[ vtkDataSet, vtkMultiBlockDataSet ] = self.GetOutputData( outInfoVec, 0 ) + + assert inputMesh is not None, "Input mesh is null." + assert outputMesh is not None, "Output pipeline is null." + + outputMesh.ShallowCopy( inputMesh ) + filter: CreateConstantAttributePerRegion = CreateConstantAttributePerRegion( + outputMesh, + self.regionName, + self.dictRegionValues, + self.newAttributeName, + self.valueNpType, + self.nbComponents, + self.componentNames, + self.speHandler, + ) + + if not filter.logger.hasHandlers(): + filter.setLoggerHandler( VTKHandler() ) + + filter.applyFilter() + + self.clearDictRegion = True + + return 1 diff --git a/geos-utils/src/geos/utils/Logger.py b/geos-utils/src/geos/utils/Logger.py index 69ec6ec51..9d3fe5c41 100644 --- a/geos-utils/src/geos/utils/Logger.py +++ b/geos-utils/src/geos/utils/Logger.py @@ -12,6 +12,24 @@ """ +class CountWarningHandler( logging.Handler ): + """Create an handler to count the warnings logged.""" + + def __init__( self: Self ) -> None: + """Init the handler.""" + super().__init__() + self.warningCount = 0 + + def emit( self: Self, record: logging.LogRecord ) -> None: + """Count all the warnings logged. + + Args: + record (logging.LogRecord): Record. + """ + if record.levelno == logging.WARNING: + self.warningCount += 1 + + # Add the convenience method for the logger def results( self: logging.Logger, message: str, *args: Any, **kws: Any ) -> None: """Logs a message with the custom 'RESULTS' severity level. @@ -49,7 +67,7 @@ def results( self: logging.Logger, message: str, *args: Any, **kws: Any ) -> Non class CustomLoggerFormatter( logging.Formatter ): """Custom formatter for the logger. - .. WARNING:: Colors do not work in the ouput message window of Paraview. + .. WARNING:: Colors do not work in the output message window of Paraview. To use it: @@ -78,7 +96,7 @@ class CustomLoggerFormatter( logging.Formatter ): #: format for each logger output type with colors FORMATS_COLOR: dict[ int, str ] = { DEBUG: grey + format2 + reset, - INFO: grey + format1 + reset, + INFO: green + format1 + reset, WARNING: yellow + format1 + reset, ERROR: red + format1 + reset, CRITICAL: bold_red + format2 + reset, @@ -151,7 +169,7 @@ def getLogger( title: str, use_color: bool = False ) -> Logger: # module import import Logger - # logger instanciation + # logger instantiation logger :Logger.Logger = Logger.getLogger("My application") # logger use