forked from kriskowal/q-io
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.js
133 lines (116 loc) · 3.1 KB
/
reader.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
var Q = require("q");
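/**
 * Wraps a Node readable stream, providing an API similar
 * to a Narwhal synchronous `io` stream except returning
 * Q promises for long latency operations.
 * @param stream any Node readable stream
 * @param {String} [charset] optional encoding; when given and the
 * stream supports `setEncoding`, it is applied to the stream, and
 * buffered chunks are joined as strings rather than as buffers.
 * @returns {Promise * Reader} a promise for the stream reader,
 * fulfilled once the stream emits its first chunk or ends, and
 * rejected if the stream emits an error.
 * @constructor
 */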
module.exports = Reader;
function Reader(_stream, charset) {
    var self = Object.create(Reader.prototype);

    // TODO complain about inconsistency
    if (charset && _stream.setEncoding) {
        _stream.setEncoding(charset);
    }

    // `begin` settles when the stream first produces something (a chunk or
    // its end) or fails; `end` settles when the stream finishes or fails.
    var begin = Q.defer();
    var end = Q.defer();

    // read() and forEach() chain off `end` and surface its rejection to
    // their callers. This no-op handler only marks the rejection as
    // observed, so an error on a reader that nobody is draining is not
    // reported as an unhandled rejection.
    end.promise.then(null, function () {});

    _stream.on("error", function (reason) {
        begin.reject(reason);
        // Without this, an error arriving after the first chunk would leave
        // every pending read()/forEach() promise unsettled forever.
        end.reject(reason);
    });

    // Chunks received while no forEach() receiver is installed.
    var chunks = [];
    // Callback installed by forEach(); receives chunks as they arrive.
    var receiver;

    _stream.on("end", function () {
        // An empty stream never emits "data", so the reader must also
        // become available here.
        begin.resolve(self);
        end.resolve();
    });

    _stream.on("data", function (chunk) {
        begin.resolve(self);
        if (receiver) {
            receiver(chunk);
        } else {
            chunks.push(chunk);
        }
    });

    // Joins and empties the buffered chunks: as a string when a charset
    // was given, otherwise through the constructor's `join`.
    function slurp() {
        var result;
        if (charset) {
            result = chunks.join("");
        } else {
            result = self.constructor.join(chunks);
        }
        chunks.splice(0, chunks.length);
        return result;
    }

    /***
     * Reads all of the remaining data from the stream.
     * Any receiver installed by `forEach` is removed, so later chunks
     * are buffered for this read instead.
     * @returns {Promise * String} a promise for the entirety of the
     * remaining stream: a String when a charset was given, otherwise
     * the joined buffer. Rejected if the stream emits an error.
     */
    self.read = function () {
        receiver = undefined;
        return end.promise.then(slurp);
    };

    /***
     * Reads and writes all of the remaining data from the
     * stream in chunks.
     * @param {Function(Promise * String)} write a function
     * to be called on each chunk of input from this stream. Chunks
     * buffered so far are delivered immediately as one joined chunk.
     * @returns {Promise * Undefined} a promise that will
     * be resolved when the input is depleted, or rejected if the
     * stream emits an error.
     */
    self.forEach = function (write) {
        if (chunks && chunks.length) {
            write(slurp());
        }
        receiver = write;
        return Q.when(end.promise, function () {
            receiver = undefined;
        }, function (reason) {
            // Detach the receiver on failure as well, then keep the
            // returned promise rejected.
            receiver = undefined;
            throw reason;
        });
    };

    /***
     * Destroys the underlying Node stream.
     */
    self.close = function () {
        _stream.destroy();
    };

    // The wrapped Node stream, exposed for callers that need direct access.
    self.node = _stream;

    return begin.promise;
}
/*
    Synchronously reads an entire forEachable stream and returns its chunks
    joined together: as one string when a charset is given, otherwise as a
    single buffer.
*/
Reader.read = read;
/**
 * Drains a forEachable source in a single synchronous pass.
 * @param stream any object whose `forEach` calls back once per chunk
 * @param charset when truthy, the chunks are concatenated as a string
 * @returns the concatenated string when `charset` is truthy, otherwise
 * the result of `join` on the collected chunks
 */
function read(stream, charset) {
    var collected = [];
    stream.forEach(function (piece) {
        collected.push(piece);
    });
    return charset ? collected.join("") : join(collected);
}
Reader.join = join;
/**
 * Concatenates a list of buffers into one buffer.
 *
 * The input array is modified in place: on return it holds the joined
 * buffer as its only element.
 *
 * @param {Array} buffers the buffers to concatenate, in order
 * @returns {Buffer} a buffer holding the combined contents
 */
function join(buffers) {
    // Buffer.concat replaces the deprecated `new Buffer(size)` constructor
    // and the manual copy loop that used to fill it.
    var result = Buffer.concat(buffers);
    // Collapse the caller's array down to the single joined buffer.
    buffers.splice(0, buffers.length, result);
    return result;
}