# Copyright (C) 2020-2026 Fraunhofer ITWM and Sebastian Blauth
#
# This file is part of cashocs.
#
# cashocs is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cashocs is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cashocs. If not, see <https://www.gnu.org/licenses/>.
"""Mesh input and output."""
from __future__ import annotations
import configparser
import contextlib
import gc
import json
import pathlib
import subprocess
import tempfile
import time
from typing import TYPE_CHECKING
import fenics
import h5py
import meshio
from mpi4py import MPI
import numpy as np
from cashocs import _exceptions
from cashocs import _utils
from cashocs import log
from cashocs import mpi
from cashocs.geometry.measure import NamedMeasure
from cashocs.geometry.mesh import _get_mesh_stats
if TYPE_CHECKING:
from cashocs import _typing
[docs]
def import_mesh(mesh_file: str, comm: MPI.Comm | None = None) -> _typing.MeshTuple:
"""Imports a mesh file for use with cashocs / FEniCS.
This function imports a mesh file. The mesh file can either be a Gmsh mesh file
or an xdmf mesh file that was generated by GMSH and converted to
.xdmf with the function :py:func:`cashocs.convert`.
If there are Physical quantities specified in the GMSH file, these are imported
to the subdomains and boundaries output of this function and can also be directly
accessed via the measures, e.g., with ``dx(1)``, ``ds(1)``, etc.
Args:
mesh_file: The location of the mesh file in .xdmf or .msh file format.
comm: MPI communicator that is to be used for creating the mesh.
Returns:
A tuple (mesh, subdomains, boundaries, dx, ds, dS), where mesh is the imported
FEM mesh, subdomains is a mesh function for the subdomains, boundaries is a mesh
function for the boundaries, dx is a volume measure, ds is a surface measure,
and dS is a measure for the interior facets.
Notes:
In case the boundaries in the Gmsh .msh file are not only marked with numbers
(as physical groups), but also with names (i.e. strings), these strings can be
used with the integration measures ``dx`` and ``ds`` returned by this method.
E.g., if one specified the following in a 2D Gmsh .geo file ::
Physical Surface("domain", 1) = {i,j,k};
where i,j,k are representative for some integers, then this can be used in the
measure ``dx`` (as we are 2D) as follows. The command ::
dx(1)
is completely equivalent to ::
dx("domain")
and both can be used interchangeably.
"""
mesh_path = pathlib.Path(mesh_file)
mesh_type = mesh_path.suffix
if mesh_type == ".msh":
mesh_tuple = _import_gmsh_mesh(mesh_file, comm)
elif mesh_type == ".xdmf":
mesh_tuple = _import_xdmf_mesh(mesh_file, comm)
else:
raise _exceptions.InputError(
"cashocs.io.mesh.import_mesh",
"mesh_file",
"Only XDMF (.xdmf) or Gmsh (.msh) meshes are supported.",
)
gc.collect()
return mesh_tuple
def _import_gmsh_mesh(
mesh_file: str, comm: MPI.Comm | None = None
) -> _typing.MeshTuple:
"""Imports a mesh file for use with cashocs / FEniCS.
This function imports a mesh file that was generated by GMSH.
If there are Physical quantities specified in the GMSH file, these are imported
to the subdomains and boundaries output of this function and can also be directly
accessed via the measures, e.g., with ``dx(1)``, ``ds(1)``, etc.
Args:
mesh_file: The location of the mesh file in .xdmf file format.
comm: MPI communicator that is to be used for creating the mesh.
Returns:
A tuple (mesh, subdomains, boundaries, dx, ds, dS), where mesh is the imported
FEM mesh, subdomains is a mesh function for the subdomains, boundaries is a mesh
function for the boundaries, dx is a volume measure, ds is a surface measure,
and dS is a measure for the interior facets.
Notes:
In case the boundaries in the Gmsh .msh file are not only marked with numbers
(as physical groups), but also with names (i.e. strings), these strings can be
used with the integration measures ``dx`` and ``ds`` returned by this method.
E.g., if one specified the following in a 2D Gmsh .geo file ::
Physical Surface("domain", 1) = {i,j,k};
where i,j,k are representative for some integers, then this can be used in the
measure ``dx`` (as we are 2D) as follows. The command ::
dx(1)
is completely equivalent to ::
dx("domain")
and both can be used interchangeably.
"""
convert(mesh_file, quiet=True)
mesh_filename = f"{mesh_file[:-4]}.xdmf"
mesh_tuple = _import_xdmf_mesh(mesh_filename, comm)
return mesh_tuple
def _get_mesh_paths(mesh_file: str) -> tuple[str, str, str]:
mesh_path = pathlib.Path(mesh_file)
subdomains_test_path = mesh_path.with_stem(mesh_path.stem + "_subdomains")
if subdomains_test_path.is_file():
subdomains_path = subdomains_test_path
else:
subdomains_path = mesh_path
boundaries_path = mesh_path.with_stem(mesh_path.stem + "_boundaries")
return str(mesh_path), str(subdomains_path), str(boundaries_path)
def _get_tags_from_mvc(mvc: fenics.MeshValueCollection, comm: MPI.Comm) -> dict:
tag_dict = {}
for val in set(mvc.values().values()):
tag_dict[str(val)] = val
gathered_dicts = comm.allgather(tag_dict)
merged_dict = {}
for d in gathered_dicts:
merged_dict.update(d)
return merged_dict
@_get_mesh_stats("import") # pylint:disable=protected-access
def _import_xdmf_mesh(
mesh_file: str, comm: MPI.Comm | None = None
) -> _typing.MeshTuple:
"""Imports a mesh file for use with cashocs / FEniCS.
This function imports a mesh file that was generated by GMSH and converted to
.xdmf with the function :py:func:`cashocs.convert`.
If there are Physical quantities specified in the GMSH file, these are imported
to the subdomains and boundaries output of this function and can also be directly
accessed via the measures, e.g., with ``dx(1)``, ``ds(1)``, etc.
Args:
mesh_file: The location of the mesh file in .xdmf file format.
comm: MPI communicator that is to be used for creating the mesh. If this is
`None`, then COMM_WORLD is used.
Returns:
A tuple (mesh, subdomains, boundaries, dx, ds, dS), where mesh is the imported
FEM mesh, subdomains is a mesh function for the subdomains, boundaries is a mesh
function for the boundaries, dx is a volume measure, ds is a surface measure,
and dS is a measure for the interior facets.
Notes:
In case the boundaries in the Gmsh .msh file are not only marked with numbers
(as physical groups), but also with names (i.e. strings), these strings can be
used with the integration measures ``dx`` and ``ds`` returned by this method.
E.g., if one specified the following in a 2D Gmsh .geo file ::
Physical Surface("domain", 1) = {i,j,k};
where i,j,k are representative for some integers, then this can be used in the
measure ``dx`` (as we are 2D) as follows. The command ::
dx(1)
is completely equivalent to ::
dx("domain")
and both can be used interchangeably.
"""
if isinstance(mesh_file, configparser.ConfigParser):
raise _exceptions.InputError(
"cashocs.import_mesh",
"mesh_file",
"Calling cashocs.import_mesh with a ConfigParser instance"
" is not allowed anymore starting with cashocs v2.",
)
# Check for the file format
mesh_string, subdomain_string, boundaries_string = _get_mesh_paths(mesh_file)
if comm is None:
comm = mpi.COMM_WORLD
mesh = fenics.Mesh(comm)
with fenics.XDMFFile(mesh.mpi_comm(), mesh_string) as xdmf_file:
xdmf_file.read(mesh)
subdomains_mvc = fenics.MeshValueCollection(
"size_t", mesh, mesh.geometric_dimension()
)
boundaries_mvc = fenics.MeshValueCollection(
"size_t", mesh, mesh.geometric_dimension() - 1
)
subdomains_path = pathlib.Path(subdomain_string)
if subdomains_path.is_file():
with fenics.XDMFFile(mesh.mpi_comm(), str(subdomains_path)) as xdmf_subdomains:
xdmf_subdomains.read(subdomains_mvc, "subdomains")
boundaries_path = pathlib.Path(boundaries_string)
if boundaries_path.is_file():
with fenics.XDMFFile(mesh.mpi_comm(), str(boundaries_path)) as xdmf_boundaries:
xdmf_boundaries.read(boundaries_mvc, "boundaries")
physical_groups_path = pathlib.Path(mesh_string)
physical_groups_path = physical_groups_path.with_name(
physical_groups_path.stem + "_physical_groups.json"
)
if physical_groups_path.is_file():
with physical_groups_path.open("r", encoding="utf-8") as file:
physical_groups: dict[str, dict[str, int]] = json.load(file)
else:
physical_groups = {
"dx": _get_tags_from_mvc(subdomains_mvc, comm),
"ds": _get_tags_from_mvc(boundaries_mvc, comm),
}
subdomains = fenics.MeshFunction("size_t", mesh, subdomains_mvc)
boundaries = fenics.MeshFunction("size_t", mesh, boundaries_mvc)
dx = NamedMeasure(
"dx", domain=mesh, subdomain_data=subdomains, physical_groups=physical_groups
)
ds = NamedMeasure(
"ds", domain=mesh, subdomain_data=boundaries, physical_groups=physical_groups
)
dS = NamedMeasure( # pylint: disable=invalid-name
"dS", domain=mesh, subdomain_data=boundaries, physical_groups=physical_groups
)
mesh.physical_groups = physical_groups
return mesh, subdomains, boundaries, dx, ds, dS
[docs]
def export_mesh(
mesh: fenics.Mesh,
mesh_file: str,
subdomains: fenics.MeshFunction | None = None,
boundaries: fenics.MeshFunction | None = None,
comm: MPI.Comm | None = None,
) -> None:
"""Exports a mesh (together with its subdomains and boundaries).
This is useful so that the mesh can be reimported later on again. This is used for
checkpointing in cashocs, but also has other use cases, e.g., storing a function
on hard disk and later reimporting it to perform a post-processing.
Args:
mesh: The fenics mesh which shall be exported.
mesh_file: Filename / path to the exported mesh. Has to end in .xdmf. Boundaries
and subdomains will be named accordingly.
subdomains: The subdomains meshfunction corresponding to the mesh. Optional,
default is `None`, so that no subdomain information is used.
boundaries: The boundaries meshfunction corresponding to the mesh. Optional,
default is `None`, so that no boundary information is used.
comm: The MPI communicator used. Optional, default is `None`. This argument is
deprecated and will be removed in the future.
"""
comm = mesh.mpi_comm()
_utils.check_file_extension(mesh_file, "xdmf")
filepath = pathlib.Path(mesh_file)
folder_path = filepath.parent
filename = filepath.stem
if not folder_path.is_dir():
folder_path.mkdir(parents=True, exist_ok=True)
with fenics.XDMFFile(comm, mesh_file) as file:
file.write(mesh)
if subdomains is not None:
with fenics.XDMFFile(
comm, f"{str(folder_path)}/{filename}_subdomains.xdmf"
) as file:
file.write(subdomains)
if boundaries is not None:
with fenics.XDMFFile(
comm, f"{str(folder_path)}/{filename}_boundaries.xdmf"
) as file:
file.write(boundaries)
[docs]
def convert(
input_file: str,
output_file: str | None = None,
mode: str = "physical",
quiet: bool = False,
comm: MPI.Comm | None = None,
) -> None:
"""Converts the input mesh file to a xdmf mesh file for cashocs to work with.
Args:
input_file: A gmsh .msh file.
output_file: The name of the output .xdmf file or ``None``. If this is ``None``,
then a file name.msh will be converted to name.xdmf, i.e., the name of the
input file stays the same
quiet: A boolean flag which silences the output.
mode: The mode which is used to define the subdomains and boundaries. Should be
one of 'physical' (the default), 'geometrical', or 'none'.
comm: The MPI.Comm that is used for parallelization.
"""
mesh_converter = MeshConverter(comm)
mesh_converter.convert(input_file, outputfile=output_file, mode=mode, quiet=quiet)
[docs]
def create_point_representation(
dim: int, points: np.ndarray, idcs: np.ndarray, subwrite_counter: int
) -> str:
"""Creates the representation of the mesh coordinates for gmsh .msh file.
Args:
dim: Dimension of the mesh.
points: The array of the mesh coordinates.
idcs: The list of indices of the points for the current element.
subwrite_counter: A counter for looping over the indices.
Returns:
A string representation of the mesh coordinates.
"""
mod_line = ""
if dim == 2:
mod_line = (
f"{points[idcs[subwrite_counter]][0]:.16e} "
f"{points[idcs[subwrite_counter]][1]:.16e} 0\n"
)
elif dim == 3:
mod_line = (
f"{points[idcs[subwrite_counter]][0]:.16e} "
f"{points[idcs[subwrite_counter]][1]:.16e} "
f"{points[idcs[subwrite_counter]][2]:.16e}\n"
)
return mod_line
[docs]
def gather_coordinates(mesh: fenics.Mesh) -> np.ndarray:
"""Gathers the mesh coordinates on process 0 to write out the mesh to a Gmsh file.
Args:
mesh: The corresponding mesh.
Returns:
A numpy array which contains the vertex coordinates of the mesh
"""
comm = mesh.mpi_comm()
top = mesh.topology()
global_vertex_indices = top.global_indices(0)
num_global_vertices = mesh.num_entities_global(0)
local_mesh_coordinates = mesh.coordinates().copy()
local_coordinates_list = comm.gather(local_mesh_coordinates, root=0)
vertex_map_list = comm.gather(global_vertex_indices, root=0)
if comm.rank == 0:
coordinates = np.zeros((num_global_vertices, local_mesh_coordinates.shape[1]))
for coords, verts in zip(local_coordinates_list, vertex_map_list, strict=True):
coordinates[verts] = coords
else:
coordinates = np.zeros((1, 1))
comm.barrier()
return coordinates
[docs]
def parse_file(
original_msh_file: str, out_msh_file: str, points: np.ndarray, dim: int
) -> None:
"""Parses the mesh file and writes a new, corresponding one.
Args:
original_msh_file: Path to the original GMSH mesh file of the mesh object, has
to end with .msh.
out_msh_file: Path to the output mesh file, has to end with .msh.
points: The mesh coordinates gathered on process 0
dim: The dimensionality of the mesh
"""
with (
open(original_msh_file, encoding="utf-8") as old_file,
open(out_msh_file, "w", encoding="utf-8") as new_file,
):
node_section = False
info_section = False
subnode_counter = 0
subwrite_counter = 0
idcs = np.zeros(1, dtype=int)
for line in old_file:
if line == "$EndNodes\n":
node_section = False
if not node_section:
new_file.write(line)
else:
split_line = line.split(" ")
if info_section:
new_file.write(line)
info_section = False
else:
if len(split_line) == 4:
num_subnodes = int(split_line[-1][:-1])
subnode_counter = 0
subwrite_counter = 0
idcs = np.zeros(num_subnodes, dtype=int)
new_file.write(line)
elif len(split_line) == 1:
idcs[subnode_counter] = int(split_line[0][:-1]) - 1
subnode_counter += 1
new_file.write(line)
elif len(split_line) == 3:
mod_line = create_point_representation(
dim, points, idcs, subwrite_counter
)
new_file.write(mod_line)
subwrite_counter += 1
if line == "$Nodes\n":
node_section = True
info_section = True
[docs]
def check_mesh_compatibility(mesh: fenics.Mesh, original_mesh_file: str) -> None:
"""Checks, whether the supplied mesh file is compatible with the mesh used.
This function returns `None` if they are compatible and raises an exception
otherwise.
Args:
mesh: The mesh that is currently used.
original_mesh_file: The path to the mesh file.
"""
num_points = 0
comm = mesh.mpi_comm()
if comm.rank == 0:
with open(original_mesh_file, encoding="utf-8") as file:
node_info = False
for line in file:
if node_info:
split_line = line.split(" ")
num_points = int(split_line[1])
break
if line == "$Nodes\n":
node_info = True
comm.barrier()
number_of_points = int(comm.bcast(num_points, root=0))
if mesh.num_entities_global(0) != number_of_points:
raise _exceptions.CashocsException(
"The mesh supplied in the configuration file is not "
"compatible with the mesh used."
)
[docs]
def write_out_mesh(
mesh: fenics.Mesh, original_msh_file: str, out_msh_file: str
) -> None:
"""Writes out mesh as Gmsh .msh file.
This method updates the vertex positions in the ``original_gmsh_file``, the
topology of the mesh and its connections are the same. The original GMSH
file is kept, and a new one is generated under ``out_mesh_file``.
Args:
mesh: The mesh object in fenics that should be saved as Gmsh file.
original_msh_file: Path to the original GMSH mesh file of the mesh object, has
to end with .msh.
out_msh_file: Path to the output mesh file, has to end with .msh.
Notes:
The method only works with GMSH 4.1 file format. Others might also work, but
this is not tested or ensured in any way.
"""
check_mesh_compatibility(mesh, original_msh_file)
dim = mesh.geometric_dimension()
comm = mesh.mpi_comm()
if not pathlib.Path(out_msh_file).parent.is_dir():
pathlib.Path(out_msh_file).parent.mkdir(parents=True, exist_ok=True)
points = gather_coordinates(mesh)
if comm.rank == 0:
parse_file(original_msh_file, out_msh_file, points, dim)
comm.barrier()
[docs]
def read_mesh_from_xdmf(
filename: str, step: int = 0, comm: MPI.Comm | None = None
) -> fenics.Mesh:
"""Reads a mesh from a .xdmf file containing a checkpointed function.
Args:
filename: The filename to the .xdmf file.
step: The checkpoint number. Default is ``0``.
comm: MPI communicator that is to be used for creating the mesh. If this is
`None`, then COMM_WORLD is used.
Returns:
The corresponding mesh for the checkpoint number.
"""
h5_filename = f"{filename[:-5]}.h5"
with h5py.File(h5_filename) as file:
name = list(file.keys())[0]
step_name = f"{name}_{step}"
coordinates = file[name][step_name]["mesh"]["geometry"][()]
cells = file[name][step_name]["mesh"]["topology"][()]
gdim = coordinates.shape[1]
if cells.shape[1] == 2:
tdim = 1
cell_type = "line"
elif cells.shape[1] == 3:
tdim = 2
cell_type = "triangle"
elif cells.shape[1] == 4:
tdim = 3
cell_type = "tetrahedron"
else:
raise _exceptions.CashocsException("The mesh saved in the xdmf file is faulty.")
if tdim > gdim:
raise _exceptions.CashocsException(
"The topological dimension of a mesh must not be larger than its "
"geometrical dimension"
)
if comm is None:
comm = mpi.COMM_WORLD
mesh = fenics.Mesh(comm)
if comm.rank == 0:
mesh_editor = fenics.MeshEditor()
mesh_editor.open(mesh, cell_type, tdim, gdim)
mesh_editor.init_vertices(coordinates.shape[0])
mesh_editor.init_cells(cells.shape[0])
for i, vertex in enumerate(coordinates):
mesh_editor.add_vertex(i, vertex)
for i, cell in enumerate(cells):
mesh_editor.add_cell(i, cell)
mesh_editor.close()
mesh.init()
mesh.order()
fenics.MeshPartitioning.build_distributed_mesh(mesh)
return mesh
[docs]
class MeshConverter:
"""Converter from Gmsh to XDMF."""
def __init__(self, comm: MPI.Comm | None) -> None:
"""Initializes the mesh converter.
Args:
comm (MPI.Comm | None): The MPI.Comm used for parallelization. If this is
None, COMM_WORLD is used.
"""
if comm is None:
self.comm = mpi.COMM_WORLD
else:
self.comm = comm
[docs]
def check_mode(self, mode: str) -> None:
"""Cheks, whether the supplied mode is sensible.
Args:
mode: The mode that should be used for the conversion.
"""
if mode not in ["physical", "geometrical", "none"]:
raise _exceptions.CashocsException(
f"The supplied mode {mode} is invalid. "
f"Only possible options are 'physical', 'geometrical', or 'none'."
)
[docs]
def write_mesh(
self,
topological_dimension: int,
cell_data_dict: dict,
points: np.ndarray,
cells_dict: dict,
ostring: str,
mode: str,
) -> None:
"""Writes out a xdmf file with the mesh and corresponding subdomains.
Args:
topological_dimension: The topological dimension of the mesh.
cell_data_dict: The cell_data_dict of the mesh.
points: The array of points.
cells_dict: The cells_dict of the mesh.
ostring: The output string, containing the name and path to the output file,
without extension.
mode: The mode which is used to define the subdomains and boundaries.
Should be one of 'physical' (the default), 'geometrical', or 'none'.
"""
cell_type = "triangle"
if topological_dimension == 2:
cell_type = "triangle"
elif topological_dimension == 3:
cell_type = "tetra"
if mode == "physical":
dict_key = "gmsh:physical"
elif mode == "geometrical":
dict_key = "gmsh:geometrical"
else:
dict_key = None
try:
cell_data = {"subdomains": [cell_data_dict[dict_key][cell_type]]}
except KeyError:
cell_data = None
xdmf_mesh = meshio.Mesh(
points=points, cells={cell_type: cells_dict[cell_type]}, cell_data=cell_data
)
meshio.write(f"{ostring}.xdmf", xdmf_mesh)
[docs]
def write_boundaries(
self,
topological_dimension: int,
cell_data_dict: dict,
points: np.ndarray,
cells_dict: dict,
ostring: str,
mode: str,
) -> None:
"""Writes out a xdmf file with meshio corresponding to the boundaries.
Args:
topological_dimension: The topological dimension of the mesh.
cell_data_dict: The cell_data_dict of the mesh.
points: The array of points.
cells_dict: The cells_dict of the mesh.
ostring: The output string, containing the name and path to the output file,
without extension.
mode: The mode which is used to define the subdomains and boundaries.
Should be one of 'physical' (the default), 'geometrical', or 'none'.
"""
facet_str = "line"
if topological_dimension == 2:
facet_str = "line"
elif topological_dimension == 3:
facet_str = "triangle"
if mode == "physical":
dict_key = "gmsh:physical"
elif mode == "geometrical":
dict_key = "gmsh:geometrical"
else:
dict_key = None
if mode != "none" and dict_key in cell_data_dict.keys():
if facet_str in cell_data_dict[dict_key].keys():
xdmf_boundaries = meshio.Mesh(
points=points,
cells={facet_str: cells_dict[facet_str]},
cell_data={"boundaries": [cell_data_dict[dict_key][facet_str]]},
)
meshio.write(f"{ostring}_boundaries.xdmf", xdmf_boundaries)
else:
if pathlib.Path(f"{ostring}_boundaries.xdmf").is_file():
subprocess.run( # noqa: S603
["rm", f"{ostring}_boundaries.xdmf"], # noqa: S607
check=True,
)
if pathlib.Path(f"{ostring}_boundaries.h5").is_file():
subprocess.run( # noqa: S603
["rm", f"{ostring}_boundaries.h5"], # noqa: S607
check=True,
)
[docs]
def check_for_physical_names(
self, inputfile: str, topological_dimension: int, ostring: str
) -> None:
"""Checks and extracts physical tags if they are given as strings.
Args:
inputfile: Path to the input file.
topological_dimension: The dimension of the mesh.
ostring: The output string, containing the name and path to the output file,
without extension.
"""
physical_groups: dict[str, dict[str, int]] = {"dx": {}, "ds": {}}
has_physical_groups = False
with open(inputfile, encoding="utf-8") as infile:
for line in infile:
line = line.strip()
if line == "$PhysicalNames":
has_physical_groups = True
info_line = next(infile).strip()
no_physical_groups = int(info_line)
for _ in range(no_physical_groups):
physical_line = next(infile).strip().split()
phys_dim = int(physical_line[0])
phys_tag = int(physical_line[1])
phys_name = physical_line[2]
if "'" in phys_name:
phys_name = phys_name.replace("'", "")
if '"' in phys_name:
phys_name = phys_name.replace('"', "")
if phys_dim == topological_dimension:
physical_groups["dx"][phys_name] = phys_tag
elif phys_dim == topological_dimension - 1:
physical_groups["ds"][phys_name] = phys_tag
break
if has_physical_groups:
with open(
f"{ostring}_physical_groups.json", "w", encoding="utf-8"
) as ofile:
json.dump(physical_groups, ofile, indent=4)
[docs]
def convert(
self,
inputfile: str,
outputfile: str | None = None,
mode: str = "physical",
quiet: bool = False,
) -> None:
"""Converts a Gmsh .msh file to a .xdmf mesh file.
Args:
inputfile (str): Gmsh file to be converted.
outputfile (str | None, optional): Path to the target output. Defaults to
None.
mode (str, optional): The mode used to define the subdomains and
boundaries. This can be either "physical" or "geometrical". Defaults to
"physical".
quiet (bool, optional): A boolean flag which indicates whether the method
should produce output to stdout. Defaults to False.
"""
start_time = time.time()
_utils.check_file_extension(inputfile, "msh")
self.check_mode(mode)
if outputfile is None:
outputfile = f"{inputfile[:-4]}.xdmf"
_utils.check_file_extension(outputfile, "xdmf")
ostring = outputfile.rsplit(".", 1)[0]
if self.comm.rank == 0:
with contextlib.redirect_stdout(None):
mesh_collection = meshio.read(inputfile)
points = mesh_collection.points
cells_dict = mesh_collection.cells_dict
cell_data_dict = mesh_collection.cell_data_dict
# Check, whether we have a 2D or 3D mesh:
keyvals = cells_dict.keys()
topological_dimension = 2
if "tetra" in keyvals:
topological_dimension = 3
elif "triangle" in keyvals:
topological_dimension = 2
# check if geometrical dimension matches topological dimension
z_coords = points[:, 2]
if np.abs(np.max(z_coords) - np.min(z_coords)) <= 1e-15:
points = points[:, :2]
self.write_mesh(
topological_dimension, cell_data_dict, points, cells_dict, ostring, mode
)
self.write_boundaries(
topological_dimension, cell_data_dict, points, cells_dict, ostring, mode
)
self.check_for_physical_names(inputfile, topological_dimension, ostring)
self.comm.barrier()
end_time = time.time()
if not quiet:
log.info(
f"Successfully converted {inputfile} to {outputfile} "
f"in {end_time - start_time:.2f} s"
)