# -*- coding: utf-8 -*-
#+---------------------------------------------------------------------------+
#| 01001110 01100101 01110100 01111010 01101111 01100010 |
#| |
#| Netzob : Inferring communication protocols |
#+---------------------------------------------------------------------------+
#| Copyright (C) 2011 Georges Bossert and Frédéric Guihéry |
#| This program is free software: you can redistribute it and/or modify |
#| it under the terms of the GNU General Public License as published by |
#| the Free Software Foundation, either version 3 of the License, or |
#| (at your option) any later version. |
#| |
#| This program is distributed in the hope that it will be useful, |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of |
#| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
#| GNU General Public License for more details. |
#| |
#| You should have received a copy of the GNU General Public License |
#| along with this program. If not, see <http://www.gnu.org/licenses/>. |
#+---------------------------------------------------------------------------+
#| @url : http://www.netzob.org |
#| @contact : contact@netzob.org |
#| @sponsors : Amossys, http://www.amossys.fr |
#| Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/ |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| Standard library imports
#+---------------------------------------------------------------------------+
from gettext import gettext as _
import logging
import os
import datetime
import re
import uuid
from lxml.etree import ElementTree, DocumentInvalid
from lxml import etree
import types
import shutil
#+---------------------------------------------------------------------------+
#| Local Imports
#+---------------------------------------------------------------------------+
from netzob.Common.ResourcesConfiguration import ResourcesConfiguration
from netzob.Common.ProjectConfiguration import ProjectConfiguration
from netzob.Common.Vocabulary import Vocabulary
from netzob.Common.Grammar import Grammar
from netzob.Common.Type.TypeConvertor import TypeConvertor
from netzob.Common.Type.Format import Format
from netzob.Common.Type.UnitSize import UnitSize
from netzob.Common.Type.Sign import Sign
from netzob.Common.Type.Endianess import Endianess
from netzob.Common.XSDResolver import XSDResolver
from netzob.Common.Property import Property
from netzob.Common.PropertyList import PropertyList
from netzob.Common.Simulator import Simulator
PROJECT_NAMESPACE = "http://www.netzob.org/project"
COMMON_NAMESPACE = "http://www.netzob.org/common"
def loadProject_0_1(projectFile):
# Parse the XML Document as 0.1 version
tree = ElementTree()
tree.parse(projectFile)
xmlProject = tree.getroot()
# Register the namespace
etree.register_namespace('netzob', PROJECT_NAMESPACE)
etree.register_namespace('netzob-common', COMMON_NAMESPACE)
projectID = str(xmlProject.get('id'))
projectName = xmlProject.get('name', 'none')
projectCreationDate = TypeConvertor.xsdDatetime2PythonDatetime(xmlProject.get('creation_date'))
projectPath = xmlProject.get('path')
project = Project(projectID, projectName, projectCreationDate, projectPath)
description = xmlProject.get('description')
project.setDescription(description)
# Parse the configuration
if xmlProject.find("{" + PROJECT_NAMESPACE + "}configuration") is not None:
projectConfiguration = ProjectConfiguration.loadProjectConfiguration(xmlProject.find("{" + PROJECT_NAMESPACE + "}configuration"), PROJECT_NAMESPACE, "0.1")
project.setConfiguration(projectConfiguration)
# Parse the vocabulary
if xmlProject.find("{" + PROJECT_NAMESPACE + "}vocabulary") is not None:
projectVocabulary = Vocabulary.loadVocabulary(xmlProject.find("{" + PROJECT_NAMESPACE + "}vocabulary"), PROJECT_NAMESPACE, COMMON_NAMESPACE, "0.1", project)
project.setVocabulary(projectVocabulary)
# Parse the grammar
if xmlProject.find("{" + PROJECT_NAMESPACE + "}grammar") is not None:
projectGrammar = Grammar.loadGrammar(xmlProject.find("{" + PROJECT_NAMESPACE + "}grammar"), projectVocabulary, PROJECT_NAMESPACE, "0.1")
if projectGrammar is not None:
project.setGrammar(projectGrammar)
# Parse the simulator
if xmlProject.find("{" + PROJECT_NAMESPACE + "}simulator") is not None:
projectSimulator = Simulator.loadSimulator(xmlProject.find("{" + PROJECT_NAMESPACE + "}simulator"), PROJECT_NAMESPACE, "0.1", project.getGrammar().getAutomata(), project.getVocabulary())
if projectSimulator is not None:
project.setSimulator(projectSimulator)
return project
class ProjectException(Exception):
pass
[docs]class Project(object):
"""Class definition of a Project"""
# The name of the configuration file
CONFIGURATION_FILENAME = "config.xml"
# /!\ WARNING:
# The dict{} which defines the parsing function associated with each schema
# is added to the end of the document
#+-----------------------------------------------------------------------+
#| Constructor
#| @param name : name of the project
#| @param creationDate : date of creation
#+-----------------------------------------------------------------------+
def __init__(self, id, name, creationDate, path):
self.id = id
self.name = name
self.creationDate = creationDate
self.path = path
self.vocabulary = Vocabulary()
self.grammar = Grammar()
self.simulator = Simulator()
self.configuration = ProjectConfiguration.loadDefaultProjectConfiguration()
self.description = None
def generateXMLConfigFile(self):
# Register the namespace
etree.register_namespace('netzob', PROJECT_NAMESPACE)
etree.register_namespace('netzob-common', COMMON_NAMESPACE)
# Dump the file
root = etree.Element("{" + PROJECT_NAMESPACE + "}project")
root.set("id", str(self.getID()))
root.set("path", str(self.getPath()))
# Warning, changed because of project = Project.createProject(self.netzob.getCurrentWorkspace(), projectName)
if isinstance(self.getCreationDate(), types.TupleType):
root.set("creation_date", TypeConvertor.pythonDatetime2XSDDatetime(self.getCreationDate()[0]))
else:
root.set("creation_date", TypeConvertor.pythonDatetime2XSDDatetime(self.getCreationDate()))
root.set("name", str(self.getName()))
if self.description:
root.set("description", str(self.description))
# Save the configuration in it
self.getConfiguration().save(root, PROJECT_NAMESPACE)
# Save the vocabulary in it
self.getVocabulary().save(root, PROJECT_NAMESPACE, COMMON_NAMESPACE)
# Save the grammar in it
if self.getGrammar() is not None:
self.getGrammar().save(root, PROJECT_NAMESPACE)
# Save the simulator in it
self.getSimulator().save(root, PROJECT_NAMESPACE)
return root
def saveConfigFile(self, workspace):
projectPath = os.path.join(workspace.getPath(), self.getPath())
projectFile = os.path.join(projectPath, Project.CONFIGURATION_FILENAME)
logging.info("Save the config file of project {0} in {1}".format(self.getName(), projectFile))
# First we verify and create if necessary the directory of the project
if not os.path.exists(projectPath):
logging.info("Creation of the directory: {0}".format(projectPath))
os.mkdir(projectPath)
# We generate the XML Config file
root = self.generateXMLConfigFile()
tree = ElementTree(root)
tree.write(projectFile, pretty_print=True)
def cloneProjectTo(self, workspace, cloneName):
try:
# Original project files
origProjectPath = os.path.join(workspace.getPath(), self.path)
# Clone project
idProject = str(uuid.uuid4())
clonePath = os.path.join(workspace.getPath(), "projects", idProject)
cloneFile = os.path.join(clonePath, Project.CONFIGURATION_FILENAME)
logging.info("Clone project from '{0}' to '{0}'".format(origProjectPath,
clonePath))
# Clone project files
shutil.copytree(origProjectPath, clonePath)
# Create the new project
project = Project.loadProject(workspace, clonePath)
project.setID(idProject)
project.setName(cloneName)
project.setPath(clonePath)
project.saveConfigFile(workspace)
# Update workspace
workspace.referenceProject(project.getPath())
workspace.saveConfigFile()
except IOError, e:
raise ProjectException(str(e))
def hasPendingModifications(self, workspace):
result = True
# TODO : Some errors may occur here...
try:
tree = ElementTree(self.generateXMLConfigFile())
currentXml = etree.tostring(tree)
tree.parse(os.path.join(os.path.join(os.path.join(workspace.getPath(), "projects"), self.getPath()), Project.CONFIGURATION_FILENAME))
xmlProject = tree.getroot()
oldXml = etree.tostring(xmlProject)
if currentXml == oldXml:
result = False
except:
pass
return result
def getEnvironmentDependencies(self):
"""Computes and returns the list of environment dependencies
and associates.
@return: a list of Properties"""
envDeps = []
excludedProperties = ["Data", "ID"]
symbols = []
if self.getVocabulary() is not None:
symbols.extend(self.getVocabulary().getSymbols())
# Retrieve the list of properties for each Symbol
for symbol in symbols:
for message in symbol.getMessages():
properties = message.getProperties()
for property in properties:
if not property.getName() in excludedProperties:
found = False
for prop in envDeps:
if prop.getCurrentValue() == property.getCurrentValue():
found = True
break
if not found:
envDeps.append(property)
# Retrieve the list of properties for the project
propertiesProject = self.getProperties()
for prop in propertiesProject:
found = False
for property in envDeps:
if prop.getCurrentValue() == property.getCurrentValue():
found = True
break
if not found:
envDeps.append(prop)
return envDeps
def deleteProject(self, workspace):
try:
# Delete project files
projectFullPath = os.path.join(workspace.getPath(), self.path)
logging.debug("Deleting project files: {0}".format(projectFullPath))
shutil.rmtree(projectFullPath)
# Dereference project from Workspace
workspace.dereferenceProject(self.path)
except IOError, e:
raise ProjectException(_("Unable to delete project: {0}").format(e))
@staticmethod
def createProject(workspace, name):
idProject = str(uuid.uuid4())
path = os.path.join("projects", idProject)
creationDate = datetime.datetime.now()
project = Project(idProject, name, creationDate, path)
# Creation of the config file
project.saveConfigFile(workspace)
# Register the project in the workspace
workspace.referenceProject(project.getPath())
workspace.saveConfigFile()
return project
@staticmethod
def getNameOfProject(workspace, projectDirectory):
projectFile = os.path.join(os.path.join(workspace.getPath(), projectDirectory), Project.CONFIGURATION_FILENAME)
# verify we can open and read the file
if projectFile is None:
return None
# is the projectFile is a file
if not os.path.isfile(projectFile):
logging.warn("The specified project's configuration file ({0} is not valid: its not a file.".format(str(projectFile)))
return None
# is it readable
if not os.access(projectFile, os.R_OK):
logging.warn("The specified project's configuration file ({0}) is not readable.".format(str(projectFile)))
return None
# We validate the file given the schemas
for xmlSchemaFile in Project.PROJECT_SCHEMAS.keys():
xmlSchemaPath = os.path.join(ResourcesConfiguration.getStaticResources(), xmlSchemaFile)
# If we find a version which validates the XML, we parse with the associated function
if Project.isSchemaValidateXML(xmlSchemaPath, projectFile):
tree = ElementTree()
tree.parse(projectFile)
xmlProject = tree.getroot()
# Register the namespace
etree.register_namespace('netzob', PROJECT_NAMESPACE)
etree.register_namespace('netzob-common', COMMON_NAMESPACE)
projectName = xmlProject.get('name', 'none')
if projectName is not None and projectName != 'none':
return projectName
else:
logging.warn("The project declared in file ({0}) is not valid".format(projectFile))
return None
@staticmethod
def loadProjectFromFile(projectFile):
# verify we can open and read the file
if projectFile is None:
return None
# is the projectFile is a file
if not os.path.isfile(projectFile):
logging.warn("The specified project's configuration file ({0}) is not valid: its not a file.".format(projectFile))
return None
# is it readable
if not os.access(projectFile, os.R_OK):
logging.warn("The specified project's configuration file ({0}) is not readable.".format(projectFile))
return None
# We validate the file given the schemas
for xmlSchemaFile in Project.PROJECT_SCHEMAS.keys():
xmlSchemaPath = os.path.join(ResourcesConfiguration.getStaticResources(), xmlSchemaFile)
# If we find a version which validates the XML, we parse with the associated function
if Project.isSchemaValidateXML(xmlSchemaPath, projectFile):
parsingFunc = Project.PROJECT_SCHEMAS[xmlSchemaFile]
project = parsingFunc(projectFile)
if project is not None:
logging.info("Loading project '{0}' from workspace.".format(project.getName()))
return project
else:
logging.warn("The project declared in file ({0}) is not valid".format(projectFile))
return None
@staticmethod
def importNewXMLProject(workspace, xmlProjectFile):
# Generate the Unique ID of the imported project
idProject = str(uuid.uuid4())
# First we verify and create if necessary the directory of the project
projectPath = "projects/{0}/".format(idProject)
destPath = os.path.join(workspace.getPath(), projectPath)
try:
if not os.path.exists(destPath):
logging.info("Creation of the directory {0}".format(destPath))
os.mkdir(destPath)
# Retrieving and storing of the config file
destFile = os.path.join(destPath, Project.CONFIGURATION_FILENAME)
shutil.copy(xmlProjectFile, destFile)
project = Project.loadProject(workspace, destPath)
project.setID(idProject)
project.setName(_("Copy of {0}").format(project.getName()))
project.setPath(projectPath)
project.saveConfigFile(workspace)
workspace.referenceProject(project.getPath())
workspace.saveConfigFile()
return project
except IOError, e:
logging.warn("Error when importing project: {0}".format(e))
raise ProjectException(_("Unable to import the project: {0}.").format(e))
@staticmethod
def loadProject(workspace, projectDirectory):
projectFile = os.path.join(os.path.join(workspace.getPath(), projectDirectory), Project.CONFIGURATION_FILENAME)
return Project.loadProjectFromFile(projectFile)
@staticmethod
def isSchemaValidateXML(schemaFile, xmlFile):
# is the schema is a file
if not os.path.isfile(schemaFile):
logging.warn("The specified schema file ({0}) is not valid: its not a file.".format(str(schemaFile)))
return False
# is it readable
if not os.access(schemaFile, os.R_OK):
logging.warn("The specified schema file ({0}) is not readable.".format(str(schemaFile)))
return False
schemaF = open(schemaFile, "r")
schemaContent = schemaF.read()
schemaF.close()
if schemaContent is None or len(schemaContent) == 0:
logging.warn("Impossible to read the schema file (no content found in it)")
return False
# Extended version of an XSD validator
# Create an xmlParser for the schema
schemaParser = etree.XMLParser()
# Register a resolver (to locate the other XSDs according to the path of static resources)
xsdResolver = XSDResolver()
xsdResolver.addMapping("common.xsd", os.path.join(os.path.dirname(schemaFile), "common.xsd"))
schemaParser.resolvers.add(xsdResolver)
schemaParsed = etree.parse(schemaContent, parser=schemaParser)
schema = etree.XMLSchema(schemaParsed)
# We parse the given XML file
try:
xmlRoot = etree.parse(xmlFile)
try:
schema.assertValid(xmlRoot)
return True
except DocumentInvalid, err:
log = schema.error_log
error = log.last_error
logging.error(error)
logging.error("XML Document in invalid: {0}".format(err))
return False
except etree.XMLSyntaxError, e:
log = e.error_log.filter_from_level(etree.ErrorLevels.FATAL)
logging.error(log)
return False
# Dictionary of projects versions, must be sorted by version DESC
PROJECT_SCHEMAS = {"xsds/0.1/Project.xsd": loadProject_0_1}
def getProperties(self):
properties = PropertyList()
configuration = self.getConfiguration()
properties.append(Property('workspace', Format.STRING, self.getPath()))
prop = Property('name', Format.STRING, self.getName())
prop.setIsEditable(True)
properties.append(prop)
prop = Property('description', Format.STRING, self.getDescription())
prop.setIsEditable(True)
properties.append(prop)
properties.append(Property('date', Format.STRING, self.getCreationDate()))
properties.append(Property('symbols', Format.DECIMAL, len(self.getVocabulary().getSymbols())))
properties.append(Property('messages', Format.DECIMAL, len(self.getVocabulary().getMessages())))
fields = 0
for sym in self.getVocabulary().getSymbols():
fields = fields + len(sym.getField().getExtendedFields())
properties.append(Property('fields', Format.DECIMAL, fields))
prop = Property(configuration.VOCABULARY_GLOBAL_FORMAT, Format.STRING, configuration.getVocabularyInferenceParameter(configuration.VOCABULARY_GLOBAL_FORMAT))
prop.setIsEditable(True)
prop.setPossibleValues(Format.getSupportedFormats())
properties.append(prop)
prop = Property(configuration.VOCABULARY_GLOBAL_UNITSIZE, Format.STRING, configuration.getVocabularyInferenceParameter(configuration.VOCABULARY_GLOBAL_UNITSIZE))
prop.setIsEditable(True)
prop.setPossibleValues([UnitSize.NONE, UnitSize.BITS4, UnitSize.BITS8, UnitSize.BITS16, UnitSize.BITS32, UnitSize.BITS64])
properties.append(prop)
prop = Property(configuration.VOCABULARY_GLOBAL_SIGN, Format.STRING, configuration.getVocabularyInferenceParameter(configuration.VOCABULARY_GLOBAL_SIGN))
prop.setIsEditable(True)
prop.setPossibleValues([Sign.SIGNED, Sign.UNSIGNED])
properties.append(prop)
prop = Property(configuration.VOCABULARY_GLOBAL_ENDIANESS, Format.STRING, configuration.getVocabularyInferenceParameter(configuration.VOCABULARY_GLOBAL_ENDIANESS))
prop.setIsEditable(True)
prop.setPossibleValues([Endianess.BIG, Endianess.LITTLE])
properties.append(prop)
return properties
def getID(self):
return self.id
def getName(self):
return self.name
def getDescription(self):
return self.description
def getCreationDate(self):
return self.creationDate
def getPath(self):
return self.path
def getVocabulary(self):
return self.vocabulary
def getGrammar(self):
return self.grammar
def getConfiguration(self):
return self.configuration
def getSimulator(self):
return self.simulator
def setID(self, idproject):
self.id = idproject
def setName(self, name):
self.name = name
def setDescription(self, description):
self.description = description
def setPath(self, path):
self.path = path
def setCreationDate(self, creationDate):
self.creationDate = creationDate
def setConfiguration(self, conf):
self.configuration = conf
def setVocabulary(self, voc):
self.vocabulary = voc
def setGrammar(self, grammar):
self.grammar = grammar
def setSimulator(self, simulator):
self.simulator = simulator