Source code for netzob.Inference.Grammar.Angluin

# -*- coding: utf-8 -*-

#+---------------------------------------------------------------------------+
#|          01001110 01100101 01110100 01111010 01101111 01100010            |
#|                                                                           |
#|               Netzob : Inferring communication protocols                  |
#+---------------------------------------------------------------------------+
#| Copyright (C) 2011-2017 Georges Bossert and Frédéric Guihéry              |
#| This program is free software: you can redistribute it and/or modify      |
#| it under the terms of the GNU General Public License as published by      |
#| the Free Software Foundation, either version 3 of the License, or         |
#| (at your option) any later version.                                       |
#|                                                                           |
#| This program is distributed in the hope that it will be useful,           |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of            |
#| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the              |
#| GNU General Public License for more details.                              |
#|                                                                           |
#| You should have received a copy of the GNU General Public License         |
#| along with this program. If not, see <http://www.gnu.org/licenses/>.      |
#+---------------------------------------------------------------------------+
#| @url      : http://www.netzob.org                                         |
#| @contact  : contact@netzob.org                                            |
#| @sponsors : Amossys, http://www.amossys.fr                                |
#|             Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/           |
#+---------------------------------------------------------------------------+

#+----------------------------------------------
#| Standard library imports
#+----------------------------------------------

#+----------------------------------------------
#| Related third party imports
#+----------------------------------------------

#+----------------------------------------------
#| Local application imports
#+----------------------------------------------
from netzob.Common.Utils.Decorators import NetzobLogger
from netzob.Common.Utils.Decorators import typeCheck
from netzob.Inference.Grammar.lstar.ObservationTable import ObservationTable


@NetzobLogger
[docs]class MealyLSTAR(object): """This class is an implementation of the Angluin L* Algorithm as detailled in "Learning regular sets from queries and counterexamples" [Ang87]. This active grammatical inference algorithm infers state machine. It communicates with a target by sending membership queries which requires to have access to an implementation of the protocol. To illustrate its usage, we will infer the grammar of a fake simple protocol. >>> from netzob.all import * >>> import time We first create a fake server which requires a vocabulary of input (I) and output (O) symbols: >>> i0 = Symbol(name="a", fields=[Field("a\\n")]) >>> i1 = Symbol(name="b", fields=[Field("b\\n")]) >>> i2 = Symbol(name="c", fields=[Field("c\\n")]) >>> i3 = Symbol(name="d", fields=[Field("d\\n")]) >>> # List of Client > Server messages >>> I = [i0, i1, i2, i3] >>> o0 = Symbol(name="0", fields=[Field("0")]) >>> o1 = Symbol(name="1", fields=[Field("1")]) >>> o2 = Symbol(name="2", fields=[Field("2")]) >>> o3 = Symbol(name="3", fields=[Field("3")]) >>> # List of Server > Client messages >>> O = [o0, o1, o2, o3] >>> symbolList = I + O Now we can create the grammar which includes 5 states >>> s0 = State(name="S0") >>> s1 = State(name="S1") >>> s2 = State(name="S2") >>> s3 = State(name="S3") >>> s4 = State(name="S4") and their transitions >>> t0 = Transition(s0, s1, i0, [o0]) >>> t1 = Transition(s1, s1, i1, [o1]) >>> t2 = Transition(s1, s2, i2, [o2]) >>> t3 = Transition(s2, s1, i1, [o1]) >>> t4 = Transition(s2, s3, i0, [o0]) >>> t5 = Transition(s3, s1, i1, [o1]) >>> t6 = Transition(s3, s4, i2, [o2]) >>> t7 = Transition(s1, s4, i0, [o1]) we add an initial state and an ending state with open and close channel transitions >>> initialState = State(name="Initial") >>> endingState = State(name="End") >>> openTransition = OpenChannelTransition(initialState, s0) >>> closeTransition = CloseChannelTransition(s4, endingState) >>> automata = Automata(initialState, symbolList) >>> # Create an actor: Alice (a server) >>> channel = UDPServer(localIP="127.0.0.1", localPort=8887) >>> abstractionLayer = AbstractionLayer(channel, symbolList) >>> alice = Actor(automata = automata, initiator = False, abstractionLayer=abstractionLayer) >>> alice.start() We finaly create an angluin-based grammar learner >>> # Creates an inference channel >>> angluinChannel = UDPClient(remoteIP="127.0.0.1", remotePort=8887) # >>> angluin = MealyLSTAR(inputSymbols = I, outputSymbols = O, channel=angluinChannel) # >>> angluin.start() # We wait for the results >>> time.sleep(10) # >>> while (angluin.alive): time.sleep(5) >>> print("Inference finish") Inference finish >>> alice.stop() >>> print(angluin.initialStateOfInferedGrammar) State [Ang87] @article{Ang87, author = {Angluin, Dana}, title = {Learning regular sets from queries and counterexamples}, journal = {Inf. Comput.}, year = {1987}, volume = {75}, pages = {87--106}, month = {November} } """ def __init__(self, inputVocabulary, membershipOracle): self.inputVocabulary = inputVocabulary self.membershipOracle = membershipOracle self.__observationTable = ObservationTable(self.inputVocabulary) @property def hypothesisModel(self): self._logger.debug("Starting the computation of an hypothesis model")
[docs] def refineHypothesis(self, counterExample): self._logger.debug( "Refining the current hypothesis according to counter example : {}". format(counterExample))
[docs] def startLearning(self): self._logger.debug("Starting the MealyLSTAR inference process.")
@property def inputVocabulary(self): return self.__inputVocabulary @inputVocabulary.setter def inputVocabulary(self, inputVocabulary): self.__inputVocabulary = inputVocabulary @property def membershipOracle(self): return self.__membershipOracle @membershipOracle.setter def membershipOracle(self, mOracle): self.__membershipOracle = mOracle
# # create logger with the given configuration # self.log = logging.getLogger('netzob.Inference.Grammar.MealyLSTAR.py') # self.observationTable = dict() # self.initializeObservationTable() # def initializeObservationTable(self): # self.log.info("Initialization of the observation table") # self.suffixes = [] # self.D = [] # # Create the S and SA # self.S = [] # self.SA = [] # self.initialD = [] # # fullfill D with the dictionary # for entry in self.getInputDictionary(): # letter = DictionarySymbol(entry) # mq = MembershipQuery([letter]) # self.addWordInD(mq) # self.initialD.append(mq) # # self.D.append(letter) # # self.suffixes.append(letter) # # Initialize the observation table # emptyMQ = MembershipQuery([EmptySymbol()]) # self.addWordInS(emptyMQ) # def addWordInD(self, words): # if words in self.D: # self.log.info("The words " + str(words) + " already exists in D") # return # self.log.info("Adding word " + str(words) + " in D") # self.D.append(words) # self.observationTable[words] = None # # We compute the value of all existing S and SA # cel = dict() # for wordS in self.S: # mq = wordS.getMQSuffixedWithMQ(words) # cel[wordS] = self.submitQuery(mq) # for wordSA in self.SA: # mq = wordSA.getMQSuffixedWithMQ(words) # cel[wordSA] = self.submitQuery(mq) # self.observationTable[words] = cel # def addWordInS(self, word): # # first we verify the word is not already in S # if word in self.S: # self.log.info("The word " + str(word) + " already exists in S") # return # if word in self.SA: # self.log.info("The word " + str(word) + " already exists in SA") # self.SA.remove(word) # self.log.info("Adding word " + str(word) + " to S") # self.S.append(word) # # We create a MQ which looks like : MQ(word,letter) # for letter in self.D: # mq = word.getMQSuffixedWithMQ(letter) # # we add it in the observation table # if self.observationTable[letter] is not None: # cel = self.observationTable[letter] # else: # cel = dict() # cel[word] = self.submitQuery(mq) # self.observationTable[letter] = cel # # Now we add # for letter in self.D: # self.addWordInSA(word.getMQSuffixedWithMQ(letter)) # self.displayObservationTable() # def addWordInSA(self, word): # # first we verify the word is not already in SA # if word in self.SA: # self.log.info("The word " + str(word) + " already exists in SA") # return # if word in self.S: # self.log.info("The word " + str(word) + " already exists in S (addWordInSA)") # return # self.log.info("Adding word " + str(word) + " to SA") # self.SA.append(word) # for letter in self.D: # mq = word.getMQSuffixedWithMQ(letter) # if self.observationTable[letter] is not None: # cel = self.observationTable[letter] # else: # cel = dict() # cel[word] = self.submitQuery(mq) # self.observationTable[letter] = cel # self.displayObservationTable() # def learn(self): # self.log.info("Learn...") # self.displayObservationTable() # while (not self.isClosed() or not self.isConsistent()): # if not self.isClosed(): # self.log.info("#================================================") # self.log.info("The table is not closed") # self.log.info("#================================================") # self.closeTable() # self.displayObservationTable() # else: # self.log.info("Table is closed !") # if not self.isConsistent(): # self.log.info("#================================================") # self.log.info("The table is not consistent") # self.log.info("#================================================") # self.makesTableConsistent() # self.displayObservationTable() # else: # self.log.info("Table is consistent !") # self.log.info("Another turn") # self.displayObservationTable() # self.log.info("Table is closed and consistent") # # We compute an automata # self.computeAutomata() # def add_column(self): # pass # def move_row(self): # pass # def isEquivalent(self): # return True # def isClosed(self): # self.log.debug("Compute if the table is closed") # rowSA = [] # rowS = [] # for wordSA in self.SA: # rowSA = self.getRowOfObservationTable(wordSA) # found = False # self.log.info("isClosed()? We verify with SA member: {0}".format(str(rowSA))) # for wordS in self.S: # rowS = self.getRowOfObservationTable(wordS) # if self.rowsEquals(rowS, rowSA): # self.log.debug(" isClosed(): YES ({0}) is equal!".format(str(rowS))) # found = True # if not found: # self.log.info("The low-row associated with {0} was not found in S".format(str(wordSA))) # return False # return True # def closeTable(self): # self.log.debug("We close the table") # rowSA = [] # rowS = [] # for wordSA in self.SA: # rowSA = self.getRowOfObservationTable(wordSA) # found = False # for wordS in self.S: # rowS = self.getRowOfObservationTable(wordS) # if self.rowsEquals(rowS, rowSA): # found = True # if not found: # self.log.info("The low-row associated with " + str(wordSA) + " was not found in S") # self.moveWordFromSAtoS(wordSA) # return False # return True # def isConsistent(self): # self.log.info("Is consistent ... ?") # # search for all the rows of S which are equals # rowS = [] # equalsRows = [] # for wordS in self.S: # rowS.append((wordS, self.getRowOfObservationTable(wordS))) # for (word, row) in rowS: # for (word2, row2) in rowS: # if word != word2 and self.rowsEquals(row, row2): # equalsRows.append((word, word2)) # self.log.info("isConsistent ? Equals Rows in S are from words: ") # for (w1, w2) in equalsRows: # self.log.info("w1=" + str(w1) + ";w2=" + str(w2)) # # We verify all the equals rows are still equals one letter more # for a in self.initialD: # w1a = w1.getMQSuffixedWithMQ(a) # w2a = w2.getMQSuffixedWithMQ(a) # self.log.info("Searching for word " + str(w1a)) # self.log.info("Searching for word " + str(w2a)) # row_w1a = self.getRowOfObservationTable(w1a) # row_w2a = self.getRowOfObservationTable(w2a) # if not self.rowsEquals(row_w1a, row_w2a): # self.log.info("The table is not consistent because the rows from w1=" + str(w1a) + ";w2=" + str(w2a) + " are NOT equals") # return False # return True # def makesTableConsistent(self): # # search for all the rows of S which are equals # rowS = [] # equalsRows = [] # for wordS in self.S: # rowS.append((wordS, self.getRowOfObservationTable(wordS))) # for (word, row) in rowS: # for (word2, row2) in rowS: # if word != word2 and self.rowsEquals(row, row2): # equalsRows.append((word, word2)) # self.log.info("Equals Rows in S are from words: ") # for (w1, w2) in equalsRows: # self.log.info("w1=" + str(w1) + ";w2=" + str(w2)) # # We verify all the equals rows are still equals one letter more # for a in self.initialD: # w1a = w1.getMQSuffixedWithMQ(a) # w2a = w2.getMQSuffixedWithMQ(a) # row_w1a = self.getRowOfObservationTable(w1a) # row_w2a = self.getRowOfObservationTable(w2a) # if not self.rowsEquals(row_w1a, row_w2a): # # We find the E (col) which makes the unconsistency # e = None # for i in range(0, len(row_w1a)): # if row_w1a[i].getID() != row_w2a[i].getID(): # e = self.D[i] # self.log.info("E found is " + str(e)) # newCol = a.getMQSuffixedWithMQ(e) # self.log.info("So we add (a.e) to E (=D) a.e=[" + str(newCol) + "]") # self.addWordInD(newCol) # self.log.info("The table is not consistent because the rows from w1=" + str(w1a) + ";w2=" + str(w2a) + " are NOT equals") # return False # return True # def rowsEquals(self, r1, r2): # if (len(r1) != len(r2)): # return False # for i in range(0, len(r1)): # if r1[i].getID() != r2[i].getID(): # return False # self.log.debug(str(r1) + " == " + str(r2)) # return True # def moveWordFromSAtoS(self, wordSA): # if not wordSA in self.SA: # self.log.warn("Impossible to move the word from SA since it doesn't exist") # return # self.SA.remove(wordSA) # self.addWordInS(wordSA) # def getRowOfObservationTable(self, rowName): # cols = [] # for letter in self.D: # val = self.observationTable[letter] # mem = None # for rN in val.keys(): # if rN == rowName: # mem = rN # break # if mem is not None: # cols.append(self.observationTable[letter][mem]) # return cols # def getUniqueRowsInS(self): # # Unique rows in S => new states (name = value of the row) # uniqueRowsInS = [] # for wordS in self.S: # rowS = self.getRowOfObservationTable(wordS) # found = False # for (w, r) in uniqueRowsInS: # if self.rowsEquals(r, rowS): # found = True # if not found: # uniqueRowsInS.append((wordS, rowS)) # return uniqueRowsInS # def getSandSAWords(self): # result = [] # for wordS in self.S: # result.append(wordS) # for wordSA in self.SA: # result.append(wordSA) # return result # def computeAutomata(self): # wordAndStates = [] # startState = None # idState = 0 # idTransition = 0 # states = [] # self.log.info("Compute the automata...") # # Create the states of the automata # uniqueRowsInS = self.getUniqueRowsInS() # for (w, r) in uniqueRowsInS: # self.log.info("The row with word {0} is unique !".format(str(w))) # # We create a State for each unique row # nameState = self.appendValuesInRow(r) # self.log.info("Create state: {0}".format(nameState)) # currentState = NormalState(idState, nameState) # states.append(currentState) # wordAndStates.append((w, currentState)) # # Is it the starting state (wordS = [EmptySymbol]) # if startState is None and w == MembershipQuery([EmptySymbol()]): # startState = currentState # self.log.info("Its the starting state") # idState = idState + 1 # self.log.debug("Create the transition of the automata") # # Create the transitions of the automata # for (word, state) in wordAndStates: # self.log.debug("Working on state: {0}".format(str(state.getName()))) # for symbol in self.initialD: # # retrieve the value: # dicValue = self.observationTable[symbol] # value = dicValue[word] # # search for the output state # mq = word.getMQSuffixedWithMQ(symbol) # self.log.debug("> What happen when we send " + str(symbol) + " after " + str(word)) # self.log.debug(">> " + str(mq)) # for wordSandSA in self.getSandSAWords(): # self.log.info("IS " + str(wordSandSA) + " eq " + str(mq)) # if wordSandSA == mq: # self.log.info("YES its equal") # rowOutputState = self.getRowOfObservationTable(wordSandSA) # outputStateName = self.appendValuesInRow(rowOutputState) # self.log.debug("rowOutputState = " + str(rowOutputState)) # self.log.debug("outputStateName = " + str(outputStateName)) # # search for the state having this name: # outputState = None # self.log.info("Search for the output state: {0}".format(outputStateName)) # for (w2, s2) in wordAndStates: # if s2.getName() == outputStateName: # outputState = s2 # self.log.info(" == " + str(s2.getName())) # else: # self.log.info(" != " + str(s2.getName())) # if outputState is not None: # inputSymbol = symbol.getSymbolsWhichAreNotEmpty()[0] # self.log.info("We create a transition from " + str(state.getName()) + "=>" + str(outputState.getName())) # self.log.info(" input: {0}".format(str(inputSymbol))) # self.log.info(" output: {0}".format(str(value))) # transition = SemiStochasticTransition(idTransition, "Transition " + str(idTransition), state, outputState, inputSymbol) # transition.addOutputSymbol(value, 100, 1000) # state.registerTransition(transition) # idTransition = idTransition + 1 # else: # self.log.error("<!!> Impossible to retrieve the output state named " + str(s2.getName())) # if startState is not None: # self.log.info("An infered automata has been computed.") # self.inferedAutomata = MMSTD(startState, self.dictionary) # for state in states: # self.inferedAutomata.addState(state) # self.log.info("----------------------------------------------") # self.log.info("Constructed Hypothetised Automata:") # self.log.info("----------------------------------------------") # self.log.info(self.inferedAutomata.getDotCode()) # def addCounterExamples(self, counterExamples): # self.log.info("Modify the automata in order to consider the " + str(len(counterExamples)) + " counterexamples") # for counterExample in counterExamples: # # we add all the prefix of the counterexample to S # prefixes = counterExample.getNotEmptyPrefixes() # self.log.info("A number of " + str(len(prefixes)) + " will be added !") # for p in prefixes: # self.log.info("=> " + str(p)) # for prefix in prefixes: # self.displayObservationTable() # self.addWordInS(prefix) # self.displayObservationTable() # def appendValuesInRow(self, row): # result = [] # for i in range(0, len(row)): # result.append(str(row[i])) # return '-'.join(result) # def displayObservationTable(self): # self.log.info(self.observationTable) # horizontal = "---------------------------------------------------------------------------------" # horizontal2 = "=================================================================================" # self.log.info(horizontal) # line = [] # for letter in self.D: # line.append(str(letter)) # self.log.info("\t|\t|" + "\t|".join(line)) # self.log.info(horizontal2) # for mqS in self.S: # line = [] # line.append(str(mqS)) # for letter in self.D: # tmp = self.observationTable[letter] # line.append(str(tmp[mqS])) # self.log.info("\t|".join(line)) # self.log.info(horizontal) # self.log.info(horizontal2) # for mqSA in self.SA: # line = [] # line.append(str(mqSA)) # for letter in self.D: # tmp = self.observationTable[letter] # line.append(str(tmp[mqSA])) # self.log.info("\t|".join(line)) # self.log.info(horizontal) # # self.addWordInS(MembershipQuery([]))