// Copyright 2013 The Obvious Corporation.
var util = require('util')
var Writable = require('stream').Writable
var S3Queue = require('./Queue')
/**
 * Writable stream interface for S3. Inherits stream.Writable.
 *
 * The stream's 'close' event is reserved for complete(), which emits it with
 * `(err, response)` once completeMultipartUpload has returned. Writable is
 * therefore constructed with `emitClose: false`. On Node versions where a
 * Writable auto-destroys itself after 'finish', it would otherwise emit its
 * own argument-less 'close' before the upload has actually been completed.
 *
 * @constructor
 * @param {Object} params Same params object as AWS.S3.createMultipartUpload
 * @param {String} params.Bucket The S3 bucket your file will go in.
 * @param {String} params.Key The full path to the object on S3, within its bucket.
 * @param {Object} s3 Authenticated instance of AWS.S3
 */
function S3Stream(params, s3) {
  // Initialize the parent stream class. Node versions that predate the
  // `emitClose` option simply ignore it.
  Writable.call(this, {emitClose: false})
  // Cache inputs; the other methods read them from the instance
  this.params = params
  this.s3 = s3
  // Set the stream's initial state, create a queue, and bind event listeners
  this.init()
}
util.inherits(S3Stream, Writable)
module.exports = S3Stream
/**
 * Put the stream into its starting state.
 *
 * Zeroes the part counter and the in-flight upload count, empties the list
 * of uploaded parts, and clears the PartNumber-stringification flag. Then it
 * builds the queue and attaches the internal event listeners, in that order.
 */
S3Stream.prototype.init = function () {
  var self = this
  // Multipart bookkeeping
  self.uploadPartNumber = 0
  self.uploadedParts = []
  self.activeUploads = 0
  // upload() turns this on if the SDK rejects numeric PartNumbers
  self.stringifyPartNumber = false
  // Queue first, then the internal listeners
  self.setupQueue()
  self.setupEvents()
}
/**
 * Create this stream's S3Queue and route the queue's 'drain' event to
 * upload(), bound to this stream.
 */
S3Stream.prototype.setupQueue = function () {
  var queue = this.queue = new S3Queue()
  var onDrain = this.upload.bind(this)
  queue.on('drain', onDrain)
}
/**
 * Set the minimum size, in bytes, of a non-final upload part. The value is
 * stored on the queue as `_threshold`.
 *
 * @param {Number} bytes Minimum upload size.
 * @return {S3Stream} This stream, so the call can be chained.
 */
S3Stream.prototype.setThreshold = function (bytes) {
  var stream = this
  stream.queue._threshold = bytes
  return stream
}
/**
 * Bind internal events.
 *
 * Keeps the methods loosely coupled: they communicate through events
 * instead of nested calls.
 *
 * - 'error': abort() runs by default, so zombie parts are not left on S3.
 *   Unbind it with `s3stream.removeListener('error', s3stream.abort)`.
 * - 'uploaded': re-emits 'drain' when ready(), and 'flush' when no uploads
 *   are in flight.
 * - 'finish': runs complete() once.
 */
S3Stream.prototype.setupEvents = function () {
  var self = this
  // Register `this.abort` itself rather than a bound copy. A bound copy is a
  // new function object, so the documented removeListener call above could
  // never match it. No binding is needed: EventEmitter calls listeners with
  // `this` set to the emitter.
  this.on('error', this.abort)
  // When an upload completes, emit drain/flush as appropriate
  this.on('uploaded', function () {
    // 'drain' is emitted when the stream is writable
    if (self.ready()) self.emit('drain')
    // 'flush' is emitted when the stream has no active uploads
    if (self.activeUploads === 0) self.emit('flush')
  })
  // Complete the upload when the stream's 'finish' event fires
  this.once('finish', this.complete.bind(this))
}
/**
 * stream.Writable's `_write` hook: hand one chunk to the queue.
 *
 * `callback` is what lets Writable deliver the next chunk, so it is used
 * for back-pressure:
 * - on the queue's next 'push' event, `callback` runs right away if the
 *   stream is ready(); otherwise it runs on the stream's next 'drain' event;
 * - if the stream is not initialized() yet, the chunk is held back and only
 *   pushed onto the queue when the stream emits 'writable'.
 *
 * @private
 * @param {Buffer} chunk The data to write to the stream.
 * @param {String} encoding Ignored because chunk is always a Buffer.
 * @param {Function} callback Called when the chunk is successfully handled.
 */
S3Stream.prototype._write = function (chunk, encoding, callback) {
  var self = this

  function acknowledge() {
    if (!self.ready()) {
      // Uploads still in flight: wait for the stream to drain
      self.once('drain', callback)
      return
    }
    callback()
  }

  function enqueue() {
    self.queue.push(chunk)
  }

  // Listen before pushing, in case the queue emits 'push' synchronously
  this.queue.once('push', acknowledge)

  if (!this.initialized()) {
    this.once('writable', enqueue)
    return
  }
  enqueue()
}
/**
 * Whether the stream can take more data right now: it must be initialized
 * and have no part uploads in flight.
 *
 * @return {Boolean} True if the stream is ready.
 */
S3Stream.prototype.ready = function () {
  if (!this.initialized()) return false
  return this.activeUploads === 0
}
/**
 * Whether an S3 upload ID has been stored on this stream's params.
 *
 * @return {Boolean} True when `params.UploadId` is truthy.
 */
S3Stream.prototype.initialized = function () {
  var uploadId = this.params.UploadId
  return Boolean(uploadId)
}
/**
 * Abort the stream's multi-part upload.
 *
 * By default, this method is called whenever the stream emits 'error', so
 * zombie parts are not left hanging around S3.
 *
 * The request carries only the identifying params from getUploadParams()
 * (Bucket, Key, UploadId). The constructor's params are the full
 * createMultipartUpload params, which may contain keys (e.g. ContentType)
 * that AbortMultipartUpload does not accept. The UploadId is cleared
 * afterwards so initialized() reports false. A call with no UploadId
 * (never started, or already aborted) sends nothing.
 */
S3Stream.prototype.abort = function () {
  // No upload to abort: a request without an UploadId can only fail
  if (!this.initialized()) return
  // Snapshot the identifying params BEFORE clearing the UploadId below;
  // otherwise the abort request would go out without an upload ID.
  var params = this.getUploadParams()
  this.params.UploadId = null
  this.s3.abortMultipartUpload(params).send()
}
/**
 * Build the params shared by the multipart upload calls: Bucket, Key and
 * UploadId copied from this stream's params, overlaid with `extraParams`.
 * When `stringifyPartNumber` is set, a truthy PartNumber is converted to a
 * string.
 *
 * @param {Object} extraParams Additional params; they win over the defaults.
 * @return {Object} A new params object.
 */
S3Stream.prototype.getUploadParams = function (extraParams) {
  var merged = {
    Bucket: this.params.Bucket,
    Key: this.params.Key,
    UploadId: this.params.UploadId
  }
  var extras = extraParams || {}
  var names = Object.keys(extras)
  for (var i = 0; i < names.length; i++) {
    merged[names[i]] = extras[names[i]]
  }
  var stringify = this.stringifyPartNumber && merged.PartNumber
  if (stringify) merged.PartNumber = merged.PartNumber.toString()
  return merged
}
/**
 * Send one part of the multipart upload to S3.
 *
 * Each call claims the next part number and counts itself in
 * `activeUploads` until S3 responds. On success the part's ETag and
 * PartNumber are recorded in `uploadedParts` so complete() can assemble the
 * object; on failure 'error' is emitted. In both cases 'uploaded' is then
 * emitted with `(err, response, body)`.
 *
 * @param {Buffer} body Chunk of data to upload.
 */
S3Stream.prototype.upload = function (body) {
  var self = this
  var partNumber = ++this.uploadPartNumber
  var params = this.getUploadParams({
    Body: body,
    PartNumber: partNumber
  })

  function onResponse(err, response) {
    self.activeUploads--

    // Some AWS SDK versions reject a numeric PartNumber with an
    // InvalidParameterType error. Canoe cannot tell which SDK version it is
    // running against, so on that error it hands the part number back
    // (S3 never received the part), switches to string part numbers, and
    // sends the same body again. The flag stays set, so this happens at
    // most once. See: https://github.com/Obvious/canoe/pull/22
    if (err && err.name === 'InvalidParameterType' && !self.stringifyPartNumber) {
      self.uploadPartNumber--
      self.stringifyPartNumber = true
      self.upload(body)
      return
    }

    if (err) {
      self.emit('error', err)
    } else {
      // Needed by complete() to stitch the parts together
      self.uploadedParts.push({ETag: response.ETag, PartNumber: partNumber})
    }
    self.emit('uploaded', err, response, body)
  }

  this.activeUploads++
  this.s3.uploadPart(params, onResponse)
}
/**
 * Finish the multi-part upload.
 *
 * Any remaining data is flushed from the underlying queue via
 * `queue.drain()`. Once the stream emits 'flush' (no uploads in flight), the
 * recorded parts are sorted by PartNumber and completeMultipartUpload is
 * called. The outcome is reported as a 'close' event with `(err, response)`
 * arguments; an error is additionally emitted as 'error'.
 *
 * If the stream has no UploadId yet, the whole operation is deferred until
 * the stream emits 'writable', so the queue has a chance to accept data.
 */
S3Stream.prototype.complete = function () {
  var self = this
  // Not initialized yet: try again on 'writable'. EventEmitter calls
  // listeners with `this` set to the emitter, so the unbound method works.
  if (!this.initialized()) return this.once('writable', this.complete)

  // Register for 'flush' BEFORE draining. If the final part's upload
  // finishes synchronously, 'flush' fires inside drain() and would
  // otherwise be missed, leaving the upload never completed.
  this.once('flush', function () {
    // S3 requires parts in ascending PartNumber order
    self.uploadedParts.sort(function (a, b) {
      return a.PartNumber - b.PartNumber
    })
    var params = self.getUploadParams({
      MultipartUpload: {Parts: self.uploadedParts}
    })
    self.s3.completeMultipartUpload(params, function (err, response) {
      if (err) self.emit('error', err)
      self.emit('close', err, response)
    })
  })
  this.queue.drain()
}