150 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			150 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
## Implementation of a mesh based full solver with boundary conditions etc.
 | 
						|
import numpy as np
 | 
						|
from . import forces_mesh
 | 
						|
import logging
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
def mesh_solver(
 | 
						|
        particles: np.ndarray,
 | 
						|
        G: float,
 | 
						|
        mapping: callable,
 | 
						|
        n_grid: int,
 | 
						|
        bounds: tuple = (-1, 1),
 | 
						|
        boundary: str = "vanishing",
 | 
						|
    ) -> np.ndarray:
 | 
						|
    """
 | 
						|
    Computes the gravitational force acting on a set of particles using a mesh-based approach. The mesh is of fixed size: n_grid x n_grid x n_grid spanning the given bounds. Particles reaching the boundary are treated according to the boundary condition.
 | 
						|
    Args:
 | 
						|
    - particles: np.ndarray, shape=(n, 4). Assumes that the particles array has the following columns: x, y, z, m.
 | 
						|
    - G: float, the gravitational constant.
 | 
						|
    - mapping: callable, the mapping function to use.
 | 
						|
    - n_grid: int, the number of grid points in each direction.
 | 
						|
    - bounds: tuple, the bounds of the mesh.
 | 
						|
    - boundary: str, the boundary condition to apply.
 | 
						|
    """
 | 
						|
    if particles.shape[1] != 4:
 | 
						|
        raise ValueError("Particles array must have 4 columns: x, y, z, m")
 | 
						|
 | 
						|
    logger.debug(f"Computing forces for {particles.shape[0]} particles using mesh [mapping={mapping.__name__}, {n_grid=}]")
 | 
						|
 | 
						|
    # the function is fine, let's abuse it somewhat
 | 
						|
    
 | 
						|
    axis = np.linspace(bounds[0], bounds[1], n_grid)
 | 
						|
    mesh = np.zeros((n_grid, n_grid, n_grid))
 | 
						|
    spacing = np.abs(axis[1] - axis[0])
 | 
						|
    logger.debug(f"Using mesh spacing: {spacing}")
 | 
						|
 | 
						|
 | 
						|
    # # Check that the boundary condition is fullfilled
 | 
						|
    # if boundary == "periodic":
 | 
						|
    #     raise NotImplementedError("Periodic boundary conditions are not implemented yet")
 | 
						|
    # elif boundary == "vanishing":
 | 
						|
    #     # remove the particles that are outside the mesh
 | 
						|
    #     outlier_mask = np.logical_or(particles[:, :3] < bounds[0],  particles[:, :3] > bounds[1])
 | 
						|
    #     if np.any(outlier_mask):
 | 
						|
    #         idx = np.any(outlier_mask, axis=1)
 | 
						|
    #         logger.info(f"{idx.shape=}")
 | 
						|
    #         logger.warning(f"Removing {np.sum(idx)} particles that left the mesh")
 | 
						|
    #         # replace the particles by nan values
 | 
						|
    #         particles[idx, :] = np.nan
 | 
						|
    #         print(np.sum(np.isnan(particles)))
 | 
						|
    # else:
 | 
						|
    #     raise ValueError(f"Unknown boundary condition: {boundary}")
 | 
						|
 | 
						|
 | 
						|
    # fill the mesh
 | 
						|
    particles_to_mesh(particles, mesh, axis, mapping)
 | 
						|
    # we want a density mesh:
 | 
						|
    cell_volume = spacing**3
 | 
						|
    rho = mesh / cell_volume
 | 
						|
 | 
						|
    if logger.isEnabledFor(logging.DEBUG):
 | 
						|
        forces_mesh.show_mesh_information(mesh, "Density mesh")
 | 
						|
 | 
						|
    # compute the potential and its gradient
 | 
						|
    phi_grad = forces_mesh.mesh_poisson(rho, G, spacing)
 | 
						|
 | 
						|
    if logger.isEnabledFor(logging.DEBUG):
 | 
						|
        logger.debug(f"Got phi_grad with: {phi_grad.shape}, {np.max(phi_grad)}")
 | 
						|
        forces_mesh.show_mesh_information(phi_grad[0], "Potential gradient (x-direction)")
 | 
						|
 | 
						|
    # compute the particle forces from the mesh potential
 | 
						|
    forces = np.zeros_like(particles[:, :3])
 | 
						|
    for i, p in enumerate(particles):
 | 
						|
        ijk = np.digitize(p, axis) - 1
 | 
						|
        logger.debug(f"Particle {p} maps to cell {ijk}")
 | 
						|
        # this gives 4 entries since p[3] the mass is digitized as well -> this is meaningless and we discard it
 | 
						|
        # logger.debug(f"Particle {p} maps to cell {ijk}")
 | 
						|
        forces[i] = - p[3] * phi_grad[..., ijk[0], ijk[1], ijk[2]]
 | 
						|
 | 
						|
    return forces
 | 
						|
 | 
						|
 | 
						|
def particles_to_mesh(particles: np.ndarray, mesh: np.ndarray, axis: np.ndarray, mapping: callable) -> None:
 | 
						|
    """
 | 
						|
    Maps a list of particles to an existing mesh, filling it inplace
 | 
						|
    """
 | 
						|
    if particles.shape[1] < 4:
 | 
						|
        raise ValueError("Particles array must have at least 4 columns: x, y, z, m")
 | 
						|
 | 
						|
    # axis provide an easy way to map the particles to the mesh
 | 
						|
    for p in particles:
 | 
						|
        m = p[-1]
 | 
						|
        # spread the star onto cells through the shape function, taking into account the mass
 | 
						|
        ijks, weights = mapping(p, axis)
 | 
						|
        for ijk, weight in zip(ijks, weights):
 | 
						|
            mesh[ijk[0], ijk[1], ijk[2]] += weight * m
 | 
						|
 | 
						|
 | 
						|
'''
 | 
						|
#### Actually need to patch this
 | 
						|
def ode_setup(particles: np.ndarray, force_function: callable) -> tuple[np.ndarray, callable]:
 | 
						|
    """
 | 
						|
    Linearizes the ODE system for the particles interacting gravitationally.
 | 
						|
    Returns:
 | 
						|
        - the Y0 array corresponding to the initial conditions (x0 and v0)
 | 
						|
        - the function that computes the right hand side of the ODE with function signature f(t, y)
 | 
						|
    Assumes that the particles array has the following columns: x, y, z, vx, vy, vz, m.
 | 
						|
    """
 | 
						|
    if particles.shape[1] != 7:
 | 
						|
        raise ValueError("Particles array must have 7 columns: x, y, z, vx, vy, vz, m")
 | 
						|
 | 
						|
    # for the integrators we need to flatten array which contains 7 columns for now
 | 
						|
    # we don't really care how we reshape as long as we unflatten consistently
 | 
						|
    particles = particles.flatten()
 | 
						|
    logger.debug(f"Reshaped 7 columns into {particles.shape=}")
 | 
						|
 | 
						|
    def f(y, t):
 | 
						|
        """
 | 
						|
        Computes the right hand side of the ODE system.
 | 
						|
        The ODE system is linearized around the current positions and velocities.
 | 
						|
        """
 | 
						|
        p = to_particles(y)
 | 
						|
        # this is explicitly a copy, which has shape (n, 7)
 | 
						|
        # columns x, y, z, vx, vy, vz, m
 | 
						|
        # (need to keep y intact since integrators make multiple function calls)
 | 
						|
 | 
						|
        forces = force_function(p[:, [0, 1, 2, -1]])
 | 
						|
 | 
						|
        # compute the accelerations
 | 
						|
        masses = p[:, -1]
 | 
						|
        a = forces / masses[:, None]
 | 
						|
        # the [:, None] is to force broadcasting in order to divide each row of forces by the corresponding mass
 | 
						|
 | 
						|
        # the position columns become the velocities
 | 
						|
        # the velocity columns become the accelerations
 | 
						|
        p[:, 0:3] = p[:, 3:6]
 | 
						|
        p[:, 3:6] = a
 | 
						|
        # the masses remain unchanged
 | 
						|
        # p[:, -1] = p[:, -1]
 | 
						|
 | 
						|
        # flatten the array again
 | 
						|
        # logger.debug(f"As particles: {y}")
 | 
						|
        p = p.reshape(-1, copy=False)
 | 
						|
        
 | 
						|
        # logger.debug(f"As column: {y}")
 | 
						|
        return p
 | 
						|
 | 
						|
    return particles, f
 | 
						|
''' |