Source code for xenamanager.xena_port

"""
Classes and utilities that represents Xena XenaManager-2G port.

:author: yoram@ignissoft.com
"""

import os
from collections import OrderedDict
from enum import Enum

from trafficgenerator.tgn_utils import TgnError

from xenamanager.api.XenaSocket import XenaCommandException
from xenamanager.xena_object import XenaObject, XenaObject21
from xenamanager.xena_stream import XenaStream, XenaStreamState


[docs]class XenaCaptureBufferType(Enum): raw = 0 text = 1 pcap = 2
[docs]class XenaPort(XenaObject): """ Represents Xena port. """ info_config_commands = ['p_info', 'p_config', 'p_receivesync', 'ps_indices', 'pr_tplds'] stats_captions = {'pr_pfcstats': ['total', 'CoS 0', 'CoS 1', 'CoS 2', 'CoS 3', 'CoS 4', 'CoS 5', 'CoS 6', 'CoS 7'], 'pr_total': ['bps', 'pps', 'bytes', 'packets'], 'pr_notpld': ['bps', 'pps', 'bytes', 'packets'], 'pr_extra': ['fcserrors', 'pauseframes', 'arprequests', 'arpreplies', 'pingrequests', 'pingreplies', 'gapcount', 'gapduration'], 'pt_total': ['bps', 'pps', 'bytes', 'packets'], 'pt_extra': ['arprequests', 'arpreplies', 'pingrequests', 'pingreplies', 'injectedfcs', 'injectedseq', 'injectedmis', 'injectedint', 'injectedtid', 'training'], 'pt_notpld': ['bps', 'pps', 'bytes', 'packets']} def __init__(self, parent, index): """ :param parent: parent module or chassis. :param index: port index in format module/port (both 0 based) """ objRef = '{}/{}'.format('/'.join(parent.ref.split('/')[:2]), index) super(self.__class__, self).__init__(objType='port', index=index, parent=parent, objRef=objRef) self._data['name'] = '{}/{}'.format(parent.name, index) self.p_info = None
[docs] def inventory(self): self.p_info = self.get_attributes()
[docs] def reserve(self, force=False): """ Reserve port. XenaManager-2G -> Reserve/Relinquish Port. :param force: True - take forcefully, False - fail if port is reserved by other user """ p_reservation = self.get_attribute('p_reservation') if p_reservation == 'RESERVED_BY_YOU': return elif p_reservation == 'RESERVED_BY_OTHER' and not force: raise TgnError('Port {} reserved by {}'.format(self, self.get_attribute('p_reservedby'))) self.relinquish() self.send_command('p_reservation', 'reserve')
[docs] def relinquish(self): if self.get_attribute('p_reservation') != 'RELEASED': self.send_command('p_reservation relinquish')
[docs] def release(self): if self.get_attribute('p_reservation') == 'RESERVED_BY_YOU': self.send_command('p_reservation release')
[docs] def reset(self): """ Reset port-level parameters to standard values, and delete all streams, filters, capture, and dataset definitions. """ self.objects = OrderedDict() return self.send_command('p_reset')
[docs] def wait_for_up(self, timeout=40): self.wait_for_states('p_receivesync', timeout, 'IN_SYNC')
# # Configurations. #
[docs] def load_config(self, config_file_name): """ Load configuration file from xpc file. :param config_file_name: full path to the configuration file. """ with open(config_file_name) as f: commands = f.read().splitlines() for command in commands: if not command.startswith(';'): try: self.send_command(command) except XenaCommandException as e: self.logger.warning(str(e))
[docs] def save_config(self, config_file_name): """ Save configuration file to xpc file. :param config_file_name: full path to the configuration file. """ with open(config_file_name, 'w+') as f: f.write('P_RESET\n') for line in self.send_command_return_multilines('p_fullconfig', '?'): f.write(line.split(' ', 1)[1].lstrip())
[docs] def add_stream(self, name=None, tpld_id=None, state=XenaStreamState.enabled): """ Add stream. :param name: stream description. :param tpld_id: TPLD ID. If None the a unique value will be set. :param state: new stream state. :type state: xenamanager.xena_stream.XenaStreamState :return: newly created stream. :rtype: xenamanager.xena_stream.XenaStream """ stream = XenaStream(parent=self, index='{}/{}'.format(self.index, len(self.streams)), name=name) stream._create() tpld_id = tpld_id if tpld_id else XenaStream.next_tpld_id stream.set_attributes(ps_comment='"{}"'.format(stream.name), ps_tpldid=tpld_id) XenaStream.next_tpld_id = max(XenaStream.next_tpld_id + 1, tpld_id + 1) stream.set_state(state) return stream
[docs] def remove_stream(self, index): """ Remove stream. :param index: index of stream to remove. """ self.streams[index].del_object_from_parent()
# # Operations. #
[docs] def start_traffic(self, blocking=False): """ Start port traffic. Port -> Start Traffic :param blocking: True - start traffic and wait until traffic ends, False - start traffic and return. """ self.session.start_traffic(blocking, self)
[docs] def stop_traffic(self): """ Stop port traffic. Port -> Stop Traffic """ self.session.stop_traffic(self)
[docs] def start_capture(self): """ Start capture on port. Capture -> Start Capture """ self.del_objects_by_type('capture') self.send_command('p_capture', 'on')
[docs] def stop_capture(self): """ Stop capture on port. Capture -> Stop Capture """ self.send_command('p_capture', 'off')
# # Statistics. #
[docs] def clear_stats(self): """ Clear att TX and RX statistics counter. Port Statistics -> Clear TX Counters, Clear RX Counters """ self.send_command('pt_clear') self.send_command('pr_clear')
[docs] def read_port_stats(self): """ :return: dictionary {group name {stat name: value}}. Sea XenaPort.stats_captions. """ stats_with_captions = OrderedDict() for stat_name in self.stats_captions.keys(): stats_with_captions[stat_name] = self.read_stat(self.stats_captions[stat_name], stat_name) return stats_with_captions
[docs] def read_stream_stats(self): """ :return: dictionary {stream index {stat name: value}}. Sea XenaStream.stats_captions. """ stream_stats = OrderedDict() for stream in self.streams.values(): stream_stats[stream] = stream.read_stats() return stream_stats
[docs] def read_tpld_stats(self): """ :return: dictionary {tpld index {group name {stat name: value}}}. Sea XenaTpld.stats_captions. """ payloads_stats = OrderedDict() for tpld in self.tplds.values(): payloads_stats[tpld] = tpld.read_stats() return payloads_stats
# # Properties. # @property def streams(self): """ :return: dictionary {id: object} of all streams. :rtype: dict of (int, xenamanager.xena_stream.XenaStream) """ if not self.get_objects_by_type('stream'): tpld_ids = [] for index in self.get_attribute('ps_indices').split(): stream = XenaStream(parent=self, index='{}/{}'.format(self.index, index)) tpld_ids.append(stream.get_attribute('ps_tpldid')) if tpld_ids: XenaStream.next_tpld_id = max([XenaStream.next_tpld_id] + [int(t) for t in tpld_ids]) + 1 return {s.id: s for s in self.get_objects_by_type('stream')} @property def tplds(self): """ :return: dictionary {id: object} of all current tplds. :rtype: dict of (int, xenamanager.xena_port.XenaTpld) """ # As TPLDs are dynamic we must re-read them each time from the port. self.parent.del_objects_by_type('tpld') for tpld in self.get_attribute('pr_tplds').split(): XenaTpld(parent=self, index='{}/{}'.format(self.index, tpld)) return {t.id: t for t in self.get_objects_by_type('tpld')} @property def capture(self): """ :return: capture object. :rtype: XenaCapture """ if not self.get_object_by_type('capture'): XenaCapture(parent=self) return self.get_object_by_type('capture')
[docs]class XenaTpld(XenaObject21): stats_captions = {'pr_tpldtraffic': ['bps', 'pps', 'byt', 'pac'], 'pr_tplderrors': ['dummy', 'seq', 'mis', 'pld'], 'pr_tpldlatency': ['min', 'avg', 'max', 'avg1sec', 'min1sec', 'max1sec'], 'pr_tpldjitter': ['min', 'avg', 'max', 'avg1sec', 'min1sec', 'max1sec']} def __init__(self, parent, index): """ :param parent: parent port object. :param index: TPLD index in format module/port/tpld. """ super(self.__class__, self).__init__(objType='tpld', index=index, parent=parent)
[docs] def read_stats(self): """ :return: dictionary {group name {stat name: value}}. Sea XenaTpld.stats_captions. """ stats_with_captions = OrderedDict() for stat_name in self.stats_captions.keys(): stats_with_captions[stat_name] = self.read_stat(self.stats_captions[stat_name], stat_name) return stats_with_captions
[docs]class XenaCapture(XenaObject): """ Represents capture parameters, correspond to the Capture panel of the XenaManager, and deal with configuration of the capture criteria and inspection of the captured data from a port. """ info_config_commands = ['pc_fullconfig'] stats_captions = ['status', 'packets', 'starttime'] def __init__(self, parent): objRef = '{}/capture'.format(parent.ref) super(self.__class__, self).__init__(objType='capture', index=parent.index, parent=parent, objRef=objRef)
[docs] def read_stats(self): """ :return: dictionary {stat name: value}. Sea XenaCapture.stats_captions. """ return self.read_stat(XenaCapture.stats_captions, 'pc_stats')
[docs] def get_packets(self, from_index=0, to_index=None, cap_type=XenaCaptureBufferType.text, file_name=None, tshark=None): """ Get captured packets from chassis. :param from_index: index of first packet to read. :param to_index: index of last packet to read. If None - read all packets. :param cap_type: returned capture format. If pcap then file name and tshark must be provided. :param file_name: if specified, capture will be saved in file. :param tshark: tshark object for pcap type only. :type: xenamanager.xena_tshark.Tshark :return: list of requested packets, None for pcap type. """ to_index = to_index if to_index else len(self.packets) raw_packets = [] for index in range(from_index, to_index): raw_packets.append(self.packets[index].get_attribute('pc_packet').split('0x')[1]) if cap_type == XenaCaptureBufferType.raw: self._save_captue(file_name, raw_packets) return raw_packets text_packets = [] for raw_packet in raw_packets: text_packet = '' for c, b in zip(range(len(raw_packet)), raw_packet): if c % 32 == 0: text_packet += '\n{:06x} '.format(int(c / 2)) elif c % 2 == 0: text_packet += ' ' text_packet += b text_packets.append(text_packet) if cap_type == XenaCaptureBufferType.text: self._save_captue(file_name, text_packets) return text_packets temp_file_name = file_name + '_' self._save_captue(temp_file_name, text_packets) tshark.text_to_pcap(temp_file_name, file_name) os.remove(temp_file_name)
# # Properties. # @property def packets(self): """ :return: dictionary {id: object} of all packets. :rtype: dict of (int, xenamanager.xena_port.XenaCapturePacket) """ if not self.get_object_by_type('cappacket'): for index in range(0, self.read_stats()['packets']): XenaCapturePacket(parent=self, index='{}/{}'.format(self.index, index)) return {p.id: p for p in self.get_objects_by_type('cappacket')} # # Private methods. # def _save_captue(self, file_name, packets): if file_name: with open(file_name, 'w+') as f: for packet in packets: f.write(packet)
[docs]class XenaCapturePacket(XenaObject21): info_config_commands = ['pc_info'] def __init__(self, parent, index): objRef = '{}/{}'.format(parent.ref, index.split('/')[-1]) super(self.__class__, self).__init__(objType='cappacket', parent=parent, index=index, objRef=objRef)