import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from pathlib import Path from .units import apply_units import logging logger = logging.getLogger(__name__) def density_distribution(r_bins: np.ndarray, particles: np.ndarray, ret_error: bool = False): """ Computes the radial density distribution of a set of particles. Assumes that the particles array has the following columns: x, y, z, m. If ret_error is True, it will return the absolute error of the density. """ if particles.shape[1] != 4: raise ValueError("Particles array must have 4 columns: x, y, z, m") m = particles[:, 3] r = np.linalg.norm(particles[:, :3], axis=1) m_shells = np.zeros_like(r_bins) v_shells = np.zeros_like(r_bins) error_relative = np.zeros_like(r_bins) r_bins = np.insert(r_bins, 0, 0) for i in range(len(r_bins) - 1): mask = (r >= r_bins[i]) & (r < r_bins[i + 1]) m_shells[i] = np.sum(m[mask]) v_shells[i] = 4/3 * np.pi * (r_bins[i + 1]**3 - r_bins[i]**3) if ret_error: count = np.count_nonzero(mask) if count > 0: # the absolute error is the square root of the number of particles error_relative[i] = 1 / np.sqrt(count) else: error_relative[i] = 0 density = m_shells / v_shells if ret_error: return density, density * error_relative else: return density def particle_count(r_bins, particles): r = np.linalg.norm(particles[:, :3], axis=1) # r_bins = np.insert(r_bins, 0, 0) count = np.zeros_like(r_bins) for i in range(len(r_bins) - 1): mask = (r >= r_bins[i]) & (r < r_bins[i+1]) count[i] = np.count_nonzero(mask) return count def r_distribution(particles: np.ndarray): """ Computes the distribution of distances (to the origin) of a set of particles. Assumes that the particles array has the following columns: x, y, z ... """ if particles.shape[1] < 3: raise ValueError("Particles array must have at least 3 columns: x, y, z") r = np.linalg.norm(particles[:, :3], axis=1) return r def remove_outliers(particles: np.ndarray, std_threshold: float = 3): """ Removes outliers from a set of particles. Assumes that the particles array has the following columns: x, y, z ... """ if particles.shape[1] < 3: raise ValueError("Particles array must have at least 3 columns: x, y, z") r = np.linalg.norm(particles[:, :3], axis=1) r_std = np.std(r) r_mean = np.mean(r) mask = np.abs(r - r_mean) < std_threshold * r_std return particles[mask] def mean_interparticle_distance(particles: np.ndarray): """ Computes the mean interparticle distance of a set of particles. Assumes that the particles array has the following columns: x, y, z ... """ if particles.shape[1] < 3: raise ValueError("Particles array must have at least 3 columns: x, y, z") r_half_mass = half_mass_radius(particles) r = np.linalg.norm(particles[:, :3], axis=1) n_half_mass = np.sum(r < r_half_mass) logger.debug(f"Number of particles within half mass radius: {n_half_mass} of {particles.shape[0]}") rho = n_half_mass / (4/3 * np.pi * r_half_mass**3) # the mean distance between particles is the inverse of the density epsilon = (1 / rho)**(1/3) logger.info(f"Found mean interparticle distance: {epsilon}") return epsilon def half_mass_radius(particles: np.ndarray): """ Computes the half mass radius of a set of particles. Assumes that the particles array has the following columns: x, y, z, m, ... """ if particles.shape[1] < 4: raise ValueError("Particles array must have at least 4 columns: x, y, z, m") # even though in the simple example, all the masses are the same, we will consider the general case total_mass = np.sum(particles[:, 3]) half_mass = total_mass / 2 # sort the particles by distance r = np.linalg.norm(particles[:, :3], axis=1) indices = np.argsort(r) r = r[indices] masses = particles[indices, 3] masses_cumsum = np.cumsum(masses) i = np.argmin(np.abs(masses_cumsum - half_mass)) logger.debug(f"Half mass radius: {r[i]} for {i}th particle of {particles.shape[0]}") r_hm = r[i] return r_hm def particles_plot_3d(positions: np.ndarray, masses: np.ndarray, title: str = "Particle distribution (3D)"): """ Plots a 3D scatter plot of a set of particles. Assumes that the particles array has the shape: - either 4 columns: x, y, z, m - or 7 columns: x, y, z, vx, vy, vz, m Colormap is the mass of the particles. """ x, y, z = positions[:, 0], positions[:, 1], positions[:, 2] fig = plt.figure() fig.suptitle(title) ax = fig.add_subplot(111, projection='3d') if np.all(masses == masses[0]): sc = ax.scatter(x, y, z, c='blue') cbar = plt.colorbar(sc, ax=ax, pad=0.1) else: sc = ax.scatter(x, y, z, cmap='coolwarm', c=masses) cbar = plt.colorbar(sc, ax=ax, pad=0.1) try: cbar.set_label(f'Mass [{masses.unit:latex}]') ax.set_xlabel(f'x [{x.unit:latex}]') ax.set_ylabel(f'y [{x.unit:latex}]') ax.set_zlabel(f'z [{x.unit:latex}]') except AttributeError: cbar.set_label('Mass') ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') plt.show() logger.debug("3D scatter plot with mass colormap") def particles_plot_2d(particles: np.ndarray, title: str = "Flattened distribution (along z)", ax = None): """ Plots a 2 colormap of a set of particles, flattened in the z direction. Assumes that the particles array has the shape: - either 3 columns: x, y, z - or 4 columns: x, y, z, m - or 7 columns: x, y, z, vx, vy, vz, m """ if particles.shape[1] == 3: x, y, z = particles[:, 0], particles[:, 1], particles[:, 2] elif particles.shape[1] == 4: x, y, z, m = particles[:, 0], particles[:, 1], particles[:, 2], particles[:, 3] elif particles.shape[1] == 7: x, y, z, m = particles[:, 0], particles[:, 1], particles[:, 2], particles[:, 6] else: raise ValueError("Particles array must have 3, 4 or 7 columns") if ax is None: plt.figure() plt.title(title) plt.hist2d(x, y, bins=100, cmap='coolwarm') cbar = plt.colorbar() cbar.set_label(f'Particle count') plt.show() else: ax.hist2d(x, y, bins=100, cmap='coolwarm') def particles_plot_2d_multiframe(particles_3d: np.ndarray, t_range: np.ndarray, title: str): # reduce the font size fig, axs = plt.subplots(4, 6, figsize=(20, 12)) fig.suptitle(title) # make sure we have enough time steps to show diff = axs.size - particles_3d.shape[0] if diff > 0: logger.debug(f"Adding dummy time steps: {diff=} -> {axs.size=}") plot_t_range = np.concatenate([t_range, np.zeros(diff)]) plot_particles_in_time = particles_3d elif diff < 0: logger.debug(f"Too many steps to plot - reducing: {particles_3d.shape[0]} -> {axs.size}") # skip some of the time steps plot_t_range = [] plot_particles_in_time = [] for i in range(axs.size): idx = int(i / axs.size * particles_3d.shape[0]) # make sure we have the first and last time step are included if i == 0: idx = 0 elif i == axs.size - 1: idx = particles_3d.shape[0] - 1 plot_t_range.append(t_range[idx]) plot_particles_in_time.append(particles_3d[idx]) else: plot_t_range = t_range plot_particles_in_time = particles_3d for p, t, a in zip(plot_particles_in_time, plot_t_range, axs.flat): a.set_title(f"t={t:.2g}") particles_plot_2d(p, ax=a) plt.show() def particles_plot_2d_animated(particles_3d: np.ndarray, t_range: np.ndarray, output: Path): # Also: show the 2D evolution as a GIF plt.ioff() fig, ax = plt.subplots() fig.suptitle("Particle evolution (top view)") ax.set_aspect('equal') xmax = np.max(particles_3d[:, :, :3]) ax.set_xlim(-xmax, xmax) ax.set_ylim(-xmax, xmax) ax.set_xlabel('x') ax.set_ylabel('y') def update(i): ax.set_title(f"t={t_range[i]:.2g}") particles_plot_2d(particles_3d[i], ax=ax) ax.set_xlim(-xmax, xmax) # Ensure x limits remain fixed ax.set_ylim(-xmax, xmax) # Ensure y limits remain fixed ani = FuncAnimation(fig, update, frames=range(len(particles_3d)), repeat=False) ani.save(output, writer='ffmpeg', fps=5) plt.close(fig) plt.ion() def particles_plot_radial_evolution(particles_3d: np.ndarray, t_range: np.ndarray): # radial extrema of the particles - disk surface n_steps = particles_3d.shape[0] r_mins = np.zeros(n_steps) r_maxs = np.zeros(n_steps) r_hms = np.zeros(n_steps) for i in range(n_steps): p = particles_3d[i, ...] # exclude the black hole r = np.linalg.norm(p[1:,:3], axis=1) # plt.plot(r[1::100], alpha=0.5) r_mins[i] = np.min(r) r_maxs[i] = np.max(r) r_hms[i] = half_mass_radius(p) r_mins = apply_units(r_mins, "position") r_maxs = apply_units(r_maxs, "position") plt.figure() plt.plot(t_range, r_mins, label='$r_{min}$', color=plt.cm.Blues(0.5)) plt.plot(t_range, r_maxs, label='$r_{max}$', color=plt.cm.Blues(0.8)) plt.fill_between(t_range, r_mins, r_maxs, color=plt.cm.Blues(0.2)) plt.plot(t_range, r_hms, label='$r_{hm}$', color=plt.cm.Greens(0.5)) # show the initial conditions plt.hlines(r_mins[0], t_range[0], t_range[-1], color='black', linestyle='--') plt.hlines(r_maxs[0], t_range[0], t_range[-1], color='black', linestyle='--') plt.title(f'Radial extrema over {n_steps} timesteps') plt.xlabel('Integration time') plt.ylabel(f'{r_mins.unit:latex}') plt.legend() plt.show() def particles_plot_orbits(particles_3d: np.ndarray, t_range: np.ndarray): # particle orbits fig, axs = plt.subplots(2, 1) axs[0].set_position([0, 0.3, 1, 0.6]) axs[0].set_xlabel('x') axs[0].set_ylabel('y') axs[1].set_position([0, 0, 1, 0.2]) axs[1].set_xlabel("t") axs[1].set_ylabel('z') fig.suptitle('Particle orbits') print(particles_3d.shape) mid = particles_3d.shape[1] // 2 particle_idx = [1, 2, 3, 4, 5, mid-2, mid-1, mid, mid+1, mid+2, -5, -4, -3, -2, -1] colors = plt.cm.Blues(np.linspace(0.2, 0.8, len(particle_idx))) for i, idx in enumerate(particle_idx): x = particles_3d[:, idx, 0] y = particles_3d[:, idx, 1] z = particles_3d[:, idx, 2] axs[0].plot(x, y, label=f'p{idx}', color=colors[i]) axs[1].plot(z, label=f'p{idx}', color=colors[i]) # plt.legend() plt.show()