Source code for cashocs.io.mesh

# Copyright (C) 2020-2026 Fraunhofer ITWM and Sebastian Blauth
#
# This file is part of cashocs.
#
# cashocs is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cashocs is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cashocs.  If not, see <https://www.gnu.org/licenses/>.

"""Mesh input and output."""

from __future__ import annotations

import configparser
import contextlib
import gc
import json
import pathlib
import subprocess
import tempfile
import time
from typing import TYPE_CHECKING

import fenics
import h5py
import meshio
from mpi4py import MPI
import numpy as np

from cashocs import _exceptions
from cashocs import _utils
from cashocs import log
from cashocs import mpi
from cashocs.geometry.measure import NamedMeasure
from cashocs.geometry.mesh import _get_mesh_stats

if TYPE_CHECKING:
    from cashocs import _typing


[docs] def import_mesh(mesh_file: str, comm: MPI.Comm | None = None) -> _typing.MeshTuple: """Imports a mesh file for use with cashocs / FEniCS. This function imports a mesh file. The mesh file can either be a Gmsh mesh file or an xdmf mesh file that was generated by GMSH and converted to .xdmf with the function :py:func:`cashocs.convert`. If there are Physical quantities specified in the GMSH file, these are imported to the subdomains and boundaries output of this function and can also be directly accessed via the measures, e.g., with ``dx(1)``, ``ds(1)``, etc. Args: mesh_file: The location of the mesh file in .xdmf or .msh file format. comm: MPI communicator that is to be used for creating the mesh. Returns: A tuple (mesh, subdomains, boundaries, dx, ds, dS), where mesh is the imported FEM mesh, subdomains is a mesh function for the subdomains, boundaries is a mesh function for the boundaries, dx is a volume measure, ds is a surface measure, and dS is a measure for the interior facets. Notes: In case the boundaries in the Gmsh .msh file are not only marked with numbers (as physical groups), but also with names (i.e. strings), these strings can be used with the integration measures ``dx`` and ``ds`` returned by this method. E.g., if one specified the following in a 2D Gmsh .geo file :: Physical Surface("domain", 1) = {i,j,k}; where i,j,k are representative for some integers, then this can be used in the measure ``dx`` (as we are 2D) as follows. The command :: dx(1) is completely equivalent to :: dx("domain") and both can be used interchangeably. """ mesh_path = pathlib.Path(mesh_file) mesh_type = mesh_path.suffix if mesh_type == ".msh": mesh_tuple = _import_gmsh_mesh(mesh_file, comm) elif mesh_type == ".xdmf": mesh_tuple = _import_xdmf_mesh(mesh_file, comm) else: raise _exceptions.InputError( "cashocs.io.mesh.import_mesh", "mesh_file", "Only XDMF (.xdmf) or Gmsh (.msh) meshes are supported.", ) gc.collect() return mesh_tuple
def _import_gmsh_mesh( mesh_file: str, comm: MPI.Comm | None = None ) -> _typing.MeshTuple: """Imports a mesh file for use with cashocs / FEniCS. This function imports a mesh file that was generated by GMSH. If there are Physical quantities specified in the GMSH file, these are imported to the subdomains and boundaries output of this function and can also be directly accessed via the measures, e.g., with ``dx(1)``, ``ds(1)``, etc. Args: mesh_file: The location of the mesh file in .xdmf file format. comm: MPI communicator that is to be used for creating the mesh. Returns: A tuple (mesh, subdomains, boundaries, dx, ds, dS), where mesh is the imported FEM mesh, subdomains is a mesh function for the subdomains, boundaries is a mesh function for the boundaries, dx is a volume measure, ds is a surface measure, and dS is a measure for the interior facets. Notes: In case the boundaries in the Gmsh .msh file are not only marked with numbers (as physical groups), but also with names (i.e. strings), these strings can be used with the integration measures ``dx`` and ``ds`` returned by this method. E.g., if one specified the following in a 2D Gmsh .geo file :: Physical Surface("domain", 1) = {i,j,k}; where i,j,k are representative for some integers, then this can be used in the measure ``dx`` (as we are 2D) as follows. The command :: dx(1) is completely equivalent to :: dx("domain") and both can be used interchangeably. """ convert(mesh_file, quiet=True) mesh_filename = f"{mesh_file[:-4]}.xdmf" mesh_tuple = _import_xdmf_mesh(mesh_filename, comm) return mesh_tuple def _get_mesh_paths(mesh_file: str) -> tuple[str, str, str]: mesh_path = pathlib.Path(mesh_file) subdomains_test_path = mesh_path.with_stem(mesh_path.stem + "_subdomains") if subdomains_test_path.is_file(): subdomains_path = subdomains_test_path else: subdomains_path = mesh_path boundaries_path = mesh_path.with_stem(mesh_path.stem + "_boundaries") return str(mesh_path), str(subdomains_path), str(boundaries_path) def _get_tags_from_mvc(mvc: fenics.MeshValueCollection, comm: MPI.Comm) -> dict: tag_dict = {} for val in set(mvc.values().values()): tag_dict[str(val)] = val gathered_dicts = comm.allgather(tag_dict) merged_dict = {} for d in gathered_dicts: merged_dict.update(d) return merged_dict @_get_mesh_stats("import") # pylint:disable=protected-access def _import_xdmf_mesh( mesh_file: str, comm: MPI.Comm | None = None ) -> _typing.MeshTuple: """Imports a mesh file for use with cashocs / FEniCS. This function imports a mesh file that was generated by GMSH and converted to .xdmf with the function :py:func:`cashocs.convert`. If there are Physical quantities specified in the GMSH file, these are imported to the subdomains and boundaries output of this function and can also be directly accessed via the measures, e.g., with ``dx(1)``, ``ds(1)``, etc. Args: mesh_file: The location of the mesh file in .xdmf file format. comm: MPI communicator that is to be used for creating the mesh. If this is `None`, then COMM_WORLD is used. Returns: A tuple (mesh, subdomains, boundaries, dx, ds, dS), where mesh is the imported FEM mesh, subdomains is a mesh function for the subdomains, boundaries is a mesh function for the boundaries, dx is a volume measure, ds is a surface measure, and dS is a measure for the interior facets. Notes: In case the boundaries in the Gmsh .msh file are not only marked with numbers (as physical groups), but also with names (i.e. strings), these strings can be used with the integration measures ``dx`` and ``ds`` returned by this method. E.g., if one specified the following in a 2D Gmsh .geo file :: Physical Surface("domain", 1) = {i,j,k}; where i,j,k are representative for some integers, then this can be used in the measure ``dx`` (as we are 2D) as follows. The command :: dx(1) is completely equivalent to :: dx("domain") and both can be used interchangeably. """ if isinstance(mesh_file, configparser.ConfigParser): raise _exceptions.InputError( "cashocs.import_mesh", "mesh_file", "Calling cashocs.import_mesh with a ConfigParser instance" " is not allowed anymore starting with cashocs v2.", ) # Check for the file format mesh_string, subdomain_string, boundaries_string = _get_mesh_paths(mesh_file) if comm is None: comm = mpi.COMM_WORLD mesh = fenics.Mesh(comm) with fenics.XDMFFile(mesh.mpi_comm(), mesh_string) as xdmf_file: xdmf_file.read(mesh) subdomains_mvc = fenics.MeshValueCollection( "size_t", mesh, mesh.geometric_dimension() ) boundaries_mvc = fenics.MeshValueCollection( "size_t", mesh, mesh.geometric_dimension() - 1 ) subdomains_path = pathlib.Path(subdomain_string) if subdomains_path.is_file(): with fenics.XDMFFile(mesh.mpi_comm(), str(subdomains_path)) as xdmf_subdomains: xdmf_subdomains.read(subdomains_mvc, "subdomains") boundaries_path = pathlib.Path(boundaries_string) if boundaries_path.is_file(): with fenics.XDMFFile(mesh.mpi_comm(), str(boundaries_path)) as xdmf_boundaries: xdmf_boundaries.read(boundaries_mvc, "boundaries") physical_groups_path = pathlib.Path(mesh_string) physical_groups_path = physical_groups_path.with_name( physical_groups_path.stem + "_physical_groups.json" ) if physical_groups_path.is_file(): with physical_groups_path.open("r", encoding="utf-8") as file: physical_groups: dict[str, dict[str, int]] = json.load(file) else: physical_groups = { "dx": _get_tags_from_mvc(subdomains_mvc, comm), "ds": _get_tags_from_mvc(boundaries_mvc, comm), } subdomains = fenics.MeshFunction("size_t", mesh, subdomains_mvc) boundaries = fenics.MeshFunction("size_t", mesh, boundaries_mvc) dx = NamedMeasure( "dx", domain=mesh, subdomain_data=subdomains, physical_groups=physical_groups ) ds = NamedMeasure( "ds", domain=mesh, subdomain_data=boundaries, physical_groups=physical_groups ) dS = NamedMeasure( # pylint: disable=invalid-name "dS", domain=mesh, subdomain_data=boundaries, physical_groups=physical_groups ) mesh.physical_groups = physical_groups return mesh, subdomains, boundaries, dx, ds, dS
[docs] def export_mesh( mesh: fenics.Mesh, mesh_file: str, subdomains: fenics.MeshFunction | None = None, boundaries: fenics.MeshFunction | None = None, comm: MPI.Comm | None = None, ) -> None: """Exports a mesh (together with its subdomains and boundaries). This is useful so that the mesh can be reimported later on again. This is used for checkpointing in cashocs, but also has other use cases, e.g., storing a function on hard disk and later reimporting it to perform a post-processing. Args: mesh: The fenics mesh which shall be exported. mesh_file: Filename / path to the exported mesh. Has to end in .xdmf. Boundaries and subdomains will be named accordingly. subdomains: The subdomains meshfunction corresponding to the mesh. Optional, default is `None`, so that no subdomain information is used. boundaries: The boundaries meshfunction corresponding to the mesh. Optional, default is `None`, so that no boundary information is used. comm: The MPI communicator used. Optional, default is `None`. This argument is deprecated and will be removed in the future. """ comm = mesh.mpi_comm() _utils.check_file_extension(mesh_file, "xdmf") filepath = pathlib.Path(mesh_file) folder_path = filepath.parent filename = filepath.stem if not folder_path.is_dir(): folder_path.mkdir(parents=True, exist_ok=True) with fenics.XDMFFile(comm, mesh_file) as file: file.write(mesh) if subdomains is not None: with fenics.XDMFFile( comm, f"{str(folder_path)}/{filename}_subdomains.xdmf" ) as file: file.write(subdomains) if boundaries is not None: with fenics.XDMFFile( comm, f"{str(folder_path)}/{filename}_boundaries.xdmf" ) as file: file.write(boundaries)
[docs] def convert( input_file: str, output_file: str | None = None, mode: str = "physical", quiet: bool = False, comm: MPI.Comm | None = None, ) -> None: """Converts the input mesh file to a xdmf mesh file for cashocs to work with. Args: input_file: A gmsh .msh file. output_file: The name of the output .xdmf file or ``None``. If this is ``None``, then a file name.msh will be converted to name.xdmf, i.e., the name of the input file stays the same quiet: A boolean flag which silences the output. mode: The mode which is used to define the subdomains and boundaries. Should be one of 'physical' (the default), 'geometrical', or 'none'. comm: The MPI.Comm that is used for parallelization. """ mesh_converter = MeshConverter(comm) mesh_converter.convert(input_file, outputfile=output_file, mode=mode, quiet=quiet)
[docs] def create_point_representation( dim: int, points: np.ndarray, idcs: np.ndarray, subwrite_counter: int ) -> str: """Creates the representation of the mesh coordinates for gmsh .msh file. Args: dim: Dimension of the mesh. points: The array of the mesh coordinates. idcs: The list of indices of the points for the current element. subwrite_counter: A counter for looping over the indices. Returns: A string representation of the mesh coordinates. """ mod_line = "" if dim == 2: mod_line = ( f"{points[idcs[subwrite_counter]][0]:.16e} " f"{points[idcs[subwrite_counter]][1]:.16e} 0\n" ) elif dim == 3: mod_line = ( f"{points[idcs[subwrite_counter]][0]:.16e} " f"{points[idcs[subwrite_counter]][1]:.16e} " f"{points[idcs[subwrite_counter]][2]:.16e}\n" ) return mod_line
[docs] def gather_coordinates(mesh: fenics.Mesh) -> np.ndarray: """Gathers the mesh coordinates on process 0 to write out the mesh to a Gmsh file. Args: mesh: The corresponding mesh. Returns: A numpy array which contains the vertex coordinates of the mesh """ comm = mesh.mpi_comm() top = mesh.topology() global_vertex_indices = top.global_indices(0) num_global_vertices = mesh.num_entities_global(0) local_mesh_coordinates = mesh.coordinates().copy() local_coordinates_list = comm.gather(local_mesh_coordinates, root=0) vertex_map_list = comm.gather(global_vertex_indices, root=0) if comm.rank == 0: coordinates = np.zeros((num_global_vertices, local_mesh_coordinates.shape[1])) for coords, verts in zip(local_coordinates_list, vertex_map_list, strict=True): coordinates[verts] = coords else: coordinates = np.zeros((1, 1)) comm.barrier() return coordinates
[docs] def parse_file( original_msh_file: str, out_msh_file: str, points: np.ndarray, dim: int ) -> None: """Parses the mesh file and writes a new, corresponding one. Args: original_msh_file: Path to the original GMSH mesh file of the mesh object, has to end with .msh. out_msh_file: Path to the output mesh file, has to end with .msh. points: The mesh coordinates gathered on process 0 dim: The dimensionality of the mesh """ with ( open(original_msh_file, encoding="utf-8") as old_file, open(out_msh_file, "w", encoding="utf-8") as new_file, ): node_section = False info_section = False subnode_counter = 0 subwrite_counter = 0 idcs = np.zeros(1, dtype=int) for line in old_file: if line == "$EndNodes\n": node_section = False if not node_section: new_file.write(line) else: split_line = line.split(" ") if info_section: new_file.write(line) info_section = False else: if len(split_line) == 4: num_subnodes = int(split_line[-1][:-1]) subnode_counter = 0 subwrite_counter = 0 idcs = np.zeros(num_subnodes, dtype=int) new_file.write(line) elif len(split_line) == 1: idcs[subnode_counter] = int(split_line[0][:-1]) - 1 subnode_counter += 1 new_file.write(line) elif len(split_line) == 3: mod_line = create_point_representation( dim, points, idcs, subwrite_counter ) new_file.write(mod_line) subwrite_counter += 1 if line == "$Nodes\n": node_section = True info_section = True
[docs] def check_mesh_compatibility(mesh: fenics.Mesh, original_mesh_file: str) -> None: """Checks, whether the supplied mesh file is compatible with the mesh used. This function returns `None` if they are compatible and raises an exception otherwise. Args: mesh: The mesh that is currently used. original_mesh_file: The path to the mesh file. """ num_points = 0 comm = mesh.mpi_comm() if comm.rank == 0: with open(original_mesh_file, encoding="utf-8") as file: node_info = False for line in file: if node_info: split_line = line.split(" ") num_points = int(split_line[1]) break if line == "$Nodes\n": node_info = True comm.barrier() number_of_points = int(comm.bcast(num_points, root=0)) if mesh.num_entities_global(0) != number_of_points: raise _exceptions.CashocsException( "The mesh supplied in the configuration file is not " "compatible with the mesh used." )
[docs] def write_out_mesh( mesh: fenics.Mesh, original_msh_file: str, out_msh_file: str ) -> None: """Writes out mesh as Gmsh .msh file. This method updates the vertex positions in the ``original_gmsh_file``, the topology of the mesh and its connections are the same. The original GMSH file is kept, and a new one is generated under ``out_mesh_file``. Args: mesh: The mesh object in fenics that should be saved as Gmsh file. original_msh_file: Path to the original GMSH mesh file of the mesh object, has to end with .msh. out_msh_file: Path to the output mesh file, has to end with .msh. Notes: The method only works with GMSH 4.1 file format. Others might also work, but this is not tested or ensured in any way. """ check_mesh_compatibility(mesh, original_msh_file) dim = mesh.geometric_dimension() comm = mesh.mpi_comm() if not pathlib.Path(out_msh_file).parent.is_dir(): pathlib.Path(out_msh_file).parent.mkdir(parents=True, exist_ok=True) points = gather_coordinates(mesh) if comm.rank == 0: parse_file(original_msh_file, out_msh_file, points, dim) comm.barrier()
[docs] def read_mesh_from_xdmf( filename: str, step: int = 0, comm: MPI.Comm | None = None ) -> fenics.Mesh: """Reads a mesh from a .xdmf file containing a checkpointed function. Args: filename: The filename to the .xdmf file. step: The checkpoint number. Default is ``0``. comm: MPI communicator that is to be used for creating the mesh. If this is `None`, then COMM_WORLD is used. Returns: The corresponding mesh for the checkpoint number. """ h5_filename = f"{filename[:-5]}.h5" with h5py.File(h5_filename) as file: name = list(file.keys())[0] step_name = f"{name}_{step}" coordinates = file[name][step_name]["mesh"]["geometry"][()] cells = file[name][step_name]["mesh"]["topology"][()] gdim = coordinates.shape[1] if cells.shape[1] == 2: tdim = 1 cell_type = "line" elif cells.shape[1] == 3: tdim = 2 cell_type = "triangle" elif cells.shape[1] == 4: tdim = 3 cell_type = "tetrahedron" else: raise _exceptions.CashocsException("The mesh saved in the xdmf file is faulty.") if tdim > gdim: raise _exceptions.CashocsException( "The topological dimension of a mesh must not be larger than its " "geometrical dimension" ) if comm is None: comm = mpi.COMM_WORLD mesh = fenics.Mesh(comm) if comm.rank == 0: mesh_editor = fenics.MeshEditor() mesh_editor.open(mesh, cell_type, tdim, gdim) mesh_editor.init_vertices(coordinates.shape[0]) mesh_editor.init_cells(cells.shape[0]) for i, vertex in enumerate(coordinates): mesh_editor.add_vertex(i, vertex) for i, cell in enumerate(cells): mesh_editor.add_cell(i, cell) mesh_editor.close() mesh.init() mesh.order() fenics.MeshPartitioning.build_distributed_mesh(mesh) return mesh
[docs] def extract_mesh_from_xdmf( xdmffile: str, iteration: int = 0, outputfile: str | None = None, original_gmsh_file: str | None = None, quiet: bool = False, # pylint: disable= unused-argument ) -> None: """Extracts a Gmsh mesh file from an XDMF state file. Args: xdmffile: The path to the XDMF state file. iteration: The iteration of interest (for a time series saved in the XDMF file), default is 0. outputfile: The path to the output Gmsh file. The default is `None` in which case the output is saved in the same directory as the XDMF state file. original_gmsh_file: The original gmsh .msh file used to create the mesh. This can be used to generate output files which preserve, e.g., physical tags and only updates the nodal positions. This will not work if the geometry has been remeshed. The default is `None`, which uses a more robust (but less detailed) approach. quiet: When this is set to `True`, verbose output is disabled. Default is `False`. """ log.begin("Extracting the mesh from the XDMF file", level=log.DEBUG) _utils.check_file_extension(xdmffile, "xdmf") if outputfile is None: outputfile = f"{xdmffile[:-5]}.msh" _utils.check_file_extension(outputfile, "msh") mesh = read_mesh_from_xdmf(xdmffile, step=iteration) comm = mesh.mpi_comm() if comm.rank == 0: tmp = tempfile.mkdtemp(prefix="cashocs_tmp_") else: tmp = "" comm.barrier() tempdir = comm.bcast(tmp, root=0) mesh_location_xdmf = f"{tempdir}/mesh.xdmf" comm.barrier() if original_gmsh_file is None: try: with fenics.XDMFFile(comm, mesh_location_xdmf) as file: file.write(mesh, fenics.XDMFFile.Encoding.HDF5) if comm.rank == 0: subprocess.run( # noqa: S603 [ # noqa: S607 "meshio", "convert", "--output-format", "gmsh", "--ascii", mesh_location_xdmf, outputfile, ], check=True, ) comm.barrier() finally: if comm.rank == 0: subprocess.run(["rm", "-r", tempdir], check=True) # noqa: S603, S607 comm.barrier() else: write_out_mesh(mesh, original_gmsh_file, outputfile) log.end()
[docs] class MeshConverter: """Converter from Gmsh to XDMF.""" def __init__(self, comm: MPI.Comm | None) -> None: """Initializes the mesh converter. Args: comm (MPI.Comm | None): The MPI.Comm used for parallelization. If this is None, COMM_WORLD is used. """ if comm is None: self.comm = mpi.COMM_WORLD else: self.comm = comm
[docs] def check_mode(self, mode: str) -> None: """Cheks, whether the supplied mode is sensible. Args: mode: The mode that should be used for the conversion. """ if mode not in ["physical", "geometrical", "none"]: raise _exceptions.CashocsException( f"The supplied mode {mode} is invalid. " f"Only possible options are 'physical', 'geometrical', or 'none'." )
[docs] def write_mesh( self, topological_dimension: int, cell_data_dict: dict, points: np.ndarray, cells_dict: dict, ostring: str, mode: str, ) -> None: """Writes out a xdmf file with the mesh and corresponding subdomains. Args: topological_dimension: The topological dimension of the mesh. cell_data_dict: The cell_data_dict of the mesh. points: The array of points. cells_dict: The cells_dict of the mesh. ostring: The output string, containing the name and path to the output file, without extension. mode: The mode which is used to define the subdomains and boundaries. Should be one of 'physical' (the default), 'geometrical', or 'none'. """ cell_type = "triangle" if topological_dimension == 2: cell_type = "triangle" elif topological_dimension == 3: cell_type = "tetra" if mode == "physical": dict_key = "gmsh:physical" elif mode == "geometrical": dict_key = "gmsh:geometrical" else: dict_key = None try: cell_data = {"subdomains": [cell_data_dict[dict_key][cell_type]]} except KeyError: cell_data = None xdmf_mesh = meshio.Mesh( points=points, cells={cell_type: cells_dict[cell_type]}, cell_data=cell_data ) meshio.write(f"{ostring}.xdmf", xdmf_mesh)
[docs] def write_boundaries( self, topological_dimension: int, cell_data_dict: dict, points: np.ndarray, cells_dict: dict, ostring: str, mode: str, ) -> None: """Writes out a xdmf file with meshio corresponding to the boundaries. Args: topological_dimension: The topological dimension of the mesh. cell_data_dict: The cell_data_dict of the mesh. points: The array of points. cells_dict: The cells_dict of the mesh. ostring: The output string, containing the name and path to the output file, without extension. mode: The mode which is used to define the subdomains and boundaries. Should be one of 'physical' (the default), 'geometrical', or 'none'. """ facet_str = "line" if topological_dimension == 2: facet_str = "line" elif topological_dimension == 3: facet_str = "triangle" if mode == "physical": dict_key = "gmsh:physical" elif mode == "geometrical": dict_key = "gmsh:geometrical" else: dict_key = None if mode != "none" and dict_key in cell_data_dict.keys(): if facet_str in cell_data_dict[dict_key].keys(): xdmf_boundaries = meshio.Mesh( points=points, cells={facet_str: cells_dict[facet_str]}, cell_data={"boundaries": [cell_data_dict[dict_key][facet_str]]}, ) meshio.write(f"{ostring}_boundaries.xdmf", xdmf_boundaries) else: if pathlib.Path(f"{ostring}_boundaries.xdmf").is_file(): subprocess.run( # noqa: S603 ["rm", f"{ostring}_boundaries.xdmf"], # noqa: S607 check=True, ) if pathlib.Path(f"{ostring}_boundaries.h5").is_file(): subprocess.run( # noqa: S603 ["rm", f"{ostring}_boundaries.h5"], # noqa: S607 check=True, )
[docs] def check_for_physical_names( self, inputfile: str, topological_dimension: int, ostring: str ) -> None: """Checks and extracts physical tags if they are given as strings. Args: inputfile: Path to the input file. topological_dimension: The dimension of the mesh. ostring: The output string, containing the name and path to the output file, without extension. """ physical_groups: dict[str, dict[str, int]] = {"dx": {}, "ds": {}} has_physical_groups = False with open(inputfile, encoding="utf-8") as infile: for line in infile: line = line.strip() if line == "$PhysicalNames": has_physical_groups = True info_line = next(infile).strip() no_physical_groups = int(info_line) for _ in range(no_physical_groups): physical_line = next(infile).strip().split() phys_dim = int(physical_line[0]) phys_tag = int(physical_line[1]) phys_name = physical_line[2] if "'" in phys_name: phys_name = phys_name.replace("'", "") if '"' in phys_name: phys_name = phys_name.replace('"', "") if phys_dim == topological_dimension: physical_groups["dx"][phys_name] = phys_tag elif phys_dim == topological_dimension - 1: physical_groups["ds"][phys_name] = phys_tag break if has_physical_groups: with open( f"{ostring}_physical_groups.json", "w", encoding="utf-8" ) as ofile: json.dump(physical_groups, ofile, indent=4)
[docs] def convert( self, inputfile: str, outputfile: str | None = None, mode: str = "physical", quiet: bool = False, ) -> None: """Converts a Gmsh .msh file to a .xdmf mesh file. Args: inputfile (str): Gmsh file to be converted. outputfile (str | None, optional): Path to the target output. Defaults to None. mode (str, optional): The mode used to define the subdomains and boundaries. This can be either "physical" or "geometrical". Defaults to "physical". quiet (bool, optional): A boolean flag which indicates whether the method should produce output to stdout. Defaults to False. """ start_time = time.time() _utils.check_file_extension(inputfile, "msh") self.check_mode(mode) if outputfile is None: outputfile = f"{inputfile[:-4]}.xdmf" _utils.check_file_extension(outputfile, "xdmf") ostring = outputfile.rsplit(".", 1)[0] if self.comm.rank == 0: with contextlib.redirect_stdout(None): mesh_collection = meshio.read(inputfile) points = mesh_collection.points cells_dict = mesh_collection.cells_dict cell_data_dict = mesh_collection.cell_data_dict # Check, whether we have a 2D or 3D mesh: keyvals = cells_dict.keys() topological_dimension = 2 if "tetra" in keyvals: topological_dimension = 3 elif "triangle" in keyvals: topological_dimension = 2 # check if geometrical dimension matches topological dimension z_coords = points[:, 2] if np.abs(np.max(z_coords) - np.min(z_coords)) <= 1e-15: points = points[:, :2] self.write_mesh( topological_dimension, cell_data_dict, points, cells_dict, ostring, mode ) self.write_boundaries( topological_dimension, cell_data_dict, points, cells_dict, ostring, mode ) self.check_for_physical_names(inputfile, topological_dimension, ostring) self.comm.barrier() end_time = time.time() if not quiet: log.info( f"Successfully converted {inputfile} to {outputfile} " f"in {end_time - start_time:.2f} s" )