Source code for qsoabsfind.absfinder

"""
This script contains a function to run the main convolution
based absorber algorithm on a single spectrum.
"""
from functools import reduce
from operator import add
import time
import logging
import numpy as np
from astropy.table import Table
from .utils import convolution_fun, vel_dispersion, elapsed
from .absorberutils import (
    estimate_local_sigma_conv_array,
    median_selection_after_combining,
    remove_Mg_falsely_come_from_Fe_absorber,
    z_abs_from_same_metal_absorber,
    contiguous_pixel_remover,
    estimate_snr_for_lines,
    absorber_search_window,
    find_valid_indices,
    calculate_doublet_ratio,
    group_and_select_weighted_redshift,
    check_absorber_selection
)
from .ew import (
    measure_absorber_properties_double_gaussian
)
from .datamodel import QSOSpecRead

# Constants — imported via the module object so startup-time patches propagate here.
from .constants import lines, oscillator_parameters, speed_of_light, doublet_keys
from . import constants as _constants

logger = logging.getLogger(__name__)

[docs]def read_single_spectrum_and_find_absorber(fits_file, spec_index, absorber, **kwargs): """ This function retrieves a single QSO spectrum from a FITS file, processes the data to remove NaNs, and prepares the spectrum for absorber search within specified wavelength regions and runs the convolution based adaptive S/N method to detect absorbers in the spectrum. Args: fits_file (str): Path to the FITS file containing normalized QSO spectra. The file must include extensions for FLUX, ERROR, WAVELENGTH and METADATA which must contain keyword Z_QSO. spec_index (int): Index of the quasar spectrum to retrieve from the FITS file. absorber (str): Name of the absorber to search for (e.g., MgII, CIV, OVI, NV, SiIV, AlIII, FeII). kwargs (dict): search parameters as taken in convolution_method..() Returns: dict: Contains lists of various parameters related to detected absorbers. - index_spec (list): QSO spec index searched - z_abs (list of floats): redshifts of absorbers detected - gauss_fit (list of arrays): gaussian fit parameters for each absorber - gauss_fit_std (list of arrays): errors on gaussian fit parameters for each absorber - ew_1_mean (list of floats): Equivalent width of line 1 for each absorber - ew_2_mean (list of floats): Equivalent width of line 2 for each absorber - ew_total_mean (list of floats): Total Equivalent width of line 1 and line 2 for each absorber - ew_1_error (list of floats): errors on Equivalent width of line 1 for each absorber - ew_2_error (list of floats): errors on Equivalent width of line 2 for each absorber - ew_total_error (list of floats): errors on Total Equivalent width of line 1 and line 2 for each absorber - z_abs_err (list): errors on redshifts of absorbers detected - sn_1 (list): SNR of line 1 for each absorber - sn_2 (list): SNR of line 2 for each absorber - vel_disp1 (list): rest-frame velocity dispersion of line 1 for each absorber (in km/s) - vel_disp2 (list): rest-frame velocity dispersion of line 2 for each absorber (in km/s) - delta_chi2 (list): delta_chi2 between fitted model and flat continuum (null hypothesis) Raises: AssertionError: If the sizes of `lam_search`, `unmsk_residual`, and `unmsk_error` do not match. Note: - This function assumes that the input spectra are already normalized (i.e., flux divided by continuum). - The wavelength search region is determined dynamically based on the observed wavelength range. """ start_time = time.time() verbose = kwargs.get("verbose", False) # Read the specified QSO spectrum from the FITS file if verbose: logger.info("Starting search for QSO INDEX = %s", spec_index) spectra = QSOSpecRead(fits_file, index=spec_index, autoload=True, verbose=verbose) # verbose=True, shows time spectra.metadata = Table(spectra.metadata) # in case spectra.metadata is a Row if 'Z' in spectra.metadata.colnames: spectra.metadata.rename_column('Z', 'Z_QSO') z_qso = spectra.metadata['Z_QSO'] lam_obs = spectra.wavelength # Define the wavelength range for searching the absorber min_wave, max_wave = lam_obs.min(), lam_obs.max() # Retrieve flux and error data, ensuring consistent dtype for Numba compatibility residual, error = spectra.flux.astype('float64'), spectra.error.astype('float64') lam_obs = lam_obs.astype('float64') # Remove NaN values from the arrays non_nan_indices = ~np.isnan(residual) lam_obs, residual, error = lam_obs[non_nan_indices], residual[non_nan_indices], error[non_nan_indices] # Identify the wavelength region for searching the specified absorber lam_search, unmsk_residual, unmsk_error = absorber_search_window( lam_obs, residual, error, z_qso, absorber, min_wave, max_wave, start_rest_wave=kwargs["start_rest_wave"], end_rest_wave=kwargs["end_rest_wave"], dv=kwargs["dv"], lam_edge_sep= kwargs["lam_edge_sep"], verbose=verbose) # Verify that the arrays are of equal size assert lam_search.size == unmsk_residual.size == unmsk_error.size, "Mismatch in array sizes of lam_search, unmsk_residual, and unmsk_error" not_allowed_args = ["lam_edge_sep", "start_rest_wave", "end_rest_wave", "dv", "continuum_error_frac", "lam_red", "lam_blue"] conv_kwargs = {} for key in kwargs.keys(): if key not in not_allowed_args: conv_kwargs[key] = kwargs[key] if verbose: logger.info("search absorber = %s", absorber) logger.info("Z_QSO = %s", z_qso[0]) result = convolution_method_absorber_finder_in_QSO_spectra( spec_index, absorber, lam_obs, residual, error, lam_search, unmsk_residual, **conv_kwargs, ) if verbose: elapsed(start_time, f"INFO: Time taken to finish {absorber} detection for index = {spec_index} Quasar is:") return result
[docs]def convolution_method_absorber_finder_in_QSO_spectra(spec_index, absorber='MgII', lam_obs=None, residual=None, error=None, lam_search=None, unmsk_residual=None, ker_width_pixels=5, coeff_sigma=2.5, mult_resi=1, d_pix=0.6, pm_pixel=200, sn_line1=3, sn_line2=2, use_covariance=False, logwave=True, verbose=True, nboot=None, conf_level=0.95): """ Detect absorbers with doublet properties in SDSS quasar spectra using a convolution method. This function identifies potential absorbers based on user-defined threshold criteria, applies Gaussian fitting to reject false positives, and computes the equivalent widths (EWs) of the lines, returning the redshifts, EWs, and fitting parameters. Args: spec_index (int): Index of quasar in the spectra 2D array. absorber (str): Absorber name for searching doublets (MgII, CIV, OVI, NV, SiIV, AlIII, FeII, NaI). Default is 'MgII'. lam_obs (numpy.array): observed wavelength array. residual (numpy.array): residual (i.e. flux/continuum) array error (numpy.array): error on residuals lam_search (numpy.array): search observed wavelength array (i.e. region where absorber will be looked for). unmsk_residual (numpy.array): search residual array (residuals at search wavelength pixels) ker_width_pixels (int or list): Kernel width(s) in pixels. Default is 5. coeff_sigma (float): Coefficient for sigma to apply threshold in the convolved array. Default is 2.5. mult_resi (float): Factor to shift the residual up or down. Default is 1. d_pix (float): Pixel distance for line separation during Gaussian fitting. Default is 0.6. pm_pixel (int): Pixel parameter for local noise estimation. Default is 200. sn_line1 (float): Signal-to-noise ratio threshold for line 1. Default is 3. sn_line2 (float): Signal-to-noise ratio threshold for line 2. Default is 2. use_covariance (bool): If True, use full covariance of scipy curve_fit for EW error calculation. Default is False. logwave (bool): If True, wavelength is on log scale (e.g. SDSS). Default is True. verbose (bool): If True, print detailed outputs for debugging. Default is True. nboot (int, optional): Number of bootstrap iterations for fitting. Default is None (disabled). conf_level (float): Minimum confidence level for chi2-based absorber selection. Default is 0.95. Returns: dict: Contains lists of various parameters related to detected absorbers. - index_spec (list): QSO spec index searched - z_abs (list): redshifts of absorbers detected - gauss_fit (list of arrays): gaussian fit parameters for each absorber - gauss_fit_std (list of arrays): errors on gaussian fit parameters for each absorber - ew_1_mean (list): Equivalent width of line 1 for each absorber - ew_2_mean (list): Equivalent width of line 2 for each absorber - ew_total_mean (list): Total Equivalent width of line 1 and line 2 for each absorber - ew_1_error (list): errors on Equivalent width of line 1 for each absorber - ew_2_error (list): errors on Equivalent width of line 2 for each absorber - ew_total_error (list): errors on Total Equivalent width of line 1 and line 2 for each absorber - z_abs_err (list): errors on redshifts of absorbers detected - sn_1 (list): SNR of line 1 for each absorber - sn_2 (list): SNR of line 2 for each absorber - vel_disp1 (list): rest-frame velocity dispersion of line 1 for each absorber (in km/s) - vel_disp2 (list): rest-frame velocity dispersion of line 2 for each absorber (in km/s) - delta_chi2 (list): delta_chi2 between fitted model and flat continuum (null hypothesis) """ def _build_result(index_spec, z_abs, gauss_fit, gauss_fit_std, ew_1_mean, ew_2_mean, ew_total_mean, ew_1_error, ew_2_error, ew_total_error, z_abs_err, sn_1, sn_2, vel_disp1, vel_disp2, delta_chi2): return { 'index_spec': index_spec, 'z_abs': z_abs, 'gauss_fit': gauss_fit, 'gauss_fit_std': gauss_fit_std, 'ew_1_mean': ew_1_mean, 'ew_2_mean': ew_2_mean, 'ew_total_mean': ew_total_mean, 'ew_1_error': ew_1_error, 'ew_2_error': ew_2_error, 'ew_total_error': ew_total_error, 'z_abs_err': z_abs_err, 'sn_1': sn_1, 'sn_2': sn_2, 'vel_disp1': vel_disp1, 'vel_disp2': vel_disp2, 'delta_chi2': delta_chi2, } # return if there are less than MIN_NPIXEL wavelength pixels available to search for if lam_search.size <= _constants.MIN_NPIXEL or lam_obs.size <= _constants.MIN_NPIXEL: if verbose: logger.info("No wavelength pixels available in search region, spec index = %s", spec_index) return _build_result( [spec_index], [0], [[0, 0, 0, 0, 0, 0]], [[0, 0, 0, 0, 0, 0]], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0] ) else: # Constants if absorber not in doublet_keys: raise ValueError(f"No support for {absorber}, only supports {doublet_keys.keys()}") else: line1, line2 = lines[doublet_keys[absorber][0]], lines[doublet_keys[absorber][1]] f1, f2 = oscillator_parameters[f'{absorber}_f1'], oscillator_parameters[f'{absorber}_f2'] prod1 = f1 * line1 prod2 = f2 * line2 line_ratio = max(prod1, prod2) / min(prod1, prod2) line_sep = line2 - line1 del_z = line_sep / line1 if verbose: logger.debug("For %s, theoretical oscillator strength ratio: %s", absorber, line_ratio) logger.debug("instrumental resolution will be calculated from wavelength array; wavelength pixels are assumed to be less than FWHM") if not logwave: # per pixel resolution in case wavelength is on linear scale wave_res = np.nanmedian(np.diff(lam_search)) # robust to outliers resolution = wave_res/lam_obs * speed_of_light # an array, it is assumed that it's true one and not FWHM mean_resolution = np.nanmean(resolution) #this is just to define the lower boundary for gaussian sigma del_sigma = mean_resolution * line1 / speed_of_light else: log_obs_wave = np.log10(lam_search) wave_res = np.nanmedian(np.diff(log_obs_wave)) resolution = (10**wave_res - 1) * speed_of_light del_sigma = line1 * resolution / speed_of_light # in Ang del_sigma /=2.355 ## FWHM sqrt(8ln2) #this is just to define the lower boundary for gaussian sigma mean_resolution = resolution if verbose: logger.debug("mean wave_resolution = %.5f, mean resolution per pixel = %.3f [km/s], del_sigma: %s", wave_res, mean_resolution, del_sigma) bd_ct, x_sep = 1.0, 30 # multiple for bound definition (for line centres and widths of line, max can be 30 times of min) # bounds for gaussian fitting, to avoid very bad candidates edge = 0.1 bound = ((np.array([2e-2, line1 - bd_ct * d_pix, max(0.1,del_sigma - edge), 2e-2, line2 - bd_ct * d_pix, max(0.1, del_sigma - edge)])), (np.array([1.11, line1 + bd_ct * d_pix, x_sep * del_sigma + edge, 1.11, line2 + bd_ct * d_pix, x_sep * del_sigma + edge]))) # line separation tolerance (fitted line centers should not be outside, centre +/- d_pix) lower_del_lam = line_sep - d_pix upper_del_lam = line_sep + d_pix if isinstance(ker_width_pixels, int): ker_width_pixels = [ker_width_pixels] # Kernel width computation width_kernel = np.array([ker * mean_resolution * ((f1 * line1 + f2 * line2) / (f1 + f2)) / (speed_of_light * 2.355) for ker in ker_width_pixels]) combined_final_our_z = [] for sig_ker in width_kernel: if verbose: logger.debug("convolving for kernel width: %s Angstrom.", sig_ker) line_centre = (line1 + line2) / 2 conv_arr = convolution_fun(absorber, mult_resi * unmsk_residual, sig_ker, log=logwave, wave_res=wave_res, index=spec_index, f1=f1, f2=f2) sigma_cr = estimate_local_sigma_conv_array(conv_arr, pm_pixel=pm_pixel) thr = np.nanmedian(conv_arr) - coeff_sigma * sigma_cr conv_arr[np.isnan(conv_arr)] = 1e5 our_z_ind = conv_arr < thr conv_arr[conv_arr == 1e5] = np.nan our_z = lam_search[our_z_ind] / line_centre - 1 residual_our_z = unmsk_residual[our_z_ind] if verbose: logger.debug("sigma cut on convolved flux for potential candidates") new_our_z, new_res_arr = find_valid_indices(our_z, residual_our_z, lam_search, conv_arr, sigma_cr, coeff_sigma, line_ratio, line1, line2, logwave) final_our_z = group_and_select_weighted_redshift(new_our_z, new_res_arr, residual, lam_obs, line1, line2, del_z) combined_final_our_z.append(final_our_z) if verbose: logger.debug("combining redshifts") combined_final_our_z = reduce(add, combined_final_our_z) combined_final_our_z = list(set(combined_final_our_z)) if verbose: logger.debug("potential candidates before combining: %s", combined_final_our_z) combined_final_our_z = median_selection_after_combining(combined_final_our_z, lam_obs, residual, d_pix=d_pix, use_kernel=absorber, delta_z=del_z) combined_final_our_z = [x for x in combined_final_our_z if not np.isnan(x)] if verbose: logger.debug("potential candidates after combining: %s", combined_final_our_z) if len(combined_final_our_z)>0: z_abs, _, fit_param, _, _, _, _, _, _, _,_ = measure_absorber_properties_double_gaussian( index=spec_index, wavelength=lam_obs, flux=residual, error=error, absorber_redshift=combined_final_our_z, bound=bound, use_kernel=absorber, d_pix=d_pix, use_covariance=use_covariance, nboot=nboot) pure_z_abs = np.zeros(len(z_abs)) pure_gauss_fit = np.zeros((len(z_abs), 6)) pure_gauss_fit_std = np.zeros((len(z_abs), 6)) pure_ew_first_line_mean = np.zeros(len(z_abs)) pure_ew_second_line_mean = np.zeros(len(z_abs)) pure_ew_total_mean = np.zeros(len(z_abs)) pure_ew_first_line_error = np.zeros(len(z_abs)) pure_ew_second_line_error = np.zeros(len(z_abs)) pure_ew_total_error = np.zeros(len(z_abs)) redshift_err = np.zeros(len(z_abs)) sn1_all = np.zeros(len(z_abs)) sn2_all = np.zeros(len(z_abs)) vel_disp1 = np.zeros(len(z_abs)) vel_disp2 = np.zeros(len(z_abs)) delta_chi2_array = np.zeros(len(z_abs)) z_inds = [i for i, x in enumerate(z_abs) if not np.isnan(x) and x > 0] if verbose: logger.debug("performing final selection based on physical properties") logger.debug("only absorbers with conf_level > %s will be selected", conf_level) for m in z_inds: if len(fit_param[m]) > 0 and not np.all(np.isnan(fit_param[m])): z_new, z_new_error, fit_param_temp, fit_param_std_temp, EW_first_temp_mean, EW_second_temp_mean, EW_total_temp_mean, EW_first_error_temp, EW_second_error_temp, EW_total_error_temp, delta_chi2 = measure_absorber_properties_double_gaussian( index=spec_index, wavelength=lam_obs, flux=residual, error=error, absorber_redshift=[z_abs[m]], bound=bound, use_kernel=absorber, d_pix=d_pix, use_covariance=use_covariance, nboot=nboot) z_new = float(z_new[0]) z_new_error = float(z_new_error[0]) delta_chi2 = delta_chi2[0] if len(fit_param_temp[0]) > 0 and not np.all(np.isnan(fit_param_temp[0])): gaussian_parameters = np.array(fit_param_temp[0]) lam_rest = lam_obs / (1 + z_new) c0 = gaussian_parameters[1] c1 = gaussian_parameters[4] sig1, sig2 = gaussian_parameters[2], gaussian_parameters[5] #S/N estimation sn1, sn2 = estimate_snr_for_lines(c0, c1, sig1, sig2, lam_rest, residual, error, logwave) # resolution corrected velocity dispersion (should be greater than 0) vel1, vel2 = vel_dispersion(c0, c1, gaussian_parameters[2], gaussian_parameters[5], resolution, z_new, lam_obs) # calculate best -fit doublet ratio and errors and check if they are within the range. # usually 1 < DR < line_ratio (doublet ratio =2, for MgII, CIV), also applying SNR for EW >1, these are strict cuts if EW_first_temp_mean[0] > 0 and EW_second_temp_mean[0] > 0: dr, dr_error = calculate_doublet_ratio(EW_first_temp_mean[0], EW_second_temp_mean[0], EW_first_error_temp[0], EW_second_error_temp[0], f1, f2) min_dr, max_dr = 1 - dr_error, line_ratio + dr_error ew1_snr, ew2_snr = EW_first_temp_mean[0] / EW_first_error_temp[0], EW_second_temp_mean[0] / EW_second_error_temp[0] else: dr, min_dr, max_dr = 0, 0, -1 #failure case ew1_snr, ew2_snr = 0, 0 # failure case good = check_absorber_selection(spec_index, z_new, gaussian_parameters, bound, lower_del_lam, c0, c1, upper_del_lam, sn1, sn_line1, sn2, sn_line2, vel1, vel2, min_dr, dr, max_dr, ew1_snr, ew2_snr, delta_chi2, conf_level, verbose=verbose) if good: pure_z_abs[m] = z_new pure_gauss_fit[m] = fit_param_temp[0] pure_gauss_fit_std[m] = fit_param_std_temp[0] pure_ew_first_line_mean[m] = EW_first_temp_mean[0] pure_ew_second_line_mean[m] = EW_second_temp_mean[0] pure_ew_total_mean[m] = EW_total_temp_mean[0] pure_ew_first_line_error[m] = EW_first_error_temp[0] pure_ew_second_line_error[m] = EW_second_error_temp[0] pure_ew_total_error[m] = EW_total_error_temp[0] redshift_err[m] = z_new_error sn1_all[m] = sn1 sn2_all[m] = sn2 vel_disp1[m] = vel1 vel_disp2[m] = vel2 delta_chi2_array[m] = delta_chi2 valid_indices = pure_z_abs != 0 pure_z_abs = pure_z_abs[valid_indices] pure_gauss_fit = pure_gauss_fit[valid_indices] pure_gauss_fit_std = pure_gauss_fit_std[valid_indices] pure_ew_first_line_mean = pure_ew_first_line_mean[valid_indices] pure_ew_second_line_mean = pure_ew_second_line_mean[valid_indices] pure_ew_total_mean = pure_ew_total_mean[valid_indices] pure_ew_first_line_error = pure_ew_first_line_error[valid_indices] pure_ew_second_line_error = pure_ew_second_line_error[valid_indices] pure_ew_total_error = pure_ew_total_error[valid_indices] redshift_err = redshift_err[valid_indices] sn1_all = sn1_all[valid_indices] sn2_all = sn2_all[valid_indices] vel_disp1 = vel_disp1[valid_indices] vel_disp2 = vel_disp2[valid_indices] delta_chi2_array = delta_chi2_array[valid_indices] if verbose: logger.debug("final candidates: %s", pure_z_abs) if len(pure_z_abs) > 0: if absorber=='MgII': match_abs1 = remove_Mg_falsely_come_from_Fe_absorber(pure_z_abs, lam_obs, residual, error, d_pix, logwave) else: match_abs1 = -1*np.ones(len(pure_z_abs)) match_abs2 = z_abs_from_same_metal_absorber(pure_z_abs, lam_obs, residual, error, d_pix, absorber, logwave) ind_z = contiguous_pixel_remover(pure_z_abs, sn1_all, sn2_all, absorber, pure_gauss_fit) sel_indices = (match_abs1 == -1) & (match_abs2 == -1) & (ind_z == -1) # pure final absorber candidates # Select final quantities based on sel_indices pure_z_abs = pure_z_abs[sel_indices] pure_gauss_fit = pure_gauss_fit[sel_indices] pure_gauss_fit_std = pure_gauss_fit_std[sel_indices] pure_ew_first_line_mean = pure_ew_first_line_mean[sel_indices] pure_ew_second_line_mean = pure_ew_second_line_mean[sel_indices] pure_ew_total_mean = pure_ew_total_mean[sel_indices] pure_ew_first_line_error = pure_ew_first_line_error[sel_indices] pure_ew_second_line_error = pure_ew_second_line_error[sel_indices] pure_ew_total_error = pure_ew_total_error[sel_indices] redshift_err = redshift_err[sel_indices] sn1_all = sn1_all[sel_indices] sn2_all = sn2_all[sel_indices] vel_disp1 = vel_disp1[sel_indices] vel_disp2 = vel_disp2[sel_indices] delta_chi2_array = delta_chi2_array[sel_indices] else: redshift_err = np.array([0]) pure_z_abs = np.array([0]) pure_gauss_fit = pure_gauss_fit_std = np.array([[0, 0, 0, 0, 0, 0]]) pure_ew_first_line_mean = pure_ew_second_line_mean = pure_ew_total_mean = np.array([0]) pure_ew_first_line_error = pure_ew_second_line_error = pure_ew_total_error = np.array([0]) sn1_all = sn2_all = np.array([0]) vel_disp1 = vel_disp2= np.array([0]) delta_chi2_array = np.array([0]) not_found = max(1, len(pure_z_abs)) index_spec = [spec_index for _ in range(not_found)] return _build_result( index_spec, pure_z_abs.tolist(), pure_gauss_fit.tolist(), pure_gauss_fit_std.tolist(), pure_ew_first_line_mean.tolist(), pure_ew_second_line_mean.tolist(), pure_ew_total_mean.tolist(), pure_ew_first_line_error.tolist(), pure_ew_second_line_error.tolist(), pure_ew_total_error.tolist(), redshift_err.tolist(), sn1_all.tolist(), sn2_all.tolist(), vel_disp1.tolist(), vel_disp2.tolist(), delta_chi2_array.tolist(), ) else: return _build_result( [spec_index], [0], [[0, 0, 0, 0, 0, 0]], [[0, 0, 0, 0, 0, 0]], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0] )