"""
This script contains functions to read, append and write fits files.
"""
import os
from astropy.io import fits
import numpy as np
from astropy.table import Table
#Constants
from .constants import doublet_keys
[docs]def read_fits_file(fits_file, index=None):
"""
Reads the FLUX, ERROR, WAVELENGTH, and METADATA extensions from the
FITS file.
Args:
fits_file (str): Path to the FITS file containing QSO spectra.
index (int, list, or np.ndarray, optional): Index or indices of the rows to load. Default is None.
Returns:
tuple: A tuple containing the flux, error, wavelength, and metadata data.
"""
## Read the metadata Table first:
metadata = Table.read(fits_file, hdu="METADATA") ## this preserves the units
with fits.open(fits_file, memmap=True) as hdul:
header = hdul[0].header
wavelength = hdul['WAVELENGTH'].data # Assuming wavelength is common for all spectra
if index is None:
flux = hdul['FLUX'].data
error = hdul['ERROR'].data
else:
metadata = metadata[index] ## get metadata only for the input index (or indices)
if isinstance(index, int):
flux = hdul['FLUX'].data[index].flatten() # flux, error should be 1D for a single spectrum
error = hdul['ERROR'].data[index].flatten()
else:
flux = hdul['FLUX'].data[index]
error = hdul['ERROR'].data[index]
return header, flux, error, wavelength, metadata
[docs]def save_results_to_fits(results, input_file, output_file, headers, absorber):
"""
Save the absorber results to a FITS file along with the metadata of QSOs.
Args:
results (dict): The results dictionary from :func:`parallel_convolution_search`.
input_file (str): The path to the input spectra FITS file.
output_file (str): The path to the output FITS file.
headers (dict): The headers to include in the FITS file.
absorber (str): The absorber type (e.g. MgII, CIV).
Returns:
None: Writes a FITS file with an ``ABSORBER`` BinTableHDU containing
detected absorber properties and a ``METADATA`` BinTableHDU with
corresponding QSO metadata.
"""
if absorber not in doublet_keys:
raise ValueError(f"Unsupported absorber, must be in {doublet_keys.keys()}")
else:
EW_TOTAL = f'{absorber.upper()}_EW_TOTAL'
l1, l2 = doublet_keys[absorber][0].upper(), doublet_keys[absorber][1].upper()
sn_1, sn_2 = f'SN_{l1}', f'SN_{l2}'
EW_1, EW_2 = f'{l1}_EW', f'{l2}_EW'
VDISP1, VDISP2 = f'{l1}_VDISP', f'{l2}_VDISP'
hdu = fits.BinTableHDU.from_columns([
fits.Column(name='INDEX_SPEC', format='K', array=np.array(results['index_spec'])),
fits.Column(name='Z_ABS', format='D', array=np.array(results['z_abs'])),
fits.Column(name='GAUSS_FIT', format='6D', array=np.array(results['gauss_fit'])),
fits.Column(name='GAUSS_FIT_STD', format='6D', array=np.array(results['gauss_fit_std'])),
fits.Column(name=f'{EW_1}', format='D', unit='Angstrom', array=np.array(results['ew_1_mean'])),
fits.Column(name=f'{EW_2}', format='D', unit='Angstrom', array=np.array(results['ew_2_mean'])),
fits.Column(name=f'{EW_TOTAL}', format='D', unit='Angstrom', array=np.array(results['ew_total_mean'])),
fits.Column(name=f'{EW_1}_ERROR', format='D', unit='Angstrom', array=np.array(results['ew_1_error'])),
fits.Column(name=f'{EW_2}_ERROR', format='D', unit='Angstrom', array=np.array(results['ew_2_error'])),
fits.Column(name=f'{EW_TOTAL}_ERROR', format='D', unit='Angstrom', array=np.array(results['ew_total_error'])),
fits.Column(name='Z_ABS_ERR', format='D', array=np.array(results['z_abs_err'])),
fits.Column(name=sn_1, format='D', array=np.array(results['sn_1'])),
fits.Column(name=sn_2, format='D', array=np.array(results['sn_2'])),
fits.Column(name=VDISP1, format='D', unit='km s-1', array=np.array(results['vel_disp1'])),
fits.Column(name=VDISP2, format='D', unit='km s-1', array=np.array(results['vel_disp2'])),
fits.Column(name='DELTA_CHI2', format='D', array=np.array(results['delta_chi2'])),
], name='ABSORBER')
hdr = fits.Header()
for key, header in headers.items():
hdr[key] = (header["value"], header["comment"])
# Primay header
primary_hdu = fits.PrimaryHDU(header=hdr)
primary_hdu.header['EXTNAME'] = 'PRIMARY'
# load the QSO METADATA
_,_, _, _, metadata = read_fits_file(input_file, index=np.array(results['index_spec']))
qso_hdu = fits.BinTableHDU(metadata, name='METADATA')
hdul = fits.HDUList([primary_hdu, hdu, qso_hdu])
hdul.writeto(output_file, overwrite=True)
print(f'INFO: ouptut file {output_file} written.')
[docs]def append_table_to_fits(filename, table, hdu_name):
"""
Append an Astropy Table as a new BinTableHDU to a FITS file.
Args:
filename (str): Path to the FITS file to write to.
table (astropy.table.Table): Table to append.
hdu_name (str): Name of the new HDU (used for identification).
"""
if not isinstance(table, Table):
raise TypeError("Provided 'table' must be an astropy.table.Table object")
# Create the new HDU
new_hdu = fits.BinTableHDU(data=table, name=hdu_name)
if os.path.exists(filename):
# Open existing file and append
with fits.open(filename, mode='update') as hdul:
hdul.append(new_hdu)
hdul.flush()
else:
raise ValueError(f"ERROR: {filename} does not exist")
[docs]def read_any_fits_file(filename, hdu_name):
"""Read any fits file given filename and hdu_name.
Args:
filename (str): fits filepath
hdu_name (str or int): HDU extension name or number
Returns:
astropy data: Table for BinTableHDU, data array for Image/Primary HDU
"""
from astropy.io import fits
from astropy.table import Table
with fits.open(filename) as hdul:
# Get the specific HDU
hdu = hdul[hdu_name]
hdr = hdul[0].header # Primary Headers
# Check HDU type and read accordingly
if isinstance(hdu, fits.BinTableHDU):
return hdr, Table.read(filename, hdu=hdu_name)
elif isinstance(hdu, fits.PrimaryHDU):
return hdr, hdu.data
elif isinstance(hdu, fits.ImageHDU):
return hdr, hdu.data
else:
print(f"Reading {type(hdu).__name__} '{hdu_name}' as data array")
return hdr, hdu.data