本文简单介绍在 Node.js 环境中如何把一个回调风格的可读流(Readable
)转换为一个异步迭代器(AsyncIterator
),从而可以使用 for await of
来获取其内容。
概念
可读流(Readable
)
可读流(Readable
)是 Node.js stream 模块中的一个类。比较典型的获得可读流的方法是使用 fs.createReadStream()
。
Readable
提供了事件,标识自己的状态。我们用到其中的三个:data
、end
和 error
,分别表示有新数据到来、所有数据已读取完毕,以及发生错误。
迭代器(Itertor
)
迭代器是 ES6 中添加的概念,其提供 next()
方法来迭代一个对象的内容。最常见的使用迭代器的方法是使用 for of
循环。关于迭代器的详细介绍可以参考阮老师的文章。
异步迭代器(AsyncIterator
)
异步迭代器是迭代器的一种变体。迭代器的 next()
方法是同步的,而异步迭代器的 next()
方法返回一个 Promise
。最常见的使用异步迭代器的方法是使用 for await of
循环。
生成器函数(Generator
)
生成器函数相比普通函数,添加了 yield
表达式,使得其可以暂停执行和惰性求值。生成器函数的接口与迭代器相同,因此可以用生成器函数编写迭代器。关于生成器函数的详细内容可以参考阮老师的文章。
转换函数
我们处理 Node.js Readable
时通常采用以下的模式:
const readable = getReadableSomehow();
// New data chunk arrives...
readable.on('data', chunk => {
// ...handle chunk...
});
// No more data is available
readable.on('end', () => {
// ...handle stream end...
});
readable.on('error', () => {
// ...handle error...
});
其中 data
事件会被触发多次。回调函数写法可以处理 Readable
,但是这种风格不是那么整洁。使用异步迭代器的风格则更加容易理解:
const readable = getReadableSomehow();
const readableAsyncIterator = getAsyncIteratorFromReadable(readable);
try {
for await (const chunk of readableAsyncIterator) {
// ...handle chunk...
}
} catch(e) {
// ...handle error...
}
我们来实现这个 getAsyncIteratorFromReadable()
。
在 AsyncIterator
上调用 next()
返回一个 Promise<{value: unknown, done: boolean}>
。容易想到:
data
事件对应resolve
这个Promise
并且提供value
;end
事件对应resolve
这个Promise
并且设置done
;error
事件对应reject
这个Promise
并且设置done
。
data
事件会被触发多次,每次被触发都应该创建一个新的 Promise
。因此大致框架如下:
export async function* getAsyncIteratorFromReadable(readable: Readable) {
let done = false;
readable.on('data', (chunk) => {
// 1. 存储 chunk
// 2. resolve 当前 Promise
// 3. 创建新的 Promise
});
readable.on('end', () => {
// 1. 设置 done = true
// 2. resolve 当前 Promise
});
readable.on('error', (e) => {
// 1. 设置 done = true
// 2. reject 当前 Promise
});
while (!done) {
// 1. 等待当前 Promise resolve
// 2. yield 之前存储的 chunk
// 3. 重置 chunk
}
// done = true 导致循环结束,return 结束当前异步迭代器
}
转换成代码:
type PromiseCallbackParameters<R> = Parameters<
ConstructorParameters<typeof Promise<R>>[0]
>;
type PromiseResolve<R> = PromiseCallbackParameters<R>[0];
type PromiseReject<R> = PromiseCallbackParameters<R>[1];
export async function* getAsyncIteratorFromReadable<R>(readable: Readable) {
let promiseResolve: PromiseResolve<void> | null = null;
let promiseReject: PromiseReject<void> | null = null;
let promise = new Promise<void>((resolve, reject) => {
promiseResolve = resolve;
promiseReject = reject;
});
let done = false;
let chunks: R[] = [];
readable.on('data', (chunk) => {
chunks.push(chunk);
assert.ok(promiseResolve);
promiseResolve();
promise = new Promise<void>((resolve, reject) => {
promiseResolve = resolve;
promiseReject = reject;
});
});
readable.on('end', () => {
done = true;
assert.ok(promiseResolve);
promiseResolve();
});
readable.on('error', (e) => {
done = true;
assert.ok(promiseReject);
promiseReject(e);
});
while (!done) {
await promise;
yield* chunks;
chunks = [];
}
}
注意 chunk
使用了数组存储,并且使用 yield*
发送到外界。这是因为 promise
可能因为 end
或 error
事件 resolve ,此时是没有 chunk
的。
参考
[1] https://www.jbernier.com/p?id=nodejs-stream-async-iterator