Source code for imexam.ds9_viewer

"""
Licensed under a 3-clause BSD style license - see LICENSE.rst

This class supports communication with DS9 through the XPA


Some code in this class was adapted from pysao, which can be found
at https://github.com/leejjoon/pysao. Specifically this package used the
existing Cython implementation to the XPA  and extended the calls to the other
available XPA executables so that more functionality is added. The API
information is available here:

http://hea-www.harvard.edu/RD/xpa/client.html#xpaopen

Using Cython will allow for broader development of the code and produce faster
runtimes for large datasets with repeated calls to the display manager.

XPA is licensed under MIT, help can be found here:
http://hea-www.cfa.harvard.edu/saord/xpa/help.html

The current XPA can be downloaded from here:
http://hea-www.harvard.edu/saord/xpa/

"""

import os
import shutil
import sys
import numpy as np
from subprocess import Popen
import time
import warnings
import logging
from tempfile import mkdtemp
import matplotlib.image as mpimage
import matplotlib.pyplot as plt
from matplotlib import get_backend
import atexit
from subprocess import call

# The XPA class controls interaction with DS9
from .xpa_wrap import XPA
from .imexamxpa import XpaException

from . import util
from astropy.io import fits


class UnsupportedDatatypeException(Exception):
    pass


class UnsupportedImageShapeException(Exception):
    pass

__all__ = ['ds9']


[docs]class ds9: """Control all interactions between the user and the DS9 window. The ds9() contructor takes a ds9 target as its main argument. If none is given, then a new window and process will be started. DS9's xpa access points are documented in the reference manual, but the can also be returned to the user with a call to xpaset. http://hea-www.harvard.edu/saord/ds9/ref/xpa.html Parameters ---------- target: string, optional the ds9 target name or id. If None or empty string, a new ds9 instance is created. path : string, optional path of the ds9. Used only if a new ds9 is requested. wait_time : float, optional waiting time before error is raised quit_ds9_on_del : boolean, optional If True, try to quit ds9 when this instance is deleted. Attributes ---------- wait_time: float The waiting time before error is raised path: string The path to the DS9 executable _xpa_name: string The value in XPA_METHOD, the name of the DS9 window _quit_ds9_on_del: boolean Determine whether to quit ds9 when object goes out of scope. _ds9_unix_name: string The full path filename to the unix socket, only if unix sockets are being used with local _need_to_purge: boolean whenever there are unix socket directories which need to be purged when the object goes out of scope _tmpd_name: string The full path name to the unix socket file on the local system _filename: string The name of the image that's currently loaded into DS9 _ext: int Extension of the current image that is loaded. If one extension of an MEF file is loaded this will be 1 regardless of the extension that was specified (because DS9 and the XPA now see it as a single image and header) _extname: string If available, the EXTNAME of the MEF extension that is loaded, taken from the current data header _extver: int If available, the EXTVER of the MEF extension that is loaded, taken from the current data header _ds9_process: pointer Points to the ds9 process id on the system, returned by Popen, whenever this module starts DS9 _mef_file: boolean The file is a multi-extension fits file _iscube: bookean The file is a multiextension fits file, and one of the extensions contains at least 1 additional extension (3D or more) _numaxis: int number of image planes, this is NAXIS _naxis: tuple specific image plane displayed, defaulted to 1 image plane, most relevant to cube fits files """ # _ImgCode : copied from fits, used for displaying arrays straight to DS9 _ImgCode = {'float32': -32, 'float64': -64, 'float16': -16, 'int16': 16, 'int32': 32, 'int64': 64, 'uint8': 8, 'uint16': 16} _tmp_dir = "" _process_list = list() def __init__(self, target='', path='', wait_time=5, quit_ds9_on_del=True): """starter. Notes ----- I think this is a quirk in the XPA communication. The xpans, and XPA prefer to have all connections be of the same type. DS9 defaults to creating an INET connection. In some cases, if no IP address can be found for the computer, the startup can hang. In these cases, a local connection is preferred, which uses a unix filename for the socket. The problem arises that if the user already has DS9 windows running, that were started by default, the nameserver is only listening for the default socket type (inet) and not local. There are also cases where the machine running this code does not have xpa installed, so there is no xpans (nameserver) to run and keep track of the open connections. In that case, the user needs to provide this init with the name of the socket in their window (in XPA_METHOD) in order to create the connection. """ # determine whether to quit ds9 also when object deleted. self._quit_ds9_on_del = quit_ds9_on_del self.wait_time = wait_time self._need_to_purge = False self._tmpd_name = None # dictionary where each key is a frame number, and the values are a # dictionary of details about the image loaded in that frame self._viewer = dict() self._current_frame = None self._current_slice = None # default starting socket type to inet # users can change to local using environment variable self._xpa_method = "inet" self._xpa_name = "" # only used for DS9 windows started from this module self._ds9_process = None self._ds9_path = None # see if the package logger was already started self.log = logging.getLogger(__name__) # An existing ds9 was suggested. Confirm existence # and attach. if target: # check to see if the target exists active = util.list_active_ds9(False) if target in list(active): self._xpa_name = target else: # see if they used a title and not the xpa_method # the title is the first item in the tuple for each key for window in active.values(): if target in window: self._xpa_name = target if not self._xpa_name: print(f"\nDS9 target: {target} doesn't exist.\n{list(active)}") raise ValueError( "Please choose an existing target." ) else: if not path: self._ds9_path = util.find_path('ds9') if not self._ds9_path: raise OSError("Could not find ds9 executable on your path or alias") else: self._ds9_path = path # Check to see if the user has chosen a preference first if 'XPA_METHOD' in os.environ: self._xpa_method = os.environ['XPA_METHOD'].lower() if 'inet' in self._xpa_method: self._xpa_name = self.run_inet_ds9() # xpa_name is the title of the window, the xpa can be # referenced with either the socket address or name # of the window elif 'local' in self._xpa_method: self._xpa_name, self._ds9_unix_name = self._run_unixonly_ds9() # tells xpa where to find sockets os.environ['XPA_METHOD'] = self._xpa_method # xpa_name sets the template for the get and set commands self.xpa = XPA(self._xpa_name) if 'local' in self._xpa_method: self._set_frameinfo() # initial load self._define_cmaps() # dictionary of color maps def __str__(self): pass def __del__(self): """Nicely exit the DS9 process.""" if self._quit_ds9_on_del: if 'local' in self._xpa_method: self._purge_local() else: self._stop_process() def _set_frameinfo(self): """Set the name and extension for the data displayed in current frame. Notes ----- The absolute path reference is stored to make XPA happy in all cases, wherever the user started the DS9 process. The only consistant way to return which cube and slice that is displayed is with the call to "file" which has the full plane=x:y information, but only when looking at something other than the first extension for each plane. In this case, you have to look at the header information to see it's a cube image, and assume the first image plane is displayed. If you load a single extension from an MEF into DS9, XPA references the extension as 1 afterwards for access points you need to look in the header of the displayed image to find out what actual extension is loaded This also also gathers needed image header information It's possible the user has an array in the frame, for which there is no header or filename information """ # check the current frame, if none exists, then don't continue frame = self.frame() if frame: if frame not in self._viewer: self._viewer[frame] = dict() try: user_array = self._viewer[frame]['user_array'] except KeyError: user_array = None extver = None # extension number extname = None # name of extension filename = None # filename of image numaxis = 2 # number of image planes, this is NAXIS # tuple of each image plane, defaulted to 1 image plane naxis = (0) # data has more than 2 dimensions and loads in cube/slice frame iscube = False mef_file = False # used to check misleading headers in fits files load_header = False # not used for in memory arrays self._current_frame = frame # see if any file is currently loaded into ds9, # xpa returns '\n' for nothing loaded # get the current frame try: filename_string = self.get('file').strip() if len(filename_string) > 1 and '\n' not in filename_string: filename = str(filename_string.strip().split('[')[0]) self._viewer[frame]['filename'] = os.path.abspath(filename) load_header = True else: filename_string = "" except XpaException: filename_string = "" try: if "plane" in filename_string: iscube = True if ":" in filename_string: naxis = filename_string.strip().split(']')[ 1].split("=")[1].split(":") else: naxis = filename_string.strip().split(']')[ 1].split('=')[1].split() if len(naxis) == 1: naxis.append("0") naxis.reverse() # for astropy.fits row-major ordering naxis = map(int, naxis) naxis = [axis - 1 if axis > 0 else 0 for axis in naxis] naxis = tuple(naxis) except ValueError: raise ValueError("Problem parsing filename") if load_header: # set the extension from the header information returned from # DS9 this is the best way to get the information if the user # changes the loaded file using the gui header_cards = fits.Header.fromstring( self.get_header(), sep='\n') mef_file, nextend, first_image = util.check_valid(filename) if mef_file: try: extver = int(header_cards['EXTVER']) except KeyError: # fits doesn't require extver if there is only 1 # extension extver = first_image try: extname = str(header_cards['EXTNAME']) except KeyError: extname = None try: numaxis = int(header_cards['NAXIS']) except KeyError: raise KeyError("Problem getting NAXIS from header") if not iscube: if numaxis > 2: iscube = True naxis = list() # assume the first axis in each extension is displayed for axis in range(numaxis, 2, -1): naxis.append(0) naxis = tuple(naxis) # update the viewer dictionary, if the user changes what's # displayed in a frame this should update correctly # this dictionary will be referenced in the other parts of # the code. This enables tracking user arrays through # frame changes self._viewer[frame] = {'filename': filename, 'extver': extver, 'extname': extname, 'naxis': naxis, 'numaxis': numaxis, 'iscube': iscube, 'user_array': user_array, 'mef': mef_file} else: warnings.warn("No frame loaded in viewer")
[docs] def valid_data_in_viewer(self): """return bool if valid file or array is loaded into the viewer.""" frame = self.frame() if frame: self._set_frameinfo() if self._viewer[frame]['filename']: return True else: try: if self._viewer[frame]['user_array'].any(): return True except AttributeError as ValueError: print("error in array") return False
[docs] def get_filename(self): """return the filename currently on display. This function will check if there is already a filename saved. It's possible that the user can connect to a ds9 window with no file loaded and then ask for the data file name after loading one through the ds9 menu options. This will poll the private filename and then try and set one if it's empty. """ # see if the user has loaded a file by hand or changed frames in the # gui frame = self.frame() self._set_frameinfo() return self._viewer[frame]['filename']
[docs] def get_frame_info(self): """return more explicit information about the data displayed.""" self._set_frameinfo() return self._viewer[self.frame()]
[docs] def get_viewer_info(self): """Return a dictionary of information. The dictionary contains information about all frames which are loaded with data Notes ----- Consider adding a loop to verify that all the frames still exist and the user has not deleted any through the gui. """ self._set_frameinfo() return self._viewer
@classmethod def _purge_tmp_dirs(cls): """Delete temporary directories made for the unix socket. When used with ipython (pylab mode), it seems that the objects are not properly deleted, i.e., temporary directories are not deleted. This is a work around for that. """ if cls._tmp_dir: shutil.rmtree(cls._tmp_dir) @classmethod def _stop_running_process(cls): """stop self generated DS9 windows when user quits python window.""" while cls._process_list: process = cls._process_list.pop() if process.poll() is None: process.terminate() def _stop_process(self): """stop the ds9 window process nicely. but only if this package started it """ try: if self._ds9_process: # none means not yet terminated if self._ds9_process.poll() is None: self.set("exit") if self._ds9_process in self._process_list: self._process_list.remove(self._ds9_process) except XpaException as e: print(f"XPA Exception: {repr(e)}") def _purge_local(self): """remove temporary directories from the unix socket.""" if not self._need_to_purge: return if not self._quit_ds9_on_del: warnings.warn( "You need to manually delete tmp. dir ({0:s})".format( self._tmpd_name)) return self._stop_process() # add a wait for the process to terminate before trying to delete the # tree time.sleep(0.5) try: shutil.rmtree(self._tmpd_name) except OSError: warnings.warn( "Warning : couldn't delete the temporary \ directory ({0:s})".format(self._tmpd_name,)) self._need_to_purge = False
[docs] def close(self): """close the window and end connection.""" # make sure we clean up the object and quit_ds9 local files if 'local' in self._xpa_method or 'tmp' in self._xpa_method: self._purge_local() else: self._stop_process()
[docs] def run_inet_ds9(self): """start a new ds9 window using an inet socket connection. Notes ----- It is given a unique title so it can be identified later. """ env = os.environ # this is the title of the window, without a nameserver connection # is there a way to get the inet x:x address? # that should be unique enough, something better? xpaname = "imexam" + str(time.time()) try: p = Popen([self._ds9_path, "-xpa", "inet", "-title", xpaname], shell=False, env=env) self._ds9_process = p self._process_list.append(p) self._need_to_purge = False return xpaname except Exception as e: # refine error class warnings.warn("Opening ds9 failed") print(f"Exception: {repr(e)}") from signal import SIGTERM try: pidtokill = p.pid except NameError: # in case p failed at the initialization level pidtokill = None if pidtokill is not None: os.kill(pidtokill, SIGTERM) raise e
def _run_unixonly_ds9(self): """start new ds9 window and connect to object using a unix socket. Notes ----- When the xpa method in libxpa parses a given template as a unix socket, it checks if the template string starts with tmpdir (from env["XPA_TMPDIR"] or default to /tmp/.xpa). This can make having multiple instances of ds9 a bit difficult, but if you give it unique names or use the inet address you should be fine For unix only, we run ds9 with XPA_TMPDIR set to temporary directory whose prefix start with /tmp/xpa (eg, /tmp/xpa_sf23f), them set os.environ["XPA_TMPDIR"] (which affects xpa set and/or get command from python) to /tmp/xpa. """ env = os.environ wait_time = self.wait_time self._tmpd_name = mkdtemp( prefix="xpa_" + env.get( "USER", ""), dir="/tmp") # this is the first directory the servers looks for on the path env["XPA_TMPDIR"] = self._tmpd_name unix_name = "{0:s}/.IMT".format(self._tmpd_name) # that should be unique enough, something better? title = str(time.time()) try: # unix only flag disables the fifos and inet connections p = Popen([self._ds9_path, "-xpa", "local", "-unix_only", "-title", title, "-unix", "{0:s}".format(unix_name)], shell=False, env=env) # wait until ds9 starts and the .IMT socket exists while wait_time > 0: file_list = os.listdir(self._tmpd_name) if ".IMT" in file_list: break time.sleep(0.5) wait_time -= 0.5 if wait_time == 0: from signal import SIGTERM os.kill(p.pid, SIGTERM) print(f"Connection timeout with the ds9. Try to increase the \ *wait_time* parameter (current value \ is {self.wait_time} s)") except (OSError, ValueError, AttributeError) as e: warnings.warn("Starting ds9 failed") shutil.rmtree(self._tmpd_name) else: self._tmp_dir = self._tmpd_name self._ds9_process = p self._process_list.append(p) # this might be sketchy try: file_list.remove(".IMT") # should be in the directory, if not except (ValueError, IOError): warnings.warn("IMT not found in tmp, using first thing in list") if len(file_list) > 0: xpaname = os.path.join(self._tmpd_name, file_list[0]) else: shutil.rmtree(self._tmpd_name) raise ValueError("Problem starting ds9 local socket connection") env["XPA_TMPDIR"] = "/tmp/xpa" # for all local connections self._need_to_purge = True self._xpa_method = 'local' return xpaname, unix_name def _check_ds9_process(self): """Check to see if the ds9 process is still running. Notes ----- If you start a ds9 window from the shell and then connect to imexam, imexam will not have a reference for the process, so this method ignores that state. """ if self._ds9_process: ret = self._ds9_process.poll() if ret: raise RuntimeError("The ds9 process is externally killed.") self._purge_local()
[docs] def set(self, param, buf=None): """XPA set method to ds9 instance. Notes ----- This function is linked with the Cython implementation set(param, buf=None) param : parameter string (eg. "fits" "regions") buf : aux data string (sometime string needed to be ended with CR) """ self._check_ds9_process() self.xpa.set(param, buf)
[docs] def get(self, param): """XPA get method to ds9 instance which returns received string. Parameters ---------- param : parameter string (eg. "fits" "regions") Notes ----- This function is linked with the Cython implementation get(param) """ self._check_ds9_process() return self.xpa.get(param)
[docs] def readcursor(self): """Returns the image coordinate postion and key pressed. Notes ----- XPA returns strings of the form: u a 257.5 239 """ try: xpa_string = self.get("imexam any coordinate image") except XpaException as e: print(f"Xpa problem reading cursor: {repr(e)}") raise KeyError except ValueError: raise ValueError("Outside of data range") k, x, y = xpa_string.split() # ds9 is returning 1 based array, set to 0-based since # imexamine uses numpy for array crunching return float(x)-1, float(y)-1, str(k)
[docs] def alignwcs(self, on=True): """align wcs. Parameters ---------- on: bool Align the images using the WCS in their headers """ self.set("align {0:s}".format(str(on)))
[docs] def clear_contour(self): """clear contours from the screen.""" self.set("contour clear")
def _define_cmaps(self): """setup the default color maps which are available.""" self._cmap_syntax = """ Syntax: cmap [<colormap>] [file] [load <filename>] [save <filename>] [invert true|false] [value <constrast> <bias>] [tag [load|save] <filename>] [tag delete] [match] [lock [true|false]] [open|close] Examples -------- >obj.cmap(map="gist_heat") >obj.cmap(invert=True) """ # is there a list I can pull automatically from ds9? self._cmap_colors = ["grey", "red", "green", "red", "blue", "a", "b", "bb", "he", "i8", "aips0", "sls", "hsv", "heat", "cool", "rainbow", "standard", "staircase", "color"]
[docs] def cmap(self, color=None, load=None, invert=False, save=False, filename='colormap.ds9'): """Set the color map table, using a defined list of options. Parameters ---------- color: string color must be set to one of the available DS9 color map names load: string, optional set to the filename which is a valid colormap lookup table valid contrast values are from 0 to 10, and valid bias values are from 0 to 1 invert: bool, optional invert the colormap save: bool, optional save the current colormap as a file filename: string, optional the name of the file to save the colormap to """ if color: color = color.lower() if color.lower() in self._cmap_colors: cstring = "cmap {0:s}".format(color) self.set(cstring) else: print("Unrecognized color map, choose one of these:") print(self._cmap_colors) if invert: invert = 'yes' cstring = 'cmap invert {0:s}'.format(invert) self.set(cstring) if load: cstring = 'cmap load {0:s}'.format(load) self.set(cstring) if save: cstring = 'cmap save {0:s}'.format(filename) self.set(cstring) if not color and not load: print(self._cmap_colors)
[docs] def colorbar(self, on=True): """turn the colorbar on the bottom of the window on and off. Parameters ---------- on: bool Set to True to turn on the colorbar, False to turn it off """ self.set("colorbar {0:s}".format(str(on)))
[docs] def contour(self, on=True, construct=True): """show contours on the window. Parameters ---------- on: bool Set to true to turn on contours construct: bool, optional Will open the contour dialog box which has more options """ self.set("contour {0:s}".format(str(on))) if construct: self.set("contour levels")
[docs] def contour_load(self, filename): """load a contour file into the window. Parameters ---------- filename: string The name of the file with the contour level defined """ if filename: self.set("contour loadlevels {0:s}".format(str(filename))) else: warnings.warn("No filename provided for contours")
[docs] def crosshair(self, x=None, y=None, coordsys="physical", skyframe="wcs", skyformat="fk5", match=False, lock=False): """Control the position of the crosshair in the current frame. crosshair mode is turned on automatically Parameters ---------- x: string or int The value of x is converted to a string for the call to XPA, use a value here appropriate for the skyformat you choose y: string or int The value of y is converted to a string for the call to XPA, use a value here appropriate for the skyformat you choose coordsys: string, optional The coordinate system your x and y are defined in skyframe: string, optional If skyframe has "wcs" in it then skyformat is also sent to the XPA skyformat: string, optional Used with skyframe, specifies the format of the coordinate which were given in x and y match: bool, optional If set to True, then the wcs is matched for the frames lock: bool, optional If set to True, then the frame is locked in wcs """ if x and y: if "wcs" in skyframe: format = skyformat else: format = "" self.window.set("crosshair {0:s} {1:s} {2:s} {3:s}".format( str(x), str(y), str(coordsys), format)) if match: self.window.set("crosshair match wcs") if lock: self.window.set("crosshair lock wcs")
[docs] def cursor(self, x=None, y=None): """move the cursor in the current frame to the specified image pixel. selected regions will also be moved Parameters ---------- x: int pixel location x coordinate to move to y: int pixel location y coordinate to move to x and y are converted to strings for the call """ if x and y: self.set("cursor {0:s} {1:s}".format(str(x), str(y))) else: warnings.warn( "You need to supply both an x and y location for the cursor")
[docs] def disp_header(self): """Display the header of the current image to a DS9 window.""" cstring = "header " try: self.set(cstring) # display the header of the current frame except XpaException as e: raise XpaException("XPA Exception getting header: {0}".format(e))
[docs] def frame(self, n=None): """convenience function to change or report frames. Parameters ---------- n: int, string, optional The frame number to open or change to. If the number specified doesn't exist, a new frame will be opened If nothing is specified, then the current frame number will be returned. The value of n is converted to a string before passing to the XPA Examples -------- frame(1) sets the current frame to 1 frame("last") set the current frame to the last frame frame() returns the number of the current frame frame("new") opens a new frame frame(3) opens frame 3 if it doesn't exist already, otherwise goes to frame 3 """ frame = self.get("frame").strip() # xpa returns '\n' for no frame if frame: if n: if "delete" in str(n): if frame in self._viewer: del self._viewer[frame] self.set("frame {0:s}".format(str(n))) else: try: if len(frame) < 1: self.set("frame 1") # the user can delete frame 1, but ds9 defaults to 1, # so set it return '1' else: return str(frame) except XpaException as e: raise XpaException( "XPA Exception getting frame: {0}".format(e)) else: if len(frame) < 1: self.set("frame 1") # the user can delete frame 1, but ds9 defaults to 1, so set it return '1'
[docs] def iscube(self): """return whether a cube image is displayed in the current frame.""" frame = self.frame() if frame != self._current_frame: self._set_frameinfo() else: return self._viewer[frame]['iscube']
[docs] def get_slice_info(self): """return the slice tuple that is currently displayed.""" self._set_frameinfo() frame = self.frame() if self._viewer[frame]['iscube']: image_slice = self._viewer[frame]['naxis'] else: image_slice = None return image_slice
[docs] def get_data(self): """return a numpy array of the data displayed in the current frame. Notes ----- This is the data array that the imexam() function from connect() uses for analysis astropy.io.fits stores data in row-major format. So a 4d image would be [NAXIS4, NAXIS3, NAXIS2, NAXIS1] just the one image is retured in the case of multidimensional data, not the cube """ # make sure the filename and extension info are correct for the current # frame in DS9, users can change this in the gui # users can change frame and slice before calling this, best to check self._set_frameinfo() frame = self.frame() if frame: if isinstance(self._viewer[frame]['user_array'], np.ndarray): return self._viewer[frame]['user_array'] else: filename = self._viewer[frame]['filename'] extver = self._viewer[frame]['extver'] extname = self._viewer[frame]['extname'] naxis = self._viewer[frame]['naxis'] if self._viewer[frame]['mef']: with fits.open(filename) as filedata: if self._viewer[frame]['iscube']: data = filedata[extname, extver].section[naxis] else: data = filedata[extver].data return data else: with fits.open(filename) as filedata: data = filedata[0].data return data
[docs] def get_image(self): """return the full image object instead of just the data array.""" print("Returning just the data array, open the image file \ for the full object") self.get_data()
[docs] def get_header(self, fitsobj=False): """Return the current fits header. The return value is the string or None if there's a problem If fits is True then a fits header object is returned instead """ # TODO return the simple header for arrays which are loaded frame = self.frame() if frame != self._current_frame: self._set_frameinfo() if self._viewer[frame]['filename']: if fitsobj: header = fits.getheader(self._viewer[frame]['filename']) else: try: header = self.get("fits header") except XpaException as e: print(f"XPA Exception getting header: {repr(e)}") return None return header else: warnings.warn("No file with header loaded into ds9") return None
[docs] def grid(self, on=True, param=False): """convenience to turn the grid on and off. grid can be flushed with many more options Parameters ---------- on: bool, optional Will turn the grid overlay on in the current frame param: bool, optional Will open the parameter dialog in DS9 """ self.set("grid {0:s}".format(str(on))) if param: self.set("grid open")
[docs] def hideme(self): """lower the ds9 window.""" self.set("lower")
[docs] def embed(self): """Embed the viewer in a notebook.""" print("Not Implemented for DS9")
[docs] def load_fits(self, fname, extver=None, mecube=False): """convenience function to load fits image to current frame. Parameters ---------- fname: string, FITS object The name of the file to be loaded. You can specify the full extension in the name, such as filename_flt.fits or filename_flt.fits[1] You can also pass it an in-memory FITS object extver: int, optional The extension to load (EXTVER in the header) mecube: bool, optional If mecube is True, load the fits file as a cube into ds9 Notes ----- To tell ds9 to open a file whose name or path includes spaces, surround the path with "{...}", e.g. % xpaset -p ds9 file "{foo bar/my image.fits}" This is assuming that the image loads into the current or next new frame, watch the internal file and ext values because the user can switch frames through DS9 app itself XPA needs to have the absolute path to the filename so that if the DS9 window was started in another directory it can still find the file to load. The pathname also needs to be stripped of spaces. """ # for the viewer reference frame = self.frame() if frame is None: frame = 1 # load into first frame if isinstance(fname, fits.hdu.image.PrimaryHDU): shortname = fname extn = None extv = 0 if extver is None: extver = extv if isinstance(fname, fits.hdu.hdulist.HDUList): shortname = fname extn = None extv = extver elif isinstance(fname, str): shortname, extn, extv = util.verify_filename(fname) if extn is not None: raise ValueError("Extension name given, must " "specify the absolute extension you want") # prefer the keyword value over the extension in the name if extver is None: extver = extv else: raise TypeError("Expected FITS data as input") # safety for a valid imexam file if ((extv is None) and (extver is None)): mef_file, nextend, first_image = util.check_valid(shortname) extver = first_image # the extension of the first IMAGE if isinstance(fname, str): if mecube: cstring = "mecube {0:s}".format(shortname) else: cstring = ('fits {0:s}[{1:d}]'.format(shortname, extver)) self.set(cstring) # make sure any previous reference is reset self._set_frameinfo() self._viewer[frame]['user_array'] = None else: self.view(fname[extver].data)
[docs] def load_region(self, filename): """Load regions from a file which uses ds9 standard formatting. Parameters ---------- filename: string The file containing the DS9 style region description """ if os.access(filename, os.F_OK): self.set("regions load {0:s}".format(filename)) else: warnings.warn("No such file:{0:s}".format(filename))
[docs] def load_rgb(self, red, green, blue, scale=False, lockwcs=False): """load 3 images into an RGBimage frame. Parameters ---------- red: string The name of the fits file loaded into the red channel green: string The name of the fits file loaded into the green channel blue: string The name of the fits file loaded into the blue channel scale: bool If True, then each image will be scaled with zscale() after loading lockwcs: bool If True, then the image positions will be locked to each other using the WCS information in their headers """ self.set("rgb new") self.set("rgb channel red") self.load_fits(red) if scale: self.scale() self.set("rgb channel green") self.load_fits(green) if scale: self.scale() self.set("rgb channel blue") self.load_fits(blue) if scale: self.scale() if lockwcs: self.set("rgb wcs yes")
[docs] def load_mef_as_cube(self, filename=None): """Load a Mult-Extension-Fits image into one frame as an image cube.""" if not filename: print("No filename specified") else: self.set(f"file mecube new {filename}")
[docs] def load_mef_as_multi(self, filename=None): """Load a Mult-Extension-Fits image into multiple frames.""" if not filename: print("No filename specified") else: self.set(f"file multiframe {filename}")
[docs] def mark_region_from_array(self, input_points, ptype="image", textoff=10, size=4): """mark ds9 regions regions given an input list of tuples. a convienence function, you can also use set_region Parameters ---------- input_points: iterator, a tuple, or list of tuples or a string: (x,y,comment), ptype: string the reference system for the point locations, image|physical|fk5 size: int the size of the region marker textoff: string the offset for the comment text, if comment is empty it will not show Notes ----- only circular regions are supported currently """ if isinstance(input_points, tuple): input_points = [input_points] elif isinstance(input_points, str): input_points = [tuple(input_points.split())] x = 0 y = 1 comment = 2 rtype = "circle" # only one supported right now for location in input_points: if rtype == "circle": pline = rtype + " " + \ str(location[x]) + " " + str(location[y]) + " " + str(size) self.log.info(pline) self.set_region(pline) try: if(len(str(location[comment])) > 0): pline = "text " + str(float(location[x]) + textoff) +\ " " + str(float(location[y]) + textoff) + " '" +\ str(location[comment]) + "' #font=times" self.log.info(pline) self.set_region(pline) except IndexError: pass
[docs] def make_region(self, infile, labels=False, header=0, textoff=10, size=5): """make an input reg file with [x,y,comment] to a DS9 reg file. the input file should contain lines specifying x,y,comment Parameters ---------- infile: str input filename labels: bool add labels to the regions header: int number of header lines in text file to skip textoff: int offset in pixels for labels size: int size of the region type Notes ----- only circular regions are supported currently """ try: f = open(infile, 'r') lines = f.readlines() f.close() except IOError as e: warnings.warn(f"Unable to open input file") print("{repr(e)}") raise ValueError # assumed defaults for simple regions file point = "circle" # only one supported right now delta = textoff # pixels to offset text lines = lines[header:] text = list() x = list() y = list() for i in range(0, len(lines), 1): words = lines[i].split(',') x.append(words[0].strip()) y.append(words[1].strip()) if(len(words) > 2 and labels): text.append(words[2].strip()) # now write out to a reg file out = infile + ".reg" f = open(out, 'w') for i in range(0, len(lines), 1): pline = "image; " + point + \ "(" + x[i] + "," + y[i] + "," + str(size) + ")\n" f.write(pline) if(len(text) > 0): pline = "image;text(" + str(float(x[i]) + delta) + "," + \ str(float(y[i]) + delta) + \ "{ " + text[i] + " })# font=\"time 12 bold\"\n" f.write(pline) f.close() self.log.info(f"output reg file saved to: {out}")
[docs] def match(self, coordsys="wcs", frame=True, crop=False, fslice=False, scale=False, bin=False, colorbar=False, smooth=False, crosshair=False): """match all other frames to the current frame. Parameters ---------- coordsys: string, optional The coordinate system to use frame: bool, optional Match all other frames to the current frame, using the set coordsys crop: bool, optional Set the current image display area, using the set coordsys fslice: bool, optional Match current slice in all frames scale: bool, optional Match to the current scale for all frames bin: bool, optional Match to the current binning for all frames colorbar: bool, optional Match to the current colorbar for all frames smooth: bool, optional Match to the current smoothing for all frames crosshair: bool, optional Match the crosshair in all frames, using the current coordsys Notes ----- You can only choose one of the options at a time, and the logic will select the first True option so set frame=False and something else in addition to your choice if you don't want the default option. """ cstring = "match " if frame: cstring += "frame {0:s}".format(coordsys) elif crosshair: cstring += "crosshair {0:s}".format(coordsys) elif crop: cstring += "crop {0:s}".format(coordsys) elif fslice: cstring += "slice" elif bin: cstring += "bin" elif scale: cstring += "scale" elif colorbar: cstring += "colorbar" elif smooth: cstring += "smooth" self.set(cstring)
[docs] def nancolor(self, color="red"): """set the not-a-number color, default is red. Parameters ---------- color: string The color to use for NAN pixels """ self.set("nan {0:s}".format(color))
[docs] def panto_image(self, x, y): """convenience function to change to x,y physical image coordinates. Parameters ---------- x: float X location in physical coords to pan to y: float Y location in physical coords to pan to """ self.set("pan to {0:f} {1:f} image".format(x, y))
[docs] def panto_wcs(self, x, y, system='fk5'): """pan to wcs location coordinates in image. Parameters ---------- x: string The x location to move to, specified using the given system y: string The y location to move to system: string The reference system that x and y were specified in, they should be understood by DS9 """ self.set("pan to {0:s} {1:s} wcs {2:s}".format(x, y, system))
[docs] def rotate(self, value=None, to=False): """rotate the current frame (in degrees). the current rotation is printed with no params Parameters ---------- value: float [degrees] Rotate the current frame {value} degrees If value is 0, then the current rotation is printed to: bool Rotate the current frame to the specified value """ if value is None: print("Image rotated at {0:s}".format(self.get("rotate"))) elif to and value >= 0: cstring = "rotate to {0:s}".format(str(value)) self.set(cstring) elif value > 0: cstring = "rotate {0:s}".format(str(value)) self.set(cstring) cstring = "Image rotated at {0:s}".format(self.get("rotate")) self.log.info(cstring)
[docs] def save_regions(self, filename=None): """save the regions in the current window to a DS9 style regions file. Parameters ---------- filename: string The nameof th file to which the regions displayed in the current window are saved. If no filename is provided then it will try and save the regions to the name of the file in the current display with _regions.txt appended If a file of that name already exists on disk it will no attempt to overwrite it """ regions = self.get("regions save") frame = self.frame() if frame and not filename: filename = self._viewer[frame]['filename'] + "_regions.txt" # check if the file already exists if not os.access(filename, os.F_OK): with open(filename, "w") as region_file: region_file.write(regions) else: warnings.warn( "File already exists: {0} try again".format(filename))
[docs] def save_rgb(self, filename=None): """save an rgbimage frame as an MEF fits file. Parameters ---------- filename: string The name of the output fits image """ if not filename: print("No filename specified, try again") else: self.set("save rgbimage {0:s}".format(filename))
[docs] def scale(self, scale='zscale'): """The default zscale is the most widely used option. Parameters ---------- scale: string The scale for ds9 to use, these are set strings of [linear|log|pow|sqrt|squared|asinh|sinh|histequ] Notes ----- The xpa doesn't return an error if you set an unknown scale, it just doesn't do anything, this is true for all the xpa calls """ _help = """Syntax: scales available: [linear|log|pow|sqrt|squared|asinh|sinh|histequ] [log exp <value>] [datasec yes|no] [limits <minvalue> <maxvalue>] [mode minmax|<value>|zscale|zmax] [scope local|global] [match] [lock [yes|no]] [open|close] """ mode_scale = ["zscale", "zmax", "minmax"] cstring = f"scale {scale}" if scale in mode_scale: cstring = f"scale mode {scale}" try: self.set(cstring) except (XpaException, ValueError): print(f"{cstring} not valid") print(_help)
[docs] def set_region(self, region_string=""): """display a region using the specifications in region_string. Parameters ---------- region_string: string Should take the form of a region string that DS9 is expecting Examples -------- set_region("physical ruler 200 300 200 400") set_region("line 0 400 3 400 #color=red") """ command = f"regions command {{ {region_string} }}\n" self.set(command)
[docs] def showme(self): """raise the ds9 window.""" self.set("raise")
[docs] def showpix(self, close=False): """display the pixel value table, close window when done. Parameters ---------- close: bool, optional If set to True, then the pixel table dialog window is closed """ self.get("pixeltable") if close: self.set("pixeltable close")
[docs] def snapsave(self, filename=None, format=None, resolution=100): """Create a snap shot of the current window, save in specified format. This function bypasses the XPA calling routines to avoid a bug with the X11/XPA interface. Instead is uses the os function which takes a snapshot of the specified window. Parameters ---------- filename: str, optional filename of output image, the extension in the filename can also be used to specify the format. If no filename is specified, then the filename will be constructed from the name of the image displayed image with _snap.png appended. format: str, optional available formats are fits, eps, gif, tiff, jpeg, png If no format is specified the filename extension is used resolution: int, optional 1 to 100, for jpeg images """ frame = self.frame() if not filename: filename = self._viewer[frame]['filename'] if not format: filename += "_snap.png" else: filename += "_snap." + format # screencapture works with OSX if sys.platform == "darwin": print("Not implemented for darwin platform") return None else: # import only works with x11 windows cstring = ['import'] cstring.append('-window ') cstring.append(self._xpa_name) if "jpeg" in filename: cstring.append(' -quality') cstring.append(str(resolution)) cstring.append(filename) # self.set(cstring) # save the local directory, erase later? call(cstring) print(f"Image saved to {filename}") self.log.info(f"Image saved to {filename}") return(filename)
[docs] def grab(self): """Make a copy of the image view.""" valid_fname = self.snapsave(format="png") if valid_fname: if "nbagg" in get_backend().lower(): # save inside the notebook data = mpimage.imread(valid_fname) plt.clf() return plt.imshow(data, origin="upper")
[docs] def view(self, img): """Display numpy image array to current frame. Parameters ---------- img: numpy array The array containing data, it will be forced to numpy.array() """ frame = self.frame() if not frame: print("No valid frame") else: img = np.array(img) if img.dtype.type == np.bool8: img = img.astype(np.uint8) # arrays of 1 through 3 dimensions dim = img.ndim if dim == 2: (ydim, xdim) = img.shape dims = "[xdim={0:d},ydim={1:d},".format(xdim, ydim) elif dim == 3: (zdim, ydim, xdim) = img.shape dims = "[xdim={0:d},ydim={1:d},zdim={2:d},".format(xdim, ydim, zdim) else: raise UnsupportedImageShapeException(repr(img.shape)) if img.dtype.byteorder in ["=", "|"]: dt = img.dtype.newbyteorder(">") img = np.array(img, dtype=dt) byteorder = ">" else: byteorder = img.dtype.byteorder endianness = {">": ",arch=bigendian", "<": ",arch=littleendian"}[byteorder] arr_str = img.tostring() try: bitpix = self._ImgCode[img.dtype.name] except KeyError as e: raise UnsupportedDatatypeException(e) option = (dims + "bitpix={0:d}{1:s}]".format(bitpix, endianness)) try: self.set("array " + option, arr_str) self._set_frameinfo() self._viewer[frame]['user_array'] = img except XpaException as e: raise XpaException( "XPA: {0} : Problem loading array \ into frame {1}".format(e, frame))
[docs] def zoomtofit(self): """Zoom to fit the image to the viewer.""" self.zoom("to fit")
[docs] def zoom(self, par="to fit"): """Zoom using the specified command. Parameters ---------- par: string - it can be a number (ranging 0 to 8 effectively), and successive calls continue zooming in the same direction - it can be two numbers '4 2', which specify zoom on different axis - if can be to a specific value 'to 8' or 'to fit' - it can be 'open' to open the dialog box - it can be 'close' to close the dialog box (only valid if the box is already open) Examples -------- zoom('0.1') """ try: self.set("zoom {0:s}".format(str(par))) except XpaException: print("XPA problem with zoom (probably your zoom " "window is already closed)")
[docs] def show_xpa_commands(self): """Print the available XPA commands.""" print(self.get('')) # With empty string, all commands are returned
[docs] def reopen(self): """Reopen a closed window.""" print("Not available for DS9, start a new object instead") raise NotImplementedError
atexit.register(ds9._purge_tmp_dirs) atexit.register(ds9._stop_running_process)