Source code for cashocs._optimization.cost_functional

# Copyright (C) 2020-2024 Sebastian Blauth
#
# This file is part of cashocs.
#
# cashocs is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cashocs is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cashocs.  If not, see <https://www.gnu.org/licenses/>.

"""Reduced cost functionals."""

from __future__ import annotations

import abc
import ctypes
from typing import cast, List, Optional, Set, Tuple, TYPE_CHECKING, Union

import fenics
import numpy as np

try:
    import ufl_legacy as ufl
except ImportError:
    import ufl

from cashocs import _exceptions
from cashocs import _forms
from cashocs import _utils

if TYPE_CHECKING:
    from cashocs import _pde_problems
    from cashocs import _typing
    from cashocs._database import database


class ReducedCostFunctional:
    """Reduced cost functional for PDE constrained optimization."""

    def __init__(
        self,
        db: database.Database,
        form_handler: _typing.FormHandler,
        state_problem: _pde_problems.StateProblem,
    ) -> None:
        """Initializes self.

        Args:
            db: The database of the problem.
            form_handler: The FormHandler object for the optimization problem.
            state_problem: The StateProblem object corresponding to the state system.

        """
        self.db = db
        self.form_handler = form_handler
        self.state_problem = state_problem

    def evaluate(self) -> float:
        """Evaluates the reduced cost functional.

        First solves the state system, so that the state variables are up-to-date,
        and then evaluates the reduced cost functional by assembling the corresponding
        UFL form.

        Returns:
            The value of the reduced cost functional

        """
        self.state_problem.solve()

        vals = [
            functional.evaluate() for functional in self.db.form_db.cost_functional_list
        ]
        val = sum(vals)
        val += self.form_handler.cost_functional_shift

        if self.db.parameter_db.problem_type == "shape":
            self.form_handler = cast(_forms.ShapeFormHandler, self.form_handler)
            val += self.form_handler.shape_regularization.compute_objective()

        return val


[docs] class Functional(abc.ABC): """Base class for all cost functionals.""" def __init__(self) -> None: """Initialize the functional.""" pass
[docs] @abc.abstractmethod def evaluate(self) -> float: """Evaluates the functional. Returns: The current value of the functional. """ pass
[docs] @abc.abstractmethod def derivative( self, argument: ufl.core.expr.Expr, direction: ufl.core.expr.Expr ) -> ufl.Form: """Computes the derivative of the functional w.r.t. argument towards direction. Args: argument: The argument, w.r.t. which the functional is differentiated direction: The direction into which the derivative is computed Returns: A form of the resulting derivative """ pass
[docs] @abc.abstractmethod def coefficients(self) -> Tuple[fenics.Function]: """Computes the ufl coefficients which are used in the functional. Returns: The set of used coefficients. """ pass
[docs] @abc.abstractmethod def scale(self, scaling_factor: Union[float, int]) -> None: """Scales the functional by a scalar. Args: scaling_factor: The scaling factor used to scale the functional """ pass
[docs] @abc.abstractmethod def update(self) -> None: """Updates the functional after solving the state equation.""" pass
[docs] class IntegralFunctional(Functional): """A functional which is given by the integral of ``form``.""" def __init__(self, form: ufl.Form) -> None: """Initializes self. Args: form: The form of the integrand, which is to be calculated for evaluating the functional. """ super().__init__() self.form = form
[docs] def evaluate(self) -> float: """Evaluates the functional. Returns: The current value of the functional. """ val: float = fenics.assemble(self.form) return val
[docs] def derivative( self, argument: ufl.core.expr.Expr, direction: ufl.core.expr.Expr ) -> ufl.Form: """Computes the derivative of the functional w.r.t. argument towards direction. Args: argument: The argument, w.r.t. which the functional is differentiated direction: The direction into which the derivative is computed Returns: A form of the resulting derivative """ return fenics.derivative(self.form, argument, direction)
[docs] def coefficients(self) -> Tuple[fenics.Function]: """Computes the ufl coefficients which are used in the functional. Returns: The set of used coefficients. """ coeffs: Tuple[fenics.Function] = self.form.coefficients() return coeffs
[docs] def scale(self, scaling_factor: Union[float, int]) -> None: """Scales the functional by a scalar. Args: scaling_factor: The scaling factor used to scale the functional """ self.form = fenics.Constant(scaling_factor) * self.form
[docs] def update(self) -> None: """Updates the functional after solving the state equation.""" pass
[docs] class ScalarTrackingFunctional(Functional): """Tracking cost functional for scalar quantities arising due to integration.""" def __init__( self, integrand: ufl.Form, tracking_goal: Union[float, int, ctypes.c_float, ctypes.c_double], weight: Union[float, int] = 1.0, ) -> None: """Initializes self. Args: integrand: The integrand of the functional tracking_goal: A real number, which the integral of the integrand should track. Note, that when a ctypes object is passed, the float is assumed to be mutable and the tracking_goal is updated every iteration. weight: A real number which gives the scaling factor for this functional """ super().__init__() self.integrand = integrand self.tracking_goal = tracking_goal if not isinstance(self.tracking_goal, (ctypes.c_float, ctypes.c_double)): self.tracking_goal_value = self.tracking_goal else: self.tracking_goal_value = self.tracking_goal.value mesh = self.integrand.integrals()[0].ufl_domain().ufl_cargo() self.integrand_value = fenics.Function(fenics.FunctionSpace(mesh, "R", 0)) self.weight = fenics.Function(fenics.FunctionSpace(mesh, "R", 0)) self.weight.vector().vec().set(weight) self.weight.vector().apply("")
[docs] def evaluate(self) -> float: """Evaluates the functional. Returns: The current value of the functional. """ if isinstance(self.tracking_goal, (ctypes.c_float, ctypes.c_double)): self.tracking_goal_value = self.tracking_goal.value scalar_integral_value = fenics.assemble(self.integrand) self.integrand_value.vector().vec().set(scalar_integral_value) self.integrand_value.vector().apply("") val: float = ( self.weight.vector().vec().sum() / 2.0 * pow(scalar_integral_value - self.tracking_goal_value, 2) ) return val
[docs] def derivative( self, argument: ufl.core.expr.Expr, direction: ufl.core.expr.Expr ) -> ufl.Form: """Computes the derivative of the functional w.r.t. argument towards direction. Args: argument: The argument, w.r.t. which the functional is differentiated direction: The direction into which the derivative is computed Returns: A form of the resulting derivative """ if isinstance(self.tracking_goal, (ctypes.c_float, ctypes.c_double)): self.tracking_goal_value = self.tracking_goal.value derivative = fenics.derivative( self.weight * (self.integrand_value - fenics.Constant(self.tracking_goal_value)) * self.integrand, argument, direction, ) return derivative
[docs] def coefficients(self) -> Tuple[fenics.Function]: """Computes the ufl coefficients which are used in the functional. Returns: The set of used coefficients. """ coeffs: Tuple[fenics.Function] = self.integrand.coefficients() return coeffs
[docs] def scale(self, scaling_factor: Union[float, int]) -> None: """Scales the functional by a scalar. Args: scaling_factor: The scaling factor used to scale the functional """ self.weight.vector().vec().set(scaling_factor) self.weight.vector().apply("")
[docs] def update(self) -> None: """Updates the functional after solving the state equation.""" scalar_integral_value = fenics.assemble(self.integrand) self.integrand_value.vector().vec().set(scalar_integral_value) self.integrand_value.vector().apply("")
[docs] class MinMaxFunctional(Functional): """Cost functional involving a maximum of 0 and a integral term squared.""" def __init__( self, integrand: ufl.Form, lower_bound: Optional[Union[float, int]] = None, upper_bound: Optional[Union[float, int]] = None, mu: Union[float, int] = 1.0, lambd: Union[float, int] = 0.0, ) -> None: """Initializes self.""" super().__init__() self.integrand = integrand if upper_bound is None and lower_bound is None: raise _exceptions.InputError( "cashocs.MinMaxFunctional", "lower_bound and upper_bound", "At least one of lower_bound or upper_bound must not be None.", ) self.lower_bound = lower_bound self.upper_bound = upper_bound mesh = self.integrand.integrals()[0].ufl_domain().ufl_cargo() self.integrand_value = fenics.Function(fenics.FunctionSpace(mesh, "R", 0)) self.mu = fenics.Function(fenics.FunctionSpace(mesh, "R", 0)) self.mu.vector().vec().set(mu) self.mu.vector().apply("") self.lambd = fenics.Function(fenics.FunctionSpace(mesh, "R", 0)) self.lambd.vector().vec().set(lambd) self.lambd.vector().apply("")
[docs] def evaluate(self) -> float: """Evaluates the functional. Returns: The current value of the functional. """ min_max_integrand_value = fenics.assemble(self.integrand) self.integrand_value.vector().vec().set(min_max_integrand_value) self.integrand_value.vector().apply("") val = 0.0 if self.lower_bound is not None: val += ( 1 / (2 * self.mu.vector().vec().sum()) * pow( np.minimum( 0.0, self.lambd.vector().vec().sum() + self.mu.vector().vec().sum() * (min_max_integrand_value - self.lower_bound), ), 2, ) ) if self.upper_bound is not None: val += ( 1 / (2 * self.mu.vector().vec().sum()) * pow( np.maximum( 0.0, self.lambd.vector().vec().sum() + self.mu.vector().vec().sum() * (min_max_integrand_value - self.upper_bound), ), 2, ) ) return val
[docs] def derivative( self, argument: ufl.core.expr.Expr, direction: ufl.core.expr.Expr ) -> ufl.Form: """Computes the derivative of the functional w.r.t. argument towards direction. Args: argument: The argument, w.r.t. which the functional is differentiated direction: The direction into which the derivative is computed Returns: A form of the resulting derivative """ derivative_list = [] if self.lower_bound is not None: term_lower = self.lambd + self.mu * ( self.integrand_value - self.lower_bound ) derivative = fenics.derivative( _utils.min_(fenics.Constant(0.0), term_lower) * self.integrand, argument, direction, ) derivative_list.append(derivative) if self.upper_bound is not None: term_upper = self.lambd + self.mu * ( self.integrand_value - self.upper_bound ) derivative = fenics.derivative( _utils.max_(fenics.Constant(0.0), term_upper) * self.integrand, argument, direction, ) derivative_list.append(derivative) return _utils.summation(derivative_list)
[docs] def coefficients(self) -> Tuple[fenics.Function]: """Computes the ufl coefficients which are used in the functional. Returns: The set of used coefficients. """ coeffs: Tuple[fenics.Function] = self.integrand.coefficients() return coeffs
[docs] def scale(self, scaling_factor: Union[float, int]) -> None: """Scales the functional by a scalar. Args: scaling_factor: The scaling factor used to scale the functional """ pass
[docs] def update(self) -> None: """Updates the functional after solving the state equation.""" min_max_integrand_value = fenics.assemble(self.integrand) self.integrand_value.vector().vec().set(min_max_integrand_value) self.integrand_value.vector().apply("")
class Lagrangian: """A Lagrangian function for the optimization problem.""" def __init__( self, cost_functional_list: List[_typing.CostFunctional], state_forms: List[ufl.Form], ) -> None: """Initializes self. Args: cost_functional_list: The list of cost functionals. state_forms: The list of state forms. """ self.cost_functional_list = cost_functional_list self.state_forms = state_forms self.summed_state_forms = _utils.summation(self.state_forms) def derivative( self, argument: ufl.core.expr.Expr, direction: ufl.core.expr.Expr ) -> ufl.Form: """Computes the derivative of the Lagrangian w.r.t. argument towards direction. Args: argument: The argument, w.r.t. which the Lagrangian is differentiated direction: The direction into which the derivative is computed Returns: A form of the resulting derivative """ cost_functional_derivative_list = [ functional.derivative(argument, direction) for functional in self.cost_functional_list ] cost_functional_derivative = _utils.summation(cost_functional_derivative_list) state_forms_derivative = fenics.derivative( self.summed_state_forms, argument, direction ) derivative = cost_functional_derivative + state_forms_derivative return derivative def coefficients(self) -> Set[fenics.Function]: """Computes the ufl coefficients which are used in the functional. Returns: The set of used coefficients. """ state_coeffs = set(self.summed_state_forms.coefficients()) functional_coeffs = [ set(functional.coefficients()) for functional in self.cost_functional_list ] coeffs = set().union(*functional_coeffs) coeffs.union(state_coeffs) return coeffs