Compare commits

..

No commits in common. "master" and "master" have entirely different histories.

2 changed files with 266 additions and 255 deletions

View file

@ -115,7 +115,7 @@ load_rc_config $name
pidfile="/var/run/${name}.pid" pidfile="/var/run/${name}.pid"
command="/usr/local/bin/checkmyip.py" command="/usr/local/bin/checkmyip.py"
command_interpreter="/usr/local/bin/python3.11" command_interpreter="/usr/local/bin/python3.9"
command_args="&" command_args="&"
start_precmd="checkmyip_precmd" start_precmd="checkmyip_precmd"

View file

@ -1,14 +1,18 @@
#!/usr/local/bin/python3.11 #!/usr/local/bin/python3.9
##### BSD Cafe CheckMyIP Server #####
##### Originally written by John W Kerns #####
##### http://blog.packetsar.com #####
##### Forked to BSD Cafe by Stefano Marinelli #####
##### https://brew.bsd.cafe/BSDCafe/checkmyip #####
##### BSD Cafe CheckMyIP Server #####
##### Originally written by John W Kerns #####
##### Forked to BSD Cafe by Stefano Marinelli #####
##### https://brew.bsd.cafe/BSDCafe/checkmyip #####
##### Inform version here ##### ##### Inform version here #####
version = "v1.3.2" version = "v1.3.0"
##### Import necessary modules #####
##### Import python2 native modules #####
import os import os
import sys import sys
import time import time
@ -17,34 +21,21 @@ import json
import jinja2 import jinja2
import paramiko import paramiko
import threading import threading
from concurrent.futures import ThreadPoolExecutor
from dnslib import DNSRecord, DNSHeader, RR, TXT, QTYPE from dnslib import DNSRecord, DNSHeader, RR, TXT, QTYPE
import logging # Use built-in logging module
##### Configure Logging #####
LOG_DIR = "/var/log/checkmyip/"
os.makedirs(LOG_DIR, exist_ok=True)
# General Logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s: %(message)s",
handlers=[
logging.FileHandler(os.path.join(LOG_DIR, f"{time.strftime('%Y-%m-%d')}_checkmyip.log")),
logging.StreamHandler(sys.stdout)
]
)
# Paramiko Logger
paramiko_log = os.path.join(LOG_DIR, f"{time.strftime('%Y-%m-%d')}_ssh.log")
paramiko.util.log_to_file(paramiko_log)
##### Jinja formatting for logging queries ##### ##### Jinja formatting for logging queries #####
j2log = "Connection from: {{ ip }} ({{ port }}) ({{ proto }})" j2log = "Connection from: {{ ip }} ({{ port }}) ({{ proto }})"
##### Jinja formatting for response queries ##### ##### Jinja formatting for response queries #####
j2send = """{ j2send = """{
"comment": "## Your IP Address is {{ ip }} ({{ port }}) ##", "comment": "## Your IP Address is {{ ip }} ({{ port }}) ##",
"family": "{{ family }}", "family": "{{ family }}",
"ip": "{{ ip }}", "ip": "{{ ip }}",
"port": "{{ port }}", "port": "{{ port }}",
@ -64,13 +55,10 @@ class DNSResponder:
self.sock.bind((self.address, self.port)) self.sock.bind((self.address, self.port))
def run(self): def run(self):
logging.info("DNS Server listening on port %s for both IPv4 and IPv6", self.port) print("DNS Server listening on port 53 for both IPv4 and IPv6")
while True: while True:
try: data, addr = self.sock.recvfrom(1024)
data, addr = self.sock.recvfrom(1024) threading.Thread(target=self.handle_query, args=(data, addr)).start()
threading.Thread(target=self.handle_query, args=(data, addr), daemon=True).start()
except Exception as e:
logging.error("DNS Server error: %s", e)
def handle_query(self, data, addr): def handle_query(self, data, addr):
try: try:
@ -89,260 +77,283 @@ class DNSResponder:
response.add_answer(RR(qname, QTYPE.TXT, ttl=60, rdata=TXT(ip))) response.add_answer(RR(qname, QTYPE.TXT, ttl=60, rdata=TXT(ip)))
self.sock.sendto(response.pack(), addr) self.sock.sendto(response.pack(), addr)
except Exception as e: except Exception as e:
logging.error("Error handling DNS query: %s", e) print(f"Error handling DNS query: {e}")
##### Creates an RSA key for use by paramiko #####
class RSAKey:
data = """-----BEGIN RSA PRIVATE KEY-----
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
-----END RSA PRIVATE KEY-----
"""
def readlines(self): # For use by paramiko.RSAKey.from_private_key
return self.data.split("\n")
def get_key(self): ##### Handles all prnting to console and logging to the logfile #####
return paramiko.RSAKey.from_private_key(self) class log_management:
def __init__(self):
self.logpath = "/var/log/checkmyip/" # Log file directory path
self.logfile = "/var/log/checkmyip/%scheckmyip.log" % \
time.strftime("%Y-%m-%d_") # Log file full path
self.paramikolog = "/var/log/checkmyip/%sssh.log" % \
time.strftime("%Y-%m-%d_") # SSH log file path
self.thread = threading.Thread(target=self._change_logfiles)
self.thread.daemon = True
self.thread.start() # Start talker thread to listen to port
self._publish_methods() # Publish the console and log methods to glob
self.can_log = True # Variable used to check if we can log
try: # Try to configure the SSH log file, create dir if fail
paramiko.util.log_to_file(self.paramikolog)
except IOError:
self._create_log_dir()
def _logger(self, data): # Logging method published to global as 'log'
logdata = time.strftime("%Y-%m-%d %H:%M:%S") + ": " + data + "\n"
if self.can_log:
try: # Try to write to log, create log dir if fail
f = open(self.logfile, 'a')
f.write(logdata)
f.close()
except IOError:
self._console("Unable to log to logfile %s. Creating log directory" % self.logfile)
self.can_log = False
self._create_log_dir()
self._console(logdata)
def _console(self, data, timestamp=False):
if timestamp:
logdata = time.strftime("%Y-%m-%d %H:%M:%S") + ": " + data + "\n"
else:
logdata = data
print(logdata)
def _publish_methods(self):
global log
global console
log = self._logger # Global method used to write to the log file
console = self._console # Global method used to write to the console
def _create_log_dir(self): # Create the directory for logging
os.system('mkdir -p ' + self.logpath)
self._console("Logpath (%s) created" % self.logpath)
self.can_log = True
def _change_logfiles(self, thread=True):
while True:
time.sleep(10)
self.logfile = "/var/log/checkmyip/%scheckmyip.log" % \
time.strftime("%Y-%m-%d_") # Log file full path
self.paramikolog = "/var/log/checkmyip/%sssh.log" % \
time.strftime("%Y-%m-%d_") # SSH log file path
paramiko.util.log_to_file(self.paramikolog)
##### Creates a RSA key for use by paramiko #####
class rsa_key:
data = """-----BEGIN RSA PRIVATE KEY-----
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
-----END RSA PRIVATE KEY-----
"""
def readlines(self): # For use by paramiko.RSAKey.from_private_key mthd
return self.data.split("\n")
def __call__(self): # Recursive method uses own object as arg when called
return paramiko.RSAKey.from_private_key(self)
##### Imports and modifies the ServerInterface module for use by paramiko ##### ##### Imports and modifies the ServerInterface module for use by paramiko #####
class SSHServer(paramiko.ServerInterface): class ssh_server (paramiko.ServerInterface):
def __init__(self): def __init__(self):
self.event = threading.Event() self.event = threading.Event()
def check_channel_request(self, kind, chanid):
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_none(self, username): # Auth none method left wide open
return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username): # Give no auth options
return 'none'
def check_channel_shell_request(self, channel):
self.event.set()
return True
def check_channel_pty_request(self, channel, term, width, height,
pixelwidth, pixelheight, modes):
return True
def check_channel_request(self, kind, chanid):
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_none(self, username):
return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username):
return 'none'
def check_channel_shell_request(self, channel):
self.event.set()
return True
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
return True
##### Method to merge Jinja templates ##### ##### Method to merge Jinja templates #####
def j2format(j2tmp, valdict): def j2format(j2tmp, valdict):
template = jinja2.Template(j2tmp) template = jinja2.Template(j2tmp)
return template.render(valdict).replace("\n", "\r\n") return template.render(valdict).replace("\n", "\r\n")
##### Cleans IP addresses coming from socket library ##### ##### Cleans IP addresses coming from socket library #####
def cleanip(addr): def cleanip(addr):
ip, port = addr[:2] ip, port = addr[:2]
family = "ipv6" # Default to IPv6 family = "ipv6" # Default to IPv6
if ip.startswith("::ffff:"): # Check if this is a prefixed IPv4 address if ip.startswith("::ffff:"): # Check if this is a prefixed IPv4 address
ip = ip.replace("::ffff:", "") # Clean the IP ip = ip.replace("::ffff:", "") # Clean the IP
family = "ipv4" family = "ipv4"
elif ":" not in ip: # Simple check to recognize IPv4 elif ":" not in ip: # Simple check to recognize IPv4
family = "ipv4" family = "ipv4"
return ip, port, family # Return cleaned IP and family return ip, port, family # Return cleaned IP and family
##### TCP listener methods using ThreadPoolExecutor for better thread management #####
def listener(port, talker, executor): ##### TCP listener methods. Gets used once for each listening port #####
listen_ip = '' def listener(port, talker):
listen_port = port listen_ip = ''
buffer_size = 1024 listen_port = port
while True: buffer_size = 1024
try: while True:
# Use AF_INET6 but also handle IPv4 connections # Use AF_INET6 but also handle IPv4 connections
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # Allow IPv4-mapped IPv6 addresses sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # Allow IPv4-mapped IPv6 addresses
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((listen_ip, listen_port)) sock.bind((listen_ip, listen_port))
sock.listen(buffer_size) sock.listen(buffer_size)
logging.info("Listening on port %s", listen_port) while True:
while True: client, addr = sock.accept()
client, addr = sock.accept() ip, port, family = cleanip(addr) # Get cleaned IP info including family
ip, port, family = cleanip(addr) # Get cleaned IP info including family valdict = {"ip": ip, "port": port, "family": family, "proto": ""}
valdict = {"ip": ip, "port": port, "family": family, "proto": ""} thread = threading.Thread(target=talker, args=(client, valdict))
executor.submit(talker, client, valdict) # Submit to thread pool thread.start() # Start talker thread to listen to port
except Exception as e:
logging.error("Listener on port %s encountered an error: %s", port, e)
time.sleep(5) # Wait before retrying
##### Telnet responder method. Is run in own thread for each telnet query ##### ##### Telnet responder method. Is run in own thread for each telnet query #####
def telnet_talker(client, valdict, proto="telnet"): def telnet_talker(client, valdict, proto="telnet"):
try: valdict.update({"proto": proto}) # Add the protocol to the value dict
valdict.update({"proto": proto}) # Add the protocol to the value dict log(j2format(j2log, valdict)) # Log the query to the console and logfile
logging.info(j2format(j2log, valdict)) # Log the query # Send the query response
# Send the query response client.send(f'{j2format(j2send, valdict)}\n'.encode())
response = j2format(j2send, valdict) + "\n" client.close() # Close the channel
client.sendall(response.encode())
except Exception as e:
logging.error("Telnet talker error: %s", e)
finally:
client.close() # Ensure the socket is closed
##### SSH responder method. Gets run in own thread for each SSH query ##### ##### SSH responder method. Gets run in own thread for each SSH query #####
def ssh_talker(client, valdict, proto="ssh"): def ssh_talker(client, valdict, proto="ssh"):
try: def makefile(): # A hack to make Cisco SSH sessions work properly
def makefile(chan): chan.makefile('rU').readline().strip('\r\n')
# A hack to make SSH sessions work properly valdict.update({"proto": proto})
chan.makefile('rU').readline().strip('\r\n') log(j2format(j2log, valdict))
t = paramiko.Transport(client, gss_kex=True)
t.set_gss_host(socket.getfqdn(""))
t.load_server_moduli()
t.add_server_key(rsa_key()()) # RSA key object nested call
server = ssh_server()
t.start_server(server=server)
chan = t.accept(20)
if chan:
server.event.wait(10)
chan.send('%s\n' % j2format(j2send, valdict)) # Send the response
thread = threading.Thread(target=makefile)
thread.start() # Start hack in thread since it hangs indefinately
time.sleep(1) # Wait a second
chan.close() # And kill the SSH channel
client.close() # And kill the session
valdict.update({"proto": proto})
logging.info(j2format(j2log, valdict))
t = paramiko.Transport(client)
t.set_gss_host(socket.getfqdn(""))
t.load_server_moduli()
t.add_server_key(RSAKey().get_key()) # RSA key object
server = SSHServer()
t.start_server(server=server)
chan = t.accept(20)
if chan:
if not server.event.wait(10):
logging.warning("No shell request received.")
chan.close()
else:
response = j2format(j2send, valdict) + "\n"
chan.send(response)
threading.Thread(target=makefile, args=(chan,), daemon=True).start()
time.sleep(1)
chan.close()
except Exception as e:
logging.error("SSH talker error: %s", e)
finally:
client.close()
##### HTTP responder method. Gets run in own thread for each HTTP query #####
##### Automatically detects if client is a browser or a telnet client #####
##### HTTP responder method. Gets run in own thread for each HTTP query ##### ##### HTTP responder method. Gets run in own thread for each HTTP query #####
def http_talker(client, valdict, proto="http"): def http_talker(client, valdict, proto="http"):
time.sleep(.1) # Sleep to allow the client to send some data
client.setblocking(0) # Set the socket recv as non-blocking
browser = False # Is the client using a browser?
headers = {}
try: try:
time.sleep(0.1) # Allow client to send data data = client.recv(2048) # Receive data from the buffer (if any)
client.settimeout(5.0) # Set timeout to prevent blocking indefinitely request_lines = data.decode().split('\r\n')
browser = False # Is the client using a browser? for line in request_lines:
headers = {} if ": " in line:
try: key, value = line.split(": ", 1)
data = client.recv(2048) # Receive data from the buffer (if any) headers[key] = value
if data: # Extract real IP and adjust family based on headers if present
request_lines = data.decode().split('\r\n') if 'X-Real-IP' in headers or 'X-Forwarded-For' in headers:
for line in request_lines: real_ip = headers.get('X-Real-IP', headers.get('X-Forwarded-For').split(',')[0])
if ": " in line: ip, port, family = cleanip((real_ip, valdict['port']))
key, value = line.split(": ", 1) valdict.update({"ip": ip, "family": family})
headers[key] = value browser = True # Set client browser to True
# Extract real IP and adjust family based on headers if present except socket.error: # If buffer was empty, then like a telnet client on TCP80
if 'X-Real-IP' in headers or 'X-Forwarded-For' in headers: browser = False # Set client browser to False
real_ip = headers.get('X-Real-IP', headers.get('X-Forwarded-For').split(',')[0]) if not browser: # If the client is not a browser
ip, port, family = cleanip((real_ip, valdict['port'])) telnet_talker(client, valdict, "http-telnet") # Hand to telnet_talker
valdict.update({"ip": ip, "family": family}) else: # If client is a browser
browser = True # Set client browser to True # Proceed with standard HTTP response (with headers)
except socket.timeout: valdict.update({"proto": proto})
browser = False # Set client browser to False log(j2format(j2log, valdict))
except Exception as e: response_body_raw = j2format(j2send, valdict)+"\n"
logging.error("HTTP talker recv error: %s", e) if request_lines[0].split()[1] == "/raw":
browser = False json_content = json.loads(response_body_raw)
response_body_raw = json_content['ip']
response_headers_raw = """HTTP/1.1 200 OK
Content-Length: %s
Content-Type: application/json; encoding=utf8
Connection: close""" % str(len(response_body_raw)) # Response with headers
client.send(f'{response_headers_raw}\n\n{response_body_raw}'.encode())
client.close()
if not browser: # If the client is not a browser
telnet_talker(client, valdict, "http-telnet") # Hand to telnet_talker
else: # If client is a browser
valdict.update({"proto": proto})
logging.info(j2format(j2log, valdict))
response_body_raw = j2format(j2send, valdict) + "\n"
if request_lines and "/raw" in request_lines[0]:
try:
json_content = json.loads(response_body_raw)
response_body_raw = json_content.get('ip', '')
except json.JSONDecodeError:
response_body_raw = "Invalid JSON response."
response_headers_raw = (
"HTTP/1.1 200 OK\r\n"
f"Content-Length: {len(response_body_raw)}\r\n"
"Content-Type: application/json; charset=utf-8\r\n"
"Connection: close\r\n\r\n"
)
client.sendall(response_headers_raw.encode() + response_body_raw.encode())
except Exception as e:
logging.error("HTTP talker error: %s", e)
finally:
client.close() # Ensure the socket is closed
##### Server startup method. Starts a listener thread for each TCP port ##### ##### Server startup method. Starts a listener thread for each TCP port #####
def start(): def start():
talkers = { talkers = {2222: ssh_talker, 2223: telnet_talker,
2222: ssh_talker, 2280: http_talker} # Three listeners on different ports
2223: telnet_talker, for talker in talkers:
2280: http_talker # Launch a thread for each listener
} # Three listeners on different ports thread = threading.Thread(target=listener,
max_workers = 100 # Define maximum number of threads args=(talker, talkers[talker]))
with ThreadPoolExecutor(max_workers=max_workers) as executor: thread.daemon = True
for port, talker in talkers.items(): thread.start()
# Launch a thread for each listener # Start DNS server in a new thread
threading.Thread(target=listener, args=(port, talker, executor), daemon=True).start() dns_server = DNSResponder()
# Start DNS server in a new thread dns_thread = threading.Thread(target=dns_server.run)
dns_server = DNSResponder(port=5553) # Assuming DNS is on port 5553 as per original code dns_thread.daemon = True
threading.Thread(target=dns_server.run, daemon=True).start() dns_thread.start()
logging.info("All services started.") while True: # While loop to allow a CTRL-C interrupt when interactive
while True: # Keep the main thread alive try:
try: time.sleep(1)
time.sleep(1) except KeyboardInterrupt:
except KeyboardInterrupt: quit()
logging.info("Shutting down server.")
sys.exit(0)
##### Client class to be used to make API calls to CheckMyIP server ##### ##### Client class to be used to make API calls to CheckMyIP server #####
class CheckMyIP_Client: class CheckMyIP_Client:
def __init__(self): def __init__(self):
self._json = json # Direct reference to json module self._json = __import__('json') # Import the JSON library
self._socket = socket # Direct reference to socket module self._socket = __import__('socket') # Import the socket library
self._raw_data = None # Initialize the _raw_data variable self._raw_data = None # Initialize the _raw_data variable
self._data = None # Initialize the _data variable self._data = None # Initialize the _data variable
self._af = "auto" # Set the IP address family type to "auto" self._af = "auto" # Set the IP address family type to "auto"
self.server = "telnetmyip.com" # Set the default CheckMyIP server self.server = "telnetmyip.com" # Set the default CheckMyIP server
def get(self): # Primary method to run IP check
def get(self): # Primary method to run IP check if self._af == "auto": # If we are using an auto address family
sock = None try: # Try using IPv6
try: sock = self._socket.socket(self._socket.AF_INET6,
if self._af == "auto": # If we are using an auto address family self._socket.SOCK_STREAM)
try: # Try using IPv6 sock.connect((self.server, 23))
sock = self._socket.socket(self._socket.AF_INET6, self._socket.SOCK_STREAM) except: # Fall back to IPv4 if IPv6 fails
sock.connect((self.server, 23)) sock = self._socket.socket(self._socket.AF_INET,
except: # Fall back to IPv4 if IPv6 fails self._socket.SOCK_STREAM)
sock = self._socket.socket(self._socket.AF_INET, self._socket.SOCK_STREAM) sock.connect((self.server, 23))
sock.connect((self.server, 23)) elif self._af == "ipv6": # If we are using the IPv6 address family
elif self._af == "ipv6": # If we are using the IPv6 address family sock = self._socket.socket(self._socket.AF_INET6,
sock = self._socket.socket(self._socket.AF_INET6, self._socket.SOCK_STREAM) self._socket.SOCK_STREAM)
sock.connect((self.server, 23)) sock.connect((self.server, 23))
elif self._af == "ipv4": # If we are using the IPv4 address family elif self._af == "ipv4": # If we are using the IPv4 address family
sock = self._socket.socket(self._socket.AF_INET, self._socket.SOCK_STREAM) sock = self._socket.socket(self._socket.AF_INET,
sock.connect((self.server, 23)) self._socket.SOCK_STREAM)
self._raw_data = sock.recv(1024).decode() sock.connect((self.server, 23))
self._data = self._json.loads(self._raw_data) # Receive data from the buffer self._raw_data = sock.recv(1024).decode()
return self._data # Return the JSON data self._data = self._json.loads(self._raw_data) # Recieve data from the buffer
except Exception as e: sock.close() # Close the socket
logging.error("CheckMyIP_Client error: %s", e) return self._data # Return the JSON data
return None def set_family(self, family): # Method to set the IP address family
finally: allowed = ["auto", "ipv4", "ipv6"] # Allowed input values
if sock: if family in allowed:
sock.close() # Ensure the socket is closed self._af = family
else:
def set_family(self, family): # Method to set the IP address family raise Exception("Allowed families are 'auto', 'ipv4', 'ipv6'")
allowed = ["auto", "ipv4", "ipv6"] # Allowed input values
if family in allowed:
self._af = family
else:
raise ValueError("Allowed families are 'auto', 'ipv4', 'ipv6'")
### CheckMyIP_Client Example Usage ### ### CheckMyIP_Client Example Usage ###
# client = CheckMyIP_Client() #client = CheckMyIP_Client()
# data = client.get() #client.get()
# print(data)
if __name__ == "__main__": if __name__ == "__main__":
start() # Start the server logging = log_management() # Instantiate log class
start() # Start the server