Source code for pytemscript.modules.acquisition

from typing import Optional, Dict
import time
import logging
from datetime import datetime
from functools import lru_cache

from ..utils.misc import RequestBody, convert_image
from ..utils.enums import AcqImageSize, AcqShutterMode, PlateLabelDateFormat, ScreenPosition
from .extras import Image, SpecialObj


class AcquisitionObj(SpecialObj):
    """ Wrapper around cameras COM object with specific acquisition methods. """
    def __init__(self, com_object):
        """ Store the COM object in the base class and start with no device selected.

        :param com_object: COM object handed over to SpecialObj
        """
        super().__init__(com_object)
        # Assigned by set_tem_presets, set_tem_presets_advanced and set_stem_presets
        # once a camera / detector has been matched by name
        self.current_camera = None

    def show_film_settings(self) -> Dict:
        """ Read the plate camera properties of the wrapped COM object into a dict.

        :returns: Film settings keyed by a short lower-case name
        :rtype: dict
        """
        plate = self.com_object
        # (result key, COM property) pairs, read in this order
        fields = (
            ("stock", "Stock"),  # Int
            ("exposure_time", "ManualExposureTime"),
            ("film_text", "FilmText"),
            ("exposure_number", "ExposureNumber"),
            ("user_code", "Usercode"),  # 3 digits
        )
        film_settings = {key: getattr(plate, prop) for key, prop in fields}
        # Reported value is the raw ScreenCurrent multiplied by 1e9
        film_settings["screen_current"] = plate.ScreenCurrent * 1e9
        return film_settings

    def acquire_film(self,
                     film_text: str,
                     exp_time: float) -> None:
        """ Take one plate exposure.

        :param str film_text: Label text; it is stripped and cut to 96 characters
        :param float exp_time: Value assigned to ManualExposureTime
        """
        plate = self.com_object
        # NOTE(review): property is spelled "PlateLabelDataType" here while the enum is
        # PlateLabelDateFormat - confirm the spelling against the scripting interface
        plate.PlateLabelDataType = PlateLabelDateFormat.DDMMYY
        # Bump the exposure counter by one before exposing
        plate.ExposureNumber += 1
        plate.MainScreen = ScreenPosition.UP
        plate.ScreenDim = True
        label = film_text.strip()
        plate.FilmText = label[:96]
        plate.ManualExposureTime = exp_time
        plate.TakeExposure()

    def show_stem_detectors(self) -> Dict:
        """ Map every STEM detector name to its list of binning values.

        :returns: {detector name: {"binnings": [int, ...]}}
        :rtype: dict
        """
        # Info is fetched once per detector and reused for both name and binnings
        detector_infos = (detector.Info for detector in self.com_object)
        return {
            info.Name: {"binnings": [int(value) for value in info.Binnings]}
            for info in detector_infos
        }

    def show_cameras(self) -> Dict:
        """ Describe every camera found in the wrapped collection.

        Both "supports_csa" and "supports_cca" are always reported as False here.

        :returns: {camera name: parameters dict}
        :rtype: dict
        """
        described = {}
        for camera in self.com_object:
            info = camera.Info
            limits = camera.AcqParams
            camera_name = info.Name

            details = dict.fromkeys(("supports_csa", "supports_cca"), False)
            details.update({
                "height": info.Height,
                "width": info.Width,
                # Raw pixel size divided by 1e-6 to get the value labelled "um"
                "pixel_size(um)": (info.PixelSize.X / 1e-6, info.PixelSize.Y / 1e-6),
                "binnings": [int(value) for value in info.Binnings],
                "shutter_modes": [AcqShutterMode(mode).name for mode in info.ShutterModes],
                "pre_exposure_limits(s)": (limits.MinPreExposureTime,
                                           limits.MaxPreExposureTime),
                "pre_exposure_pause_limits(s)": (limits.MinPreExposurePauseTime,
                                                 limits.MaxPreExposurePauseTime),
            })
            described[camera_name] = details

        return described

    def show_cameras_csa(self) -> Dict:
        """ Describe every camera listed in SupportedCameras of the wrapped object.

        Each camera is assigned to the Camera property before its capabilities
        are read from CameraSettings.

        :returns: {camera name: parameters dict}, "supports_csa" is always True
        :rtype: dict
        """
        acquisition = self.com_object
        described = {}
        for camera in acquisition.SupportedCameras:
            acquisition.Camera = camera
            caps = acquisition.CameraSettings.Capabilities
            pixel_size_um = (camera.PixelSize.Width / 1e-6, camera.PixelSize.Height / 1e-6)
            exposure_range = (caps.ExposureTimeRange.Begin, caps.ExposureTimeRange.End)

            described[camera.Name] = {
                "supports_csa": True,
                "supports_cca": False,
                "height": camera.Height,
                "width": camera.Width,
                "pixel_size(um)": pixel_size_um,
                "binnings": [int(binning.Width) for binning in caps.SupportedBinnings],
                "exposure_time_range(s)": exposure_range,
                "supports_dose_fractions": caps.SupportsDoseFractions,
                "max_number_of_fractions": caps.MaximumNumberOfDoseFractions,
                "supports_drift_correction": caps.SupportsDriftCorrection,
                "supports_electron_counting": caps.SupportsElectronCounting,
                # Attribute may be absent; report False in that case
                "supports_eer": getattr(caps, 'SupportsEER', False),
            }

        return described

    def show_cameras_cca(self, tem_cameras: Dict) -> Dict:
        """ Flag the cameras of the input dict that are also listed in SupportedCameras.

        The input dict is modified in place and returned.

        :param dict tem_cameras: {camera name: parameters dict}
        :returns: The same dict with "supports_cca" and "supports_recording" added
                  for every matching camera
        :rtype: dict
        """
        acquisition = self.com_object
        for camera in acquisition.SupportedCameras:
            if camera.Name not in tem_cameras:
                continue
            # Select the camera so that CameraSettings reports its capabilities
            acquisition.Camera = camera
            caps = acquisition.CameraSettings.Capabilities
            entry = tem_cameras[camera.Name]
            entry.update({
                "supports_cca": True,
                "supports_recording": getattr(caps, 'SupportsRecording', False)
            })

        return tem_cameras

    def acquire(self, cameraName: str, **kwargs) -> Image:
        """ Acquire one image from the named device and convert it.

        Device settings are expected to be applied before this call. Extra
        keyword arguments are forwarded to convert_image.

        :param str cameraName: Camera name
        :returns: Image object
        """
        acquisition = self.com_object
        # Make the requested device the only active acquisition device
        acquisition.RemoveAllAcqDevices()
        acquisition.AddAcqDeviceByName(cameraName)

        started = time.time()
        acquired = acquisition.AcquireImages()
        acquired_at = time.time()
        # Only the first returned image is used
        result = convert_image(acquired[0], name=cameraName, **kwargs)
        converted_at = time.time()

        timings = (
            ("\tAcquisition took %f s", acquired_at - started),
            ("\tConverting image took %f s", converted_at - acquired_at),
        )
        for template, elapsed in timings:
            logging.debug(template % elapsed)

        return result

    def acquire_advanced(self,
                         cameraName: str,
                         recording: bool = False,
                         **kwargs) -> Optional[Image]:
        """ Run an acquisition through the advanced scripting object.

        :param str cameraName: Camera name, passed to convert_image as the image name
        :param bool recording: If truthy, start a continuous acquisition and return
                               without reading an image back
        :returns: Image object, or None when recording
        """
        if recording:
            self.com_object.CameraContinuousAcquisition.Start()
            #self.com_object.CameraContinuousAcquisition.Wait()
            return None

        started = time.time()
        raw = self.com_object.CameraSingleAcquisition.Acquire()
        acquired_at = time.time()
        #self.com_object.CameraSingleAcquisition.Wait()
        result = convert_image(raw, name=cameraName, advanced=True, **kwargs)
        converted_at = time.time()

        timings = (
            ("\tAcquisition took %f s", acquired_at - started),
            ("\tConverting image took %f s", converted_at - acquired_at),
        )
        for template, elapsed in timings:
            logging.debug(template % elapsed)
        return result

    def restore_shutter(self,
                        cameraName: str,
                        prev_shutter_mode: int) -> None:
        """ Write a previously saved shutter mode back to the named camera.

        :param str cameraName: Camera name
        :param int prev_shutter_mode: Value assigned to the camera's Info.ShutterMode
        :raises KeyError: If no camera in the collection has that name
        """
        for candidate in self.com_object:
            if candidate.Info.Name != cameraName:
                continue
            # First camera with a matching name wins
            candidate.Info.ShutterMode = prev_shutter_mode
            return

        raise KeyError("No camera with name %s. If using standard scripting the "
                       "camera must be selected in the microscope user interface" % cameraName)

    def set_tem_presets(self,
                        cameraName: str,
                        size: AcqImageSize = AcqImageSize.FULL,
                        exp_time: float = 1.0,
                        binning: int = 1,
                        **kwargs) -> Optional[int]:

        for cam in self.com_object:
            if cam.Info.Name == cameraName:
                self.current_camera = cam
                break

        if self.current_camera is None:
            raise KeyError("No camera with name %s. If using standard scripting the "
                           "camera must be selected in the microscope user interface" % cameraName)

        info = self.current_camera.Info
        settings = self.current_camera.AcqParams
        settings.ImageSize = size

        settings.Binning = binning
        prev_shutter_mode = None

        if 'correction' in kwargs:
            settings.ImageCorrection = kwargs['correction']
        if 'exposure_mode' in kwargs:
            settings.ExposureMode = kwargs['exposure_mode']
        if 'shutter_mode' in kwargs:
            # Save previous global shutter mode
            prev_shutter_mode = info.ShutterMode
            info.ShutterMode = kwargs['shutter_mode']
        if 'pre_exp_time' in kwargs:
            if kwargs['shutter_mode'] != AcqShutterMode.BOTH:
                raise RuntimeError("Pre-exposures can only be be done "
                                   "when the shutter mode is set to BOTH")
            settings.PreExposureTime = kwargs['pre_exp_time']
        if 'pre_exp_pause_time' in kwargs:
            if kwargs['shutter_mode'] != AcqShutterMode.BOTH:
                raise RuntimeError("Pre-exposures can only be be done when "
                                   "the shutter mode is set to BOTH")
            settings.PreExposurePauseTime = kwargs['pre_exp_pause_time']

        # Set exposure after binning, since it adjusted automatically when binning is set
        settings.ExposureTime = exp_time

        return prev_shutter_mode

    def set_tem_presets_advanced(self,
                                 cameraName: str,
                                 size: AcqImageSize = AcqImageSize.FULL,
                                 exp_time: float = 1.0,
                                 binning: int = 1,
                                 use_cca: bool = False,
                                 **kwargs) -> None:
        eer = kwargs.get("eer")
        if use_cca:
            for cam in self.com_object.CameraContinuousAcquisition.SupportedCameras:
                if cam.Name == cameraName:
                    self.current_camera = cam
                    break
        else: # CSA
            for cam in self.com_object.CameraSingleAcquisition.SupportedCameras:
                if cam.Name == cameraName:
                    self.current_camera = cam
                    break

        if self.current_camera is None:
            raise KeyError("No camera with name %s. If using standard scripting the "
                           "camera must be selected in the microscope user interface" % cameraName)

        if not self.current_camera.IsInserted:
            self.current_camera.Insert()

        if 'recording' in kwargs:
            self.com_object.CameraContinuousAcquisition.Camera = self.current_camera
            settings = self.com_object.CameraContinuousAcquisition.CameraSettings
            capabilities = settings.Capabilities
            if hasattr(capabilities, 'SupportsRecording') and capabilities.SupportsRecording:
                settings.RecordingDuration = kwargs['recording']
            else:
                raise NotImplementedError("This camera does not support continuous acquisition")

        else:
            self.com_object.CameraSingleAcquisition.Camera = self.current_camera
            settings = self.com_object.CameraSingleAcquisition.CameraSettings
            capabilities = settings.Capabilities

        # Unfortunately, settings.Binning is an interface, not a simple int
        for b in capabilities.SupportedBinnings:
            if int(b.Width) == int(binning):
                settings.Binning = b

        settings.ReadoutArea = size
        # Set exposure after binning, since it adjusted automatically when binning is set
        settings.ExposureTime = exp_time

        if 'align_image' in kwargs:
            if capabilities.SupportsDriftCorrection:
                settings.AlignImage = kwargs['align_image']
            else:
                raise NotImplementedError("This camera does not support drift correction")

        if 'electron_counting' in kwargs:
            if capabilities.SupportsElectronCounting:
                settings.ElectronCounting = kwargs['electron_counting']
            else:
                raise NotImplementedError("This camera does not support electron counting")

        # EER saving is supported in TEM server 7.6 (Titan 3.6 / Talos 2.6)
        if hasattr(capabilities, 'SupportsEER'):
            eer_is_supported = capabilities.SupportsEER
            if eer and eer_is_supported:
                settings.EER = eer
            elif eer and not eer_is_supported:
                raise NotImplementedError("This camera does not support EER")
            elif not eer and eer_is_supported:
                # EER param is persistent throughout camera COM object lifetime,
                # if not using EER we need to set it to False
                settings.EER = False

            if eer and not settings.ElectronCounting:
                raise RuntimeError("Electron counting should be enabled when using EER")
            if eer and 'group_frames' in kwargs:
                raise RuntimeError("No frame grouping allowed when using EER")

        if capabilities.SupportsDoseFractions:
            dfd = settings.DoseFractionsDefinition
            dfd.Clear()

        if kwargs.get('save_frames'):
            if not capabilities.SupportsDoseFractions:
                raise NotImplementedError("This camera does not support dose fractions")

            total = settings.CalculateNumberOfFrames()
            now = datetime.now()
            settings.SubPathPattern = cameraName + "_" + now.strftime("%d%m%Y_%H%M%S")
            output = settings.PathToImageStorage + settings.SubPathPattern

            if eer in [False, None]:
                group = kwargs.get('group_frames', 1)
                if group < 1:
                    raise ValueError("Frame group size must be at least 1")
                if group > total:
                    raise ValueError("Frame group size cannot exceed maximum possible "
                                     "number of frames: %d. Change exposure time." % total)

                frame_ranges = [(i, min(i + group, total)) for i in range(0, total-1, group)]
                logging.debug("Using frame ranges: %s", frame_ranges[:-1])
                for i in frame_ranges[:-1]:
                    dfd.AddRange(i[0], i[1])

                logging.info("Movie of %d fractions (%d frames, group=%d) "
                             "will be saved to: %s.mrc",
                             len(frame_ranges)-1, total, group, output)
                logging.info("MRC format can only contain images of up to "
                             "16-bits per pixel, to get true CameraCounts "
                             "multiply pixels by PixelToValueCameraCounts "
                             "factor found in the metadata")
            else:
                logging.info("Movie of %d frames will be saved to: %s.eer",
                             total, output)

    def set_stem_presets(self,
                         cameraName: str,
                         size: AcqImageSize = AcqImageSize.FULL,
                         dwell_time: float = 1e-5,
                         binning: int = 1,
                         **kwargs) -> None:

        for stem in self.com_object:
            if stem.Info.Name == cameraName:
                self.current_camera = stem
                break
        if self.current_camera is None:
            raise KeyError("No STEM detector with name %s" % cameraName)

        if 'brightness' in kwargs:
            self.current_camera.Info.Brightness = kwargs['brightness']
        if 'contrast' in kwargs:
            self.current_camera.Info.Contrast = kwargs['contrast']

        settings = self.com_object.AcqParams  # StemAcqParams
        settings.ImageSize = size
        settings.Binning = binning
        settings.DwellTime = dwell_time


class Acquisition:
    """ Image acquisition functions. """
    __slots__ = ("__client", "__id_adv")

    def __init__(self, client):
        """ Keep the client used for all calls and the advanced acquisition path. """
        self.__client = client
        self.__id_adv = "tem_adv.Acquisitions"

    @property
    @lru_cache(maxsize=1)
    def __has_cca(self) -> bool:
        """ CCA is supported by Ceta 2. """
        cca = RequestBody(attr=self.__id_adv + ".CameraContinuousAcquisition", validator=bool)
        return self.__client.has_advanced_iface and self.__client.call(method="has", body=cca)

    @property
    @lru_cache(maxsize=1)
    def __has_csa(self) -> bool:
        """ CSA is supported by Ceta 1, Ceta 2, Falcon 3, Falcon 4. """
        csa = RequestBody(attr=self.__id_adv + ".CameraSingleAcquisition", validator=bool)
        return self.__client.has_advanced_iface and self.__client.call(method="has", body=csa)

    @property
    @lru_cache(maxsize=1)
    def __has_film(self) -> bool:
        """ Whether the client reports the tem.Camera.Stock attribute. """
        body = RequestBody(attr="tem.Camera.Stock", validator=int)
        return self.__client.call(method="has", body=body)

    @staticmethod
    def __find_camera(cameraName: str,
                      cameras_dict: Dict,
                      binning: int) -> Dict:
        """ Check camera name and supported binning.

        :raises KeyError: If the name is not a key of cameras_dict
        :raises ValueError: If binning is not listed for that camera
        """
        camera_dict = cameras_dict.get(cameraName)

        if camera_dict is None:
            raise KeyError("No camera with name %s. If using standard scripting the "
                           "camera must be selected in the microscope user interface" % cameraName)

        if binning not in camera_dict["binnings"]:
            raise ValueError("Unsupported binning value: %d" % binning)

        return camera_dict

    def __check_prerequisites(self) -> None:
        """ Check if buffer cycle or LN filling is running before acquisition call.

        Polls at most 10 times (2 s apart) for the buffer cycle and at most
        40 times (30 s apart) for the dewar filling, then returns regardless.
        """
        counter = 0
        while counter < 10:
            body = RequestBody(attr="tem.Vacuum.PVPRunning", validator=bool)
            if self.__client.call(method="get", body=body):
                logging.info("Buffer cycle in progress, waiting...\r")
                time.sleep(2)
                counter += 1
            else:
                logging.info("Checking buffer levels...")
                break

        body = RequestBody(attr="tem.TemperatureControl.TemperatureControlAvailable", validator=bool)
        if self.__client.call(method="has", body=body):
            counter = 0
            while counter < 40:
                body = RequestBody("tem.TemperatureControl.DewarsAreBusyFilling", validator=bool)
                if self.__client.call(method="get", body=body):
                    logging.info("Dewars are filling, waiting...\r")
                    time.sleep(30)
                    counter += 1
                else:
                    logging.info("Checking dewars levels...")
                    break

    def __acquire_with_tecnaiccd(self,
                                 cameraName: str,
                                 size: AcqImageSize,
                                 exp_time: float,
                                 binning: int,
                                 camerasize: int,
                                 **kwargs) -> Image:
        """ Acquire an image through the Tecnai CCD plugin.

        :raises RuntimeError: If the client has no Tecnai CCD interface
        """
        if not self.__client.has_ccd_iface:
            raise RuntimeError("Tecnai CCD plugin not found, did you "
                               "pass useTecnaiCCD=True to the Microscope() ?")
        else:
            logging.info("Using TecnaiCCD plugin for Gatan camera")
            from ..plugins.tecnai_ccd import TecnaiCCDPlugin

            body = RequestBody(attr=None,
                               obj_cls=TecnaiCCDPlugin,
                               obj_method="acquire_image",
                               cameraName=cameraName,
                               size=size,
                               exp_time=exp_time,
                               binning=binning,
                               camerasize=camerasize,
                               **kwargs)
            image = self.__client.call(method="exec_special", body=body)
            logging.info("TEM image acquired on %s", cameraName)

            return image

    def acquire_tem_image(self,
                          cameraName: str,
                          size: AcqImageSize = AcqImageSize.FULL,
                          exp_time: float = 1.0,
                          binning: int = 1,
                          **kwargs) -> Optional[Image]:
        """ Acquire a TEM image.

        :param str cameraName: Camera name
        :param AcqImageSize size: Image size
        :param float exp_time: Exposure time in seconds
        :param int binning: Binning factor
        :keyword AcqImageCorrection correction: Image correction
        :keyword AcqExposureMode exposure_mode: CCD exposure mode
        :keyword AcqShutterMode shutter_mode: CCD shutter mode
        :keyword float pre_exp_time: The pre-exposure time in seconds.
        :keyword float pre_exp_pause_time: The time delay after pre-exposure and before the actual CCD exposure in seconds.
        :keyword bool align_image: Whether frame alignment (i.e. drift correction) is to be applied to the final image as well as the intermediate images. Advanced cameras only.
        :keyword bool electron_counting: Use counting mode. Advanced cameras only.
        :keyword bool eer: Use EER mode. Advanced cameras only.
        :keyword bool save_frames: Use to save movies. Advanced cameras only.
        :keyword int group_frames: Group frames into fractions of this size. Advanced cameras only.
        :keyword float recording: minimum amount of time the acquisition will take, as it will take as much complete frames with the set exposure time as is needed to get to the set RecordingDuration. E.g. if the exposure time is 0.5 and the RecordingDuration is 2.3, there will be an acquisition of 2.5 (5 frames). Advanced cameras only.
        :keyword bool use_tecnaiccd: Use Tecnai CCD plugin to acquire image via Digital Micrograph, only for Gatan cameras. Requires Microscope() initialized with useTecnaiCCD=True
        :returns: Image object, or None when a recording was started
        :rtype: Image

        Extra notes:

        - Keyword arguments correction, exposure_mode, shutter_mode, pre_exp_time, pre_exp_pause_time are only available for CCD cameras that use standard scripting.
        - Advanced cameras are Ceta 1, Ceta 2, Falcon 3, Falcon 4(i).
        - Counting mode and frame saving requires a separate license enabled in TEM software.
        - Continuous acquisition with recording is supported only by Ceta 2.
        - TecnaiCCD plugin is only available for Gatan CCD cameras.

        Usage:
            >>> microscope = Microscope()
            >>> acq = microscope.acquisition
            >>> img = acq.acquire_tem_image("BM-Falcon", AcqImageSize.FULL,
            ...                             exp_time=5.0, binning=1,
            ...                             electron_counting=True, align_image=True)
            >>> img.save("aligned_sum.mrc")
            >>> print(img.width)
            4096
        """
        camera_dict = self.__find_camera(cameraName, self.cameras, binning)

        if kwargs.get("use_tecnaiccd", False):
            return self.__acquire_with_tecnaiccd(cameraName, size, exp_time,
                                                 binning, camera_dict["width"], **kwargs)

        if kwargs.get("recording", False) and not camera_dict.get("supports_recording", False):
            raise NotImplementedError("Recording / continuous acquisition is not available")

        csa, cca = camera_dict["supports_csa"], camera_dict["supports_cca"]

        if not csa:  # Use standard scripting
            body = RequestBody(attr="tem.Acquisition.Cameras",
                               obj_cls=AcquisitionObj,
                               obj_method="set_tem_presets",
                               cameraName=cameraName,
                               size=size,
                               exp_time=exp_time,
                               binning=binning,
                               **kwargs)
            prev_shutter_mode = self.__client.call(method="exec_special", body=body)

            try:
                self.__check_prerequisites()
                body = RequestBody(attr="tem.Acquisition",
                                   obj_cls=AcquisitionObj,
                                   obj_method="acquire",
                                   cameraName=cameraName,
                                   **kwargs)
                image = self.__client.call(method="exec_special", body=body)
                logging.info("TEM image acquired on %s", cameraName)
            finally:
                # Put the global shutter mode back even when the acquisition failed
                if prev_shutter_mode is not None:
                    body = RequestBody(attr="tem.Acquisition.Cameras",
                                       obj_cls=AcquisitionObj,
                                       obj_method="restore_shutter",
                                       cameraName=cameraName,
                                       prev_shutter_mode=prev_shutter_mode)
                    self.__client.call(method="exec_special", body=body)

            return image

        else:  # CCA or CSA camera type, use advanced scripting
            body = RequestBody(attr=self.__id_adv,
                               obj_cls=AcquisitionObj,
                               obj_method="set_tem_presets_advanced",
                               cameraName=cameraName,
                               size=size,
                               exp_time=exp_time,
                               binning=binning,
                               use_cca=cca,
                               **kwargs)
            self.__client.call(method="exec_special", body=body)

            if "recording" in kwargs:
                # "recording" is already part of kwargs; passing it a second time as
                # an explicit keyword raised TypeError (duplicate keyword argument)
                body = RequestBody(attr=self.__id_adv,
                                   obj_cls=AcquisitionObj,
                                   obj_method="acquire_advanced",
                                   cameraName=cameraName,
                                   **kwargs)
                self.__client.call(method="exec_special", body=body)
                logging.info("TEM image acquired on %s", cameraName)
                return None
            else:
                body = RequestBody(attr=self.__id_adv,
                                   validator=Image,
                                   obj_cls=AcquisitionObj,
                                   obj_method="acquire_advanced",
                                   cameraName=cameraName,
                                   **kwargs)
                image = self.__client.call(method="exec_special", body=body)
                return image

    def acquire_stem_image(self,
                           cameraName: str,
                           size: AcqImageSize = AcqImageSize.FULL,
                           dwell_time: float = 1e-5,
                           binning: int = 1,
                           **kwargs) -> Image:
        """ Acquire a STEM image.

        :param str cameraName: Camera name
        :param AcqImageSize size: Image size
        :param float dwell_time: Dwell time in seconds. The frame time equals the dwell time times the number of pixels plus some overhead (typically 20%, used for the line flyback)
        :param int binning: Binning factor. Technically speaking these are "pixel skipping" values, since in STEM we do not combine pixels as a CCD does.
        :keyword float brightness: Brightness setting (0.0-1.0)
        :keyword float contrast: Contrast setting (0.0-1.0)
        :returns: Image object
        :rtype: Image
        """
        _ = self.__find_camera(cameraName, self.stem_detectors, binning)

        body = RequestBody(attr="tem.Acquisition.Detectors",
                           obj_cls=AcquisitionObj,
                           obj_method="set_stem_presets",
                           cameraName=cameraName,
                           size=size,
                           dwell_time=dwell_time,
                           binning=binning,
                           **kwargs)
        self.__client.call(method="exec_special", body=body)

        self.__check_prerequisites()
        body = RequestBody(attr="tem.Acquisition",
                           validator=Image,
                           obj_cls=AcquisitionObj,
                           obj_method="acquire",
                           cameraName=cameraName,
                           **kwargs)
        image = self.__client.call(method="exec_special", body=body)
        logging.info("STEM image acquired on %s", cameraName)

        return image

    def acquire_film(self,
                     film_text: str,
                     exp_time: float) -> None:
        """ Expose a film.

        :param str film_text: Film text, 96 symbols
        :param float exp_time: Exposure time in seconds
        :raises RuntimeError: If no plate camera is reported or its stock is not above zero
        """
        stock = RequestBody(attr="tem.Camera.Stock", validator=int)
        if self.__has_film and self.__client.call(method="get", body=stock) > 0:
            body = RequestBody(attr="tem.Camera",
                               obj_cls=AcquisitionObj,
                               obj_method="acquire_film",
                               film_text=film_text,
                               exp_time=exp_time)
            self.__client.call(method="exec_special", body=body)
            logging.info("Film exposure completed")
        else:
            raise RuntimeError("Plate is not available or stock is empty!")

    @property
    def film_settings(self) -> Dict:
        """ Returns a dict with film settings.
        Note: The plate camera has become obsolete with Windows 7 so
        most of the existing functions are no longer supported.
        """
        if self.__has_film:
            body = RequestBody(attr="tem.Camera",
                               validator=dict,
                               obj_cls=AcquisitionObj,
                               obj_method="show_film_settings")
            return self.__client.call(method="exec_special", body=body)
        else:
            logging.error("No film/plate device detected.")
            return {}

    @property
    def screen_position(self) -> str:
        """ Fluorescent screen position, ScreenPosition enum. (read/write) """
        body = RequestBody(attr="tem.Camera.MainScreen", validator=int)
        result = self.__client.call(method="get", body=body)

        return ScreenPosition(result).name

    @screen_position.setter
    def screen_position(self, value: ScreenPosition) -> None:
        body = RequestBody(attr="tem.Camera.MainScreen", value=value)
        self.__client.call(method="set", body=body)

    @property
    @lru_cache(maxsize=1)
    def stem_detectors(self) -> Dict:
        """ Returns a dict with STEM detectors parameters. """
        body = RequestBody(attr="tem.Acquisition.Detectors",
                           validator=dict,
                           obj_cls=AcquisitionObj,
                           obj_method="show_stem_detectors")
        return self.__client.call(method="exec_special", body=body)

    @property
    @lru_cache(maxsize=1)
    def cameras(self) -> Dict:
        """ Returns a dict with parameters for all TEM cameras.
        supports_csa means single acquisition (Ceta 1, Ceta 2, Falcon 3, Falcon 4(i));
        supports_cca means continuous acquisition (Ceta 2 only)
        """
        body = RequestBody(attr="tem.Acquisition.Cameras",
                           validator=dict,
                           obj_cls=AcquisitionObj,
                           obj_method="show_cameras")
        tem_cameras = self.__client.call(method="exec_special", body=body)

        if not self.__client.has_advanced_iface:
            return tem_cameras

        # CSA is supported by Ceta 1, Ceta 2, Falcon 3, Falcon 4(i)
        body = RequestBody(attr=self.__id_adv + ".CameraSingleAcquisition",
                           validator=dict,
                           obj_cls=AcquisitionObj,
                           obj_method="show_cameras_csa")
        csa_cameras = self.__client.call(method="exec_special", body=body)
        # Entries from the CSA listing replace same-named standard entries
        tem_cameras.update(csa_cameras)

        # CCA is supported by Ceta 2
        if self.__has_cca:
            body = RequestBody(attr=self.__id_adv + ".CameraContinuousAcquisition",
                               validator=dict,
                               obj_cls=AcquisitionObj,
                               obj_method="show_cameras_cca",
                               tem_cameras=tem_cameras)
            tem_cameras = self.__client.call(method="exec_special", body=body)

        return tem_cameras