From 9d266a5cb3d1f4d0251e903b7a6f79079774777f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jaakko=20Kera=CC=88nen?= jaakko.keranen@iki.fi
Date: Fri, 16 Jun 2023 13:17:14 +0300
Subject: [PATCH 1/1] Switch back to regular threading; fixed refactoring
issues
gmcapsule/gemini.py | 93 ++++++++++++++++++++++-----------------------
1 file changed, 46 insertions(+), 47 deletions(-)
diff --git a/gmcapsule/gemini.py b/gmcapsule/gemini.py
index dcb17d9..ecddbeb 100644
--- a/gmcapsule/gemini.py
+++ b/gmcapsule/gemini.py
@@ -7,7 +7,9 @@ import importlib
import os.path
import select
import socket
-import multiprocessing as mp
+#import multiprocessing as mp
+import threading
+import queue
import re
import time
from pathlib import Path
@@ -303,12 +305,21 @@ class WorkerContext:
self.caches = []
self.is_quiet = False
self.is_quiet = is_quiet
def config(self):
return self.cfg
"""
Returns:
threading.Event: Event that is set when the server is
shutting down. Background workers must wait on this and stop
when the event is set.
"""
return self.shutdown
self.is_quiet = is_quiet
def print(self, *args):
if not self.is_quiet:
print(*args)
@@ -396,14 +407,24 @@ class WorkerContext:
self.print(f'Init:', mod.__doc__ if mod.__doc__ else name)
mod.init(self)
"""
Returns:
threading.Event: Event that is set when the server is
shutting down. Background workers must wait on this and stop
when the event is set.
"""
return self.shutdown
try:
for entry in self.entrypoints[protocol][hostname]:
path_pattern, handler = entry
if handler != None:
# A path string, possibly with wildcards.
if len(path_pattern) == 0 or fnmatch.fnmatch(path, path_pattern):
return handler
else:
# A callable generic path matcher.
handler = path_pattern(path)
if handler:
return handler
except Exception as x:
print(x)
return None
return None
def call_entrypoint(self, request):
"""
@@ -448,11 +469,12 @@ class WorkerContext:
raise GeminiError(50, 'Permanent failure')
-class Worker(mp.Process):
+class Worker(threading.Thread):
def __init__(self, id, cfg, work_queue, shutdown_event):
super().__init__(target=Worker._run, args=(self,))
#super().__init__(target=Worker._run, args=(self,)) # mp
super().__init__()
self.id = id
self.cfg = cfg
self.port = cfg.port()
@@ -460,7 +482,7 @@ class Worker(mp.Process):
self.context.set_quiet(id > 0)
self.jobs = work_queue
try:
# Extensions are initialized in the worker process.
self.context.load_modules()
@@ -624,7 +646,7 @@ class Worker(mp.Process):
# Save to cache.
if not from_cache and status == 20 and \
(type(response_data) == bytes or type(response_data) == bytearray):
for cache in self.server.caches:
for cache in self.context.caches:
if cache.save(hostname + path, meta, response_data):
break
@@ -639,7 +661,7 @@ class Worker(mp.Process):
class Server:
def __init__(self, cfg):
mp.set_start_method('spawn')
#mp.set_start_method('spawn')
hostname_or_hostnames = cfg.hostnames()
cert_path = cfg.certs_dir() / 'cert.pem'
@@ -653,10 +675,6 @@ class Server:
if type(hostname_or_hostnames) == str else hostname_or_hostnames
self.address = address
self.port = port
#if cache:
# self.caches.append(cache)
#self.max_upload_size = max_upload_size
#self.require_upload_identity = require_upload_identity
if not os.path.exists(cert_path):
raise Exception("certificate file not found: " + str(cert_path))
@@ -673,9 +691,9 @@ class Server:
self.context.set_session_id(session_id)
# Spawn the worker threads.
self.shutdown_event = mp.Event()
self.shutdown_event = threading.Event()
self.workers = []
self.work_queue = mp.Queue()
self.work_queue = queue.Queue()
for worker_id in range(max(num_threads, 1)):
worker = Worker(worker_id, cfg, self.work_queue, self.shutdown_event)
self.workers.append(worker)
@@ -702,9 +720,9 @@ class Server:
print('...')
print(f'Server started on port {self.port}')
MULTIPROCESS = True
PARALLELIZE = True
if MULTIPROCESS:
if PARALLELIZE:
for worker in self.workers:
worker.start()
print(len(self.workers), 'worker(s) started')
@@ -718,7 +736,7 @@ class Server:
stream._socket.settimeout(10)
self.work_queue.put((stream, from_addr))
if not MULTIPROCESS:
if not PARALLELIZE:
self.work_queue.put((None, None)) # single iteration only
self.workers[0].run()
@@ -739,29 +757,10 @@ class Server:
# Stop all workers.
self.shutdown_event.set()
if MULTIPROCESS:
if PARALLELIZE:
for i in range(len(self.workers)):
self.work_queue.put((None, None))
for worker in self.workers:
worker.join()
print('Done')
try:
for entry in self.entrypoints[protocol][hostname]:
path_pattern, handler = entry
if handler != None:
# A path string, possibly with wildcards.
if len(path_pattern) == 0 or fnmatch.fnmatch(path, path_pattern):
return handler
else:
# A callable generic path matcher.
handler = path_pattern(path)
if handler:
return handler
except Exception as x:
print(x)
return None
return None
--
2.25.1
text/plain
This content has been proxied by September (ba2dc).