Doesn't play nice with newer Stream API #24
Comments
Yes, but new streams are backwards compatible with classic streams, so it should still work fine. |
Oh, just saw your tweet - can you describe what your problem was, so we can improve documentation? |
I fork off a child process which chroots itself. The child process runs a bunch of commands, some concurrently. I wanted to use mux-demux to relay all the chroot's sub-processes' stdio to the non-chrooted parent process. What I encountered along the way was I got things to run by switching to When using the old stream interface, I had to wrap the data in a Buffer. This is probably the wrong thing to do, but this is what I ended up with:

stream.on('data', function (data) {
  data = new Buffer(data).toString('utf8')
  console.log("demuxed: ", data)
})

I tried wrapping the stream like so:

mdm.on('connection', function (demux_stream) {
  var new_stream = new stream.Readable()
  new_stream.wrap(demux_stream)
  new_stream.on('readable', function () {
    console.log("demuxed: ", new_stream.read())
  })
})

But I get a bunch of decoding errors. |
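For comparison, here is a minimal sketch of the wrap approach using only Node core. It assumes a hypothetical makeClassicStream stand-in (a bare EventEmitter that only emits 'data' and 'end'), not a real mux-demux stream, and the drain helper is likewise made up for illustration. The main point is that read() returns null when nothing is buffered, so a 'readable' handler probably needs to check for that before touching the data.

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Hypothetical stand-in for a classic stream that only emits 'data'/'end'.
function makeClassicStream() {
  const s = new EventEmitter();
  s.pause = function () {};   // no-op flow control, enough for this sketch
  s.resume = function () {};
  return s;
}

// Pull everything currently buffered; read() returns null once it is empty.
function drain(readable) {
  const chunks = [];
  let chunk;
  while ((chunk = readable.read()) !== null) {
    chunks.push(chunk);
  }
  return chunks;
}

const classic = makeClassicStream();
const wrapped = new Readable().wrap(classic);

wrapped.on('readable', function () {
  drain(wrapped).forEach(function (chunk) {
    console.log('demuxed:', JSON.stringify(chunk.toString('utf8')));
  });
});

// Simulate the classic stream producing a chunk and then finishing.
classic.emit('data', Buffer.from('1\n1\n'));
classic.emit('end');
```

Whether this removes the decoding errors depends on what the wrapped stream actually emits; if the chunks are not Buffers or strings, the wrapper would likely need object mode or a different serializer.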
Aha, it's not new streams that are the problem, but buffers. Incoming data gets turned into JSON, which isn't nice for a buffer. Are you sending buffers or strings? The wrapper that mux-demux uses to serialize the stream is pluggable, you might want to try using this approach I have also been considering wrapping mux-demux in the redis protocol But still, that is a good option - I've just been waiting until I have a personal need. But try it with msg-pack stream first, that might work - if so, I'll update the docs. |
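To illustrate why JSON is awkward for buffers, here is a small sketch using only Node core. It does not touch mux-demux itself; the base64 step is just one possible workaround chosen for the example, not something the library is known to do.

```javascript
// A chunk as it might arrive from a child's stdout.
const original = Buffer.from('1\n1\n', 'utf8');

// Straight through JSON: what comes back is plain data, not a Buffer.
const viaJson = JSON.parse(JSON.stringify(original));

// Encoding to a base64 string first survives JSON, including binary data.
const wire = JSON.stringify(original.toString('base64'));
const restored = Buffer.from(JSON.parse(wire), 'base64');

console.log(Buffer.isBuffer(viaJson));   // false
console.log(restored.equals(original));  // true
```

A binary-safe serializer avoids the extra encoding step, which is presumably why the msg-pack suggestion comes up here.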
I'm not directly sending anything myself. It's short, so here's the toy code I've been working with (not actually using chroot).

parent.js:

var spawn = require("child_process").spawn
  , mdm = require("mux-demux")()
  , stream = require("stream")

var child = spawn('node', ['./child.js'], {'stdio': ['ignore', 'pipe', 'ignore']})
child.stdout.pipe(mdm)
child.on('error', console.log)
child.on('exit', function() {
  console.log("Child exit")
})

function data_logger(name) {
  console.log("new logger for: " + name)
  return function (data) {
    data = new Buffer(data).toString('utf8')
    console.log("demuxed " + name + ": ", data[0], data.length)
  }
}

function read_logger(name, demuxed_stream) {
  console.log("new logger for: " + name)
  return function () {
    var data = demuxed_stream.read()
    console.log("demuxed " + name + ": ", data[0], data.length)
  }
}

var use_streams2 = true

function connection(demux_stream) {
  console.log("New connection!")
  if (use_streams2) {
    var new_stream = new stream.Readable()
    new_stream.wrap(demux_stream)
    //new_stream.setEncoding('utf8') // bad data with, crash without
    new_stream.on('readable', read_logger(demux_stream.meta, new_stream))
    new_stream.on('error', console.log)
  } else {
    demux_stream.on('data', data_logger(demux_stream.meta))
  }
}

mdm.on('connection', connection)

child.js:

var spawn = require("child_process").spawn
  , mdm = require("mux-demux")()
  , ones = spawn("yes", ["1"], {"stdio": "pipe"})
  , twos = spawn("yes", ["2"], {"stdio": "pipe"})

mdm.pipe(process.stdout)
ones.stdout.pipe(mdm.createWriteStream("ones"))
twos.stdout.pipe(mdm.createWriteStream("twos"))

function done() {
  ones.kill()
  twos.kill()
  setTimeout(process.exit, 1000)
}

setTimeout(done, 500) |
Before looking at mux-demux I was using my own JSON based approach that used |
I ended up throwing together my own muxer/demuxer. It's not done yet, but I managed to get it functional enough to sort out what kind of API I would expect to use for such a thing: https://github.com/rmg/unfunnel |
Cool! That's great! Since you are just getting started, can I recommend you don't use JSON? You are length delimiting the JSON packets here https://github.com/rmg/unfunnel/blob/master/index.js#L34 |
JSON was just the make-things-go solution. The packet format is entirely encapsulated in the receive and send methods for the purpose of changing it later :-) I think what I'm going to do is use |
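As a rough sketch of what a length-delimited, non-JSON packet format could look like: the layout below (4-byte payload length, 1-byte name length, name, payload) and the names encodeFrame and createDecoder are assumptions made up for this example. They are not the format unfunnel or mux-demux actually uses.

```javascript
// Hypothetical frame layout:
//   [uint32 BE payload length][uint8 name length][name, utf8][payload]
const HEADER_SIZE = 5;

function encodeFrame(name, data) {
  const nameBuf = Buffer.from(name, 'utf8');
  if (nameBuf.length > 255) throw new RangeError('stream name too long');
  const header = Buffer.alloc(HEADER_SIZE);
  header.writeUInt32BE(data.length, 0);
  header.writeUInt8(nameBuf.length, 4);
  return Buffer.concat([header, nameBuf, data]);
}

// Returns a function that accepts arbitrary chunks and returns whatever
// complete frames have arrived so far; partial frames stay buffered.
function createDecoder() {
  let pending = Buffer.alloc(0);
  return function decode(chunk) {
    pending = Buffer.concat([pending, chunk]);
    const frames = [];
    while (pending.length >= HEADER_SIZE) {
      const dataLen = pending.readUInt32BE(0);
      const nameLen = pending.readUInt8(4);
      const total = HEADER_SIZE + nameLen + dataLen;
      if (pending.length < total) break; // wait for the rest of the frame
      frames.push({
        name: pending.toString('utf8', HEADER_SIZE, HEADER_SIZE + nameLen),
        data: pending.subarray(HEADER_SIZE + nameLen, total)
      });
      pending = pending.subarray(total);
    }
    return frames;
  };
}

// Usage: two frames arriving split at an arbitrary byte boundary.
const wire = Buffer.concat([
  encodeFrame('ones', Buffer.from('1\n')),
  encodeFrame('twos', Buffer.from('2\n'))
]);
const decode = createDecoder();
const frames = decode(wire.subarray(0, 7)).concat(decode(wire.subarray(7)));
console.log(frames.map(function (f) { return f.name; }));
```

Keeping the layout inside one encode function and one decode function matches the idea of encapsulating the packet format in the send and receive methods so it can be changed later.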
You definitely need an end message, 'close' was more significant in Although it is handy to be able to create a new stream and send some metadata over, it became awkward because it could only go one way. If the child process (or whatever) KNOWS it's gonna be talking to another process, it's nicer to have it symmetrical:

//server
cp.stdout.pipe(mx.createStream('rpc')).pipe(cp.stdin)

//client
process.stdin.pipe(mx.createStream('rpc')).pipe(process.stdout)

Other times, you don't know how many connections will come through, and one end is the 'server' and the other the 'client', but looking back I realize that it would have been better to have a simpler stream, and allow the first message to be the header, instead of having a special header message. You do need an error message, though - but maybe that can be mixed into the end message? |
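One way to read "mix the error into the end message" is sketched below. The message shapes ({type: 'data'} and {type: 'end'} with an optional error field) and the applyMessage helper are hypothetical, and the endpoint is a plain EventEmitter rather than a real mux-demux stream.

```javascript
const { EventEmitter } = require('events');

// Hypothetical message shapes:
//   { type: 'data', data: ... }
//   { type: 'end' }                 normal end
//   { type: 'end', error: '...' }   end caused by an error on the other side
function applyMessage(endpoint, msg) {
  if (msg.type === 'data') {
    endpoint.emit('data', msg.data);
  } else if (msg.type === 'end') {
    if (msg.error) {
      endpoint.emit('error', new Error(msg.error));
    }
    endpoint.emit('end');
  } else {
    throw new TypeError('unknown message type: ' + msg.type);
  }
}

// Usage: record what a consumer of the endpoint would observe.
const endpoint = new EventEmitter();
const seen = [];
endpoint.on('data', function (d) { seen.push('data:' + d); });
endpoint.on('error', function (e) { seen.push('error:' + e.message); });
endpoint.on('end', function () { seen.push('end'); });

applyMessage(endpoint, { type: 'data', data: 'hello' });
applyMessage(endpoint, { type: 'end', error: 'not found' });
console.log(seen);
```

With this shape there are only two message types on the wire, and the error case is just an end that carries a reason.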
Ya, I guess I was thinking about For I've implemented the non-JSON protocol now, btw. |
Well, if you just use the name as the "data", then when you see a new name, you assume "create". @Raynos encountered the first need for errors with mux-demux. if a new stream came in, If you are using a multiplexer to tunnel some other stream, like http/tcp you definitely need errors. |
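The "new name means create" idea could be sketched roughly like this. createDemux, receive, and the endpoint objects are all made-up names for illustration, with plain EventEmitters standing in for real streams.

```javascript
const { EventEmitter } = require('events');

// Returns a receive(name, data) function. The first time a name is seen,
// an endpoint is created and announced via onStream before any data is
// delivered, so the listener has a chance to attach handlers.
function createDemux(onStream) {
  const endpoints = new Map();
  return function receive(name, data) {
    let endpoint = endpoints.get(name);
    if (!endpoint) {
      endpoint = new EventEmitter();
      endpoint.name = name;
      endpoints.set(name, endpoint);
      onStream(endpoint); // implicit "create"
    }
    endpoint.emit('data', data);
  };
}

// Usage: two interleaved names, no explicit create message.
const log = [];
const receive = createDemux(function (endpoint) {
  log.push('create ' + endpoint.name);
  endpoint.on('data', function (d) { log.push(endpoint.name + ' ' + d); });
});
receive('ones', '1');
receive('twos', '2');
receive('ones', '1');
console.log(log);
```

The trade-off is that there is no place for a rejection before data arrives, which is presumably where the need for an error message comes from.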
Currently the endpoints are auto-created on first reference by either the I'll definitely be adding On Tue, Apr 16, 2013 at 9:49 AM, Dominic Tarr [email protected]:
|
No rush. The 404-like error works by having a default handler, that just emits an error - since we wanted an error on the other end - mux-demux has an |
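A rough sketch of the default-handler idea follows. createRouter, the handlers map, and the send callback are hypothetical names for this example and do not reflect how mux-demux implements it; the reply reuses an assumed end message with an error field.

```javascript
// handlers: Map of stream name -> function(endpoint)
// send: whatever writes a message back to the other end (assumed here)
function createRouter(handlers, send) {
  return function route(name, endpoint) {
    const handler = handlers.get(name) || function defaultHandler() {
      // 404-like: nobody is listening for this name, so tell the other end.
      send({ name: name, type: 'end', error: 'no handler for ' + name });
    };
    handler(endpoint);
  };
}

// Usage: one known name, one unknown name.
const handled = [];
const sent = [];
const route = createRouter(
  new Map([['rpc', function (endpoint) { handled.push(endpoint.id); }]]),
  function send(msg) { sent.push(msg); }
);
route('rpc', { id: 1 });
route('missing', { id: 2 });
console.log(handled, sent);
```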
The streams created by mux-demux only support the data event interface, not the new readable event as documented in Node 0.10.x. Or I'm completely misunderstanding things.