the_Foundation [main]

Windows: Ported TCP sockets to Windows Sockets API

=> 84928fcaab4aedc3cfc1fd8e79b9ef422ae362d4

diff --git a/src/platform/win32/service.c b/src/platform/win32/service.c
index b66e442..878a0f9 100644
--- a/src/platform/win32/service.c
+++ b/src/platform/win32/service.c
@@ -29,7 +29,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #include "the_Foundation/socket.h"
 #include "the_Foundation/string.h"
 #include "the_Foundation/thread.h"
-#include "pipe.h"
 #include "wide.h"
 
 #define WIN32_LEAN_AND_MEAN
@@ -40,7 +39,8 @@ struct Impl_Service {
     iObject object;
     uint16_t port;
     SOCKET fd;
-    iPipe stop;
+    HANDLE fdEvent;
+    HANDLE stopEvent;
     iThread *listening;
     iAudience *incomingAccepted;
 };
@@ -50,55 +50,69 @@ iDefineAudienceGetter(Service, incomingAccepted)
 iDefineObjectConstructionArgs(Service, (uint16_t port), port)
 
 static iThreadResult listen_Service_(iThread *thd) {
-    iService *d = userData_Thread(thd);
-    while (d->fd >= 0) {
+    iService *d = userData_Thread(thd);    
+    while (d->fd != INVALID_SOCKET) {
         /* Wait for activity. */
-        fd_set fds;
-        FD_ZERO(&fds);
-        FD_SET(d->fd, &fds);
-        FD_SET(output_Pipe(&d->stop), &fds);
-        if (select(0, &fds, NULL, NULL, NULL) == -1) {
+        HANDLE events[2] = { d->fdEvent, d->stopEvent };
+        DWORD rc = WaitForMultipleObjects(2, events, FALSE, INFINITE);
+        if (rc == WAIT_FAILED) {
+            iDebug("[Service] %s\n", errorMessage_Windows_(GetLastError()));
             break;
         }
-        if (FD_ISSET(output_Pipe(&d->stop), &fds)) {
+        else if (rc == WAIT_OBJECT_0 + 1) {
+            iDebug("[Service] stop signal received\n");
             break;
         }
-        if (FD_ISSET(d->fd, &fds)) {
-            struct sockaddr_storage addr;
-            int size = sizeof(addr);
-            int incoming = accept(d->fd, (struct sockaddr *) &addr, &size);
-            if (incoming < 0) {
-                iWarning("[Service] error on accept: %s\n",
-                         errorMessage_Windows_(WSAGetLastError()));
+        else if (rc == WAIT_OBJECT_0) {
+            WSANETWORKEVENTS ev;
+            iZap(ev);
+            WSAEnumNetworkEvents(d->fd, d->fdEvent, &ev);
+            if (ev.lNetworkEvents & FD_ACCEPT) {
+                struct sockaddr_storage addr;
+                int size = sizeof(addr);
+                int incoming = accept(d->fd, (struct sockaddr *) &addr, &size);
+                if (incoming < 0) {
+                    iWarning("[Service] error on accept: %s\n",
+                             errorMessage_Windows_(WSAGetLastError()));
+                    break;
+                }
+                iSocket *socket = newExisting_Socket(incoming, &addr, size);
+                iNotifyAudienceArgs(d, incomingAccepted, ServiceIncomingAccepted, socket);
+                iRelease(socket);
+            }
+            else if (ev.lNetworkEvents & FD_CLOSE) {
+                iDebug("[Service] socket closed\n");
                 break;
             }
-            iSocket *socket = newExisting_Socket(incoming, &addr, size);
-            iNotifyAudienceArgs(d, incomingAccepted, ServiceIncomingAccepted, socket);
-            iRelease(socket);
         }
     }
     iReleasePtr(&d->listening);
+    iDebug("[Service] listen thread exited\n");
     return 0;
 }
 
 void init_Service(iService *d, uint16_t port) {
     d->port = port;
-    d->fd = NULL;
+    d->fd = INVALID_SOCKET;
     d->listening = NULL;
-    init_Pipe(&d->stop);
+    //init_Pipe(&d->stop);
+    d->fdEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
+    d->stopEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
     d->incomingAccepted = new_Audience();
 }
 
 void deinit_Service(iService *d) {
     close_Service(d);
-    deinit_Pipe(&d->stop);
+    //deinit_Pipe(&d->stop);
+    CloseHandle(d->stopEvent);
+    CloseHandle(d->fdEvent);
     iAssert(d->listening == NULL);
-    iAssert(!d->fd);
+    iAssert(d->fd == INVALID_SOCKET);
     delete_Audience(d->incomingAccepted);
 }
 
 iBool isOpen_Service(const iService *d) {
-    return d->fd >= 0;
+    return d->fd != INVALID_SOCKET;
 }
 
 iBool open_Service(iService *d) {
@@ -118,25 +132,26 @@ iBool open_Service(iService *d) {
             return iFalse;
         }
         d->fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
-        if (d->fd < 0) {
+        if (d->fd == INVALID_SOCKET) {
             iWarning("[Service] failed to open socket: %s\n", errorMessage_Windows_(WSAGetLastError()));
             return iFalse;
         }
         rc = bind(d->fd, info->ai_addr, info->ai_addrlen);
         if (rc < 0) {
             closesocket(d->fd);
-            d->fd = NULL;
+            d->fd = INVALID_SOCKET;
             iWarning("[Service] failed to bind address: %s\n", errorMessage_Windows_(WSAGetLastError()));
             return iFalse;
         }
         rc = listen(d->fd, 10);
         if (rc < 0) {
             closesocket(d->fd);
-            d->fd = NULL;
+            d->fd = INVALID_SOCKET;
             iWarning("[Service] failed to listen: %s\n", errorMessage_Windows_(WSAGetLastError()));
             return iFalse;
         }
     }
+    WSAEventSelect(d->fd, d->fdEvent, FD_ACCEPT | FD_CLOSE);
     d->listening = new_Thread(listen_Service_);
     setUserData_Thread(d->listening, d);
     start_Thread(d->listening);
@@ -146,12 +161,17 @@ iBool open_Service(iService *d) {
 void close_Service(iService *d) {
     if (d->listening) {
         /* Signal the listening thread to stop. */
-        writeByte_Pipe(&d->stop, 1);
+        //writeByte_Pipe(&d->stop, 1);
+        SetEvent(d->stopEvent);
         closesocket(d->fd);
-        d->fd = NULL;
+        d->fd = INVALID_SOCKET;
         join_Thread(d->listening);
         iAssert(d->listening == NULL);
     }
+    if (d->fd != INVALID_SOCKET) {
+        closesocket(d->fd);
+        d->fd = INVALID_SOCKET;
+    }
 }
 
 iDefineClass(Service)
diff --git a/src/platform/win32/socket.c b/src/platform/win32/socket.c
index 86ca92e..2ebf680 100644
--- a/src/platform/win32/socket.c
+++ b/src/platform/win32/socket.c
@@ -30,7 +30,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #include "the_Foundation/mutex.h"
 #include "the_Foundation/thread.h"
 #include "the_Foundation/atomic.h"
-#include "pipe.h"
 #include "wide.h"
 
 #define WIN32_LEAN_AND_MEAN
@@ -56,7 +55,8 @@ struct Impl_Socket {
     enum iSocketStatus status;
     iAddress *address;
     SOCKET fd;
-    iPipe *stopConnect;
+    HANDLE fdEvent;
+    HANDLE stopConnectEvent;
     iThread *connecting;
     iSocketThread *thread;
     iCondition allSent;
@@ -92,7 +92,7 @@ iDeclareClass(SocketThread)
 struct Impl_SocketThread {
     iThread thread;
     iSocket *socket;
-    iPipe wakeup;
+    HANDLE wakeupEvent;
     iAtomicInt mode; /* enum iSocketThreadMode */
 };
 
@@ -107,33 +107,26 @@ static iThreadResult run_SocketThread_(iThread *thread) {
     iBlock *inbuf = collect_Block(new_Block(0x20000));
     iGuardMutex(smx, {
         /* Connection has been formed. */
-        delete_Pipe(d->socket->stopConnect);
-        d->socket->stopConnect = NULL;
+        CloseHandle(d->socket->stopConnectEvent);
+        d->socket->stopConnectEvent = INVALID_HANDLE_VALUE;
     });
     while (value_Atomic(&d->mode) == run_SocketThreadMode) {
         if (bytesToSend_Socket(d->socket) > 0) {
             /* Make sure we won't block on select() when there's still data to send. */
-            writeByte_Pipe(&d->wakeup, 0);
+            SetEvent(d->wakeupEvent);
         }
         /* Wait for activity. */
-        fd_set reads, errors; {
-            FD_ZERO(&reads);
-            FD_ZERO(&errors);
-            FD_SET(output_Pipe(&d->wakeup), &reads);
-            FD_SET(d->socket->fd, &reads);
-            FD_SET(d->socket->fd, &errors);
-            int ready = select(0, &reads, NULL, &errors, NULL);
-            if (ready == -1) {
-                const DWORD err = WSAGetLastError();
-                iWarning("[Socket] error from select(): %s\n", errorMessage_Windows_(err));
-                return errno_Windows_(err);
-            }
-        }
-        if (FD_ISSET(output_Pipe(&d->wakeup), &reads)) {
-            readByte_Pipe(&d->wakeup);
+        HANDLE events[2] = { d->socket->fdEvent, d->wakeupEvent };
+        DWORD waitResult = WaitForMultipleObjects(2, events, FALSE, INFINITE);
+        if (waitResult == WAIT_FAILED) {
+            const DWORD err = GetLastError();
+            iWarning("[Socket] %s\n", errorMessage_Windows_(err));
+            return errno_Windows_(err);
         }
         /* Check for incoming data. */
-        if (FD_ISSET(d->socket->fd, &reads)) {
+        WSANETWORKEVENTS netEvents;
+        WSAEnumNetworkEvents(d->socket->fd, d->socket->fdEvent, &netEvents);
+        if (netEvents.lNetworkEvents & FD_READ) {
             ssize_t readSize = recv(d->socket->fd, data_Block(inbuf), size_Block(inbuf), 0);
             if (readSize == 0) {
                 iWarning("[Socket] peer closed the connection while we were receiving\n");
@@ -156,12 +149,11 @@ static iThreadResult run_SocketThread_(iThread *thread) {
             iNotifyAudience(d->socket, readyRead, SocketReadyRead);
         }
         /* Problem with the socket? */
-        if (FD_ISSET(d->socket->fd, &errors)) {
+        if (netEvents.lNetworkEvents & FD_CLOSE) {
             if (status_Socket(d->socket) == connected_SocketStatus) {
-                const DWORD err = WSAGetLastError();
-                iWarning("[Socket] error while receiving: %s\n", errorMessage_Windows_(err));
+                iDebug("[Socket] socket was closed\n");
                 shutdown_Socket_(d->socket);
-                return errno_Windows_(err);
+                return ENOTCONN;
             }
             return 0;
         }
@@ -220,18 +212,18 @@ static void init_SocketThread(iSocketThread *d, iSocket *socket,
         setName_Thread(&d->thread, cstr_String(&name));
         deinit_String(&name);
     }
-    init_Pipe(&d->wakeup);
+    d->wakeupEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
     d->socket = socket;
     set_Atomic(&d->mode, mode);
 }
 
 static void deinit_SocketThread(iSocketThread *d) {
-    deinit_Pipe(&d->wakeup);
+    CloseHandle(d->wakeupEvent);
 }
 
 static void exit_SocketThread_(iSocketThread *d) {
     set_Atomic(&d->mode, stop_SocketThreadMode);
-    writeByte_Pipe(&d->wakeup, 1); // select() will exit
+    SetEvent(d->wakeupEvent);
     join_Thread(&d->thread);
 }
 
@@ -264,9 +256,10 @@ static void init_Socket_(iSocket *d) {
     d->input = new_Buffer();
     openEmpty_Buffer(d->output);
     openEmpty_Buffer(d->input);
-    d->fd = NULL;
+    d->fd = INVALID_SOCKET;
+    d->fdEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
     d->address = NULL;
-    d->stopConnect = new_Pipe(); /* used for aborting select() on user action */
+    d->stopConnectEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
     d->connecting = NULL;
     d->thread = NULL;
     init_Condition(&d->allSent);
@@ -289,7 +282,10 @@ void deinit_Socket(iSocket *d) {
     waitForFinished_Address(d->address);
     iReleasePtr(&d->address);
     deinit_Mutex(&d->mutex);
-    delete_Pipe(d->stopConnect);
+    if (d->stopConnectEvent != INVALID_HANDLE_VALUE) {
+        CloseHandle(d->stopConnectEvent);
+    }
+    CloseHandle(d->fdEvent);
     deinit_Condition(&d->allSent);
     delete_Audience(d->connected);
     delete_Audience(d->disconnected);
@@ -322,15 +318,15 @@ static iBool setNonBlocking_Socket_(iSocket *d, iBool set) {
 static void shutdown_Socket_(iSocket *d) {
     iGuardMutex(&d->mutex, {
         setStatus_Socket_(d, disconnecting_SocketStatus);
-        if (d->fd) {
+        if (d->fd != INVALID_SOCKET) {
             shutdown(d->fd, SD_RECEIVE);
         }
     });
     iBool notify = iFalse;
     iGuardMutex(&d->mutex, {
-        if (d->fd) {
+        if (d->fd != INVALID_SOCKET) {
             closesocket(d->fd);
-            d->fd = NULL;
+            d->fd = INVALID_SOCKET;
         }
         notify = setStatus_Socket_(d, disconnected_SocketStatus);
     });
@@ -369,12 +365,13 @@ static iThreadResult connectAsync_Socket_(iThread *thd) {
                    cstrCollect_String(toString_SockAddr(addr)),
                    addrSize, indexInFamily);
             const iSocketParameters sp = socketParametersIndex_Address(d->address, addrIndex);
-            if (d->fd) {
+            if (d->fd != INVALID_SOCKET) {
                 closesocket(d->fd);
-                d->fd = NULL;
+                d->fd = INVALID_SOCKET;
             }
             iDebug("[Socket] family:%d type:%d protocol:%d\n", sp.family, sp.type, sp.protocol);
             d->fd = socket(sp.family, sp.type, sp.protocol);
+            WSAEventSelect(d->fd, d->fdEvent, FD_CONNECT | FD_READ | FD_CLOSE);
             if (!setNonBlocking_Socket_(d, iTrue)) {
                 /* Wait indefinitely. */
                 rc = connect(d->fd, addr, addrSize);
@@ -382,46 +379,36 @@ static iThreadResult connectAsync_Socket_(iThread *thd) {
             else {
                 /* Give up after a timeout. */
                 rc = connect(d->fd, addr, addrSize);
-                if (rc && WSAGetLastError() != WSAEINPROGRESS) {
+                if (rc && WSAGetLastError() != WSAEWOULDBLOCK) {
                     iDebug("[Socket] result from connect: rc=%d (%s)\n",
                            rc,
                            errorMessage_Windows_(WSAGetLastError()));
                     continue;
                 }
-                iAssert(d->stopConnect != NULL);
-                const HANDLE stopFd = output_Pipe(d->stopConnect);
-                fd_set stopSet;
-                fd_set connSet;
-                fd_set errSet;
-                FD_ZERO(&stopSet);
-                FD_ZERO(&connSet);
-                FD_ZERO(&errSet);
-                FD_SET(stopFd, &stopSet);
-                FD_SET(d->fd, &connSet);
-                FD_SET(d->fd, &errSet);
-                struct timeval timeout = { .tv_sec = connectionTimeoutSeconds_Socket_ };
-                rc = select(0, &stopSet, &connSet, &errSet, &timeout);
-                if (rc > 0) {
-                    if (FD_ISSET(stopFd, &stopSet)) {
-                        setError_Socket_(d, ECONNABORTED, "Connection aborted");
-                        return ECONNABORTED;
-                    }
-                    socklen_t argLen = sizeof(int);
-                    int sockError = 0;
-                    getsockopt(d->fd, SOL_SOCKET, SO_ERROR, (char *) &sockError, &argLen);
-                    if (sockError) {
-                        errno = sockError;
-                        iDebug("[Socket] socket error: errno=%d (%s)\n",
-                               errno,
-                               strerror(errno));
-                        continue;
+                HANDLE events[2] = { d->stopConnectEvent, d->fdEvent };
+                DWORD waitResult = WaitForMultipleObjects(
+                    2, events, FALSE, connectionTimeoutSeconds_Socket_ * 1000);
+                if (waitResult == WAIT_OBJECT_0 /* stop connect */) {
+                    setError_Socket_(d, ECONNABORTED, "Connection aborted");
+                    return ECONNABORTED;
+                }
+                else if (waitResult == WAIT_OBJECT_0 + 1) {
+                    WSANETWORKEVENTS netEvents;
+                    WSAEnumNetworkEvents(d->fd, d->fdEvent, &netEvents);
+                    if (netEvents.lNetworkEvents & FD_CONNECT) {
+                        const int err = netEvents.iErrorCode[FD_CONNECT_BIT];
+                        if (err) {
+                            errno = WSAECONNREFUSED;
+                            iDebug("[Socket] socket error: %s\n", errorMessage_Windows_(err));
+                            continue;
+                        }
+                        rc = 0; /* Success. */
+                        setNonBlocking_Socket_(d, iFalse);
                     }
-                    rc = 0; /* Success. */
-                    setNonBlocking_Socket_(d, iFalse);
                 }
                 else {
                     rc = -1;
-                    errno = ETIMEDOUT;
+                    errno = WSAETIMEDOUT;
                 }
             }
             lock_Mutex(&d->mutex);
@@ -441,10 +428,10 @@ static iThreadResult connectAsync_Socket_(iThread *thd) {
     }
     if (rc) {
         int errNum;
-        char *msg;
+        const char *msg;
         if (isHostFound_Address(d->address)) {
             errNum = errno;
-            msg = strerror(errNum);
+            msg = errorMessage_Windows_(errNum);
         }
         else {
             errNum = -1;
@@ -468,7 +455,7 @@ static iBool open_Socket_(iSocket *d) {
         return iFalse;
     }
     else if (!d->connecting) {
-        iAssert(!d->fd);
+        iAssert(d->fd == INVALID_SOCKET);
         setStatus_Socket_(d, connecting_SocketStatus);
         d->connecting = new_Thread(connectAsync_Socket_);
         setUserData_Thread(d->connecting, d);
@@ -504,6 +491,7 @@ iSocket *newExisting_Socket(int fd, const void *sockAddr, size_t sockAddrSize) {
     iSocket *d = iNew(Socket);
     init_Socket_(d);
     d->fd = fd;
+    WSAEventSelect(d->fd, d->fdEvent, FD_READ | FD_CLOSE);
     d->address = newSockAddr_Address(sockAddr, sockAddrSize, tcp_SocketType);
     setStatus_Socket_(d, connected_SocketStatus);
     startThread_Socket_(d);
@@ -551,8 +539,8 @@ void close_Socket(iSocket *d) {
             return;
         }
         if (d->status == connecting_SocketStatus) {
-            if (d->stopConnect) {
-                write_Pipe(d->stopConnect, "0", 1);
+            if (d->stopConnectEvent != INVALID_HANDLE_VALUE) {
+                SetEvent(d->stopConnectEvent);
             }
             shutdown(d->fd, SD_SEND);
         }
@@ -612,7 +600,7 @@ static size_t write_Socket_(iSocket *d, const void *data, size_t size) {
     iGuardMutex(&d->mutex, {
         writeData_Stream(stream_Buffer(d->output), data, size);
         if (d->thread) {
-            writeByte_Pipe(&d->thread->wakeup, 0); // wake up the I/O thread
+            SetEvent(d->thread->wakeupEvent); /* wake up the I/O thread */
         }
     });
     return size;
diff --git a/tests/t_network.c b/tests/t_network.c
index 5b7c429..c4239e4 100644
--- a/tests/t_network.c
+++ b/tests/t_network.c
@@ -122,12 +122,12 @@ static void communicate_(iAny *d, iService *sv, iSocket *sock) {
     start_Thread(receiver);
 }
 
-static bool connectTo_(const char *address) {
+static iBool connectTo_(const char *address) {
     iSocket *sock = iClob(new_Socket(address, 14666));
     observeSocket_(sock);
     if (!open_Socket(sock)) {
         puts("Failed to connect");
-        return false;
+        return iFalse;
     }
     puts("Type to send a message (empty to quit):");
     for (;;) {
@@ -139,7 +139,7 @@ static bool connectTo_(const char *address) {
         writeData_Socket(sock, buf, strlen(buf));
     }
     puts("Good day!");
-    return true;
+    return iTrue;
 }
 
 #if defined (iHaveTlsRequest)
@@ -197,6 +197,7 @@ int main(int argc, char *argv[]) {
             else {
                 printf("Failure! CURL says: %s\n", cstr_String(errorMessage_WebRequest(web)));
             }
+            deinit_Foundation();
             return 0;
         }
 #endif
@@ -236,6 +237,7 @@ int main(int argc, char *argv[]) {
                        hexEncode_Block(collect_Block(fingerprint_TlsCertificate(cert)))));
             printf("Recreated private key:\n%s", cstrCollect_String(privateKeyPem_TlsCertificate(cert)));
             delete_TlsCertificate(cert);
+            deinit_Foundation();
             return 0;
         }
         iCommandLineArg *tlsArgs = iClob(checkArgumentValues_CommandLine(cmdline, "t;tls", 2));
@@ -252,6 +254,7 @@ int main(int argc, char *argv[]) {
             submit_TlsRequest(tls);
             waitForFinished_TlsRequest(tls);
             printf("We are done.\n");
+            deinit_Foundation();
             return 0;
         }
     }
@@ -261,6 +264,7 @@ int main(int argc, char *argv[]) {
         iConnect(Service, sv, incomingAccepted, sv, communicate_);
         if (!open_Service(sv)) {
             puts("Failed to start service");
+            deinit_Foundation();
             return 1;
         }
         puts("Press Enter to quit..."); {
@@ -269,6 +273,7 @@ int main(int argc, char *argv[]) {
                 iWarning("fgets failed\n");
             }
         }
+        close_Service(sv);
     }
     else if (contains_CommandLine(cmdline, "c;client")) {
         connectTo_("localhost");
Proxy Information
Original URL
gemini://git.skyjake.fi/the_Foundation/main/cdiff/84928fcaab4aedc3cfc1fd8e79b9ef422ae362d4
Status Code
Success (20)
Meta
text/gemini; charset=utf-8
Capsule Response Time
27.965049 milliseconds
Gemini-to-HTML Time
0.810949 milliseconds

This content has been proxied by September (ba2dc).