diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index a288a05..7fbf9f2 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -2,30 +2,30 @@ name: CI env: DEBUG: napi:* APP_NAME: node-unix-socket - MACOSX_DEPLOYMENT_TARGET: '10.13' -'on': + MACOSX_DEPLOYMENT_TARGET: "10.13" +"on": push: branches: - master tags-ignore: - - '**' + - "**" paths-ignore: - - '**/*.md' + - "**/*.md" - LICENSE - - '**/*.gitignore' + - "**/*.gitignore" - .editorconfig - docs/** pull_request: null workflow_dispatch: inputs: publish: - description: 'Publish the npm package.' + description: "Publish the npm package." required: true type: boolean default: false jobs: build: - if: '!contains(github.event.head_commit.message, ''skip ci'')' + if: "!contains(github.event.head_commit.message, 'skip ci')" strategy: fail-fast: false matrix: @@ -54,7 +54,7 @@ jobs: yarn build --target x86_64-unknown-linux-gnu && strip *.node - host: ubuntu-latest - target: 'x86_64-unknown-linux-musl' + target: "x86_64-unknown-linux-musl" docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-alpine build: yarn build - host: ubuntu-latest @@ -140,14 +140,14 @@ jobs: if: ${{ matrix.settings.docker }} with: image: ${{ matrix.settings.docker }} - options: '-v ${{ env.HOME }}/.cargo/git:/root/.cargo/git -v ${{ env.HOME }}/.cargo/registry:/root/.cargo/registry -v ${{ github.workspace }}:/build -w /build' + options: "-v ${{ env.HOME }}/.cargo/git:/root/.cargo/git -v ${{ env.HOME }}/.cargo/registry:/root/.cargo/registry -v ${{ github.workspace }}:/build -w /build" run: ${{ matrix.settings.build }} - name: Build run: ${{ matrix.settings.build }} if: ${{ !matrix.settings.docker }} shell: bash - name: Upload artifact - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: bindings-${{ matrix.settings.target }} path: ${{ env.APP_NAME }}.*.node @@ -160,10 +160,9 @@ jobs: fail-fast: false matrix: node: - - '12' - - '14' - - '16' - - '18' + - "18" + - "20" + - "22" runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -201,10 +200,9 @@ jobs: - host: macos-latest target: x86_64-apple-darwin node: - - '12' - - '14' - - '16' - - '18' + - "18" + - "20" + - "22" runs-on: ${{ matrix.settings.host }} steps: - uses: actions/checkout@v3 diff --git a/.npmignore b/.npmignore index c851769..a3f486d 100644 --- a/.npmignore +++ b/.npmignore @@ -10,7 +10,7 @@ yarn.lock *.node check.py -resouce +resource scripts .devcontainer __test__ diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1d29a..85edde6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # CHANGELOG +## 0.2.6 +fix: clean up handles in worker_threads environments to prevent aborting + ## 0.2.5 fix: close the socket if connecting failed diff --git a/__test__/create_socket.js b/__test__/create_socket.js new file mode 100644 index 0000000..4d8d973 --- /dev/null +++ b/__test__/create_socket.js @@ -0,0 +1,12 @@ +const { DgramSocket } = require('../js') +const fs = require('fs') +const path = require('path') + +const serverPath = path.resolve(__dirname, './.tmp/worker_server.sock') +try { + fs.unlinkSync(serverPath) +} catch (err) { + // +} +const socket = new DgramSocket(serverPath); +socket.bind(serverPath) diff --git a/__test__/dgram.spec.ts b/__test__/dgram.spec.ts index b856b05..5e1083f 100644 --- a/__test__/dgram.spec.ts +++ b/__test__/dgram.spec.ts @@ -1,14 +1,14 @@ -// TODO add tests for worker_threads import * as path from 'path'; import * as fs from 'fs'; import * as os from 'os'; +import * as workerThreads from 'worker_threads' import { DgramSocket } from '../js/index'; -import { kTmp, silently, createDefer, kServerPath } from './util'; +import { kTmp, silently, createDefer, kServerPath, wait } from './util'; const kClientPath = path.resolve(kTmp, './client.sock'); const kInvalidPath = path.resolve(kTmp, './A_PATH_THAT_DOESNT_EXIST'); -const emptyFn = () => {}; +const emptyFn = () => { }; describe('DgramSocket', () => { beforeAll(() => { @@ -308,4 +308,10 @@ describe('DgramSocket', () => { client.close() await p }); + + it('should not abort in worker_threads', async () => { + const worker = new workerThreads.Worker(path.resolve(__dirname, './create_socket.js')) + await wait(1000) + worker.terminate() + }); }); diff --git a/__test__/socket.spec.ts b/__test__/socket.spec.ts index 7d6f485..fabf96a 100644 --- a/__test__/socket.spec.ts +++ b/__test__/socket.spec.ts @@ -1,5 +1,5 @@ import * as net from 'net' -import { createReuseportFd as createFd, closeFd } from '../js/index' +import { createReuseportFd as createFd, closeFd } from '../js/index' import { hasIPv6 } from './util' describe('tcp', () => { @@ -8,11 +8,13 @@ describe('tcp', () => { const host = '0.0.0.0' let port = 0; - async function createServer() { + async function createServer(): Promise { const fd = createFd(port, host); const server = await new Promise((resolve, reject) => { - const server = net.createServer() + const server = net.createServer({ + noDelay: true, + }) server.listen({ fd, @@ -26,14 +28,14 @@ describe('tcp', () => { return server } - const servers = []; + const servers: net.Server[] = []; for (let i = 0; i < 5; i += 1) { const server = await createServer() servers.push(server); } const pList = servers.map(server => { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { server.once('connection', (socket) => { socket.on('data', buf => { resolve(buf) @@ -55,7 +57,7 @@ describe('tcp', () => { servers.forEach(server => server.close()); }) - if (hasIPv6()) { + if (hasIPv6()) { it('should work with ipv6', async () => { const host = '::1' let port = 0; @@ -63,7 +65,9 @@ describe('tcp', () => { const fd = createFd(port, host); const server = await new Promise((resolve, reject) => { - const server = net.createServer() + const server = net.createServer({ + noDelay: true, + }) server.listen({ fd, @@ -72,7 +76,7 @@ describe('tcp', () => { }) }) port = (server.address() as any).port - const p = new Promise((resolve, reject) => { + const p = new Promise((resolve, reject) => { server.once('connection', (socket) => { socket.on('data', buf => { resolve(buf) diff --git a/js/addon.d.ts b/js/addon.d.ts index c33bc8d..6dcdde1 100644 --- a/js/addon.d.ts +++ b/js/addon.d.ts @@ -5,6 +5,7 @@ export function socketNewSoReuseportFd(domain: string, port: number, ip: string): number export function socketClose(fd: number): void +export function initCleanupHook(): void export class SeqpacketSocketWrap { constructor(ee: object, fd?: number | undefined | null) init(thisObj: object): void diff --git a/js/index.ts b/js/index.ts index 7a3718a..22c5420 100644 --- a/js/index.ts +++ b/js/index.ts @@ -1,3 +1,11 @@ +import * as workerThreads from 'worker_threads' +import { initCleanupHook } from './addon' + export { SendCb, DgramSocket } from './dgram' export { NotifyCb, SeqpacketSocket, SeqpacketServer } from './seqpacket' export { createReuseportFd, closeFd } from './socket' + +// Node.js will abort when threads are termiated if we don't clean up uv handles. +if (!workerThreads.isMainThread) { + initCleanupHook() +} diff --git a/js/seqpacket.ts b/js/seqpacket.ts index d421b5b..aac2331 100644 --- a/js/seqpacket.ts +++ b/js/seqpacket.ts @@ -291,6 +291,13 @@ export class SeqpacketSocket extends EventEmitter { this.wrap.close(); } + /** + * Alias of "destory". + */ + close() { + return this.destroy(); + } + /** * For test only * @ignore diff --git a/package.json b/package.json index b159ee5..fcbe05a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-unix-socket", - "version": "0.2.5", + "version": "0.2.6", "main": "js/index.js", "types": "js/index.d.ts", "author": { @@ -39,7 +39,7 @@ "devDependencies": { "@napi-rs/cli": "^2.10.3", "@types/jest": "^27.5.0", - "@types/node": "^17.0.31", + "@types/node": "^22.7.6", "jest": "^27.5.1", "ts-jest": "^27.1.4", "typedoc": "^0.22.15", diff --git a/src/dgram.rs b/src/dgram.rs index 025fbb0..4a2eb6e 100644 --- a/src/dgram.rs +++ b/src/dgram.rs @@ -13,6 +13,7 @@ use crate::util::{ addr_to_string, buf_into_vec, check_emit, error, get_err, i8_slice_into_u8_slice, resolve_libc_err, resolve_uv_err, set_clo_exec, set_non_block, socket_addr_to_string, }; +use crate::uv_handle::{insert_handle, remove_handle}; #[allow(dead_code)] fn string_from_i8_slice(slice: &[i8]) -> Result { @@ -65,6 +66,7 @@ impl DgramSocketWrap { handle.data = std::ptr::null_mut() as *mut _; handle })); + insert_handle(unsafe { mem::transmute(handle) })?; Ok(Self { fd, @@ -297,6 +299,7 @@ impl DgramSocketWrap { unsafe { let handle = mem::transmute(self.handle); sys::uv_close(handle, Some(on_close)); + remove_handle(handle)?; }; // release Ref in msg_queue diff --git a/src/lib.rs b/src/lib.rs index 20cd932..c25da57 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,3 +7,4 @@ mod seqpacket; mod dgram; mod util; mod socket; +mod uv_handle; diff --git a/src/seqpacket.rs b/src/seqpacket.rs index d8af569..16332ab 100644 --- a/src/seqpacket.rs +++ b/src/seqpacket.rs @@ -7,6 +7,7 @@ use crate::util::{ addr_to_string, buf_into_vec, error, get_err, resolve_libc_err, resolve_uv_err, set_clo_exec, set_non_block, socket_addr_to_string, uv_err_msg, }; +use crate::uv_handle::{insert_handle, remove_handle}; use libc::{sockaddr, sockaddr_un, EAGAIN, EINTR, EINVAL, ENOBUFS, EWOULDBLOCK}; use napi::{Env, JsBuffer, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result}; use nix::errno::errno; @@ -97,6 +98,7 @@ impl SeqpacketSocketWrap { handle.data = std::ptr::null_mut(); handle })); + insert_handle(unsafe { mem::transmute(handle) })?; let uv_loop = get_loop(&env)?; resolve_uv_err(unsafe { sys::uv_poll_init(uv_loop, handle, fd) })?; @@ -137,6 +139,7 @@ impl SeqpacketSocketWrap { unsafe { sys::uv_close(self.handle as *mut _, Some(on_close)); + remove_handle(mem::transmute(self.handle))?; }; // release msg_queue diff --git a/src/socket.rs b/src/socket.rs index cb351ec..058474d 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -4,7 +4,10 @@ use std::str::FromStr; use crate::util::{error, get_err, resolve_libc_err, resolve_uv_err}; use libc::{c_void, sockaddr_storage, sockaddr_un}; -use napi::{Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result, bindgen_prelude::FromNapiValue}; +use napi::{ + bindgen_prelude::FromNapiValue, Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, + Result, +}; use uv_sys::sys; pub(crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> { diff --git a/src/util.rs b/src/util.rs index 678f16c..a96f6cc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -2,8 +2,8 @@ use std::ffi::CStr; use std::intrinsics::transmute; use std::mem; -use libc::{sockaddr, sockaddr_un, c_char}; -use napi::{self, Error, JsBuffer, Result, JsFunction, JsObject}; +use libc::{c_char, sockaddr, sockaddr_un}; +use napi::{self, Error, JsBuffer, JsFunction, JsObject, Result}; use nix::errno::Errno; use nix::fcntl::{fcntl, FcntlArg, OFlag}; use uv_sys::sys; @@ -37,8 +37,8 @@ pub(crate) fn socket_addr_to_string(fd: i32) -> Result { Ok(addr_to_string(&addr)) } -pub(crate) fn error(msg: String) -> Error { - Error::new(napi::Status::Unknown, msg) +pub(crate) fn error(item: T) -> Error { + Error::new(napi::Status::Unknown, item.to_string()) } pub(crate) fn nix_err(err: Errno) -> Error { @@ -51,7 +51,8 @@ pub(crate) fn uv_err_msg(errno: i32) -> String { let ret = CStr::from_ptr(ret); ret .to_str() - .map_err(|_| error("parsing cstr failed".to_string())).unwrap() + .map_err(|_| error("parsing cstr failed".to_string())) + .unwrap() .to_string() }; diff --git a/src/uv_handle.rs b/src/uv_handle.rs new file mode 100644 index 0000000..71d9dc8 --- /dev/null +++ b/src/uv_handle.rs @@ -0,0 +1,87 @@ +use std::{ + collections::HashSet, + sync::{Mutex, MutexGuard, OnceLock}, +}; + +use crate::util; +use napi::{Env, Result}; +use uv_sys::sys::{uv_close, uv_handle_t}; + +// UV_HANDLES should only be used in clean up hooks +static mut UV_HANDLES: OnceLock>> = OnceLock::new(); +static HOOK_INITED: OnceLock = OnceLock::new(); + +fn get_handles(f: T) -> Result<()> +where + T: FnOnce(MutexGuard<'_, HashSet<*mut uv_handle_t>>) -> Result<()>, +{ + let handles = unsafe { UV_HANDLES.get_or_init(|| Mutex::new(HashSet::new())) }; + + { + let inner = handles.lock(); + if inner.is_err() { + let e = inner.err().unwrap(); + return Err(util::error(e)); + } + let inner = inner.unwrap(); + f(inner)? + } + + Ok(()) +} + +pub(crate) fn insert_handle(handle: *mut uv_handle_t) -> Result<()> { + get_handles(|mut inner| { + inner.insert(handle); + Ok(()) + }) +} + +// TODO pointer same? +pub(crate) fn remove_handle(handle: *mut uv_handle_t) -> Result<()> { + get_handles(|mut inner| { + inner.remove(&handle); + Ok(()) + }) +} + +pub(crate) fn cleanup_handles() -> Result<()> { + let handles = unsafe { UV_HANDLES.get() }; + if handles.is_none() { + return Ok(()); + } + + get_handles(|mut inner| { + for handle in inner.drain() { + if handle.is_null() { + continue; + } + unsafe { + uv_close(handle, None); + }; + } + + Ok(()) + }) +} + +#[napi] +#[allow(dead_code)] +pub fn init_cleanup_hook(mut env: Env) -> Result<()> { + if HOOK_INITED.get().is_some() { + return Ok(()); + } + HOOK_INITED.get_or_init(|| true); + + env.add_env_cleanup_hook((), move |_| { + let result = cleanup_handles(); + if result.is_err() { + println!( + "cleanup_handles failed, msg: {}", + result.err().unwrap().to_string() + ) + } + })?; + + Ok(()) +} diff --git a/yarn.lock b/yarn.lock index 03d3968..8625e1f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -609,10 +609,17 @@ jest-matcher-utils "^27.0.0" pretty-format "^27.0.0" -"@types/node@*", "@types/node@^17.0.31": +"@types/node@*": version "17.0.45" resolved "https://registry.npmjs.org/@types/node/-/node-17.0.45.tgz" +"@types/node@^22.7.6": + version "22.7.6" + resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.6.tgz#3ec3e2b071e136cd11093c19128405e1d1f92f33" + integrity sha512-/d7Rnj0/ExXDMcioS78/kf1lMzYk4BZV8MZGTBKzTGZ6/406ukkbYlIsZmMPhcR5KlkunDHQLrtAVmSq7r+mSw== + dependencies: + undici-types "~6.19.2" + "@types/prettier@^2.1.5": version "2.6.3" resolved "https://registry.npmjs.org/@types/prettier/-/prettier-2.6.3.tgz" @@ -2256,6 +2263,11 @@ typescript@^4.6.3: version "4.7.4" resolved "https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz" +undici-types@~6.19.2: + version "6.19.8" + resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-6.19.8.tgz#35111c9d1437ab83a7cdc0abae2f26d88eda0a02" + integrity sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw== + universalify@^0.2.0: version "0.2.0" resolved "https://registry.yarnpkg.com/universalify/-/universalify-0.2.0.tgz#6451760566fa857534745ab1dde952d1b1761be0"