Compare commits

...

9 commits

2 changed files with 314 additions and 291 deletions

View file

@ -1,29 +1,38 @@
# CheckMyIP - [myip.bsd.cafe](https://myip.bsd.cafe) # CheckMyIP - [myip.bsd.cafe](https://myip.bsd.cafe)
A Telnet, SSH and Simple HTTP Based Public IP Address Lookup Service A Telnet, SSH, DNS and Simple HTTP Based Public IP Address Lookup Service
----------------------------------------- -----------------------------------------
### USAGE ### USAGE
- **TELNET**: #### DNS
- Default: `telnet myip.bsd.cafe` - **DNS TXT Record**:
- IPv4: `telnet myip4.bsd.cafe` - Retrieve your IP via a DNS TXT record: `dig +short myip.bsd.cafe TXT @myip.bsd.cafe`
- IPv6: `telnet myip6.bsd.cafe`
- **SSH**: #### Telnet
- Default: `ssh myip.bsd.cafe` - **Default**: `telnet myip.bsd.cafe`
- IPv4: `ssh myip4.bsd.cafe` - **IPv4**: `telnet myip4.bsd.cafe`
- IPv6: `ssh myip6.bsd.cafe` - **IPv6**: `telnet myip6.bsd.cafe`
- Your SSH client may require you to enter a username. You can use anything you want (`ssh -l imrootbitch myip.bsd.cafe`)
- **CURL**: #### SSH
- Default: `curl -L myip.bsd.cafe` - **Default**: `ssh myip.bsd.cafe`
- IPv4: `curl -L myip4.bsd.cafe` - **IPv4**: `ssh myip4.bsd.cafe`
- IPv6: `curl -L myip6.bsd.cafe` - **IPv6**: `ssh myip6.bsd.cafe`
- **WGET**: - Your SSH client may require you to enter a username. You can use anything you want (`ssh -l username myip.bsd.cafe`)
- Default: `wget -qO- myip.bsd.cafe`
- IPv4: `wget -qO- myip4.bsd.cafe` #### CURL
- IPv6: `wget -qO- myip6.bsd.cafe` - **Default**: `curl -L myip.bsd.cafe`
- **IPv4**: `curl -L myip4.bsd.cafe`
- **IPv6**: `curl -L myip6.bsd.cafe`
#### WGET
- **Default**: `wget -qO- myip.bsd.cafe`
- **IPv4**: `wget -qO- myip4.bsd.cafe`
- **IPv6**: `wget -qO- myip6.bsd.cafe`
For HTTP (CURL and WGET above), use `myip.bsd.cafe/raw` to get only the IP address. This also works for the HTTP IPv4 and IPv6 addresses.
----------------------------------------- -----------------------------------------
### VERSION ### ### VERSION ###
@ -74,7 +83,7 @@ If you would rather set up your own private instance of CheckMyIP, then you can
Install Dependencies (for example, on a FreeBSD jail) Install Dependencies (for example, on a FreeBSD jail)
``` ```
pkg install python39 py39-gssapi py39-paramiko pkg install python39 py39-gssapi py39-paramiko py39-dnspython
``` ```
Clone Repo and install Clone Repo and install
@ -106,7 +115,7 @@ load_rc_config $name
pidfile="/var/run/${name}.pid" pidfile="/var/run/${name}.pid"
command="/usr/local/bin/checkmyip.py" command="/usr/local/bin/checkmyip.py"
command_interpreter="/usr/local/bin/python3.9" command_interpreter="/usr/local/bin/python3.11"
command_args="&" command_args="&"
start_precmd="checkmyip_precmd" start_precmd="checkmyip_precmd"
@ -127,22 +136,6 @@ service checkmyip enable
service checkmyip start service checkmyip start
``` ```
-----------------------------------------
### UPDATES IN V1.0.0 --> V1.1.0 ###
**NEW FEATURES:**
- Was seeing issues where SSH would be very slow to exchange. Likely related to log file sizes, so I change the logging function to turnover to new logging files every day.
-----------------------------------------
### UPDATES IN V1.1.0 --> V1.3.0 ###
- README updated for install on Ubuntu instead of CentOS
- Small tweaks to support Python3
----------------------------------------- -----------------------------------------
### CONTRIBUTING ### ### CONTRIBUTING ###
If you would like to help out by contributing code or reporting issues, please do! If you would like to help out by contributing code or reporting issues, please do!

View file

@ -1,18 +1,14 @@
#!/usr/local/bin/python3.9 #!/usr/local/bin/python3.11
##### BSD Cafe CheckMyIP Server ##### ##### BSD Cafe CheckMyIP Server #####
##### Originally written by John W Kerns ##### ##### Originally written by John W Kerns #####
##### http://blog.packetsar.com #####
##### Forked to BSD Cafe by Stefano Marinelli ##### ##### Forked to BSD Cafe by Stefano Marinelli #####
##### https://brew.bsd.cafe/BSDCafe/checkmyip ##### ##### https://brew.bsd.cafe/BSDCafe/checkmyip #####
##### Inform version here ##### ##### Inform version here #####
version = "v1.3.0" version = "v1.3.2"
##### Import necessary modules #####
##### Import python2 native modules #####
import os import os
import sys import sys
import time import time
@ -21,19 +17,34 @@ import json
import jinja2 import jinja2
import paramiko import paramiko
import threading import threading
from concurrent.futures import ThreadPoolExecutor
from dnslib import DNSRecord, DNSHeader, RR, TXT, QTYPE
import logging # Use built-in logging module
##### Configure Logging #####
LOG_DIR = "/var/log/checkmyip/"
os.makedirs(LOG_DIR, exist_ok=True)
# General Logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s: %(message)s",
handlers=[
logging.FileHandler(os.path.join(LOG_DIR, f"{time.strftime('%Y-%m-%d')}_checkmyip.log")),
logging.StreamHandler(sys.stdout)
]
)
# Paramiko Logger
paramiko_log = os.path.join(LOG_DIR, f"{time.strftime('%Y-%m-%d')}_ssh.log")
paramiko.util.log_to_file(paramiko_log)
##### Jinja formatting for logging queries ##### ##### Jinja formatting for logging queries #####
j2log = "Connection from: {{ ip }} ({{ port }}) ({{ proto }})" j2log = "Connection from: {{ ip }} ({{ port }}) ({{ proto }})"
##### Jinja formatting for response queries ##### ##### Jinja formatting for response queries #####
j2send = """{ j2send = """{
"comment": "## Your IP Address is {{ ip }} ({{ port }}) ##", "comment": "## Your IP Address is {{ ip }} ({{ port }}) ##",
"family": "{{ family }}", "family": "{{ family }}",
"ip": "{{ ip }}", "ip": "{{ ip }}",
"port": "{{ port }}", "port": "{{ port }}",
@ -43,63 +54,45 @@ j2send = """{
"sponsor": "Served by BSD Cafe, https://bsd.cafe/" "sponsor": "Served by BSD Cafe, https://bsd.cafe/"
}""" % version }""" % version
##### DNS Server Functionality #####
class DNSResponder:
def __init__(self, address='::', port=5553):
self.address = address
self.port = port
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # Enables dual-stack mode
self.sock.bind((self.address, self.port))
##### Handles all prnting to console and logging to the logfile ##### def run(self):
class log_management: logging.info("DNS Server listening on port %s for both IPv4 and IPv6", self.port)
def __init__(self):
self.logpath = "/var/log/checkmyip/" # Log file directory path
self.logfile = "/var/log/checkmyip/%scheckmyip.log" % \
time.strftime("%Y-%m-%d_") # Log file full path
self.paramikolog = "/var/log/checkmyip/%sssh.log" % \
time.strftime("%Y-%m-%d_") # SSH log file path
self.thread = threading.Thread(target=self._change_logfiles)
self.thread.daemon = True
self.thread.start() # Start talker thread to listen to port
self._publish_methods() # Publish the console and log methods to glob
self.can_log = True # Variable used to check if we can log
try: # Try to configure the SSH log file, create dir if fail
paramiko.util.log_to_file(self.paramikolog)
except IOError:
self._create_log_dir()
def _logger(self, data): # Logging method published to global as 'log'
logdata = time.strftime("%Y-%m-%d %H:%M:%S") + ": " + data + "\n"
if self.can_log:
try: # Try to write to log, create log dir if fail
f = open(self.logfile, 'a')
f.write(logdata)
f.close()
except IOError:
self._console("Unable to log to logfile %s. Creating log directory" % self.logfile)
self.can_log = False
self._create_log_dir()
self._console(logdata)
def _console(self, data, timestamp=False):
if timestamp:
logdata = time.strftime("%Y-%m-%d %H:%M:%S") + ": " + data + "\n"
else:
logdata = data
print(logdata)
def _publish_methods(self):
global log
global console
log = self._logger # Global method used to write to the log file
console = self._console # Global method used to write to the console
def _create_log_dir(self): # Create the directory for logging
os.system('mkdir -p ' + self.logpath)
self._console("Logpath (%s) created" % self.logpath)
self.can_log = True
def _change_logfiles(self, thread=True):
while True: while True:
time.sleep(10) try:
self.logfile = "/var/log/checkmyip/%scheckmyip.log" % \ data, addr = self.sock.recvfrom(1024)
time.strftime("%Y-%m-%d_") # Log file full path threading.Thread(target=self.handle_query, args=(data, addr), daemon=True).start()
self.paramikolog = "/var/log/checkmyip/%sssh.log" % \ except Exception as e:
time.strftime("%Y-%m-%d_") # SSH log file path logging.error("DNS Server error: %s", e)
paramiko.util.log_to_file(self.paramikolog)
def handle_query(self, data, addr):
try:
request = DNSRecord.parse(data)
qname = request.q.qname
qtype = request.q.qtype
ip, port, *_ = addr # Get IP and port; ignore other details
##### Creates a RSA key for use by paramiko ##### # Normalize IPv6 address (remove IPv4 mapping if exists)
class rsa_key: if ip.startswith("::ffff:"):
ip = ip[7:]
# Only respond to TXT queries for myip.bsd.cafe
if qname.matchGlob('myip.bsd.cafe') and qtype == QTYPE.TXT:
response = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)
response.add_answer(RR(qname, QTYPE.TXT, ttl=60, rdata=TXT(ip)))
self.sock.sendto(response.pack(), addr)
except Exception as e:
logging.error("Error handling DNS query: %s", e)
##### Creates an RSA key for use by paramiko #####
class RSAKey:
data = """-----BEGIN RSA PRIVATE KEY----- data = """-----BEGIN RSA PRIVATE KEY-----
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/ oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
@ -116,38 +109,40 @@ class rsa_key:
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7 nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
-----END RSA PRIVATE KEY----- -----END RSA PRIVATE KEY-----
""" """
def readlines(self): # For use by paramiko.RSAKey.from_private_key mthd def readlines(self): # For use by paramiko.RSAKey.from_private_key
return self.data.split("\n") return self.data.split("\n")
def __call__(self): # Recursive method uses own object as arg when called
def get_key(self):
return paramiko.RSAKey.from_private_key(self) return paramiko.RSAKey.from_private_key(self)
##### Imports and modifies the ServerInterface module for use by paramiko ##### ##### Imports and modifies the ServerInterface module for use by paramiko #####
class ssh_server (paramiko.ServerInterface): class SSHServer(paramiko.ServerInterface):
def __init__(self): def __init__(self):
self.event = threading.Event() self.event = threading.Event()
def check_channel_request(self, kind, chanid): def check_channel_request(self, kind, chanid):
if kind == 'session': if kind == 'session':
return paramiko.OPEN_SUCCEEDED return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_none(self, username): # Auth none method left wide open
def check_auth_none(self, username):
return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username): # Give no auth options
def get_allowed_auths(self, username):
return 'none' return 'none'
def check_channel_shell_request(self, channel): def check_channel_shell_request(self, channel):
self.event.set() self.event.set()
return True return True
def check_channel_pty_request(self, channel, term, width, height,
pixelwidth, pixelheight, modes):
return True
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
return True
##### Method to merge Jinja templates ##### ##### Method to merge Jinja templates #####
def j2format(j2tmp, valdict): def j2format(j2tmp, valdict):
template = jinja2.Template(j2tmp) template = jinja2.Template(j2tmp)
return template.render(valdict).replace("\n", "\r\n") return template.render(valdict).replace("\n", "\r\n")
##### Cleans IP addresses coming from socket library ##### ##### Cleans IP addresses coming from socket library #####
def cleanip(addr): def cleanip(addr):
ip, port = addr[:2] ip, port = addr[:2]
@ -159,68 +154,84 @@ def cleanip(addr):
family = "ipv4" family = "ipv4"
return ip, port, family # Return cleaned IP and family return ip, port, family # Return cleaned IP and family
##### TCP listener methods using ThreadPoolExecutor for better thread management #####
##### TCP listener methods. Gets used once for each listening port ##### def listener(port, talker, executor):
def listener(port, talker):
listen_ip = '' listen_ip = ''
listen_port = port listen_port = port
buffer_size = 1024 buffer_size = 1024
while True: while True:
try:
# Use AF_INET6 but also handle IPv4 connections # Use AF_INET6 but also handle IPv4 connections
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # Allow IPv4-mapped IPv6 addresses sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # Allow IPv4-mapped IPv6 addresses
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((listen_ip, listen_port)) sock.bind((listen_ip, listen_port))
sock.listen(buffer_size) sock.listen(buffer_size)
logging.info("Listening on port %s", listen_port)
while True: while True:
client, addr = sock.accept() client, addr = sock.accept()
ip, port, family = cleanip(addr) # Get cleaned IP info including family ip, port, family = cleanip(addr) # Get cleaned IP info including family
valdict = {"ip": ip, "port": port, "family": family, "proto": ""} valdict = {"ip": ip, "port": port, "family": family, "proto": ""}
thread = threading.Thread(target=talker, args=(client, valdict)) executor.submit(talker, client, valdict) # Submit to thread pool
thread.start() # Start talker thread to listen to port except Exception as e:
logging.error("Listener on port %s encountered an error: %s", port, e)
time.sleep(5) # Wait before retrying
##### Telnet responder method. Is run in own thread for each telnet query ##### ##### Telnet responder method. Is run in own thread for each telnet query #####
def telnet_talker(client, valdict, proto="telnet"): def telnet_talker(client, valdict, proto="telnet"):
try:
valdict.update({"proto": proto}) # Add the protocol to the value dict valdict.update({"proto": proto}) # Add the protocol to the value dict
log(j2format(j2log, valdict)) # Log the query to the console and logfile logging.info(j2format(j2log, valdict)) # Log the query
# Send the query response # Send the query response
client.send(f'{j2format(j2send, valdict)}\n'.encode()) response = j2format(j2send, valdict) + "\n"
client.close() # Close the channel client.sendall(response.encode())
except Exception as e:
logging.error("Telnet talker error: %s", e)
finally:
client.close() # Ensure the socket is closed
##### SSH responder method. Gets run in own thread for each SSH query ##### ##### SSH responder method. Gets run in own thread for each SSH query #####
def ssh_talker(client, valdict, proto="ssh"): def ssh_talker(client, valdict, proto="ssh"):
def makefile(): # A hack to make Cisco SSH sessions work properly try:
def makefile(chan):
# A hack to make SSH sessions work properly
chan.makefile('rU').readline().strip('\r\n') chan.makefile('rU').readline().strip('\r\n')
valdict.update({"proto": proto}) valdict.update({"proto": proto})
log(j2format(j2log, valdict)) logging.info(j2format(j2log, valdict))
t = paramiko.Transport(client, gss_kex=True) t = paramiko.Transport(client)
t.set_gss_host(socket.getfqdn("")) t.set_gss_host(socket.getfqdn(""))
t.load_server_moduli() t.load_server_moduli()
t.add_server_key(rsa_key()()) # RSA key object nested call t.add_server_key(RSAKey().get_key()) # RSA key object
server = ssh_server() server = SSHServer()
t.start_server(server=server) t.start_server(server=server)
chan = t.accept(20) chan = t.accept(20)
if chan: if chan:
server.event.wait(10) if not server.event.wait(10):
chan.send('%s\n' % j2format(j2send, valdict)) # Send the response logging.warning("No shell request received.")
thread = threading.Thread(target=makefile) chan.close()
thread.start() # Start hack in thread since it hangs indefinately else:
time.sleep(1) # Wait a second response = j2format(j2send, valdict) + "\n"
chan.close() # And kill the SSH channel chan.send(response)
client.close() # And kill the session threading.Thread(target=makefile, args=(chan,), daemon=True).start()
time.sleep(1)
chan.close()
except Exception as e:
logging.error("SSH talker error: %s", e)
finally:
client.close()
##### HTTP responder method. Gets run in own thread for each HTTP query #####
##### Automatically detects if client is a browser or a telnet client #####
##### HTTP responder method. Gets run in own thread for each HTTP query ##### ##### HTTP responder method. Gets run in own thread for each HTTP query #####
def http_talker(client, valdict, proto="http"): def http_talker(client, valdict, proto="http"):
time.sleep(.1) # Sleep to allow the client to send some data try:
client.setblocking(0) # Set the socket recv as non-blocking time.sleep(0.1) # Allow client to send data
client.settimeout(5.0) # Set timeout to prevent blocking indefinitely
browser = False # Is the client using a browser? browser = False # Is the client using a browser?
headers = {} headers = {}
try: try:
data = client.recv(2048) # Receive data from the buffer (if any) data = client.recv(2048) # Receive data from the buffer (if any)
if data:
request_lines = data.decode().split('\r\n') request_lines = data.decode().split('\r\n')
for line in request_lines: for line in request_lines:
if ": " in line: if ": " in line:
@ -232,87 +243,106 @@ def http_talker(client, valdict, proto="http"):
ip, port, family = cleanip((real_ip, valdict['port'])) ip, port, family = cleanip((real_ip, valdict['port']))
valdict.update({"ip": ip, "family": family}) valdict.update({"ip": ip, "family": family})
browser = True # Set client browser to True browser = True # Set client browser to True
except socket.error: # If buffer was empty, then like a telnet client on TCP80 except socket.timeout:
browser = False # Set client browser to False browser = False # Set client browser to False
except Exception as e:
logging.error("HTTP talker recv error: %s", e)
browser = False
if not browser: # If the client is not a browser if not browser: # If the client is not a browser
telnet_talker(client, valdict, "http-telnet") # Hand to telnet_talker telnet_talker(client, valdict, "http-telnet") # Hand to telnet_talker
else: # If client is a browser else: # If client is a browser
# Proceed with standard HTTP response (with headers)
valdict.update({"proto": proto}) valdict.update({"proto": proto})
log(j2format(j2log, valdict)) logging.info(j2format(j2log, valdict))
response_body_raw = j2format(j2send, valdict) + "\n" response_body_raw = j2format(j2send, valdict) + "\n"
if request_lines[0].split()[1] == "/raw": if request_lines and "/raw" in request_lines[0]:
try:
json_content = json.loads(response_body_raw) json_content = json.loads(response_body_raw)
response_body_raw = json_content['ip'] response_body_raw = json_content.get('ip', '')
response_headers_raw = """HTTP/1.1 200 OK except json.JSONDecodeError:
Content-Length: %s response_body_raw = "Invalid JSON response."
Content-Type: application/json; encoding=utf8 response_headers_raw = (
Connection: close""" % str(len(response_body_raw)) # Response with headers "HTTP/1.1 200 OK\r\n"
client.send(f'{response_headers_raw}\n\n{response_body_raw}'.encode()) f"Content-Length: {len(response_body_raw)}\r\n"
client.close() "Content-Type: application/json; charset=utf-8\r\n"
"Connection: close\r\n\r\n"
)
client.sendall(response_headers_raw.encode() + response_body_raw.encode())
except Exception as e:
logging.error("HTTP talker error: %s", e)
finally:
client.close() # Ensure the socket is closed
##### Server startup method. Starts a listener thread for each TCP port ##### ##### Server startup method. Starts a listener thread for each TCP port #####
def start(): def start():
talkers = {2222: ssh_talker, 2223: telnet_talker, talkers = {
2280: http_talker} # Three listeners on different ports 2222: ssh_talker,
for talker in talkers: 2223: telnet_talker,
2280: http_talker
} # Three listeners on different ports
max_workers = 100 # Define maximum number of threads
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for port, talker in talkers.items():
# Launch a thread for each listener # Launch a thread for each listener
thread = threading.Thread(target=listener, threading.Thread(target=listener, args=(port, talker, executor), daemon=True).start()
args=(talker, talkers[talker])) # Start DNS server in a new thread
thread.daemon = True dns_server = DNSResponder(port=5553) # Assuming DNS is on port 5553 as per original code
thread.start() threading.Thread(target=dns_server.run, daemon=True).start()
while True: # While loop to allow a CTRL-C interrupt when interactive logging.info("All services started.")
while True: # Keep the main thread alive
try: try:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
quit() logging.info("Shutting down server.")
sys.exit(0)
##### Client class to be used to make API calls to CheckMyIP server ##### ##### Client class to be used to make API calls to CheckMyIP server #####
class CheckMyIP_Client: class CheckMyIP_Client:
def __init__(self): def __init__(self):
self._json = __import__('json') # Import the JSON library self._json = json # Direct reference to json module
self._socket = __import__('socket') # Import the socket library self._socket = socket # Direct reference to socket module
self._raw_data = None # Initialize the _raw_data variable self._raw_data = None # Initialize the _raw_data variable
self._data = None # Initialize the _data variable self._data = None # Initialize the _data variable
self._af = "auto" # Set the IP address family type to "auto" self._af = "auto" # Set the IP address family type to "auto"
self.server = "telnetmyip.com" # Set the default CheckMyIP server self.server = "telnetmyip.com" # Set the default CheckMyIP server
def get(self): # Primary method to run IP check def get(self): # Primary method to run IP check
sock = None
try:
if self._af == "auto": # If we are using an auto address family if self._af == "auto": # If we are using an auto address family
try: # Try using IPv6 try: # Try using IPv6
sock = self._socket.socket(self._socket.AF_INET6, sock = self._socket.socket(self._socket.AF_INET6, self._socket.SOCK_STREAM)
self._socket.SOCK_STREAM)
sock.connect((self.server, 23)) sock.connect((self.server, 23))
except: # Fall back to IPv4 if IPv6 fails except: # Fall back to IPv4 if IPv6 fails
sock = self._socket.socket(self._socket.AF_INET, sock = self._socket.socket(self._socket.AF_INET, self._socket.SOCK_STREAM)
self._socket.SOCK_STREAM)
sock.connect((self.server, 23)) sock.connect((self.server, 23))
elif self._af == "ipv6": # If we are using the IPv6 address family elif self._af == "ipv6": # If we are using the IPv6 address family
sock = self._socket.socket(self._socket.AF_INET6, sock = self._socket.socket(self._socket.AF_INET6, self._socket.SOCK_STREAM)
self._socket.SOCK_STREAM)
sock.connect((self.server, 23)) sock.connect((self.server, 23))
elif self._af == "ipv4": # If we are using the IPv4 address family elif self._af == "ipv4": # If we are using the IPv4 address family
sock = self._socket.socket(self._socket.AF_INET, sock = self._socket.socket(self._socket.AF_INET, self._socket.SOCK_STREAM)
self._socket.SOCK_STREAM)
sock.connect((self.server, 23)) sock.connect((self.server, 23))
self._raw_data = sock.recv(1024).decode() self._raw_data = sock.recv(1024).decode()
self._data = self._json.loads(self._raw_data) # Recieve data from the buffer self._data = self._json.loads(self._raw_data) # Receive data from the buffer
sock.close() # Close the socket
return self._data # Return the JSON data return self._data # Return the JSON data
except Exception as e:
logging.error("CheckMyIP_Client error: %s", e)
return None
finally:
if sock:
sock.close() # Ensure the socket is closed
def set_family(self, family): # Method to set the IP address family def set_family(self, family): # Method to set the IP address family
allowed = ["auto", "ipv4", "ipv6"] # Allowed input values allowed = ["auto", "ipv4", "ipv6"] # Allowed input values
if family in allowed: if family in allowed:
self._af = family self._af = family
else: else:
raise Exception("Allowed families are 'auto', 'ipv4', 'ipv6'") raise ValueError("Allowed families are 'auto', 'ipv4', 'ipv6'")
### CheckMyIP_Client Example Usage ### ### CheckMyIP_Client Example Usage ###
# client = CheckMyIP_Client() # client = CheckMyIP_Client()
#client.get() # data = client.get()
# print(data)
if __name__ == "__main__": if __name__ == "__main__":
logging = log_management() # Instantiate log class
start() # Start the server start() # Start the server