"""Specialization for CanOpen functionality."""
# pylint: disable=invalid-name,unused-argument,too-many-public-methods
# pylint: disable=attribute-defined-outside-init
# mypy: disable_error_code="attr-defined"
import datetime
import logging
import os
import time
import canopen
from .dish_modes import FuncState, Mode, StowPinState
from .sources import Sun
class InitialPosition:
"""Initial position for timestamp, azimuth, and elevation."""
def __init__(self, timestamp, azimuth, elevation):
"""Init method for initial position."""
self.azimuth = azimuth
self.elevation = elevation
self.timestamp = timestamp
[docs]class ASTTComponentManager:
def __init__(self):
"""Init method for the CM."""
self.antenna_node = None
self.trackstop = True
self.antenna_app_state = None
self.antenna_func_state = FuncState.UNKNOWN
self.antenna_mode = Mode.UNKNOWN
self.stow_sensor_state = StowPinState.UNKNOWN
self.network0 = canopen.Network()
self.transmission_triggered = False
self.current_position = InitialPosition(timestamp=0.0, azimuth=0.0, elevation=0.0)
self.logger = logging.getLogger("ASTT-COMP-MANAGER")
logging.basicConfig(
filename="app_dev.log",
format="%(asctime)s|%(levelname)s|%(name)s|%(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
# =====================
# Connection functions
# =====================
[docs] def connect_to_network(self):
"""Connect to the CAN0."""
self.logger.debug("Starting VCAN Network")
(self.network0).connect(channel="can0", bustype="socketcan")
[docs] def connect_to_plc_node(self):
"""Connect to the C++ antenna simulator."""
self.logger.debug("Connecting to PLC node")
curr_dir = os.getcwd()
eds_rel_path = "src/component_managers/cpp-slave.eds"
eds_full_path = os.path.join(curr_dir, eds_rel_path)
node2 = canopen.RemoteNode(
2,
eds_full_path,
)
(self.network0).add_node(node2)
self.antenna_node = node2
self.logger.info("Connected to PLC node")
# ========================
# Callbacks
# ========================
[docs] def position_change_callback(self, incoming_object):
"""Transmit PDO callback ."""
dt = datetime.datetime.now(datetime.timezone.utc)
current_timestamp = datetime.datetime.timestamp(dt)
print(f"[{current_timestamp}] Message {incoming_object.name} received:") # noqa: E501
for node_record in incoming_object:
if node_record.name == "Position Feedback.Timestamp(R64) of position":
timestamp = node_record.raw
self.current_position.timestamp = timestamp
if node_record.name == "Position Feedback.Azimuth(R64) of position":
azimuth = node_record.raw
self.current_position.azimuth = azimuth
if node_record.name == "Position Feedback.Elevation(R64) of position":
elevation = node_record.raw
self.current_position.elevation = elevation
print(
f" Timestamp: {self.current_position.timestamp}, Azimuth {self.current_position.azimuth}, Elevation {self.current_position.elevation}" # noqa: E501
)
[docs] def store_initial_position(self):
"""Return starting position's of timestamp, azi, and ele."""
start_position = (
self.current_position.timestamp,
self.current_position.azimuth,
self.current_position.elevation,
)
return start_position
# This is helper function to translate int values
# To enum values
[docs] def gen_mode_state_enums(self, name_of_enum, value):
"""Generate enum values for transmitted values."""
generated_enum = None
state_mode_calls = {
"Mode": Mode,
"FuncState": FuncState,
"StowPinState": StowPinState,
}
if name_of_enum in state_mode_calls:
try:
generated_enum = state_mode_calls[name_of_enum](value)
except Exception as err:
self.logger.exception(f"could not generate mode or state, {err}")
return generated_enum
def stow_pin_callback(self, incoming_object):
for node_record in incoming_object:
st_pin_state = self.gen_mode_state_enums("StowPinState", node_record.raw & 0b111)
# Update if the value from simulator has changed
if st_pin_state != self.stow_sensor_state:
self.stow_sensor_state = st_pin_state
print(f"stow pin state : {st_pin_state.name} ")
def state_mode_callback(self, incoming_object):
for node_record in incoming_object:
if "Mode and State Feedback.Mode" == node_record.name:
ant_mode = self.gen_mode_state_enums("Mode", node_record.raw)
# Update if the value from simulator has changed
if ant_mode != self.antenna_mode:
self.antenna_mode = ant_mode
print(f"antenna mode : {ant_mode.name} ")
elif "Mode and State Feedback.Functional State" == node_record.name:
func_state = self.gen_mode_state_enums("FuncState", node_record.raw)
# Update if the value from simulator has changed
if func_state != self.antenna_func_state:
self.antenna_func_state = func_state
print(f"func state : {func_state.name} ")
# ========================
# Subscription functions
# ========================
[docs] def subscribe_to_az_change(self):
"""Canopen Subscription to the Azimuth."""
self.logger.info("Subscribing to azimuth ")
(self.antenna_node).tpdo.read()
# Mapping the Azimuth to tpdo
(self.antenna_node).tpdo[1].clear()
# (self.antenna_node).tpdo[1].add_variable(0x2001,2)
(self.antenna_node).tpdo[1].add_variable("Position Feedback", "Azimuth(R64) of position")
(self.antenna_node).tpdo[1].trans_type = 1
(self.antenna_node).tpdo[1].event_timer = 0
(self.antenna_node).tpdo[1].enabled = True
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
print((self.antenna_node).nmt.state)
(self.antenna_node).tpdo[1].save()
(self.antenna_node).tpdo[1].add_callback(self.position_change_callback)
self.logger.info("Subscribed to azimuth")
[docs] def subscribe_to_el_change(self):
"""Canopen Subscription to the Elevation."""
self.logger.info("Subscribing to elevation ")
(self.antenna_node).tpdo[2].read()
# Mapping the Elevation to tpdo
(self.antenna_node).tpdo[2].clear()
(self.antenna_node).tpdo[2].add_variable(0x2001, 3)
(self.antenna_node).tpdo[2].trans_type = 1
(self.antenna_node).tpdo[2].event_timer = 0
(self.antenna_node).tpdo[2].enabled = True
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
print((self.antenna_node).nmt.state)
(self.antenna_node).tpdo[2].save()
(self.antenna_node).tpdo[2].add_callback(self.position_change_callback)
self.logger.info("Subscribed to elevation")
[docs] def subscribe_to_timestamp(self):
"""Canopen Subscription to the Timestamp."""
self.logger.info("Subscribing to timestamp ")
(self.antenna_node).tpdo[6].read()
# Mapping the Timestamp to tpdo
(self.antenna_node).tpdo[6].clear()
(self.antenna_node).tpdo[6].add_variable(0x2001, 1)
(self.antenna_node).tpdo[6].trans_type = 1
(self.antenna_node).tpdo[6].event_timer = 0
(self.antenna_node).tpdo[6].enabled = True
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
print((self.antenna_node).nmt.state)
(self.antenna_node).tpdo[6].save()
(self.antenna_node).tpdo[6].add_callback(self.position_change_callback)
self.logger.info("Subscribed to timestamp")
[docs] def subscribe_to_func_state_and_mode(self):
"""Canopen Subscription to the functional state and mode."""
self.logger.info("Subscribing to functional state and mode")
(self.antenna_node).tpdo[3].read()
# Mapping the functional state to tpdo
(self.antenna_node).tpdo[3].clear()
(self.antenna_node).tpdo[3].add_variable("Mode and State Feedback", "Functional State")
(self.antenna_node).tpdo[3].add_variable("Mode and State Feedback", "Mode")
(self.antenna_node).tpdo[3].trans_type = 254
(self.antenna_node).tpdo[3].event_timer = 5
(self.antenna_node).tpdo[3].enabled = True
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
print((self.antenna_node).nmt.state)
(self.antenna_node).tpdo[3].save()
(self.antenna_node).tpdo[3].add_callback(self.state_mode_callback)
self.logger.info("Subscribed to functional state and mode")
[docs] def subscribe_to_stow_sensor(self):
"""Canopen Subscription to stow sensors."""
self.logger.info("Subscribing to stow sensor ")
(self.antenna_node).tpdo[4].read()
# Mapping the stow sensors to tpdo
(self.antenna_node).tpdo[4].clear()
(self.antenna_node).tpdo[4].add_variable("Sensor Feedback", "Stow sensors")
(self.antenna_node).tpdo[4].trans_type = 254
(self.antenna_node).tpdo[4].event_timer = 5
(self.antenna_node).tpdo[4].enabled = True
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
print((self.antenna_node).nmt.state)
(self.antenna_node).tpdo[4].save()
(self.antenna_node).tpdo[4].add_callback(self.stow_pin_callback)
self.logger.info("Subscribed to stow sensor")
[docs] def subscribe_to_mode_command_obj(self):
"""Canopen Subscription to the mode command obj ."""
self.logger.info("Subscribing to mode command obj")
(self.antenna_node).rpdo.read()
# Mapping the mode command obj to rpdo
(self.antenna_node).rpdo[1].clear()
(self.antenna_node).rpdo[1].add_variable("Mode command", "Mode")
(self.antenna_node).rpdo[1].enabled = True
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
print((self.antenna_node).nmt.state)
(self.antenna_node).rpdo[1].save()
(self.antenna_node).rpdo[1].start(0.1)
self.logger.info("Subscribed to mode command obj")
# ========================
# Commands
# ========================
[docs] def set_plc_node_to_operational(self):
"""Change all the nodes state to OPERATIONAL."""
(self.antenna_node).nmt.state = "OPERATIONAL"
[docs] def set_plc_node_to_preoperational(self):
"""Change all the nodes state to PRE-OPERATIONAL."""
(self.antenna_node).nmt.state = "PRE-OPERATIONAL"
[docs] def get_plc_state(self):
"""Return node state."""
self.logger.info("Reading plc state")
return (self.antenna_node).nmt.state
[docs] def is_az_allowed(self, az):
"""Allow Azimuth of [-127,127]."""
return True if (az >= -127.0 and az <= 127.0) else False
[docs] def is_el_allowed(self, el):
"""Allow elevation of [-15,92]."""
return True if (el >= -15.0 and el <= 92.0) else False
[docs] def point_to_coordinates(self, timestamp, az, el):
"""Commands the simulator to point az/el."""
self.logger.info(f"Point called with AZ {az} and EL {el} ")
if self.is_az_allowed(az) and self.is_el_allowed(el):
(self.antenna_node).sdo[0x2000][1].raw = timestamp
(self.antenna_node).sdo[0x2000][2].raw = az
(self.antenna_node).sdo[0x2000][3].raw = el
else:
self.logger.exception(f"az: {az} or el: {el} is out of range")
raise ValueError
[docs] def set_point_mode(self):
"""Commands the ASTT Antenna to point mode."""
self.logger.info("Set point mode called!!")
(self.antenna_node).rpdo[1]["Mode command.Mode"].raw = 1
[docs] def set_idle_mode(self):
"""Commands the ASTT Antenna to idle mode."""
self.logger.info("Set idle mode called!!")
(self.antenna_node).rpdo[1]["Mode command.Mode"].raw = 0
[docs] def set_stow_mode(self):
"""Commands the ASTT Antenna to stow mode."""
self.logger.info("Set stow mode called!!")
(self.antenna_node).rpdo[1]["Mode command.Mode"].raw = 2
[docs] def trigger_transmission(self):
"""Triggers the transmission of Az/El."""
self.logger.info("Transmission is triggered")
self.transmission_triggered = True
(self.antenna_node).nmt.state = "OPERATIONAL"
(self.network0).sync.start(0.5)
def stop_az_el_transmission(self):
for tpdo_id in range(1, 3):
(self.antenna_mode).tpdo[tpdo_id].stop()
[docs] def get_antenna_mode(self):
"""Return the antenna mode."""
return self.antenna_mode
[docs] def get_antenna_func_state(self):
"""Return the antenna functional state."""
return self.antenna_func_state
[docs] def get_antenna_app_state(self):
"""Return the antenna application."""
return self.antenna_app_state
[docs] def get_antenna_stow_sensor_state(self):
"""Return the antenna stow sensor state."""
return self.stow_sensor_state
[docs] def track_sun(self, duration_time, az_speed=None, el_speed=None):
"""Calculate Sun's Az and El."""
# Converting the duration time to seconds
start_position = self.store_initial_position()
track_start_position = InitialPosition(
start_position[0], start_position[1], start_position[2]
)
self.logger.info("Starting to track the sun")
time_conversion = duration_time * 3600
count = 1
sun = Sun(-33.9326033333, 18.47222, 3.6)
# Store all the values from the callback on beginning position
begin_time = track_start_position.timestamp
begin_az = track_start_position.azimuth
begin_el = track_start_position.elevation
while count < time_conversion and not self.trackstop:
track_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
seconds=10
)
future_timestamp = datetime.datetime.timestamp(track_time)
dt = future_timestamp - begin_time
if az_speed is not None and el_speed is not None:
az_desired = begin_az + (az_speed * dt)
el_desired = begin_el + (el_speed * dt)
else:
az_desired, el_desired = sun.get_sun_az_el(track_time)
self.point_to_coordinates(future_timestamp, az=az_desired, el=el_desired)
if not self.transmission_triggered:
self.trigger_transmission()
else:
pass
time.sleep(5)
self.logger.info("---------------------------------------")
self.logger.info(f"Sun_Az : {az_desired}, Sun_El : {el_desired}")
self.logger.info("---------------------------------------")
count += 1
def track_sun_update(self):
pass
[docs] def clear_all_logs(self):
"""Clear all the logs in app_dev.log."""
try:
with open("app_dev.log", "w") as file:
file.truncate(0)
except FileNotFoundError:
self.logger.error("File app_dev.log not found. No logs cleared.")
except Exception as err:
self.logger.error(f"Failed to clear logs in , error: {err}")
file.close()
if __name__ == "__main__":
"""Run the CM."""
cm = ASTTComponentManager()
cm.connect_to_network()
cm.connect_to_plc_node()
cm.subscribe_to_func_state_and_mode()
cm.subscribe_to_timestamp()
cm.subscribe_to_az_change()
cm.subscribe_to_el_change()
cm.trigger_transmission()
cm.track_sun(1)