#!/usr/bin/env python """NAT-PMP client library Provides functions to interact with NAT-PMP gateways implementing version 0 of the NAT-PMP draft specification. This version does not completely implement the draft standard. * It does not provide functionality to listen for address change packets. * It does not have a proper request queuing system, meaning that multiple requests may be issued in parallel, against spec recommendations. For more information on NAT-PMP, see the NAT-PMP draft specification: http://files.dns-sd.org/draft-cheshire-nat-pmp.txt Requires Python 2.3 or later. Tested on Python 2.3, 2.4, 2.5 against Apple AirPort Express. 0.0.1.2 - NT autodetection code. Thanks to roee shlomo for the gateway detection regex! 0.0.1.1 - Removed broken mutex code 0.0.1 - Initial release """ __version__ = "0.0.1.2" __license__ = """Copyright (c) 2008, Yiming Liu, All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * The names of the author and contributors may not be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.""" __author__ = "Yiming Liu " import struct, socket, select, time import sys, os, re NATPMP_PORT = 5351 NATPMP_RESERVED_VAL = 0 NATPMP_PROTOCOL_UDP = 1 NATPMP_PROTOCOL_TCP = 2 NATPMP_GATEWAY_NO_VALID_GATEWAY = -10 NATPMP_GATEWAY_NO_SUPPORT = -11 NATPMP_GATEWAY_CANNOT_FIND = -12 NATPMP_RESULT_SUCCESS = 0 # Success NATPMP_RESULT_UNSUPPORTED_VERSION = 1 # Unsupported Version NATPMP_RESULT_NOT_AUTHORIZED = 2 # Not Authorized/Refused/NATPMP turned off NATPMP_RESULT_NETWORK_FAILURE = 3 # Network Failure NATPMP_RESULT_OUT_OF_RESOURCES = 4 # can not create more mappings NATPMP_RESULT_UNSUPPORTED_OPERATION = 5 # not a supported opcode # all remaining results are fatal errors NATPMP_ERROR_DICT = { NATPMP_RESULT_SUCCESS:"No error.", NATPMP_RESULT_UNSUPPORTED_VERSION:"The protocol version specified is unsupported.", NATPMP_RESULT_NOT_AUTHORIZED:"The operation was refused. NAT-PMP may be turned off on gateway.", NATPMP_RESULT_NETWORK_FAILURE:"There was a network failure. The gateway may not have an IP address.",# Network Failure NATPMP_RESULT_OUT_OF_RESOURCES:"The NAT-PMP gateway is out of resources and cannot create more mappings.", # can not create more mappings NATPMP_RESULT_UNSUPPORTED_OPERATION:"The NAT-PMP gateway does not support this operation", # not a supported opcode NATPMP_GATEWAY_NO_SUPPORT:'The gateway does not support NAT-PMP', NATPMP_GATEWAY_NO_VALID_GATEWAY:'No valid gateway address was specified.', NATPMP_GATEWAY_CANNOT_FIND:'Cannot automatically determine gateway address. Must specify manually.' } class NATPMPRequest(object): """Represents a basic NAT-PMP request. This currently consists of the 1-byte fields version and opcode. Other requests are derived from NATPMPRequest. """ retry_increment = 0.250 # seconds def __init__(self, version, opcode): self.version = version self.opcode = opcode def toBytes(self): """Converts the request object to a byte string.""" return struct.pack('!BB', self.version, self.opcode) class PublicAddressRequest(NATPMPRequest): """Represents a NAT-PMP request to the local gateway for a public address. As per the specification, this is a generic request with the opcode = 0. """ def __init__(self, version=0): NATPMPRequest.__init__(self, version, 0) class PortMapRequest(NATPMPRequest): """Represents a NAT-PMP request to the local gateway for a port mapping. As per the specification, this request extends NATPMPRequest with the fields private_port, public_port, and lifetime. The first two are 2-byte unsigned shorts, and the last is a 4-byte unsigned integer. """ def __init__(self, protocol, private_port, public_port, lifetime=3600, version=0): NATPMPRequest.__init__(self, version, protocol) self.private_port = private_port self.public_port = public_port self.lifetime = lifetime def toBytes(self): s= NATPMPRequest.toBytes(self) + struct.pack('!HHHI', NATPMP_RESERVED_VAL, self.private_port, self.public_port, self.lifetime) return s class NATPMPResponse(object): """Represents a generic NAT-PMP response from the local gateway. The generic response has fields for version, opcode, result, and secs since last epoch (last boot of the NAT gateway). As per the specification, the opcode is offset by 128 from the opcode of the original request. """ def __init__(self, version, opcode, result, sec_since_epoch): self.version = version self.opcode = opcode self.result = result self.sec_since_epoch = sec_since_epoch def __str__(self): return "NATPMPResponse(%d, %d, %d, $d)" % (self.version, self.opcode, self.result, self.sec_since_epoch) class PublicAddressResponse(NATPMPResponse): """Represents a NAT-PMP response from the local gateway to a public-address request. It has one additional 4-byte field containing the IP returned. The member variable ip contains the Python-friendly string form, while ip_int contains the same in the original 4-byte unsigned int. """ def __init__(self, bytes): version, opcode, result, sec_since_epoch, self.ip_int = struct.unpack("!BBHII", bytes) NATPMPResponse.__init__(self, version, opcode, result, sec_since_epoch) self.ip = socket.inet_ntoa(bytes[8:8+4]) #self.ip = socket.inet_ntoa(self.ip_bytes) def __str__(self): return "PublicAddressResponse: version %d, opcode %d (%d), result %d, ssec %d, ip %s" % (self.version, self.opcode, self.result, self.sec_since_epoch, self.ip) class PortMapResponse(NATPMPResponse): """Represents a NAT-PMP response from the local gateway to a public-address request. The response contains the private port, public port, and the lifetime of the mapping in addition to typical NAT-PMP headers. Note that the port mapping assigned is NOT NECESSARILY the port requested (see the specification for details). """ def __init__(self, bytes): version, opcode, result, sec_since_epoch, self.private_port, self.public_port, self.lifetime = struct.unpack('!BBHIHHI', bytes) NATPMPResponse.__init__(self, version, opcode, result, sec_since_epoch) def __str__(self): return "PortMapResponse: version %d, opcode %d (%d), result %d, ssec %d, private_port %d, public port %d, lifetime %d" % (self.version, self.opcode, self.opcode, self.result, self.sec_since_epoch, self.private_port, self.public_port, self.lifetime) class NATPMPError(Exception): """Generic exception state. May be used to represent unknown errors.""" pass class NATPMPResultError(NATPMPError): """Used when a NAT gateway responds with an error-state response.""" pass class NATPMPNetworkError(NATPMPError): """Used when a network error occurred while communicating with the NAT gateway.""" pass class NATPMPUnsupportedError(NATPMPError): """Used when a NAT gateway does not support NAT-PMP.""" pass def get_gateway_addr(): """A hack to obtain the current gateway automatically, since Python has no interface to sysctl(). This may or may not be the gateway we should be contacting. It does not guarantee correct results. This function requires the presence of netstat on the path on POSIX and NT. It requires ip on Linux. """ addr = "" shell_command = 'netstat -rn' if os.name == "posix": pattern = re.compile('default\s+([\w.:]+)\s+\w') if "linux" in sys.platform: shell_command = "ip route show" pattern = re.compile('default via\s+([\w.:]+)\s+\w') elif os.name == "nt": pattern = re.compile(".*?Default Gateway:[ ]+(.*?)\n") system_out = os.popen(shell_command, 'r').read() # TODO: this could be a security issue if not system_out: raise NATPMPNetworkError(NATPMP_GATEWAY_CANNOT_FIND, error_str(NATPMP_GATEWAY_CANNOT_FIND)) match = pattern.search(system_out) if not match: raise NATPMPNetworkError(NATPMP_GATEWAY_CANNOT_FIND, error_str(NATPMP_GATEWAY_CANNOT_FIND)) addr = match.groups()[0].strip() return addr # TODO: use real auto-detection def error_str(result_code): """Takes a numerical error code and returns a human-readable error string. """ result = NATPMP_ERROR_DICT.get(result_code) if not result: result = "Unknown fatal error." return result def get_gateway_socket(gateway): """Takes a gateway address string and returns a non-blocking UDP socket to communicate with its NAT-PMP implementation on NATPMP_PORT. e.g. addr = get_gateway_socket('10.0.1.1') """ if not gateway: raise NATPMPNetworkError(NATPMP_GATEWAY_NO_VALID_GATEWAY, error_str(NATPMP_GATEWAY_NO_VALID_GATEWAY)) response_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) response_socket.setblocking(0) response_socket.connect((gateway, NATPMP_PORT)) return response_socket def get_public_address(gateway_ip=get_gateway_addr(), retry=9): """A high-level function that returns the public interface IP of the current host by querying the NAT-PMP gateway. IP is returned as string. Takes two possible keyword arguments: gateway_ip - the IP to the NAT-PMP compatible gateway. Defaults to using auto-detection function get_gateway_addr() retry - the number of times to retry the request if unsuccessful. Defaults to 9 as per specification. """ addr = None addr_request = PublicAddressRequest() addr_response = send_request_with_retry(gateway_ip, addr_request, responseDataClass=PublicAddressResponse, retry=retry) if addr_response.result != 0: #sys.stderr.write("NAT-PMP error %d: %s\n" % (addr_response.result, error_str(addr_response.result))) #sys.stderr.flush() raise NATPMPResultError(addr_response.result, error_str(addr_response.result), addr_response) addr = addr_response.ip return addr def map_tcp_port(public_port, private_port, lifetime=3600, gateway_ip=get_gateway_addr(), retry=9, useException=True): """A high-level wrapper to map_port() that requests a mapping for a public TCP port on the NAT to a private TCP port on this host. Returns the complete response on success. public_port - the public port of the mapping requested private_port - the private port of the mapping requested lifetime - the duration of the mapping in seconds. Defaults to 3600, per specification. gateway_ip - the IP to the NAT-PMP compatible gateway. Defaults to using auto-detection function get_gateway_addr() retry - the number of times to retry the request if unsuccessful. Defaults to 9 as per specification. useException - throw an exception if an error result is received from the gateway. Defaults to True. """ return map_port(NATPMP_PROTOCOL_TCP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, useException=useException) def map_udp_port(public_port, private_port, lifetime=3600, gateway_ip=get_gateway_addr(), retry=9, useException=True): """A high-level wrapper to map_port() that requests a mapping for a public UDP port on the NAT to a private UDP port on this host. Returns the complete response on success. public_port - the public port of the mapping requested private_port - the private port of the mapping requested lifetime - the duration of the mapping in seconds. Defaults to 3600, per specification. gateway_ip - the IP to the NAT-PMP compatible gateway. Defaults to using auto-detection function get_gateway_addr() retry - the number of times to retry the request if unsuccessful. Defaults to 9 as per specification. useException - throw an exception if an error result is received from the gateway. Defaults to True. """ return map_port(NATPMP_PROTOCOL_UDP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, useException=useException) def map_port(protocol, public_port, private_port, lifetime=3600, gateway_ip=get_gateway_addr(), retry=9, useException=True): """A function to map public_port to private_port of protocol. Returns the complete response on success. protocol - NATPMP_PROTOCOL_UDP or NATPMP_PROTOCOL_TCP public_port - the public port of the mapping requested private_port - the private port of the mapping requested lifetime - the duration of the mapping in seconds. Defaults to 3600, per specification. gateway_ip - the IP to the NAT-PMP compatible gateway. Defaults to using auto-detection function get_gateway_addr() retry - the number of times to retry the request if unsuccessful. Defaults to 9 as per specification. useException - throw an exception if an error result is received from the gateway. Defaults to True. """ if protocol not in [NATPMP_PROTOCOL_UDP, NATPMP_PROTOCOL_TCP]: raise ValueError("Must be either NATPMP_PROTOCOL_UDP or NATPMP_PROTOCOL_TCP") response = None port_mapping_request = PortMapRequest(protocol, private_port, public_port, lifetime) port_mapping_response = send_request_with_retry(gateway_ip, port_mapping_request, responseDataClass=PortMapResponse, retry=retry) if port_mapping_response.result != 0 and useException: raise NATPMPResultError(port_mapping_response.result, error_str(port_mapping_response.result), port_mapping_response) return port_mapping_response def send_request(gateway_socket, request): gateway_socket.sendall(request.toBytes()) def read_response(gateway_socket, timeout, responseSize=16): data = "" source_addr = ("", "") rlist, wlist, xlist = select.select([gateway_socket], [], [], timeout) if rlist: resp_socket = rlist[0] data,source_addr = resp_socket.recvfrom(responseSize) return data,source_addr def send_request_with_retry(gateway_ip, request, responseDataClass=None, retry=9): gateway_socket = get_gateway_socket(gateway_ip) n = 1 data = "" while n <= retry and not data: send_request(gateway_socket, request) data,source_addr = read_response(gateway_socket, n * request.retry_increment) if source_addr[0] != gateway_ip or source_addr[1] != NATPMP_PORT: data = "" # discard data if source mismatch, as per specification n += 1 if n >= retry and not data: raise NATPMPUnsupportedError(NATPMP_GATEWAY_NO_SUPPORT, error_str(NATPMP_GATEWAY_NO_SUPPORT)) if data and responseDataClass: data = responseDataClass(data) return data if __name__ == "__main__": addr = get_public_address() map_resp = map_tcp_port(62001, 62001) print addr print map_resp.__dict__