Last Updated: February 25, 2016 · forbeslindesay

Syphoning Streams (Piping with Errors)

If you were writing a function that reads a file as a stream and parses it with a streaming parser (for example, newline-separated JSON), you might write something like:

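var fs = require('fs')

// splitNewLines and parseJSON stand for transform streams defined elsewhere
function read(path) {
  return fs.createReadStream(path)
    .pipe(splitNewLines())
    .pipe(parseJSON())
}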

If there are no errors, read works perfectly. If parseJSON emits an error, it is emitted on the returned stream, which is also great. If there is an error reading the file or splitting it into lines, though, you have a problem: pipe does not forward errors, so the error is emitted on a stream the caller never sees. With no listener attached there, Node throws it as an uncaught exception. To avoid that you have to add extra logic to forward the error, which would look something like this:

function read(path) {
  var src = fs.createReadStream(path)
  var destA = src.pipe(splitNewLines())
  var destB = destA.pipe(parseJSON())
  // forward errors down the chain, because pipe does not
  src.on('error', destA.emit.bind(destA, 'error'))
  destA.on('error', destB.emit.bind(destB, 'error'))
  return destB
}
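
The two forwarding lines above follow the same pattern, so they can be factored into a small helper. The following is a minimal sketch using only Node core streams; pipeWithErrors is a hypothetical name chosen for illustration, and the PassThrough streams stand in for the file reader, line splitter, and parser.

```javascript
var stream = require('stream')

// Hypothetical helper: behaves like src.pipe(dest), but also
// re-emits any 'error' event from src on dest.
function pipeWithErrors(src, dest) {
  src.on('error', function (err) {
    dest.emit('error', err)
  })
  return src.pipe(dest)
}

// Stand-ins for the three stages of the pipeline
var src = new stream.PassThrough()
var mid = new stream.PassThrough()
var out = new stream.PassThrough()

var result = pipeWithErrors(pipeWithErrors(src, mid), out)

// A single listener on the returned stream sees errors from every stage
var seen = []
result.on('error', function (err) {
  seen.push(err.message)
})

// Simulate a failure at the very start of the chain
src.emit('error', new Error('read failed'))
console.log(seen)
```

Because each stage forwards to the next, the caller only needs one error listener on the stream that read returns.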

What you want is a method that works like pipe but also forwards errors. That is exactly what barrage provides with its syphon method:

var fs = require('fs')
var b = require('barrage')
function read(path) {
  return b(fs.createReadStream(path))
    // the intermediate stream is wrapped too, so that it has a syphon method
    .syphon(b(splitNewLines()))
    .syphon(parseJSON())
}
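
The snippets above never define splitNewLines or parseJSON. To try the example end to end, something like the following could serve as stand-ins. This is only a sketch built on Node's core Transform stream, not the helpers the author had in mind; blank lines are skipped and only "\n" is treated as a line separator, both of which are assumptions.

```javascript
var stream = require('stream')
var StringDecoder = require('string_decoder').StringDecoder

// Hypothetical stand-in for splitNewLines(): emits one string per line.
function splitNewLines() {
  var decoder = new StringDecoder('utf8') // handles multi-byte characters split across chunks
  var buffered = ''
  function pushLines(self, lines) {
    lines.forEach(function (line) {
      if (line) self.push(line) // skip blank lines
    })
  }
  return new stream.Transform({
    readableObjectMode: true,
    transform: function (chunk, encoding, callback) {
      var lines = (buffered + decoder.write(chunk)).split('\n')
      buffered = lines.pop() // the last piece may be an incomplete line
      pushLines(this, lines)
      callback()
    },
    flush: function (callback) {
      pushLines(this, [buffered + decoder.end()])
      callback()
    }
  })
}

// Hypothetical stand-in for parseJSON(): parses each line, and reports
// a malformed line as an 'error' on this stream.
function parseJSON() {
  return new stream.Transform({
    objectMode: true,
    transform: function (line, encoding, callback) {
      var value
      try {
        value = JSON.parse(line)
      } catch (err) {
        return callback(err)
      }
      callback(null, value)
    }
  })
}

// Usage: chunk boundaries need not line up with line boundaries
var splitter = splitNewLines()
var parser = splitter.pipe(parseJSON())
var parsed = []
parser.on('data', function (value) { parsed.push(value) })
parser.on('end', function () { console.log(parsed) })
splitter.write('{"a":1}\n{"b"')
splitter.end(':2}\n')
```

Any stream with the same shape (bytes in, lines out; lines in, objects out) would work in read just as well.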