# -*- coding: utf-8 -*-
#+---------------------------------------------------------------------------+
#| 01001110 01100101 01110100 01111010 01101111 01100010 |
#| |
#| Netzob : Inferring communication protocols |
#+---------------------------------------------------------------------------+
#| Copyright (C) 2011-2017 Georges Bossert and Frédéric Guihéry |
#| This program is free software: you can redistribute it and/or modify |
#| it under the terms of the GNU General Public License as published by |
#| the Free Software Foundation, either version 3 of the License, or |
#| (at your option) any later version. |
#| |
#| This program is distributed in the hope that it will be useful, |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of |
#| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
#| GNU General Public License for more details. |
#| |
#| You should have received a copy of the GNU General Public License |
#| along with this program. If not, see <http://www.gnu.org/licenses/>. |
#+---------------------------------------------------------------------------+
#| @url : http://www.netzob.org |
#| @contact : contact@netzob.org |
#| @sponsors : Amossys, http://www.amossys.fr |
#| Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/ |
#+---------------------------------------------------------------------------+
#+----------------------------------------------
#| Standard library imports
#+----------------------------------------------
import logging
#+----------------------------------------------
#| Related third party imports
#+----------------------------------------------
#+----------------------------------------------
#| Local application imports
#+----------------------------------------------
from netzob.Inference.Grammar.Angluin import MealyLSTAR
from netzob.Inference.Grammar.MQCache import MQCache
# The import above replaces the former statement: from Angluin import Angluin
import threading
import time
#+----------------------------------------------
#| GrammarInferer:
#| Given Angluin's L*a algorithm, it learns
#| the grammar of a protocol
#+----------------------------------------------
class GrammarInferer(threading.Thread):
    """Background thread that infers the grammar (automaton) of a protocol.

    The inference alternates between two steps until no counter-example is
    found or the process is stopped:

    1. the learner builds a hypothetical automaton (``learner.learn()``);
    2. the equivalence oracle searches for a counter-example against it; when
       one is found it is handed back to the learner for the next round.

    Results are exposed through :meth:`getHypotheticalAutomaton` (last
    intermediate hypothesis) and :meth:`getInferedAutomaton` (final result).
    """

    def __init__(self, vocabulary, inputDictionary, oracle, equivalenceOracle,
                 resetScript, cb_submitedQuery, cb_hypotheticalAutomaton):
        """Store the collaborators needed by the inference process.

        :param vocabulary: object providing ``getSessions()``; also forwarded
            to the learner.
        :param inputDictionary: forwarded to the learner and to the
            equivalence oracle.
        :param oracle: forwarded to the learner.
        :param equivalenceOracle: object providing
            ``findCounterExample(automaton, inputDictionary, cache)``.
        :param resetScript: forwarded to the learner.
        :param cb_submitedQuery: callback forwarded to the learner.
        :param cb_hypotheticalAutomaton: callback forwarded to the learner.
        """
        threading.Thread.__init__(self)
        # create logger with the given configuration
        self.log = logging.getLogger(
            'netzob.Inference.Grammar.GrammarInferer.py')
        self.vocabulary = vocabulary
        self.inputDictionary = inputDictionary
        self.oracle = oracle
        self.equivalenceOracle = equivalenceOracle
        self.resetScript = resetScript
        self.cb_submitedQuery = cb_submitedQuery
        self.cb_hypotheticalAutomaton = cb_hypotheticalAutomaton
        # True while an inference is running; cleared by stop() to abort it
        self.active = False
        self.inferedAutomaton = None
        self.hypotheticalAutomaton = None
        self.learner = None

    def run(self):
        """Thread entry point: run :meth:`infer` and maintain ``active``."""
        self.log.info("Starting the Grammar inferring process")
        self.active = True
        try:
            self.infer()
        finally:
            # Always clear the flag, even when infer() raises, so that
            # hasFinish() cannot report a dead thread as still running.
            self.active = False
        self.log.info("Ending the Grammar inferring process")

    def hasFinish(self):
        """Return True when no inference is currently running."""
        return not self.active

    def getInferedAutomaton(self):
        """Return the final automaton, or None if inference did not complete."""
        return self.inferedAutomaton

    def getHypotheticalAutomaton(self):
        """Return the most recent hypothetical automaton, or None."""
        return self.hypotheticalAutomaton

    def getSubmitedQueries(self):
        """Return the learner's submitted queries, or [] before learning starts."""
        if self.learner is not None:
            return self.learner.getSubmitedQueries()
        return []

    def stop(self):
        """Request the inference loop to stop at its next checkpoint."""
        self.active = False

    def infer(self):
        """Run the learn / counter-example loop and store the resulting automaton.

        The loop ends when the equivalence oracle finds no counter-example or
        when :meth:`stop` clears ``active``. Afterwards every session of the
        vocabulary is re-applied on the computed automaton and the result is
        stored in ``inferedAutomaton``.
        """
        self.active = True
        equivalent = False
        startTime = time.time()

        # Membership-query cache shared by the learner and the equivalence oracle
        cache = MQCache()

        # Initialize the learning algorithm. The original code referenced an
        # unbound name (``Angluin``); ``MealyLSTAR`` is the learner this module
        # actually imports.
        # NOTE(review): assumes MealyLSTAR accepts these arguments - confirm.
        self.learner = MealyLSTAR(self.vocabulary, self.inputDictionary,
                                  self.oracle, self.resetScript,
                                  self.cb_submitedQuery,
                                  self.cb_hypotheticalAutomaton, cache)

        while not equivalent and self.active:
            self.log.info(
                "============================================================================="
            )
            self.log.info("Execute one new round of the inferring process")
            self.log.info(
                "============================================================================="
            )

            self.learner.learn()
            # stop() may have been called while learning
            if not self.active:
                break

            self.hypotheticalAutomaton = self.learner.getInferedAutomata()
            self.log.info("An hypothetical automaton has been computed")

            counterExample = self.equivalenceOracle.findCounterExample(
                self.hypotheticalAutomaton, self.inputDictionary, cache)
            # stop() may have been called while searching
            if not self.active:
                break

            if counterExample is None:
                self.log.info("No counter-example were found !")
                equivalent = True
            else:
                self.log.info("A counter-example has been found")
                for s in counterExample.getSymbols():
                    self.log.info("symbol : " + str(s) + " => " + str(
                        s.getID()))
                self.learner.addCounterExamples([counterExample])

        automaton = self.learner.getInferedAutomata()
        self.log.info("The following automaton has been computed : " + str(
            automaton.getDotCode()))

        # Now we apply indeterminism: replay every known session on the
        # automaton. The sessions come from the vocabulary (the original code
        # discarded them and iterated a non-existent ``self.sessions``).
        sessions = self.vocabulary.getSessions()
        for i_session, session in enumerate(sessions):
            self.log.info("Re-inject session (" + str(i_session) +
                          ") in the automata")
            # NOTE(review): the original passed an undefined ``messages``;
            # presumably the session's own messages - confirm getMessages().
            self.applyMessagesOnAutomata(automaton, session.getMessages())

        endTime = time.time()
        self.log.info("The inferring process is finished !")
        print("Elapsed time: {} msecs".format((endTime - startTime) * 1000))
        self.inferedAutomaton = automaton

    def applyMessagesOnAutomata(self, automaton, messages):
        """Apply *messages* on *automaton* and return the automaton.

        The current implementation only logs the messages and queries the
        initial state of the automaton; the automaton is returned as received.
        """
        self.log.info("Apply the messages ({0}) on automata".format(messages))
        automaton.getInitialState()
        return automaton