Source code for netzob.Common.Workspace

# -*- coding: utf-8 -*-

#|          01001110 01100101 01110100 01111010 01101111 01100010            |
#|                                                                           |
#|               Netzob : Inferring communication protocols                  |
#| Copyright (C) 2011 Georges Bossert and Frédéric Guihéry                   |
#| This program is free software: you can redistribute it and/or modify      |
#| it under the terms of the GNU General Public License as published by      |
#| the Free Software Foundation, either version 3 of the License, or         |
#| (at your option) any later version.                                       |
#|                                                                           |
#| This program is distributed in the hope that it will be useful,           |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of            |
#| GNU General Public License for more details.                              |
#|                                                                           |
#| You should have received a copy of the GNU General Public License         |
#| along with this program. If not, see <>.      |
#| @url      :                                         |
#| @contact  :                                            |
#| @sponsors : Amossys,                                |
#|             Supélec,           |

#| Standard library imports
from gettext import gettext as _
import logging
import os
import datetime
import re
from lxml.etree import ElementTree
from lxml import etree
import shutil
import uuid

#| Local Imports
from netzob.Common.Type.TypeConvertor import TypeConvertor
from netzob.Common.XSDResolver import XSDResolver
from netzob.Common.ImportedTrace import ImportedTrace
from netzob.Common.Functions.Transformation.Base64Function import Base64Function
from netzob.Common.Functions.Transformation.GZipFunction import GZipFunction
from netzob.Common.Functions.Transformation.BZ2Function import BZ2Function
from netzob.Common.Functions.RenderingFunction import RenderingFunction


def loadWorkspace_0_1(workspacePath, workspaceFile):

    # Parse the XML Document as 0.1 version
    tree = ElementTree()

    xmlWorkspace = tree.getroot()
    wsName = xmlWorkspace.get('name', 'none')
    wsCreationDate = TypeConvertor.xsdDatetime2PythonDatetime(xmlWorkspace.get('creation_date'))

    # Parse the configuration to retrieve the main paths
    xmlWorkspaceConfig = xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}configuration")
    pathOfTraces = xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}traces").text

    pathOfLogging = None
    if xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}logging") is not None and xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}logging").text is not None and len(xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}logging").text) > 0:
        pathOfLogging = xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}logging").text

    pathOfPrototypes = None
    if xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}prototypes") is not None and xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}prototypes").text is not None and len(xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}prototypes").text) > 0:
        pathOfPrototypes = xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}prototypes").text

    lastProject = None
    if xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}projects") is not None:
        xmlProjects = xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}projects")
        if xmlProjects.get("last", "none") != "none":
            lastProject = xmlProjects.get("last", "none")

    # Instantiation of the workspace
    workspace = Workspace(wsName, wsCreationDate, workspacePath, pathOfTraces, pathOfLogging, pathOfPrototypes)

    # Load the already imported traces
    if xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}traces") is not None:
        xmlTraces = xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}traces")
        for xmlTrace in xmlTraces.findall("{" + WORKSPACE_NAMESPACE + "}trace"):
            trace = ImportedTrace.loadTrace(xmlTrace, WORKSPACE_NAMESPACE, COMMON_NAMESPACE, "0.1", workspace.getPathOfTraces())
            if trace is not None:

    # Reference the projects
    if xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}projects") is not None:
        for xmlProject in xmlWorkspace.findall("{" + WORKSPACE_NAMESPACE + "}projects/{" + WORKSPACE_NAMESPACE + "}project"):
            project_path = xmlProject.get("path")
            if project_path == lastProject and lastProject is not None:

    # Reference the functions
    if xmlWorkspace.find("{" + WORKSPACE_NAMESPACE + "}functions") is not None:
        for xmlFunction in xmlWorkspace.findall("{" + WORKSPACE_NAMESPACE + "}functions/{" + WORKSPACE_NAMESPACE + "}function"):
            function = RenderingFunction.loadFromXML(xmlFunction, WORKSPACE_NAMESPACE, "0.1")
            if function is not None:

    enableBugReporting = False
    if xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}enable_bug_reporting") is not None and xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}enable_bug_reporting").text is not None and len(xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}enable_bug_reporting").text) > 0:
        val = xmlWorkspaceConfig.find("{" + WORKSPACE_NAMESPACE + "}enable_bug_reporting").text
        if val == "true":
            enableBugReporting = True

    return workspace

class WorkspaceException(Exception):

[docs]class Workspace(object): """Class definition of a Workspace""" # The name of the configuration file CONFIGURATION_FILENAME = "workspace.xml" # /!\ WARNING: # The dict{} which defines the parsing function associated with each schema # is added to the end of the document #+-----------------------------------------------------------------------+ #| Constructor #| @param path : path of the workspace #+-----------------------------------------------------------------------+ def __init__(self, name, creationDate, path, pathOfTraces, pathOfLogging, pathOfPrototypes, lastProjectPath=None, importedTraces={}): = name self.path = os.path.abspath(path) self.creationDate = creationDate self.projects_path = [] self.pathOfTraces = os.path.join(self.path, pathOfTraces) self.pathOfLogging = os.path.join(self.path, pathOfLogging) self.pathOfPrototypes = os.path.join(self.path, pathOfPrototypes) self.lastProjectPath = lastProjectPath self.importedTraces = importedTraces self.customTransformationFunctions = [] self.enableBugReporting = False def getNameOfProjects(self): nameOfProjects = [] for project_path in self.getProjectsPath(): from netzob.Common.Project import Project projectName = Project.getNameOfProject(self, project_path) if projectName is not None: nameOfProjects.append((projectName, project_path)) return nameOfProjects def getProjects(self): projects = [] for project_path in self.getProjectsPath(): from netzob.Common.Project import Project project = Project.loadProject(self, project_path) if project is not None: projects.append(project) return projects def getLastProject(self): if self.lastProjectPath is None: return None from netzob.Common.Project import Project project = Project.loadProject(self, self.lastProjectPath) return project def setEnableBugReporting(self, enable): self.enableBugReporting = enable def referenceLastProject(self, lastProject): self.lastProjectPath = lastProject def getImportedTraces(self): return self.importedTraces.values() def getImportedTrace(self, traceId): """Retrieve a specific trace, which identifier is traceId.""" try: return self.importedTraces[traceId] except KeyError: raise WorkspaceException("Unable to find the requested trace ({0})".format(traceId)) def addImportedTrace(self, importedTrace): self.importedTraces.update({ importedTrace}) def removeImportedTrace(self, importedTrace): ImportedTrace.deleteTrace(importedTrace, self.pathOfTraces) self.importedTraces.pop( def newEmptyImportedTrace(self, name, description=""): """This function is in charge of creating an empty `ImportedTrace`. :param name: 'name' of the new trace. :param description (optional): description of the new trace.""" newTrace = ImportedTrace(str(uuid.uuid4()),, "UNKNOWN", description, name) self.addImportedTrace(newTrace) return newTrace def mergeImportedTraces(self, traceIds, name, keep=True): """This methods allows to merge multiple traces. The merged traces gets a new unique id. If the 'keep' parameter is True, a copy of the initial traces is kept. We retrieve the 'ImportedTrace' object from its ID. Then we merge all traces' messages and sessions into a new 'ImportedTrace'.""" messages = {} sessions = {} names = [] types = [] for traceId in traceIds: trace = self.getImportedTrace(traceId) messages.update(trace.messages) sessions.update(trace.sessions) names.append("'{0}'".format( # If the type is already a multiple type merge, types are # comma separated values. types.extend(trace.type.split(";")) if keep is False: self.removeImportedTrace(trace) # The type of the new trace is a list of the multiple types of # the traces to be merged. newTraceType = "{0}".format(";".join(set(types))) description = "Merge of traces {0} and {1}".format(', '.join(names[:-1]), names[-1]) newTrace = ImportedTrace(str(uuid.uuid4()),, newTraceType, description, name) newTrace.messages = messages newTrace.sessions = sessions self.addImportedTrace(newTrace) return newTrace def getTransformationFunctions(self): """Computes and returns the list of available functions""" functions = [] functions.append(Base64Function(_("Base64 Function"))) functions.append(GZipFunction(_("GZip Function"))) functions.append(BZ2Function(_("BZ2 Function"))) functions.extend(self.customTransformationFunctions) return functions def getCustomFunctions(self): return self.customTransformationFunctions def addCustomTransformationFunction(self, function): found = False for f in self.customTransformationFunctions: if f.getName() == function.getName(): found = True break if not found: self.customTransformationFunctions.append(function) #+-----------------------------------------------------------------------+ #| referenceProject: #| reference a project in the workspace #+-----------------------------------------------------------------------+ def referenceProject(self, project_path): path = project_path if not path in self.projects_path: self.projects_path.append(path) else: logging.warn("The project declared in {0} is already referenced in the workspace.". format(path)) def dereferenceProject(self, project_path): if not project_path in self.projects_path: raise WorkspaceException("The project '{0}' is not declared in the workspace".format(project_path)) self.projects_path.remove(project_path) def saveConfigFile(self, overrideTraces=[]): """This functions allows to save the current (and only) instance of the Workspace. You can supply a list of traces that should be written on-disk through the `overrideTraces` variable. This allows to override specific traces that where modified. :param overrideTraces: a list of trace identifiers that should be written on-disk, even if they already exists. """ workspaceFile = os.path.join(self.path, Workspace.CONFIGURATION_FILENAME)"Save the config file of the workspace {0} in {1}".format(self.getName(), workspaceFile)) # Register the namespace etree.register_namespace('netzob', WORKSPACE_NAMESPACE) etree.register_namespace('netzob-common', COMMON_NAMESPACE) # Dump the file root = etree.Element("{" + WORKSPACE_NAMESPACE + "}workspace") root.set("creation_date", TypeConvertor.pythonDatetime2XSDDatetime(self.getCreationDate())) root.set("name", str(self.getName())) xmlWorkspaceConfig = etree.SubElement(root, "{" + WORKSPACE_NAMESPACE + "}configuration") relTracePath = os.path.relpath(self.getPathOfTraces(), self.path) xmlTraces = etree.SubElement(xmlWorkspaceConfig, "{" + WORKSPACE_NAMESPACE + "}traces") xmlTraces.text = str(self.getPathOfTraces()) xmlLogging = etree.SubElement(xmlWorkspaceConfig, "{" + WORKSPACE_NAMESPACE + "}logging") xmlLogging.text = str(self.getPathOfLogging()) xmlPrototypes = etree.SubElement(xmlWorkspaceConfig, "{" + WORKSPACE_NAMESPACE + "}prototypes") xmlPrototypes.text = str(self.getPathOfPrototypes()) xmlPrototypes = etree.SubElement(xmlWorkspaceConfig, "{" + WORKSPACE_NAMESPACE + "}enable_bug_reporting") xmlPrototypes.text = str(self.enableBugReporting).lower() xmlWorkspaceProjects = etree.SubElement(root, "{" + WORKSPACE_NAMESPACE + "}projects") for projectPath in self.getProjectsPath(): xmlProject = etree.SubElement(xmlWorkspaceProjects, "{" + WORKSPACE_NAMESPACE + "}project") xmlProject.set("path", projectPath) xmlWorkspaceImported = etree.SubElement(root, "{" + WORKSPACE_NAMESPACE + "}traces") for importedTrace in self.getImportedTraces(): # overrideTraces variable contains the list of # ImportedTraces that should be overriden. This is useful # in case of message removal for example. forceOverride = ( in overrideTraces), WORKSPACE_NAMESPACE, COMMON_NAMESPACE, os.path.join(self.path, self.getPathOfTraces()), forceOverride) xmlWorkspaceFunctions = etree.SubElement(root, "{" + WORKSPACE_NAMESPACE + "}functions") for function in self.getCustomFunctions():, WORKSPACE_NAMESPACE) tree = ElementTree(root) tree.write(workspaceFile, pretty_print=True) @staticmethod def createWorkspace(name, path): tracesPath = "traces" projectsPath = "projects" prototypesPath = "prototypes" loggingPath = "logging" pathOfLogging = "logging/logging.conf" # we create a "traces" directory if it doesn't yet exist if not os.path.isdir(os.path.join(path, tracesPath)): os.mkdir(os.path.join(path, tracesPath)) # we create a "projects" directory if it doesn't yet exist if not os.path.isdir(os.path.join(path, projectsPath)): os.mkdir(os.path.join(path, projectsPath)) # we create the "prototypes" directory if it doesn't yet exist if not os.path.isdir(os.path.join(path, prototypesPath)): os.mkdir(os.path.join(path, prototypesPath)) # we upload in it the default repository file from netzob.Common.ResourcesConfiguration import ResourcesConfiguration staticRepositoryPath = os.path.join(os.path.join(ResourcesConfiguration.getStaticResources(), "defaults"), "repository.xml.default") shutil.copy(staticRepositoryPath, os.path.join(os.path.join(path, prototypesPath), "repository.xml")) # we create the "logging" directory if it doesn't yet exist if not os.path.isdir(os.path.join(path, loggingPath)): os.mkdir(os.path.join(path, loggingPath)) # we upload in it the default repository file from netzob.Common.ResourcesConfiguration import ResourcesConfiguration staticLoggingPath = os.path.join(os.path.join(ResourcesConfiguration.getStaticResources(), "defaults"), "logging.conf.default") shutil.copy(staticLoggingPath, os.path.join(os.path.join(path, loggingPath), "logging.conf")) workspace = Workspace(name,, path, tracesPath, pathOfLogging, prototypesPath) workspace.saveConfigFile() return workspace @staticmethod def isFolderAValidWorkspace(workspacePath): """Computes if the provided folder represents a valid (and loadable) workspace @return: None if the workspace is loadable or the error message if not valid """ if workspacePath is None: return _("The workspace's path ({0}) is incorrect.".format(workspacePath)) workspaceFile = os.path.join(workspacePath, Workspace.CONFIGURATION_FILENAME) # verify we can open and read the file if workspaceFile is None: return _("The workspace's configuration file can't be find (No workspace path given).") # is the workspaceFile is a file if not os.path.isfile(workspaceFile): return _("The specified workspace's configuration file ({0}) is not valid: its not a file.".format(workspaceFile)) # is it readable if not os.access(workspaceFile, os.R_OK): return _("The specified workspace's configuration file ({0}) is not readable.".format(workspaceFile)) for xmlSchemaFile in Workspace.WORKSPACE_SCHEMAS.keys(): from netzob.Common.ResourcesConfiguration import ResourcesConfiguration xmlSchemaPath = os.path.join(ResourcesConfiguration.getStaticResources(), xmlSchemaFile) # If we find a version which validates the XML, we parse with the associated function if Workspace.isSchemaValidateXML(xmlSchemaPath, workspaceFile): return None return _("The specified workspace is not valid according to the XSD definitions.") @staticmethod def loadWorkspace(workspacePath): """Load the workspace declared in the provided directory @type workspacePath: str @var workspacePath: folder to load as a workspace @return a tupple with the workspace or None if not valid and the error message""" errorMessage = Workspace.isFolderAValidWorkspace(workspacePath) if errorMessage is not None: logging.warn(errorMessage) return (None, errorMessage) workspaceFile = os.path.join(workspacePath, Workspace.CONFIGURATION_FILENAME) logging.debug(" Workspace configuration file found: " + str(workspaceFile)) # We validate the file given the schemas for xmlSchemaFile in Workspace.WORKSPACE_SCHEMAS.keys(): from netzob.Common.ResourcesConfiguration import ResourcesConfiguration xmlSchemaPath = os.path.join(ResourcesConfiguration.getStaticResources(), xmlSchemaFile) # If we find a version which validates the XML, we parse with the associated function if Workspace.isSchemaValidateXML(xmlSchemaPath, workspaceFile): logging.debug(" Workspace configuration file " + str(workspaceFile) + " is valid against XSD scheme " + str(xmlSchemaPath)) parsingFunc = Workspace.WORKSPACE_SCHEMAS[xmlSchemaFile] workspace = parsingFunc(workspacePath, workspaceFile) if workspace is not None: return (workspace, None) else: logging.fatal("The specified Workspace file is not valid according to the XSD found in %s." % (xmlSchemaPath)) return (None, _("An unknown error prevented to open the workspace.")) @staticmethod def isSchemaValidateXML(schemaFile, xmlFile): # is the schema is a file if not os.path.isfile(schemaFile): logging.warn("The specified schema file (" + str(schemaFile) + ") is not valid : its not a file.") return False # is it readable if not os.access(schemaFile, os.R_OK): logging.warn("The specified schema file (" + str(schemaFile) + ") is not readable.") return False schemaF = open(schemaFile, "r") schemaContent = schemaF.close() if schemaContent is None or len(schemaContent) == 0: logging.warn("Impossible to read the schema file (no content found in it)") return False # Extended version of an XSD validator # Create an xmlParser for the schema schemaParser = etree.XMLParser() # Register a resolver (to locate the other XSDs according to the path of static resources) xsdResolver = XSDResolver() xsdResolver.addMapping("common.xsd", os.path.join(os.path.dirname(schemaFile), "common.xsd")) schemaParser.resolvers.add(xsdResolver) schemaParsed = etree.parse(schemaContent, parser=schemaParser) schema = etree.XMLSchema(schemaParsed) try: xmlRoot = etree.parse(xmlFile) schema.assertValid(xmlRoot) return True except Exception as e: logging.warn(e) log = schema.error_log error = log.last_error logging.warn(error) return False return False # Dictionary of workspace versions, must be sorted by version DESC WORKSPACE_SCHEMAS = {"xsds/0.1/Workspace.xsd": loadWorkspace_0_1} def getName(self): return def getCreationDate(self): return self.creationDate def getPath(self): return self.path def getProjectsPath(self): return self.projects_path def getPathOfTraces(self): return self.pathOfTraces def getPathOfLogging(self): return self.pathOfLogging def getPathOfPrototypes(self): return self.pathOfPrototypes