py4sci

Table Of Contents

Previous topic

N-Dimensional Scan

Next topic

Miscellaneous

This Page

Site Specific Setup

For a site specific setup, refer to the following examples.

Configuration

Have all your scan scripts import a site specific top level python module that does the following:

  1. Import the scan modules that you intend to use.
  2. Import the SettingsBasedLoop, SettingsBasedSet and SettingsBasedWait commands under the names Loop, Set and Wait, so that they replace the basic ones.
  3. Install an instance of ScanSettings that is aware of your local devices.
  4. Define meta-commands for accessing complex devices by combining basic commands into Sequence commands.
  5. Provide a customized ScanClient that is aware of your scan server host.
  6. Offer shortcuts, for example to the table scan with the pre/post commands required at your site.
"""Example for beamline specific setup

Uses the scan server example database
plus some local PVs to fake devices.
"""

from scan import *

# Note that the basic Loop/Set/Wait commands are replaced by
# those that utilize custom scan settings
from scan.util import SettingsBasedLoop as Loop
from scan.util import SettingsBasedSet as Set
from scan.util import SettingsBasedWait as Wait

# Custom scan settings
class BeamlineScanSettings(ScanSettings):
    """Scan settings of the example beam line.

    Registers default command options (completion, readback, tolerance,
    comparison) for groups of device names and publishes the device name
    lists that a UI can offer to the user.
    """

    def __init__(self):
        super(BeamlineScanSettings, self).__init__()

        # (device name pattern, default options), registered in this order.
        # NOTE(review): the patterns look like regular expressions;
        # confirm against ScanSettings.defineDeviceClass.
        device_classes = (
            ("shutter", dict(readback=True)),
            ("chopper:.*", dict(completion=True)),
            (".*daq.*", dict(completion=True)),
            ("motor_.", dict(completion=True, readback=True)),
            ("setpoint", dict(completion=True, readback="readback", tolerance=0.1)),
            ("pcharge", dict(comparison="increase by")),
            ("neutrons", dict(comparison="increase by")),
        )
        for pattern, options in device_classes:
            self.defineDeviceClass(pattern, **options)

        # Device name lists used by the UI as suggestions,
        # e.g. for devices on which one can 'wait'
        self.settable = [ "motor_x", "motor_y" ]
        self.waitable = [ "seconds", "pcharge" ]
        self.loggable = [ "signal" ]

    def getReadbackName(self, device_name):
        """Return the readback name for `device_name`.

        This example returns the device name unchanged.
        A site with real motors could instead return
        `device_name + ".RBV"` for names that contain "motor".
        """
        readback_name = device_name
        return readback_name

# The one settings instance of this beam line, kept in a module-level
# name so that code importing this module can refer to it
scan_settings = BeamlineScanSettings()
# Install beam line specific scan settings
# (the settings-based Loop/Set/Wait imported above utilize them)
setScanSettings(scan_settings)


# 'Meta Commands'
def Start():
    """Return the command sequence that starts data acquisition.

    The sequence first writes 1 to 'loc://daq_reset(0)'
    and then writes 1 to 'loc://daq(0)'.
    """
    reset_daq = Set('loc://daq_reset(0)', 1)
    enable_daq = Set('loc://daq(0)', 1)
    return Sequence(reset_daq, enable_daq)

def Stop():
    """Return the command that stops data acquisition.

    It writes 0 to 'loc://daq(0)'.
    """
    disable_daq = Set('loc://daq(0)', 0)
    return disable_daq

def TakeData(counter, limit):
    """Return a sequence that acquires data until a counter condition is met.

    The sequence consists of Start(), a Wait on `counter` with value
    `limit`, and Stop(), in that order.

    :param counter: Name of the device to wait on.
    :param limit: Value passed to the Wait command for that device.
    """
    steps = (Start(), Wait(counter, limit), Stop())
    return Sequence(*steps)

def SetChopper(wavelength, phase):
    """Return the command sequence that re-configures the chopper.

    The sequence writes 0 to 'loc://chopper:run(0)', then `wavelength`
    to 'loc://chopper:wlen(0)' and `phase` to 'loc://chopper:phs(0)',
    and finally writes 1 to 'loc://chopper:run(0)'.
    (Presumably the 'run' PV halts the chopper while its parameters
    change; confirm against the actual device.)

    :param wavelength: Value for the chopper wavelength PV.
    :param phase: Value for the chopper phase PV.
    """
    # Use the settings-based Set imported at the top of this module,
    # exactly as Start() and Stop() do. It consults the settings installed
    # via setScanSettings(), so no Set method is needed on scan_settings.
    return Sequence(Set('loc://chopper:run(0)', 0),
                    Set('loc://chopper:wlen(0)', wavelength),
                    Set('loc://chopper:phs(0)', phase),
                    Set('loc://chopper:run(0)', 1))


def table_scan(headers, rows):
    """Create a table scan that uses this beam line's start/stop commands.

    :param headers: Column headers of the table.
    :param rows: Rows of the table.
    :return: Result of `TableScan.createScan()` for a table that was
             built with `start=Start()` and `stop=Stop()`.
    """
    beamline_options = dict(start=Start(), stop=Stop())
    return TableScan(headers, rows, **beamline_options).createScan()

# Shortcut
# Shortcut: scan scripts can write ndim(...) instead of createNDimScan(...)
ndim = createNDimScan

# Create a scan client, using the host name that executes the scan server.
# 'localhost' suits this example; a real site would name its scan server host.
scan_client = ScanClient('localhost')





import unittest

class TestScanClient(unittest.TestCase):
    """Check the text form of Set, Loop and Wait commands.

    Each case pairs a command with the string that `str(command)` is
    expected to produce once the beam line scan settings are installed.
    """

    def _check(self, cases):
        """Assert that every command in `cases` renders as its expected text."""
        for command, expected in cases:
            self.assertEqual(str(command), expected)

    def testSet(self):
        """'setpoint' defaults to completion and readback; arguments override them."""
        self._check([
            (Set('setpoint', 1),
             "Set('setpoint', 1, completion=True, readback='readback')"),
            (Set('setpoint', 1, completion=False),
             "Set('setpoint', 1, readback='readback')"),
            (Set('setpoint', 1, readback=False),
             "Set('setpoint', 1, completion=True)"),
            (Set('setpoint', 1, timeout=10),
             "Set('setpoint', 1, completion=True, readback='readback', timeout=10)"),
        ])

    def testLoop(self):
        """Loops get device defaults; body commands may be given separately or as a list."""
        self._check([
            (Loop('x', 1, 5, 0.5),
             "Loop('x', 1, 5, 0.5)"),
            (Loop('motor_x', 1, 5, 0.5),
             "Loop('motor_x', 1, 5, 0.5, completion=True, readback='motor_x')"),
            (Loop('motor_x', 1, 5, 0.5, completion=False),
             "Loop('motor_x', 1, 5, 0.5, readback='motor_x')"),
            (Loop('motor_x', 1, 5, 0.5, completion=False, readback=False),
             "Loop('motor_x', 1, 5, 0.5)"),
            (Loop('motor_x', 1, 5, 0.5, Comment(''), completion=False, readback=False),
             "Loop('motor_x', 1, 5, 0.5, [ Comment('') ])"),
            (Loop('motor_x', 1, 5, 0.5, Comment('a'), Comment('b'), completion=False, readback=False),
             "Loop('motor_x', 1, 5, 0.5, [ Comment('a'), Comment('b') ])"),
            (Loop('motor_x', 1, 5, 0.5, [ Comment('a'), Comment('b') ], completion=False, readback=False),
             "Loop('motor_x', 1, 5, 0.5, [ Comment('a'), Comment('b') ])"),
        ])

    def testWait(self):
        """Wait uses the device's comparison unless one is passed explicitly."""
        self._check([
            (Wait('whatever', 1),
             "Wait('whatever', 1, comparison='>=')"),
            (Wait('pcharge', 1),
             "Wait('pcharge', 1, comparison='increase by')"),
            (Wait('pcharge', 1, timeout=5),
             "Wait('pcharge', 1, comparison='increase by', timeout=5)"),
            (Wait('pcharge', 1, timeout=5, comparison='>='),
             "Wait('pcharge', 1, comparison='>=', timeout=5)"),
        ])


# Run the unit tests above when this module is executed as a script;
# importing the module from a scan script does not run them.
if __name__ == '__main__':
    unittest.main()

Usage Examples

from beamline_setup import *

# N-Dim scan: loop over 'xpos' with a delay at each step.
# print(...) with a single argument works on both Python 2 and Python 3.
cmds = ndim(('xpos', 1, 10), Delay(0.2))
print(cmds)
# Named scan_id so that the builtin id() is not shadowed
scan_id = scan_client.submit(cmds)
print(scan_client.waitUntilDone(scan_id))

# Simulated pcharge rises at about 1e9 per second
cmds = ndim(('xpos', 1, 3), ('ypos', 1, 3), TakeData('pcharge', 3*1e9), 'neutrons')
print(cmds)
scan_id = scan_client.submit(cmds)
print(scan_client.scanInfo(scan_id))
# scan_client.waitUntilDone(scan_id)
from beamline_setup import *

# Table scan built with the beam line's table_scan() shortcut.
# print(...) with a single argument works on both Python 2 and Python 3.
cmds = table_scan(
  [ 'xpos',    'ypos', 'Wait For', 'Value'],
[
  [     1, [ 2, 4, 6],  'pcharge',  3*1e9 ],
  [     3,          8,  'pcharge',  3*1e9 ],
])
print(cmds)
# Named scan_id so that the builtin id() is not shadowed
scan_id = scan_client.submit(cmds)
scan_client.waitUntilDone(scan_id)

# Dump motor positions
data = scan_client.getData(scan_id)
table = createTable(data, 'xpos', 'ypos')
for (x, y) in zip(*table):
    print("%d, %d" % (x, y))
from beamline_setup import *

# Custom scan assembled command by command.
# print(...) with a single argument works on both Python 2 and Python 3.
cmds = CommandSequence()
cmds.append(Set('xpos', 1))
cmds.append(Set('ypos', 1, completion=False))
cmds.append(Set('setpoint', 8))
cmds.append(SetChopper(1.7, 45.0))
cmds.append(TakeData('pcharge', 5e9))
cmds.append(Set('setpoint', 1))
print(cmds)

# Simulate the scan before submitting it
result = scan_client.simulate(cmds)
print(result['simulation'])

# Named scan_id so that the builtin id() is not shadowed
scan_id = scan_client.submit(cmds)
print("Submitted scan #%d" % scan_id)