123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- var mysql
- , Pooling = require('generic-pool')
- , Query = require("./query")
- , Utils = require("../../utils")
- , without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
- module.exports = (function() {
- var ConnectorManager = function(sequelize, config) {
- try {
- if (config.dialectModulePath) {
- mysql = require(config.dialectModulePath)
- } else {
- mysql = require('mysql')
- }
- } catch (err) {
- console.log('You need to install mysql package manually')
- }
- this.sequelize = sequelize
- this.client = null
- this.config = config || {}
- this.config.port = this.config.port || 3306
- this.disconnectTimeoutId = null
- this.queue = []
- this.activeQueue = []
- this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
- this.poolCfg = Utils._.defaults(this.config.pool, {
- maxConnections: 10,
- minConnections: 0,
- maxIdleTime: 1000,
- handleDisconnects: false,
- validate: validateConnection
- });
- this.pendingQueries = 0;
- this.useReplicaton = !!config.replication;
- this.useQueue = config.queue !== undefined ? config.queue : true;
- var self = this
- if (this.useReplicaton) {
- var reads = 0
- , writes = 0;
- // Init configs with options from config if not present
- for (var i in config.replication.read) {
- config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
- host: this.config.host,
- port: this.config.port,
- username: this.config.username,
- password: this.config.password,
- database: this.config.database
- });
- }
- config.replication.write = Utils._.defaults(config.replication.write, {
- host: this.config.host,
- port: this.config.port,
- username: this.config.username,
- password: this.config.password,
- database: this.config.database
- });
- // I'll make my own pool, with blackjack and hookers!
- this.pool = {
- release: function (client) {
- if (client.queryType == 'read') {
- return this.read.release(client);
- } else {
- return this.write.release(client);
- }
- },
- acquire: function (callback, priority, queryType) {
- if (queryType == 'SELECT') {
- this.read.acquire(callback, priority);
- } else {
- this.write.acquire(callback, priority);
- }
- },
- drain: function () {
- this.read.drain();
- this.write.drain();
- },
- read: Pooling.Pool({
- name: 'sequelize-read',
- create: function (done) {
- if (reads >= self.config.replication.read.length) {
- reads = 0
- }
- var config = self.config.replication.read[reads++];
- connect.call(self, function (err, connection) {
- if (connection) {
- connection.queryType = 'read'
- }
- done(err, connection)
- }, config);
- },
- destroy: function(client) {
- disconnect.call(self, client)
- },
- validate: self.poolCfg.validate,
- max: self.poolCfg.maxConnections,
- min: self.poolCfg.minConnections,
- idleTimeoutMillis: self.poolCfg.maxIdleTime
- }),
- write: Pooling.Pool({
- name: 'sequelize-write',
- create: function (done) {
- connect.call(self, function (err, connection) {
- if (connection) {
- connection.queryType = 'read'
- }
- done(err, connection)
- }, self.config.replication.write);
- },
- destroy: function(client) {
- disconnect.call(self, client)
- },
- validate: self.poolCfg.validate,
- max: self.poolCfg.maxConnections,
- min: self.poolCfg.minConnections,
- idleTimeoutMillis: self.poolCfg.maxIdleTime
- })
- };
- } else if (this.poolCfg) {
- //the user has requested pooling, so create our connection pool
- this.pool = Pooling.Pool({
- name: 'sequelize-mysql',
- create: function (done) {
- connect.call(self, function (err, connection) {
- // This has to be nested for some reason, else the error won't propagate correctly
- done(err, connection);
- })
- },
- destroy: function(client) {
- disconnect.call(self, client)
- },
- max: self.poolCfg.maxConnections,
- min: self.poolCfg.minConnections,
- validate: self.poolCfg.validate,
- idleTimeoutMillis: self.poolCfg.maxIdleTime
- })
- }
- this.onProcessExit = function () {
- //be nice & close our connections on exit
- if (self.pool) {
- self.pool.drain()
- } else if (self.client) {
- disconnect(self.client)
- }
- return
- }.bind(this);
-
- process.on('exit', this.onProcessExit)
- }
- Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
- ConnectorManager.prototype.query = function(sql, callee, options) {
- if (this.useQueue) {
- // If queueing we'll let the execQueueItem method handle connecting
- var queueItem = {
- query: new Query(null, this.sequelize, callee, options || {}),
- sql: sql
- };
- queueItem.query.options.uuid = this.config.uuid
- enqueue.call(this, queueItem, options);
- return queueItem.query;
- }
- var self = this, query = new Query(null, this.sequelize, callee, options || {});
- this.pendingQueries++;
- query.options.uuid = this.config.uuid
- query.done(function() {
- self.pendingQueries--;
- if (self.pool) {
- self.pool.release(query.client);
- } else {
- if (self.pendingQueries === 0) {
- setTimeout(function() {
- if (self.pendingQueries === 0){
- self.disconnect.call(self);
- }
- }, 100);
- }
- }
- });
- this.getConnection(options, function (err, client) {
- if (err) {
- return query.emit('error', err)
- }
- query.client = client
- query.run(sql)
- });
- return query;
- };
- ConnectorManager.prototype.getConnection = function(options, callback) {
- var self = this;
- if (typeof options === "function") {
- callback = options;
- options = {};
- }
- return new Utils.CustomEventEmitter(function (emitter) {
- if (!self.pool) {
- // Regular client caching
- if (self.client) {
- return emitter.emit('success', self.client);
- } else {
- // Cache for concurrent queries
- if (self._getConnection) {
- self._getConnection.proxy(emitter);
- return;
- }
- // Set cache and acquire connection
- self._getConnection = emitter;
- connect.call(self, function(err, client) {
- if (err) {
- return emitter.emit('error', err);
- }
- // Unset caching, should now be caught by the self.client check
- self._getConnection = null;
- self.client = client;
- emitter.emit('success', client);
- });
- }
- }
- if (self.pool) {
- // Acquire from pool
- self.pool.acquire(function(err, client) {
- if (err) {
- return emitter.emit('error', err);
- }
- emitter.emit('success', client);
- }, options.priority, options.type);
- }
- }).run().done(callback);
- };
- ConnectorManager.prototype.disconnect = function() {
- if (this.client) {
- disconnect.call(this, this.client)
- }
- return
- };
- // private
- var disconnect = function(client) {
- var self = this;
- this.client = null;
- client.end(function() {
- if (!self.useQueue) {
- return client.destroy();
- }
- var intervalObj = null
- var cleanup = function () {
- var retryCt = 0
- // make sure to let client finish before calling destroy
- if (client._queue && (client._queue.length > 0)) {
- return
- }
- // needed to prevent mysql connection leak
- client.destroy()
- clearInterval(intervalObj)
- }
- intervalObj = setInterval(cleanup, 10)
- cleanup()
- return
- })
- }
- var connect = function(done, config) {
- config = config || this.config
- var emitter = new (require('events').EventEmitter)()
- var connectionConfig = {
- host: config.host,
- port: config.port,
- user: config.username,
- password: config.password,
- database: config.database,
- timezone: 'Z'
- };
- if (config.dialectOptions) {
- Object.keys(config.dialectOptions).forEach(function (key) {
- connectionConfig[key] = config.dialectOptions[key];
- });
- }
- var connection = mysql.createConnection(connectionConfig);
- connection.connect(function(err) {
- if (err) {
- switch(err.code) {
- case 'ECONNREFUSED':
- case 'ER_ACCESS_D2ENIED_ERROR':
- emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.')
- break
- case 'ENOTFOUND':
- case 'EHOSTUNREACH':
- case 'EINVAL':
- emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.')
- break
- default:
- emitter.emit('error', err);
- break;
- }
- return;
- }
- emitter.emit('success', connection);
- })
- connection.query("SET time_zone = '+0:00'");
- // client.setMaxListeners(self.maxConcurrentQueries)
- this.isConnecting = false
- if (config.pool !== null && config.pool.handleDisconnects) {
- handleDisconnect(this.pool, connection)
- }
- emitter.on('error', function (err) {
- done(err);
- });
- emitter.on('success', function (connection) {
- done(null, connection);
- });
- }
- var handleDisconnect = function(pool, client) {
- client.on('error', function(err) {
- if (err.code !== 'PROTOCOL_CONNECTION_LOST') {
- throw err
- }
- pool.destroy(client)
- })
- }
- var validateConnection = function(client) {
- return client && client.state !== 'disconnected'
- }
- var enqueue = function(queueItem, options) {
- options = options || {}
- if (this.activeQueue.length < this.maxConcurrentQueries) {
- this.activeQueue.push(queueItem)
- execQueueItem.call(this, queueItem)
- } else {
- this.queue.push(queueItem)
- }
- }
- var dequeue = function(queueItem) {
- //return the item's connection to the pool
- if (this.pool) {
- this.pool.release(queueItem.client)
- }
- this.activeQueue = without(this.activeQueue, queueItem)
- }
- var transferQueuedItems = function(count) {
- for(var i = 0; i < count; i++) {
- var queueItem = this.queue.shift();
- if (queueItem) {
- enqueue.call(this, queueItem)
- }
- }
- }
- var afterQuery = function(queueItem) {
- dequeue.call(this, queueItem)
- transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
- disconnectIfNoConnections.call(this)
- }
- var execQueueItem = function(queueItem) {
- var self = this
- self.getConnection({
- priority: queueItem.query.options.priority,
- type: queueItem.query.options.type
- }, function (err, connection) {
- if (err) {
- queueItem.query.emit('error', err)
- return
- }
- queueItem.query.client = connection
- queueItem.client = connection
- queueItem.query
- .success(function(){ afterQuery.call(self, queueItem) })
- .error(function(){ afterQuery.call(self, queueItem) })
- queueItem.query.run(queueItem.sql, queueItem.client)
- })
- }
- ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
- return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0))
- })
- // legacy
- ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
- return !this.hasQueuedItems
- })
- ConnectorManager.prototype.__defineGetter__('isConnected', function() {
- return this.client != null
- })
- var disconnectIfNoConnections = function() {
- var self = this
- this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
- this.disconnectTimeoutId = setTimeout(function() {
- self.isConnected && !self.hasQueuedItems && self.disconnect()
- }, 100)
- }
- return ConnectorManager
- })()
|