Source code for altaipony.flarelc

import numpy as np
import pandas as pd
import os
import copy
import logging
import progressbar
import datetime

from collections import defaultdict

from scipy.interpolate import UnivariateSpline
from scipy import optimize
from scipy.fftpack import fft

from lightkurve import KeplerLightCurve, KeplerTargetPixelFile, TessLightCurve
from lightkurve.utils import KeplerQualityFlags

from import fits
import astropy.units as u

from .k2scmod import k2sc_lc
from .altai import (find_flares,
from .fakeflares import (merge_fake_and_recovered_events,
from .injrecanalysis import wrap_characterization_of_flares, _heatmap
from .utils import split_gaps

import time
LOG = logging.getLogger(__name__)

FLARE_COLUMNS = ['istart', 'istop', 'cstart', 'cstop', 'tstart',
                 'tstop', 'ed_rec', 'ed_rec_err', 'ampl_rec', 
                  'total_n_valid_data_points', 'dur']

FAKE_FLARE_COLUMNS = ['duration_d', 'amplitude', 'ed_inj', 'peak_time']

[docs]class FlareLightCurve(KeplerLightCurve, TessLightCurve): """ Flare light curve class that unifies properties of ``K2SC``-de-trended and Kepler's ``lightkurve.KeplerLightCurve``. Attributes ------------- time : array-like Time measurements. flux : array-like Flux count for every time point. flux_err : array-like Uncertainty on each flux data point. pixel_flux : multi-dimensional array Flux in the target pixels from the KeplerTargetPixelFile. pixel_flux_err : multi-dimensional array Uncertainty on pixel_flux. pipeline_mask : multi-dimensional boolean array TargetPixelFile mask for aperture photometry. time_format : str String specifying how an instant of time is represented, e.g., 'bkjd' or ‘jd'. time_scale : str String that specifies how the time is measured, e.g., tdb', ‘tt', ‘ut1', or 'utc'. time_unit : astropy.unit Astropy unit object defining unit of time. centroid_col : array-like Centroid column coordinates as a function of time. centroid_row : array-like Centroid row coordinates as a function of time. quality : array-like Kepler quality flags. quality_bitmask : str Can be 'none', 'default', 'hard' or 'hardest'. channel : int Channel number, where aperture is located on the CCD. campaign : int K2 campaign number. quarter : int Kepler Quarter number. sector : int TESS sector number mission : string Mission identifier, e.g., 'TESS', 'K2' or 'Kepler'. cadenceno : array-like Cadence number - unique identifier. object : str target ID. ra : float RA in deg. dec : float Declination in deg. label : string 'EPIC xxxxxxxxx'. meta : dict Free-form metadata associated with the LightCurve. Not populated in general. detrended_flux : array-like K2SC detrend flux, same units as flux. detrended_flux_err : array-like K2SC detrend flux error, same units as flux. flux_trends : array-like Astrophysical variability as derived by K2SC. gaps : list of tuples of ints Each tuple contains the start and end indices of observation gaps. See ``find_gaps``. flares : DataFrame Table of flares, their start and stop time, recovered equivalent duration (ED), and, if applicable, recovery probability, ratio of recovered ED to injected synthetic ED. Also information about quality flags may be stored here. it_med : array-like Iterative median, see the ``find_iterative_median`` method. """ @property def detrended_flux_err(self) -> np.array: try: return self["detrended_flux_err"] except KeyError: self["detrended_flux_err"] = np.full_like(self.time.value, np.nan) return self["detrended_flux_err"] @detrended_flux_err.setter def detrended_flux_err(self, detrended_flux_err): self["detrended_flux_err"] = detrended_flux_err @property def detrended_flux(self) -> np.array: try: return self["detrended_flux"] except KeyError: self["detrended_flux"] = np.full_like(self.time.value, np.nan) return self["detrended_flux"] @detrended_flux.setter def detrended_flux(self, detrended_flux): self["detrended_flux"] = detrended_flux @property def cadenceno(self): try: return self["cadenceno"] except KeyError: self["cadenceno"] = np.full_like(self.time.value, np.nan) return self["cadenceno"] @cadenceno.setter def cadenceno(self, cadenceno): self["cadenceno"] = cadenceno @property def it_med(self): try: return self["it_med"] except KeyError: self["it_med"] = np.full_like(self.time.value, np.nan) return self["it_med"] @it_med.setter def it_med(self, it_med): self["it_med"] = it_med @property def origin(self): try: return self.meta["origin"] except KeyError: self.meta["origin"] = "FLC" return self.meta["origin"] @origin.setter def origin(self, origin): self.meta["origin"] = origin @property def saturation(self): try: return self.meta["saturation"] except KeyError: self.meta["saturation"] = [] return self.meta["saturation"] @saturation.setter def saturation(self, saturation): self.meta["saturation"] = saturation @property def flares(self) -> pd.DataFrame: try: return self.meta["flares"] except KeyError: self.meta["flares"] = pd.DataFrame(columns=FLARE_COLUMNS) return self.meta["flares"] @flares.setter def flares(self, flares): self.meta["flares"] = flares @property def fake_flares(self) -> pd.DataFrame: try: return self.meta["fake_flares"] except KeyError: self.meta["fake_flares"] = pd.DataFrame(columns=FAKE_FLARE_COLUMNS) return self.meta["fake_flares"] @fake_flares.setter def fake_flares(self, fake_flares): self.meta["fake_flares"] = fake_flares @property def gaps(self): try: return self.meta["gaps"] except KeyError: self.meta["gaps"] = None return self.meta["gaps"] @gaps.setter def gaps(self, gaps): self.meta["gaps"] = gaps def _init_flare_table(self, flares=None, fake_flares=None): if flares is None: self.flares = pd.DataFrame(columns=FLARE_COLUMNS) else: self.flares = flares if fake_flares is None: self.fake_flares = pd.DataFrame(columns=FAKE_FLARE_COLUMNS) else: self.fake_flares = fake_flares def _add_tpf_columns(self, pixel_flux=None, pixel_flux_err=None, pipeline_mask=None): self.pixel_flux = pixel_flux self.pixel_flux_err = pixel_flux_err self.pipeline_mask = pipeline_mask def __repr__(self): return('FlareLightCurve(ID: {})'.format(self.targetid))
[docs] def find_gaps(self, maxgap=0.09, minspan=10, splits=[]): ''' Find gaps in light curve and stores them in the gaps attribute. If required, adds additional splits in an arbitrary number of places. Caution: passing splits values means that you override the minspan and maxgap. Parameters ------------ time : numpy array with floats sorted array, in units of days maxgap : 0.09 or float maximum time gap between two datapoints in days, default equals approximately 2h minspan : 10 or int minimum number of datapoints in continuous observation, i.e., w/o gaps as defined by maxgap splits : list of floats or ints additional places in which to slice the time series Returns -------- FlareLightCurve ''' lc = copy.copy(self) dt = np.diff(lc.time.value) gap = np.where(np.append(0, dt) >= maxgap)[0] # add start/end of LC to loop over easily gap_out = np.append(0, np.append(gap, len(lc.time.value))) # left start, right end of data left, right = gap_out[:-1], gap_out[1:] # drop too short observation periods too_short = np.where(np.diff(gap_out) < 10) left, right = np.delete(left,too_short), np.delete(right,(too_short)) # get the gaps gaps = list(zip(left, right)) # split up the time series in additional place if needed lc.gaps = split_gaps(gaps, splits) return lc
[docs] def detrend(self, mode, save=False, path='detrended_lc.fits', de_niter=30, max_sigma=3, func=None, **kwargs): """ De-trends a FlareLightCurve using ``K2SC``. Optionally saves the LightCurve in a fits file that can be read as K2SC file. Parameters: ---------- mode : str "k2sc" or "savgol" or "custom" de_niter : int Differential Evolution global optimizer parameter. K2SC default is 150, here set to 30 as a safety net to avoid unintenional computational effort. max_sigma: int Default is 3, value is passed to iterative sigma clipping in K2SC save : False or bool If True, the light curve is saved as a fits file to a given folder. path : str Path to resulting fits file. As a default, the fits file will be stored in the working directory. func : function custom detrending function kwargs : dict Keyword arguments to pass to k2sc, detrend_savgol, or custom method Returns -------- FlareLightCurve """ if mode == "savgol": new_lc = copy.deepcopy(self) new_lc = detrend_savgol(new_lc, **kwargs) if save == True: new_lc.to_fits(path) return new_lc elif mode == "k2sc": #make sure there is no detrended_flux already if self.origin != 'TPF': err_str = ('Only KeplerTargetPixelFile derived FlareLightCurves can be' ' passed to K2SC de-trending.') LOG.exception(err_str) raise ValueError(err_str) else: new_lc = copy.deepcopy(self) new_lc.meta["keplerid"] = self.targetid #K2SC MAGIC new_lc.__class__ = k2sc_lc try: new_lc.k2sc(de_niter=de_niter, max_sigma=max_sigma, **kwargs) new_lc["detrended_flux"] = (new_lc.corr_flux.value - new_lc.tr_time.value + np.nanmedian(new_lc.tr_time.value)) new_lc["detrended_flux_err"] = copy.copy(new_lc.flux_err) # does k2sc share their uncertainties somewhere? new_lc["flux_trends"] = new_lc.tr_time.value if new_lc.detrended_flux.value.shape != self.flux.value.shape: LOG.error('De-detrending messed up the flux arrays.') else:'De-trending successfully completed.') except np.linalg.linalg.LinAlgError as e: LOG.error(e) LOG.error('Detrending failed because probably Cholesky ' 'decomposition failed. Try again, you shall succeed.') new_lc.__class__ = FlareLightCurve if save == True: new_lc.to_fits(path) return new_lc elif mode=="custom": if func is None: LOG.error('If you wish to use a custom detrending function you' ' must pass a callable to the "func" parameter.') raise ValueError new_lc = copy.deepcopy(self) new_lc = func(new_lc, **kwargs) if (np.isnan(new_lc.detrended_flux).all() | np.isnan(new_lc.detrended_flux_err).all()): LOG.error('The custom de-trending function you passed does not' ' return an detrended_flux or detrended_flux_err attri' 'bute.') raise AttributeError if save == True: new_lc.to_fits(path) return new_lc else: err_str = (f'\nDe-trending mode {mode} does not exist. Pass "k2sc" (K2 LCs)' ' or "savgol" (Kepler, TESS).') LOG.exception(err_str) raise ValueError(err_str)
[docs] def find_flares(self, minsep=3, fake=False, **kwargs): ''' Find flares in a ``FlareLightCurve``. Parameters ------------- minsep : 3 or int Minimum distance between two candidate start times in datapoints. kwargs : dict keyword arguments to pass to :func:`find_flares_in_cont_obs_period` Possible keyword arguments: sigma : numpy array local scatter of the flux. Array should be the same length as the detrended flux array. If sigma=None, error is used instead. N1 : int (default is 3) How many times above sigma is required. N2 : int (Default is 2) How many times above sigma and detrended_flux_err is required N3 : int (Default is 3) The number of consecutive points required to flag as a flare. Returns ---------- FlareLightCurve ''' if ((fake==False) & (self.flares.shape[0]>0)): return self else: lc = copy.deepcopy(self) #re-init flares columns = ['istart', 'istop', 'cstart', 'cstop', 'tstart', 'tstop', 'ed_rec', 'ed_rec_err', 'ampl_rec', 'dur'] lc.flares = pd.DataFrame(columns=columns) #find continuous observing periods lc = lc.find_gaps() #find the true median value iteratively lc = find_iterative_median(lc) #find flares lc = find_flares(lc, minsep=minsep, **kwargs) return lc
[docs] def sample_flare_recovery(self, iterations=2000, inject_before_detrending=False, mode=None, func=None, save_lc_to_file=False, folder="", fakefreq=0.05, save=False, path=None, detrend_kwargs={}, **kwargs): """ Runs a number of injection recovery cycles and characterizes the light curve by recovery probability and equivalent duration underestimation. Inject one flare per light curve. Parameters ----------- iterations : 2000 or int Number of injection/recovery cycles inject_before_detrending : False or bool If True, fake flare are injected directly into raw data. mode : str "savgol" or "k2sc". Required if ``inject_before_detrending`` is True. fakefreq : 0.05 or float number of flares per day, but at least one per continuous observation period will be injected detrend_kwargs : dict Keyword arguments to pass to FlareLightCurve.detrend kwargs : dict Keyword arguments to pass to inject_fake_flares Returns ------- lc : FlareLightCurve Detrended LC with all fake_flares listed in the attribute fake_lc : FlareLightCurve Light curve with the last iteration of synthetic flares injected. """ injrecstr = {True : "before", False : "after"} # define string to identify fake flare analysis by file name lc = copy.deepcopy(self) if inject_before_detrending == True: lc = lc.detrend(mode, func=func, **detrend_kwargs) lc = lc.find_gaps() lc = lc.find_flares() lc = find_iterative_median(lc) lc_ = copy.deepcopy(lc) columns = ['istart', 'istop', 'cstart', 'cstop', 'tstart', 'tstop', 'ed_rec', 'ed_rec_err', 'duration_d', 'amplitude', 'ed_inj', 'peak_time', 'ampl_rec', 'dur'] widgets = [progressbar.Percentage(), progressbar.Bar()] bar = progressbar.ProgressBar(widgets=widgets, max_value=iterations).start() for i in range(iterations): fake_lc = lc.inject_fake_flares(inject_before_detrending=inject_before_detrending, fakefreq=fakefreq, **kwargs) if save_lc_to_file == True: fake_lc.to_fits("{folder}before.fits") print(f"saved {self.targetit} LC before detrending") injs = fake_lc.fake_flares if inject_before_detrending == True:'\nDetrending fake LC:\n') fake_lc = fake_lc.detrend(mode, func=func, **detrend_kwargs) fake_lc = fake_lc.find_flares(fake=True) recs = fake_lc.flares if save_lc_to_file == True: fake_lc.to_fits(f"{folder}after.fits") print(f"saved {self.targetit} LC after detrending") # The following line makes sure that column order is retained injrec_results = pd.DataFrame(columns=columns) # Merge injected and recovered flares injrec_results = injrec_results.append(merge_fake_and_recovered_events(injs, recs), ignore_index=True) bar.update(i + 1) if save == True: #Define default path if needed if path is None: path = (f'{iterations}_{lc.targetid}_inj_' f'{injrecstr[inject_before_detrending]}_' f'{lc.campaign}.csv') # If it already exists append new injrec to the end if os.path.exists(path): with open(path, 'a') as f: injrec_results.to_csv(f, index=False, header=False) # If it doesn't then write it out but keep the header else: injrec_results.to_csv(path, index=False) # Add to previous runs of sample_flare_recovery on the same LC or create new table if lc.fake_flares.shape[0] > 0: lc.fake_flares = lc.fake_flares.append(injrec_results, ignore_index=True) else: lc.fake_flares = injrec_results if save == True: # Finally read in the result lc.fake_flares = pd.read_csv(path) # End monitoring bar.finish() return lc, fake_lc
[docs] def mark_flagged_flares(self, explain=False): """ Mark all flares that coincide with K2 flagged cadences. Explain the flags if needed. Parameters ----------- explain : False or bool If True, an ``explanation`` column will be added to the flares table explaining the flags that were raised during the flare duration. Returns -------- FlareLightCurve with the flares table supplemented with an integer ``quality`` and, if applicable, a string ``explanation`` column. """ lc = copy.copy(self) f = lc.flares if 'quality' not in f.columns: f['quality'] = 0 f.quality = f.apply(lambda x: np.sum(lc.quality[x.istart:x.istop], dtype=int), axis=1) if explain == True: g = lambda x: ', '.join(KeplerQualityFlags.decode(x.quality)) f['explanation'] = f.apply(g, axis=1) lc.flares = f return lc
[docs] def get_saturation(self, factor=10, return_level=False): """ Goes back to the TPF and measures the maximum saturation level during a flare, averaged over the aperture mask. Parameters ----------- factor : 10 or float Saturation level in full well depths. Returns ------- FlareLightCurve with modified 'flares' attribute. """ flc = copy.copy(self) well_depth = 10093 def sat(flares, flc=flc, well_depth=10093, return_level=False): pfl = flc.pixel_flux[flares.istart:flares.istop] flare_aperture_pfl = pfl[:,flc.pipeline_mask] return sat_level(flare_aperture_pfl, well_depth, return_level) def sat_level(flare_aperture_pfl, well_depth, return_level): saturation_level = np.nanmean(flare_aperture_pfl, axis=1) / well_depth if return_level == False: return np.any(saturation_level > factor) else: return np.nanmax(saturation_level) colname = 'saturation_f{}'.format(factor) if np.isnan(flc.saturation).all(): if flc.flares.shape[0] > 0:#do not attempt if no flares are detected flc.flares[colname] = flc.flares.apply(sat, axis=1, return_level=return_level) elif flc.flares.shape[0] == 0: # calculate saturation for all times flare_aperture_pfl = flc.pixel_flux[:,flc.pipeline_mask] saturation_level = np.nanmax(flare_aperture_pfl, axis=tuple(np.arange(len(flare_aperture_pfl.shape)))[1:]) / well_depth if return_level == False: flc.saturation = saturation_level > factor else: flc.saturation = saturation_level else: if flc.flares.shape[0] > 0:#do not attempt if no flares are detected if isinstance(flc.saturation[0], np.bool_) : if return_level == False: flc.flares[colname] = flc.flares.apply(lambda x: (flc.saturation[x.istart: x.istop] == True).any(), axis=1) elif return_level == True:"Warning: Saturation is given as boolean flag. \n Choose return_level=False.") return flc.get_saturation(factor=factor, return_level=False) elif (isinstance(flc.saturation[0], np.float_) | isinstance(flc.saturation[0], np.float32)) : if return_level == False: flc.flares[colname] = flc.flares.apply(lambda x: (flc.saturation[x.istart: x.istop] > factor).any(), axis=1) elif return_level == True: flc.flares[colname] = flc.flares.apply(lambda x: np.nanmax(flc.saturation[x.istart: x.istop]), axis=1) return flc
[docs] def inject_fake_flares(self, gapwindow=0.1, fakefreq=.005, inject_before_detrending=False, d=False, seed=None, **kwargs): ''' Create a number of events, inject them in to data Use grid of amplitudes and durations, keep ampl in relative flux units Keep track of energy in Equiv Dur. Duration defined in minutes Amplitude defined multiples of the median error Parameters: ------------- mode : 'loglog', 'hawley2014' or 'rand' injection mode gapwindow : 0.1 or float fakefreq : .005 or float flares per day, but at least one per continuous observation period will be injected inject_before_detrending : True or bool By default, flares are injected before the light curve is detrended. d : boolean If True, a seed for random numbers will be set seed : int or None If d == True, seed will be set to this number kwargs : dict Keyword arguments to pass to generate_fake_flare_distribution. Returns: ------------ FlareLightCurve with fake flare signatures ''' def _equivalent_duration(time, flux): ''' Compute the Equivalent Duration of a fake flare. This is the area under the flare, in relative flux units. Parameters: ------------- time : numpy array units of DAYS flux : numpy array relative flux units Return: ------------ p : float equivalent duration of a single event in units of seconds ''' x = time * 60.0 * 60.0 * 24.0 integral = np.sum(np.diff(x) * flux[:-1]) return integral fake_lc = copy.deepcopy(self) LOG.debug(str() + '{} FakeFlares started'.format( # Either inject flares into the un-detrended light curve if inject_before_detrending == True: typ, typerr = 'flux', 'flux_err' LOG.debug('Injecting before detrending.') # ... or into the detrended one elif inject_before_detrending == False: typ, typerr = 'detrended_flux', 'detrended_flux_err' LOG.debug('Injecting after detrending.') # How many flares do you want to inject # At least one per gap # or as defined by the frequency nfakesum = max(len(fake_lc.gaps), int(np.rint(fakefreq * (fake_lc.time.value.max() - fake_lc.time.value.min())) ) ) # Use a light curve where you know the median flux fake_lc = find_iterative_median(fake_lc) # Init arrays for the synthetic flare parameters t0_fake = np.zeros(nfakesum, dtype='float') # peak times ed_fake = np.zeros(nfakesum, dtype='float') # ED dur_fake = np.zeros(nfakesum, dtype='float') # duration ampl_fake = np.zeros(nfakesum, dtype='float') # amplitude # Init the synthetic flare counter to allow to point to the right # places in the arrays above (XXX_fake etc.) ckm = 0 # Iterate over continuous observing periods for (le,ri) in fake_lc.gaps: # Pick the observing period gap_fake_lc = fake_lc[le:ri] # Define the number of synthetic flares you want to inject # minimum of 1 nfake = max(1, int(np.rint(fakefreq * (gap_fake_lc.time.value.max() - gap_fake_lc.time.value.min() ) ) ) ) LOG.debug(f'Inject {nfake} fake flares into a {ri-le} datapoint long array.') # Are there real flares to deal with in the gap? real_flares_in_gap = self.flares[(self.flares.istart >= le) & (self.flares.istop <= ri)] # Pick flux, time, and flux error arrays error = gap_fake_lc[typerr].value flux = gap_fake_lc[typ].value time = gap_fake_lc.time.value # generate the time constraints for the flares you want to inject mintime, maxtime = np.min(time), np.max(time) dtime = maxtime - mintime # generate a distribution of durations and amplitudes distribution = generate_fake_flare_distribution(nfake, d=d, seed=seed, **kwargs) # add the distribution for this observing period # to the full list of injected flares dur_fake[ckm:ckm+nfake], ampl_fake[ckm:ckm+nfake] = distribution # loop over the numer of fake flares you want to generate for k in range(ckm, ckm+nfake): # generate random peak time, avoid known flares isok = False # keep picking new random peak times for your synthetic flares # until it does not overlap with a real one while isok is False: # choose a random peak time # if you set a seed you will get the same synthetic flares # all the time if isinstance(seed, int): t0 = (mod_random(1, d=d, seed=seed * k) * dtime + mintime)[0] # if you do note set a seed, the synthetic flares will be # randomly distributed elif seed is None: t0 = (mod_random(1, d=d) * dtime + mintime)[0] # Check if there are there any real flares to deal with # at that peak time. Only relevant if there were any flares # detected at all: if real_flares_in_gap.tstart.shape[0] > 0: # Are there any real flares happening at peak time? # Fake flares should not overlap with real ones. b = (real_flares_in_gap[(t0 >= real_flares_in_gap.tstart) & (t0 <= real_flares_in_gap.tstop)]. shape[0] ) # number of flares that overlap should be 0 if b == 0: isok = True # No real flares, no trouble: else: isok = True # add the peak time to the list t0_fake[k] = t0 # generate the flare flux from the Davenport 2014 model fl_flux = aflare(time, t0, dur_fake[k], ampl_fake[k]) # calculate the injected ED ed_fake[k] = _equivalent_duration(time, fl_flux) # inject flare in to light curve by adding the flare flux fake_lc[typ].value[le:ri] = (fake_lc[typ][le:ri].value + fl_flux * fake_lc.it_med[le:ri].value) # Increment the counter ckm += nfake # error minimum is a safety net for the spline function if mode=3 fake_lc[typerr] = max( 1e-10, np.nanmedian( pd.Series(fake_lc[typ]). rolling(3, center=True). std() ) )*np.ones_like(fake_lc[typ]) # Put the data together injected_events = {'duration_d' : dur_fake, 'amplitude' : ampl_fake, 'ed_inj' : ed_fake, 'peak_time' : t0_fake} # the fake_flares attribute is a pandas DataFrame fake_lc.fake_flares = pd.DataFrame(injected_events) # Free up space del dur_fake del ampl_fake # Return the FLC with the injected flares return fake_lc
[docs] def load_injrec_data(self, path, **kwargs): """Fetch the injection-recovery table from a given path, and append it to any existing table. Parameters: ----------- path : string path to file kwargs : dict keyword arguments to pass to `pandas.read_csv()` """ df = pd.read_csv(path) if self.fake_flares.shape[0]>0: LOG.warning("The file is appended to an existing table.") self.fake_flares = self.fake_flares.append(df) else: self.fake_flares = df
[docs] def plot_recovery_probability_heatmap(self, ampl_bins=None, dur_bins=None, flares_per_bin=20, **kwargs): """Plot injected amplitude and injected FWHM vs. the fraction of recovered flares. Parameters: ----------- ampl_bins : int or array bins for amplitudes dur_bins : int or array bins for FWHM flares_per_bin : int number of flares per bin, default is 20 """ flc = copy.deepcopy(self) return _heatmap(flc, "recovery_probability", ampl_bins, dur_bins, flares_per_bin, **kwargs)
[docs] def plot_ed_ratio_heatmap(self, ampl_bins=None, dur_bins=None, flares_per_bin=20, **kwargs): """Plot recovered amplitude and recovered duration vs. the ratio of recovered ED to injected ED. Parameters: ----------- ampl_bins : int or array bins for recovered amplitudes dur_bins : int or array bins for recovered duration flares_per_bin : int number of flares per bin, default is 20 """ flc = copy.deepcopy(self) return _heatmap(flc, "ed_ratio", ampl_bins, dur_bins, flares_per_bin, **kwargs)
[docs] def characterize_flares(self, flares_per_bin=30, ampl_bins=None, dur_bins=None): """Use results from injection recovery to determine corrected flare characteristics. """ flc = copy.deepcopy(self) flares = wrap_characterization_of_flares(flc.fake_flares, flc.flares, flares_per_bin=flares_per_bin, ampl_bins=ampl_bins, dur_bins=dur_bins) flc.flares = flares return flc
[docs] def to_fits(self, *args, **kwargs): if self.mission in ["Kepler", "K2"]: self.__class__ = KeplerLightCurve self.to_fits(*args, cadenceno=self.cadenceno, **kwargs) elif self.mission in ["TESS"]: self.__class__ = TessLightCurve self.to_fits(*args, cadenceno=self.cadenceno, **kwargs)
[docs]def generate_lightcurve(errorval, a1, a2, period1, period2, quad, cube, mean=3400.): """Generate wild light curves with variability on several timescales. Returns: --------- FlareLightCurve with time, flux, and flux_err attributes """ time = np.arange(10, 10 + 10 * np.pi,.0008) # define the flux flux = (np.random.normal(0,errorval,time.shape[0]) + mean + a1*mean*np.sin(period1*time +1.) + a2*mean*np.sin(period2*time) + quad*(time-25)**2 - cube*(time-25)**3) # add a gap in the data flux[5600:7720] = np.nan # add big and long flare l = 66 flux[5280:5280 + l] = flux[5280:5280 + l] + np.linspace(1000,250,l) # add tiny flare l = 3 flux[15280:15280 + l] = flux[15280:15280 + l] + np.linspace(100,60,l) # add intermediate flare l, s = 15, 25280 flux[s:s + l] = flux[s:s + l] + np.linspace(200,60,l) # typically Kepler and TESS underestimate the real noise err = np.full_like(time,errorval/3*2) # define FLC return FlareLightCurve(time=time, flux=flux, flux_err=err)