Source code for qsoabsfind.ew

"""
This script contains a function to fit a given absorption profile
with a double gaussian and measure equivalent widths.
"""

import numpy as np
from numba import njit
from scipy.optimize import curve_fit
from .utils import double_gaussian
from .absorberutils import redshift_estimate
from .absorberutils import find_z_from_minimum

# Constants
from .constants import lines, oscillator_parameters, doublet_keys

[docs]def return_line_centers(use_kernel): """ Return line centers for a given absorber Args: use_kerne (str): absorber (e.g. MgII, CIV, OVI, NV, AlIII, SiIV, FeII) Returns: line centers (floats) """ if use_kernel not in doublet_keys: raise ValueError(f"Unsupported kernel type. Use {doublet_keys.keys()}") else: line_centre1, line_centre2 = lines[doublet_keys[use_kernel][0]], lines[doublet_keys[use_kernel][1]] return line_centre1, line_centre2
# Example usage within double_curve_fit
[docs]def double_curve_fit(index, fun_to_run, lam_fit_range, nmf_resi_fit, error_fit, bounds, init_cond, maxefv): """ Fits a double Gaussian function to the provided data. Args: index (int): Index of the spectrum being fitted. fun_to_run (callable): The fitting function. lam_fit_range (numpy.ndarray): Wavelength range for the fitting. nmf_resi_fit (numpy.ndarray): Residual array for the fitting. error_fit (numpy.ndarray): Error array for the fitting. bounds (tuple): Bounds for the fitting parameters. init_cond (list or numpy.ndarray): Initial conditions for the fitting parameters. maxefv (int): Maximum number of iterations for the fitting algorithm. Returns: tuple: Contains the following elements: - save_param_array (numpy.ndarray): Fitted parameters. - save_param_error (numpy.ndarray): Errors of the fitted parameters. - EW_first (float): Equivalent width of the first Gaussian. - EW_second (float): Equivalent width of the second Gaussian. - EW_total (float): Total equivalent width of both Gaussians. """ nparm = len(init_cond) save_param_array = np.zeros(nparm) save_param_error = np.zeros(nparm) save_param_cov = np.zeros((nparm, nparm)) EW_first = np.nan EW_second = np.nan EW_total = np.nan if bounds is None: bounds = (-np.inf, np.inf) try: popt, pcov = curve_fit( fun_to_run, lam_fit_range, nmf_resi_fit, bounds=bounds, sigma=error_fit, p0=init_cond, maxfev=maxefv, absolute_sigma=True, ftol=1e-4, xtol=1e-4 ) EW_first = popt[0] * np.sqrt(np.pi * 2 * popt[2] ** 2) EW_second = popt[3] * np.sqrt(np.pi * 2 * popt[5] ** 2) EW_total = EW_first + EW_second save_param_array = popt save_param_error = np.sqrt(np.diag(pcov)) save_param_cov = pcov except (RuntimeError, ValueError, TypeError) as e: if isinstance(e, TypeError): print(f'NMF resi fit size = {nmf_resi_fit.size}') print(f'\nIn Main Gaussian script: In double Gaussian fitting: Spec Index = {index} has issues, Check this please\n') save_param_array[:] = np.nan save_param_error[:] = np.nan save_param_cov[:] = np.nan return save_param_array, save_param_error, EW_first, EW_second, EW_total, save_param_cov
[docs]@njit def calculate_ew_errors(popt, perr): """ Calculate the errors in the equivalent widths (EW) using the errors in the optimized parameters. Args: popt (numpy.ndarray): Optimized parameters from the curve fitting. perr (numpy.ndarray): Errors of the optimized parameters from the curve fitting. Returns: tuple: Contains the following elements: - EW1_error (float): Error in the equivalent width of the first Gaussian. - EW2_error (float): Error in the equivalent width of the second Gaussian. - EW_total_error (float): Total error in the equivalent width of both Gaussians. """ amp1, mean1, sigma1, amp2, mean2, sigma2 = popt amp1_err, mean1_err, sigma1_err, amp2_err, mean2_err, sigma2_err = perr EW1 = amp1 * np.sqrt(np.pi * 2 * sigma1 ** 2) EW2 = amp2 * np.sqrt(np.pi * 2 * sigma2 ** 2) # using correlation between parameters EW1_error = EW1 * np.sqrt((amp1_err / amp1) ** 2 + (sigma1_err / sigma1) ** 2 - 2 * amp1_err * sigma1_err / (amp1 * sigma1)) EW2_error = EW2 * np.sqrt((amp2_err / amp2) ** 2 + (sigma2_err / sigma2) ** 2 - 2 * amp2_err * sigma2_err / (amp2 * sigma2)) EW_total_error = np.sqrt(EW1_error ** 2 + EW2_error ** 2) return EW1_error, EW2_error, EW_total_error
[docs]def full_covariance_ew_errors(popt, pcov): """ With full covariance matrix, calculate the errors in the equivalent widths (EW) using the errors in the optimized parameters. Args: popt (numpy.ndarray): Optimized parameters from the curve fitting. pcov (numpy.ndarray): Covariance matrix from the curve fitting. Returns: tuple: Contains the following elements: - EW1_error (float): Error in the equivalent width of the first Gaussian. - EW2_error (float): Error in the equivalent width of the second Gaussian. - EW_total_error (float): Total error in the equivalent width of both Gaussians. """ # Extract optimized parameters amp1, mean1, sigma1, amp2, mean2, sigma2 = popt # Calculate the partial derivatives of EW1 and EW2 with respect to the parameters dEW1_damp1 = np.sqrt(2 * np.pi) * sigma1 dEW1_dsigma1 = amp1 * np.sqrt(2 * np.pi) dEW2_damp2 = np.sqrt(2 * np.pi) * sigma2 dEW2_dsigma2 = amp2 * np.sqrt(2 * np.pi) # Derivatives arrays for covariance calculation jacobian_EW1 = np.array([dEW1_damp1, 0, dEW1_dsigma1, 0, 0, 0]) jacobian_EW2 = np.array([0, 0, 0, dEW2_damp2, 0, dEW2_dsigma2]) # Calculate the variance (square of the error) using the full covariance matrix EW1_var = np.dot(jacobian_EW1, np.dot(pcov, jacobian_EW1.T)) EW2_var = np.dot(jacobian_EW2, np.dot(pcov, jacobian_EW2.T)) # The square root of the variance gives the error EW1_error = np.sqrt(EW1_var) EW2_error = np.sqrt(EW2_var) # Total EW error, including cross-terms cross_term = 2 * np.dot(jacobian_EW1, np.dot(pcov, jacobian_EW2.T)) EW_total_error = np.sqrt(EW1_var + EW2_var + cross_term) return EW1_error, EW2_error, EW_total_error
[docs]def bootstrap_fitting_and_ew(index, nboot, z, wavelength, flux, error, ix0, ix1, bound, amp_ratio, line1, line2, num_iter): """Perform bootstrap resampling to estimate uncertainties in fitting parameters and equivalent widths. Conducts bootstrap analysis on spectral data to derive robust estimates of double Gaussian fitting parameters and equivalent widths with associated uncertainties for a doublet system. Args: index (int): Index or identifier for the current fitting process. nboot (int): Number of bootstrap iterations to perform. z (float): Redshift of the absorption system. wavelength (array-like): Observed Wavelength array of the spectrum. flux (array-like): Flux array of the spectrum. error (array-like): Flux uncertainty array. ix0 (int): Starting index of the spectral region to fit. ix1 (int): Ending index of the spectral region to fit. bound (tuple or list): Bounds for the fitting parameters. amp_ratio (float): Expected amplitude ratio between the two lines in the doublet. line1 (float): Rest wavelength of the first line in the doublet. line2 (float): Rest wavelength of the second line in the doublet. num_iter (int): Maximum number of iterations for each fitting attempt. Returns: tuple: A tuple containing: - fit_params_mean (array): Mean values of fitted parameters across bootstrap samples. - fit_param_std (array): Standard deviations of fitted parameters. - ew1_mean (float): Mean equivalent width of line 1. - ew2_mean (float): Mean equivalent width of line 2. - ew_total_mean (float): Mean total equivalent width of the doublet. - ew1_std (float): Standard deviation of line 1 equivalent width. - ew2_std (float): Standard deviation of line 2 equivalent width. - ew_total_std (float): Standard deviation of total equivalent width. """ fit_params = np.zeros((nboot, 6)) ew1_array = np.zeros(nboot) ew2_array = np.zeros(nboot) ew_total_array = np.zeros(nboot) absorber_rest_lam = wavelength / (1 + z) # rest-frame conversion of wavelength lam_ind = np.where((absorber_rest_lam >= ix0) & (absorber_rest_lam <= ix1))[0] lam_fit = absorber_rest_lam[lam_ind] nmf_resi = flux[lam_ind] error_flux = error[lam_ind] amp_first_nmf = max(0.05, 1 - np.nanmin(nmf_resi)) amp_second_nmf = min(0.95, amp_ratio * amp_first_nmf) for i in range(nboot): # #best-fit corresponding to this best redshift if bound is not None: sigma1 = np.random.uniform(bound[0][2], bound[1][2]) sigma2 = np.random.uniform(bound[0][5], bound[1][5]) else: sigma1 = sigma2 = np.random.uniform(0.2, 5) init_cond = [amp_first_nmf, line1, sigma1, amp_second_nmf, line2, sigma2] fit_params[i], _, ew1_array[i], ew2_array[i], ew_total_array[i], _ = double_curve_fit( index, double_gaussian, lam_fit, nmf_resi, error_fit=error_flux, bounds=bound, init_cond=init_cond, maxefv= num_iter) fit_params_mean = np.nanmean(fit_params, axis=0) fit_param_std = np.nanstd(fit_params, axis=0) ew1_mean = np.nanmean(ew1_array) ew2_mean = np.nanmean(ew2_array) ew_total_mean = np.nanmean(ew_total_array) ew1_std = np.nanstd(ew1_array) ew2_std = np.nanstd(ew2_array) ew_total_std = np.nanstd(ew_total_array) return fit_params_mean, fit_param_std, ew1_mean, ew2_mean, ew_total_mean, ew1_std, ew2_std, ew_total_std
[docs]@njit def quick_significance_test(flux_norm, fitted_model, error, fitted_params=None, wavelength_rest=None, n_pixels=5): """ Significance test with simple absorption check at line centers. Args: flux_norm: Normalized flux array fitted_model: Fitted double Gaussian model error: Error array fitted_params: Fitted parameters [amp1, center1, sigma1, amp2, center2, sigma2] wavelength_rest: Wavelength array (same frame as fitted_params) n_pixels: Number of pixels around each line center to check Returns: delta_chi2: Delta chi-square value, or 0 if absorption criteria not met """ # Check for empty arrays if flux_norm.size == 0 or error.size == 0: return 0.0 # Check absorption at line centers if fitted_params is not None and wavelength_rest is not None and wavelength_rest.size > 0: # Get fitted line centers line_center1 = fitted_params[1] line_center2 = fitted_params[4] # Check both line centers for line_center in [line_center1, line_center2]: # Find nearest wavelength pixel idx = np.argmin(np.abs(wavelength_rest - line_center)) # Get flux values around this pixel start = max(0, idx - n_pixels) end = min(len(flux_norm), idx + n_pixels + 1) pixels_around_line = flux_norm[start:end] # Check if all pixels < 1 if not np.all(pixels_around_line < 1.0): return 0.0 # Calculate delta chi2 continuum_level = np.ones_like(flux_norm) chi2_flat = np.sum(((flux_norm - continuum_level) / error) ** 2) chi2_with_lines = np.sum(((flux_norm - fitted_model) / error) ** 2) delta_chi2 = chi2_flat - chi2_with_lines return delta_chi2
[docs]def measure_absorber_properties_double_gaussian( index, wavelength, flux, error, absorber_redshift, bound, use_kernel, d_pix, num_iter=500, window=5, use_covariance=False, nboot=None): """ Measures the properties of each potential absorber by fitting a double Gaussian to the absorption feature and measuring the equivalent width (EW) and errors of absorption lines. Args: index (int): Index of the spectrum being fitted. wavelength (numpy.ndarray): Array containing common rest frame quasar wavelength. flux (numpy.ndarray): Matrix containing the residual flux. error (numpy.ndarray): Error array corresponding to the flux. absorber_redshift (list): List of potential absorbers identified previously. bound (tuple): Bounds for the fitting parameters. use_kernel (str): Kernel type (MgII, FeII, CIV, NV, OVI, SiIV, AlIII). d_pix (float): wavelength pixel for tolerance window (int): window size for redshift estimate (default 9) use_covariance (bool): if want to use full covariance of scipy curvey_fit for EW error calculation (default is False) nboot (int): if provided, will perform bootstrapping fitting (default None) Returns: tuple: Contains the following elements: - z_abs_array (numpy.ndarray): Array of absorber redshifts. - z_abs_err (numpy.ndarray): Array of redshift errors. - fitting_param_for_spectrum (numpy.ndarray): Array of fitting parameters for double Gaussian. - fitting_param_std_for_spectrum (numpy.ndarray): Array of errors for fitting parameters. - EW_first_line (numpy.ndarray): Mean equivalent width of the first line. - EW_second_line (numpy.ndarray): Mean equivalent width of the second line. - EW_total (numpy.ndarray): Mean total equivalent width of both lines. - EW_first_line_error (numpy.ndarray): Error in the equivalent width of the first line. - EW_second_line_error (numpy.ndarray): Error in the equivalent width of the second line. - EW_total_error (numpy.ndarray): Total error in the equivalent width of both lines. - delta_chi2 (numpy.ndarray): Delta chi-squared values for significance testing. """ # Initialize arrays z_abs_array = np.array(absorber_redshift) size_array = z_abs_array.size # Initialize all output arrays nparm = 6 # 6 parameter double Gaussian fitting_param_for_spectrum = np.zeros((size_array, nparm)) fitting_param_std_for_spectrum = np.zeros((size_array, nparm)) fitting_param_pcov_for_spectrum = np.zeros((size_array, nparm, nparm)) EW_first_line = np.zeros(size_array, dtype='float32') EW_second_line = np.zeros(size_array, dtype='float32') EW_first_line_error = np.zeros(size_array, dtype='float32') EW_second_line_error = np.zeros(size_array, dtype='float32') EW_total = np.zeros(size_array, dtype='float32') EW_total_error = np.zeros(size_array, dtype='float32') z_abs_err = np.zeros(size_array, dtype='float32') delta_chi2 = np.zeros(size_array, dtype='float32') # Get line properties for this kernel line_centre1, line_centre2 = return_line_centers(use_kernel) amp_ratio = oscillator_parameters[f'{use_kernel}_f2'] / oscillator_parameters[f'{use_kernel}_f1'] # Define wavelength range for Gaussian fitting # Assuming maximum line width of d_pix * 15 sigma = d_pix * 15 ix0 = line_centre1 - sigma ix1 = line_centre2 + sigma # Return empty arrays if no absorbers if size_array == 0: return ( z_abs_array, z_abs_err, fitting_param_for_spectrum, fitting_param_std_for_spectrum, EW_first_line, EW_second_line, EW_total, EW_first_line_error, EW_second_line_error, EW_total_error, delta_chi2 ) # Process each absorber for k in range(size_array): # Set random seed for reproducibility np.random.seed(int(absorber_redshift[k] * 1e6) % 2**32) # ========== FIRST REDSHIFT REFINEMENT ========== z1 = find_z_from_minimum(wavelength, flux, line_centre1, absorber_redshift[k], window=window) z2 = find_z_from_minimum(wavelength, flux, line_centre2, absorber_redshift[k], window=window) absorber_redshift[k] = (line_centre1 * z1 + line_centre2 * z2) / (line_centre1 + line_centre2) # Extract spectrum in rest frame absorber_rest_lam = wavelength / (1 + absorber_redshift[k]) lam_ind = np.where((absorber_rest_lam >= ix0) & (absorber_rest_lam <= ix1))[0] lam_fit = absorber_rest_lam[lam_ind] nmf_resi = flux[lam_ind] error_flux = error[lam_ind] uniform = np.random.uniform # Check if we have valid data if nmf_resi.size > 0 and not np.all(np.isnan(nmf_resi)): # ========== INITIAL FIT IN REST FRAME ========== # Set initial conditions amp_first_nmf = max(0.05, 1 - np.nanmin(nmf_resi)) amp_second_nmf = min(0.95, amp_ratio * amp_first_nmf) line_first = line_centre1 if bound is not None: sigma1 = uniform(bound[0][2], bound[1][2]) sigma2 = uniform(bound[0][5], bound[1][5]) else: sigma1 = sigma2 = uniform(0.2, 5) line_second = line_centre2 init_cond = [amp_first_nmf, line_first, sigma1, amp_second_nmf, line_second, sigma2] # First fit in rest frame fitting_param_for_spectrum[k], fitting_param_std_for_spectrum[k], \ EW_first_line[k], EW_second_line[k], EW_total[k], _ = double_curve_fit( index, double_gaussian, lam_fit, nmf_resi, error_fit=error_flux, bounds=bound, init_cond=init_cond, maxefv=num_iter ) # ========== FIT IN OBSERVED FRAME FOR REDSHIFT ========== # Convert to observed frame fitted_l1 = fitting_param_for_spectrum[k][1] * (1 + absorber_redshift[k]) fitted_l2 = fitting_param_for_spectrum[k][4] * (1 + absorber_redshift[k]) std_fitted_l1 = fitting_param_std_for_spectrum[k][1] * (1 + absorber_redshift[k]) std_fitted_l2 = fitting_param_std_for_spectrum[k][4] * (1 + absorber_redshift[k]) obs_sig1 = fitting_param_for_spectrum[k][2] * (1 + absorber_redshift[k]) obs_sig2 = fitting_param_for_spectrum[k][5] * (1 + absorber_redshift[k]) obs_init_cond = [amp_first_nmf, fitted_l1, obs_sig1, amp_second_nmf, fitted_l2, obs_sig2] # Fit in observed frame obs_fitting_param_for_spectrum, obs_fitting_param_std_for_spectrum, _, _, _, _ = double_curve_fit( index, double_gaussian, lam_fit * (1 + absorber_redshift[k]), nmf_resi, error_fit=error_flux, bounds=None, init_cond=obs_init_cond, maxefv=num_iter ) fitted_l1 = obs_fitting_param_for_spectrum[1] fitted_l2 = obs_fitting_param_for_spectrum[4] std_fitted_l1 = obs_fitting_param_std_for_spectrum[1] std_fitted_l2 = obs_fitting_param_std_for_spectrum[4] # Update redshift estimate z_abs_array[k], z_abs_err[k] = redshift_estimate( fitted_l1, fitted_l2, std_fitted_l1, std_fitted_l2, line_centre1, line_centre2 ) # ========== SECOND REDSHIFT REFINEMENT ========== z1 = find_z_from_minimum(wavelength, flux, line_centre1, z_abs_array[k], window=window) z2 = find_z_from_minimum(wavelength, flux, line_centre2, z_abs_array[k], window=window) z_abs_array[k] = 0.5 * (z1 + z2) # ========== SECOND FIT WITH REFINED REDSHIFT ========== # Re-extract spectrum with new redshift absorber_rest_lam = wavelength / (1 + z_abs_array[k]) lam_ind = np.where((absorber_rest_lam >= ix0) & (absorber_rest_lam <= ix1))[0] lam_fit = absorber_rest_lam[lam_ind] nmf_resi = flux[lam_ind] error_flux = error[lam_ind] fitting_param_for_spectrum[k], fitting_param_std_for_spectrum[k], \ EW_first_line[k], EW_second_line[k], EW_total[k], \ fitting_param_pcov_for_spectrum[k] = double_curve_fit( index, double_gaussian, lam_fit, nmf_resi, error_fit=error_flux, bounds=bound, init_cond=init_cond, maxefv=num_iter ) # Update redshift estimate again fitted_l1 = fitting_param_for_spectrum[k][1] * (1 + z_abs_array[k]) fitted_l2 = fitting_param_for_spectrum[k][4] * (1 + z_abs_array[k]) std_fitted_l1 = fitting_param_std_for_spectrum[k][1] * (1 + z_abs_array[k]) std_fitted_l2 = fitting_param_std_for_spectrum[k][4] * (1 + z_abs_array[k]) z_abs_array[k], z_abs_err[k] = redshift_estimate( fitted_l1, fitted_l2, std_fitted_l1, std_fitted_l2, line_centre1, line_centre2 ) # ========== FINAL FIT WITH BEST REDSHIFT ========== # Final extraction with best redshift absorber_rest_lam = wavelength / (1 + z_abs_array[k]) lam_ind = np.where((absorber_rest_lam >= ix0) & (absorber_rest_lam <= ix1))[0] lam_fit = absorber_rest_lam[lam_ind] nmf_resi = flux[lam_ind] error_flux = error[lam_ind] fitting_param_for_spectrum[k], fitting_param_std_for_spectrum[k], \ EW_first_line[k], EW_second_line[k], EW_total[k], \ fitting_param_pcov_for_spectrum[k] = double_curve_fit( index, double_gaussian, lam_fit, nmf_resi, error_fit=error_flux, bounds=bound, init_cond=init_cond, maxefv=2 * num_iter ) # ========== CALCULATE SIGNIFICANCE ========== fitted_model = double_gaussian(lam_fit, *fitting_param_for_spectrum[k]) delta_chi2[k] = quick_significance_test(nmf_resi, fitted_model, error_flux, fitted_params=fitting_param_for_spectrum[k], wavelength_rest=lam_fit, n_pixels=2) # ========== BOOTSTRAPPING OR ERROR CALCULATION ========== if nboot is not None and nboot > 0 and nmf_resi.size > 0: print(f'INFO: bootstrapping...') fitting_param_for_spectrum[k], fitting_param_std_for_spectrum[k], \ EW_first_line[k], EW_second_line[k], EW_total[k], \ EW_first_line_error[k], EW_second_line_error[k], EW_total_error[k] = bootstrap_fitting_and_ew( index, nboot, z_abs_array[k], wavelength, flux, error, ix0, ix1, bound, amp_ratio, line_centre1, line_centre2, num_iter ) fitted_model = double_gaussian(lam_fit, *fitting_param_for_spectrum[k]) delta_chi2[k] = quick_significance_test(nmf_resi, fitted_model, error_flux, fitted_params=fitting_param_for_spectrum[k], wavelength_rest=lam_fit, n_pixels=2) else: # Calculate EW errors if not use_covariance: EW_first_line_error[k], EW_second_line_error[k], EW_total_error[k] = \ calculate_ew_errors(fitting_param_for_spectrum[k], fitting_param_std_for_spectrum[k]) else: EW_first_line_error[k], EW_second_line_error[k], EW_total_error[k] = \ full_covariance_ew_errors(fitting_param_for_spectrum[k], fitting_param_pcov_for_spectrum[k]) # ========== CHECK FOR NaN VALUES ========== if np.all(np.isnan(EW_first_line[k])) or \ np.all(np.isnan(EW_second_line[k])) or \ np.all(np.isnan(EW_total[k])): # Reset all values if NaN fitting_param_for_spectrum[k] = np.zeros(nparm) fitting_param_std_for_spectrum[k] = np.zeros(nparm) EW_first_line[k] = 0 EW_second_line[k] = 0 EW_total[k] = 0 EW_first_line_error[k] = 0 EW_second_line_error[k] = 0 EW_total_error[k] = 0 z_abs_err[k] = 0 delta_chi2[k] = 0 else: # No valid data - set everything to zero EW_first_line[k] = 0 EW_second_line[k] = 0 EW_total[k] = 0 EW_first_line_error[k] = 0 EW_second_line_error[k] = 0 EW_total_error[k] = 0 fitting_param_for_spectrum[k] = np.zeros(nparm) fitting_param_std_for_spectrum[k] = np.zeros(nparm) z_abs_err[k] = 0 delta_chi2[k] = 0 return ( z_abs_array, z_abs_err, fitting_param_for_spectrum, fitting_param_std_for_spectrum, EW_first_line, EW_second_line, EW_total, EW_first_line_error, EW_second_line_error, EW_total_error, delta_chi2 )