Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/lib/libcore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,11 @@ function wrapSyscallFunction(x, library, isWasi) {
post = handler + post;

if (pre || post) {
t = modifyJSFunction(t, (args, body) => `function (${args}) {\n${pre}${body}${post}}\n`);
if (library[x + '__async']) {
t = modifyJSFunction(t, (args, body) => `async function (${args}) {\n${pre}${body}${post}}\n`);
} else {
t = modifyJSFunction(t, (args, body) => `function (${args}) {\n${pre}${body}${post}}\n`);
}
}

library[x] = eval('(' + t + ')');
Expand Down
3 changes: 3 additions & 0 deletions src/lib/libeventloop.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ LibraryJSEventLoop = {
emscripten_set_main_loop__deps: ['$setMainLoop'],
emscripten_set_main_loop: (func, fps, simulateInfiniteLoop) => {
var iterFunc = {{{ makeDynCall('v', 'func') }}};
#if JSPI
iterFunc = WebAssembly.promising(iterFunc);
#endif
setMainLoop(iterFunc, fps, simulateInfiniteLoop);
},

Expand Down
278 changes: 274 additions & 4 deletions src/lib/libsockfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ addToLibrary({
$SOCKFS__postset: () => {
addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);');
},
$SOCKFS__deps: ['$FS'],
$SOCKFS__deps: ['$FS', '$DNS'],
$SOCKFS: {
#if expectToReceiveOnModule('websocket')
websocketArgs: {},
Expand Down Expand Up @@ -69,6 +69,8 @@ addToLibrary({
pending: [],
recv_queue: [],
#if SOCKET_WEBRTC
#elif SOCKET_WEBTRANSPORT
sock_ops: SOCKFS.webtransport_sock_ops
#else
sock_ops: SOCKFS.websocket_sock_ops
#endif
Expand Down Expand Up @@ -104,9 +106,9 @@ addToLibrary({
},
// node and stream ops are backend agnostic
stream_ops: {
poll(stream) {
poll(stream, timeout) {
var sock = stream.node.sock;
return sock.sock_ops.poll(sock);
return sock.sock_ops.poll(sock, timeout);
},
ioctl(stream, request, varargs) {
var sock = stream.node.sock;
Expand Down Expand Up @@ -138,6 +140,273 @@ addToLibrary({
return `socket[${SOCKFS.nextname.current++}]`;
},
// backend-specific stream ops
#if SOCKET_WEBRTC
#elif SOCKET_WEBTRANSPORT
webtransport_sock_ops: {
getSession(sock, addr, port) {
return sock.peers[`${addr}:${port}`];
},
initSession(sock, session, addr, port) {
sock.peers[`${addr}:${port}`] = session;

/* buffer writes before session is ready */
const outgoing = [];

session.write = (buffer) => {
outgoing.push(buffer);
};

/* prevent unhandled rejections before main loop */
session.ready.catch(() => {});
session.closed.catch(() => {});

(async () => {
try {
await session.ready;

const writer = session.datagrams.writable.getWriter();
let first = true;

while (outgoing.length) {
writer.write(outgoing.shift()).catch(e => {});
}

session.write = (buffer) => {
writer.write(buffer).catch(e => {});
};

for await (const packet of session.datagrams.readable) {
// handle the internal port identification message
if (first && packet[0] === 0xff && packet[1] === 0xff && packet[2] === 0xff && packet[3] === 0xff &&
packet[4] === 'p' && packet[5] === 'o' && packet[6] === 'r' && packet[7] === 't') {
// update cache key
delete sock.peers[`${addr}:${port}`];
port = parseInt(String.fromCharCode.apply(null, packet.subarray(9)), 10);
sock.peers[`${addr}:${port}`] = session;
} else {
sock.recv_queue.push({ addr: addr, port: port, buffer: packet });
}

if (sock.pendingPollResolve) {
sock.pendingPollResolve();
}

first = false;
}
} catch (e) {
console.error(`Session ${addr}:${port} terminated`, e);
} finally {
console.log(`Removing peer ${addr}:${port}`);
delete sock.peers[`${addr}:${port}`];
}
})();
},
newSession(sock, addr, port) {
let hostname = DNS.lookup_addr(addr);

if (!hostname) {
hostname = addr;
}

const session = new WebTransport(`https://${hostname}:${port}`);

console.log(`New session https://${hostname}:${port}`);

SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port);

// send the original bound port number to the peer
if (sock.type === {{{ cDefs.SOCK_DGRAM }}} && typeof sock.sport != 'undefined') {
const msg = Uint8Array.from(`\xff\xff\xff\xffport ${sock.sport}\x00`, x => x.charCodeAt(0));
session.write(msg);
}

return session;
},
acceptSession(sock, session) {
#if ENVIRONMENT_MAY_BE_NODE
const split = session.peerAddress.split(':');

const addr = split[0];
const port = parseInt(split[1], 10);

console.log(`Accept session ${addr}:${port}`);

SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port);
#endif
},
stopListenServer(sock) {
#if ENVIRONMENT_MAY_BE_NODE
if (!ENVIRONMENT_IS_NODE) {
return;
}

if (!sock.h3) {
return;
}

sock.h3.stopServer();
sock.h3 = null;
#endif
},
startListenServer(sock) {
#if ENVIRONMENT_MAY_BE_NODE
if (!ENVIRONMENT_IS_NODE) {
return;
}

SOCKFS.webtransport_sock_ops.stopListenServer(sock);

sock.h3 = new Http3Server({
host: sock.saddr,
port: sock.sport,
secret: require('crypto').randomBytes(16).toString('hex'),
cert: Module['cert'],
privKey: Module['key']
});

sock.h3.startServer();

(async () => {
try {
await sock.h3.ready;

const stream = await sock.h3.sessionStream('/');

console.log(`Listening on ${sock.h3.host}:${sock.h3.port}`);

for await (const session of stream) {
SOCKFS.webtransport_sock_ops.acceptSession(sock, session);
}
} catch (e) {
sock.error = {{{ cDefs.EHOSTUNREACH }}};
} finally {
sock.h3 = null;
}
})();
#endif
},

// actual sock ops
#if ASYNCIFY
async poll(sock, timeout)
#else
poll(sock, timeout)
#endif
{
let mask = 0;

if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
} else {
#if ASYNCIFY
if (!sock.recv_queue.length) {
await new Promise((resolve, reject) => {
sock.pendingPromiseResolve = resolve;
setTimeout(resolve, timeout);
}).finally(() => {
sock.pendingPromiseResolve = null;
});
}
#endif

if (sock.recv_queue.length) {
mask |= {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}};
}

/* always ready to write */
mask |= {{{ cDefs.POLLOUT }}};
}

return mask;
},
ioctl(sock, request, arg) {
switch (request) {
default:
return {{{ cDefs.EINVAL }}};
}
},
close(sock) {
for (const session of Object.values(sock.peers)) {
session.close();
}

SOCKFS.webtransport_sock_ops.stopListenServer(sock);

return 0;
},
bind(sock, addr, port) {
if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') {
throw new FS.ErrnoError({{{ cDefs.EINVAL }}}); // already bound
}

sock.saddr = addr;
sock.sport = port;

if (sock.type === {{{ cDefs.SOCK_DGRAM }}}) {
SOCKFS.webtransport_sock_ops.startListenServer(sock);
} else {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
}
},
connect(sock, addr, port) {
if (sock.h3) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
}

if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
} else {
sock.daddr = addr;
sock.dport = port;
}
},
listen(sock, backlog) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
},
sendmsg(sock, buffer, offset, length, addr, port) {
let session = null;

if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
} else {
if (addr === undefined || port === undefined) {
addr = sock.daddr;
port = sock.dport;
}

session = SOCKFS.webtransport_sock_ops.getSession(sock, addr, port);

if (!session) {
session = SOCKFS.webtransport_sock_ops.newSession(sock, addr, port);
}
}

if (!session) {
throw new FS.ErrnoError({{{ cDefs.EDESTADDRREQ }}});
}

// copy off the buffer because write is async
buffer = buffer.slice(offset, offset + length);

session.write(buffer);

return length;
},
recvmsg(sock, length) {
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
}

const msg = sock.recv_queue.shift();

if (!msg) {
throw new FS.ErrnoError({{{ cDefs.EAGAIN }}});
}

return msg;
},
},
#else
websocket_sock_ops: {
//
// peers are a small wrapper around a WebSocket to help in
Expand Down Expand Up @@ -375,7 +644,7 @@ addToLibrary({
//
// actual sock ops
//
poll(sock) {
poll(sock, timeout) {
if (sock.type === {{{ cDefs.SOCK_STREAM }}} && sock.server) {
// listen sockets should only say they're available for reading
// if there are pending clients.
Expand Down Expand Up @@ -728,6 +997,7 @@ addToLibrary({
return res;
}
}
#endif
},

/*
Expand Down
19 changes: 17 additions & 2 deletions src/lib/libsyscall.js
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,11 @@ var SyscallsLibrary = {
var sock = getSocketFromFD(fd);
if (!addr) {
// send, no address provided
return FS.write(sock.stream, HEAP8, message, length);
return FS.write(sock.stream, HEAPU8, message, length);
}
var dest = getSocketAddress(addr, addr_len);
// sendto an address
return sock.sock_ops.sendmsg(sock, HEAP8, message, length, dest.addr, dest.port);
return sock.sock_ops.sendmsg(sock, HEAPU8, message, length, dest.addr, dest.port);
},
__syscall_getsockopt__deps: ['$getSocketFromFD'],
__syscall_getsockopt: (fd, level, optname, optval, optlen, d1) => {
Expand Down Expand Up @@ -606,7 +606,12 @@ var SyscallsLibrary = {
'_emscripten_proxy_newselect',
#endif
],
#if ASYNCIFY
__syscall__newselect__async: true,
__syscall__newselect: async (nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#else
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#endif
#if PTHREADS
if (ENVIRONMENT_IS_PTHREAD) {
return __emscripten_proxy_newselect(nfds,
Expand All @@ -631,7 +636,12 @@ var SyscallsLibrary = {
'_emscripten_proxy_newselect_finish',
#endif
],
#if ASYNCIFY
_newselect_js__async: true,
_newselect_js: async (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#else
_newselect_js: (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#endif
// readfds are supported,
// writefds checks socket open status
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
Expand Down Expand Up @@ -701,6 +711,11 @@ var SyscallsLibrary = {
#endif
return stream.stream_ops.poll(stream, timeoutInMillis);
})();

#if ASYNCIFY
/* poll is possibly a promise */
flags = await flags;
#endif
} else {
#if ASSERTIONS
if (timeoutInMillis != 0) warnOnce('non-zero select() timeout not supported: ' + timeoutInMillis)
Expand Down
Loading