connector-manager.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. var mysql
  2. , Pooling = require('generic-pool')
  3. , Query = require("./query")
  4. , Utils = require("../../utils")
  5. , without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
  6. module.exports = (function() {
  7. var ConnectorManager = function(sequelize, config) {
  8. try {
  9. if (config.dialectModulePath) {
  10. mysql = require(config.dialectModulePath)
  11. } else {
  12. mysql = require('mysql')
  13. }
  14. } catch (err) {
  15. console.log('You need to install mysql package manually')
  16. }
  17. this.sequelize = sequelize
  18. this.client = null
  19. this.config = config || {}
  20. this.config.port = this.config.port || 3306
  21. this.disconnectTimeoutId = null
  22. this.queue = []
  23. this.activeQueue = []
  24. this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
  25. this.poolCfg = Utils._.defaults(this.config.pool, {
  26. maxConnections: 10,
  27. minConnections: 0,
  28. maxIdleTime: 1000,
  29. handleDisconnects: false,
  30. validate: validateConnection
  31. });
  32. this.pendingQueries = 0;
  33. this.useReplicaton = !!config.replication;
  34. this.useQueue = config.queue !== undefined ? config.queue : true;
  35. var self = this
  36. if (this.useReplicaton) {
  37. var reads = 0
  38. , writes = 0;
  39. // Init configs with options from config if not present
  40. for (var i in config.replication.read) {
  41. config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
  42. host: this.config.host,
  43. port: this.config.port,
  44. username: this.config.username,
  45. password: this.config.password,
  46. database: this.config.database
  47. });
  48. }
  49. config.replication.write = Utils._.defaults(config.replication.write, {
  50. host: this.config.host,
  51. port: this.config.port,
  52. username: this.config.username,
  53. password: this.config.password,
  54. database: this.config.database
  55. });
  56. // I'll make my own pool, with blackjack and hookers!
  57. this.pool = {
  58. release: function (client) {
  59. if (client.queryType == 'read') {
  60. return this.read.release(client);
  61. } else {
  62. return this.write.release(client);
  63. }
  64. },
  65. acquire: function (callback, priority, queryType) {
  66. if (queryType == 'SELECT') {
  67. this.read.acquire(callback, priority);
  68. } else {
  69. this.write.acquire(callback, priority);
  70. }
  71. },
  72. drain: function () {
  73. this.read.drain();
  74. this.write.drain();
  75. },
  76. read: Pooling.Pool({
  77. name: 'sequelize-read',
  78. create: function (done) {
  79. if (reads >= self.config.replication.read.length) {
  80. reads = 0
  81. }
  82. var config = self.config.replication.read[reads++];
  83. connect.call(self, function (err, connection) {
  84. if (connection) {
  85. connection.queryType = 'read'
  86. }
  87. done(err, connection)
  88. }, config);
  89. },
  90. destroy: function(client) {
  91. disconnect.call(self, client)
  92. },
  93. validate: self.poolCfg.validate,
  94. max: self.poolCfg.maxConnections,
  95. min: self.poolCfg.minConnections,
  96. idleTimeoutMillis: self.poolCfg.maxIdleTime
  97. }),
  98. write: Pooling.Pool({
  99. name: 'sequelize-write',
  100. create: function (done) {
  101. connect.call(self, function (err, connection) {
  102. if (connection) {
  103. connection.queryType = 'read'
  104. }
  105. done(err, connection)
  106. }, self.config.replication.write);
  107. },
  108. destroy: function(client) {
  109. disconnect.call(self, client)
  110. },
  111. validate: self.poolCfg.validate,
  112. max: self.poolCfg.maxConnections,
  113. min: self.poolCfg.minConnections,
  114. idleTimeoutMillis: self.poolCfg.maxIdleTime
  115. })
  116. };
  117. } else if (this.poolCfg) {
  118. //the user has requested pooling, so create our connection pool
  119. this.pool = Pooling.Pool({
  120. name: 'sequelize-mysql',
  121. create: function (done) {
  122. connect.call(self, function (err, connection) {
  123. // This has to be nested for some reason, else the error won't propagate correctly
  124. done(err, connection);
  125. })
  126. },
  127. destroy: function(client) {
  128. disconnect.call(self, client)
  129. },
  130. max: self.poolCfg.maxConnections,
  131. min: self.poolCfg.minConnections,
  132. validate: self.poolCfg.validate,
  133. idleTimeoutMillis: self.poolCfg.maxIdleTime
  134. })
  135. }
  136. this.onProcessExit = function () {
  137. //be nice & close our connections on exit
  138. if (self.pool) {
  139. self.pool.drain()
  140. } else if (self.client) {
  141. disconnect(self.client)
  142. }
  143. return
  144. }.bind(this);
  145. process.on('exit', this.onProcessExit)
  146. }
  147. Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
  148. ConnectorManager.prototype.query = function(sql, callee, options) {
  149. if (this.useQueue) {
  150. // If queueing we'll let the execQueueItem method handle connecting
  151. var queueItem = {
  152. query: new Query(null, this.sequelize, callee, options || {}),
  153. sql: sql
  154. };
  155. queueItem.query.options.uuid = this.config.uuid
  156. enqueue.call(this, queueItem, options);
  157. return queueItem.query;
  158. }
  159. var self = this, query = new Query(null, this.sequelize, callee, options || {});
  160. this.pendingQueries++;
  161. query.options.uuid = this.config.uuid
  162. query.done(function() {
  163. self.pendingQueries--;
  164. if (self.pool) {
  165. self.pool.release(query.client);
  166. } else {
  167. if (self.pendingQueries === 0) {
  168. setTimeout(function() {
  169. if (self.pendingQueries === 0){
  170. self.disconnect.call(self);
  171. }
  172. }, 100);
  173. }
  174. }
  175. });
  176. this.getConnection(options, function (err, client) {
  177. if (err) {
  178. return query.emit('error', err)
  179. }
  180. query.client = client
  181. query.run(sql)
  182. });
  183. return query;
  184. };
  185. ConnectorManager.prototype.getConnection = function(options, callback) {
  186. var self = this;
  187. if (typeof options === "function") {
  188. callback = options;
  189. options = {};
  190. }
  191. return new Utils.CustomEventEmitter(function (emitter) {
  192. if (!self.pool) {
  193. // Regular client caching
  194. if (self.client) {
  195. return emitter.emit('success', self.client);
  196. } else {
  197. // Cache for concurrent queries
  198. if (self._getConnection) {
  199. self._getConnection.proxy(emitter);
  200. return;
  201. }
  202. // Set cache and acquire connection
  203. self._getConnection = emitter;
  204. connect.call(self, function(err, client) {
  205. if (err) {
  206. return emitter.emit('error', err);
  207. }
  208. // Unset caching, should now be caught by the self.client check
  209. self._getConnection = null;
  210. self.client = client;
  211. emitter.emit('success', client);
  212. });
  213. }
  214. }
  215. if (self.pool) {
  216. // Acquire from pool
  217. self.pool.acquire(function(err, client) {
  218. if (err) {
  219. return emitter.emit('error', err);
  220. }
  221. emitter.emit('success', client);
  222. }, options.priority, options.type);
  223. }
  224. }).run().done(callback);
  225. };
  226. ConnectorManager.prototype.disconnect = function() {
  227. if (this.client) {
  228. disconnect.call(this, this.client)
  229. }
  230. return
  231. };
  232. // private
  233. var disconnect = function(client) {
  234. var self = this;
  235. this.client = null;
  236. client.end(function() {
  237. if (!self.useQueue) {
  238. return client.destroy();
  239. }
  240. var intervalObj = null
  241. var cleanup = function () {
  242. var retryCt = 0
  243. // make sure to let client finish before calling destroy
  244. if (client._queue && (client._queue.length > 0)) {
  245. return
  246. }
  247. // needed to prevent mysql connection leak
  248. client.destroy()
  249. clearInterval(intervalObj)
  250. }
  251. intervalObj = setInterval(cleanup, 10)
  252. cleanup()
  253. return
  254. })
  255. }
  256. var connect = function(done, config) {
  257. config = config || this.config
  258. var emitter = new (require('events').EventEmitter)()
  259. var connectionConfig = {
  260. host: config.host,
  261. port: config.port,
  262. user: config.username,
  263. password: config.password,
  264. database: config.database,
  265. timezone: 'Z'
  266. };
  267. if (config.dialectOptions) {
  268. Object.keys(config.dialectOptions).forEach(function (key) {
  269. connectionConfig[key] = config.dialectOptions[key];
  270. });
  271. }
  272. var connection = mysql.createConnection(connectionConfig);
  273. connection.connect(function(err) {
  274. if (err) {
  275. switch(err.code) {
  276. case 'ECONNREFUSED':
  277. case 'ER_ACCESS_D2ENIED_ERROR':
  278. emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.')
  279. break
  280. case 'ENOTFOUND':
  281. case 'EHOSTUNREACH':
  282. case 'EINVAL':
  283. emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.')
  284. break
  285. default:
  286. emitter.emit('error', err);
  287. break;
  288. }
  289. return;
  290. }
  291. emitter.emit('success', connection);
  292. })
  293. connection.query("SET time_zone = '+0:00'");
  294. // client.setMaxListeners(self.maxConcurrentQueries)
  295. this.isConnecting = false
  296. if (config.pool !== null && config.pool.handleDisconnects) {
  297. handleDisconnect(this.pool, connection)
  298. }
  299. emitter.on('error', function (err) {
  300. done(err);
  301. });
  302. emitter.on('success', function (connection) {
  303. done(null, connection);
  304. });
  305. }
  306. var handleDisconnect = function(pool, client) {
  307. client.on('error', function(err) {
  308. if (err.code !== 'PROTOCOL_CONNECTION_LOST') {
  309. throw err
  310. }
  311. pool.destroy(client)
  312. })
  313. }
  314. var validateConnection = function(client) {
  315. return client && client.state !== 'disconnected'
  316. }
  317. var enqueue = function(queueItem, options) {
  318. options = options || {}
  319. if (this.activeQueue.length < this.maxConcurrentQueries) {
  320. this.activeQueue.push(queueItem)
  321. execQueueItem.call(this, queueItem)
  322. } else {
  323. this.queue.push(queueItem)
  324. }
  325. }
  326. var dequeue = function(queueItem) {
  327. //return the item's connection to the pool
  328. if (this.pool) {
  329. this.pool.release(queueItem.client)
  330. }
  331. this.activeQueue = without(this.activeQueue, queueItem)
  332. }
  333. var transferQueuedItems = function(count) {
  334. for(var i = 0; i < count; i++) {
  335. var queueItem = this.queue.shift();
  336. if (queueItem) {
  337. enqueue.call(this, queueItem)
  338. }
  339. }
  340. }
  341. var afterQuery = function(queueItem) {
  342. dequeue.call(this, queueItem)
  343. transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
  344. disconnectIfNoConnections.call(this)
  345. }
  346. var execQueueItem = function(queueItem) {
  347. var self = this
  348. self.getConnection({
  349. priority: queueItem.query.options.priority,
  350. type: queueItem.query.options.type
  351. }, function (err, connection) {
  352. if (err) {
  353. queueItem.query.emit('error', err)
  354. return
  355. }
  356. queueItem.query.client = connection
  357. queueItem.client = connection
  358. queueItem.query
  359. .success(function(){ afterQuery.call(self, queueItem) })
  360. .error(function(){ afterQuery.call(self, queueItem) })
  361. queueItem.query.run(queueItem.sql, queueItem.client)
  362. })
  363. }
  364. ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
  365. return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0))
  366. })
  367. // legacy
  368. ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
  369. return !this.hasQueuedItems
  370. })
  371. ConnectorManager.prototype.__defineGetter__('isConnected', function() {
  372. return this.client != null
  373. })
  374. var disconnectIfNoConnections = function() {
  375. var self = this
  376. this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
  377. this.disconnectTimeoutId = setTimeout(function() {
  378. self.isConnected && !self.hasQueuedItems && self.disconnect()
  379. }, 100)
  380. }
  381. return ConnectorManager
  382. })()