Skip to content

Commit

Permalink
fix: clean up handles in worker_threads environments to prevent aborting
Browse files Browse the repository at this point in the history
When using in `worker_threads` Workers, Node.js will abort with messages like:

```
uv loop at [0x00000000] has open handles...
```

when workers are exiting or are terminated with active `DgramSocket` or `SeqpacketSocket`.

To fix this issue, we need clean up all active uv_handles in hooks registered by `napi_add_env_cleanup_hook`.
  • Loading branch information
oyyd committed Oct 17, 2024
1 parent bd3d460 commit d098554
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 39 deletions.
34 changes: 16 additions & 18 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ name: CI
env:
DEBUG: napi:*
APP_NAME: node-unix-socket
MACOSX_DEPLOYMENT_TARGET: '10.13'
'on':
MACOSX_DEPLOYMENT_TARGET: "10.13"
"on":
push:
branches:
- master
tags-ignore:
- '**'
- "**"
paths-ignore:
- '**/*.md'
- "**/*.md"
- LICENSE
- '**/*.gitignore'
- "**/*.gitignore"
- .editorconfig
- docs/**
pull_request: null
workflow_dispatch:
inputs:
publish:
description: 'Publish the npm package.'
description: "Publish the npm package."
required: true
type: boolean
default: false
jobs:
build:
if: '!contains(github.event.head_commit.message, ''skip ci'')'
if: "!contains(github.event.head_commit.message, 'skip ci')"
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -54,7 +54,7 @@ jobs:
yarn build --target x86_64-unknown-linux-gnu &&
strip *.node
- host: ubuntu-latest
target: 'x86_64-unknown-linux-musl'
target: "x86_64-unknown-linux-musl"
docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-alpine
build: yarn build
- host: ubuntu-latest
Expand Down Expand Up @@ -140,14 +140,14 @@ jobs:
if: ${{ matrix.settings.docker }}
with:
image: ${{ matrix.settings.docker }}
options: '-v ${{ env.HOME }}/.cargo/git:/root/.cargo/git -v ${{ env.HOME }}/.cargo/registry:/root/.cargo/registry -v ${{ github.workspace }}:/build -w /build'
options: "-v ${{ env.HOME }}/.cargo/git:/root/.cargo/git -v ${{ env.HOME }}/.cargo/registry:/root/.cargo/registry -v ${{ github.workspace }}:/build -w /build"
run: ${{ matrix.settings.build }}
- name: Build
run: ${{ matrix.settings.build }}
if: ${{ !matrix.settings.docker }}
shell: bash
- name: Upload artifact
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: bindings-${{ matrix.settings.target }}
path: ${{ env.APP_NAME }}.*.node
Expand All @@ -160,10 +160,9 @@ jobs:
fail-fast: false
matrix:
node:
- '12'
- '14'
- '16'
- '18'
- "18"
- "20"
- "22"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down Expand Up @@ -201,10 +200,9 @@ jobs:
- host: macos-latest
target: x86_64-apple-darwin
node:
- '12'
- '14'
- '16'
- '18'
- "18"
- "20"
- "22"
runs-on: ${{ matrix.settings.host }}
steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ yarn.lock
*.node

check.py
resouce
resource
scripts
.devcontainer
__test__
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# CHANGELOG

## 0.2.6
fix: clean up handles in worker_threads environments to prevent aborting

## 0.2.5
fix: close the socket if connecting failed

Expand Down
12 changes: 12 additions & 0 deletions __test__/create_socket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const { DgramSocket } = require('../js')
const fs = require('fs')
const path = require('path')

const serverPath = path.resolve(__dirname, './.tmp/worker_server.sock')
try {
fs.unlinkSync(serverPath)
} catch (err) {
//
}
const socket = new DgramSocket(serverPath);
socket.bind(serverPath)
12 changes: 9 additions & 3 deletions __test__/dgram.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// TODO add tests for worker_threads
import * as path from 'path';
import * as fs from 'fs';
import * as os from 'os';
import * as workerThreads from 'worker_threads'
import { DgramSocket } from '../js/index';
import { kTmp, silently, createDefer, kServerPath } from './util';
import { kTmp, silently, createDefer, kServerPath, wait } from './util';

const kClientPath = path.resolve(kTmp, './client.sock');
const kInvalidPath = path.resolve(kTmp, './A_PATH_THAT_DOESNT_EXIST');

const emptyFn = () => {};
const emptyFn = () => { };

describe('DgramSocket', () => {
beforeAll(() => {
Expand Down Expand Up @@ -308,4 +308,10 @@ describe('DgramSocket', () => {
client.close()
await p
});

it('should not abort in worker_threads', async () => {
const worker = new workerThreads.Worker(path.resolve(__dirname, './create_socket.js'))
await wait(1000)
worker.terminate()
});
});
20 changes: 12 additions & 8 deletions __test__/socket.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as net from 'net'
import { createReuseportFd as createFd, closeFd } from '../js/index'
import { createReuseportFd as createFd, closeFd } from '../js/index'
import { hasIPv6 } from './util'

describe('tcp', () => {
Expand All @@ -8,11 +8,13 @@ describe('tcp', () => {
const host = '0.0.0.0'
let port = 0;

async function createServer() {
async function createServer(): Promise<net.Server> {
const fd = createFd(port, host);

const server = await new Promise<net.Server>((resolve, reject) => {
const server = net.createServer()
const server = net.createServer({
noDelay: true,
})

server.listen({
fd,
Expand All @@ -26,14 +28,14 @@ describe('tcp', () => {
return server
}

const servers = [];
const servers: net.Server[] = [];
for (let i = 0; i < 5; i += 1) {
const server = await createServer()
servers.push(server);
}

const pList = servers.map(server => {
return new Promise((resolve, reject) => {
return new Promise<Buffer>((resolve, reject) => {
server.once('connection', (socket) => {
socket.on('data', buf => {
resolve(buf)
Expand All @@ -55,15 +57,17 @@ describe('tcp', () => {
servers.forEach(server => server.close());
})

if (hasIPv6()) {
if (hasIPv6()) {
it('should work with ipv6', async () => {
const host = '::1'
let port = 0;

const fd = createFd(port, host);

const server = await new Promise<net.Server>((resolve, reject) => {
const server = net.createServer()
const server = net.createServer({
noDelay: true,
})

server.listen({
fd,
Expand All @@ -72,7 +76,7 @@ describe('tcp', () => {
})
})
port = (server.address() as any).port
const p = new Promise((resolve, reject) => {
const p = new Promise<Buffer>((resolve, reject) => {
server.once('connection', (socket) => {
socket.on('data', buf => {
resolve(buf)
Expand Down
1 change: 1 addition & 0 deletions js/addon.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

export function socketNewSoReuseportFd(domain: string, port: number, ip: string): number
export function socketClose(fd: number): void
export function initCleanupHook(): void
export class SeqpacketSocketWrap {
constructor(ee: object, fd?: number | undefined | null)
init(thisObj: object): void
Expand Down
8 changes: 8 additions & 0 deletions js/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
import * as workerThreads from 'worker_threads'
import { initCleanupHook } from './addon'

export { SendCb, DgramSocket } from './dgram'
export { NotifyCb, SeqpacketSocket, SeqpacketServer } from './seqpacket'
export { createReuseportFd, closeFd } from './socket'

// Node.js will abort when threads are termiated if we don't clean up uv handles.
if (!workerThreads.isMainThread) {
initCleanupHook()
}
7 changes: 7 additions & 0 deletions js/seqpacket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,13 @@ export class SeqpacketSocket extends EventEmitter {
this.wrap.close();
}

/**
* Alias of "destory".
*/
close() {
return this.destroy();
}

/**
* For test only
* @ignore
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-unix-socket",
"version": "0.2.5",
"version": "0.2.6",
"main": "js/index.js",
"types": "js/index.d.ts",
"author": {
Expand Down Expand Up @@ -39,7 +39,7 @@
"devDependencies": {
"@napi-rs/cli": "^2.10.3",
"@types/jest": "^27.5.0",
"@types/node": "^17.0.31",
"@types/node": "^22.7.6",
"jest": "^27.5.1",
"ts-jest": "^27.1.4",
"typedoc": "^0.22.15",
Expand Down
3 changes: 3 additions & 0 deletions src/dgram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::util::{
addr_to_string, buf_into_vec, check_emit, error, get_err, i8_slice_into_u8_slice,
resolve_libc_err, resolve_uv_err, set_clo_exec, set_non_block, socket_addr_to_string,
};
use crate::uv_handle::{insert_handle, remove_handle};

#[allow(dead_code)]
fn string_from_i8_slice(slice: &[i8]) -> Result<String> {
Expand Down Expand Up @@ -65,6 +66,7 @@ impl DgramSocketWrap {
handle.data = std::ptr::null_mut() as *mut _;
handle
}));
insert_handle(unsafe { mem::transmute(handle) })?;

Ok(Self {
fd,
Expand Down Expand Up @@ -297,6 +299,7 @@ impl DgramSocketWrap {
unsafe {
let handle = mem::transmute(self.handle);
sys::uv_close(handle, Some(on_close));
remove_handle(handle)?;
};

// release Ref<JsFunction> in msg_queue
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ mod seqpacket;
mod dgram;
mod util;
mod socket;
mod uv_handle;
3 changes: 3 additions & 0 deletions src/seqpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::util::{
addr_to_string, buf_into_vec, error, get_err, resolve_libc_err, resolve_uv_err, set_clo_exec,
set_non_block, socket_addr_to_string, uv_err_msg,
};
use crate::uv_handle::{insert_handle, remove_handle};
use libc::{sockaddr, sockaddr_un, EAGAIN, EINTR, EINVAL, ENOBUFS, EWOULDBLOCK};
use napi::{Env, JsBuffer, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result};
use nix::errno::errno;
Expand Down Expand Up @@ -97,6 +98,7 @@ impl SeqpacketSocketWrap {
handle.data = std::ptr::null_mut();
handle
}));
insert_handle(unsafe { mem::transmute(handle) })?;

let uv_loop = get_loop(&env)?;
resolve_uv_err(unsafe { sys::uv_poll_init(uv_loop, handle, fd) })?;
Expand Down Expand Up @@ -137,6 +139,7 @@ impl SeqpacketSocketWrap {

unsafe {
sys::uv_close(self.handle as *mut _, Some(on_close));
remove_handle(mem::transmute(self.handle))?;
};

// release msg_queue
Expand Down
5 changes: 4 additions & 1 deletion src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::str::FromStr;

use crate::util::{error, get_err, resolve_libc_err, resolve_uv_err};
use libc::{c_void, sockaddr_storage, sockaddr_un};
use napi::{Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result, bindgen_prelude::FromNapiValue};
use napi::{
bindgen_prelude::FromNapiValue, Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref,
Result,
};
use uv_sys::sys;

pub(crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> {
Expand Down
11 changes: 6 additions & 5 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::ffi::CStr;
use std::intrinsics::transmute;
use std::mem;

use libc::{sockaddr, sockaddr_un, c_char};
use napi::{self, Error, JsBuffer, Result, JsFunction, JsObject};
use libc::{c_char, sockaddr, sockaddr_un};
use napi::{self, Error, JsBuffer, JsFunction, JsObject, Result};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use uv_sys::sys;
Expand Down Expand Up @@ -37,8 +37,8 @@ pub(crate) fn socket_addr_to_string(fd: i32) -> Result<String> {
Ok(addr_to_string(&addr))
}

pub(crate) fn error(msg: String) -> Error {
Error::new(napi::Status::Unknown, msg)
pub(crate) fn error<T: ToString>(item: T) -> Error {
Error::new(napi::Status::Unknown, item.to_string())
}

pub(crate) fn nix_err(err: Errno) -> Error {
Expand All @@ -51,7 +51,8 @@ pub(crate) fn uv_err_msg(errno: i32) -> String {
let ret = CStr::from_ptr(ret);
ret
.to_str()
.map_err(|_| error("parsing cstr failed".to_string())).unwrap()
.map_err(|_| error("parsing cstr failed".to_string()))
.unwrap()
.to_string()
};

Expand Down
Loading

0 comments on commit d098554

Please sign in to comment.