"""Monitoring plugin (Nagios-compatible) for watching a Gemini server
The monitoring plugin API is documented at
https://www.monitoring-plugins.org/doc/guidelines.html.
"""
import socket
import re
import sys
import datetime
import getopt
import urllib.parse
import OpenSSL
import Agunua
# Exit codes handed to sys.exit() to report the plugin state.
STATE_OK, STATE_WARNING, STATE_CRITICAL, STATE_UNKNOWN, STATE_DEPENDENT = range(5)

# Target selection: -H (host), -V (virtual host name), -P (port), -p (path).
host = vhostname = None
port = Agunua.GEMINI_PORT
path = ""

# Expectations on the answer: -e (content), -c (status code), -m (meta),
# -g (retrieve the content).
expect = expect_statuscode = expect_meta = None
get_content = False

# TLS and certificate handling: -x (no SNI), -C (expiration thresholds, in
# days), -i (insecure), -T (disable TOFU), -a (accept expired certificates).
sni = True
warn_cert = crit_cert = None
insecure = disable_tofu = accept_expired = False

# Transport: -s (SOCKS proxy as a (host, port) tuple), -4 / -6 (force the
# IP version).
socks = None
forceIPv4 = forceIPv6 = False

# Not referenced in the visible part of this file; kept unchanged.
err_message = ""
def error(msg=None):
    """Report a CRITICAL state for the monitored host and terminate.

    Prints "<host> CRITICAL: <msg>" on standard output, then exits the
    process with STATE_CRITICAL. When msg is None, "Unknown error" is
    printed instead.
    """
    text = "Unknown error" if msg is None else msg
    print("%s CRITICAL: %s" % (host, text))
    sys.exit(STATE_CRITICAL)
def warning(msg=None):
    """Report a WARNING state for the monitored host and terminate.

    Prints "<host> WARNING: <msg>" on standard output, then exits the
    process with STATE_WARNING. When msg is None, "Unknown warning" is
    printed instead.
    """
    text = "Unknown warning" if msg is None else msg
    print("%s WARNING: %s" % (host, text))
    sys.exit(STATE_WARNING)
# Main body of the plugin: parse the command line, fetch the Gemini URL with
# Agunua, compare the answer with the expectations and exit with the matching
# plugin state. Every exit goes through sys.exit(); SystemExit is not a
# subclass of Exception, so the catch-all handler at the bottom only sees
# genuinely unexpected errors and reports them as UNKNOWN.
try:
    # --- Command-line parsing -------------------------------------------
    try:
        optlist, args = getopt.getopt(sys.argv[1:], "hH:p:V:e:c:C:m:P:s:gi46xaT")
        for option, value in optlist:
            if option == "-h":
                print("Standard Nagios API options")
                sys.exit(STATE_UNKNOWN)
            elif option == "-H":
                host = value
            elif option == "-V":
                vhostname = value
            elif option == "-e":
                expect = value
            elif option == "-c":
                expect_statuscode = value
            elif option == "-C":
                # Either "WARN" or "WARN,CRIT", both in days.
                if "," not in value:
                    warn_cert = int(value)
                else:
                    (warning_c, critical_c) = value.split(",")
                    warn_cert = int(warning_c)
                    crit_cert = int(critical_c)
            elif option == "-m":
                expect_meta = value
            elif option == "-p":
                path = value
            elif option == "-P":
                port = int(value)
            elif option == "-i":
                insecure = True
            elif option == "-T":
                disable_tofu = True
            elif option == "-x":
                sni = False
            elif option == "-a":
                accept_expired = True
            elif option == "-s":
                # Raw string: the pattern is full of backslashes that are not
                # valid Python string escapes.
                match = re.search(
                    r"^\[?(([\w\.]+)|(([a-fA-F0-9:]+))|([0-9\.]+))\]?:([0-9]+)$",
                    value)
                if not match:
                    print("Socks must be host:port")
                    sys.exit(STATE_UNKNOWN)
                # Group 1 is the host without the optional brackets, group 6
                # the port.
                socks = (match.group(1), int(match.group(6)))
            elif option == "-4":
                forceIPv4 = True
            elif option == "-6":
                forceIPv6 = True
            elif option == "-g":
                get_content = True
            else:
                # Should never occur, it is trapped by getopt
                print("Unknown option %s" % option)
                sys.exit(STATE_UNKNOWN)
    except (getopt.error, ValueError) as reason:
        # ValueError covers a non-integer -C/-P value and a -C value with
        # more than one comma; report it as a usage problem instead of an
        # internal error.
        print("Option parsing problem %s" % reason)
        sys.exit(STATE_UNKNOWN)

    # --- Consistency checks on the options -------------------------------
    if host is None:
        print("Host (-H) is necessary")
        sys.exit(STATE_UNKNOWN)
    if vhostname is None:
        vhostname = host
    if socks is not None and vhostname.lower().endswith(".onion"):
        # Through a SOCKS proxy, connect to the .onion name itself.
        host = vhostname
    if forceIPv4 and forceIPv6:
        print("Force IPv4 or IPv6 but not both")
        sys.exit(STATE_UNKNOWN)
    if warn_cert is not None and crit_cert is not None and warn_cert <= crit_cert:
        print("Warning threshold must be higher than critical threshold")
        sys.exit(STATE_UNKNOWN)
    if len(args) > 0:
        print("Too many arguments (\"%s\")" % args)
        sys.exit(STATE_UNKNOWN)

    # --- Build the request -----------------------------------------------
    # The port only appears in the URL when it is not the default one.
    if port == Agunua.GEMINI_PORT:
        netloc = vhostname
    else:
        netloc = "%s:%i" % (vhostname, port)
    url = urllib.parse.urlunsplit(("gemini", netloc, path, "", ""))
    tofu_path = "" if disable_tofu else Agunua.TOFU
    if expect is not None:
        # Searching the content requires retrieving it.
        get_content = True

    # --- Run the request and time it -------------------------------------
    start = datetime.datetime.now()
    result = Agunua.GeminiUri(url, get_content=get_content,
                              connect_to=host, use_socks=socks,
                              insecure=insecure, tofu=tofu_path,
                              accept_expired=accept_expired,
                              send_sni=sni, force_ipv4=forceIPv4,
                              force_ipv6=forceIPv6)
    if not result.network_success:
        error(result.error)
    end = datetime.datetime.now()
    if get_content:
        data = result.payload

    # --- Check the answer ------------------------------------------------
    # NOTE(review): when -c is given and the status code matches, the meta
    # (-m) and content (-e) expectations are not evaluated -- confirm that
    # this is intended.
    if expect_statuscode is not None:
        if result.status_code != expect_statuscode:
            error("Unexpected status code %s (expected was %s)" %
                  (result.status_code, expect_statuscode))
    elif result.status_code == "20":
        if expect_meta is not None and expect_meta not in result.meta:
            error("\"%s\" not found in meta (value is \"%s\")" % (expect_meta, result.meta))
        if expect is not None and expect not in data:
            error("\"%s\" not found in answer" % expect)
    else:
        error("Status code %s: %s" % (result.status_code, result.meta))

    # --- Check the certificate expiration --------------------------------
    # The critical threshold is tested first so that it wins over the
    # warning one.
    now = datetime.datetime.now()
    if crit_cert is not None and (result.cert_not_after <= now + datetime.timedelta(days=crit_cert)):
        error("Certificate expires in less than %i days" % crit_cert)
    if warn_cert is not None and (result.cert_not_after <= now + datetime.timedelta(days=warn_cert)):
        warning("Certificate expires in less than %i days" % warn_cert)

    # --- Everything is fine: report size and latency ---------------------
    elapsed = (end - start).total_seconds()
    if get_content:
        size = " - %i bytes" % len(data)
    else:
        size = ""
    print("%s OK - %s%s - %.3f seconds" % (host, "No error", size, elapsed))
    sys.exit(STATE_OK)
except Exception as e:
    # Anything unexpected (including errors raised inside Agunua) is an
    # UNKNOWN state for the monitoring system.
    print("%s UNKNOWN - Unknown internal error %s: %s" % (host, type(e), e))
    sys.exit(STATE_UNKNOWN)
# text/plain
# This content has been proxied by September (ba2dc).