GmCapsule [main]

Use multiple processes to handle requests

=> e54db584a4a3d225b29d41eebc594b216aeaf033

diff --git a/README.md b/README.md
index 0b1540a..2250fe3 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,7 @@ The log can be viewed via journalctl (or syslog):
 
 * Extension modules can register new protocols in addition to the built-in Gemini and Titan.
 * SIGHUP causes the configuration file to be reloaded and workers to be restarted. The listening socket remains open, so the socket and TLS parameters cannot be changed.
-* API change: Extension modules get initialized separately in each worker thread. Instead of a `Capsule`, the extension module `init` method is passed a `WorkerContext`. `Capsule` is no longer available as global state.
+* API change: Extension modules get initialized once for the request parser threads (which share a single context) and separately in each request handler process. Instead of a `Capsule`, the extension module `init` method is passed a `Context`. `Capsule` is no longer available as global state.
 
 ### v0.4
 
diff --git a/docs/api.rst b/docs/api.rst
index 75db2ed..4dd522b 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -10,9 +10,9 @@ These classes and functions are available to extension modules by importing
    Config
    Capsule
    Cache
+   Context
    gemini.Request
    gemini.Identity
-   WorkerContext
 
 
 Classes
@@ -34,6 +34,11 @@ Cache
 .. autoclass:: gmcapsule.Cache
     :members:
 
+Context
+-------
+.. autoclass:: gmcapsule.Context
+    :members:
+
 Request
 -------
 .. autoclass:: gmcapsule.gemini.Request
@@ -44,11 +49,6 @@ Identity
 .. autoclass:: gmcapsule.gemini.Identity
     :members:
 
-WorkerContext
--------------
-.. autoclass:: gmcapsule.WorkerContext
-    :members:
-
 
 Functions
 *********
diff --git a/example.ini b/example.ini
index 44ebb4e..2b178da 100644
--- a/example.ini
+++ b/example.ini
@@ -6,8 +6,9 @@
 ;address = 0.0.0.0
 ;port = 1965
 ;certs = .certs
-;modules =
+;modules = 
 ;threads = 5
+;processes = 2
 
 [static]
 root = .
@@ -15,6 +16,9 @@ root = .
 [titan]
 ;upload_limit = 10485760
 
+[cgi]
+bin_root	= ./cgi-bin
+
 [cgi.booster]
 protocol        = titan
 host            = localhost
@@ -25,11 +29,15 @@ command         = /usr/bin/python3 ../booster/booster.py
 path            = /cgienv
 command         = printenv
 
+[rewrite.test]
+path = ^/altenv$
+status = 30 gemini://localhost/cgienv${QUERY_STRING}
+
 ;--------------------------------------------------------------------------
 
-;[gitview]
-;git             = /usr/bin/git
-;cache_path      = /Users/jaakko/Library/Caches/gmgitview
+[gitview]
+git             = /usr/bin/git
+cache_path      = /Users/jaakko/Library/Caches/gmgitview
 
 [gitview.lagrange]
 title           = Lagrange
@@ -47,3 +55,12 @@ clone_url       = https://git.skyjake.fi/gemini/gitview.git
 path            = /Users/jaakko/src/gmgitview
 url_root        = gmgitview
 default_branch  = main
+
+[gitview.bubble]
+title = Bubble
+brief = Bulletin Boards for Gemini
+clone_url = https://git.skyjake.fi/gemini/bubble.git
+path = /Users/jaakko/src/bubble
+url_root = bubble
+default_branch = main
+
diff --git a/gmcapsule/__init__.py b/gmcapsule/__init__.py
index ebbeeb5..96ec7d8 100644
--- a/gmcapsule/__init__.py
+++ b/gmcapsule/__init__.py
@@ -428,7 +428,7 @@ Each extension module is required to have an initialization function:
     :param context: Worker context. The extension can use this to access
         configuration parameters, install caches, and register entry
         points and custom scheme handlers.
-    :type context: gmcapsule.WorkerContext
+    :type context: gmcapsule.Context
 
 
 Requests
@@ -481,13 +481,13 @@ import shlex
 import subprocess
 from pathlib import Path
 
-from .gemini import Server, Cache, WorkerContext
+from .gemini import Server, Cache, Context
 from .markdown import to_gemtext as markdown_to_gemtext
 
 
 __version__ = '0.5.0'
 __all__ = [
-    'Config', 'Cache', 'WorkerContext',
+    'Config', 'Cache', 'Context',
     'get_mime_type', 'markdown_to_gemtext'
 ]
 
@@ -557,6 +557,9 @@ class Config:
     def num_threads(self):
         return self.ini.getint('server', 'threads', fallback=5)
 
+    def num_processes(self):
+        return self.ini.getint('server', 'processes', fallback=2)
+
     def max_upload_size(self):
         return self.ini.getint('titan', 'upload_limit', fallback=10 * 1024 * 1024)
 
diff --git a/gmcapsule/gemini.py b/gmcapsule/gemini.py
index ce4edd2..163871b 100644
--- a/gmcapsule/gemini.py
+++ b/gmcapsule/gemini.py
@@ -7,7 +7,7 @@ import importlib
 import os.path
 import select
 import socket
-#import multiprocessing as mp
+import multiprocessing as mp
 import threading
 import queue
 import re
@@ -74,6 +74,10 @@ def safe_recv(stream, max_len, stall_timeout=10):
     return data
 
 
+def is_bytes(data):
+    return type(data) == bytes or type(data) == bytearray
+
+
 def safe_sendall(stream, data, stall_timeout=30):
     """
     Send data over an SSL connection, accounting for stalls and retries
@@ -86,46 +90,55 @@ def safe_sendall(stream, data, stall_timeout=30):
         stall_timeout (float): Number of seconds to wait until
             terminating a stalled send.
     """
-    if type(data) == bytes or type(data) == bytearray:
-        streaming = False
-    else:
-        streaming = True
-
-    # We may need to retry sending with the exact same buffer,
-    # so keep it around until successful.
-    BUF_LEN = 32768
-    if streaming:
-        send_buf = data.read(BUF_LEN)
-    else:
-        send_buf = data[:BUF_LEN]
+    try:
+        if is_bytes(data):
+            streaming = False
+        else:
+            streaming = True
+            if isinstance(data, Path):
+                print('opening', data)
+                data = open(data, 'rb')
+
+        # We may need to retry sending with the exact same buffer,
+        # so keep it around until successful.
+        BUF_LEN = 32768
+        if streaming:
+            send_buf = data.read(BUF_LEN)
+        else:
+            send_buf = data[:BUF_LEN]
 
-    last_time = time.time()
-    pos = 0
-    while len(send_buf) > 0:
-        try:
-            if time.time() - last_time > stall_timeout:
-                raise AbortedIOError('stalled')
-            sent = stream.send(send_buf)
-            if sent < 0:
-                raise AbortedIOError('failed to send')
-            pos += sent
-            if streaming:
-                send_buf = send_buf[sent:]
-                if len(send_buf) < BUF_LEN / 2:
-                    send_buf += data.read(BUF_LEN)
-            else:
-                send_buf = data[pos : pos + BUF_LEN]
-            if sent > 0:
-                last_time = time.time()
-            else:
+        last_time = time.time()
+        pos = 0
+        while len(send_buf) > 0:
+            try:
+                if time.time() - last_time > stall_timeout:
+                    raise AbortedIOError('stalled')
+                sent = stream.send(send_buf)
+                if sent < 0:
+                    raise AbortedIOError('failed to send')
+                pos += sent
+                if streaming:
+                    send_buf = send_buf[sent:]
+                    if len(send_buf) < BUF_LEN / 2:
+                        send_buf += data.read(BUF_LEN)
+                else:
+                    send_buf = data[pos : pos + BUF_LEN]
+                if sent > 0:
+                    last_time = time.time()
+                else:
+                    wait_for_write(stream, stall_timeout)
+            except OpenSSL.SSL.WantReadError:
+                pass
+            except OpenSSL.SSL.WantWriteError:
+                # Wait until the socket is ready for writing.
                 wait_for_write(stream, stall_timeout)
-        except OpenSSL.SSL.WantReadError:
-            pass
-        except OpenSSL.SSL.WantWriteError:
-            # Wait until the socket is ready for writing.
-            wait_for_write(stream, stall_timeout)
-        except OpenSSL.SSL.WantX509LookupError:
-            pass
+            except OpenSSL.SSL.WantX509LookupError:
+                pass
+
+    finally:
+        # Close resources and handles.
+        if data and hasattr(data, 'close'):
+            data.close()
 
 
 def safe_close(stream):
@@ -148,6 +161,19 @@ def report_error(stream, code, msg):
     safe_close(stream)
 
 
+def cert_subject(cert):
+    comps = {}
+    for name, value in cert.get_subject().get_components():
+        comps[name.decode()] = value.decode()
+    return comps
+
+def cert_issuer(cert):
+    comps = {}
+    for name, value in cert.get_issuer().get_components():
+        comps[name.decode()] = value.decode()
+    return comps
+
+
 class Identity:
     """
     Client certificate.
@@ -156,19 +182,21 @@ class Identity:
     just for the public key.
 
     Attributes:
-        cert (OpenSSL.SSL.X509): Certificate.
-        pubkey (OpenSSL.SSL.PKey): Public key.
+        cert (bytes): Certificate (DER format).
+        pubkey (bytes): Public key (DER format).
         fp_cert (str): SHA-256 hash of the certificate.
         fp_pubkey (str): SHA-256 hash of the public key.
     """
     def __init__(self, cert):
-        self.cert = cert
+        self._subject = cert_subject(cert)
+        self._issuer = cert_issuer(cert)
+        self.cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
         m = hashlib.sha256()
-        m.update(crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert))
+        m.update(self.cert)
         self.fp_cert = m.hexdigest()
-        self.pubkey = self.cert.get_pubkey()
+        self.pubkey = crypto.dump_publickey(crypto.FILETYPE_ASN1, cert.get_pubkey())
         m = hashlib.sha256()
-        m.update(crypto.dump_publickey(crypto.FILETYPE_ASN1, self.pubkey))
+        m.update(self.pubkey)
         self.fp_pubkey = m.hexdigest()
 
     def __str__(self):
@@ -177,22 +205,16 @@ class Identity:
     def subject(self):
         """
         Returns:
-            dict: Name components of the certificate subject, e.g.: ``{'CN': 'Name'}``
+            dict: Name components of the certificate subject, e.g.: ``{'CN': 'Name'}``.
         """
-        comps = {}
-        for name, value in self.cert.get_subject().get_components():
-            comps[name.decode()] = value.decode()
-        return comps
+        return self._subject
 
     def issuer(self):
         """
         Returns:
-            dict: Name components of the certificate issuer, e.g.: ``{'CN': 'Name'}``
+            dict: Name components of the certificate issuer, e.g.: ``{'CN': 'Name'}``.
         """
-        comps = {}
-        for name, value in self.cert.get_issuer().get_components():
-            comps[name.decode()] = value.decode()
-        return comps
+        return self._issuer
 
 
 class Request:
@@ -220,7 +242,8 @@ class Request:
             May be ``None``.
     """
     def __init__(self, identity=None, scheme='gemini', hostname='', path='', query=None,
-                 remote_address=None, content_token=None, content_mime=None, content=None):
+                 remote_address=None, content_token=None, content_mime=None, content=None,
+                 worker_id=None):
         self.remote_address = remote_address
         self.scheme = scheme
         self.identity = identity
@@ -230,6 +253,7 @@ class Request:
         self.content_token = content_token
         self.content_mime = content_mime
         self.content = content
+        self.worker_id = worker_id
 
     def url(self):
         return f'{self.scheme}://{self.hostname}{self.path}{"?" + self.query if self.query else ""}'
@@ -297,10 +321,128 @@ class Cache:
         return None, None
 
 
-class WorkerContext:
-    def __init__(self, cfg, shutdown_event):
+def handle_gemini_or_titan_request(request_data):
+    worker = request_data.worker
+    stream = request_data.stream
+    data = request_data.buffered_data
+    from_addr = request_data.from_addr
+    identity = request_data.identity
+    request = request_data.request
+    expected_size = 0
+    req_token = None
+    req_mime = None
+
+    if request.startswith('titan:'):
+        if identity is None and worker.cfg.require_upload_identity():
+            report_error(stream, 60, "Client certificate required for upload")
+            return
+        # Read the rest of the data.
+        parms = request.split(';')
+        request = parms[0]
+        for p in parms:
+            if p.startswith('size='):
+                expected_size = int(p[5:])
+            elif p.startswith('token='):
+                req_token = p[6:]
+            elif p.startswith('mime='):
+                req_mime = p[5:]
+        worker.log(f'Receiving Titan content: {expected_size}')
+        max_upload_size = worker.cfg.max_upload_size()
+        if expected_size > max_upload_size and max_upload_size > 0:
+            report_error(stream, 59, "Maximum content length exceeded")
+            return
+        while len(data) < expected_size:
+            incoming = safe_recv(stream, 65536)
+            if len(incoming) == 0:
+                break
+            data += incoming
+        if len(data) != expected_size:
+            report_error(stream, 59, "Invalid content length")
+            return
+    else:
+        # No Payload in Gemini.
+        if len(data):
+            report_error(stream, 59, "Bad request")
+            return
+
+    url = urlparse(request)
+    path = url.path
+    if path == '':
+        path = '/'
+    hostname = url.hostname
+
+    if url.port != None and url.port != worker.port:
+        report_error(stream, 59, "Invalid port number")
+        return
+    if not stream.get_servername():
+        # Server name indication is required.
+        report_error(stream, 59, "Missing TLS server name indication")
+        return
+    if stream.get_servername().decode() != hostname:
+        report_error(stream, 53, "Proxy request refused")
+        return
+
+    try:
+        status, meta, body, from_cache = worker.context.call_entrypoint(Request(
+            identity,
+            remote_address=from_addr,
+            scheme=url.scheme,
+            hostname=hostname,
+            path=path,
+            query=url.query if '?' in request else None,
+            content_token=req_token,
+            content_mime=req_mime,
+            content=data if len(data) else None,
+            worker_id=request_data.worker.id
+        ))
+
+        output = f'{status} {meta}\r\n'.encode('utf-8')
+        if is_bytes(body):
+            safe_sendall(stream, output + body)
+        else:
+            # `body` is some sort of streamable data, cannot send in one call.
+            safe_sendall(stream, output)
+            safe_sendall(stream, body)
+
+        # Save to cache.
+        if not from_cache and status == 20 and is_bytes(body):
+            for cache in worker.context.caches:
+                if cache.save(hostname + path, meta, body):
+                    break
+
+    except GeminiError as error:
+        report_error(stream, error.status, str(error))
+        return
+
+
+def unpack_response(response):
+    if type(response) == tuple:
+        if len(response) == 2:
+            status, meta = response
+            response = ''
+        else:
+            status, meta, response = response
+    else:
+        status = 20
+        meta = 'text/gemini; charset=utf-8'
+
+    if response == None:
+        body = b''
+    elif type(response) == str:
+        body = response.encode('utf-8')
+    else:
+        body = response
+
+    return status, meta, body
+
+
+class Context:
+    def __init__(self, cfg, allow_extension_workers=False, handler_queue=None,
+                 response_queues=None):
         self.cfg = cfg
-        self.shutdown = shutdown_event
+        self.is_quiet = True
+        self.shutdown_events = []
+        self.allow_extension_workers = allow_extension_workers
         self.hostnames = cfg.hostnames()
         self.entrypoints = {'gemini': {}, 'titan': {}}
         for proto in ['gemini', 'titan']:
@@ -309,19 +451,41 @@ class WorkerContext:
                 self.entrypoints[proto][hostname] = []
         self.caches = []
         self.protocols = {}
-        self.is_quiet = False
+        self.add_protocol('gemini', handle_gemini_or_titan_request)
+        self.add_protocol('titan', handle_gemini_or_titan_request)
+        # Queue for pending handler jobs.
+        self.job_lock = threading.Lock()
+        self.job_id = 0
+        self.handler_queue = handler_queue
+        self.response_queues = response_queues
 
     def config(self):
         return self.cfg
 
-    def shutdown_event(self):
+    def is_background_work_allowed(self):
         """
-        Returns:
-            threading.Event: Event that is set when the server is
+        Determines whether extension modules are allowed to start workers.
+        """
+        return self.allow_extension_workers
+
+    def add_shutdown_event(self, event):
+        """
+        Registers a shutdown event. Extension modules must call this to
+        get notified when the server is being shut down.
+
+        Args:
+            event (threading.Event): Event that is set when the server is
             shutting down. Background workers must wait on this and stop
             when the event is set.
         """
-        return self.shutdown
+        if not self.is_background_work_allowed():
+            raise Exception("background work not allowed")
+        # Only a context created with allow_extension_workers=True (the shared parser context) gets this far.
+        return self.shutdown_events.append(event)
+
+    def set_shutdown(self):
+        for event in self.shutdown_events:
+            event.set()
 
     def set_quiet(self, is_quiet):
         self.is_quiet = is_quiet
@@ -452,9 +616,8 @@ class WorkerContext:
             request (Request): Request object.
 
         Returns:
-            Tuple with (response, cache). The response can be binary data, text,
-            tuple with status and meta string, or tuple with status, meta, and body.
-            The cache is None if the data was not read from a cache.
+            Tuple with (status, meta, body, cache). The body can be bytes/bytearray, a Path
+            to a file, or an I/O object. The cache is None if the data was not read from a cache.
         """
         entrypoint = self.find_entrypoint(request.scheme, request.hostname, request.path)
 
@@ -468,17 +631,39 @@ class WorkerContext:
             for cache in caches:
                 media, content = cache.try_load(request.hostname + request.path)
                 if not media is None:
-                    response = 20, media, content
                     if hasattr(content, '__len__'):
                         self.print('%d bytes from cache, %s' % (len(content), media))
                     else:
                         self.print('stream from cache,', media)
-                    return response, cache
+                    return 20, media, content, cache
 
             # Process the request normally if there is nothing cached.
             if not from_cache:
                 try:
-                    return entrypoint(request), None
+                    if not self.handler_queue:
+                        # Handle in the same thread/process synchronously. This is probably
+                        # running under a RequestHandler process.
+                        response = entrypoint(request)
+                    else:
+                        # Put it in the handler queue and wait for completion. Parser threads use
+                        # this to hand work off to the handler processes.
+                        with self.job_lock:
+                            # The job ID is for verifying we are getting the right response.
+                            self.job_id += 1
+                            job_id = self.job_id
+
+                        self.handler_queue.put((job_id, request, request.worker_id))
+                        result_id, response = self.response_queues[request.worker_id].get()
+
+                        if result_id != job_id:
+                            raise Exception('response queue out of sync: request handler returned wrong job ID')
+                        if isinstance(response, Exception):
+                            raise response
+
+                    status, meta, body = unpack_response(response)
+
+                    return status, meta, body, None
+
                 except Exception as x:
                     import traceback
                     traceback.print_exception(x)
@@ -499,138 +684,19 @@ class RequestData:
         self.request = request
 
 
-def handle_gemini_or_titan_request(request_data):
-    worker = request_data.worker
-    stream = request_data.stream
-    data = request_data.buffered_data
-    from_addr = request_data.from_addr
-    identity = request_data.identity
-    request = request_data.request
-    expected_size = 0
-    req_token = None
-    req_mime = None
-
-    if request.startswith('titan:'):
-        if identity is None and worker.cfg.require_upload_identity():
-            report_error(stream, 60, "Client certificate required for upload")
-            return
-        # Read the rest of the data.
-        parms = request.split(';')
-        request = parms[0]
-        for p in parms:
-            if p.startswith('size='):
-                expected_size = int(p[5:])
-            elif p.startswith('token='):
-                req_token = p[6:]
-            elif p.startswith('mime='):
-                req_mime = p[5:]
-        worker.log(f'Receiving Titan content: {expected_size}')
-        max_upload_size = worker.cfg.max_upload_size()
-        if expected_size > max_upload_size and max_upload_size > 0:
-            report_error(stream, 59, "Maximum content length exceeded")
-            return
-        while len(data) < expected_size:
-            incoming = safe_recv(stream, 65536)
-            if len(incoming) == 0:
-                break
-            data += incoming
-        if len(data) != expected_size:
-            report_error(stream, 59, "Invalid content length")
-            return
-    else:
-        # No Payload in Gemini.
-        if len(data):
-            report_error(stream, 59, "Bad request")
-            return
-
-    url = urlparse(request)
-    path = url.path
-    if path == '':
-        path = '/'
-    hostname = url.hostname
-
-    if url.port != None and url.port != worker.port:
-        report_error(stream, 59, "Invalid port number")
-        return
-    if not stream.get_servername():
-        # Server name indication is required.
-        report_error(stream, 59, "Missing TLS server name indication")
-        return
-    if stream.get_servername().decode() != hostname:
-        report_error(stream, 53, "Proxy request refused")
-        return
-
-    try:
-        request = Request(
-            identity,
-            remote_address=from_addr,
-            scheme=url.scheme,
-            hostname=hostname,
-            path=path,
-            query=url.query if '?' in request else None,
-            content_token=req_token,
-            content_mime=req_mime,
-            content=data if len(data) else None
-        )
-        response, from_cache = worker.context.call_entrypoint(request)
-
-        # Determine status code, meta line, and body content.
-        if type(response) == tuple:
-            if len(response) == 2:
-                status, meta = response
-                response = ''
-            else:
-                status, meta, response = response
-        else:
-            status = 20
-            meta = 'text/gemini; charset=utf-8'
-
-        if response == None:
-            response_data = b''
-        elif type(response) == str:
-            response_data = response.encode('utf-8')
-        else:
-            response_data = response
-
-        safe_sendall(stream, f'{status} {meta}\r\n'.encode('utf-8'))
-        safe_sendall(stream, response_data)
-
-        # Save to cache.
-        if not from_cache and status == 20 and \
-                (type(response_data) == bytes or type(response_data) == bytearray):
-            for cache in worker.context.caches:
-                if cache.save(hostname + path, meta, response_data):
-                    break
-
-        # Close handles.
-        if hasattr(response_data, 'close'):
-            response_data.close()
-
-    except GeminiError as error:
-        report_error(stream, error.status, str(error))
-        return
-
-
-class Worker(threading.Thread):
-    """Thread that handles incoming requests from clients."""
+class RequestParser(threading.Thread):
+    """Thread that parses incoming requests from clients."""
 
-    def __init__(self, id, cfg, work_queue, shutdown_event):
-        #super().__init__(target=Worker._run, args=(self,))     # multiprocessing
+    def __init__(self, id, context, job_queue):
         super().__init__()
         self.id = id
-        self.cfg = cfg
-        self.port = cfg.port()
-        self.context = WorkerContext(self.cfg, shutdown_event)
-        self.context.add_protocol('gemini', handle_gemini_or_titan_request)
-        self.context.add_protocol('titan', handle_gemini_or_titan_request)
-        self.context.set_quiet(id > 0)
-        self.jobs = work_queue
+        self.context = context
+        self.cfg = context.cfg
+        self.port = self.cfg.port()
+        self.jobs = job_queue
 
     def run(self):
         try:
-            # Extensions are initialized in the worker process.
-            self.context.load_modules()
-            self.context.set_quiet(False)
             while True:
                 stream, from_addr = self.jobs.get()
                 if stream is None:
@@ -660,7 +726,7 @@ class Worker(threading.Thread):
 
     def process_request(self, stream, from_addr):
         data = bytes()
-        MAX_LEN = 1024
+        MAX_LEN = 1024  # TODO: Gemini/Titan limitation only.
         MAX_RECV = MAX_LEN + 2  # includes terminator "\r\n"
         request = None
         incoming = safe_recv(stream, MAX_RECV)
@@ -696,32 +762,63 @@ class Worker(threading.Thread):
         for scheme, handler in self.context.protocols.items():
             if request.startswith(scheme + ':'):
                 self.log(request)
-                response_data = handler(RequestData(self,
-                                                    stream, data, from_addr,
-                                                    identity, request))
-                if not response_data is None:
-                    safe_sendall(stream, response_data)
-                    # Close handles.
-                    if hasattr(response_data, 'close'):
-                        response_data.close()
+                response = handler(RequestData(self, stream, data, from_addr, identity, request))
+                if not response is None:
+                    safe_sendall(stream, response)
                 return
 
         report_error(stream, 59, "Unsupported protocol")
 
 
-_server_instance = None
+class ServerRestarter:
+    def __init__(self, server):
+        self.server = server
+
+    def __call__(self, signum, frame):
+        if signum == signal.SIGHUP:
+            print('--- SIGHUP ---')
+            self.server.restart_workers()
+
+
+class RequestHandler(mp.Process):
+    def __init__(self, id, cfg, job_queue, result_queues):
+        super().__init__(target=RequestHandler._run, args=(self,))
+        self.id = id
+        self.cfg = cfg
+        self.jobs = job_queue
+        self.results = result_queues
+        self.context = None
 
-def _restart_workers(signum, frame):
-    if signum == signal.SIGHUP:
-        _server_instance.restart_workers()
+    def _run(self):
+        self.context = Context(self.cfg)
+        self.context.load_modules()
+
+        # Wait for request processing jobs.
+        try:
+            while True:
+                job_id, request, queue_id = self.jobs.get()
+                if job_id is None:
+                    break
+                result_queue = self.results[queue_id]
+                entrypoint = self.context.find_entrypoint(request.scheme, request.hostname, request.path)
+                if not entrypoint:
+                    result_queue.put((job_id, Exception("Missing entrypoint: " + request.url())))
+                    continue
+                try:
+                    response = unpack_response(entrypoint(request))
+                    result_queue.put((job_id, response))
+
+                except Exception as error:
+                    result_queue.put((job_id, error))
+
+        except KeyboardInterrupt:
+            pass
 
 
 class Server:
     def __init__(self, cfg):
-        global _server_instance
-        assert _server_instance is None
-        _server_instance = self
-        #mp.set_start_method('spawn')
+        mp.set_start_method('spawn')
+
         self.cfg = cfg
         self.address = cfg.address()
         self.port = cfg.port()
@@ -745,25 +842,18 @@ class Server:
         self.context.set_session_id(session_id)
 
         # Spawn the worker threads.
-        self.shutdown_event = threading.Event()
-        self.workers = []
-        self.work_queue = queue.Queue()
+        #self.parser_shutdown_event = threading.Event()
+        self.parser_queue = queue.Queue()
+        #self.handler_shutdown_event = mp.Event()
+        self.handler_queue = mp.Queue()
+        self.init_parser_context()
+        self.parsers = []
+        self.handlers = []
         self.create_workers(cfg)
 
         self.sock = None
         self.sv_conn = None
 
-    def restart_workers(self):
-        """
-        Restarts workers with an updated configuration. The server socket or
-        TLS configuration are not modified, even if the values have changed
-        in the configuration file.
-        """
-        self.stop_workers()
-        self.cfg.reload()
-        self.create_workers(self.cfg)
-        self.start_workers()
-
     def run(self):
         attempts = 60
         print(f'Opening port {self.port}...')
@@ -786,7 +876,7 @@ class Server:
         self.start_workers()
 
         try:
-            signal.signal(signal.SIGHUP, _restart_workers)
+            signal.signal(signal.SIGHUP, ServerRestarter(self))
         except ValueError:
             print('Restarting with SIGHUP not supported')
 
@@ -795,7 +885,7 @@ class Server:
             try:
                 stream, from_addr = self.sv_conn.accept()
                 stream._socket.settimeout(10)
-                self.work_queue.put((stream, from_addr))
+                self.parser_queue.put((stream, from_addr))
                 del stream
                 del from_addr
             except KeyboardInterrupt:
@@ -816,23 +906,68 @@ class Server:
 
         print('Done')
 
+    def init_parser_context(self):
+        self.handler_results = []
+        if self.is_using_handler_processes():
+            for _ in range(max(self.cfg.num_threads(), 1)):
+                # Handler processes put results in these queues.
+                self.handler_results.append(mp.Queue())
+        self.parser_context = Context(self.cfg,
+                                      allow_extension_workers=True,
+                                      handler_queue=self.handler_queue if self.is_using_handler_processes() else None,
+                                      response_queues=self.handler_results if self.is_using_handler_processes() else None)
+        self.parser_context.set_quiet(False)
+        self.parser_context.load_modules()
+
+    def restart_workers(self):
+        """
+        Restarts workers with an updated configuration. The server socket or
+        TLS configuration are not modified, even if the values have changed
+        in the configuration file.
+        """
+        self.stop_workers()
+        self.cfg.reload()
+        self.init_parser_context()
+        self.create_workers(self.cfg)
+        self.start_workers()
+
+    def is_using_handler_processes(self):
+        return self.cfg.num_processes() > 0
+
     def create_workers(self, cfg):
-        self.shutdown_event.clear()
-        for worker_id in range(max(cfg.num_threads(), 1)):
-            worker = Worker(worker_id, cfg, self.work_queue, self.shutdown_event)
-            self.workers.append(worker)
+        for proc_id in range(max(cfg.num_processes(), 0)):
+            proc = RequestHandler(proc_id, cfg, self.handler_queue, self.handler_results)
+            self.handlers.append(proc)
+
+        for parser_id in range(max(cfg.num_threads(), 1)):
+            # Threads share one context (note: GIL).
+            parser = RequestParser(parser_id, self.parser_context, self.parser_queue)
+            self.parsers.append(parser)
 
     def start_workers(self):
-        for worker in self.workers:
-            worker.start()
-        print(len(self.workers), 'worker(s) started')
+        for handler in self.handlers:
+            handler.start()
+        for parser in self.parsers:
+            parser.start()
+
+        print(len(self.parsers), 'parser(s) and', len(self.handlers), 'handler(s) started')
 
     def stop_workers(self):
-        self.shutdown_event.set()
-        n = len(self.workers)
-        for _ in range(n):
-            self.work_queue.put((None, None))
-        for worker in self.workers:
-            worker.join()
-        self.workers = []
-        print(n, 'worker(s) stopped')
+        self.parser_context.set_shutdown()
+
+        # Stop parsers first so all ongoing handler processes get to finish, and no new
+        # requests can come in.
+        for _ in range(len(self.parsers)):
+            self.parser_queue.put((None, None))
+        for _ in range(len(self.handlers)):
+            self.handler_queue.put((None, None, None))
+
+        for p in self.parsers:
+            p.join()
+        for h in self.handlers:
+            h.join()
+
+        print(len(self.parsers), 'parser(s) and', len(self.handlers), 'handler(s) stopped')
+        self.parsers = []
+        self.handlers = []
+        self.parser_context.shutdown_events = []
diff --git a/gmcapsule/modules/99_static.py b/gmcapsule/modules/99_static.py
index 05c74cf..4248d1c 100644
--- a/gmcapsule/modules/99_static.py
+++ b/gmcapsule/modules/99_static.py
@@ -66,9 +66,9 @@ def serve_file(req):
     if not os.path.exists(path):
         return 51, "Not found"
 
-    # Note: We return the file object so the sender doesn't have to buffer
+    # Note: We return a Path object so the sender doesn't have to buffer
     # the entire file in memory first.
-    return status, meta, (open(path, 'rb') if status == 20 else None)
+    return status, meta, (Path(path) if status == 20 else None)
 
 
 def init(context):
Proxy Information
Original URL
gemini://git.skyjake.fi/gmcapsule/main/cdiff/e54db584a4a3d225b29d41eebc594b216aeaf033
Status Code
Success (20)
Meta
text/gemini; charset=utf-8
Capsule Response Time
39.656955 milliseconds
Gemini-to-HTML Time
2.00311 milliseconds

This content has been proxied by September (ba2dc).