Source code for netzob.Common.Utils.Decorators

#-*- coding: utf-8 -*-

#+---------------------------------------------------------------------------+
#|          01001110 01100101 01110100 01111010 01101111 01100010            |
#|                                                                           |
#|               Netzob : Inferring communication protocols                  |
#+---------------------------------------------------------------------------+
#| Copyright (C) 2011-2017 Georges Bossert and Frédéric Guihéry              |
#| This program is free software: you can redistribute it and/or modify      |
#| it under the terms of the GNU General Public License as published by      |
#| the Free Software Foundation, either version 3 of the License, or         |
#| (at your option) any later version.                                       |
#|                                                                           |
#| This program is distributed in the hope that it will be useful,           |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of            |
#| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the              |
#| GNU General Public License for more details.                              |
#|                                                                           |
#| You should have received a copy of the GNU General Public License         |
#| along with this program. If not, see <http://www.gnu.org/licenses/>.      |
#+---------------------------------------------------------------------------+
#| @url      : http://www.netzob.org                                         |
#| @contact  : contact@netzob.org                                            |
#| @sponsors : Amossys, http://www.amossys.fr                                |
#|             Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/           |
#+---------------------------------------------------------------------------+

#+---------------------------------------------------------------------------+
#| File contributors :                                                       |
#|       - Georges Bossert <georges.bossert (a) supelec.fr>                  |
#|       - Frédéric Guihéry <frederic.guihery (a) amossys.fr>                |
#+---------------------------------------------------------------------------+

#+---------------------------------------------------------------------------+
#| Standard library imports                                                  |
#+---------------------------------------------------------------------------+
import logging
import os

#+---------------------------------------------------------------------------+
#| Related third party imports                                               |
#+---------------------------------------------------------------------------+
from functools import wraps

#+---------------------------------------------------------------------------+
#| Local application imports                                                 |
#+---------------------------------------------------------------------------+

# Definition of the ColorStreamHandler class only if dependency colorama is
# available on the current system.
try:
    from colorama import Fore, Back, Style

    class ColourStreamHandler(logging.StreamHandler):
        """ A colorized output SteamHandler """

        # Some basic colour scheme defaults
        colours = {
            'DEBUG': Fore.CYAN,
            'INFO': Fore.GREEN,
            'WARN': Fore.YELLOW,
            'WARNING': Fore.YELLOW,
            'ERROR': Fore.RED,
            'CRIT': Back.RED + Fore.WHITE,
            'CRITICAL': Back.RED + Fore.WHITE
        }

        @property
        def is_tty(self):
            """ Check if we are using a "real" TTY. If we are not using a TTY it means that
            the colour output should be disabled.

            :return: Using a TTY status
            :rtype: bool
            """
            try:
                return getattr(self.stream, 'isatty', None)()
            except:
                return False

        def emit(self, record):
            try:
                message = self.format(record)

                if not self.is_tty:
                    self.stream.write(message)
                else:
                    self.stream.write(self.colours[record.levelname] + message
                                      + Style.RESET_ALL)
                self.stream.write(getattr(self, 'terminator', '\n'))
                self.flush()
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                self.handleError(record)

    has_colour = True
except Exception as e:
    has_colour = False


[docs]def NetzobLogger(klass): """This class decorator adds (if necessary) an instance of the logger (self.__logger) to the attached class and removes from the getState the logger. """ # Verify if a logger already exists found = False for k, v in list(klass.__dict__.items()): if isinstance(v, logging.Logger): found = True break if not found: klass._logger = logging.getLogger(klass.__name__) try: klass._logger.setLevel(int(os.environ['NETZOB_LOG_LEVEL'])) except: pass handler = ColourStreamHandler( ) if has_colour else logging.StreamHandler() fmt = '%(relativeCreated)d: [%(levelname)s] %(module)s:%(funcName)s: %(message)s' handler.setFormatter(logging.Formatter(fmt)) klass._logger.addHandler(handler) klass._logger.propagate = False # Exclude logger from __getstate__ def getState(self, **kwargs): r = dict() for k, v in list(self.__dict__.items()): if not isinstance(v, logging.Logger): r[k] = v return r def setState(self, dict): self.__dict__ = dict self.__logger = logging.getLogger(klass.__name__) klass.__getstate__ = getState klass.__setState__ = setState return klass
[docs]def typeCheck(*types): """Decorator which reduces the amount of code to type-check attributes. Its allows to replace the following code: :: @id.setter def id(self, id): if not isinstance(id, uuid.UUID): raise TypeError("Invalid types for argument id, must be an UUID") self.__id = id with: :: @id.setter @typeCheck(uuid.UUID) def id(self, id): self.__id = id .. note:: set type = "SELF" to check the type of the self parameter .. warning:: if argument is None, the type checking is not executed on it. """ def _typeCheck_(func): def wrapped_f(*args, **kwargs): arguments = args[1:] if len(arguments) == len(types): # Replace "SELF" with args[0] type final_types = [] for type in types: if type == "SELF": final_types.append(args[0].__class__) else: final_types.append(type) for i, argument in enumerate(arguments): if argument is not None and not isinstance(argument, final_types[i]): raise TypeError( "Invalid type for arguments, expecting: {0} and received {1}". format(', '.join([t.__name__ for t in final_types ]), argument.__class__.__name__)) return func(*args, **kwargs) return wraps(func)(wrapped_f) return _typeCheck_