329 lines
11 KiB
Python

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from pathlib import Path
from .units import apply_units
import logging
logger = logging.getLogger(__name__)
def density_distribution(r_bins: np.ndarray, particles: np.ndarray, ret_error: bool = False):
"""
Computes the radial density distribution of a set of particles.
Assumes that the particles array has the following columns: x, y, z, m.
If ret_error is True, it will return the absolute error of the density.
"""
if particles.shape[1] != 4:
raise ValueError("Particles array must have 4 columns: x, y, z, m")
m = particles[:, 3]
r = np.linalg.norm(particles[:, :3], axis=1)
m_shells = np.zeros_like(r_bins)
v_shells = np.zeros_like(r_bins)
error_relative = np.zeros_like(r_bins)
r_bins = np.insert(r_bins, 0, 0)
for i in range(len(r_bins) - 1):
mask = (r >= r_bins[i]) & (r < r_bins[i + 1])
m_shells[i] = np.sum(m[mask])
v_shells[i] = 4/3 * np.pi * (r_bins[i + 1]**3 - r_bins[i]**3)
if ret_error:
count = np.count_nonzero(mask)
if count > 0:
# the absolute error is the square root of the number of particles
error_relative[i] = 1 / np.sqrt(count)
else:
error_relative[i] = 0
density = m_shells / v_shells
if ret_error:
return density, density * error_relative
else:
return density
def particle_count(r_bins, particles):
r = np.linalg.norm(particles[:, :3], axis=1)
# r_bins = np.insert(r_bins, 0, 0)
count = np.zeros_like(r_bins)
for i in range(len(r_bins) - 1):
mask = (r >= r_bins[i]) & (r < r_bins[i+1])
count[i] = np.count_nonzero(mask)
return count
def r_distribution(particles: np.ndarray):
"""
Computes the distribution of distances (to the origin) of a set of particles.
Assumes that the particles array has the following columns: x, y, z ...
"""
if particles.shape[1] < 3:
raise ValueError("Particles array must have at least 3 columns: x, y, z")
r = np.linalg.norm(particles[:, :3], axis=1)
return r
def remove_outliers(particles: np.ndarray, std_threshold: float = 3):
"""
Removes outliers from a set of particles.
Assumes that the particles array has the following columns: x, y, z ...
"""
if particles.shape[1] < 3:
raise ValueError("Particles array must have at least 3 columns: x, y, z")
r = np.linalg.norm(particles[:, :3], axis=1)
r_std = np.std(r)
r_mean = np.mean(r)
mask = np.abs(r - r_mean) < std_threshold * r_std
return particles[mask]
def mean_interparticle_distance(particles: np.ndarray):
"""
Computes the mean interparticle distance of a set of particles.
Assumes that the particles array has the following columns: x, y, z ...
"""
if particles.shape[1] < 3:
raise ValueError("Particles array must have at least 3 columns: x, y, z")
r_half_mass = half_mass_radius(particles)
r = np.linalg.norm(particles[:, :3], axis=1)
n_half_mass = np.sum(r < r_half_mass)
logger.debug(f"Number of particles within half mass radius: {n_half_mass} of {particles.shape[0]}")
rho = n_half_mass / (4/3 * np.pi * r_half_mass**3)
# the mean distance between particles is the inverse of the density
epsilon = (1 / rho)**(1/3)
logger.info(f"Found mean interparticle distance: {epsilon}")
return epsilon
def half_mass_radius(particles: np.ndarray):
"""
Computes the half mass radius of a set of particles.
Assumes that the particles array has the following columns: x, y, z, m, ...
"""
if particles.shape[1] < 4:
raise ValueError("Particles array must have at least 4 columns: x, y, z, m")
# even though in the simple example, all the masses are the same, we will consider the general case
total_mass = np.sum(particles[:, 3])
half_mass = total_mass / 2
# sort the particles by distance
r = np.linalg.norm(particles[:, :3], axis=1)
indices = np.argsort(r)
r = r[indices]
masses = particles[indices, 3]
masses_cumsum = np.cumsum(masses)
i = np.argmin(np.abs(masses_cumsum - half_mass))
logger.debug(f"Half mass radius: {r[i]} for {i}th particle of {particles.shape[0]}")
r_hm = r[i]
return r_hm
def particles_plot_3d(positions: np.ndarray, masses: np.ndarray, title: str = "Particle distribution (3D)"):
"""
Plots a 3D scatter plot of a set of particles.
Assumes that the particles array has the shape:
- either 4 columns: x, y, z, m
- or 7 columns: x, y, z, vx, vy, vz, m
Colormap is the mass of the particles.
"""
x, y, z = positions[:, 0], positions[:, 1], positions[:, 2]
fig = plt.figure()
fig.suptitle(title)
ax = fig.add_subplot(111, projection='3d')
if np.all(masses == masses[0]):
sc = ax.scatter(x, y, z, c='blue')
cbar = plt.colorbar(sc, ax=ax, pad=0.1)
else:
sc = ax.scatter(x, y, z, cmap='coolwarm', c=masses)
cbar = plt.colorbar(sc, ax=ax, pad=0.1)
try:
cbar.set_label(f'Mass [{masses.unit:latex}]')
ax.set_xlabel(f'x [{x.unit:latex}]')
ax.set_ylabel(f'y [{x.unit:latex}]')
ax.set_zlabel(f'z [{x.unit:latex}]')
except AttributeError:
cbar.set_label('Mass')
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
plt.show()
logger.debug("3D scatter plot with mass colormap")
def particles_plot_2d(particles: np.ndarray, title: str = "Flattened distribution (along z)", ax = None):
"""
Plots a 2 colormap of a set of particles, flattened in the z direction.
Assumes that the particles array has the shape:
- either 3 columns: x, y, z
- or 4 columns: x, y, z, m
- or 7 columns: x, y, z, vx, vy, vz, m
"""
if particles.shape[1] == 3:
x, y, z = particles[:, 0], particles[:, 1], particles[:, 2]
elif particles.shape[1] == 4:
x, y, z, m = particles[:, 0], particles[:, 1], particles[:, 2], particles[:, 3]
elif particles.shape[1] == 7:
x, y, z, m = particles[:, 0], particles[:, 1], particles[:, 2], particles[:, 6]
else:
raise ValueError("Particles array must have 3, 4 or 7 columns")
if ax is None:
plt.figure()
plt.title(title)
plt.hist2d(x, y, bins=100, cmap='coolwarm')
cbar = plt.colorbar()
cbar.set_label(f'Particle count')
plt.show()
else:
ax.hist2d(x, y, bins=100, cmap='coolwarm')
def particles_plot_2d_multiframe(particles_3d: np.ndarray, t_range: np.ndarray, title: str):
# reduce the font size
fig, axs = plt.subplots(4, 6, figsize=(20, 12))
fig.suptitle(title)
# make sure we have enough time steps to show
diff = axs.size - particles_3d.shape[0]
if diff > 0:
logger.debug(f"Adding dummy time steps: {diff=} -> {axs.size=}")
plot_t_range = np.concatenate([t_range, np.zeros(diff)])
plot_particles_in_time = particles_3d
elif diff < 0:
logger.debug(f"Too many steps to plot - reducing: {particles_3d.shape[0]} -> {axs.size}")
# skip some of the time steps
plot_t_range = []
plot_particles_in_time = []
for i in range(axs.size):
idx = int(i / axs.size * particles_3d.shape[0])
# make sure we have the first and last time step are included
if i == 0:
idx = 0
elif i == axs.size - 1:
idx = particles_3d.shape[0] - 1
plot_t_range.append(t_range[idx])
plot_particles_in_time.append(particles_3d[idx])
else:
plot_t_range = t_range
plot_particles_in_time = particles_3d
for p, t, a in zip(plot_particles_in_time, plot_t_range, axs.flat):
a.set_title(f"t={t:.2g}")
particles_plot_2d(p, ax=a)
plt.show()
def particles_plot_2d_animated(particles_3d: np.ndarray, t_range: np.ndarray, output: Path):
# Also: show the 2D evolution as a GIF
plt.ioff()
fig, ax = plt.subplots()
fig.suptitle("Particle evolution (top view)")
ax.set_aspect('equal')
xmax = np.max(particles_3d[:, :, :3])
ax.set_xlim(-xmax, xmax)
ax.set_ylim(-xmax, xmax)
ax.set_xlabel('x')
ax.set_ylabel('y')
def update(i):
ax.set_title(f"t={t_range[i]:.2g}")
particles_plot_2d(particles_3d[i], ax=ax)
ax.set_xlim(-xmax, xmax) # Ensure x limits remain fixed
ax.set_ylim(-xmax, xmax) # Ensure y limits remain fixed
ani = FuncAnimation(fig, update, frames=range(len(particles_3d)), repeat=False)
ani.save(output, writer='ffmpeg', fps=5)
plt.close(fig)
plt.ion()
def particles_plot_radial_evolution(particles_3d: np.ndarray, t_range: np.ndarray):
# radial extrema of the particles - disk surface
n_steps = particles_3d.shape[0]
r_mins = np.zeros(n_steps)
r_maxs = np.zeros(n_steps)
r_hms = np.zeros(n_steps)
for i in range(n_steps):
p = particles_3d[i, ...]
# exclude the black hole
r = np.linalg.norm(p[1:,:3], axis=1)
# plt.plot(r[1::100], alpha=0.5)
r_mins[i] = np.min(r)
r_maxs[i] = np.max(r)
r_hms[i] = half_mass_radius(p)
r_mins = apply_units(r_mins, "position")
r_maxs = apply_units(r_maxs, "position")
plt.figure()
plt.plot(t_range, r_mins, label='$r_{min}$', color=plt.cm.Blues(0.5))
plt.plot(t_range, r_maxs, label='$r_{max}$', color=plt.cm.Blues(0.8))
plt.fill_between(t_range, r_mins, r_maxs, color=plt.cm.Blues(0.2))
plt.plot(t_range, r_hms, label='$r_{hm}$', color=plt.cm.Greens(0.5))
# show the initial conditions
plt.hlines(r_mins[0], t_range[0], t_range[-1], color='black', linestyle='--')
plt.hlines(r_maxs[0], t_range[0], t_range[-1], color='black', linestyle='--')
plt.title(f'Radial extrema over {n_steps} timesteps')
plt.xlabel('Integration time')
plt.ylabel(f'{r_mins.unit:latex}')
plt.legend()
plt.show()
def particles_plot_orbits(particles_3d: np.ndarray, t_range: np.ndarray):
# particle orbits
fig, axs = plt.subplots(2, 1)
axs[0].set_position([0, 0.3, 1, 0.6])
axs[0].set_xlabel('x')
axs[0].set_ylabel('y')
axs[1].set_position([0, 0, 1, 0.2])
axs[1].set_xlabel("t")
axs[1].set_ylabel('z')
fig.suptitle('Particle orbits')
print(particles_3d.shape)
mid = particles_3d.shape[1] // 2
particle_idx = [1, 2, 3, 4, 5, mid-2, mid-1, mid, mid+1, mid+2, -5, -4, -3, -2, -1]
colors = plt.cm.Blues(np.linspace(0.2, 0.8, len(particle_idx)))
for i, idx in enumerate(particle_idx):
x = particles_3d[:, idx, 0]
y = particles_3d[:, idx, 1]
z = particles_3d[:, idx, 2]
axs[0].plot(x, y, label=f'p{idx}', color=colors[i])
axs[1].plot(z, label=f'p{idx}', color=colors[i])
# plt.legend()
plt.show()