转换可读流为异步迭代器

2024-08-26JavaScript

本文简单介绍在 Node.js 环境中如何把一个回调风格的可读流(Readable)转换为一个异步迭代器(AsyncIterator),从而可以使用 for await of 来获取其内容。

概念

可读流(Readable

可读流(Readable)是 Node.js stream 模块中的一个类。比较典型的获得可读流的方法是使用 fs.createReadStream()

Readable 提供了事件,标识自己的状态。我们用到其中的三个:dataenderror,分别表示有新数据到来、所有数据已读取完毕,以及发生错误。

迭代器(Itertor

迭代器是 ES6 中添加的概念,其提供 next() 方法来迭代一个对象的内容。最常见的使用迭代器的方法是使用 for of 循环。关于迭代器的详细介绍可以参考阮老师的文章

异步迭代器(AsyncIterator

异步迭代器是迭代器的一种变体。迭代器的 next() 方法是同步的,而异步迭代器的 next() 方法返回一个 Promise。最常见的使用异步迭代器的方法是使用 for await of 循环。

生成器函数(Generator

生成器函数相比普通函数,添加了 yield 表达式,使得其可以暂停执行和惰性求值。生成器函数的接口与迭代器相同,因此可以用生成器函数编写迭代器。关于生成器函数的详细内容可以参考阮老师的文章

转换函数

我们处理 Node.js Readable 时通常采用以下的模式:

const readable = getReadableSomehow();

// New data chunk arrives...
readable.on('data', chunk => {
  // ...handle chunk...
});

// No more data is available
readable.on('end', () => {
  // ...handle stream end...
});

readable.on('error', () => {
  // ...handle error...
});

其中 data 事件会被触发多次。回调函数写法可以处理 Readable,但是这种风格不是那么整洁。使用异步迭代器的风格则更加容易理解:

const readable = getReadableSomehow();
const readableAsyncIterator = getAsyncIteratorFromReadable(readable);

try {
  for await (const chunk of readableAsyncIterator) {
      // ...handle chunk...
    }
} catch(e) {
  // ...handle error...
}

我们来实现这个 getAsyncIteratorFromReadable()

AsyncIterator 上调用 next() 返回一个 Promise<{value: unknown, done: boolean}>。容易想到:

  • data 事件对应 resolve 这个 Promise 并且提供 value
  • end 事件对应 resolve 这个 Promise 并且设置 done
  • error 事件对应 reject 这个 Promise 并且设置 done

data 事件会被触发多次,每次被触发都应该创建一个新的 Promise。因此大致框架如下:

export async function* getAsyncIteratorFromReadable(readable: Readable) {
  let done = false;

  readable.on('data', (chunk) => {
    // 1. 存储 chunk
    // 2. resolve 当前 Promise
    // 3. 创建新的 Promise
  });

  readable.on('end', () => {
    // 1. 设置 done = true
    // 2. resolve 当前 Promise
  });

  readable.on('error', (e) => {
    // 1. 设置 done = true
    // 2. reject 当前 Promise
  });

  while (!done) {
    // 1. 等待当前 Promise resolve
    // 2. yield 之前存储的 chunk
    // 3. 重置 chunk
  }

  // done = true 导致循环结束,return 结束当前异步迭代器
}

转换成代码

type PromiseCallbackParameters<R> = Parameters<
  ConstructorParameters<typeof Promise<R>>[0]
>;
type PromiseResolve<R> = PromiseCallbackParameters<R>[0];
type PromiseReject<R> = PromiseCallbackParameters<R>[1];

export async function* getAsyncIteratorFromReadable<R>(readable: Readable) {
  let promiseResolve: PromiseResolve<void> | null = null;
  let promiseReject: PromiseReject<void> | null = null;
  let promise = new Promise<void>((resolve, reject) => {
    promiseResolve = resolve;
    promiseReject = reject;
  });
  let done = false;
  let chunks: R[] = [];

  readable.on('data', (chunk) => {
    chunks.push(chunk);
    assert.ok(promiseResolve);
    promiseResolve();

    promise = new Promise<void>((resolve, reject) => {
      promiseResolve = resolve;
      promiseReject = reject;
    });
  });

  readable.on('end', () => {
    done = true;
    assert.ok(promiseResolve);
    promiseResolve();
  });

  readable.on('error', (e) => {
    done = true;
    assert.ok(promiseReject);
    promiseReject(e);
  });

  while (!done) {
    await promise;
    yield* chunks;
    chunks = [];
  }
}

注意 chunk 使用了数组存储,并且使用 yield* 发送到外界。这是因为 promise 可能因为 enderror 事件 resolve ,此时是没有 chunk 的。

参考

[1] https://www.jbernier.com/p?id=nodejs-stream-async-iterator