Perform continuous reads on a file or named pipe, push the data into a readable stream.

mulekick/file-streamer

File contents streamer

This module exports a class which, when instantiated and provided with a file / named pipe, will allow the continuous streaming of said file / named pipe contents into a node.js process for as long as needed.

You can think of it as behaving similarly to the node.js primitive fs.createReadStream(). The difference is that the readable stream here will not emit 'end' and terminate itself as soon as EOF is reached. Instead, the file streamer will keep on attempting to read from the specified file/named pipe and pass the bytes to the readable stream until explicitly told to stop.
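To make the difference concrete, here is a minimal sketch that uses only Node.js built-ins. It does not use FileStreamer and is not a description of how the module is implemented. It shows that a read at EOF returns zero bytes, and that retrying from the same position later picks up whatever was appended in the meantime, which is the behavior a continuous reader relies on. The helper name readAvailable and the temporary file are hypothetical.

```javascript
import fs from "node:fs";
import os from "node:os";
import path from "node:path";

// hypothetical helper: read whatever is available from `position`, return it as a string
const readAvailable = (fd, position, bufSize = 16384) => {
    const
        buf = Buffer.alloc(bufSize),
        // bytesRead is 0 when position is at EOF
        bytesRead = fs.readSync(fd, buf, 0, bufSize, position);
    return buf.toString(`utf8`, 0, bytesRead);
};

const
    dir = fs.mkdtempSync(path.join(os.tmpdir(), `eof-demo-`)),
    file = path.join(dir, `data.log`);

fs.writeFileSync(file, `first\n`);

const fd = fs.openSync(file, `r`);

const
    // "first\n"
    chunk1 = readAvailable(fd, 0),
    // "" (EOF reached, nothing left to read)
    atEOF = readAvailable(fd, chunk1.length);

// another writer appends to the file while the descriptor stays open
fs.appendFileSync(file, `second\n`);

// retrying from the same position now yields the appended bytes: "second\n"
const chunk2 = readAvailable(fd, chunk1.length);

// clean up
fs.closeSync(fd);
fs.rmSync(dir, {recursive: true, force: true});
```

A stream created with fs.createReadStream() would have ended at the point where atEOF is read; a continuous reader keeps retrying instead.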

One notable benefit is that other processes can keep pushing data into a named pipe until the conditions are met for its contents to be processed, without hanging on the pipe. The file streamer holds a file descriptor on the named pipe, which prevents any process writing to it from hanging, until the close() method is called and closes the file descriptor.

Install

npm install @mulekick/file-streamer

IMPORTANT: This package is distributed as ES modules (ESM).

Usage

Callback mode:

import {FileStreamer} from "@mulekick/file-streamer";

try {
    const
        myFile = `/path/to/my/file`,
        streamer = new FileStreamer(),
        writable = getWritableStreamSomehow(),
        processing = getEventEmitterSomehow();

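    // errors are emitted as events and are not caught by try / catch, so listen for them
    streamer.on(`error`, err => console.error(err.message));

    // open file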
    streamer.open(myFile).on(`file`, () => {
        streamer
            // create readable stream
            .stream()
            // pipe to writable and stream file contents
            .pipe(writable);
    });
    
    // once all processing is done
    processing.on(`done`, () => {
        streamer
            // stop streaming file contents, discard readable stream
            .unstream()
            // close file
            .close();
    });

} catch (err) {
    console.error(err.message);
}

Async mode:

import {FileStreamer} from "@mulekick/file-streamer";
import {once} from "events";

(async() => {
    try {
        const
            myFile = `/path/to/my/file`,
            streamer = new FileStreamer(),
            writable = getWritableStreamSomehow(),
            processing = getEventEmitterSomehow();

        // open file
        await streamer.promise(`open`, myFile);
        
        // create readable stream, pipe to writable and stream file contents
        streamer.stream().pipe(writable);
        
        // once all processing is done
        await once(processing, `done`);

        // stop streaming file contents, discard readable stream, close file
        await streamer.promise(`close`);

    } catch (err) {
        console.error(err.message);
    }
})();

stream() returns a stream.Readable instance that can operate in paused or flowing mode. Note that it does not operate in object mode; if you need object mode, pipe it to a stream.Duplex (for instance a stream.Transform) and manage object mode there.
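One possible way to get objects out of the byte stream is a stream.Transform (a kind of stream.Duplex) with readableObjectMode enabled. The sketch below assumes the file contains newline-delimited JSON, which is purely an assumption for the example; createJsonLineParser is a hypothetical name and is not part of this module.

```javascript
import {Transform} from "node:stream";
import {StringDecoder} from "node:string_decoder";

// hypothetical helper: buffers in, one parsed object out per complete line
const createJsonLineParser = () => {
    const decoder = new StringDecoder(`utf8`);
    let pending = ``;

    return new Transform({
        // writable side receives buffers, readable side emits objects
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                // the decoder keeps multi-byte characters split across chunks intact
                const lines = (pending + decoder.write(chunk)).split(`\n`);
                // the last element is an incomplete line (or an empty string), keep it for the next chunk
                pending = lines.pop();
                for (const line of lines) {
                    if (line.trim() !== ``)
                        this.push(JSON.parse(line));
                }
                callback();
            } catch (err) {
                // a line that is not valid JSON errors the transform stream
                callback(err);
            }
        }
    });
};
```

With the file streamer, this would sit between stream() and the consumer, for example streamer.stream().pipe(createJsonLineParser()).on(`data`, obj => console.log(obj)).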

API

Class: FileStreamer

Constructor

new FileStreamer([options])

  • options Options for the file streamer
  • Returns: FileStreamer

Options

The default values are shown after each option key.

{
    fileName: null,         // path of the file / named pipe to open and read from
    bufSize: 16384,         // max number of bytes to retrieve at each read / highWaterMark value for the readable stream
    errorOnMissing: false,  // set to true to emit an error if the file is renamed or deleted while its content is still being streamed
    closeOnEOF: false       // set to true to automatically discard readable stream and close file when EOF is reached (emulates fs.createReadStream)
}
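
As an illustration, the snippet below builds an options object that overrides two of the defaults listed above. The spread over defaults is only there to make the resulting values explicit; it is an assumption made for the example, not a description of how the constructor merges options internally.

```javascript
// default values, copied from the list above
const defaults = {
    fileName: null,
    bufSize: 16384,
    errorOnMissing: false,
    closeOnEOF: false
};

// hypothetical overrides: smaller reads, and an error if the file disappears while streaming
const options = {
    ...defaults,
    bufSize: 4096,
    errorOnMissing: true
};

// options would then be passed to the constructor: new FileStreamer(options)
```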

Methods

FileStreamer.open(fileName[, resolve, reject])

  • fileName String: the path of the file to open
  • resolve, reject Functions: optional, for when you want to wrap the open method in your own promise
  • Returns: FileStreamer

Opens fileName in read-only mode (0o444) and assigns a file descriptor to it.
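
The optional resolve and reject arguments make it possible to build your own promise around open(). A minimal sketch, assuming open() invokes resolve once the file has been opened and reject on failure (the values passed to them are not specified here); openAsync is a hypothetical helper name.

```javascript
// hypothetical helper: wrap open() in a promise by handing it the promise's own callbacks
const openAsync = (streamer, fileName) => new Promise((resolve, reject) => {
    // assumption: open() calls resolve on success and reject on failure
    streamer.open(fileName, resolve, reject);
});
```

It could then be awaited like any other promise, for example await openAsync(streamer, `/path/to/my/file`). The built-in promise() method covers the same need without a helper.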

FileStreamer.stream()

Creates a readable stream instance, begins reading from fileName and passes the data to the readable, making it ready for consumption (FileStreamer.open must have been called first, or an error will be emitted).

FileStreamer.unstream()

Stops reading from fileName, signals EOF to the readable stream, then resets the reference to it.

FileStreamer.close([resolve, reject])

  • resolve, reject Functions: optional, for when you want to wrap the close method in your own promise
  • Returns: FileStreamer

Closes fileName and resets the references to it and to its file descriptor, thus making the FileStreamer ready to open another file (FileStreamer.unstream must have been called first, or an error will be emitted).

FileStreamer.promise(action[, fileName])

  • action String: what is to be done with the file (either open or close)
  • fileName String: the path of the file (omitted in the usage example when action is close)
  • Returns: Promise<FileStreamer>

Wraps the FileStreamer.open and FileStreamer.close methods in promises, for use when streaming file contents in async mode.

Events

The following events may be emitted during the life cycle of a FileStreamer instance:

| Event fired | Condition |
| --- | --- |
| file | file has been opened and file descriptor was assigned |
| reading | starting to read from file and pass data to readable |
| paused | pause file reads after readable highWaterMark was reached |
| stopped | unstream() was called and readable has been discarded |
| closed | file has been closed and file descriptor reference was reset |
| error | some error occurred |

All callbacks will be passed the emitting FileStreamer instance, except for the error callback, which will be passed the emitted error.
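
The events listed above can be observed with ordinary listeners. The sketch below is one way to log the whole life cycle; attachLifecycleLogger and its log / onError parameters are hypothetical names, and the helper only assumes that its first argument exposes an on() method, as a FileStreamer instance does in the usage examples.

```javascript
// event names, as listed above (error is handled separately)
const LIFECYCLE_EVENTS = [`file`, `reading`, `paused`, `stopped`, `closed`];

// hypothetical helper: log every life cycle event and route errors to a dedicated handler
const attachLifecycleLogger = (emitter, log = console.log, onError = console.error) => {
    for (const name of LIFECYCLE_EVENTS)
        emitter.on(name, () => log(`file streamer event: ${ name }`));
    // the error callback receives the emitted error instead of the instance
    emitter.on(`error`, err => onError(err.message));
    return emitter;
};
```

Calling attachLifecycleLogger(streamer) before open() should make every state change visible on the console, which can help when diagnosing why a stream paused or stopped.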

Notes

  • What exactly is EOF?
  • "If you watch closely, you'll notice that the first command you run appears to hang. This happens because the other end of the pipe is not yet connected, and so the kernel suspends the first process until the second process opens the pipe. In Unix jargon, the process is said to be “blocked”, since it is waiting for something to happen." (long story is here)