# Copyright (C) 2020-2025 Fraunhofer ITWM and Sebastian Blauth
#
# This file is part of cashocs.
#
# cashocs is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cashocs is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cashocs. If not, see <https://www.gnu.org/licenses/>.
"""Output managers for cashocs."""
from __future__ import annotations
import abc
import json
import os
import shutil
import subprocess # nosec B404
from typing import cast, TYPE_CHECKING
import fenics
from cashocs import log
from cashocs.io import mesh as iomesh
if TYPE_CHECKING:
from cashocs._database import database
# Maps the keys of the optimization state to the short labels which are used for
# them in the console / file output. The functions below iterate over this
# dictionary, so its insertion order determines the order of the printed entries.
output_mapping = {
    "iteration": "iter",
    "objective_value": "cost function",
    "relative_norm": "rel. grad. norm",
    "gradient_norm": "abs. grad. norm",
    "mesh_quality": "mesh qlty",
    "angle": "angle",
    "stepsize": "step size",
    "constraint_violation": "constraint violation",
    "mu": "mu",
}
[docs]
def generate_summary_str(db: database.Database, precision: int) -> str:
    """Assembles the summary text which is shown at the end of the optimization.

    The summary lists the total number of iterations, the final value of every
    quantity of ``output_mapping`` that is present in the optimization state and,
    if available, the number of state and adjoint systems that were solved.

    Args:
        db: The database of the problem.
        precision: The number of decimal digits used for displaying the final
            values (which are printed in scientific notation).

    Returns:
        The summary string.

    """
    state = db.parameter_db.optimization_state

    lines = [
        "\n",
        "Optimization was successful.\n",
        "Statistics:\n",
        f" total iterations: {state['iteration']:4d}\n",
    ]

    # The iteration counter has already been reported above, so it is skipped.
    lines.extend(
        f" final {label}: {state[key]:.{precision}e}\n"
        for key, label in output_mapping.items()
        if key != "iteration" and key in state
    )

    # The solve counters are optional entries of the optimization state.
    solve_counters = (
        ("no_state_solves", " total number of state systems solved: "),
        ("no_adjoint_solves", " total number of adjoint systems solved: "),
    )
    for counter_key, prefix in solve_counters:
        if counter_key in state:
            lines.append(f"{prefix}{state[counter_key]:4d}\n")

    return "".join(lines)
[docs]
def generate_output_str(db: database.Database, precision: int) -> str:
    """Generates the string which can be written to console and file.

    The string consists of one row with the current values of all quantities of
    ``output_mapping`` that are present in the optimization state. Every tenth
    iteration (starting with iteration 0) the row is preceded by a header line
    with the names of the columns.

    Args:
        db: The database of the problem.
        precision: The precision used for displaying the numbers.

    Returns:
        The output string, which is used later.

    """
    separator = ", "
    state = db.parameter_db.optimization_state
    iteration = state["iteration"]
    print_header = iteration % 10 == 0

    header_parts = ["\n"] if print_header else []
    row_parts = []

    for key, label in output_mapping.items():
        if key not in state:
            continue

        if key == "iteration":
            if print_header:
                header_parts.append(f"iter{separator}")
            row_parts.append(f"{iteration:4d}{separator}")
            continue

        value_str = f"{state[key]:.{precision}e}"
        # The header cell and the value cell are padded to the same width, which
        # is derived from the separator actually used. A hard-coded offset that
        # does not match the separator length shifts the values relative to
        # their headers whenever the label is longer than the number.
        column_width = max(len(label), len(value_str)) + len(separator)
        if print_header:
            header_parts.append(f"{label}{separator}".rjust(column_width))
        row_parts.append(f"{value_str}{separator}".rjust(column_width))

    if print_header:
        header_parts.append("\n\n")
    if iteration == 0:
        row_parts.append("\n")

    return "".join(header_parts) + "".join(row_parts)
[docs]
class IOManager(abc.ABC):
    """Common interface of all input / output managers.

    Every concrete manager has to implement the three hooks :py:meth:`output`,
    :py:meth:`output_summary` and :py:meth:`post_process`.
    """

    def __init__(self, db: database.Database, result_dir: str):
        """Stores the database, its configuration and the result directory.

        Args:
            db: The database of the problem.
            result_dir: Path to the directory, where the results are saved.

        """
        self.db = db
        self.config = db.config
        self.result_dir = result_dir

    @abc.abstractmethod
    def output(self) -> None:
        """The output operation, which is performed after every iteration."""

    @abc.abstractmethod
    def output_summary(self) -> None:
        """The output operation, which is performed after convergence."""

    @abc.abstractmethod
    def post_process(self) -> None:
        """The output operation which is performed as part of the postprocessing."""
[docs]
class ResultManager(IOManager):
    """Collects the history of the optimization and saves it to a .json file."""

    def __init__(self, db: database.Database, result_dir: str) -> None:
        """Initializes self.

        Args:
            db: The database of the problem.
            result_dir: Path to the directory, where the results are saved.

        """
        super().__init__(db, result_dir)

        self.save_results = self.config.getboolean("Output", "save_results")

        self.output_dict = {}
        temp_dict = self.db.parameter_db.temp_dict
        if temp_dict:
            # Take over the history stored in the temporary dictionary. The
            # entries are not copied, the very same objects are referenced.
            self.output_dict.update(temp_dict["output_dict"])

    def output(self) -> None:
        """Appends the current optimization state to the stored history."""
        state = self.db.parameter_db.optimization_state
        for key, value in state.items():
            self.output_dict.setdefault(key, []).append(value)

    def output_summary(self) -> None:
        """Does nothing, as no summary is generated for the history."""

    def post_process(self) -> None:
        """Saves the history of the optimization to a .json file."""
        comm = fenics.MPI.comm_world
        # Only the root process writes the file (and only if this is enabled).
        if self.save_results and fenics.MPI.rank(comm) == 0:
            history_file = f"{self.result_dir}/history.json"
            with open(history_file, "w", encoding="utf-8") as file:
                json.dump(self.output_dict, file, indent=4)
        # All processes wait here until the root process is done.
        fenics.MPI.barrier(comm)
[docs]
class ConsoleManager(IOManager):
    """Management of the console output."""

    def __init__(
        self, db: database.Database, result_dir: str, verbose: bool = False
    ) -> None:
        """Initializes self.

        Args:
            db: The database of the problem.
            result_dir: The directory, where the results are written to.
            verbose: Boolean which indicates whether the logging setup (False) or the
                old setup with print (True) should be used. Default is `False`.

        """
        super().__init__(db, result_dir)

        self.verbose = verbose
        self.precision = self.config.getint("Output", "precision")

    def _emit(self, message: str) -> None:
        """Shows a message, either with print (verbose) or with the logger.

        Args:
            message: The text that shall be shown.

        """
        if not self.verbose:
            log.info(message)
            return

        comm = fenics.MPI.comm_world
        # Only the root process prints, the other processes wait for it.
        if fenics.MPI.rank(comm) == 0:
            print(message, flush=True)
        fenics.MPI.barrier(comm)

    def output(self) -> None:
        """Prints the output string to the console."""
        self._emit(generate_output_str(self.db, self.precision))

    def output_summary(self) -> None:
        """Prints the summary in the console."""
        self._emit(generate_summary_str(self.db, self.precision))

    def post_process(self) -> None:
        """Does nothing, as the console requires no postprocessing."""
[docs]
class FileManager(IOManager):
    """Class for managing the human-readable output of cashocs."""

    def __init__(self, db: database.Database, result_dir: str) -> None:
        """Initializes self.

        Args:
            db: The database of the problem.
            result_dir: The directory, where the results are written to.

        """
        super().__init__(db, result_dir)

        self.precision = self.config.getint("Output", "precision")

    def output(self) -> None:
        """Writes the output string of the current iteration to history.txt."""
        comm = fenics.MPI.comm_world
        if fenics.MPI.rank(comm) == 0:
            iteration = self.db.parameter_db.optimization_state["iteration"]
            # The file is started anew at iteration 0 and extended afterwards.
            mode = "w" if iteration == 0 else "a"
            history_file = f"{self.result_dir}/history.txt"
            with open(history_file, mode, encoding="utf-8") as file:
                file.write(f"{generate_output_str(self.db, self.precision)}\n")
        # All processes wait here until the root process is done.
        fenics.MPI.barrier(comm)

    def output_summary(self) -> None:
        """Appends the summary of the optimization to history.txt."""
        comm = fenics.MPI.comm_world
        if fenics.MPI.rank(comm) == 0:
            history_file = f"{self.result_dir}/history.txt"
            with open(history_file, "a", encoding="utf-8") as file:
                file.write(generate_summary_str(self.db, self.precision))
        fenics.MPI.barrier(comm)

    def post_process(self) -> None:
        """Does nothing, as the history file requires no postprocessing."""
[docs]
class TempFileManager(IOManager):
    """Class for managing temporary files."""

    def output(self) -> None:
        """Does nothing, temporary files are only handled in the postprocessing."""

    def output_summary(self) -> None:
        """Does nothing, temporary files are only handled in the postprocessing."""

    def post_process(self) -> None:
        """Deletes temporary files.

        For shape optimization problems, the remeshing directory is removed by the
        root process, provided that remeshing is enabled, the debug option for
        remeshing is disabled and the temporary dictionary is not empty.

        Raises:
            OSError: If the remeshing directory cannot be removed.

        """
        if self.db.parameter_db.problem_type != "shape":
            return

        comm = fenics.MPI.comm_world
        if (
            self.config.getboolean("Mesh", "remesh")
            and not self.config.getboolean("Debug", "remeshing")
            and self.db.parameter_db.temp_dict
            and fenics.MPI.rank(comm) == 0
        ):
            # Remove the directory with the standard library instead of spawning
            # an external ``rm`` process, which is not available on all platforms.
            shutil.rmtree(self.db.parameter_db.remesh_directory)
        # All processes wait here until the root process is done.
        fenics.MPI.barrier(comm)
[docs]
class MeshManager(IOManager):
    """Manages the output of meshes."""

    def output(self) -> None:
        """Saves the mesh as checkpoint for each iteration."""
        parameter_db = self.db.parameter_db
        iteration = int(parameter_db.optimization_state["iteration"])
        # The path stored in the database takes precedence over the config entry.
        gmsh_file = parameter_db.gmsh_file_path or self.config.get("Mesh", "gmsh_file")
        target = f"{self.result_dir}/checkpoints/mesh/mesh_{iteration}.msh"

        iomesh.write_out_mesh(self.db.geometry_db.mesh, gmsh_file, target)

    def output_summary(self) -> None:
        """Does nothing, as no mesh is written after convergence."""

    def post_process(self) -> None:
        """Saves a copy of the optimized mesh in Gmsh format."""
        # The path stored in the database takes precedence over the config entry.
        gmsh_file = self.db.parameter_db.gmsh_file_path or self.config.get(
            "Mesh", "gmsh_file"
        )
        # Here, the mesh is taken from the function space of the first state.
        optimized_mesh = self.db.function_db.states[0].function_space().mesh()
        target = f"{self.result_dir}/optimized_mesh.msh"

        iomesh.write_out_mesh(optimized_mesh, gmsh_file, target)
[docs]
class XDMFFileManager(IOManager):
    """Class for managing visualization .xdmf files."""

    def __init__(
        self,
        db: database.Database,
        result_dir: str,
    ) -> None:
        """Initializes self.

        Args:
            db: The database of the problem.
            result_dir: Path to the directory, where the output files are saved in.

        """
        super().__init__(db, result_dir)

        self.save_state = self.config.getboolean("Output", "save_state")
        self.save_adjoint = self.config.getboolean("Output", "save_adjoint")
        self.save_gradient = self.config.getboolean("Output", "save_gradient")

        # The lists of file paths are only set up during the first output.
        self.is_initialized = False

        self.state_xdmf_list: list[str | list[str]] = []
        self.control_xdmf_list: list[str | list[str]] = []
        self.adjoint_xdmf_list: list[str | list[str]] = []
        self.gradient_xdmf_list: list[str | list[str]] = []

    def _initialize_states_xdmf(self) -> None:
        """Initializes the list of xdmf files for the state variables."""
        if not self.save_state:
            return

        state_spaces = self.db.function_db.state_spaces
        for i in range(self.db.parameter_db.state_dim):
            paths = self._generate_xdmf_file_strings(state_spaces[i], f"state_{i:d}")
            self.state_xdmf_list.append(paths)

    def _initialize_controls_xdmf(self) -> None:
        """Initializes the list of xdmf files for the control variables."""
        # The controls are written whenever the states are written.
        if not self.save_state:
            return
        if self.db.parameter_db.problem_type not in ["control", "topology"]:
            return

        control_spaces = self.db.function_db.control_spaces
        for i in range(self.db.parameter_db.control_dim):
            paths = self._generate_xdmf_file_strings(
                control_spaces[i], f"control_{i:d}"
            )
            self.control_xdmf_list.append(paths)

    def _initialize_adjoints_xdmf(self) -> None:
        """Initializes the list of xdmf files for the adjoint variables."""
        if not self.save_adjoint:
            return

        adjoint_spaces = self.db.function_db.adjoint_spaces
        for i in range(self.db.parameter_db.state_dim):
            paths = self._generate_xdmf_file_strings(
                adjoint_spaces[i], f"adjoint_{i:d}"
            )
            self.adjoint_xdmf_list.append(paths)

    def _initialize_gradients_xdmf(self) -> None:
        """Initializes the list of xdmf files for the gradients."""
        if not self.save_gradient:
            return

        control_spaces = self.db.function_db.control_spaces
        for i in range(self.db.parameter_db.control_dim):
            # Only control and topology problems get one file name per gradient.
            numbered = self.db.parameter_db.problem_type in ["control", "topology"]
            name = f"gradient_{i:d}" if numbered else "shape_gradient"
            self.gradient_xdmf_list.append(
                self._generate_xdmf_file_strings(control_spaces[i], name)
            )

    def _initialize_xdmf_lists(self) -> None:
        """Initializes the lists of xdmf files (only once)."""
        if self.is_initialized:
            return

        self._initialize_states_xdmf()
        self._initialize_controls_xdmf()
        self._initialize_adjoints_xdmf()
        self._initialize_gradients_xdmf()

        self.is_initialized = True

    def _generate_xdmf_file_strings(
        self, space: fenics.FunctionSpace, name: str
    ) -> str | list[str]:
        """Generate the strings (paths) to the xdmf files for visualization.

        Args:
            space: The FEM function space where the function is taken from.
            name: The name of the function / file

        Returns:
            The path to the xdmf file or, for a mixed function space, a list with
            one path per sub space.

        """
        checkpoint_dir = f"{self.result_dir}/checkpoints"
        is_mixed = (
            space.num_sub_spaces() > 0 and space.ufl_element().family() == "Mixed"
        )
        if not is_mixed:
            return f"{checkpoint_dir}/{name}.xdmf"

        return [
            f"{checkpoint_dir}/{name}_{j:d}.xdmf"
            for j in range(space.num_sub_spaces())
        ]

    def _write_function_family(
        self,
        xdmf_list: list[str | list[str]],
        functions: list[fenics.Function],
        prefix: str,
        iteration: int,
    ) -> None:
        """Writes one function per state equation, split into its components.

        Args:
            xdmf_list: The file paths (or lists of paths for mixed spaces).
            functions: The functions which are to be written.
            prefix: The prefix used for the labels of the functions.
            iteration: The current iteration count.

        """
        for i in range(self.db.parameter_db.state_dim):
            target = xdmf_list[i]
            if isinstance(target, list):
                # Each component is written to its own file.
                for j, component_file in enumerate(target):
                    self._write_xdmf_step(
                        component_file,
                        functions[i].sub(j, True),
                        f"{prefix}_{i}_sub_{j}",
                        iteration,
                    )
            else:
                self._write_xdmf_step(target, functions[i], f"{prefix}_{i}", iteration)

    def _save_states(self, iteration: int) -> None:
        """Saves the state variables to xdmf files.

        Args:
            iteration: The current iteration count.

        """
        if not self.save_state:
            return

        self._write_function_family(
            self.state_xdmf_list, self.db.function_db.states, "state", iteration
        )

    def _save_controls(self, iteration: int) -> None:
        """Saves the control variables to xdmf.

        Args:
            iteration: The current iteration count.

        """
        if not self.save_state:
            return
        if self.db.parameter_db.problem_type not in ["control", "topology"]:
            return

        for i, control in enumerate(self.db.function_db.controls):
            self._write_xdmf_step(
                cast(str, self.control_xdmf_list[i]),
                control,
                f"control_{i}",
                iteration,
            )

    def _save_adjoints(self, iteration: int) -> None:
        """Saves the adjoint variables to xdmf files.

        Args:
            iteration: The current iteration count.

        """
        if not self.save_adjoint:
            return

        self._write_function_family(
            self.adjoint_xdmf_list, self.db.function_db.adjoints, "adjoint", iteration
        )

    def _save_gradients(self, iteration: int) -> None:
        """Saves the gradients to xdmf files.

        Args:
            iteration: The current iteration count.

        """
        if not self.save_gradient:
            return

        gradients = self.db.function_db.gradient
        for i in range(self.db.parameter_db.control_dim):
            self._write_xdmf_step(
                cast(str, self.gradient_xdmf_list[i]),
                gradients[i],
                f"gradient_{i}",
                iteration,
            )

    def _write_xdmf_step(
        self,
        filename: str,
        function: fenics.Function,
        function_name: str,
        iteration: int,
    ) -> None:
        """Write the current function to the corresponding xdmf file for visualization.

        Functions from "Real", "NodalEnrichedElement" and "Crouzeix-Raviart" spaces
        are interpolated into a CG1 or DG space, respectively, before writing.

        Args:
            filename: The path to the xdmf file.
            function: The function which is to be stored.
            function_name: The label of the function in the xdmf file.
            iteration: The current iteration counter.

        """
        # The file is started anew at iteration 0 and extended afterwards.
        append = iteration != 0

        function_space = function.function_space()
        mesh = function_space.mesh()
        comm = mesh.mpi_comm()
        element = function_space.ufl_element()
        family = element.family()

        # Determine the space used for writing (None means: use the function as is).
        if family in ["Real", "NodalEnrichedElement"]:
            replacement = ("CG", 1)
        elif family == "Crouzeix-Raviart":
            replacement = ("DG", element.degree())
        else:
            replacement = None

        if replacement is not None:
            output_family, output_degree = replacement
            if len(function.ufl_shape) > 0:
                space = fenics.VectorFunctionSpace(
                    mesh, output_family, output_degree, dim=function.ufl_shape[0]
                )
            else:
                space = fenics.FunctionSpace(mesh, output_family, output_degree)
            function = fenics.interpolate(function, space)

        function.rename(function_name, function_name)

        with fenics.XDMFFile(comm, filename) as xdmf_file:
            xdmf_file.parameters["flush_output"] = True
            xdmf_file.parameters["functions_share_mesh"] = False
            xdmf_file.write_checkpoint(
                function,
                function_name,
                iteration,
                fenics.XDMFFile.Encoding.HDF5,
                append,
            )
        fenics.MPI.barrier(comm)

    def _clear_checkpoint_directory(self) -> None:
        """Removes everything that is contained in the checkpoints directory."""
        directory = f"{self.result_dir}/checkpoints/"
        for entry in os.listdir(directory):
            path = os.path.join(directory, entry)
            # Try to remove the entry as a directory first; if this fails, it is
            # removed as a file.
            try:
                shutil.rmtree(path)
            except OSError:
                os.remove(path)

    def output(self) -> None:
        """Saves the variables to xdmf files."""
        self._initialize_xdmf_lists()

        iteration = int(self.db.parameter_db.optimization_state["iteration"])

        if iteration == 0:
            comm = fenics.MPI.comm_world
            # Only the root process cleans up, the other processes wait for it.
            if fenics.MPI.rank(comm) == 0:
                self._clear_checkpoint_directory()
            fenics.MPI.barrier(comm)

        self._save_states(iteration)
        self._save_controls(iteration)
        self._save_adjoints(iteration)
        self._save_gradients(iteration)

    def output_summary(self) -> None:
        """Does nothing, as no xdmf output is written after convergence."""

    def post_process(self) -> None:
        """Does nothing, as the xdmf files require no postprocessing."""