# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later
"""Server helper classes for writing Tango device servers."""
import sys
import copy
import inspect
import logging
import functools
import traceback
import warnings
from typing import Union, ClassVar
from inspect import getfullargspec
try:
from warnings import deprecated
except ImportError:
from typing_extensions import deprecated
from tango._tango import AttrDataFormat, AttrWriteType, PipeWriteType, DevState
from tango._tango import DevFailed, GreenMode, SerialModel
from tango.attr_data import AttrData
from tango.pipe_data import PipeData
from tango.device_class import DeviceClass
from tango.device_server import (
LatestDeviceImpl,
get_worker,
set_worker,
run_in_executor,
)
from tango.utils import (
is_seq,
is_non_str_seq,
is_pure_str,
_is_coroutine_function,
get_tango_type_format,
get_attribute_type_format,
set_complex_value,
parse_type_hint,
_create_device_telemetry_tracer,
get_telemetry_tracer_provider_factory,
_force_tracing,
_forcefully_traced_method,
)
from tango.utils import scalar_to_array_type
from tango.green import get_green_mode, get_executor
from tango.pyutil import Util
from tango.asyncio_executor import AsyncioExecutor
from tango.constants import StatusNotSet
__all__ = (
"DeviceMeta",
"Device",
"LatestDeviceImpl",
"attribute",
"command",
"pipe",
"device_property",
"class_property",
"run",
"server_run",
"Server",
)
API_VERSION = 2
# Helpers
def from_typeformat_to_type(dtype, dformat):
if dformat == AttrDataFormat.SCALAR:
return dtype
elif dformat == AttrDataFormat.IMAGE:
raise TypeError("Cannot translate IMAGE to tango type")
return scalar_to_array_type(dtype)
def __get_wrapped_read_method(attribute, read_method):
"""
Make sure attr is updated on read, and wrap it with executor, if needed.
:param attribute: the attribute data information
:type attribute: AttrData
:param read_method: read method
:type read_method: callable
"""
already_wrapped = hasattr(read_method, "__access_wrapped__")
if already_wrapped:
return read_method
if attribute.read_green_mode:
@functools.wraps(read_method)
def read_attr(self, attr):
worker = get_worker()
ret = worker.execute(read_method, self)
if not attr.get_value_flag() and ret is not None:
set_complex_value(attr, ret)
return ret
else:
@functools.wraps(read_method)
def read_attr(self, attr):
ret = read_method(self)
if not attr.get_value_flag() and ret is not None:
set_complex_value(attr, ret)
return ret
if _force_tracing:
read_attr = _forcefully_traced_method(read_attr)
read_attr.__access_wrapped__ = True
return read_attr
def __patch_read_method(tango_device_klass, attribute):
"""
Finds read method for attribute, wraps it with executor and adds
wrapped method to device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param attribute: the attribute data information
:type attribute: AttrData
"""
read_method = getattr(attribute, "fget", None)
if not read_method:
method_name = attribute.read_method_name
read_method = getattr(tango_device_klass, method_name)
read_attr = __get_wrapped_read_method(attribute, read_method)
method_name = f"__read_{attribute.attr_name}_wrapper__"
attribute.read_method_name = method_name
setattr(tango_device_klass, method_name, read_attr)
def __get_wrapped_write_method(attribute, write_method):
"""
Wraps write method with executor, if needed.
"""
already_wrapped = hasattr(write_method, "__access_wrapped__")
if already_wrapped:
return write_method
if attribute.write_green_mode:
@functools.wraps(write_method)
def write_attr(self, attr):
value = attr.get_write_value()
return get_worker().execute(write_method, self, value)
else:
@functools.wraps(write_method)
def write_attr(self, attr):
value = attr.get_write_value()
return write_method(self, value)
if _force_tracing:
write_attr = _forcefully_traced_method(write_attr)
write_attr.__access_wrapped__ = True
return write_attr
def __patch_write_method(tango_device_klass, attribute):
"""
Finds write method for attribute, wraps it with executor and adds
wrapped method to device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param attribute: the attribute data information
:type attribute: AttrData
"""
write_method = getattr(attribute, "fset", None)
if not write_method:
method_name = attribute.write_method_name
write_method = getattr(tango_device_klass, method_name)
write_attr = __get_wrapped_write_method(attribute, write_method)
method_name = f"__write_{attribute.attr_name}_wrapper__"
attribute.write_method_name = method_name
setattr(tango_device_klass, method_name, write_attr)
def __get_wrapped_isallowed_method(attribute, isallowed_method):
"""
Wraps is allowed method with executor, if needed.
:param attribute: the attribute data information
:type attribute: AttrData
:param isallowed_method: is allowed method
:type isallowed_method: callable
"""
already_wrapped = hasattr(isallowed_method, "__access_wrapped__")
if already_wrapped:
return isallowed_method
if attribute.isallowed_green_mode:
@functools.wraps(isallowed_method)
def isallowed_attr(self, request_type):
worker = get_worker()
return worker.execute(isallowed_method, self, request_type)
else:
isallowed_attr = isallowed_method
if _force_tracing:
isallowed_attr = _forcefully_traced_method(isallowed_attr)
if isallowed_attr is not isallowed_method:
isallowed_attr.__access_wrapped__ = True
return isallowed_attr
def __patch_isallowed_method(tango_device_klass, attribute):
"""
Finds isallowed method for attribute, wraps it with executor and adds
wrapped method to device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param attribute: the attribute data information
:type attribute: AttrData
"""
isallowed_method = getattr(attribute, "fisallowed", None)
if not isallowed_method:
method_name = attribute.is_allowed_name
isallowed_method = getattr(tango_device_klass, method_name, None)
if isallowed_method:
isallowed_attr = __get_wrapped_isallowed_method(attribute, isallowed_method)
method_name = f"__is_{attribute.attr_name}_allowed_wrapper__"
attribute.is_allowed_name = method_name
setattr(tango_device_klass, method_name, isallowed_attr)
def __patch_attr_methods(tango_device_klass, attribute):
"""
Finds read, write and isallowed methods for attribute, and
wraps into another method to make them work.
Also patch methods with green executor, if requested.
Finally, adds pathed methods to the device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param attribute: the attribute data information
:type attribute: AttrData
"""
if attribute.attr_write in (AttrWriteType.READ, AttrWriteType.READ_WRITE):
__patch_read_method(tango_device_klass, attribute)
if attribute.attr_write in (AttrWriteType.WRITE, AttrWriteType.READ_WRITE):
__patch_write_method(tango_device_klass, attribute)
__patch_isallowed_method(tango_device_klass, attribute)
def __get_attribute_type_from_hint(attribute, type_hint=None, device_klass=None):
if not attribute.has_dtype_kword:
if not type_hint:
if attribute.attr_write in (AttrWriteType.READ, AttrWriteType.READ_WRITE):
read_method = getattr(device_klass, attribute.read_method_name)
type_hint = dict(read_method.__annotations__).get("return", None)
if not type_hint and attribute.attr_write in (
AttrWriteType.WRITE,
AttrWriteType.READ_WRITE,
):
write_method = getattr(device_klass, attribute.write_method_name)
type_hints = dict(write_method.__annotations__)
type_hints.pop("return", None)
if type_hints:
type_hint = list(type_hints.values())[-1]
if type_hint:
dtype, dformat, max_x, max_y = parse_type_hint(
type_hint, caller="attribute"
)
if dformat is None:
if attribute.attr_format not in [
AttrDataFormat.IMAGE,
AttrDataFormat.SPECTRUM,
]:
raise RuntimeError(
"For numpy.ndarrays AttrDataFormat has to be specified"
)
dformat = attribute.attr_format
dtype, dformat, enum_labels = get_attribute_type_format(
dtype, dformat, None
)
attribute.attr_type = dtype
attribute.attr_format = dformat
if enum_labels:
attribute.set_enum_labels_to_attr_prop(enum_labels)
if not attribute.has_size_kword:
if max_x:
attribute.dim_x = max_x
if max_y:
attribute.dim_y = max_y
def __get_wrapped_pipe_read_method(pipe, read_method):
already_wrapped = hasattr(read_method, "__access_wrapped__")
if already_wrapped:
return read_method
if pipe.read_green_mode:
@functools.wraps(read_method)
def read_pipe(self, pipe):
worker = get_worker()
ret = worker.execute(read_method, self)
if ret is not None:
pipe.set_value(ret)
return ret
else:
@functools.wraps(read_method)
def read_pipe(self, pipe):
ret = read_method(self)
if ret is not None:
pipe.set_value(ret)
return ret
if _force_tracing:
read_pipe = _forcefully_traced_method(read_pipe)
read_pipe.__access_wrapped__ = True
return read_pipe
def __patch_pipe_read_method(tango_device_klass, pipe):
"""
Finds read method for pipe, wraps it with executor and adds wrapped
method to device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param pipe: the pipe data information
:type pipe: PipeData
"""
read_method = getattr(pipe, "fget", None)
if not read_method:
method_name = pipe.read_method_name
read_method = getattr(tango_device_klass, method_name)
read_pipe = __get_wrapped_pipe_read_method(pipe, read_method)
method_name = f"__read_{pipe.pipe_name}_wrapper__"
pipe.read_method_name = method_name
setattr(tango_device_klass, method_name, read_pipe)
def __get_wrapped_pipe_write_method(pipe, write_method):
already_wrapped = hasattr(write_method, "__access_wrapped__")
if already_wrapped:
return write_method
if pipe.write_green_mode:
@functools.wraps(write_method)
def write_pipe(self, pipe):
value = pipe.get_value()
return get_worker().execute(write_method, self, value)
else:
@functools.wraps(write_method)
def write_pipe(self, pipe):
value = pipe.get_value()
return write_method(self, value)
if _force_tracing:
write_pipe = _forcefully_traced_method(write_pipe)
write_pipe.__access_wrapped__ = True
return write_pipe
def __patch_pipe_write_method(tango_device_klass, pipe):
"""
Finds write method for pipe, wraps it with executor and adds wrapped
method to device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param pipe: the pipe data information
:type pipe: PipeData
"""
write_method = getattr(pipe, "fset", None)
if not write_method:
method_name = pipe.write_method_name
write_method = getattr(tango_device_klass, method_name)
write_pipe = __get_wrapped_pipe_write_method(pipe, write_method)
method_name = f"__write_{pipe.pipe_name}_wrapper__"
pipe.write_method_name = method_name
setattr(tango_device_klass, method_name, write_pipe)
def __get_wrapped_pipe_isallowed_method(pipe, isallowed_method):
"""
Wraps is allowed method with executor, if needed.
:param pipe: the pipe data information
:type pipe: PipeData
:param isallowed_method: is allowed method
:type isallowed_method: callable
"""
already_wrapped = hasattr(isallowed_method, "__access_wrapped__")
if already_wrapped:
return isallowed_method
if pipe.isallowed_green_mode:
@functools.wraps(isallowed_method)
def isallowed_pipe(self, request_type):
worker = get_worker()
return worker.execute(isallowed_method, self, request_type)
else:
isallowed_pipe = isallowed_method
if _force_tracing:
isallowed_pipe = _forcefully_traced_method(isallowed_pipe)
if isallowed_pipe is not isallowed_method:
isallowed_pipe.__access_wrapped__ = True
return isallowed_pipe
def __patch_pipe_isallowed_method(tango_device_klass, pipe):
"""
Finds isallowed method for pipe, wraps it with executor and adds
wrapped method to device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param pipe: the pipe data information
:type pipe: PipeData
"""
isallowed_method = getattr(pipe, "fisallowed", None)
if not isallowed_method:
method_name = pipe.is_allowed_name
isallowed_method = getattr(tango_device_klass, method_name, None)
if isallowed_method:
isallowed_attr = __get_wrapped_pipe_isallowed_method(pipe, isallowed_method)
method_name = f"__is_{pipe.pipe_name}_allowed_wrapper__"
pipe.is_allowed_name = method_name
setattr(tango_device_klass, method_name, isallowed_attr)
def __patch_pipe_methods(tango_device_klass, pipe):
"""
Finds read, write and isallowed methods for pipe, and
wraps into another method to make them work.
Also patch methods with green executor, if requested.
Finally, adds pathed methods to the device dict.
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param pipe: the pipe data information
:type pipe: PipeData
"""
__patch_pipe_read_method(tango_device_klass, pipe)
if pipe.pipe_write == PipeWriteType.PIPE_READ_WRITE:
__patch_pipe_write_method(tango_device_klass, pipe)
__patch_pipe_isallowed_method(tango_device_klass, pipe)
def __get_property_type_from_hint(property, type_hint):
if property.dtype is None:
dtype, _, _, _ = parse_type_hint(type_hint, caller="property")
property.dtype = from_typeformat_to_type(*get_tango_type_format(dtype))
def __patch_is_command_allowed_method(
tango_device_klass, is_allowed_method, cmd_name, green_mode
):
"""
:param tango_device_klass: a DeviceImpl class
:type tango_device_klass: class
:param is_allowed_method: a callable to check if command is allowed
:type is_allowed_method: callable
:param cmd_name: command name
:type cmd_name: str
:param green_mode: indicates whether method should be wrapped with executor or not
:type green_mode: bool
"""
already_wrapped = hasattr(is_allowed_method, "__access_wrapped__")
if already_wrapped:
return is_allowed_method.__wrapped_method_name__
method_name = getattr(is_allowed_method, "__name__", f"is_{cmd_name}_allowed")
method_name = f"__wrapped_{method_name}__"
if green_mode:
wrapped_method = run_in_executor(is_allowed_method)
else:
wrapped_method = is_allowed_method
if _force_tracing:
wrapped_method = _forcefully_traced_method(wrapped_method)
if wrapped_method is not is_allowed_method:
wrapped_method.__access_wrapped__ = True
wrapped_method.__wrapped_method_name__ = method_name
setattr(tango_device_klass, method_name, wrapped_method)
return method_name
def __unwrap_method(method):
while hasattr(method, "__wrapped__"):
method = method.__wrapped__
return method
def __warn_if_standard_device_methods_should_be_coroutine(klass):
worker = get_worker()
# get base device class:
if isinstance(worker, AsyncioExecutor):
for parent in klass.__bases__:
if type(parent) is DeviceMeta:
for method in (
"init_device",
"delete_device",
"dev_state",
"dev_status",
"read_attr_hardware",
"always_executed_hook",
):
user_method = __unwrap_method(getattr(klass, method))
base_method = __unwrap_method(getattr(parent, method))
if user_method != base_method and not _is_coroutine_function(
user_method
):
warnings.warn(
f"{method} is sync: support of "
f"sync functions in Asyncio Servers is "
f"deprecated. Use 'async def' instead of 'def'.",
DeprecationWarning,
)
async def __async_method_helper(method, *args, **kwargs):
return await get_worker().delegate(method, *args, **kwargs)
def __patch_standard_device_methods(klass):
# TODO allow to force non green mode
# init_device
init_device_orig = klass.init_device
already_wrapped = hasattr(init_device_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_init_device = init_device_orig == BaseDevice.init_device
@functools.wraps(init_device_orig)
def init_device(self):
__warn_if_standard_device_methods_should_be_coroutine(klass)
worker = get_worker()
if isinstance(worker, AsyncioExecutor) and is_base_klass_init_device:
# if coroutine will never be awaited, we want that RuntimeWarning will print us
# 'init_device' instead of "__async_method_helper", so we create another intermediate
# layer with proper __qualname__
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.init_device"
return worker.execute(method_helper, init_device_orig, self)
else:
return worker.execute(init_device_orig, self)
if _force_tracing:
init_device = _forcefully_traced_method(
init_device, is_kernel_method=is_base_klass_init_device
)
init_device.__access_wrapped__ = True
setattr(klass, "init_device", init_device)
# delete_device
delete_device_orig = klass.delete_device
already_wrapped = hasattr(delete_device_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_delete_device = delete_device_orig == BaseDevice.delete_device
@functools.wraps(delete_device_orig)
def delete_device(self):
worker = get_worker()
if isinstance(worker, AsyncioExecutor) and is_base_klass_delete_device:
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.delete_device"
return worker.execute(method_helper, delete_device_orig, self)
else:
return worker.execute(delete_device_orig, self)
if _force_tracing:
delete_device = _forcefully_traced_method(
delete_device, is_kernel_method=is_base_klass_delete_device
)
delete_device.__access_wrapped__ = True
setattr(klass, "delete_device", delete_device)
dev_state_orig = klass.dev_state
already_wrapped = hasattr(dev_state_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_dev_state = dev_state_orig == BaseDevice.dev_state
@functools.wraps(dev_state_orig)
def dev_state(self):
worker = get_worker()
if isinstance(worker, AsyncioExecutor) and is_base_klass_dev_state:
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.dev_state"
return worker.execute(method_helper, dev_state_orig, self)
else:
return worker.execute(dev_state_orig, self)
if _force_tracing:
dev_state = _forcefully_traced_method(
dev_state, is_kernel_method=is_base_klass_dev_state
)
dev_state.__access_wrapped__ = True
setattr(klass, "dev_state", dev_state)
# device_status
dev_status_orig = klass.dev_status
already_wrapped = hasattr(dev_status_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_dev_status = dev_status_orig == BaseDevice.dev_status
@functools.wraps(dev_status_orig)
def dev_status(self):
worker = get_worker()
if isinstance(worker, AsyncioExecutor) and is_base_klass_dev_status:
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.dev_status"
return worker.execute(method_helper, dev_status_orig, self)
else:
return worker.execute(dev_status_orig, self)
if _force_tracing:
dev_status = _forcefully_traced_method(
dev_status, is_kernel_method=is_base_klass_dev_status
)
dev_status.__access_wrapped__ = True
setattr(klass, "dev_status", dev_status)
# read_attr_hardware
read_attr_hardware_orig = klass.read_attr_hardware
already_wrapped = hasattr(read_attr_hardware_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_read_attr_hardware = (
read_attr_hardware_orig == BaseDevice.read_attr_hardware
)
@functools.wraps(read_attr_hardware_orig)
def read_attr_hardware(self, attr_list):
worker = get_worker()
if isinstance(worker, AsyncioExecutor) and is_base_klass_read_attr_hardware:
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.read_attr_hardware"
return worker.execute(
method_helper, read_attr_hardware_orig, self, attr_list
)
else:
return worker.execute(read_attr_hardware_orig, self, attr_list)
if _force_tracing:
read_attr_hardware = _forcefully_traced_method(
read_attr_hardware, is_kernel_method=is_base_klass_read_attr_hardware
)
read_attr_hardware.__access_wrapped__ = True
setattr(klass, "read_attr_hardware", read_attr_hardware)
# always_executed_hook
always_executed_hook_orig = klass.always_executed_hook
already_wrapped = hasattr(always_executed_hook_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_always_executed_hook = (
always_executed_hook_orig == BaseDevice.always_executed_hook
)
@functools.wraps(always_executed_hook_orig)
def always_executed_hook(self):
worker = get_worker()
if (
isinstance(worker, AsyncioExecutor)
and is_base_klass_always_executed_hook
):
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.always_executed_hook"
return worker.execute(method_helper, always_executed_hook_orig, self)
else:
return worker.execute(always_executed_hook_orig, self)
if _force_tracing:
always_executed_hook = _forcefully_traced_method(
always_executed_hook,
is_kernel_method=is_base_klass_always_executed_hook,
)
always_executed_hook.__access_wrapped__ = True
setattr(klass, "always_executed_hook", always_executed_hook)
# server_init_hook
server_init_hook_orig = klass.server_init_hook
already_wrapped = hasattr(server_init_hook_orig, "__access_wrapped__")
if not already_wrapped:
is_base_klass_server_init_hook = (
server_init_hook_orig == BaseDevice.server_init_hook
)
@functools.wraps(server_init_hook_orig)
def server_init_hook(self):
worker = get_worker()
if isinstance(worker, AsyncioExecutor) and is_base_klass_server_init_hook:
method_helper = __async_method_helper
method_helper.__qualname__ = "BaseDevice.server_init_hook"
return worker.execute(method_helper, server_init_hook_orig, self)
else:
return worker.execute(server_init_hook_orig, self)
if _force_tracing:
server_init_hook = _forcefully_traced_method(
server_init_hook, is_kernel_method=is_base_klass_server_init_hook
)
server_init_hook.__access_wrapped__ = True
setattr(klass, "server_init_hook", server_init_hook)
class _DeviceClass(DeviceClass):
def __init__(self, name):
DeviceClass.__init__(self, name)
self.set_type(name)
if _force_tracing:
orig_dyn_attr = getattr(self, "dyn_attr")
setattr(
self,
"dyn_attr",
_forcefully_traced_method(orig_dyn_attr, is_kernel_method=True),
)
def dyn_attr(self, dev_list):
"""Invoked to create dynamic attributes for the given devices.
Default implementation calls
:meth:`TT.initialize_dynamic_attributes` for each device
:param dev_list: list of devices
:type dev_list: :class:`tango.DeviceImpl`"""
for dev in dev_list:
init_dyn_attrs = getattr(dev, "initialize_dynamic_attributes", None)
if init_dyn_attrs and callable(init_dyn_attrs):
try:
init_dyn_attrs()
except Exception as ex:
dev.warn_stream("Failed to initialize dynamic attributes")
dev.debug_stream("Details: " + traceback.format_exc())
raise Exception(repr(ex))
def __create_tango_deviceclass_klass(tango_device_klass, attrs=None):
klass_name = tango_device_klass.__name__
if not issubclass(tango_device_klass, (BaseDevice)):
msg = f"{klass_name} device must inherit from tango.server.Device"
raise Exception(msg)
if attrs is None:
attrs = tango_device_klass.__dict__
klass_annotations = {}
if hasattr(tango_device_klass, "__annotations__"):
klass_annotations = dict(tango_device_klass.__annotations__)
attr_list = {}
pipe_list = {}
class_property_list = {}
device_property_list = {}
cmd_list = {}
for attr_name, attr_obj in attrs.items():
if isinstance(attr_obj, attribute):
klass_attribute_name = attr_name
if attr_obj.attr_name is None:
attr_obj._set_name(attr_name)
else:
attr_name = attr_obj.attr_name
attr_list[attr_name] = attr_obj
if not attr_obj.forward:
__patch_attr_methods(tango_device_klass, attr_obj)
if klass_attribute_name in klass_annotations:
__get_attribute_type_from_hint(
attr_obj, type_hint=klass_annotations[klass_attribute_name]
)
else:
__get_attribute_type_from_hint(
attr_obj, device_klass=tango_device_klass
)
elif isinstance(attr_obj, pipe):
if attr_obj.pipe_name is None:
attr_obj._set_name(attr_name)
else:
attr_name = attr_obj.pipe_name
pipe_list[attr_name] = attr_obj
__patch_pipe_methods(tango_device_klass, attr_obj)
elif isinstance(attr_obj, device_property):
if attr_name in klass_annotations:
__get_property_type_from_hint(attr_obj, klass_annotations[attr_name])
attr_obj.name = attr_name
# if you modify the attr_obj order then you should
# take care of the code in get_device_properties()
device_property_list[attr_name] = [
attr_obj.dtype,
attr_obj.doc,
attr_obj.default_value,
attr_obj.mandatory,
]
elif isinstance(attr_obj, class_property):
if attr_name in klass_annotations:
__get_property_type_from_hint(attr_obj, klass_annotations[attr_name])
attr_obj.name = attr_name
class_property_list[attr_name] = [
attr_obj.dtype,
attr_obj.doc,
attr_obj.default_value,
]
elif inspect.isroutine(attr_obj):
if hasattr(attr_obj, "__tango_command__"):
cmd_name, cmd_info = attr_obj.__tango_command__
cmd_list[cmd_name] = cmd_info
if "Is allowed" in cmd_info[2]:
is_allowed_method = cmd_info[2]["Is allowed"]
else:
is_allowed_method = f"is_{cmd_name}_allowed"
is_allowed_method_green_mode = cmd_info[2]["Is allowed green_mode"]
if is_pure_str(is_allowed_method):
is_allowed_method = getattr(
tango_device_klass, is_allowed_method, None
)
if is_allowed_method is not None:
cmd_info[2]["Is allowed"] = __patch_is_command_allowed_method(
tango_device_klass,
is_allowed_method,
cmd_name,
is_allowed_method_green_mode,
)
__patch_standard_device_methods(tango_device_klass)
devclass_name = klass_name + "Class"
devclass_attrs = dict(
class_property_list=class_property_list,
device_property_list=device_property_list,
cmd_list=cmd_list,
attr_list=attr_list,
pipe_list=pipe_list,
)
return type(_DeviceClass)(devclass_name, (_DeviceClass,), devclass_attrs)
def _init_tango_device_klass(tango_device_klass, attrs=None, tango_class_name=None):
klass_name = tango_device_klass.__name__
tango_deviceclass_klass = __create_tango_deviceclass_klass(
tango_device_klass, attrs=attrs
)
if tango_class_name is None:
if hasattr(tango_device_klass, "TangoClassName"):
tango_class_name = tango_device_klass.TangoClassName
else:
tango_class_name = klass_name
tango_device_klass.TangoClassClass = tango_deviceclass_klass
tango_device_klass.TangoClassName = tango_class_name
tango_device_klass._api = API_VERSION
return tango_device_klass
def is_tango_object(arg):
"""Return tango data if the argument is a tango object,
False otherwise.
"""
classes = attribute, device_property, class_property, pipe
if isinstance(arg, classes):
return arg
try:
return arg.__tango_command__
except AttributeError:
return False
def inheritance_patch(attrs):
"""Patch tango objects before they are processed by the metaclass."""
for key, obj in attrs.items():
if isinstance(obj, attribute):
if getattr(obj, "attr_write", None) == AttrWriteType.READ_WRITE:
if not getattr(obj, "fset", None):
method_name = obj.write_method_name or "write_" + key
obj.fset = attrs.get(method_name)
class DeviceMeta(type(LatestDeviceImpl)):
"""
The :py:data:`metaclass` callable for :class:`Device`.
This implementation of DeviceMeta makes device inheritance possible.
"""
def __new__(metacls, name, bases, attrs):
# Attribute dictionary
dct = {}
# Filter object from bases
bases = tuple(base for base in bases if base is not object)
# Set tango objects as attributes
for base in reversed(bases):
for key, value in base.__dict__.items():
if is_tango_object(value):
dct[key] = value
# Inheritance patch
inheritance_patch(attrs)
# Update attribute dictionary
dct.update(attrs)
# Create device class
cls = type(LatestDeviceImpl).__new__(metacls, name, bases, dct)
# Initialize device class
_init_tango_device_klass(cls, dct)
cls.TangoClassName = name
# Return device class
return cls
class BaseDevice(LatestDeviceImpl):
"""
Base device class for the High level API.
It should not be used directly, since this class is not an
instance of MetaDevice. Use tango.server.Device instead.
"""
DEVICE_CLASS_DESCRIPTION: ClassVar[Union[str, None]] = None
"""Description of the device class (optional).
If not specified, the class docstring is used.
Available to clients via :meth:`tango.DeviceProxy.description`.
Use as a class variable.
:meta hide-value:
"""
DEVICE_CLASS_INITIAL_STATUS: ClassVar[str] = StatusNotSet
"""Initial status string for all instances of the device (optional).
Use as a class variable.
:meta hide-value:
"""
DEVICE_CLASS_INITIAL_STATE: ClassVar[DevState] = DevState.UNKNOWN
"""Initial state value for all instances of the device (optional).
Use as a class variable.
:meta hide-value:
"""
def __init__(self, cl, name):
self._tango_properties = {}
if self.DEVICE_CLASS_DESCRIPTION is not None:
dev_desc = self.DEVICE_CLASS_DESCRIPTION
elif self.__doc__:
dev_desc = self.__doc__
else:
dev_desc = "A TANGO device"
dev_state = self.DEVICE_CLASS_INITIAL_STATE
dev_status = self.DEVICE_CLASS_INITIAL_STATUS
LatestDeviceImpl.__init__(self, cl, name, dev_desc, dev_state, dev_status)
self._configure_device_telemetry(cl.get_name(), name)
self.init_device()
def init_device(self):
"""
Code to handle device initialisation.
This method is called automatically when the device starts,
but before it is available to clients (i.e., before it is "exported").
It also gets called if the device is re-initialised by a call to the
``Init`` command (after :meth:`~tango.server.Device.delete_device`).
Default implementation calls :meth:`get_device_properties`
If overwriting this method, it is important to call the super class
method first:
- For synchronous devices: ``super().init_device()``
- For asyncio green mode devices: ``await super().init_device()``
"""
self.get_device_properties()
def delete_device(self):
"""
Code to handle device clean-up.
This method is called automatically when the device is shut down gracefully.
It also gets called if the device is re-initialised by a call to the ``Init``
command (before a new call to :meth:`~tango.server.Device.init_device`).
If overwriting this method, it is important to call the super class
method last:
- For synchronous devices: ``super().delete_device()``
- For asyncio green mode devices: ``await super().delete_device()``
"""
pass
def read_attr_hardware(self, attr_list):
return LatestDeviceImpl.read_attr_hardware(self, attr_list)
def dev_state(self):
return LatestDeviceImpl.dev_state(self)
def dev_status(self):
return LatestDeviceImpl.dev_status(self)
def get_device_properties(self, ds_class=None):
if ds_class is None:
try:
# Call this method in a try/except in case this is called
# during the DS shutdown sequence
ds_class = self.get_device_class()
except Exception:
return
try:
pu = self.prop_util = ds_class.prop_util
self.device_property_list = copy.deepcopy(ds_class.device_property_list)
class_prop = ds_class.class_property_list
pu.get_device_properties(self, class_prop, self.device_property_list)
for prop_name in class_prop:
value = pu.get_property_values(prop_name, class_prop)
self._tango_properties[prop_name] = value
for prop_name in self.device_property_list:
value = self.prop_util.get_property_values(
prop_name, self.device_property_list
)
self._tango_properties[prop_name] = value
properties = self.device_property_list[prop_name]
mandatory = properties[3]
if mandatory and value is None:
msg = f"Device property {prop_name} is mandatory "
raise Exception(msg)
except DevFailed as df:
print(80 * "-")
print(df)
raise df
def always_executed_hook(self):
"""
Tango always_executed_hook. Default implementation does
nothing
"""
pass
def server_init_hook(self):
"""
Tango server_init_hook. Called once the device server admin device
(DServer) is exported.
Default implementation does nothing.
"""
pass
def initialize_dynamic_attributes(self):
"""
Method executed at initializion phase to create dynamic
attributes. Default implementation does nothing. Overwrite
when necessary.
.. note::
This method is only called once when the device server starts,
after init_device(), but before the device is marked as exported.
If the Init command is executed on the device, the
initialize_dynamic_attributes() method will not be called again.
"""
pass
@classmethod
def run_server(cls, args=None, **kwargs):
"""Run the class as a device server.
It is based on the tango.server.run method.
The difference is that the device class
and server name are automatically given.
Args:
args (iterable): args as given in the tango.server.run method
without the server name. If None, the sys.argv
list is used
kwargs: the other keywords argument are as given
in the tango.server.run method.
"""
if args is None:
args = sys.argv[1:]
args = [cls.__name__] + list(args)
green_mode = getattr(cls, "green_mode", None)
kwargs.setdefault("green_mode", green_mode)
return run((cls,), args, **kwargs)
def _configure_device_telemetry(self, class_name, device_name):
device_tracer_provider = self.create_telemetry_tracer_provider(
class_name, device_name
)
device_tracer = self.create_telemetry_tracer(device_tracer_provider)
self._tango_telemetry_tracer = device_tracer
def create_telemetry_tracer_provider(
self, class_name, device_name
) -> "opentelemetry.trace.TracerProvider": # noqa: F821
"""Factory method returning a TracerProvider for telemetry.
The default implementation can be overridden.
.. versionadded:: 10.0.0
"""
tracer_provider_factory = get_telemetry_tracer_provider_factory()
return tracer_provider_factory(class_name, device_name)
def create_telemetry_tracer(
self, device_tracer_provider
) -> "opentelemetry.trace.Tracer": # noqa: F821
"""Factory method returning a Tracer for telemetry.
The default implementation can be overridden.
.. versionadded:: 10.0.0
"""
return _create_device_telemetry_tracer(device_tracer_provider)
def get_telemetry_tracer(self) -> "opentelemetry.trace.Tracer": # noqa: F821
"""Returns device telemetry tracer, or None if telemetry disabled.
.. versionadded:: 10.0.0
"""
return self._tango_telemetry_tracer
[docs]
class attribute(AttrData):
'''
Declares a new tango attribute in a :class:`Device`. To be used
like the python native :obj:`property` function. For example, to
declare a scalar, `tango.DevDouble`, read-only attribute called
*voltage* in a *PowerSupply* :class:`Device` do::
class PowerSupply(Device):
voltage = attribute()
def read_voltage(self):
return 999.999
The same can be achieved with::
class PowerSupply(Device):
@attribute
def voltage(self):
return 999.999
It receives multiple keyword arguments.
===================== ================================ ======================================= =======================================================================================
parameter type default value description
===================== ================================ ======================================= =======================================================================================
name :obj:`str` class member name alternative attribute name
dtype :obj:`object` :obj:`~tango.CmdArgType.DevDouble` data type (see :ref:`Data type equivalence <pytango-hlapi-datatypes>`)
dformat :obj:`~tango.AttrDataFormat` :obj:`~tango.AttrDataFormat.SCALAR` data format
max_dim_x :obj:`int` 1 maximum size for x dimension (ignored for SCALAR format)
max_dim_y :obj:`int` 0 maximum size for y dimension (ignored for SCALAR and SPECTRUM formats)
display_level :obj:`~tango.DispLevel` :obj:`~tango.DisLevel.OPERATOR` display level
polling_period :obj:`int` -1 polling period
memorized :obj:`bool` False attribute must be memorized (only applicable to scalar, writeable attributes). If True, the latest write value, i.e., set point, is stored in the Tango database - see also ``hw_memorized`` parameter. Corresponds to low-level :meth:`tango.Attr.set_memorized`.
hw_memorized :obj:`bool` False memorized value will be restored by automatically calling attribute write method at startup, and after each ``Init`` command call (only applies if ``memorized`` is set to True). Corresponds to low-level :meth:`tango.Attr.set_memorized_init`
access :obj:`~tango.AttrWriteType` :obj:`~tango.AttrWriteType.READ` read only/ read write / write only access
fget (or fread) :obj:`str` or :obj:`callable` ``read_<attr_name>`` read method name or method object
fset (or fwrite) :obj:`str` or :obj:`callable` ``write_<attr_name>`` write method name or method object
fisallowed :obj:`str` or :obj:`callable` ``is_<attr_name>_allowed`` is allowed method name or method object
label :obj:`str` ``<attr_name>`` attribute label
enum_labels sequence None the list of enumeration labels (enum data type)
doc (or description) :obj:`str` ``""`` attribute description
unit :obj:`str` ``""`` physical units the attribute value is in
standard_unit :obj:`str` ``""`` physical standard unit
display_unit :obj:`str` ``""`` physical display unit (hint for clients)
format :obj:`str` ``"6.2f"`` attribute representation format
min_value :obj:`str` None minimum allowed value
max_value :obj:`str` None maximum allowed value
min_alarm :obj:`str` None minimum value to trigger attribute alarm
max_alarm :obj:`str` None maximum value to trigger attribute alarm
min_warning :obj:`str` None minimum value to trigger attribute warning
max_warning :obj:`str` None maximum value to trigger attribute warning
delta_val :obj:`str` None
delta_t :obj:`str` None
abs_change :obj:`str` None minimum value change between events that causes event filter to send the event
rel_change :obj:`str` None minimum relative change between events that causes event filter to send the event (%)
period :obj:`str` None
archive_abs_change :obj:`str` None
archive_rel_change :obj:`str` None
archive_period :obj:`str` None
green_mode :obj:`bool` True Default green mode for read/write/isallowed functions. If True: run with green mode executor, if False: run directly
read_green_mode :obj:`bool` ``green_mode`` value green mode for read function. If True: run with green mode executor, if False: run directly
write_green_mode :obj:`bool` ``green_mode`` value green mode for write function. If True: run with green mode executor, if False: run directly
isallowed_green_mode :obj:`bool` ``green_mode`` value green mode for is allowed function. If True: run with green mode executor, if False: run directly
forwarded :obj:`bool` False the attribute should be forwarded if True
===================== ================================ ======================================= =======================================================================================
.. note::
avoid using *dformat* parameter. If you need a SPECTRUM
attribute of say, boolean type, use instead ``dtype=(bool,)``.
Example of a integer writable attribute with a customized label,
unit and description::
class PowerSupply(Device):
current = attribute(label="Current", unit="mA", dtype=int,
access=AttrWriteType.READ_WRITE,
doc="the power supply current")
def init_device(self):
Device.init_device(self)
self._current = -1
def read_current(self):
return self._current
def write_current(self, current):
self._current = current
The same, but using attribute as a decorator::
class PowerSupply(Device):
def init_device(self):
Device.init_device(self)
self._current = -1
@attribute(label="Current", unit="mA", dtype=int)
def current(self):
"""the power supply current"""
return 999.999
@current.write
def current(self, current):
self._current = current
In this second format, defining the `write` implicitly sets the attribute
access to READ_WRITE.
.. versionadded:: 8.1.7
added green_mode, read_green_mode and write_green_mode options
'''
def __init__(self, fget=None, **kwargs):
self._kwargs = dict(kwargs)
self.name = kwargs.pop("name", None)
class_name = kwargs.pop("class_name", None)
forward = kwargs.get("forwarded", False)
if forward:
kwarg_copy = dict(kwargs)
for k in ["name", "label", "forwarded"]:
if k in kwarg_copy:
del kwarg_copy[k]
if len(kwarg_copy):
raise TypeError(
"Forwarded attributes only support 'label' and 'name' arguments"
)
else:
green_mode = kwargs.pop("green_mode", True)
self.read_green_mode = kwargs.pop("read_green_mode", green_mode)
self.write_green_mode = kwargs.pop("write_green_mode", green_mode)
self.isallowed_green_mode = kwargs.pop("isallowed_green_mode", green_mode)
if not fget:
fget = kwargs.pop("fread", None)
if fget:
if inspect.isroutine(fget):
self.fget = fget
if "doc" not in kwargs and "description" not in kwargs:
if fget.__doc__ is not None:
kwargs["doc"] = fget.__doc__
kwargs["fget"] = fget
fset = kwargs.pop("fwrite", kwargs.pop("fset", None))
if fset:
if inspect.isroutine(fset):
self.fset = fset
kwargs["fset"] = fset
fisallowed = kwargs.pop("fisallowed", None)
if fisallowed:
if inspect.isroutine(fisallowed):
self.fisallowed = fisallowed
kwargs["fisallowed"] = fisallowed
super().__init__(self.name, class_name)
self.__doc__ = kwargs.get("doc", kwargs.get("description", "TANGO attribute"))
if "dtype" in kwargs:
dtype = kwargs["dtype"]
dformat = kwargs.get("dformat")
dtype, dformat, enum_labels = get_attribute_type_format(
dtype, dformat, kwargs.get("enum_labels")
)
kwargs["dtype"], kwargs["dformat"] = dtype, dformat
if enum_labels:
kwargs["enum_labels"] = enum_labels
self.build_from_dict(kwargs)
def get_attribute(self, obj):
return obj.get_device_attr().get_attr_by_name(self.attr_name)
# --------------------
# descriptor interface
# --------------------
def __get__(self, obj, objtype):
if obj is None:
return self
return self.get_attribute(obj)
def __set__(self, obj, value):
attr = self.get_attribute(obj)
set_complex_value(attr, value)
def __delete__(self, obj):
obj.remove_attribute(self.attr_name)
def setter(self, fset):
"""
To be used as a decorator, ``@attribute.setter``. Defines the decorated method
as the write attribute method to be called when a client writes
the attribute. Equivalent to ``@attribute.write``.
"""
self.fset = fset
if self.attr_write == AttrWriteType.READ:
if getattr(self, "fget", None):
self.attr_write = AttrWriteType.READ_WRITE
else:
self.attr_write = AttrWriteType.WRITE
return self
def write(self, fset):
"""
To be used as a decorator, ``@attribute.write``. Defines the decorated method
as the write attribute method to be called when a client writes
the attribute. Equivalent to ``@attribute.setter``.
"""
return self.setter(fset)
def getter(self, fget):
"""
To be used as a decorator, ``@attribute.getter``. Defines the decorated method
as the read attribute method to be called when a client reads
the attribute. Equivalent to ``@attribute.read``.
"""
self.fget = fget
if self.attr_write == AttrWriteType.WRITE:
if getattr(self, "fset", None):
self.attr_write = AttrWriteType.READ_WRITE
else:
self.attr_write = AttrWriteType.READ
return self
def read(self, fget):
"""
To be used as a decorator, ``@attribute.read``. Defines the decorated method
as the read attribute method to be called when a client reads
the attribute. Equivalent to ``@attribute.getter``.
"""
return self.getter(fget)
def is_allowed(self, fisallowed):
"""
To be used as a decorator, ``@attribute.is_allowed``. Defines the decorated
method as the is allowed attribute method
"""
self.fisallowed = fisallowed
return self
def __call__(self, fget):
return type(self)(fget=fget, **self._kwargs)
[docs]
@deprecated("pipe - scheduled for removal in PyTango 10.1.0")
class pipe(PipeData):
'''
Declares a new tango pipe in a :class:`Device`. To be used
like the python native :obj:`property` function.
Checkout the :ref:`pipe data types <pytango-pipe-data-types>`
to see what you should return on a pipe read request and what
to expect as argument on a pipe write request.
For example, to declare a read-only pipe called *ROI*
(for Region Of Interest), in a *Detector* :class:`Device` do::
class Detector(Device):
ROI = pipe()
def read_ROI(self):
return ('ROI', ({'name': 'x', 'value': 0},
{'name': 'y', 'value': 10},
{'name': 'width', 'value': 100},
{'name': 'height', 'value': 200}))
The same can be achieved with (also showing that a dict can be used
to pass blob data)::
class Detector(Device):
@pipe
def ROI(self):
return 'ROI', dict(x=0, y=10, width=100, height=200)
It receives multiple keyword arguments.
===================== ================================ ======================================= =======================================================================================
parameter type default value description
===================== ================================ ======================================= =======================================================================================
name :obj:`str` class member name alternative pipe name
display_level :obj:`~tango.DispLevel` :obj:`~tango.DisLevel.OPERATOR` display level
access :obj:`~tango.PipeWriteType` :obj:`~tango.PipeWriteType.READ` read only/ read write access
fget (or fread) :obj:`str` or :obj:`callable` ``read_<pipe_name>`` read method name or method object
fset (or fwrite) :obj:`str` or :obj:`callable` ``write_<pipe_name>`` write method name or method object
fisallowed :obj:`str` or :obj:`callable` ``is_<pipe_name>_allowed`` is allowed method name or method object
label :obj:`str` ``<pipe_name>`` pipe label
doc (or description) :obj:`str` ``""`` pipe description
green_mode :obj:`bool` True Default green mode for read/write/isallowed functions. If True: run with green mode executor, if False: run directly
read_green_mode :obj:`bool` ``green_mode`` value green mode for read function. If True: run with green mode executor, if False: run directly
write_green_mode :obj:`bool` ``green_mode`` value green mode for write function. If True: run with green mode executor, if False: run directly
isallowed_green_mode :obj:`bool` ``green_mode`` value green mode for is allowed function. If True: run with green mode executor, if False: run directly
===================== ================================ ======================================= =======================================================================================
The same example with a read-write ROI, a customized label and description::
class Detector(Device):
ROI = pipe(label='Region Of Interest', doc='The active region of interest',
access=PipeWriteType.PIPE_READ_WRITE)
def init_device(self):
Device.init_device(self)
self.__roi = 'ROI', dict(x=0, y=10, width=100, height=200)
def read_ROI(self):
return self.__roi
def write_ROI(self, roi):
self.__roi = roi
The same, but using pipe as a decorator::
class Detector(Device):
def init_device(self):
Device.init_device(self)
self.__roi = 'ROI', dict(x=0, y=10, width=100, height=200)
@pipe(label="Region Of Interest")
def ROI(self):
"""The active region of interest"""
return self.__roi
@ROI.write
def ROI(self, roi):
self.__roi = roi
In this second format, defining the `write` / `setter` implicitly sets
the pipe access to READ_WRITE.
.. versionadded:: 9.2.0
.. versionadded:: 9.4.0
added isallowed_green_mode option
.. deprecated:: 10.0.1
Pipes scheduled for removal from PyTango in version 10.1.0
'''
def __init__(self, fget=None, **kwargs):
self._kwargs = dict(kwargs)
name = kwargs.pop("name", None)
class_name = kwargs.pop("class_name", None)
green_mode = kwargs.pop("green_mode", True)
self.read_green_mode = kwargs.pop("read_green_mode", green_mode)
self.write_green_mode = kwargs.pop("write_green_mode", green_mode)
self.isallowed_green_mode = kwargs.pop("isallowed_green_mode", green_mode)
if not fget:
fget = kwargs.pop("fread", None)
if fget:
if inspect.isroutine(fget):
self.fget = fget
if "doc" not in kwargs and "description" not in kwargs:
if fget.__doc__ is not None:
kwargs["doc"] = fget.__doc__
kwargs["fget"] = fget
fset = kwargs.pop("fwrite", kwargs.pop("fset", None))
if fset:
if inspect.isroutine(fset):
self.fset = fset
kwargs["fset"] = fset
fisallowed = kwargs.pop("fisallowed", None)
if fisallowed:
if inspect.isroutine(fisallowed):
self.fisallowed = fisallowed
kwargs["fisallowed"] = fisallowed
super().__init__(name, class_name)
self.__doc__ = kwargs.get("doc", kwargs.get("description", "TANGO pipe"))
self.build_from_dict(kwargs)
def get_pipe(self, obj):
dclass = obj.get_device_class()
return dclass.get_pipe_by_name(self.pipe_name)
# --------------------
# descriptor interface
# --------------------
def __get__(self, obj, objtype):
if obj is None:
return self
return self.get_attribute(obj)
def __set__(self, obj, value):
attr = self.get_attribute(obj)
set_complex_value(attr, value)
def setter(self, fset):
"""
To be used as a decorator. Will define the decorated method
as a write pipe method to be called when client writes to the pipe
"""
self.fset = fset
self.pipe_write = PipeWriteType.PIPE_READ_WRITE
return self
def write(self, fset):
"""
To be used as a decorator. Will define the decorated method
as a write pipe method to be called when client writes to the pipe
"""
return self.setter(fset)
def __call__(self, fget):
return type(self)(fget=fget, **self._kwargs)
def __build_command_doc(f, name, dtype_in, doc_in, dtype_out, doc_out):
doc = f"'{name}' TANGO command"
if dtype_in is not None:
arg_spec = getfullargspec(f)
if len(arg_spec.args) > 1:
# arg[0] should be self and arg[1] the command argument
param_name = arg_spec.args[1]
else:
param_name = "arg"
dtype_in_str = str(dtype_in)
if not isinstance(dtype_in, str):
try:
dtype_in_str = dtype_in.__name__
except Exception:
pass
msg = doc_in or "(not documented)"
doc += f"\n\n:param {param_name}: {msg}\n:type {param_name}: {dtype_in_str}"
if dtype_out is not None:
dtype_out_str = str(dtype_out)
if not isinstance(dtype_out, str):
try:
dtype_out_str = dtype_out.__name__
except Exception:
pass
msg = doc_out or "(not documented)"
doc += f"\n\n:return: {msg}\n:rtype: {dtype_out_str}"
return doc
[docs]
def command(
f=None,
dtype_in=None,
dformat_in=None,
doc_in="",
dtype_out=None,
dformat_out=None,
doc_out="",
display_level=None,
polling_period=None,
green_mode=True,
fisallowed=None,
cmd_green_mode=None,
isallowed_green_mode=None,
):
"""
Declares a new tango command in a :class:`Device`.
To be used like a decorator in the methods you want to declare as
tango commands. The following example declares commands:
* `void TurnOn(void)`
* `void Ramp(DevDouble current)`
* `DevBool Pressurize(DevDouble pressure)`
::
class PowerSupply(Device):
@command
def TurnOn(self):
self.info_stream('Turning on the power supply')
@command(dtype_in=float)
def Ramp(self, current):
self.info_stream('Ramping on %f...' % current)
@command(dtype_in=float, doc_in='the pressure to be set',
dtype_out=bool, doc_out='True if it worked, False otherwise')
def Pressurize(self, pressure):
self.info_stream('Pressurizing to %f...' % pressure)
return True
.. note::
avoid using *dformat* parameter. If you need a SPECTRUM
attribute of say, boolean type, use instead ``dtype=(bool,)``.
:param dtype_in:
a :ref:`data type <pytango-hlapi-datatypes>` describing the
type of parameter. Default is None meaning no parameter.
:param dformat_in: parameter data format. Default is None.
:type dformat_in: AttrDataFormat
:param doc_in: parameter documentation
:type doc_in: str
:param dtype_out:
a :ref:`data type <pytango-hlapi-datatypes>` describing the
type of return value. Default is None meaning no return value.
:param dformat_out: return value data format. Default is None.
:type dformat_out: AttrDataFormat
:param doc_out: return value documentation
:type doc_out: str
:param display_level: display level for the command (optional)
:type display_level: DispLevel
:param polling_period: polling period in milliseconds (optional)
:type polling_period: int
:param green_mode: DEPRECATED: green mode for command method. If True: run with green mode executor, if False: run directly.
See the green_mode parameter deprecation note below for more details.
:type green_mode: bool
:param fisallowed: is allowed method for command
:type fisallowed: str or callable
:param cmd_green_mode: green mode for command method. If True: run with green mode executor, if False: run directly
See the green_mode parameter deprecation note below for more details.
:type cmd_green_mode: bool
:param isallowed_green_mode: green mode for isallowed method. If True: run with green mode executor, if False: run directly
See the green_mode parameter deprecation note below for more details.
:type isallowed_green_mode: bool
.. versionadded:: 8.1.7
added green_mode option
.. versionadded:: 9.2.0
added display_level and polling_period optional argument
.. versionadded:: 9.4.0
added fisallowed option
.. versionadded:: 10.0.0
added cmd_green_mode and isallowed_green_mode options
.. versionchanged:: 10.0.0
the way that the green_mode parameter is interpreted was changed to be consistent with the same parameter for attributes.
Now it expects bool, which indicates, if methods should be run with executor (green_mode=True, default) or bypass it (green_mode=False)
Before it was either green_mode=None - use executor or green_mode=GreenMode.Synchronous - bypass it.
However, due python by default casts GreenMode.Synchronous (which is int value 0) to False bool,
old code is automatically backward compatible.
.. deprecated:: 10.0.0
green_mode parameter is deprecated and may be removed in future. Use cmd_green_mode and isallowed_green_mode parameters instead.
The new parameters match how attributes and pipes are defined, offer more flexibility, and are clearer.
If you use both old green_mode, and new isallowed_green_mode and cmd_green_mode - the new ones take priority.
"""
if f is None:
return functools.partial(
command,
dtype_in=dtype_in,
dformat_in=dformat_in,
doc_in=doc_in,
dtype_out=dtype_out,
dformat_out=dformat_out,
doc_out=doc_out,
display_level=display_level,
polling_period=polling_period,
green_mode=green_mode,
fisallowed=fisallowed,
cmd_green_mode=cmd_green_mode,
isallowed_green_mode=isallowed_green_mode,
)
name = f.__name__
annotations = getfullargspec(f).annotations
args = getfullargspec(f).args
if dtype_out is None and "return" in annotations:
dtype_out, dformat_out, _, _ = parse_type_hint(
annotations["return"], caller="command"
)
annotations.pop("return", None)
if dtype_in is None and len(args) > 1 and len(annotations):
dtype_in, dformat_in, _, _ = parse_type_hint(
list(annotations.values())[-1], caller="command"
)
dtype_in, format_in, _ = get_attribute_type_format(dtype_in, dformat_in, None)
dtype_out, format_out, _ = get_attribute_type_format(dtype_out, dformat_out, None)
din = [from_typeformat_to_type(dtype_in, format_in), doc_in]
dout = [from_typeformat_to_type(dtype_out, format_out), doc_out]
config_dict = {}
if display_level is not None:
config_dict["Display level"] = display_level
if polling_period is not None:
config_dict["Polling period"] = polling_period
if fisallowed is not None:
config_dict["Is allowed"] = fisallowed
config_dict["Is allowed green_mode"] = (
isallowed_green_mode if isallowed_green_mode is not None else green_mode
)
cmd_green_mode = cmd_green_mode if cmd_green_mode is not None else green_mode
command_method = __get_wrapped_command_method(f, cmd_green_mode)
command_method.__tango_command__ = name, [din, dout, config_dict]
# try to create a minimalistic __doc__
if command_method.__doc__ is None:
try:
command_method.__doc__ = __build_command_doc(
f, name, dtype_in, doc_in, dtype_out, doc_out
)
except Exception:
command_method.__doc__ = "TANGO command"
return command_method
def __get_wrapped_command_method(cmd_method, cmd_green_mode):
already_wrapped = hasattr(cmd_method, "__access_wrapped__")
if already_wrapped:
return cmd_method
if cmd_green_mode:
@functools.wraps(cmd_method)
def wrapped_command_method(self, *args, **kwargs):
return get_worker().execute(cmd_method, self, *args, **kwargs)
else:
wrapped_command_method = cmd_method
if _force_tracing:
wrapped_command_method = _forcefully_traced_method(wrapped_command_method)
if wrapped_command_method is not cmd_method:
wrapped_command_method.__access_wrapped__ = True
return wrapped_command_method
class _BaseProperty:
def __init__(self, dtype=None, doc="", default_value=None, update_db=False):
self.name = None
if dtype:
dtype = from_typeformat_to_type(*get_tango_type_format(dtype))
self.dtype = dtype
self.doc = doc
self.default_value = default_value
self.update_db = update_db
self.__doc__ = doc or "TANGO property"
def __get__(self, obj, objtype):
if obj is None:
return self
return obj._tango_properties.get(self.name)
def __set__(self, obj, value):
obj._tango_properties[self.name] = value
if self.update_db:
import tango
db = tango.Util.instance().get_database()
db.put_device_property(obj.get_name(), {self.name: value})
def __delete__(self, obj):
del obj._tango_properties[self.name]
[docs]
class device_property(_BaseProperty):
"""
Declares a new tango device property in a :class:`Device`. To be
used like the python native :obj:`property` function. For example,
to declare a scalar, `tango.DevString`, device property called
*host* in a *PowerSupply* :class:`Device` do::
from tango.server import Device, DeviceMeta
from tango.server import device_property
class PowerSupply(Device):
host = device_property(dtype=str)
port = device_property(dtype=int, mandatory=True)
:param dtype: Data type (see :ref:`pytango-data-types`)
:param doc: property documentation (optional)
:param mandatory (optional: default is False)
:param default_value: default value for the property (optional)
:param update_db: tells if set value should write the value to database.
[default: False]
:type update_db: bool
.. versionadded:: 8.1.7
added update_db option
"""
def __init__(
self, dtype=None, doc="", mandatory=False, default_value=None, update_db=False
):
super().__init__(dtype, doc, default_value, update_db)
self.mandatory = mandatory
if mandatory and default_value is not None:
msg = (
"Invalid arguments: 'mandatory' is True, so 'default_value' must be None. "
"A mandatory device property value must be defined in the Tango Database "
"so it cannot have a default."
)
raise ValueError(msg)
[docs]
class class_property(_BaseProperty):
"""
Declares a new tango class property in a :class:`Device`. To be
used like the python native :obj:`property` function. For example,
to declare a scalar, `tango.DevString`, class property called
*port* in a *PowerSupply* :class:`Device` do::
from tango.server import Device, DeviceMeta
from tango.server import class_property
class PowerSupply(Device):
port = class_property(dtype=int, default_value=9788)
:param dtype: Data type (see :ref:`pytango-data-types`)
:param doc: property documentation (optional)
:param default_value: default value for the property (optional)
:param update_db: tells if set value should write the value to database.
[default: False]
:type update_db: bool
.. versionadded:: 8.1.7
added update_db option
"""
pass
def __to_callback(callback, cb_type, green_mode):
if callback is None:
return lambda: None
err_msg = (
f"{cb_type} must be a callable or " "sequence <callable [, args, [, kwargs]]>"
)
if callable(callback):
f = callback
elif is_non_str_seq(callback):
length = len(callback)
if length < 1 or length > 3:
raise TypeError(err_msg)
callback = callback[0]
if not callable(callback):
raise TypeError(err_msg)
args, kwargs = [], {}
if length > 1:
args = callback[1]
if length > 2:
kwargs = callback[2]
f = functools.partial(callback, *args, **kwargs)
else:
raise TypeError(err_msg)
if green_mode == GreenMode.Asyncio and not _is_coroutine_function(f):
@functools.wraps(f)
async def async_callback():
return f()
return async_callback
else:
return f
def _to_classes(classes):
uclasses = []
if is_seq(classes):
for klass_info in classes:
if is_seq(klass_info):
if len(klass_info) == 2:
klass_klass, klass = klass_info
klass_name = klass.__name__
else:
klass_klass, klass, klass_name = klass_info
else:
if not hasattr(klass_info, "_api") or klass_info._api < 2:
raise Exception(
"When giving a single class, it must "
"implement HLAPI (see tango.server)"
)
klass_klass = klass_info.TangoClassClass
klass_name = klass_info.TangoClassName
klass = klass_info
uclasses.append((klass_klass, klass, klass_name))
else:
for klass_name, klass_info in classes.items():
if is_seq(klass_info):
if len(klass_info) == 2:
klass_klass, klass = klass_info
else:
klass_klass, klass, klass_name = klass_info
else:
if not hasattr(klass_info, "_api") or klass_info._api < 2:
raise Exception(
"When giving a single class, it must "
"implement HLAPI (see tango.server)"
)
klass_klass = klass_info.TangoClassClass
klass_name = klass_info.TangoClassName
klass = klass_info
uclasses.append((klass_klass, klass, klass_name))
return uclasses
def _add_classes(util, classes):
for class_info in _to_classes(classes):
util.add_class(*class_info)
def _get_class_green_mode(classes, green_mode):
if green_mode is not None:
default_green_mode = green_mode
else:
default_green_mode = get_green_mode()
green_modes = set()
for _, klass, _ in _to_classes(classes):
device_green_mode = getattr(klass, "green_mode", None)
if device_green_mode is None:
device_green_mode = default_green_mode
green_modes.add(device_green_mode)
if len(green_modes) > 1:
raise ValueError(
f"Devices with mixed green modes cannot be run in the same device "
f"server process. Modes: {green_modes}. Classes: {classes}."
)
elif len(green_modes) == 0:
raise ValueError(
"No device classes specified - cannot run device server "
"process with no classes."
)
unanimous_green_mode = green_modes.pop()
return unanimous_green_mode
def __server_run(
classes,
args=None,
msg_stream=sys.stdout,
util=None,
event_loop=None,
pre_init_callback=None,
post_init_callback=None,
green_mode=None,
):
green_mode = _get_class_green_mode(classes, green_mode)
write = msg_stream.write if msg_stream else lambda msg: None
if args is None:
args = sys.argv
pre_init_callback = __to_callback(
pre_init_callback, "pre_init_callback", green_mode
)
post_init_callback = __to_callback(
post_init_callback, "post_init_callback", green_mode
)
if util is None:
util = Util.init(args)
if green_mode in (GreenMode.Gevent, GreenMode.Asyncio):
util.set_serial_model(SerialModel.NO_SYNC)
worker = get_executor(green_mode)
set_worker(worker)
if event_loop is not None:
event_loop = functools.partial(worker.execute, event_loop)
util.server_set_event_loop(event_loop)
log = logging.getLogger("tango")
def tango_loop():
log.debug("server loop started")
worker.execute(pre_init_callback)
_add_classes(util, classes)
util.server_init()
worker.execute(post_init_callback)
write("Ready to accept request\n")
util.server_run()
log.debug("server loop exit")
worker.run(tango_loop, wait=True)
return util
[docs]
def run(
classes,
args=None,
msg_stream=sys.stdout,
verbose=False,
util=None,
event_loop=None,
pre_init_callback=None,
post_init_callback=None,
green_mode=None,
raises=False,
err_stream=sys.stderr,
):
"""
Provides a simple way to run a tango server. It handles exceptions
by writting a message to the msg_stream.
:Examples:
Example 1: registering and running a PowerSupply inheriting from
:class:`~tango.server.Device`::
from tango.server import Device, run
class PowerSupply(Device):
pass
run((PowerSupply,))
Example 2: registering and running a MyServer defined by tango
classes `MyServerClass` and `MyServer`::
from tango import Device_4Impl, DeviceClass
from tango.server import run
class MyServer(Device_4Impl):
pass
class MyServerClass(DeviceClass):
pass
run({'MyServer': (MyServerClass, MyServer)})
Example 3: registering and running a MyServer defined by tango
classes `MyServerClass` and `MyServer`::
from tango import Device_4Impl, DeviceClass
from tango.server import Device, run
class PowerSupply(Device):
pass
class MyServer(Device_4Impl):
pass
class MyServerClass(DeviceClass):
pass
run([PowerSupply, [MyServerClass, MyServer]])
# or: run({'MyServer': (MyServerClass, MyServer)})
.. note::
the order of registration of tango classes defines the order
tango uses to initialize the corresponding devices.
if using a dictionary as argument for classes be aware that the
order of registration becomes arbitrary. If you need a
predefined order use a sequence or an OrderedDict.
:param classes:
Defines for which Tango Device Classes the server will run.
If :class:`~dict` is provided, it's key is the tango class name
and value is either:
| :class:`~tango.server.Device`
| two element sequence: :class:`~tango.DeviceClass`, :class:`~tango.DeviceImpl`
| three element sequence: :class:`~tango.DeviceClass`, :class:`~tango.DeviceImpl`, tango class name :class:`~str`
:type classes: Sequence[tango.server.Device] | dict
:param args:
list of command line arguments [default: None, meaning use
sys.argv]
:type args: list
:param msg_stream:
stream where to put messages [default: sys.stdout]
:param util:
PyTango Util object [default: None meaning create a Util
instance]
:type util: :class:`~tango.Util`
:param event_loop: event_loop callable
:type event_loop: callable
:param pre_init_callback:
an optional callback that is executed between the calls
Util.init and Util.server_init
The optional `pre_init_callback` can be a callable (without
arguments) or a tuple where the first element is the callable,
the second is a list of arguments (optional) and the third is a
dictionary of keyword arguments (also optional).
:type pre_init_callback:
callable or tuple
:param post_init_callback:
an optional callback that is executed between the calls
Util.server_init and Util.server_run
The optional `post_init_callback` can be a callable (without
arguments) or a tuple where the first element is the callable,
the second is a list of arguments (optional) and the third is a
dictionary of keyword arguments (also optional).
:type post_init_callback:
callable or tuple
:param raises:
Disable error handling and propagate exceptions from the server
:type raises: bool
:param err_stream:
stream where to put catched exceptions [default: sys.stderr]
:return: The Util singleton object
:rtype: :class:`~tango.Util`
.. versionadded:: 8.1.2
.. versionchanged:: 8.1.4
when classes argument is a sequence, the items can also be
a sequence <TangoClass, TangoClassClass>[, tango class name]
.. versionchanged:: 9.2.2
`raises` argument has been added
.. versionchanged:: 9.5.0
`pre_init_callback` argument has been added
.. versionchanged:: 10.0.0
`err_stream` argument has been added
"""
server_run = functools.partial(
__server_run,
classes,
args=args,
msg_stream=msg_stream,
util=util,
event_loop=event_loop,
pre_init_callback=pre_init_callback,
post_init_callback=post_init_callback,
green_mode=green_mode,
)
# Run the server without error handling
if raises:
return server_run()
# Run the server with error handling
write = err_stream.write if err_stream else lambda msg: None
try:
return server_run()
except KeyboardInterrupt:
write("Exiting: Keyboard interrupt\n")
except DevFailed as df:
write("Exiting: Server exited with tango.DevFailed:\n" + str(df) + "\n")
if verbose:
write(traceback.format_exc())
except Exception as e:
write("Exiting: Server exited with unforseen exception:\n" + str(e) + "\n")
if verbose:
write(traceback.format_exc())
write("\nExited\n")
[docs]
def server_run(
classes,
args=None,
msg_stream=sys.stdout,
verbose=False,
util=None,
event_loop=None,
pre_init_callback=None,
post_init_callback=None,
green_mode=None,
err_stream=sys.stderr,
):
"""
Since PyTango 8.1.2 it is just an alias to
:func:`~tango.server.run`. Use :func:`~tango.server.run`
instead.
.. versionadded:: 8.0.0
.. versionchanged:: 8.0.3
Added `util` keyword parameter.
Returns util object
.. versionchanged:: 8.1.1
Changed default msg_stream from *stderr* to *stdout*
Added `event_loop` keyword parameter.
Returns util object
.. versionchanged:: 8.1.2
Added `post_init_callback` keyword parameter
.. deprecated:: 8.1.2
Use :func:`~tango.server.run` instead.
.. versionchanged:: 9.5.0
`pre_init_callback` argument has been added
.. versionchanged:: 10.0.0
`err_stream` argument has been added
"""
return run(
classes,
args=args,
msg_stream=msg_stream,
verbose=verbose,
util=util,
event_loop=event_loop,
pre_init_callback=pre_init_callback,
post_init_callback=post_init_callback,
green_mode=green_mode,
err_stream=sys.stderr,
)
[docs]
class Device(BaseDevice, metaclass=DeviceMeta):
"""
Device class for the high-level API.
All device-specific classes should inherit from this class.
"""
# Avoid circular imports
from tango.tango_object import Server # noqa: E402