"""
This script contains a function to run the main convolution
based absorber algorithm on a single spectrum.
"""
from functools import reduce
from operator import add
import time
import logging
import numpy as np
from astropy.table import Table
from .utils import convolution_fun, vel_dispersion, elapsed
from .absorberutils import (
estimate_local_sigma_conv_array,
median_selection_after_combining,
remove_Mg_falsely_come_from_Fe_absorber,
z_abs_from_same_metal_absorber,
contiguous_pixel_remover,
estimate_snr_for_lines,
absorber_search_window,
find_valid_indices,
calculate_doublet_ratio,
group_and_select_weighted_redshift,
check_absorber_selection
)
from .ew import (
measure_absorber_properties_double_gaussian
)
from .datamodel import QSOSpecRead
# Constants — imported via the module object so startup-time patches propagate here.
from .constants import lines, oscillator_parameters, speed_of_light, doublet_keys
from . import constants as _constants
logger = logging.getLogger(__name__)
[docs]def read_single_spectrum_and_find_absorber(fits_file, spec_index, absorber, **kwargs):
"""
This function retrieves a single QSO spectrum from a FITS file, processes the data to remove NaNs,
and prepares the spectrum for absorber search within specified wavelength regions
and runs the convolution based adaptive S/N method to detect absorbers in the spectrum.
Args:
fits_file (str): Path to the FITS file containing normalized QSO spectra.
The file must include extensions for FLUX, ERROR, WAVELENGTH
and METADATA which must contain keyword Z_QSO.
spec_index (int): Index of the quasar spectrum to retrieve from the FITS file.
absorber (str): Name of the absorber to search for (e.g., MgII, CIV, OVI, NV, SiIV, AlIII, FeII).
kwargs (dict): search parameters as taken in convolution_method..()
Returns:
dict: Contains lists of various parameters related to detected absorbers.
- index_spec (list): QSO spec index searched
- z_abs (list of floats): redshifts of absorbers detected
- gauss_fit (list of arrays): gaussian fit parameters for each absorber
- gauss_fit_std (list of arrays): errors on gaussian fit parameters for each absorber
- ew_1_mean (list of floats): Equivalent width of line 1 for each absorber
- ew_2_mean (list of floats): Equivalent width of line 2 for each absorber
- ew_total_mean (list of floats): Total Equivalent width of line 1 and line 2 for each absorber
- ew_1_error (list of floats): errors on Equivalent width of line 1 for each absorber
- ew_2_error (list of floats): errors on Equivalent width of line 2 for each absorber
- ew_total_error (list of floats): errors on Total Equivalent width of line 1 and line 2 for each absorber
- z_abs_err (list): errors on redshifts of absorbers detected
- sn_1 (list): SNR of line 1 for each absorber
- sn_2 (list): SNR of line 2 for each absorber
- vel_disp1 (list): rest-frame velocity dispersion of line 1 for each absorber (in km/s)
- vel_disp2 (list): rest-frame velocity dispersion of line 2 for each absorber (in km/s)
- delta_chi2 (list): delta_chi2 between fitted model and flat continuum (null hypothesis)
Raises:
AssertionError: If the sizes of `lam_search`, `unmsk_residual`, and `unmsk_error` do not match.
Note:
- This function assumes that the input spectra are already normalized (i.e., flux divided by continuum).
- The wavelength search region is determined dynamically based on the observed wavelength range.
"""
start_time = time.time()
verbose = kwargs.get("verbose", False)
# Read the specified QSO spectrum from the FITS file
if verbose:
logger.info("Starting search for QSO INDEX = %s", spec_index)
spectra = QSOSpecRead(fits_file, index=spec_index, autoload=True, verbose=verbose) # verbose=True, shows time
spectra.metadata = Table(spectra.metadata) # in case spectra.metadata is a Row
if 'Z' in spectra.metadata.colnames:
spectra.metadata.rename_column('Z', 'Z_QSO')
z_qso = spectra.metadata['Z_QSO']
lam_obs = spectra.wavelength
# Define the wavelength range for searching the absorber
min_wave, max_wave = lam_obs.min(), lam_obs.max()
# Retrieve flux and error data, ensuring consistent dtype for Numba compatibility
residual, error = spectra.flux.astype('float64'), spectra.error.astype('float64')
lam_obs = lam_obs.astype('float64')
# Remove NaN values from the arrays
non_nan_indices = ~np.isnan(residual)
lam_obs, residual, error = lam_obs[non_nan_indices], residual[non_nan_indices], error[non_nan_indices]
# Identify the wavelength region for searching the specified absorber
lam_search, unmsk_residual, unmsk_error = absorber_search_window(
lam_obs, residual, error, z_qso, absorber, min_wave, max_wave, start_rest_wave=kwargs["start_rest_wave"], end_rest_wave=kwargs["end_rest_wave"],
dv=kwargs["dv"], lam_edge_sep= kwargs["lam_edge_sep"], verbose=verbose)
# Verify that the arrays are of equal size
assert lam_search.size == unmsk_residual.size == unmsk_error.size, "Mismatch in array sizes of lam_search, unmsk_residual, and unmsk_error"
not_allowed_args = ["lam_edge_sep", "start_rest_wave", "end_rest_wave",
"dv", "continuum_error_frac", "lam_red", "lam_blue"]
conv_kwargs = {}
for key in kwargs.keys():
if key not in not_allowed_args:
conv_kwargs[key] = kwargs[key]
if verbose:
logger.info("search absorber = %s", absorber)
logger.info("Z_QSO = %s", z_qso[0])
result = convolution_method_absorber_finder_in_QSO_spectra(
spec_index,
absorber,
lam_obs,
residual,
error,
lam_search,
unmsk_residual,
**conv_kwargs,
)
if verbose:
elapsed(start_time, f"INFO: Time taken to finish {absorber} detection for index = {spec_index} Quasar is:")
return result
[docs]def convolution_method_absorber_finder_in_QSO_spectra(spec_index, absorber='MgII', lam_obs=None, residual=None, error=None, lam_search=None, unmsk_residual=None, ker_width_pixels=5, coeff_sigma=2.5, mult_resi=1, d_pix=0.6, pm_pixel=200, sn_line1=3, sn_line2=2, use_covariance=False, logwave=True, verbose=True, nboot=None, conf_level=0.95):
"""
Detect absorbers with doublet properties in SDSS quasar spectra using a
convolution method. This function identifies potential absorbers based on
user-defined threshold criteria, applies Gaussian fitting to reject false
positives, and computes the equivalent widths (EWs) of the lines, returning
the redshifts, EWs, and fitting parameters.
Args:
spec_index (int): Index of quasar in the spectra 2D array.
absorber (str): Absorber name for searching doublets (MgII, CIV, OVI, NV, SiIV, AlIII, FeII, NaI). Default is 'MgII'.
lam_obs (numpy.array): observed wavelength array.
residual (numpy.array): residual (i.e. flux/continuum) array
error (numpy.array): error on residuals
lam_search (numpy.array): search observed wavelength array (i.e. region where absorber will be looked for).
unmsk_residual (numpy.array): search residual array (residuals at search wavelength pixels)
ker_width_pixels (int or list): Kernel width(s) in pixels. Default is 5.
coeff_sigma (float): Coefficient for sigma to apply threshold in the convolved array. Default is 2.5.
mult_resi (float): Factor to shift the residual up or down. Default is 1.
d_pix (float): Pixel distance for line separation during Gaussian fitting. Default is 0.6.
pm_pixel (int): Pixel parameter for local noise estimation. Default is 200.
sn_line1 (float): Signal-to-noise ratio threshold for line 1. Default is 3.
sn_line2 (float): Signal-to-noise ratio threshold for line 2. Default is 2.
use_covariance (bool): If True, use full covariance of scipy curve_fit for EW error calculation. Default is False.
logwave (bool): If True, wavelength is on log scale (e.g. SDSS). Default is True.
verbose (bool): If True, print detailed outputs for debugging. Default is True.
nboot (int, optional): Number of bootstrap iterations for fitting. Default is None (disabled).
conf_level (float): Minimum confidence level for chi2-based absorber selection. Default is 0.95.
Returns:
dict: Contains lists of various parameters related to detected absorbers.
- index_spec (list): QSO spec index searched
- z_abs (list): redshifts of absorbers detected
- gauss_fit (list of arrays): gaussian fit parameters for each absorber
- gauss_fit_std (list of arrays): errors on gaussian fit parameters for each absorber
- ew_1_mean (list): Equivalent width of line 1 for each absorber
- ew_2_mean (list): Equivalent width of line 2 for each absorber
- ew_total_mean (list): Total Equivalent width of line 1 and line 2 for each absorber
- ew_1_error (list): errors on Equivalent width of line 1 for each absorber
- ew_2_error (list): errors on Equivalent width of line 2 for each absorber
- ew_total_error (list): errors on Total Equivalent width of line 1 and line 2 for each absorber
- z_abs_err (list): errors on redshifts of absorbers detected
- sn_1 (list): SNR of line 1 for each absorber
- sn_2 (list): SNR of line 2 for each absorber
- vel_disp1 (list): rest-frame velocity dispersion of line 1 for each absorber (in km/s)
- vel_disp2 (list): rest-frame velocity dispersion of line 2 for each absorber (in km/s)
- delta_chi2 (list): delta_chi2 between fitted model and flat continuum (null hypothesis)
"""
def _build_result(index_spec, z_abs, gauss_fit, gauss_fit_std, ew_1_mean, ew_2_mean,
ew_total_mean, ew_1_error, ew_2_error, ew_total_error,
z_abs_err, sn_1, sn_2, vel_disp1, vel_disp2, delta_chi2):
return {
'index_spec': index_spec,
'z_abs': z_abs,
'gauss_fit': gauss_fit,
'gauss_fit_std': gauss_fit_std,
'ew_1_mean': ew_1_mean,
'ew_2_mean': ew_2_mean,
'ew_total_mean': ew_total_mean,
'ew_1_error': ew_1_error,
'ew_2_error': ew_2_error,
'ew_total_error': ew_total_error,
'z_abs_err': z_abs_err,
'sn_1': sn_1,
'sn_2': sn_2,
'vel_disp1': vel_disp1,
'vel_disp2': vel_disp2,
'delta_chi2': delta_chi2,
}
# return if there are less than MIN_NPIXEL wavelength pixels available to search for
if lam_search.size <= _constants.MIN_NPIXEL or lam_obs.size <= _constants.MIN_NPIXEL:
if verbose:
logger.info("No wavelength pixels available in search region, spec index = %s", spec_index)
return _build_result(
[spec_index], [0], [[0, 0, 0, 0, 0, 0]], [[0, 0, 0, 0, 0, 0]], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0]
)
else:
# Constants
if absorber not in doublet_keys:
raise ValueError(f"No support for {absorber}, only supports {doublet_keys.keys()}")
else:
line1, line2 = lines[doublet_keys[absorber][0]], lines[doublet_keys[absorber][1]]
f1, f2 = oscillator_parameters[f'{absorber}_f1'], oscillator_parameters[f'{absorber}_f2']
prod1 = f1 * line1
prod2 = f2 * line2
line_ratio = max(prod1, prod2) / min(prod1, prod2)
line_sep = line2 - line1
del_z = line_sep / line1
if verbose:
logger.debug("For %s, theoretical oscillator strength ratio: %s", absorber, line_ratio)
logger.debug("instrumental resolution will be calculated from wavelength array; wavelength pixels are assumed to be less than FWHM")
if not logwave:
# per pixel resolution in case wavelength is on linear scale
wave_res = np.nanmedian(np.diff(lam_search)) # robust to outliers
resolution = wave_res/lam_obs * speed_of_light # an array, it is assumed that it's true one and not FWHM
mean_resolution = np.nanmean(resolution)
#this is just to define the lower boundary for gaussian sigma
del_sigma = mean_resolution * line1 / speed_of_light
else:
log_obs_wave = np.log10(lam_search)
wave_res = np.nanmedian(np.diff(log_obs_wave))
resolution = (10**wave_res - 1) * speed_of_light
del_sigma = line1 * resolution / speed_of_light # in Ang
del_sigma /=2.355 ## FWHM sqrt(8ln2) #this is just to define the lower boundary for gaussian sigma
mean_resolution = resolution
if verbose:
logger.debug("mean wave_resolution = %.5f, mean resolution per pixel = %.3f [km/s], del_sigma: %s", wave_res, mean_resolution, del_sigma)
bd_ct, x_sep = 1.0, 30 # multiple for bound definition (for line centres and widths of line, max can be 30 times of min)
# bounds for gaussian fitting, to avoid very bad candidates
edge = 0.1
bound = ((np.array([2e-2, line1 - bd_ct * d_pix, max(0.1,del_sigma - edge), 2e-2, line2 - bd_ct * d_pix, max(0.1, del_sigma - edge)])),
(np.array([1.11, line1 + bd_ct * d_pix, x_sep * del_sigma + edge, 1.11, line2 + bd_ct * d_pix, x_sep * del_sigma + edge])))
# line separation tolerance (fitted line centers should not be outside, centre +/- d_pix)
lower_del_lam = line_sep - d_pix
upper_del_lam = line_sep + d_pix
if isinstance(ker_width_pixels, int):
ker_width_pixels = [ker_width_pixels]
# Kernel width computation
width_kernel = np.array([ker * mean_resolution * ((f1 * line1 + f2 * line2) / (f1 + f2)) / (speed_of_light * 2.355) for ker in ker_width_pixels])
combined_final_our_z = []
for sig_ker in width_kernel:
if verbose:
logger.debug("convolving for kernel width: %s Angstrom.", sig_ker)
line_centre = (line1 + line2) / 2
conv_arr = convolution_fun(absorber, mult_resi * unmsk_residual, sig_ker, log=logwave, wave_res=wave_res, index=spec_index, f1=f1, f2=f2)
sigma_cr = estimate_local_sigma_conv_array(conv_arr, pm_pixel=pm_pixel)
thr = np.nanmedian(conv_arr) - coeff_sigma * sigma_cr
conv_arr[np.isnan(conv_arr)] = 1e5
our_z_ind = conv_arr < thr
conv_arr[conv_arr == 1e5] = np.nan
our_z = lam_search[our_z_ind] / line_centre - 1
residual_our_z = unmsk_residual[our_z_ind]
if verbose:
logger.debug("sigma cut on convolved flux for potential candidates")
new_our_z, new_res_arr = find_valid_indices(our_z, residual_our_z, lam_search, conv_arr, sigma_cr, coeff_sigma, line_ratio, line1, line2, logwave)
final_our_z = group_and_select_weighted_redshift(new_our_z, new_res_arr, residual, lam_obs, line1, line2, del_z)
combined_final_our_z.append(final_our_z)
if verbose:
logger.debug("combining redshifts")
combined_final_our_z = reduce(add, combined_final_our_z)
combined_final_our_z = list(set(combined_final_our_z))
if verbose:
logger.debug("potential candidates before combining: %s", combined_final_our_z)
combined_final_our_z = median_selection_after_combining(combined_final_our_z, lam_obs, residual, d_pix=d_pix, use_kernel=absorber, delta_z=del_z)
combined_final_our_z = [x for x in combined_final_our_z if not np.isnan(x)]
if verbose:
logger.debug("potential candidates after combining: %s", combined_final_our_z)
if len(combined_final_our_z)>0:
z_abs, _, fit_param, _, _, _, _, _, _, _,_ = measure_absorber_properties_double_gaussian(
index=spec_index, wavelength=lam_obs, flux=residual, error=error, absorber_redshift=combined_final_our_z, bound=bound, use_kernel=absorber, d_pix=d_pix, use_covariance=use_covariance, nboot=nboot)
pure_z_abs = np.zeros(len(z_abs))
pure_gauss_fit = np.zeros((len(z_abs), 6))
pure_gauss_fit_std = np.zeros((len(z_abs), 6))
pure_ew_first_line_mean = np.zeros(len(z_abs))
pure_ew_second_line_mean = np.zeros(len(z_abs))
pure_ew_total_mean = np.zeros(len(z_abs))
pure_ew_first_line_error = np.zeros(len(z_abs))
pure_ew_second_line_error = np.zeros(len(z_abs))
pure_ew_total_error = np.zeros(len(z_abs))
redshift_err = np.zeros(len(z_abs))
sn1_all = np.zeros(len(z_abs))
sn2_all = np.zeros(len(z_abs))
vel_disp1 = np.zeros(len(z_abs))
vel_disp2 = np.zeros(len(z_abs))
delta_chi2_array = np.zeros(len(z_abs))
z_inds = [i for i, x in enumerate(z_abs) if not np.isnan(x) and x > 0]
if verbose:
logger.debug("performing final selection based on physical properties")
logger.debug("only absorbers with conf_level > %s will be selected", conf_level)
for m in z_inds:
if len(fit_param[m]) > 0 and not np.all(np.isnan(fit_param[m])):
z_new, z_new_error, fit_param_temp, fit_param_std_temp, EW_first_temp_mean, EW_second_temp_mean, EW_total_temp_mean, EW_first_error_temp, EW_second_error_temp, EW_total_error_temp, delta_chi2 = measure_absorber_properties_double_gaussian(
index=spec_index, wavelength=lam_obs, flux=residual, error=error, absorber_redshift=[z_abs[m]], bound=bound, use_kernel=absorber, d_pix=d_pix, use_covariance=use_covariance, nboot=nboot)
z_new = float(z_new[0])
z_new_error = float(z_new_error[0])
delta_chi2 = delta_chi2[0]
if len(fit_param_temp[0]) > 0 and not np.all(np.isnan(fit_param_temp[0])):
gaussian_parameters = np.array(fit_param_temp[0])
lam_rest = lam_obs / (1 + z_new)
c0 = gaussian_parameters[1]
c1 = gaussian_parameters[4]
sig1, sig2 = gaussian_parameters[2], gaussian_parameters[5]
#S/N estimation
sn1, sn2 = estimate_snr_for_lines(c0, c1, sig1, sig2, lam_rest, residual, error, logwave)
# resolution corrected velocity dispersion (should be greater than 0)
vel1, vel2 = vel_dispersion(c0, c1, gaussian_parameters[2], gaussian_parameters[5], resolution, z_new, lam_obs)
# calculate best -fit doublet ratio and errors and check if they are within the range.
# usually 1 < DR < line_ratio (doublet ratio =2, for MgII, CIV), also applying SNR for EW >1, these are strict cuts
if EW_first_temp_mean[0] > 0 and EW_second_temp_mean[0] > 0:
dr, dr_error = calculate_doublet_ratio(EW_first_temp_mean[0], EW_second_temp_mean[0], EW_first_error_temp[0], EW_second_error_temp[0], f1, f2)
min_dr, max_dr = 1 - dr_error, line_ratio + dr_error
ew1_snr, ew2_snr = EW_first_temp_mean[0] / EW_first_error_temp[0], EW_second_temp_mean[0] / EW_second_error_temp[0]
else:
dr, min_dr, max_dr = 0, 0, -1 #failure case
ew1_snr, ew2_snr = 0, 0 # failure case
good = check_absorber_selection(spec_index, z_new, gaussian_parameters, bound,
lower_del_lam, c0, c1, upper_del_lam,
sn1, sn_line1, sn2, sn_line2,
vel1, vel2, min_dr, dr, max_dr,
ew1_snr, ew2_snr, delta_chi2, conf_level, verbose=verbose)
if good:
pure_z_abs[m] = z_new
pure_gauss_fit[m] = fit_param_temp[0]
pure_gauss_fit_std[m] = fit_param_std_temp[0]
pure_ew_first_line_mean[m] = EW_first_temp_mean[0]
pure_ew_second_line_mean[m] = EW_second_temp_mean[0]
pure_ew_total_mean[m] = EW_total_temp_mean[0]
pure_ew_first_line_error[m] = EW_first_error_temp[0]
pure_ew_second_line_error[m] = EW_second_error_temp[0]
pure_ew_total_error[m] = EW_total_error_temp[0]
redshift_err[m] = z_new_error
sn1_all[m] = sn1
sn2_all[m] = sn2
vel_disp1[m] = vel1
vel_disp2[m] = vel2
delta_chi2_array[m] = delta_chi2
valid_indices = pure_z_abs != 0
pure_z_abs = pure_z_abs[valid_indices]
pure_gauss_fit = pure_gauss_fit[valid_indices]
pure_gauss_fit_std = pure_gauss_fit_std[valid_indices]
pure_ew_first_line_mean = pure_ew_first_line_mean[valid_indices]
pure_ew_second_line_mean = pure_ew_second_line_mean[valid_indices]
pure_ew_total_mean = pure_ew_total_mean[valid_indices]
pure_ew_first_line_error = pure_ew_first_line_error[valid_indices]
pure_ew_second_line_error = pure_ew_second_line_error[valid_indices]
pure_ew_total_error = pure_ew_total_error[valid_indices]
redshift_err = redshift_err[valid_indices]
sn1_all = sn1_all[valid_indices]
sn2_all = sn2_all[valid_indices]
vel_disp1 = vel_disp1[valid_indices]
vel_disp2 = vel_disp2[valid_indices]
delta_chi2_array = delta_chi2_array[valid_indices]
if verbose:
logger.debug("final candidates: %s", pure_z_abs)
if len(pure_z_abs) > 0:
if absorber=='MgII':
match_abs1 = remove_Mg_falsely_come_from_Fe_absorber(pure_z_abs, lam_obs, residual, error, d_pix, logwave)
else:
match_abs1 = -1*np.ones(len(pure_z_abs))
match_abs2 = z_abs_from_same_metal_absorber(pure_z_abs, lam_obs, residual, error, d_pix, absorber, logwave)
ind_z = contiguous_pixel_remover(pure_z_abs, sn1_all, sn2_all, absorber, pure_gauss_fit)
sel_indices = (match_abs1 == -1) & (match_abs2 == -1) & (ind_z == -1) # pure final absorber candidates
# Select final quantities based on sel_indices
pure_z_abs = pure_z_abs[sel_indices]
pure_gauss_fit = pure_gauss_fit[sel_indices]
pure_gauss_fit_std = pure_gauss_fit_std[sel_indices]
pure_ew_first_line_mean = pure_ew_first_line_mean[sel_indices]
pure_ew_second_line_mean = pure_ew_second_line_mean[sel_indices]
pure_ew_total_mean = pure_ew_total_mean[sel_indices]
pure_ew_first_line_error = pure_ew_first_line_error[sel_indices]
pure_ew_second_line_error = pure_ew_second_line_error[sel_indices]
pure_ew_total_error = pure_ew_total_error[sel_indices]
redshift_err = redshift_err[sel_indices]
sn1_all = sn1_all[sel_indices]
sn2_all = sn2_all[sel_indices]
vel_disp1 = vel_disp1[sel_indices]
vel_disp2 = vel_disp2[sel_indices]
delta_chi2_array = delta_chi2_array[sel_indices]
else:
redshift_err = np.array([0])
pure_z_abs = np.array([0])
pure_gauss_fit = pure_gauss_fit_std = np.array([[0, 0, 0, 0, 0, 0]])
pure_ew_first_line_mean = pure_ew_second_line_mean = pure_ew_total_mean = np.array([0])
pure_ew_first_line_error = pure_ew_second_line_error = pure_ew_total_error = np.array([0])
sn1_all = sn2_all = np.array([0])
vel_disp1 = vel_disp2= np.array([0])
delta_chi2_array = np.array([0])
not_found = max(1, len(pure_z_abs))
index_spec = [spec_index for _ in range(not_found)]
return _build_result(
index_spec,
pure_z_abs.tolist(),
pure_gauss_fit.tolist(),
pure_gauss_fit_std.tolist(),
pure_ew_first_line_mean.tolist(),
pure_ew_second_line_mean.tolist(),
pure_ew_total_mean.tolist(),
pure_ew_first_line_error.tolist(),
pure_ew_second_line_error.tolist(),
pure_ew_total_error.tolist(),
redshift_err.tolist(),
sn1_all.tolist(),
sn2_all.tolist(),
vel_disp1.tolist(),
vel_disp2.tolist(),
delta_chi2_array.tolist(),
)
else:
return _build_result(
[spec_index], [0], [[0, 0, 0, 0, 0, 0]], [[0, 0, 0, 0, 0, 0]], [0], [0], [0],
[0], [0], [0], [0], [0], [0], [0], [0], [0]
)