Skip to content

Experiments

Here's an example of an experiment that gathered simple wavelength sweep data on a set of circuits filtered using CircuitMap. Initially, it connects to all the hardware and makes sure it's configured properly (warmed up, etc.). Then, it configures the hardware for alignment optimization and searches for peak power throughput. Once aligned, it reconfigures for a wavelength sweep and extended data collection period. It sweeps the wavelength and records the data, saving it to a text file. It then moves to the next circuit and repeats.

from datetime import datetime
from getpass import getuser
from pathlib import Path
import time
import logging
from typing import List

import numpy as np

from autogator.analysis import WavelengthAnalyzer
from autogator.experiments import Experiment
from autogator.api import load_default_configuration
from autogator.routines import auto_scan


log = logging.getLogger(__name__)


class WavelengthSweepExperiment(Experiment):
    """
    A wavelength sweep experiment keeps laser power constant, but changes the
    wavelength of the laser. As the laser sweeps, it outputs a trigger signal
    and logs wavelength points. The trigger signal is collected on the
    oscilloscope, which collects data for the entire duration of the sweep,
    and the time-based data is then correlated to the wavelengths as denoted
    by the trigger signal. The data is saved to a text file.

    Attributes
    ----------
    wl_start : float
        The starting wavelength of the sweep.
    wl_stop : float
        The ending wavelength of the sweep.
    duration : float
        The duration of the sweep, in seconds. This will define the sweep rate.
    sample_rate : float
        The sample rate of the oscilloscope.
    trigger_channel : int
        The channel on the oscilloscope that will be used to trigger the data
        collection. This is the laser's output trigger that correlates time
        to logged wavelengths.
    power_dBm : float
        The power of the laser, in dBm.
    buffer : float
        The number of seconds to buffer after collecting data before raising
        a timeout error from the scope.
    active_channels : List[int]
        The oscilloscope channels to collect data on (1-indexed).
    trigger_channel : int
        The channel the laser trigger is conneceted to.
    trigger_level : float
        The trigger voltage to begin data collection at.
    output_dir : str
        The directory to save the data to.
    chip_name : str
        The name of the chip.
    """
    # General configuration
    MANUAL = False
    chip_name: str = "fabrun5"
    output_dir = Path("C:/Users/sequo/Documents/GitHub/autogator/examples/fake")

    # Scope configuration
    duration: float = 5.0
    buffer: float = 5.0
    sample_rate: int = int(np.floor(10e6 / (duration + buffer)))
    sample_rate = 1e5
    active_channels: List[int] = [1, 2]
    data_channels: List[int] = [2]
    trigger_channel: int = 1
    trigger_level: int = 1
    channel_settings = {
        1: {"range": 5, "position": -2.5},
        2: {"range": 0.6, "position": -4},
        # 3: {"range": 2.5, "position": -1},
        # 4: {"range": 2.5, "position": -1},
    }

    # Laser configuration
    wl_start: float = 1500
    wl_stop: float = 1600
    trigger_step: float = 0.01
    power_dBm: float = -4.0

    def setup(self):
        """
        Start up the laser and scope, enter details about who is running the
        experiment. Do some basic sanity checking on some values.
        """
        self.operator = input(f"Operator ({getuser()}) [ENTER]: ") 
        if not self.operator: 
            self.operator = getuser()

        self.laser = self.stage.laser.driver
        self.scope = self.stage.scope.driver

        self.laser._pyroClaimOwnership()
        self.laser.on()
        self.laser.open_shutter()

        sweep_rate = (self.wl_stop - self.wl_start) / self.duration
        assert sweep_rate > 1.0
        assert sweep_rate < 100.0
        assert self.wl_start >= 1500  # self.laser.MINIMUM_WAVELENGTH
        assert self.wl_stop <= 1630  # self.laser.MAXIMUM_WAVELENGTH

    def configure_scope_sweep(self):
        """
        The scope needs to be alternately configured to record a long sweep and
        a single-shot measurement. This function reconfigures for a long sweep.
        """
        acquire_time = self.duration + self.buffer
        numSamples = int((acquire_time) * self.sample_rate)
        print(
            "Set for {:.2E} Samples @ {:.2E} Sa/s.".format(numSamples, self.sample_rate)
        )

        self.scope.acquisition_settings(sample_rate=self.sample_rate, duration=acquire_time)
        for channel in self.active_channels:
            self.scope.set_channel(channel, **self.channel_settings[channel])
            time.sleep(0.1)

        self.scope.edge_trigger(self.trigger_channel, self.trigger_level)

    def configure_scope_measure(self):
        """
        The scope needs to be alternately configured to record a long sweep and
        a single-shot measurement. This function reconfigures for a single
        measurement.
        """
        MEAS_CHANNEL = 2
        RANGE = 2.0
        POSITION = -3.0

        self.scope.set_channel(1, range=RANGE, position=POSITION)
        self.scope.set_channel(2, range=RANGE, position=POSITION)
        self.scope.set_auto_measurement(source=F"C{MEAS_CHANNEL}W1")
        self.scope.wait_for_device()

        self.scope.edge_trigger(1, 0.0)
        self.scope.set_timescale(10e-10)
        self.scope.acquire(run="continuous")
        time.sleep(5)

    def configure_laser_sweep(self):
        """
        The laser needs to be alternately configured to sweep in wavelength and
        to return to the peak-power wavelength for alignment purposes. This
        function reconfigures for a wavelength sweep.
        """
        self.laser.power_dBm(self.power_dBm)
        self.laser.sweep_set_mode(
            continuous=True, twoway=True, trigger=False, const_freq_step=False
        )

        self.laser.trigger_enable_output()
        self.laser.trigger_set_mode("Step")
        self.laser.trigger_step(self.trigger_step)

    def configure_laser_measure(self):
        """
        The laser needs to be alternately configured to sweep in wavelength and
        to return to the peak-power wavelength for alignment purposes. This
        function reconfigures for a single measurement.
        """
        self.laser.wavelength(1550.0)

    def run(self):
        """
        The test procedure for every device under test.
        """
        print(self.circuit)

        self.configure_scope_measure()
        self.configure_laser_measure()

        # Maximize signal
        auto_scan(stage=self.stage, daq=self.stage.scope, settle=0.0, plot=self.MANUAL)

        self.configure_scope_sweep()
        self.configure_laser_sweep()

        log.debug("Starting Acquisition")
        self.scope.acquire(timeout=self.duration * 2)
        log.debug("Sweeping laser")
        self.laser.sweep_wavelength(self.wl_start, self.wl_stop, self.duration)
        log.debug("Waiting for acquisition to complete...")
        self.scope.wait_for_device()

        log.debug("Downloading raw data...")
        raw = {}
        for channel in self.active_channels:
            raw[channel] = self.scope.get_data(channel)
        wavelengthLog = self.laser.wavelength_logging()

        print("Processing Data")
        analysis = WavelengthAnalyzer(
            sample_rate=self.sample_rate,
            wavelength_log=wavelengthLog,
            trigger_data=raw[self.trigger_channel],
        )

        sorted_data = {
            channel: analysis.process_data(raw[channel])
            for channel in self.data_channels
        }

        today = datetime.now()
        date_prefix = f"{today.year}_{today.month}_{today.day}_{today.hour}_{today.minute}_"
        filename = self.output_dir / f"{date_prefix}_{self.chip_name}_locx_{self.circuit.loc.x}_locy_{self.circuit.loc.y}".replace(".", "p")
        filename = filename.with_suffix(".wlsweep")

        FILE_HEADER = f"""Test performed at {today.strftime("%Y-%m-%d %H:%M:%S")}
Operator: {self.operator}
Chip: {self.chip_name}
Circuit: {self.circuit.loc.x}, {self.circuit.loc.y}
Laser power: {self.power_dBm} dBm
Wavelength start: {self.wl_start} nm
Wavelength stop: {self.wl_stop} nm

Wavelength\tCh1"""

        print("Saving raw data.")
        data_lists = []
        for channel in self.data_channels:
            if not data_lists:
                data_lists = [sorted_data[channel].wl]
            data_lists.append(sorted_data[channel].data)
        np.savetxt(filename, np.column_stack(data_lists), delimiter="\t", header=FILE_HEADER)

    def teardown(self):
        pass


if __name__ == "__main__":
    from autogator.circuits import CircuitMap
    from autogator.experiments import ExperimentRunner

    cmap = CircuitMap.loadtxt("data/circuitmap.txt")

    mzis = cmap.filterby(name="MZI4", grouping="1")
    stage = load_default_configuration().get_stage()

    try:
        runner = ExperimentRunner(mzis, WavelengthSweepExperiment, stage=stage)
        runner.run()
    except Exception as e:
        print(stage.scope.measure())
        raise e
Back to top