| Server IP : 68.183.124.220 / Your IP : 216.73.217.137 Web Server : Apache/2.4.18 (Ubuntu) System : Linux Sandbox-A 4.4.0-210-generic #242-Ubuntu SMP Fri Apr 16 09:57:56 UTC 2021 x86_64 User : gavin ( 1000) PHP Version : 7.0.33-0ubuntu0.16.04.16 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /home/gavin/workspace/readjs/node_modules/mongodb-core/lib/connection/ |
Upload File : |
"use strict";
var inherits = require('util').inherits,
EventEmitter = require('events').EventEmitter,
Connection = require('./connection'),
MongoError = require('../error'),
Logger = require('./logger'),
f = require('util').format,
Query = require('./commands').Query,
CommandResult = require('./command_result'),
assign = require('../topologies/shared').assign;
var MongoCR = require('../auth/mongocr')
, X509 = require('../auth/x509')
, Plain = require('../auth/plain')
, GSSAPI = require('../auth/gssapi')
, SSPI = require('../auth/sspi')
, ScramSHA1 = require('../auth/scram');
// Pool lifecycle states. stateTransition() holds the table of which moves
// between these values are accepted.
var DISCONNECTED = 'disconnected';
var CONNECTING = 'connecting';
var CONNECTED = 'connected';
var DESTROYING = 'destroying';
var DESTROYED = 'destroyed';
// Counter incremented once per constructed Pool to give each instance its id
var _id = 0;
/**
 * Creates a new Pool instance
 * @class
 * @param {object} options Pool settings
 * @param {object} options.bson BSON parser exposing `serialize` and `deserialize` functions (required)
 * @param {string} [options.host='localhost'] The server host
 * @param {number} [options.port=27017] The server port
 * @param {number} [options.size=5] Max server connection pool size
 * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
 * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
 * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
 * @param {boolean} [options.noDelay=true] TCP Connection no delay
 * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
 * @param {number} [options.socketTimeout=30000] TCP Socket timeout setting
 * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket
 * @param {boolean} [options.ssl=false] Use SSL for connection
 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
 * @param {Buffer} [options.cert] SSL Certificate binary buffer
 * @param {Buffer} [options.key] SSL Key file binary buffer
 * @param {string} [options.passPhrase] SSL Certificate pass phrase
 * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
 * @param {object} [options.authProviders] Map of mechanism name to auth provider; when omitted the mongocr, x509, plain, gssapi, sspi and scram-sha-1 providers are created
 * @throws {Error} When `options.bson` is missing or lacks a `serialize`/`deserialize` function
 * @fires Pool#connect
 * @fires Pool#close
 * @fires Pool#error
 * @fires Pool#timeout
 * @fires Pool#parseError
 * @return {Pool} A pool instance
 */
var Pool = function(options) {
  // Tolerate a missing options object so the bson validation below reports
  // the real problem instead of a TypeError on `options.bson`
  options = options || {};

  // Add event listener
  EventEmitter.call(this);

  // Merge the caller's options over the defaults
  this.options = assign({
    // Host and port settings
    host: 'localhost',
    port: 27017,
    // Pool default max size
    size: 5,
    // socket settings
    connectionTimeout: 30000,
    socketTimeout: 30000,
    keepAlive: true,
    keepAliveInitialDelay: 0,
    noDelay: true,
    // SSL Settings
    ssl: false, checkServerIdentity: true,
    ca: null, cert: null, key: null, passPhrase: null,
    rejectUnauthorized: false,
    promoteLongs: true,
    promoteValues: true,
    promoteBuffers: false,
    // Reconnection options
    reconnect: true,
    reconnectInterval: 1000,
    reconnectTries: 30,
    // Enable domains
    domainsEnabled: false
  }, options);

  // Identification information
  this.id = _id++;

  // Remaining reconnect attempts and the handle of the pending reconnect timer
  this.retriesLeft = this.options.reconnectTries;
  this.reconnectId = null;

  // A usable bson parser is mandatory
  if(!options.bson
    || typeof options.bson.serialize != 'function'
    || typeof options.bson.deserialize != 'function') {
    throw new Error("must pass in valid bson parser");
  }

  // Logger instance
  this.logger = Logger('Pool', options);

  // Pool state
  this.state = DISCONNECTED;

  // Connections, bucketed by what they are currently doing
  this.availableConnections = [];
  this.inUseConnections = [];
  this.connectingConnections = [];

  // True while the work queue is being drained
  this.executing = false;

  // Operation work queue
  this.queue = [];

  // All the authProviders
  this.authProviders = options.authProviders || {
      'mongocr': new MongoCR(options.bson), 'x509': new X509(options.bson)
    , 'plain': new Plain(options.bson), 'gssapi': new GSSAPI(options.bson)
    , 'sspi': new SSPI(options.bson), 'scram-sha-1': new ScramSHA1(options.bson)
  }

  // Authentication / logout bookkeeping
  this.authenticating = false;
  this.loggingout = false;
  this.nonAuthenticatedConnections = [];
  this.authenticatingTimestamp = null;

  // Number of consecutive timeouts caught
  this.numberOfConsecutiveTimeouts = 0;
}
// Pool instances are event emitters
inherits(Pool, EventEmitter);

// Enumerable, getter-only accessors that surface selected entries of
// this.options directly on the pool instance
['size', 'connectionTimeout', 'socketTimeout'].forEach(function(optionName) {
  Object.defineProperty(Pool.prototype, optionName, {
    enumerable: true,
    get: function() { return this.options[optionName]; }
  });
});
/**
 * Switch the pool to `newState` if that move is allowed from its current
 * state; otherwise leave the state untouched and log an error.
 * @param {Pool} self The pool whose `state` is updated
 * @param {string} newState The requested state
 */
function stateTransition(self, newState) {
  // Current state -> states that may follow it
  var transitionTable = {
    'disconnected': [CONNECTING, DESTROYING, DISCONNECTED],
    'connecting': [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
    'connected': [CONNECTED, DISCONNECTED, DESTROYING],
    'destroying': [DESTROYING, DESTROYED],
    'destroyed': [DESTROYED]
  };

  var allowed = transitionTable[self.state];
  var isLegal = Boolean(allowed) && allowed.indexOf(newState) != -1;

  if(!isLegal) {
    self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
      , self.id, self.state, newState, allowed));
    return;
  }

  self.state = newState;
}
/**
 * Authenticate a single connection with the credentials in `auth`.
 * @param {Pool} pool Pool owning the auth providers
 * @param {Array} auth [mechanism, db, ...mechanism specific parameters]
 * @param {Connection} connection Connection to authenticate
 * @param {function} cb Passed to the provider as its last argument; called with null when `auth` holds no mechanism
 * @throws {MongoError} When the pool has no provider for the mechanism
 */
function authenticate(pool, auth, connection, cb) {
  var mechanism = auth[0];
  // No credentials supplied, nothing to do
  if(mechanism === undefined) return cb(null);

  var db = auth[1];
  var provider = pool.authProviders[mechanism];
  // Validate that the mechanism exists
  if(!provider) {
    throw new MongoError(f('authMechanism %s not supported', mechanism));
  }

  // provider.auth(writer, [connection], db, ...extra credentials, cb)
  var providerArgs = [write(pool), [connection], db];
  Array.prototype.push.apply(providerArgs, auth.slice(2));
  providerArgs.push(cb);
  provider.auth.apply(provider, providerArgs);
}
/**
 * Build the writer handed to the authentication mechanisms. It sends a buffer
 * straight to one connection, bypassing the pool's work queue.
 * @param {Pool} self The pool the writer belongs to
 * @return {function} writer(connection, buffer, callback)
 */
function write(self) {
  return function(connection, buffer, callback) {
    // Stop any auth traffic once the pool is going away
    var poolGone = self.state == DESTROYED || self.state == DESTROYING;
    if(poolGone) {
      return callback(new MongoError('pool destroyed'));
    }

    // The reply is routed to `callback` through the connection's workItem
    connection.workItem = {cb: callback, command: true};
    connection.write(buffer);
  };
}
/**
 * Re-apply the credentials held by every auth provider to one connection.
 * Providers are processed one at a time; the first error aborts the chain.
 * @param {Pool} pool Pool owning the auth providers
 * @param {Connection} connection Connection to re-authenticate
 * @param {function} cb Called with the first error, or without arguments once all providers finished
 */
function reauthenticate(pool, connection, cb) {
  // Provider names still to process; taken from the end of the list
  var pending = Object.keys(pool.authProviders);

  function next() {
    // Finished re-authenticating against all providers
    if(pending.length == 0) return cb();

    var provider = pool.authProviders[pending.pop()];
    provider.reauthenticate(write(pool), [connection], function(err) {
      // Stop at the first failure
      if(err) return cb(err);
      next();
    });
  }

  next();
}
/**
 * Build the listener the pool installs on a connection for one failure event
 * ('error', 'close', 'timeout' or 'parseError').
 *
 * The returned function is registered with connection.on/once, so inside it
 * `this` is the connection that emitted the event.
 *
 * @param {Pool} self The pool that owns the connection
 * @param {string} event Name of the event this listener is registered for
 * @return {function} listener(err)
 */
function connectionFailureHandler(self, event) {
return function(err) {
// Several failure events can fire for the same connection; only act on the first
if (this._connectionFailHandled) return;
this._connectionFailHandled = true;
// Destroy the connection
this.destroy();
// Remove the connection from whichever pool list currently holds it
removeConnection(self, this);
// Fail the operation that was in flight on this connection, if any.
// workItem is cleared first so the callback cannot be delivered twice.
if(this.workItem && this.workItem.cb) {
var workItem = this.workItem;
this.workItem = null;
workItem.cb(err);
}
// Did we catch a timeout, increment the numberOfConsecutiveTimeouts
if(event == 'timeout') {
self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;
// Have we timed out more than reconnectTries in a row ?
// Force close the pool as we are trying to connect to tcp sink hole
if(self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
self.numberOfConsecutiveTimeouts = 0;
// Destroy all connections and pool
self.destroy(true);
// Emit close event; returning here also skips scheduling a reconnect
return self.emit('close', self);
}
}
// No more sockets available, propagate the event to the pool's listeners
if(self.socketCount() == 0) {
if(self.state != DESTROYED && self.state != DESTROYING) {
stateTransition(self, DISCONNECTED);
}
// Do not emit error events, they are always close events
// do not trigger the low level error handler in node
event = event == 'error' ? 'close' : event;
self.emit(event, err);
}
// Start reconnection attempts, unless a reconnect timer is already pending
if(!self.reconnectId && self.options.reconnect) {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
}
};
}
/**
 * Build the timer callback that performs one reconnect attempt for the pool.
 *
 * Each attempt emits 'attemptReconnect', then (unless the pool is destroyed or
 * already connected) opens one new connection. A successful connect puts that
 * connection into availableConnections and emits 'reconnect'; a failure
 * decrements retriesLeft and either schedules the next attempt or, when the
 * counter reaches 0, destroys the pool and emits 'reconnectFailed'.
 *
 * @param {Pool} self The pool to reconnect
 * @return {function} Zero-argument function suitable for setTimeout
 */
function attemptReconnect(self) {
return function() {
// Note: the event is emitted before the destroyed-state check below
self.emit('attemptReconnect', self);
if(self.state == DESTROYED || self.state == DESTROYING) return;
// We are connected do not try again
if(self.isConnected()) {
self.reconnectId = null;
return;
}
// If we have failure schedule a retry.
// The `event` parameter is not used by the returned listener.
function _connectionFailureHandler(self, event) {
return function() {
// `this` is the connection being attempted; handle only its first failure event
if (this._connectionFailHandled) return;
this._connectionFailHandled = true;
// Destroy the connection
this.destroy();
// Count down the number of reconnects
self.retriesLeft = self.retriesLeft - 1;
// How many retries are left
// NOTE(review): exact comparison with 0 - if reconnectTries is configured as 0
// the counter becomes negative and this branch is never taken; confirm intent.
if(self.retriesLeft == 0) {
// Destroy the instance
self.destroy();
// Emit the reconnectFailed event
self.emit('reconnectFailed'
, new MongoError(f('failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval)));
} else {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
}
}
}
// Got a connect handler
function _connectHandler(self) {
return function() {
// `this` is the connection that emitted 'connect'
var connection = this;
// Pool destroyed stop the connection
if(self.state == DESTROYED || self.state == DESTROYING) {
return connection.destroy();
}
// Clear out all handlers installed for the attempt (names come from the
// module-level `handlers` list)
handlers.forEach(function(event) {
connection.removeAllListeners(event);
});
// Reset reconnect id
self.reconnectId = null;
// Apply pool connection handlers
connection.on('error', connectionFailureHandler(self, 'error'));
connection.on('close', connectionFailureHandler(self, 'close'));
connection.on('timeout', connectionFailureHandler(self, 'timeout'));
connection.on('parseError', connectionFailureHandler(self, 'parseError'));
// Apply any auth to the connection
// NOTE(review): `err` from reauthenticate is not inspected; the connection is
// made available regardless - confirm this is intended.
reauthenticate(self, this, function(err) {
// Reset retries
self.retriesLeft = self.options.reconnectTries;
// Push to available connections
self.availableConnections.push(connection);
// Emit reconnect event
self.emit('reconnect', self);
// Trigger execute to start everything up again
_execute(self)();
});
}
}
// Create a connection; it is not tracked in any pool list until it connects
var connection = new Connection(messageHandler(self), self.options);
// Add handlers
connection.on('close', _connectionFailureHandler(self, 'close'));
connection.on('error', _connectionFailureHandler(self, 'error'));
connection.on('timeout', _connectionFailureHandler(self, 'timeout'));
connection.on('parseError', _connectionFailureHandler(self, 'parseError'));
// On connection
connection.on('connect', _connectHandler(self));
// Attempt connection
connection.connect();
}
}
/**
 * Move `connection` from one list to the end of another.
 * Does nothing when the connection is not present in `from`.
 * @param {Connection} connection The connection to move
 * @param {Array} from List currently holding the connection
 * @param {Array} to List receiving the connection
 */
function moveConnectionBetween(connection, from, to) {
  var position = from.indexOf(connection);
  // Not in the source list, leave both lists untouched
  if(position == -1) return;

  from.splice(position, 1);
  to.push(connection);
}
/**
 * Build the function passed to each Connection as its message handler.
 *
 * For every message it: resets the consecutive-timeout counter, restores the
 * socket timeout if the work item set a custom one, re-authenticates any
 * connections that need it, releases the connection back to the pool, and
 * finally parses the message and hands the result (or error) to the work
 * item's callback.
 *
 * @param {Pool} self The pool the handler belongs to
 * @return {function} handler(message, connection)
 */
function messageHandler(self) {
return function(message, connection) {
// Get the work item (callback + parse options) attached to this connection.
// NOTE(review): dereferenced below without a null check - assumes a workItem is
// always set when a message arrives; confirm.
var workItem = connection.workItem;
// Reset timeout counter
self.numberOfConsecutiveTimeouts = 0;
// Reset the connection timeout if we modified it for
// this operation
if(workItem.socketTimeout) {
connection.resetSocketTimeout();
}
// Log if debug enabled
if(self.logger.isDebug()) {
self.logger.debug(f('message [%s] received from %s:%s'
, message.raw.toString('hex'), self.options.host, self.options.port));
}
// Authenticate any straggler connections, then release `connection`
// from inUseConnections to availableConnections and invoke `callback`
function authenticateStragglers(self, connection, callback) {
// Get any non authenticated connections.
// `connections` is the work list to re-authenticate; `nonAuthenticatedConnections`
// keeps the original array so it can be appended to availableConnections afterwards.
var connections = self.nonAuthenticatedConnections.slice(0);
var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
self.nonAuthenticatedConnections = [];
// Establish if the connection need to be authenticated
// Add to authentication list if
// 1. we were in an authentication process when the operation was executed
// 2. our current authentication timestamp differs from the workItem one, meaning an auth has happened
if(connection.workItem.authenticating == true
|| (typeof connection.workItem.authenticatingTimestamp == 'number'
&& connection.workItem.authenticatingTimestamp != self.authenticatingTimestamp)) {
// Add connection to the list
connections.push(connection);
}
// Clear out workItem (the outer `workItem` variable still references it)
connection.workItem = null;
// No connections need to be re-authenticated
if(connections.length == 0) {
// Release the connection back to the pool
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
// Finish
return callback();
}
// Apply re-authentication to all connections before releasing back to pool
var connectionCount = connections.length;
// Authenticate all connections; the shared counter fires the completion
// step once every reauthenticate callback has run.
// NOTE(review): `err` from reauthenticate is ignored here - confirm intended.
for(var i = 0; i < connectionCount; i++) {
reauthenticate(self, connections[i], function(err) {
connectionCount = connectionCount - 1;
if(connectionCount == 0) {
// Put non authenticated connections in available connections
self.availableConnections = self.availableConnections.concat(nonAuthenticatedConnections);
// Release the connection back to the pool
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
// Return
callback();
}
});
}
}
// Deliver (err, result) to the operation callback: deferred with
// process.nextTick unless domains are enabled, in which case it is called directly
function handleOperationCallback(self, cb, err, result) {
// No domain enabled
if(!self.options.domainsEnabled) {
return process.nextTick(function() {
return cb(err, result);
});
}
// Domain enabled just call the callback
cb(err, result);
}
authenticateStragglers(self, connection, function(err) {
// Keep executing, ensure current message handler does not stop execution
process.nextTick(function() {
_execute(self)();
});
// Time to dispatch the message if we have a callback.
// Work items flagged immediateRelease get no callback from here.
if(!workItem.immediateRelease) {
try {
// Parse the message according to the provided options
message.parse(workItem);
} catch(err) {
return handleOperationCallback(self, workItem.cb, MongoError.create(err));
}
// Establish if we have an error: only checked for command work items, based
// on the first returned document (ok == 0, $err, errmsg or code present)
if(workItem.command && message.documents[0] && (message.documents[0].ok == 0 || message.documents[0]['$err']
|| message.documents[0]['errmsg'] || message.documents[0]['code'])) {
return handleOperationCallback(self, workItem.cb, MongoError.create(message.documents[0]));
}
// Add the connection details
message.hashedName = connection.hashedName;
// Return the documents: the whole message when fullResult is set, otherwise the first document
handleOperationCallback(self, workItem.cb, null, new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message));
}
});
}
}
/**
 * Return the total socket count in the pool.
 * Counts available, in-use and connecting connections.
 * @method
 * @return {Number} The number of sockets tracked by the pool.
 */
Pool.prototype.socketCount = function() {
  var lists = [
    this.availableConnections,
    this.inUseConnections,
    this.connectingConnections
  ];
  return lists.reduce(function(total, list) {
    return total + list.length;
  }, 0);
}
/**
 * Return all pool connections as a new array: available connections first,
 * then in-use ones, then those still connecting.
 * @method
 * @return {Connection[]} The pool connections
 */
Pool.prototype.allConnections = function() {
  return [].concat(
    this.availableConnections,
    this.inUseConnections,
    this.connectingConnections
  );
}
/**
 * Get a pool connection: the first entry of allConnections(), or undefined
 * when the pool holds no connections.
 * @method
 * @return {Connection}
 */
Pool.prototype.get = function() {
  var connections = this.allConnections();
  return connections[0];
}
/**
 * Is the pool connected
 *
 * True when at least one tracked connection reports itself connected, or when
 * the pool holds no connections at all but an authentication is in progress.
 * Always false once the pool is destroying or destroyed.
 * @method
 * @return {boolean}
 */
Pool.prototype.isConnected = function() {
  var currentState = this.state;
  // A pool that is going away is never connected
  if(currentState == DESTROYED || currentState == DESTROYING) {
    return false;
  }

  var sockets = [].concat(
    this.availableConnections,
    this.inUseConnections,
    this.connectingConnections
  );

  var hasLiveSocket = sockets.some(function(socket) {
    return socket.isConnected();
  });
  if(hasLiveSocket) return true;

  // Might be authenticating, but we are still connected
  if(sockets.length == 0 && this.authenticating) {
    return true;
  }

  return false;
}
/**
 * Was the pool destroyed (also true while destruction is still in progress)
 * @method
 * @return {boolean}
 */
Pool.prototype.isDestroyed = function() {
  var currentState = this.state;
  if(currentState == DESTROYED) return true;
  return currentState == DESTROYING;
}
/**
 * Is the pool in a disconnected state
 * @method
 * @return {boolean} true only when the pool state is exactly 'disconnected'
 */
Pool.prototype.isDisconnected = function() {
  var currentState = this.state;
  return currentState == DISCONNECTED;
}
/**
 * Connect pool
 *
 * Opens the first connection, re-applies stored credentials, authenticates with
 * the arguments passed to this call (mechanism, db, ...credentials) and then
 * emits 'connect'. Authentication failures destroy the pool and emit 'error'.
 * @method
 * @throws {MongoError} When the pool is not in the disconnected state
 */
Pool.prototype.connect = function(auth) {
  if(this.state != DISCONNECTED) throw new MongoError('connection in unlawful state ' + this.state);
  var self = this;

  // Transition to connecting state
  stateTransition(this, CONNECTING);

  // All call arguments are forwarded to authenticate() as the credentials
  var args = Array.prototype.slice.call(arguments, 0);

  // Create the first connection and track it as connecting
  var connection = new Connection(messageHandler(self), this.options);
  this.connectingConnections.push(connection);

  // True once the pool started or finished shutting down
  function poolIsGone() {
    return self.state == DESTROYED || self.state == DESTROYING;
  }

  // Tear the pool down and surface the error
  function failWith(err) {
    self.destroy();
    return self.emit('error', err);
  }

  connection.once('connect', function(connection) {
    if(poolIsGone()) return self.destroy();

    // Apply any stored credentials
    reauthenticate(self, connection, function(err) {
      if(poolIsGone()) return self.destroy();
      if(err) return failWith(err);

      // Authenticate with the credentials supplied to connect()
      authenticate(self, args, connection, function(err) {
        if(poolIsGone()) return self.destroy();
        if(err) return failWith(err);

        // Set connected mode
        stateTransition(self, CONNECTED);
        // The connection is now usable
        moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
        // Emit the connect event
        self.emit('connect', self);
      });
    });
  });

  // Add error handlers
  ['error', 'close', 'timeout', 'parseError'].forEach(function(eventName) {
    connection.once(eventName, connectionFailureHandler(self, eventName));
  });

  try {
    connection.connect();
  } catch(err) {
    // SSL or something threw on connect
    self.emit('error', err);
  }
}
/**
 * Authenticate using a specified mechanism
 *
 * Marks the pool as authenticating, waits for any logout in progress, applies
 * the credentials to every currently available connection and then invokes the
 * callback. The authenticating flag is always cleared before the callback runs.
 * @method
 * @param {string} mechanism The Auth mechanism we are invoking
 * @param {string} db The db we are invoking the mechanism against
 * @param {...object} param Parameters for the specific mechanism
 * @param {authResultCallback} callback A callback function
 * @throws {MongoError} When no provider exists for the mechanism (other than 'default')
 */
Pool.prototype.auth = function(mechanism, db) {
  var self = this;
  // The last argument is the callback, everything before it is passed to the provider
  var args = Array.prototype.slice.call(arguments, 0);
  var callback = args.pop();

  // Authentication is accepted in any pool state (a CONNECTED-only check was
  // disabled in the original code).

  // If we don't have the mechanism fail
  if(self.authProviders[mechanism] == null && mechanism != 'default') {
    throw new MongoError(f("auth provider %s does not exist", mechanism));
  }

  // Signal that we are authenticating a new set of credentials
  this.authenticating = true;
  this.authenticatingTimestamp = new Date().getTime();

  // Authenticate all live connections
  function authenticateLiveConnections(self, args, cb) {
    // Get the current viable connections
    var connections = self.availableConnections;
    // Allow nothing else to use the connections while we authenticate them
    self.availableConnections = [];

    var connectionsCount = connections.length;
    var error = null;

    // No connections available: finish through `cb` so the authenticating flag
    // is cleared. Calling the outer callback directly (as before) left
    // self.authenticating stuck at true, blocking the work queue indefinitely.
    if(connectionsCount == 0) return cb(null);

    // Authenticate the connections
    for(var i = 0; i < connections.length; i++) {
      authenticate(self, args, connections[i], function(err) {
        connectionsCount = connectionsCount - 1;
        // Store the error
        if(err) error = err;

        // Processed all connections
        if(connectionsCount == 0) {
          // Auth finished
          self.authenticating = false;
          // Add the connections back to available connections
          self.availableConnections = self.availableConnections.concat(connections);
          // We had an error, return it
          if(error) {
            // Log the error
            if(self.logger.isError()) {
              self.logger.error(f('[%s] failed to authenticate against server %s:%s'
                , self.id, self.options.host, self.options.port));
            }

            return cb(error);
          }
          cb(null);
        }
      });
    }
  }

  // Wait for a logout in process to happen (polls every millisecond)
  function waitForLogout(self, cb) {
    if(!self.loggingout) return cb();
    setTimeout(function() {
      waitForLogout(self, cb);
    }, 1)
  }

  // Wait for logout to finish
  waitForLogout(self, function() {
    // Authenticate all live connections
    authenticateLiveConnections(self, args, function(err) {
      // Credentials correctly stored in auth provider if successful
      // Any new connections will now reauthenticate correctly
      self.authenticating = false;
      // Return after authentication connections
      callback(err);
    });
  });
}
/**
 * Logout all users against a database
 *
 * Sends a logout command over every available and in-use connection and calls
 * back once all of them have answered, passing the last error seen (if any).
 * When the pool currently holds no such connections the callback is invoked on
 * the next tick with null.
 * @method
 * @param {string} dbName The database name
 * @param {authResultCallback} callback A callback function
 * @throws {MongoError} When dbName is not a string or callback is not a function
 */
Pool.prototype.logout = function(dbName, callback) {
  var self = this;
  if(typeof dbName != 'string') throw new MongoError('logout method requires a db name as first argument');
  if(typeof callback != 'function') throw new MongoError('logout method requires a callback');

  // Get all relevant connections
  var connections = self.availableConnections.concat(self.inUseConnections);
  var count = connections.length;

  // Nothing to send: previously the callback was never invoked in this case and
  // `loggingout` stayed true, which made every later auth() wait forever.
  if(count == 0) {
    self.loggingout = false;
    process.nextTick(function() {
      callback(null);
    });
    return;
  }

  // Indicate logout in process
  this.loggingout = true;

  // Store any error
  var error = null;

  // Send logout command over all the connections
  for(var i = 0; i < connections.length; i++) {
    // A fresh Query is built for every connection
    var query = new Query(this.options.bson
      , f('%s.$cmd', dbName)
      , {logout:1}, {numberToSkip: 0, numberToReturn: 1});

    write(self)(connections[i], query.toBin(), function(err, r) {
      count = count - 1;
      if(err) error = err;

      // All connections answered
      if(count == 0) {
        self.loggingout = false;
        callback(error);
      }
    });
  }
}
/**
 * Unref the pool: calls unref() on every available, in-use and connecting
 * connection.
 * @method
 */
Pool.prototype.unref = function() {
  // Snapshot all the known connections before touching them
  var snapshot = [].concat(
    this.availableConnections,
    this.inUseConnections,
    this.connectingConnections
  );

  for(var i = 0; i < snapshot.length; i++) {
    snapshot[i].unref();
  }
}
// Connection events whose listeners are stripped before a connection is destroyed
var events = ['error', 'close', 'timeout', 'parseError', 'connect'];

/**
 * Destroy the given connections, empty every connection list of the pool and
 * move the pool to the destroyed state.
 * @param {Pool} self The pool being destroyed
 * @param {Connection[]} connections Connections to tear down
 */
function destroy(self, connections) {
  connections.forEach(function(connection) {
    // Detach listeners first so destroying the socket triggers no pool handlers
    events.forEach(function(eventName) {
      connection.removeAllListeners(eventName);
    });
    connection.destroy();
  });

  // Zero out all connections
  self.connectingConnections = [];
  self.nonAuthenticatedConnections = [];
  self.availableConnections = [];
  self.inUseConnections = [];

  // Set state to destroyed
  stateTransition(self, DESTROYED);
}
/**
 * Destroy pool
 *
 * With `force` set, every connection is torn down immediately. Otherwise the
 * pool polls every millisecond until the work queue is empty and no connection
 * has a work item in flight, and only then tears the connections down.
 * @method
 * @param {boolean} [force] Skip draining and destroy right away
 */
Pool.prototype.destroy = function(force) {
  var self = this;
  // Do not try again if the pool is already dead
  if(this.state == DESTROYED || self.state == DESTROYING) return;

  // Mark the pool as shutting down
  stateTransition(this, DESTROYING);

  // Every connection the pool currently knows about
  function knownConnections() {
    return self.availableConnections
      .concat(self.inUseConnections)
      .concat(self.nonAuthenticatedConnections)
      .concat(self.connectingConnections);
  }

  // Are we force closing
  if(force) {
    return destroy(self, knownConnections());
  }

  // Wait for the operations to drain before we close the pool
  function checkStatus() {
    // Queued work remains, look again shortly
    if(self.queue.length != 0) {
      setTimeout(checkStatus, 1);
      return;
    }

    var connections = knownConnections();
    // An operation still in flight on any connection postpones the teardown
    var hasInFlightOperation = connections.some(function(connection) {
      return connection.workItem;
    });
    if(hasInFlightOperation) {
      return setTimeout(checkStatus, 1);
    }

    destroy(self, connections);
  }

  // Initiate drain of operations
  checkStatus();
}
/**
 * Write a message to MongoDB
 *
 * Wraps the buffer in an operation, places it on the work queue (monitoring
 * operations at the front, everything else at the back) and triggers execution
 * of the queue. Nothing is returned.
 * @method
 * @param {*} buffer Serialized message, or an array of them, later handed to connection.write
 * @param {object} [options] Per-operation settings; may be omitted, in which case the second argument is the callback
 * @param {boolean} [options.promoteLongs=true] Parse option stored on the operation
 * @param {boolean} [options.promoteValues=true] Parse option stored on the operation
 * @param {boolean} [options.promoteBuffers=false] Parse option stored on the operation
 * @param {boolean} [options.raw=false] Parse option stored on the operation
 * @param {boolean} [options.immediateRelease=false] Release the connection right after writing; no response is dispatched
 * @param {*} [options.documentsReturnedIn] Stored on the operation unchanged
 * @param {boolean} [options.command=false] Treat the response as a command reply when checking for errors
 * @param {boolean} [options.fullResult=false] Hand the whole message instead of the first document to the callback
 * @param {number} [options.socketTimeout] Optional per operation socketTimeout
 * @param {boolean} [options.monitoring] Schedule as the very first operation
 * @param {boolean} [options.noResponse] Allows the callback to be omitted
 * @param {function} [cb] Operation callback
 * @throws {MongoError} When no callback is given and options.noResponse is not set
 */
Pool.prototype.write = function(buffer, options, cb) {
  // Support the (buffer, cb) call form. Options are reset to an empty object so
  // that option flags are never read off the callback function itself.
  if(typeof options == 'function') {
    cb = options;
    options = {};
  }

  // Always have options
  options = options || {};

  // Pool was destroyed error out
  if(this.state == DESTROYED || this.state == DESTROYING) {
    // Callback with an error
    if(cb) {
      try {
        cb(new MongoError('pool destroyed'));
      } catch(err) {
        // Re-throw outside of this call stack
        process.nextTick(function() { throw err; });
      }
    }

    return;
  }

  if(this.options.domainsEnabled
    && process.domain && typeof cb === "function") {
    // if we have a domain bind to it
    var oldCb = cb;
    cb = process.domain.bind(function() {
      // Copy arguments into a real array
      var args = new Array(arguments.length);
      for(var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; }
      // bounce off event loop so domain switch takes place
      process.nextTick(function() {
        oldCb.apply(null, args);
      });
    });
  }

  // Read a boolean option, falling back when it is missing or not a boolean
  function booleanOption(name, fallback) {
    return typeof options[name] == 'boolean' ? options[name] : fallback;
  }

  // The queued unit of work: payload, callback and parsing options
  var operation = {
    buffer: buffer,
    cb: cb,
    raw: booleanOption('raw', false),
    promoteLongs: booleanOption('promoteLongs', true),
    promoteValues: booleanOption('promoteValues', true),
    promoteBuffers: booleanOption('promoteBuffers', false),
    fullResult: booleanOption('fullResult', false),
    immediateRelease: booleanOption('immediateRelease', false),
    documentsReturnedIn: options.documentsReturnedIn,
    command: booleanOption('command', false),
    // Optional per operation socketTimeout
    socketTimeout: options.socketTimeout,
    monitoring: options.monitoring
  };

  // We need to have a callback function unless the message returns no response
  if(!(typeof cb == 'function') && !options.noResponse) {
    throw new MongoError('write method must provide a callback');
  }

  // If we have a monitoring operation schedule as the very first operation
  // Otherwise add to back of queue
  if(options.monitoring) {
    this.queue.unshift(operation);
  } else {
    this.queue.push(operation);
  }

  // Attempt to execute the operation
  _execute(this)();
}
/**
 * Remove the first occurrence of `connection` from `connections`.
 * @param {Connection} connection The connection to remove
 * @param {Array} connections List to remove it from (modified in place)
 * @return {boolean|undefined} true when an entry was removed, otherwise undefined
 */
function remove(connection, connections) {
  var position = connections.indexOf(connection);
  if(position == -1) return;

  connections.splice(position, 1);
  return true;
}

/**
 * Remove `connection` from the first pool list that contains it, checking
 * available, in-use, connecting and non-authenticated connections in that order.
 * @param {Pool} self The pool owning the lists
 * @param {Connection} connection The connection to remove
 */
function removeConnection(self, connection) {
  var listNames = [
    'availableConnections',
    'inUseConnections',
    'connectingConnections',
    'nonAuthenticatedConnections'
  ];

  for(var i = 0; i < listNames.length; i++) {
    // Stop at the first list that held the connection
    if(remove(connection, self[listNames[i]])) return;
  }
}
// All event handlers: event names whose listeners are removed from a connection
// once it has connected, before the pool installs its permanent handlers
var handlers = ["close", "message", "error", "timeout", "parseError", "connect"];
/**
 * Open one additional connection for the pool.
 *
 * The connection is tracked in connectingConnections while it connects. On
 * failure it is destroyed, removed and a reconnect attempt is scheduled. On
 * success the temporary listeners are replaced by the pool's failure handlers,
 * stored credentials are re-applied, and the connection is placed in
 * availableConnections (or nonAuthenticatedConnections while an auth is running).
 *
 * @param {Pool} self The pool to grow
 */
function _createConnection(self) {
var connection = new Connection(messageHandler(self), self.options);
// Push the connection
self.connectingConnections.push(connection);
// Handle any errors that happen before the connection is established
var tempErrorHandler = function(_connection) {
return function(err) {
// Destroy the connection
_connection.destroy();
// Remove the connection from the connectingConnections list
removeConnection(self, _connection);
// Start reconnection attempts, unless a reconnect timer is already pending
if(!self.reconnectId && self.options.reconnect) {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
}
}
}
// Handle successful connection
var tempConnectHandler = function(_connection) {
return function() {
// Destroyed state return
if(self.state == DESTROYED || self.state == DESTROYING) {
// Remove the connection from the list
removeConnection(self, _connection);
return _connection.destroy();
}
// Remove the temporary listeners installed below
handlers.forEach(function(e) {
_connection.removeAllListeners(e);
});
// Add the final handlers
_connection.once('close', connectionFailureHandler(self, 'close'));
_connection.once('error', connectionFailureHandler(self, 'error'));
_connection.once('timeout', connectionFailureHandler(self, 'timeout'));
_connection.once('parseError', connectionFailureHandler(self, 'parseError'));
// Re-apply stored credentials before exposing the connection
reauthenticate(self, _connection, function(err) {
// NOTE(review): in this branch the connection is destroyed without being
// removed from the pool lists first - confirm that destroy() of the pool has
// already emptied them.
if(self.state == DESTROYED || self.state == DESTROYING) {
return _connection.destroy();
}
// Remove the connection from the connectingConnections list
removeConnection(self, _connection);
// Handle error: a connection that failed to re-authenticate is dropped
if(err) {
return _connection.destroy();
}
// If we are authenticating at the moment
// Do not automatically put in available connections
// As we need to apply the credentials first
if(self.authenticating) {
self.nonAuthenticatedConnections.push(_connection);
} else {
// Push to available
self.availableConnections.push(_connection);
// Execute any work waiting
_execute(self)();
}
});
}
}
// Add all handlers
connection.once('close', tempErrorHandler(connection));
connection.once('error', tempErrorHandler(connection));
connection.once('timeout', tempErrorHandler(connection));
connection.once('parseError', tempErrorHandler(connection));
connection.once('connect', tempConnectHandler(connection));
// Start connection
connection.connect();
}
/**
 * Remove every monitoring operation from the queue and fail each one with a
 * "no connection available for monitoring" error, in queue order.
 * Non-monitoring operations are left in place.
 * @param {Array} queue The pool's operation queue (modified in place)
 */
function flushMonitoringOperations(queue) {
  var i = 0;
  while(i < queue.length) {
    var workItem = queue[i];

    if(!workItem.monitoring) {
      i++;
      continue;
    }

    // Removing the entry shifts the next one into slot i, so the index must
    // not advance here (advancing used to skip the operation that followed)
    queue.splice(i, 1);
    workItem.cb(new MongoError({ message: 'no connection available for monitoring', driver:true }));
  }
}
/**
 * Build the function that drains the pool's work queue.
 *
 * While operations are queued and connections are available, each operation is
 * paired with a connection, written to it and tracked as in use. When there is
 * queued work but no available or connecting connection and the pool is below
 * its size limit, one new connection is created instead.
 *
 * @param {Pool} self The pool whose queue is executed
 * @return {function} Zero-argument function that performs one drain pass
 */
function _execute(self) {
return function() {
if(self.state == DESTROYED) return;
// Already executing, skip
if(self.executing) return;
// Set pool as executing
self.executing = true;
// Wait for auth to clear before continuing (polls every millisecond)
function waitForAuth(cb) {
if(!self.authenticating) return cb();
// Wait for a millisecond and try again
setTimeout(function() {
waitForAuth(cb);
}, 1);
}
// Block on any auth in process
waitForAuth(function() {
// As long as we have available connections
while(true) {
// Total connections known to the pool (available + connecting + in use)
var totalConnections = self.availableConnections.length
+ self.connectingConnections.length
+ self.inUseConnections.length;
// Have we not reached the max connection size yet
if(self.availableConnections.length == 0
&& self.connectingConnections.length == 0
&& totalConnections < self.options.size
&& self.queue.length > 0) {
// Create a new connection
_createConnection(self);
// Attempt to execute again
self.executing = false;
return;
}
// No available connections, flush any monitoring ops
if(self.availableConnections.length == 0) {
// Flush any monitoring operations
flushMonitoringOperations(self.queue);
break;
}
// No queue break
if(self.queue.length == 0) {
break;
}
// Get a connection
var connection = self.availableConnections.pop();
if(connection.isConnected()) {
// Get the next work item
var workItem = self.queue.shift();
// Get actual binary commands
var buffer = workItem.buffer;
// Add connection to workers in flight
self.inUseConnections.push(connection);
// Record the authentication status at dispatch time; messageHandler compares
// these against the pool's values to decide whether to re-authenticate
workItem.authenticating = self.authenticating;
workItem.authenticatingTimestamp = self.authenticatingTimestamp;
// Add current associated callback to the connection
connection.workItem = workItem
// We have a custom socketTimeout
if(!workItem.immediateRelease && typeof workItem.socketTimeout == 'number') {
connection.setSocketTimeout(workItem.socketTimeout);
}
// Put operation on the wire
if(Array.isArray(buffer)) {
for(var i = 0; i < buffer.length; i++) {
connection.write(buffer[i])
}
} else {
connection.write(buffer);
}
// Fire and forget message, release the socket
if(workItem.immediateRelease && !self.authenticating) {
self.inUseConnections.pop();
self.availableConnections.push(connection);
} else if(workItem.immediateRelease && self.authenticating) {
self.inUseConnections.pop();
self.nonAuthenticatedConnections.push(connection);
}
} else {
// The popped connection reported not connected: it is not put back into any
// list here, and the work item stays queued
flushMonitoringOperations(self.queue);
}
}
});
// NOTE(review): this runs as soon as waitForAuth() returns. When an
// authentication is in progress the drain callback above runs later from a
// timer, by which time `executing` is already false - confirm this is intended.
self.executing = false;
}
}
/**
* A server connect event, used to verify that the connection is up and running
*
* @event Pool#connect
* @type {Pool}
*/
/**
* A server reconnect event, used to verify that pool reconnected.
*
* @event Pool#reconnect
* @type {Pool}
*/
/**
* The server connection closed, all pool connections closed
*
* @event Pool#close
* @type {Pool}
*/
/**
* The server connection caused an error, all pool connections closed
*
* @event Pool#error
* @type {Pool}
*/
/**
* The server connection timed out, all pool connections closed
*
* @event Pool#timeout
* @type {Pool}
*/
/**
* The driver experienced an invalid message, all pool connections closed
*
* @event Pool#parseError
* @type {Pool}
*/
/**
* The driver attempted to reconnect
*
* @event Pool#attemptReconnect
* @type {Pool}
*/
/**
* The driver exhausted all reconnect attempts
*
* @event Pool#reconnectFailed
* @type {Pool}
*/
module.exports = Pool;