# through2 **Repository Path**: mirrors_rvagg/through2 ## Basic Information - **Project Name**: through2 - **Description**: Tiny wrapper around Node streams2 Transform to avoid explicit subclassing noise - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2020-08-18 - **Last Updated**: 2026-05-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # through2 [![NPM](https://nodei.co/npm/through2.svg?style=flat&data=n,v&color=blue)](https://nodei.co/npm/through2/) **Tiny utilities for inserting transformation logic into Node.js streams and Web Streams pipelines.** ```bash npm install through2 ``` ```js import { transform } from 'through2' readableStream .pipe(transform(async (chunk) => chunk.toString().toUpperCase())) .pipe(writableStream) ``` ## Contents * [Contents](#contents) * [Why](#why) * [Migrating from v4](#migrating-from-v4) * [API](#api) * [Transform function styles](#transform-function-styles) * [Recipes](#recipes) * [Map](#map) * [Filter](#filter) * [FlatMap](#flatmap) * [Batch](#batch) * [Tap](#tap) * [Parse newline-delimited input](#parse-newline-delimited-input) * [Node-style streams (`from 'through2'`)](#node-style-streams-from-through2) * [`transform()`](#transform) * [`objectTransform()`](#objecttransform) * [`transformer()`](#transformer) * [Composing pipelines](#composing-pipelines) * [Default export (legacy)](#default-export-legacy) * [Web Streams (`from 'through2/web'`)](#web-streams-from-through2web) * [`transform()` (web)](#transform-web) * [Implementation notes](#implementation-notes) * [License](#license) ## Why Writing a `Transform` stream usually means subclassing, wiring up `_transform`, choosing `objectMode`, and minding backpressure. **through2** wraps a function instead: ```js // Without through2 import { Transform } from 'node:stream' class Upper extends Transform { _transform (chunk, _enc, cb) { this.push(chunk.toString().toUpperCase()) cb() } } input.pipe(new Upper()).pipe(output) // With through2 import { transform } from 'through2' input.pipe(transform(async (chunk) => chunk.toString().toUpperCase())).pipe(output) ``` Same idea for the modern, cross-runtime Web Streams API: ```js import { transform } from 'through2/web' await response.body .pipeThrough(transform(async (chunk) => /* ... */)) .pipeTo(destination) ``` ## Migrating from v4 v5 is a major version bump. Headline changes: - **ESM source.** The package ships as ESM; new code should `import` it. CommonJS callers can still `require('through2')` on Node.js 22.12+ via `require(esm)`. On older Node, convert to `import` or pin to `through2@4`. - **Named exports added.** `transform`, `objectTransform`, `transformer` are the preferred surface in new code. - **Default export still works.** `through2(fn)`, `through2.obj(fn)`, and `through2.ctor(fn)` produce equivalent stream instances. Once your callers are converted to ESM imports, the call-site syntax and runtime behaviour are unchanged (modulo the `instanceof` caveat below). - **Async functions and async generators are now accepted** as transform functions, in addition to the classic `(chunk, enc, cb)` callback form. See [Transform function styles](#transform-function-styles). - **`through2/web` subpath added** for Web Streams (`TransformStream`) pipelines. - **`readable-stream@4`** (was `@3`) is the underlying dependency. - **`transformer` / `.ctor` returns a factory function**, not a true constructor. The returned instance is still a `Transform`, but `instanceof YourFactory` no longer holds. Use `instanceof Transform` instead. Mapping for legacy code: | v4 / legacy | v5 named export | | --------------------- | ------------------------------ | | `through2(fn)` | `transform(fn)` | | `through2.obj(fn)` | `objectTransform(fn)` | | `through2.ctor(fn)` | `transformer(fn)` | ## API Two import paths, mirroring the two stream worlds: | Import | Stream API | Returns | Use when | | ---------------------------- | ---------------------------- | ---------------------- | ------------------------------------------------ | | `from 'through2'` | Node-style streams | `stream.Transform` | You're working with `Readable`/`Writable`/`Transform`-shaped streams (`.pipe(...)`) | | `from 'through2/web'` | Web Streams (WHATWG) | `TransformStream` | You're working with `ReadableStream`/`WritableStream`-shaped streams (`.pipeThrough(...)`) | The two entries differ in the *stream API* they target, not the runtime they run on. Either entry can run in Node.js, browsers, Deno, Bun, or Cloudflare Workers. Pick the one that matches the streams you're piping with. - **`from 'through2'`** uses the Node-style streams API (`Readable`/`Writable`/`Transform`, `.pipe()`, callback-driven `_transform`). It depends on [`readable-stream`](https://github.com/nodejs/readable-stream); browser bundlers pick up that package's `browser` field automatically and ship its self-contained shim, so no Node-builtin polyfill is needed. - **`from 'through2/web'`** uses the WHATWG Web Streams API (`TransformStream`, `.pipeThrough()`). Zero runtime dependencies; relies only on `TransformStream` being a global (it is in modern browsers, Node.js >= 18, Deno, Bun, Workers). ### Transform function styles Every transform-creating export accepts the same three function styles, auto-dispatched by inspecting the function's kind: ```js // 1. Classic Node-style callback (use `this.push()` and the callback) transform(function (chunk, encoding, callback) { this.push(chunk) callback() }) // 2. Async function (resolved value is pushed; `undefined` skips) transform(async (chunk) => chunk.toString().toUpperCase()) // 3. Async generator (1-to-many; full pipeline coroutine) transform(async function * (source) { for await (const chunk of source) { yield chunk yield chunk } }) ``` A flush function may be passed as the trailing argument; it follows the same dispatch rules. > **Note on async**: the async function form does **not** use `this.push`. To emit zero or many chunks per input, use the async generator form. To emit one chunk per input (or skip), return the value (or `undefined`). ## Recipes ```js import { objectTransform } from 'through2' ``` ### Map One in, one out. Async function form; the resolved value is pushed. ```js objectTransform(async (item) => doSomething(item)) ``` ### Filter Return `undefined` to drop a chunk. ```js objectTransform(async (item) => predicate(item) ? item : undefined) ``` ### FlatMap One in, many out. Async generator form. ```js objectTransform(async function * (source) { for await (const item of source) { for (const x of expand(item)) yield x } }) ``` ### Batch Collect a fixed-size batch, emit at size or at flush. ```js objectTransform(async function * (source) { let batch = [] for await (const item of source) { batch.push(item) if (batch.length >= 100) { yield batch; batch = [] } } if (batch.length) yield batch }) ``` ### Tap Side effect, pass through unchanged. ```js objectTransform(async (item) => { observe(item); return item }) ``` ### Parse newline-delimited input Byte chunks in, line strings out. The async generator buffers across chunk boundaries. ```js objectTransform(async function * (source) { let buf = '' for await (const chunk of source) { buf += chunk.toString() const lines = buf.split('\n') buf = lines.pop() for (const line of lines) yield line } if (buf) yield buf }) ``` For a runnable end-to-end demo combining NDJSON parsing, level filtering, formatting, and a tally, see [`example-ndjson.js`](./example-ndjson.js): ```bash cat app.log | node example-ndjson.js warn ``` ## Node-style streams (`from 'through2'`) ```js import { transform, objectTransform, transformer } from 'through2' ``` ### `transform()` ``` transform([options], transformFn[, flushFn]) -> stream.Transform ``` Returns a `stream.Transform`. `options` is forwarded to the underlying `Transform` constructor. If `transformFn` is omitted, a passthrough is returned. ```js fs.createReadStream('in.txt') .pipe(transform(function (chunk, _enc, cb) { for (let i = 0; i < chunk.length; i++) { if (chunk[i] === 97) chunk[i] = 122 // swap 'a' for 'z' } this.push(chunk) cb() })) .pipe(fs.createWriteStream('out.txt')) ``` ### `objectTransform()` ``` objectTransform([options], transformFn[, flushFn]) -> stream.Transform ``` Like `transform`, with `objectMode: true` enabled by default. Most async/async-generator use cases want this. ### `transformer()` ``` transformer([options], transformFn[, flushFn]) -> (overrideOptions?) -> stream.Transform ``` Returns a factory function. Calling it (with or without `new`) produces a fresh `Transform` instance with the configured behaviour. Per-call options merge on top of the configured defaults; the merged options are exposed as `this.options` inside the transform function. ```js const Counter = transformer({ objectMode: true }, function (chunk, _enc, cb) { this.count = (this.count || 0) + 1 this.push(chunk) cb() }) const a = Counter() const b = new Counter({ highWaterMark: 32 }) // override per-call ``` ### Composing pipelines For anything beyond a quick demo, prefer `node:stream/promises`'s `pipeline()` over chained `.pipe()`. It propagates errors, awaits completion, and destroys all streams on failure (chained `.pipe()` silently leaves streams hanging on error). ```js import { pipeline } from 'node:stream/promises' import { createReadStream, createWriteStream } from 'node:fs' import { objectTransform } from 'through2' await pipeline( createReadStream('in.ndjson'), objectTransform(async function * (source) { let buf = '' for await (const chunk of source) { buf += chunk.toString() const lines = buf.split('\n') buf = lines.pop() for (const line of lines) yield JSON.parse(line) } }), objectTransform(async (record) => record.active ? record : undefined), objectTransform(async (record) => JSON.stringify(record) + '\n'), createWriteStream('out.ndjson') ) ``` `pipeline()` accepts any mix of through2-built transforms and other `Readable`/`Writable`/`Transform` instances. Use it whenever the pipeline can fail or you need to know when it's done. A runnable, more elaborate version of this NDJSON pipeline lives in [`example-ndjson.js`](./example-ndjson.js) (parses arbitrary structured logs from stdin, filters by level, pretty-prints to stdout, summarises on stderr). ### Default export (legacy) The default export is `transform` with `.obj` and `.ctor` attached for back-compatibility: ```js import through2 from 'through2' through2(fn) // === transform(fn) through2.obj(fn) // === objectTransform(fn) through2.ctor(fn) // === transformer(fn) ``` ## Web Streams (`from 'through2/web'`) ```js import { transform } from 'through2/web' ``` ### `transform()` (web) ``` transform([transformFn][, flushFn]) -> TransformStream ``` Returns a `TransformStream`. The classic-style function takes `(chunk, controller)` and uses `controller.enqueue()`. Async and async-generator forms work the same as in the Node-style streams entry. ```js // 1-to-many fan-out (controller form, stateless) const tagged = transform((chunk, controller) => { controller.enqueue({ kind: 'raw', value: chunk }) controller.enqueue({ kind: 'upper', value: chunk.toString().toUpperCase() }) }) // Splitter: cross-chunk state is needed (a line can span chunks). The async // generator form lets the buffer be a local variable; the classic controller // form would need closure state plus a flush handler. const splitter = transform(async function * (source) { let buf = '' for await (const chunk of source) { buf += chunk.toString() const lines = buf.split('\n') buf = lines.pop() for (const line of lines) yield line } if (buf) yield buf }) // Async (1-to-1) with flush const withTrailer = transform( async (chunk) => chunk.toString().toUpperCase(), (controller) => controller.enqueue('END') ) // Pipe a fetch response through a transform await response.body .pipeThrough(transform(async (chunk) => chunk)) .pipeTo(destinationWritableStream) ``` ### Implementation notes At the time of writing, Chromium hasn't shipped the cleanup hook that `TransformStream` would use to tear down an in-flight async generator on cancel. Without a workaround, calling `reader.cancel()` on a browser-side pipeline would leave your `async function *` suspended and its `finally` block would never run. The web entry handles this for you: - Backpressure works in both directions through `pipeThrough`. - Cancelling the consumer or aborting the producer cleans up your generator (its `finally` runs) and errors the other side. - Same behaviour in browsers, Node.js, Deno, Bun, and Cloudflare Workers. One thing to note: `transform(asyncGenFn)` returns a `{ readable, writable }` pair rather than a `TransformStream` instance. `pipeThrough` and `pipeTo` accept it identically. ## License **through2** is Copyright (c) Rod Vagg and additional contributors and licensed under the MIT license. See the included LICENSE file for more details.