#-*- coding: utf-8 -*-
#+---------------------------------------------------------------------------+
#| 01001110 01100101 01110100 01111010 01101111 01100010 |
#| |
#| Netzob : Inferring communication protocols |
#+---------------------------------------------------------------------------+
#| Copyright (C) 2011-2017 Georges Bossert and Frédéric Guihéry |
#| This program is free software: you can redistribute it and/or modify |
#| it under the terms of the GNU General Public License as published by |
#| the Free Software Foundation, either version 3 of the License, or |
#| (at your option) any later version. |
#| |
#| This program is distributed in the hope that it will be useful, |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of |
#| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
#| GNU General Public License for more details. |
#| |
#| You should have received a copy of the GNU General Public License |
#| along with this program. If not, see <http://www.gnu.org/licenses/>. |
#+---------------------------------------------------------------------------+
#| @url : http://www.netzob.org |
#| @contact : contact@netzob.org |
#| @sponsors : Amossys, http://www.amossys.fr |
#| Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/ |
#| ANSSI, https://www.ssi.gouv.fr |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| File contributors : |
#| - Georges Bossert <gbossert (a) miskin.fr> |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| Standard library imports |
#+---------------------------------------------------------------------------+
import socket
import ssl
#+---------------------------------------------------------------------------+
#| Related third party imports |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| Local application imports |
#+---------------------------------------------------------------------------+
from netzob.Common.Utils.Decorators import typeCheck, NetzobLogger
from netzob.Simulator.Channels.AbstractChannel import AbstractChannel, ChannelDownException
@NetzobLogger
class SSLClient(AbstractChannel):
    """An SSLClient is a communication channel that relies on SSL.

    It creates a client that connects to a specific IP:Port server over a
    TCP/SSL socket.

    When the actor executes an OpenChannelTransition, it calls the open
    method on the SSL client, which connects to the server.

    :param remoteIP: IP of the server to connect to.
    :param remotePort: TCP port of the server to connect to.
    :param localIP: local IP to bind the socket to (used only when
        ``localPort`` is also provided).
    :param localPort: local TCP port to bind the socket to (used only when
        ``localIP`` is also provided).
    :param timeout: value handed to ``settimeout()`` on the sockets.
    :param server_cert_file: when provided, certificate verification is
        required and this file is loaded into the SSL context.
    :param alpn_protocols: when provided, list of protocols advertised
        through ALPN during the handshake.
    """

    def __init__(self,
                 remoteIP,
                 remotePort,
                 localIP=None,
                 localPort=None,
                 timeout=2,
                 server_cert_file=None,
                 alpn_protocols=None):
        super(SSLClient, self).__init__(isServer=False)
        self.remoteIP = remoteIP
        self.remotePort = remotePort
        self.localIP = localIP
        self.localPort = localPort
        self.timeout = timeout
        self.type = AbstractChannel.TYPE_SSLCLIENT
        self.__socket = None
        self.__ssl_socket = None
        self.server_cert_file = server_cert_file
        self.alpn_protocols = alpn_protocols

    def open(self, timeout=None):
        """Open the communication channel by connecting to the remote server.

        :param timeout: accepted for interface compatibility but not used
            here; the channel ``timeout`` attribute is applied to the
            sockets instead.
        :raise RuntimeError: if the channel is already open.
        """
        if self.isOpen:
            raise RuntimeError(
                "The channel is already open, cannot open it again")

        self.__socket = socket.socket()
        try:
            # Reuse the connection
            self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.__socket.settimeout(self.timeout)
            if self.localIP is not None and self.localPort is not None:
                self.__socket.bind((self.localIP, self.localPort))

            # lets create the ssl context
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            context.check_hostname = False
            context.load_default_certs()
            if self.server_cert_file is not None:
                context.verify_mode = ssl.CERT_REQUIRED
                # NOTE(review): load_cert_chain() installs the file as the
                # certificate presented by *this* client. If the intent is
                # to trust/pin the server certificate,
                # load_verify_locations() looks like the matching API --
                # confirm before changing.
                context.load_cert_chain(self.server_cert_file)
            else:
                context.verify_mode = ssl.CERT_NONE

            if self.alpn_protocols is not None:
                context.set_alpn_protocols(self.alpn_protocols)

            # lets wrap the socket to create the ssl tunnel
            self.__ssl_socket = context.wrap_socket(self.__socket)
            self.__ssl_socket.settimeout(self.timeout)

            self._logger.debug("Connect to the SSL server to {0}:{1}".format(
                self.remoteIP, self.remotePort))
            self.__ssl_socket.connect((self.remoteIP, self.remotePort))
        except Exception:
            # The channel never reached the open state: release the sockets
            # so a failed attempt does not leak a file descriptor.
            self.__closeSockets()
            raise

        self.isOpen = True

    def __closeSockets(self):
        """Close the SSL socket and the underlying TCP socket, if any."""
        try:
            if self.__ssl_socket is not None:
                self.__ssl_socket.close()
        finally:
            # Always attempt to close the raw socket, even if closing the
            # SSL layer failed.
            if self.__socket is not None:
                self.__socket.close()

    def close(self):
        """Close the communication channel."""
        try:
            self.__closeSockets()
        finally:
            self.isOpen = False

    def read(self, timeout=None):
        """Read the next message on the communication channel.

        Data is received in segments until a receive returns nothing, times
        out or fails with an SSL error; everything received so far is then
        returned.

        :param timeout: accepted for interface compatibility but not used
            here; the timeout configured on the socket when the channel was
            opened applies to each receive.
        :return: the received data (possibly empty).
        :rtype: :class:`bytes`
        :raise Exception: if the channel has no SSL socket.
        """
        reading_seg_size = 1024

        if self.__ssl_socket is None:
            raise Exception("socket is not available")

        chunks = []
        while True:
            try:
                recv = self.__ssl_socket.recv(reading_seg_size)
            except (ssl.SSLError, socket.timeout):
                # says we received nothing (timeout issue); on Python 3 a
                # timed-out SSL read raises socket.timeout, not SSLError
                recv = b""
            if not recv:
                break
            chunks.append(recv)
        return b"".join(chunks)

    def writePacket(self, data):
        """Write on the communication channel the specified data.

        :parameter data: the data to write on the channel
        :type data: binary object
        :return: the number of bytes written.
        :rtype: :class:`int`
        :raise ChannelDownException: if the SSL layer reports an error
            while sending.
        :raise Exception: if the channel has no SSL socket.
        """
        # Guard on the socket that is actually used for sending.
        if self.__ssl_socket is None:
            raise Exception("socket is not available")

        try:
            self.__ssl_socket.sendall(data)
        except ssl.SSLError:
            raise ChannelDownException()
        return len(data)

    @typeCheck(bytes)
    def sendReceive(self, data, timeout=None):
        """Write on the communication channel the specified data and returns
        the corresponding response.

        :raise NotImplementedError: always, this operation is not
            implemented yet.
        """
        raise NotImplementedError("Not yet implemented")

    # Management methods

    # Properties

    @property
    def remoteIP(self):
        """IP of the server the client connects to.

        :type: :class:`str`
        """
        return self.__remoteIP

    @remoteIP.setter
    @typeCheck(str)
    def remoteIP(self, remoteIP):
        if remoteIP is None:
            raise TypeError("RemoteIP cannot be None")

        self.__remoteIP = remoteIP

    @property
    def remotePort(self):
        """TCP port of the server the client connects to.
        Its value must be above 0 and at most 65535.

        :type: :class:`int`
        """
        return self.__remotePort

    @remotePort.setter
    @typeCheck(int)
    def remotePort(self, remotePort):
        if remotePort is None:
            raise TypeError("RemotePort cannot be None")
        if remotePort <= 0 or remotePort > 65535:
            raise ValueError("RemotePort must be > 0 and <= 65535")

        self.__remotePort = remotePort

    @property
    def localIP(self):
        """Local IP the client socket is bound to before connecting
        (may be None).

        :type: :class:`str`
        """
        return self.__localIP

    @localIP.setter
    @typeCheck(str)
    def localIP(self, localIP):
        self.__localIP = localIP

    @property
    def localPort(self):
        """Local TCP port the client socket is bound to before connecting
        (may be None).

        :type: :class:`int`
        """
        return self.__localPort

    @localPort.setter
    @typeCheck(int)
    def localPort(self, localPort):
        self.__localPort = localPort

    @property
    def timeout(self):
        """Timeout value applied to the sockets when the channel is opened."""
        return self.__timeout

    @timeout.setter
    def timeout(self, timeout):
        self.__timeout = timeout