=> e54db584a4a3d225b29d41eebc594b216aeaf033
[1mdiff --git a/README.md b/README.md[m [1mindex 0b1540a..2250fe3 100644[m [1m--- a/README.md[m [1m+++ b/README.md[m [36m@@ -51,7 +51,7 @@[m [mThe log can be viewed via journalctl (or syslog):[m [m * Extension modules can register new protocols in addition to the built-in Gemini and Titan.[m * SIGHUP causes the configuration file to be reloaded and workers to be restarted. The listening socket remains open, so the socket and TLS parameters cannot be changed.[m [31m-* API change: Extension modules get initialized separately in each worker thread. Instead of a `Capsule`, the extension module `init` method is passed a `WorkerContext`. `Capsule` is no longer available as global state.[m [32m+[m[32m* API change: Extension modules get initialized separately in each worker thread. Instead of a `Capsule`, the extension module `init` method is passed a `Context`. `Capsule` is no longer available as global state.[m [m ### v0.4[m [m [1mdiff --git a/docs/api.rst b/docs/api.rst[m [1mindex 75db2ed..4dd522b 100644[m [1m--- a/docs/api.rst[m [1m+++ b/docs/api.rst[m [36m@@ -10,9 +10,9 @@[m [mThese classes and functions are available to extension modules by importing[m Config[m Capsule[m Cache[m [32m+[m[32m Context[m gemini.Request[m gemini.Identity[m [31m- WorkerContext[m [m [m Classes[m [36m@@ -34,6 +34,11 @@[m [mCache[m .. autoclass:: gmcapsule.Cache[m :members:[m [m [32m+[m[32mContext[m [32m+[m[32m-------[m [32m+[m[32m.. autoclass:: gmcapsule.Context[m [32m+[m[32m :members:[m [32m+[m Request[m -------[m .. autoclass:: gmcapsule.gemini.Request[m [36m@@ -44,11 +49,6 @@[m [mIdentity[m .. autoclass:: gmcapsule.gemini.Identity[m :members:[m [m [31m-WorkerContext[m [31m--------------[m [31m-.. autoclass:: gmcapsule.WorkerContext[m [31m- :members:[m [31m-[m [m Functions[m *********[m [1mdiff --git a/example.ini b/example.ini[m [1mindex 44ebb4e..2b178da 100644[m [1m--- a/example.ini[m [1m+++ b/example.ini[m [36m@@ -6,8 +6,9 @@[m ;address = 0.0.0.0[m ;port = 1965[m ;certs = .certs[m [31m-;modules =[m [32m+[m[32m;modules =[m[41m [m ;threads = 5[m [32m+[m[32m;processes = 2[m [m [static][m root = .[m [36m@@ -15,6 +16,9 @@[m [mroot = .[m [titan][m ;upload_limit = 10485760[m [m [32m+[m[32m[cgi][m [32m+[m[32mbin_root = ./cgi-bin[m [32m+[m [cgi.booster][m protocol = titan[m host = localhost[m [36m@@ -25,11 +29,15 @@[m [mcommand = /usr/bin/python3 ../booster/booster.py[m path = /cgienv[m command = printenv[m [m [32m+[m[32m[rewrite.test][m [32m+[m[32mpath = ^/altenv$[m [32m+[m[32mstatus = 30 gemini://localhost/cgienv${QUERY_STRING}[m [32m+[m ;--------------------------------------------------------------------------[m [m [31m-;[gitview][m [31m-;git = /usr/bin/git[m [31m-;cache_path = /Users/jaakko/Library/Caches/gmgitview[m [32m+[m[32m[gitview][m [32m+[m[32mgit = /usr/bin/git[m [32m+[m[32mcache_path = /Users/jaakko/Library/Caches/gmgitview[m [m [gitview.lagrange][m title = Lagrange[m [36m@@ -47,3 +55,12 @@[m [mclone_url = https://git.skyjake.fi/gemini/gitview.git[m path = /Users/jaakko/src/gmgitview[m url_root = gmgitview[m default_branch = main[m [32m+[m [32m+[m[32m[gitview.bubble][m [32m+[m[32mtitle = Bubble[m [32m+[m[32mbrief = Bulletin Boards for Gemini[m [32m+[m[32mclone_url = https://git.skyjake.fi/gemini/bubble.git[m [32m+[m[32mpath = /Users/jaakko/src/bubble[m [32m+[m[32murl_root = bubble[m [32m+[m[32mdefault_branch = main[m [41m+[m [1mdiff --git a/gmcapsule/__init__.py b/gmcapsule/__init__.py[m [1mindex ebbeeb5..96ec7d8 100644[m [1m--- a/gmcapsule/__init__.py[m [1m+++ b/gmcapsule/__init__.py[m [36m@@ -428,7 +428,7 @@[m [mEach extension module is required to have an initialization function:[m :param context: Worker context. The extension can use this to access[m configuration parameters, install caches, and register entry[m points and custom scheme handlers.[m [31m- :type context: gmcapsule.WorkerContext[m [32m+[m[32m :type context: gmcapsule.Context[m [m [m Requests[m [36m@@ -481,13 +481,13 @@[m [mimport shlex[m import subprocess[m from pathlib import Path[m [m [31m-from .gemini import Server, Cache, WorkerContext[m [32m+[m[32mfrom .gemini import Server, Cache, Context[m from .markdown import to_gemtext as markdown_to_gemtext[m [m [m __version__ = '0.5.0'[m __all__ = [[m [31m- 'Config', 'Cache', 'WorkerContext',[m [32m+[m[32m 'Config', 'Cache', 'Context',[m 'get_mime_type', 'markdown_to_gemtext'[m ][m [m [36m@@ -557,6 +557,9 @@[m [mclass Config:[m def num_threads(self):[m return self.ini.getint('server', 'threads', fallback=5)[m [m [32m+[m[32m def num_processes(self):[m [32m+[m[32m return self.ini.getint('server', 'processes', fallback=2)[m [32m+[m def max_upload_size(self):[m return self.ini.getint('titan', 'upload_limit', fallback=10 * 1024 * 1024)[m [m [1mdiff --git a/gmcapsule/gemini.py b/gmcapsule/gemini.py[m [1mindex ce4edd2..163871b 100644[m [1m--- a/gmcapsule/gemini.py[m [1m+++ b/gmcapsule/gemini.py[m [36m@@ -7,7 +7,7 @@[m [mimport importlib[m import os.path[m import select[m import socket[m [31m-#import multiprocessing as mp[m [32m+[m[32mimport multiprocessing as mp[m import threading[m import queue[m import re[m [36m@@ -74,6 +74,10 @@[m [mdef safe_recv(stream, max_len, stall_timeout=10):[m return data[m [m [m [32m+[m[32mdef is_bytes(data):[m [32m+[m[32m return type(data) == bytes or type(data) == bytearray[m [32m+[m [32m+[m def safe_sendall(stream, data, stall_timeout=30):[m """[m Send data over an SSL connection, accounting for stalls and retries[m [36m@@ -86,46 +90,55 @@[m [mdef safe_sendall(stream, data, stall_timeout=30):[m stall_timeout (float): Number of seconds to wait until[m terminating a stalled send.[m """[m [31m- if type(data) == bytes or type(data) == bytearray:[m [31m- streaming = False[m [31m- else:[m [31m- streaming = True[m [31m-[m [31m- # We may need to retry sending with the exact same buffer,[m [31m- # so keep it around until successful.[m [31m- BUF_LEN = 32768[m [31m- if streaming:[m [31m- send_buf = data.read(BUF_LEN)[m [31m- else:[m [31m- send_buf = data[:BUF_LEN][m [32m+[m[32m try:[m [32m+[m[32m if is_bytes(data):[m [32m+[m[32m streaming = False[m [32m+[m[32m else:[m [32m+[m[32m streaming = True[m [32m+[m[32m if isinstance(data, Path):[m [32m+[m[32m print('opening', data)[m [32m+[m[32m data = open(data, 'rb')[m [32m+[m [32m+[m[32m # We may need to retry sending with the exact same buffer,[m [32m+[m[32m # so keep it around until successful.[m [32m+[m[32m BUF_LEN = 32768[m [32m+[m[32m if streaming:[m [32m+[m[32m send_buf = data.read(BUF_LEN)[m [32m+[m[32m else:[m [32m+[m[32m send_buf = data[:BUF_LEN][m [m [31m- last_time = time.time()[m [31m- pos = 0[m [31m- while len(send_buf) > 0:[m [31m- try:[m [31m- if time.time() - last_time > stall_timeout:[m [31m- raise AbortedIOError('stalled')[m [31m- sent = stream.send(send_buf)[m [31m- if sent < 0:[m [31m- raise AbortedIOError('failed to send')[m [31m- pos += sent[m [31m- if streaming:[m [31m- send_buf = send_buf[sent:][m [31m- if len(send_buf) < BUF_LEN / 2:[m [31m- send_buf += data.read(BUF_LEN)[m [31m- else:[m [31m- send_buf = data[pos : pos + BUF_LEN][m [31m- if sent > 0:[m [31m- last_time = time.time()[m [31m- else:[m [32m+[m[32m last_time = time.time()[m [32m+[m[32m pos = 0[m [32m+[m[32m while len(send_buf) > 0:[m [32m+[m[32m try:[m [32m+[m[32m if time.time() - last_time > stall_timeout:[m [32m+[m[32m raise AbortedIOError('stalled')[m [32m+[m[32m sent = stream.send(send_buf)[m [32m+[m[32m if sent < 0:[m [32m+[m[32m raise AbortedIOError('failed to send')[m [32m+[m[32m pos += sent[m [32m+[m[32m if streaming:[m [32m+[m[32m send_buf = send_buf[sent:][m [32m+[m[32m if len(send_buf) < BUF_LEN / 2:[m [32m+[m[32m send_buf += data.read(BUF_LEN)[m [32m+[m[32m else:[m [32m+[m[32m send_buf = data[pos : pos + BUF_LEN][m [32m+[m[32m if sent > 0:[m [32m+[m[32m last_time = time.time()[m [32m+[m[32m else:[m [32m+[m[32m wait_for_write(stream, stall_timeout)[m [32m+[m[32m except OpenSSL.SSL.WantReadError:[m [32m+[m[32m pass[m [32m+[m[32m except OpenSSL.SSL.WantWriteError:[m [32m+[m[32m # Wait until the socket is ready for writing.[m wait_for_write(stream, stall_timeout)[m [31m- except OpenSSL.SSL.WantReadError:[m [31m- pass[m [31m- except OpenSSL.SSL.WantWriteError:[m [31m- # Wait until the socket is ready for writing.[m [31m- wait_for_write(stream, stall_timeout)[m [31m- except OpenSSL.SSL.WantX509LookupError:[m [31m- pass[m [32m+[m[32m except OpenSSL.SSL.WantX509LookupError:[m [32m+[m[32m pass[m [32m+[m [32m+[m[32m finally:[m [32m+[m[32m # Close resources and handles.[m [32m+[m[32m if data and hasattr(data, 'close'):[m [32m+[m[32m data.close()[m [m [m def safe_close(stream):[m [36m@@ -148,6 +161,19 @@[m [mdef report_error(stream, code, msg):[m safe_close(stream)[m [m [m [32m+[m[32mdef cert_subject(cert):[m [32m+[m[32m comps = {}[m [32m+[m[32m for name, value in cert.get_subject().get_components():[m [32m+[m[32m comps[name.decode()] = value.decode()[m [32m+[m[32m return comps[m [32m+[m [32m+[m[32mdef cert_issuer(cert):[m [32m+[m[32m comps = {}[m [32m+[m[32m for name, value in cert.get_issuer().get_components():[m [32m+[m[32m comps[name.decode()] = value.decode()[m [32m+[m[32m return comps[m [32m+[m [32m+[m class Identity:[m """[m Client certificate.[m [36m@@ -156,19 +182,21 @@[m [mclass Identity:[m just for the public key.[m [m Attributes:[m [31m- cert (OpenSSL.SSL.X509): Certificate.[m [31m- pubkey (OpenSSL.SSL.PKey): Public key.[m [32m+[m[32m cert (bytes): Certificate (DER format).[m [32m+[m[32m pubkey (bytes): Public key (DER format).[m fp_cert (str): SHA-256 hash of the certificate.[m fp_pubkey (str): SHA-256 hash of the public key.[m """[m def __init__(self, cert):[m [31m- self.cert = cert[m [32m+[m[32m self._subject = cert_subject(cert)[m [32m+[m[32m self._issuer = cert_issuer(cert)[m [32m+[m[32m self.cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)[m m = hashlib.sha256()[m [31m- m.update(crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert))[m [32m+[m[32m m.update(self.cert)[m self.fp_cert = m.hexdigest()[m [31m- self.pubkey = self.cert.get_pubkey()[m [32m+[m[32m self.pubkey = crypto.dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())[m m = hashlib.sha256()[m [31m- m.update(crypto.dump_publickey(crypto.FILETYPE_ASN1, self.pubkey))[m [32m+[m[32m m.update(self.pubkey)[m self.fp_pubkey = m.hexdigest()[m [m def __str__(self):[m [36m@@ -177,22 +205,16 @@[m [mclass Identity:[m def subject(self):[m """[m Returns:[m [31m- dict: Name components of the certificate subject, e.g.: ``{'CN': 'Name'}``[m [32m+[m[32m dict: Name components of the certificate subject, e.g.: ``{'CN': 'Name'}``.[m """[m [31m- comps = {}[m [31m- for name, value in self.cert.get_subject().get_components():[m [31m- comps[name.decode()] = value.decode()[m [31m- return comps[m [32m+[m[32m return self._subject[m [m def issuer(self):[m """[m Returns:[m [31m- dict: Name components of the certificate issuer, e.g.: ``{'CN': 'Name'}``[m [32m+[m[32m dict: Name components of the certificate issuer, e.g.: ``{'CN': 'Name'}``.[m """[m [31m- comps = {}[m [31m- for name, value in self.cert.get_issuer().get_components():[m [31m- comps[name.decode()] = value.decode()[m [31m- return comps[m [32m+[m[32m return self._issuer[m [m [m class Request:[m [36m@@ -220,7 +242,8 @@[m [mclass Request:[m May be ``None``.[m """[m def __init__(self, identity=None, scheme='gemini', hostname='', path='', query=None,[m [31m- remote_address=None, content_token=None, content_mime=None, content=None):[m [32m+[m[32m remote_address=None, content_token=None, content_mime=None, content=None,[m [32m+[m[32m worker_id=None):[m self.remote_address = remote_address[m self.scheme = scheme[m self.identity = identity[m [36m@@ -230,6 +253,7 @@[m [mclass Request:[m self.content_token = content_token[m self.content_mime = content_mime[m self.content = content[m [32m+[m[32m self.worker_id = worker_id[m [m def url(self):[m return f'{self.scheme}://{self.hostname}{self.path}{"?" + self.query if self.query else ""}'[m [36m@@ -297,10 +321,128 @@[m [mclass Cache:[m return None, None[m [m [m [31m-class WorkerContext:[m [31m- def __init__(self, cfg, shutdown_event):[m [32m+[m[32mdef handle_gemini_or_titan_request(request_data):[m [32m+[m[32m worker = request_data.worker[m [32m+[m[32m stream = request_data.stream[m [32m+[m[32m data = request_data.buffered_data[m [32m+[m[32m from_addr = request_data.from_addr[m [32m+[m[32m identity = request_data.identity[m [32m+[m[32m request = request_data.request[m [32m+[m[32m expected_size = 0[m [32m+[m[32m req_token = None[m [32m+[m[32m req_mime = None[m [32m+[m [32m+[m[32m if request.startswith('titan:'):[m [32m+[m[32m if identity is None and worker.cfg.require_upload_identity():[m [32m+[m[32m report_error(stream, 60, "Client certificate required for upload")[m [32m+[m[32m return[m [32m+[m[32m # Read the rest of the data.[m [32m+[m[32m parms = request.split(';')[m [32m+[m[32m request = parms[0][m [32m+[m[32m for p in parms:[m [32m+[m[32m if p.startswith('size='):[m [32m+[m[32m expected_size = int(p[5:])[m [32m+[m[32m elif p.startswith('token='):[m [32m+[m[32m req_token = p[6:][m [32m+[m[32m elif p.startswith('mime='):[m [32m+[m[32m req_mime = p[5:][m [32m+[m[32m worker.log(f'Receiving Titan content: {expected_size}')[m [32m+[m[32m max_upload_size = worker.cfg.max_upload_size()[m [32m+[m[32m if expected_size > max_upload_size and max_upload_size > 0:[m [32m+[m[32m report_error(stream, 59, "Maximum content length exceeded")[m [32m+[m[32m return[m [32m+[m[32m while len(data) < expected_size:[m [32m+[m[32m incoming = safe_recv(stream, 65536)[m [32m+[m[32m if len(incoming) == 0:[m [32m+[m[32m break[m [32m+[m[32m data += incoming[m [32m+[m[32m if len(data) != expected_size:[m [32m+[m[32m report_error(stream, 59, "Invalid content length")[m [32m+[m[32m return[m [32m+[m[32m else:[m [32m+[m[32m # No Payload in Gemini.[m [32m+[m[32m if len(data):[m [32m+[m[32m report_error(stream, 59, "Bad request")[m [32m+[m[32m return[m [32m+[m [32m+[m[32m url = urlparse(request)[m [32m+[m[32m path = url.path[m [32m+[m[32m if path == '':[m [32m+[m[32m path = '/'[m [32m+[m[32m hostname = url.hostname[m [32m+[m [32m+[m[32m if url.port != None and url.port != worker.port:[m [32m+[m[32m report_error(stream, 59, "Invalid port number")[m [32m+[m[32m return[m [32m+[m[32m if not stream.get_servername():[m [32m+[m[32m # Server name indication is required.[m [32m+[m[32m report_error(stream, 59, "Missing TLS server name indication")[m [32m+[m[32m return[m [32m+[m[32m if stream.get_servername().decode() != hostname:[m [32m+[m[32m report_error(stream, 53, "Proxy request refused")[m [32m+[m[32m return[m [32m+[m [32m+[m[32m try:[m [32m+[m[32m status, meta, body, from_cache = worker.context.call_entrypoint(Request([m [32m+[m[32m identity,[m [32m+[m[32m remote_address=from_addr,[m [32m+[m[32m scheme=url.scheme,[m [32m+[m[32m hostname=hostname,[m [32m+[m[32m path=path,[m [32m+[m[32m query=url.query if '?' in request else None,[m [32m+[m[32m content_token=req_token,[m [32m+[m[32m content_mime=req_mime,[m [32m+[m[32m content=data if len(data) else None,[m [32m+[m[32m worker_id=request_data.worker.id[m [32m+[m[32m ))[m [32m+[m [32m+[m[32m output = f'{status} {meta}\r\n'.encode('utf-8')[m [32m+[m[32m if is_bytes(body):[m [32m+[m[32m safe_sendall(stream, output + body)[m [32m+[m[32m else:[m [32m+[m[32m # `body` is some sort of streamable data, cannot send in one call.[m [32m+[m[32m safe_sendall(stream, output)[m [32m+[m[32m safe_sendall(stream, body)[m [32m+[m [32m+[m[32m # Save to cache.[m [32m+[m[32m if not from_cache and status == 20 and is_bytes(body):[m [32m+[m[32m for cache in worker.context.caches:[m [32m+[m[32m if cache.save(hostname + path, meta, body):[m [32m+[m[32m break[m [32m+[m [32m+[m[32m except GeminiError as error:[m [32m+[m[32m report_error(stream, error.status, str(error))[m [32m+[m[32m return[m [32m+[m [32m+[m [32m+[m[32mdef unpack_response(response):[m [32m+[m[32m if type(response) == tuple:[m [32m+[m[32m if len(response) == 2:[m [32m+[m[32m status, meta = response[m [32m+[m[32m response = ''[m [32m+[m[32m else:[m [32m+[m[32m status, meta, response = response[m [32m+[m[32m else:[m [32m+[m[32m status = 20[m [32m+[m[32m meta = 'text/gemini; charset=utf-8'[m [32m+[m [32m+[m[32m if response == None:[m [32m+[m[32m body = b''[m [32m+[m[32m elif type(response) == str:[m [32m+[m[32m body = response.encode('utf-8')[m [32m+[m[32m else:[m [32m+[m[32m body = response[m [32m+[m [32m+[m[32m return status, meta, body[m [32m+[m [32m+[m [32m+[m[32mclass Context:[m [32m+[m[32m def __init__(self, cfg, allow_extension_workers=False, handler_queue=None,[m [32m+[m[32m response_queues=None):[m self.cfg = cfg[m [31m- self.shutdown = shutdown_event[m [32m+[m[32m self.is_quiet = True[m [32m+[m[32m self.shutdown_events = [][m [32m+[m[32m self.allow_extension_workers = allow_extension_workers[m self.hostnames = cfg.hostnames()[m self.entrypoints = {'gemini': {}, 'titan': {}}[m for proto in ['gemini', 'titan']:[m [36m@@ -309,19 +451,41 @@[m [mclass WorkerContext:[m self.entrypoints[proto][hostname] = [][m self.caches = [][m self.protocols = {}[m [31m- self.is_quiet = False[m [32m+[m[32m self.add_protocol('gemini', handle_gemini_or_titan_request)[m [32m+[m[32m self.add_protocol('titan', handle_gemini_or_titan_request)[m [32m+[m[32m # Queue for pending handler jobs.[m [32m+[m[32m self.job_lock = threading.Lock()[m [32m+[m[32m self.job_id = 0[m [32m+[m[32m self.handler_queue = handler_queue[m [32m+[m[32m self.response_queues = response_queues[m [m def config(self):[m return self.cfg[m [m [31m- def shutdown_event(self):[m [32m+[m[32m def is_background_work_allowed(self):[m """[m [31m- Returns:[m [31m- threading.Event: Event that is set when the server is[m [32m+[m[32m Determines whether extension modules are allowed to start workers.[m [32m+[m[32m """[m [32m+[m[32m return self.allow_extension_workers[m [32m+[m [32m+[m[32m def add_shutdown_event(self, event):[m [32m+[m[32m """[m [32m+[m[32m Registers a shutdown event. Extension modules must call this to[m [32m+[m[32m get notified when the server is being shut down.[m [32m+[m [32m+[m[32m Args:[m [32m+[m[32m event (threading.Event): Event that is set when the server is[m shutting down. Background workers must wait on this and stop[m when the event is set.[m """[m [31m- return self.shutdown[m [32m+[m[32m if not self.is_background_work_allowed():[m [32m+[m[32m raise Exception("background work not allowed")[m [32m+[m[32m # This is used in a parser thread that is allowed to launch workers.[m [32m+[m[32m return self.shutdown_events.append(event)[m [32m+[m [32m+[m[32m def set_shutdown(self):[m [32m+[m[32m for event in self.shutdown_events:[m [32m+[m[32m event.set()[m [m def set_quiet(self, is_quiet):[m self.is_quiet = is_quiet[m [36m@@ -452,9 +616,8 @@[m [mclass WorkerContext:[m request (Request): Request object.[m [m Returns:[m [31m- Tuple with (response, cache). The response can be binary data, text,[m [31m- tuple with status and meta string, or tuple with status, meta, and body.[m [31m- The cache is None if the data was not read from a cache.[m [32m+[m[32m Tuple with (status, meta, body, cache). The body can be bytes/bytearray or[m [32m+[m[32m an I/O object. The cache is None if the data was not read from a cache.[m """[m entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)[m [m [36m@@ -468,17 +631,39 @@[m [mclass WorkerContext:[m for cache in caches:[m media, content = cache.try_load(request.hostname + request.path)[m if not media is None:[m [31m- response = 20, media, content[m if hasattr(content, '__len__'):[m self.print('%d bytes from cache, %s' % (len(content), media))[m else:[m self.print('stream from cache,', media)[m [31m- return response, cache[m [32m+[m[32m return 20, media, content, cache[m [m # Process the request normally if there is nothing cached.[m if not from_cache:[m try:[m [31m- return entrypoint(request), None[m [32m+[m[32m if not self.handler_queue:[m [32m+[m[32m # Handle in the same thread/process synchronously. This is probably[m [32m+[m[32m # running under a RequestHandler process.[m [32m+[m[32m response = entrypoint(request)[m [32m+[m[32m else:[m [32m+[m[32m # Put it in the handler queue and wait for completion. Parser threads use[m [32m+[m[32m # this to hand work off to the handler processes.[m [32m+[m[32m with self.job_lock:[m [32m+[m[32m # The job ID is for verifying we are getting the right response.[m [32m+[m[32m self.job_id += 1[m [32m+[m[32m job_id = self.job_id[m [32m+[m [32m+[m[32m self.handler_queue.put((job_id, request, request.worker_id))[m [32m+[m[32m result_id, response = self.response_queues[request.worker_id].get()[m [32m+[m [32m+[m[32m if result_id != job_id:[m [32m+[m[32m raise Exception('response queue out of sync: request handler returned wrong job ID')[m [32m+[m[32m if isinstance(response, Exception):[m [32m+[m[32m raise response[m [32m+[m [32m+[m[32m status, meta, body = unpack_response(response)[m [32m+[m [32m+[m[32m return status, meta, body, None[m [32m+[m except Exception as x:[m import traceback[m traceback.print_exception(x)[m [36m@@ -499,138 +684,19 @@[m [mclass RequestData:[m self.request = request[m [m [m [31m-def handle_gemini_or_titan_request(request_data):[m [31m- worker = request_data.worker[m [31m- stream = request_data.stream[m [31m- data = request_data.buffered_data[m [31m- from_addr = request_data.from_addr[m [31m- identity = request_data.identity[m [31m- request = request_data.request[m [31m- expected_size = 0[m [31m- req_token = None[m [31m- req_mime = None[m [31m-[m [31m- if request.startswith('titan:'):[m [31m- if identity is None and worker.cfg.require_upload_identity():[m [31m- report_error(stream, 60, "Client certificate required for upload")[m [31m- return[m [31m- # Read the rest of the data.[m [31m- parms = request.split(';')[m [31m- request = parms[0][m [31m- for p in parms:[m [31m- if p.startswith('size='):[m [31m- expected_size = int(p[5:])[m [31m- elif p.startswith('token='):[m [31m- req_token = p[6:][m [31m- elif p.startswith('mime='):[m [31m- req_mime = p[5:][m [31m- worker.log(f'Receiving Titan content: {expected_size}')[m [31m- max_upload_size = worker.cfg.max_upload_size()[m [31m- if expected_size > max_upload_size and max_upload_size > 0:[m [31m- report_error(stream, 59, "Maximum content length exceeded")[m [31m- return[m [31m- while len(data) < expected_size:[m [31m- incoming = safe_recv(stream, 65536)[m [31m- if len(incoming) == 0:[m [31m- break[m [31m- data += incoming[m [31m- if len(data) != expected_size:[m [31m- report_error(stream, 59, "Invalid content length")[m [31m- return[m [31m- else:[m [31m- # No Payload in Gemini.[m [31m- if len(data):[m [31m- report_error(stream, 59, "Bad request")[m [31m- return[m [31m-[m [31m- url = urlparse(request)[m [31m- path = url.path[m [31m- if path == '':[m [31m- path = '/'[m [31m- hostname = url.hostname[m [31m-[m [31m- if url.port != None and url.port != worker.port:[m [31m- report_error(stream, 59, "Invalid port number")[m [31m- return[m [31m- if not stream.get_servername():[m [31m- # Server name indication is required.[m [31m- report_error(stream, 59, "Missing TLS server name indication")[m [31m- return[m [31m- if stream.get_servername().decode() != hostname:[m [31m- report_error(stream, 53, "Proxy request refused")[m [31m- return[m [31m-[m [31m- try:[m [31m- request = Request([m [31m- identity,[m [31m- remote_address=from_addr,[m [31m- scheme=url.scheme,[m [31m- hostname=hostname,[m [31m- path=path,[m [31m- query=url.query if '?' in request else None,[m [31m- content_token=req_token,[m [31m- content_mime=req_mime,[m [31m- content=data if len(data) else None[m [31m- )[m [31m- response, from_cache = worker.context.call_entrypoint(request)[m [31m-[m [31m- # Determine status code, meta line, and body content.[m [31m- if type(response) == tuple:[m [31m- if len(response) == 2:[m [31m- status, meta = response[m [31m- response = ''[m [31m- else:[m [31m- status, meta, response = response[m [31m- else:[m [31m- status = 20[m [31m- meta = 'text/gemini; charset=utf-8'[m [31m-[m [31m- if response == None:[m [31m- response_data = b''[m [31m- elif type(response) == str:[m [31m- response_data = response.encode('utf-8')[m [31m- else:[m [31m- response_data = response[m [31m-[m [31m- safe_sendall(stream, f'{status} {meta}\r\n'.encode('utf-8'))[m [31m- safe_sendall(stream, response_data)[m [31m-[m [31m- # Save to cache.[m [31m- if not from_cache and status == 20 and \[m [31m- (type(response_data) == bytes or type(response_data) == bytearray):[m [31m- for cache in worker.context.caches:[m [31m- if cache.save(hostname + path, meta, response_data):[m [31m- break[m [31m-[m [31m- # Close handles.[m [31m- if hasattr(response_data, 'close'):[m [31m- response_data.close()[m [31m-[m [31m- except GeminiError as error:[m [31m- report_error(stream, error.status, str(error))[m [31m- return[m [31m-[m [31m-[m [31m-class Worker(threading.Thread):[m [31m- """Thread that handles incoming requests from clients."""[m [32m+[m[32mclass RequestParser(threading.Thread):[m [32m+[m[32m """Thread that parses incoming requests from clients."""[m [m [31m- def __init__(self, id, cfg, work_queue, shutdown_event):[m [31m- #super().__init__(target=Worker._run, args=(self,)) # multiprocessing[m [32m+[m[32m def __init__(self, id, context, job_queue):[m super().__init__()[m self.id = id[m [31m- self.cfg = cfg[m [31m- self.port = cfg.port()[m [31m- self.context = WorkerContext(self.cfg, shutdown_event)[m [31m- self.context.add_protocol('gemini', handle_gemini_or_titan_request)[m [31m- self.context.add_protocol('titan', handle_gemini_or_titan_request)[m [31m- self.context.set_quiet(id > 0)[m [31m- self.jobs = work_queue[m [32m+[m[32m self.context = context[m [32m+[m[32m self.cfg = context.cfg[m [32m+[m[32m self.port = self.cfg.port()[m [32m+[m[32m self.jobs = job_queue[m [m def run(self):[m try:[m [31m- # Extensions are initialized in the worker process.[m [31m- self.context.load_modules()[m [31m- self.context.set_quiet(False)[m while True:[m stream, from_addr = self.jobs.get()[m if stream is None:[m [36m@@ -660,7 +726,7 @@[m [mclass Worker(threading.Thread):[m [m def process_request(self, stream, from_addr):[m data = bytes()[m [31m- MAX_LEN = 1024[m [32m+[m[32m MAX_LEN = 1024 # TODO: Gemini/Titan limitation only.[m MAX_RECV = MAX_LEN + 2 # includes terminator "\r\n"[m request = None[m incoming = safe_recv(stream, MAX_RECV)[m [36m@@ -696,32 +762,63 @@[m [mclass Worker(threading.Thread):[m for scheme, handler in self.context.protocols.items():[m if request.startswith(scheme + ':'):[m self.log(request)[m [31m- response_data = handler(RequestData(self,[m [31m- stream, data, from_addr,[m [31m- identity, request))[m [31m- if not response_data is None:[m [31m- safe_sendall(stream, response_data)[m [31m- # Close handles.[m [31m- if hasattr(response_data, 'close'):[m [31m- response_data.close()[m [32m+[m[32m response = handler(RequestData(self, stream, data, from_addr, identity, request))[m [32m+[m[32m if not response is None:[m [32m+[m[32m safe_sendall(stream, response)[m return[m [m report_error(stream, 59, "Unsupported protocol")[m [m [m [31m-_server_instance = None[m [32m+[m[32mclass ServerRestarter:[m [32m+[m[32m def __init__(self, server):[m [32m+[m[32m self.server = server[m [32m+[m [32m+[m[32m def __call__(self, signum, frame):[m [32m+[m[32m if signum == signal.SIGHUP:[m [32m+[m[32m print('--- SIGHUP ---')[m [32m+[m[32m self.server.restart_workers()[m [32m+[m [32m+[m [32m+[m[32mclass RequestHandler(mp.Process):[m [32m+[m[32m def __init__(self, id, cfg, job_queue, result_queues):[m [32m+[m[32m super().__init__(target=RequestHandler._run, args=(self,))[m [32m+[m[32m self.id = id[m [32m+[m[32m self.cfg = cfg[m [32m+[m[32m self.jobs = job_queue[m [32m+[m[32m self.results = result_queues[m [32m+[m[32m self.context = None[m [m [31m-def _restart_workers(signum, frame):[m [31m- if signum == signal.SIGHUP:[m [31m- _server_instance.restart_workers()[m [32m+[m[32m def _run(self):[m [32m+[m[32m self.context = Context(self.cfg)[m [32m+[m[32m self.context.load_modules()[m [32m+[m [32m+[m[32m # Wait for request processing jobs.[m [32m+[m[32m try:[m [32m+[m[32m while True:[m [32m+[m[32m job_id, request, queue_id = self.jobs.get()[m [32m+[m[32m if job_id is None:[m [32m+[m[32m break[m [32m+[m[32m result_queue = self.results[queue_id][m [32m+[m[32m entrypoint = self.context.find_entrypoint(request.scheme, request.hostname, request.path)[m [32m+[m[32m if not entrypoint:[m [32m+[m[32m result_queue.put((job_id, Exception("Missing entrypoint: " + request.url())))[m [32m+[m[32m continue[m [32m+[m[32m try:[m [32m+[m[32m response = unpack_response(entrypoint(request))[m [32m+[m[32m result_queue.put((job_id, response))[m [32m+[m [32m+[m[32m except Exception as error:[m [32m+[m[32m result_queue.put((job_id, error))[m [32m+[m [32m+[m[32m except KeyboardInterrupt:[m [32m+[m[32m pass[m [m [m class Server:[m def __init__(self, cfg):[m [31m- global _server_instance[m [31m- assert _server_instance is None[m [31m- _server_instance = self[m [31m- #mp.set_start_method('spawn')[m [32m+[m[32m mp.set_start_method('spawn')[m [32m+[m self.cfg = cfg[m self.address = cfg.address()[m self.port = cfg.port()[m [36m@@ -745,25 +842,18 @@[m [mclass Server:[m self.context.set_session_id(session_id)[m [m # Spawn the worker threads.[m [31m- self.shutdown_event = threading.Event()[m [31m- self.workers = [][m [31m- self.work_queue = queue.Queue()[m [32m+[m[32m #self.parser_shutdown_event = threading.Event()[m [32m+[m[32m self.parser_queue = queue.Queue()[m [32m+[m[32m #self.handler_shutdown_event = mp.Event()[m [32m+[m[32m self.handler_queue = mp.Queue()[m [32m+[m[32m self.init_parser_context()[m [32m+[m[32m self.parsers = [][m [32m+[m[32m self.handlers = [][m self.create_workers(cfg)[m [m self.sock = None[m self.sv_conn = None[m [m [31m- def restart_workers(self):[m [31m- """[m [31m- Restarts workers with an updated configuration. The server socket or[m [31m- TLS configuration are not modified, even if the values have changed[m [31m- in the configuration file.[m [31m- """[m [31m- self.stop_workers()[m [31m- self.cfg.reload()[m [31m- self.create_workers(self.cfg)[m [31m- self.start_workers()[m [31m-[m def run(self):[m attempts = 60[m print(f'Opening port {self.port}...')[m [36m@@ -786,7 +876,7 @@[m [mclass Server:[m self.start_workers()[m [m try:[m [31m- signal.signal(signal.SIGHUP, _restart_workers)[m [32m+[m[32m signal.signal(signal.SIGHUP, ServerRestarter(self))[m except ValueError:[m print('Restarting with SIGHUP not supported')[m [m [36m@@ -795,7 +885,7 @@[m [mclass Server:[m try:[m stream, from_addr = self.sv_conn.accept()[m stream._socket.settimeout(10)[m [31m- self.work_queue.put((stream, from_addr))[m [32m+[m[32m self.parser_queue.put((stream, from_addr))[m del stream[m del from_addr[m except KeyboardInterrupt:[m [36m@@ -816,23 +906,68 @@[m [mclass Server:[m [m print('Done')[m [m [32m+[m[32m def init_parser_context(self):[m [32m+[m[32m self.handler_results = [][m [32m+[m[32m if self.is_using_handler_processes():[m [32m+[m[32m for _ in range(max(self.cfg.num_threads(), 1)):[m [32m+[m[32m # Handler processes put results in these queues.[m [32m+[m[32m self.handler_results.append(mp.Queue())[m [32m+[m[32m self.parser_context = Context(self.cfg,[m [32m+[m[32m allow_extension_workers=True,[m [32m+[m[32m handler_queue=self.handler_queue if self.is_using_handler_processes() else None,[m [32m+[m[32m response_queues=self.handler_results if self.is_using_handler_processes() else None)[m [32m+[m[32m self.parser_context.set_quiet(False)[m [32m+[m[32m self.parser_context.load_modules()[m [32m+[m [32m+[m[32m def restart_workers(self):[m [32m+[m[32m """[m [32m+[m[32m Restarts workers with an updated configuration. The server socket or[m [32m+[m[32m TLS configuration are not modified, even if the values have changed[m [32m+[m[32m in the configuration file.[m [32m+[m[32m """[m [32m+[m[32m self.stop_workers()[m [32m+[m[32m self.cfg.reload()[m [32m+[m[32m self.init_parser_context()[m [32m+[m[32m self.create_workers(self.cfg)[m [32m+[m[32m self.start_workers()[m [32m+[m [32m+[m[32m def is_using_handler_processes(self):[m [32m+[m[32m return self.cfg.num_processes() > 0[m [32m+[m def create_workers(self, cfg):[m [31m- self.shutdown_event.clear()[m [31m- for worker_id in range(max(cfg.num_threads(), 1)):[m [31m- worker = Worker(worker_id, cfg, self.work_queue, self.shutdown_event)[m [31m- self.workers.append(worker)[m [32m+[m[32m for proc_id in range(max(cfg.num_processes(), 0)):[m [32m+[m[32m proc = RequestHandler(proc_id, cfg, self.handler_queue, self.handler_results)[m [32m+[m[32m self.handlers.append(proc)[m [32m+[m [32m+[m[32m for parser_id in range(max(cfg.num_threads(), 1)):[m [32m+[m[32m # Threads share one context (note: GIL).[m [32m+[m[32m parser = RequestParser(parser_id, self.parser_context, self.parser_queue)[m [32m+[m[32m self.parsers.append(parser)[m [m def start_workers(self):[m [31m- for worker in self.workers:[m [31m- worker.start()[m [31m- print(len(self.workers), 'worker(s) started')[m [32m+[m[32m for handler in self.handlers:[m [32m+[m[32m handler.start()[m [32m+[m[32m for parser in self.parsers:[m [32m+[m[32m parser.start()[m [32m+[m [32m+[m[32m print(len(self.parsers), 'parser(s) and', len(self.handlers), 'handler(s) started')[m [m def stop_workers(self):[m [31m- self.shutdown_event.set()[m [31m- n = len(self.workers)[m [31m- for _ in range(n):[m [31m- self.work_queue.put((None, None))[m [31m- for worker in self.workers:[m [31m- worker.join()[m [31m- self.workers = [][m [31m- print(n, 'worker(s) stopped')[m [32m+[m[32m self.parser_context.set_shutdown()[m [32m+[m [32m+[m[32m # Stop parsers first so all ongoing handler processes get to finish, and no new[m [32m+[m[32m # requests can come in.[m [32m+[m[32m for _ in range(len(self.parsers)):[m [32m+[m[32m self.parser_queue.put((None, None))[m [32m+[m[32m for _ in range(len(self.handlers)):[m [32m+[m[32m self.handler_queue.put((None, None, None))[m [32m+[m [32m+[m[32m for p in self.parsers:[m [32m+[m[32m p.join()[m [32m+[m[32m for h in self.handlers:[m [32m+[m[32m h.join()[m [32m+[m [32m+[m[32m print(len(self.parsers), 'parser(s) and', len(self.handlers), 'handler(s) stopped')[m [32m+[m[32m self.parsers = [][m [32m+[m[32m self.handlers = [][m [32m+[m[32m self.parser_context.shutdown_events = [][m [1mdiff --git a/gmcapsule/modules/99_static.py b/gmcapsule/modules/99_static.py[m [1mindex 05c74cf..4248d1c 100644[m [1m--- a/gmcapsule/modules/99_static.py[m [1m+++ b/gmcapsule/modules/99_static.py[m [36m@@ -66,9 +66,9 @@[m [mdef serve_file(req):[m if not os.path.exists(path):[m return 51, "Not found"[m [m [31m- # Note: We return the file object so the sender doesn't have to buffer[m [32m+[m[32m # Note: We return a Path object so the sender doesn't have to buffer[m # the entire file in memory first.[m [31m- return status, meta, (open(path, 'rb') if status == 20 else None)[m [32m+[m[32m return status, meta, (Path(path) if status == 20 else None)[m [m [m def init(context):[m
text/gemini; charset=utf-8
This content has been proxied by September (ba2dc).