connector-manager.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. var mariadb
  2. , Pooling = require('generic-pool')
  3. , Query = require("./query")
  4. , Utils = require("../../utils")
  5. , without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
  6. module.exports = (function() {
  7. var ConnectorManager = function(sequelize, config) {
  8. try {
  9. if (config.dialectModulePath) {
  10. mariadb = require(config.dialectModulePath)
  11. } else {
  12. mariadb = require('mariasql')
  13. }
  14. } catch (err) {
  15. console.log('You need to install mariasql package manually')
  16. }
  17. this.sequelize = sequelize
  18. this.client = null
  19. this.config = config || {}
  20. this.config.port = this.config.port || 3306
  21. this.disconnectTimeoutId = null
  22. this.queue = []
  23. this.activeQueue = []
  24. this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
  25. this.poolCfg = Utils._.defaults(this.config.pool, {
  26. maxConnections: 10,
  27. minConnections: 0,
  28. maxIdleTime: 1000,
  29. handleDisconnects: false,
  30. validate: function(client) {
  31. return client && client.connected
  32. }
  33. })
  34. this.pendingQueries = 0
  35. this.useReplicaton = !!config.replication
  36. this.useQueue = config.queue !== undefined ? config.queue : true
  37. var self = this
  38. if (this.useReplicaton) {
  39. var reads = 0
  40. , writes = 0
  41. // Init configs with options from config if not present
  42. for (var i in config.replication.read) {
  43. config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
  44. host: this.config.host,
  45. port: this.config.port,
  46. username: this.config.username,
  47. password: this.config.password,
  48. database: this.config.database
  49. })
  50. }
  51. config.replication.write = Utils._.defaults(config.replication.write, {
  52. host: this.config.host,
  53. port: this.config.port,
  54. username: this.config.username,
  55. password: this.config.password,
  56. database: this.config.database
  57. })
  58. // I'll make my own pool, with blackjack and hookers!
  59. this.pool = {
  60. release: function (client) {
  61. if (client.queryType == 'read') {
  62. return this.read.release(client)
  63. } else {
  64. return this.write.release(client)
  65. }
  66. },
  67. acquire: function (callback, priority, queryType) {
  68. if (queryType == 'SELECT') {
  69. this.read.acquire(callback, priority)
  70. } else {
  71. this.write.acquire(callback, priority)
  72. }
  73. },
  74. drain: function () {
  75. this.read.drain()
  76. this.write.drain()
  77. },
  78. read: Pooling.Pool({
  79. name: 'sequelize-read',
  80. create: function (done) {
  81. if (reads >= self.config.replication.read.length) {
  82. reads = 0
  83. }
  84. var config = self.config.replication.read[reads++]
  85. connect.call(self, function (err, connection) {
  86. if (connection) {
  87. connection.queryType = 'read'
  88. }
  89. done(err, connection)
  90. }, config)
  91. },
  92. destroy: function(client) {
  93. disconnect.call(self, client)
  94. },
  95. validate: self.poolCfg.validate,
  96. max: self.poolCfg.maxConnections,
  97. min: self.poolCfg.minConnections,
  98. idleTimeoutMillis: self.poolCfg.maxIdleTime
  99. }),
  100. write: Pooling.Pool({
  101. name: 'sequelize-write',
  102. create: function (done) {
  103. connect.call(self, function (err, connection) {
  104. if (connection) {
  105. connection.queryType = 'write'
  106. }
  107. done(err, connection)
  108. }, self.config.replication.write)
  109. },
  110. destroy: function(client) {
  111. disconnect.call(self, client)
  112. },
  113. validate: self.poolCfg.validate,
  114. max: self.poolCfg.maxConnections,
  115. min: self.poolCfg.minConnections,
  116. idleTimeoutMillis: self.poolCfg.maxIdleTime
  117. })
  118. };
  119. } else if (this.poolCfg) {
  120. //the user has requested pooling, so create our connection pool
  121. this.pool = Pooling.Pool({
  122. name: 'sequelize-mariadb',
  123. create: function (done) {
  124. connect.call(self, done)
  125. },
  126. destroy: function(client) {
  127. disconnect.call(self, client)
  128. },
  129. max: self.poolCfg.maxConnections,
  130. min: self.poolCfg.minConnections,
  131. validate: self.poolCfg.validate,
  132. idleTimeoutMillis: self.poolCfg.maxIdleTime
  133. })
  134. }
  135. this.onProcessExit = function () {
  136. //be nice & close our connections on exit
  137. if (self.pool) {
  138. self.pool.drain()
  139. } else if (self.client) {
  140. disconnect(self.client)
  141. }
  142. return
  143. }.bind(this);
  144. process.on('exit', this.onProcessExit)
  145. }
  146. Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
  147. ConnectorManager.prototype.query = function(sql, callee, options) {
  148. if (!this.isConnected && !this.pool) {
  149. this.connect()
  150. }
  151. if (this.useQueue) {
  152. var queueItem = {
  153. query: new Query(this.client, this.sequelize, callee, options || {}),
  154. client: this.client,
  155. sql: sql
  156. }
  157. enqueue.call(this, queueItem, options)
  158. return queueItem.query
  159. }
  160. var self = this
  161. , query = new Query(this.client, this.sequelize, callee, options || {})
  162. this.pendingQueries++
  163. query.done(function() {
  164. self.pendingQueries--
  165. if (self.pool) {
  166. self.pool.release(query.client)
  167. } else {
  168. if (self.pendingQueries === 0) {
  169. setTimeout(function() {
  170. self.pendingQueries === 0 && self.disconnect.call(self)
  171. }, 100)
  172. }
  173. }
  174. })
  175. if (!this.pool) {
  176. query.run(sql)
  177. } else {
  178. this.pool.acquire(function(err, client) {
  179. if (err) {
  180. return query.emit('error', err)
  181. }
  182. query.client = client
  183. query.run(sql)
  184. return
  185. }, undefined, options.type)
  186. }
  187. return query
  188. }
  189. ConnectorManager.prototype.connect = function() {
  190. var self = this
  191. // in case database is slow to connect, prevent orphaning the client
  192. if (this.isConnecting || this.pool) {
  193. return
  194. }
  195. connect.call(self, function(err, client) {
  196. self.client = client
  197. return
  198. })
  199. return
  200. }
  201. ConnectorManager.prototype.disconnect = function() {
  202. if (this.client) {
  203. disconnect.call(this, this.client)
  204. }
  205. return
  206. }
  207. // private
  208. var disconnect = function(client) {
  209. var self = this
  210. if (!this.useQueue) {
  211. this.client = null
  212. client.end()
  213. return
  214. }
  215. var intervalObj = null
  216. var cleanup = function () {
  217. // make sure to let queued items be finish before calling end
  218. if (self && self.hasQueuedItems) {
  219. return
  220. }
  221. client.end()
  222. if (self && self.client) {
  223. self.client = null
  224. }
  225. clearInterval(intervalObj)
  226. }
  227. intervalObj = setInterval(cleanup, 10)
  228. cleanup()
  229. }
  230. var connect = function(done, config) {
  231. config = config || this.config
  232. var self = this
  233. , client
  234. this.isConnecting = true
  235. var connectionConfig = {
  236. host: config.host,
  237. port: config.port,
  238. user: config.username,
  239. password: config.password,
  240. db: config.database,
  241. metadata: true
  242. }
  243. if (config.dialectOptions) {
  244. Object.keys(config.dialectOptions).forEach(function (key) {
  245. connectionConfig[key] = config.dialectOptions[key];
  246. })
  247. }
  248. if (connectionConfig.unixSocket) {
  249. delete connectionConfig.host;
  250. delete connectionConfig.port;
  251. }
  252. client = new mariadb()
  253. client.connect(connectionConfig)
  254. client
  255. .on('error', function (err) {
  256. self.isConnecting = false
  257. done(err)
  258. })
  259. .on('connect', function () {
  260. client.query("SET time_zone = '+0:00'").on('result', function (res) {
  261. res.on('end', function () {
  262. client.setMaxListeners(self.maxConcurrentQueries)
  263. self.isConnecting = false
  264. if (config.pool.handleDisconnects) {
  265. handleDisconnect(self.pool, client)
  266. }
  267. done(null, client)
  268. })
  269. })
  270. })
  271. .on('close', function() {
  272. disconnect.call(self, client)
  273. })
  274. }
  275. var handleDisconnect = function(pool, client) {
  276. client.on('error', function(err) {
  277. if (err.code !== 'PROTOCOL_CONNECTION_LOST') {
  278. throw err
  279. }
  280. pool.destroy(client)
  281. })
  282. }
  283. var enqueue = function(queueItem, options) {
  284. options = options || {}
  285. if (this.activeQueue.length < this.maxConcurrentQueries) {
  286. this.activeQueue.push(queueItem)
  287. if (this.pool) {
  288. var self = this
  289. this.pool.acquire(function(err, client) {
  290. if (err) {
  291. queueItem.query.emit('error', err)
  292. return
  293. }
  294. //we set the client here, asynchronously, when getting a pooled connection
  295. //allowing the ConnectorManager.query method to remain synchronous
  296. queueItem.query.client = client
  297. queueItem.client = client
  298. execQueueItem.call(self, queueItem)
  299. return
  300. }, undefined, options.type)
  301. } else {
  302. execQueueItem.call(this, queueItem)
  303. }
  304. } else {
  305. this.queue.push(queueItem)
  306. }
  307. }
  308. var dequeue = function(queueItem) {
  309. //return the item's connection to the pool
  310. if (this.pool) {
  311. this.pool.release(queueItem.client)
  312. }
  313. this.activeQueue = without(this.activeQueue, queueItem)
  314. }
  315. var transferQueuedItems = function(count) {
  316. for(var i = 0; i < count; i++) {
  317. var queueItem = this.queue.shift()
  318. if (queueItem) {
  319. enqueue.call(this, queueItem)
  320. }
  321. }
  322. }
  323. var afterQuery = function(queueItem) {
  324. dequeue.call(this, queueItem)
  325. transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
  326. disconnectIfNoConnections.call(this)
  327. }
  328. var execQueueItem = function(queueItem) {
  329. var self = this
  330. queueItem.query
  331. .success(function(){ afterQuery.call(self, queueItem) })
  332. .error(function(){ afterQuery.call(self, queueItem) })
  333. queueItem.query.run(queueItem.sql, queueItem.client)
  334. }
  335. ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
  336. return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0))
  337. })
  338. // legacy
  339. ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
  340. return !this.hasQueuedItems
  341. })
  342. ConnectorManager.prototype.__defineGetter__('isConnected', function() {
  343. return this.client != null
  344. })
  345. var disconnectIfNoConnections = function() {
  346. var self = this
  347. this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
  348. this.disconnectTimeoutId = setTimeout(function() {
  349. self.isConnected && !self.hasQueuedItems && self.disconnect()
  350. }, 100)
  351. }
  352. return ConnectorManager
  353. })()