connector-manager.js 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. var Query = require("./query")
  2. , Utils = require("../../utils")
  3. module.exports = (function() {
  4. var ConnectorManager = function(sequelize, config) {
  5. var pgModule = config.dialectModulePath || 'pg'
  6. this.sequelize = sequelize
  7. this.client = null
  8. this.config = config || {}
  9. this.config.port = this.config.port || 5432
  10. this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0))
  11. this.pg = this.config.native ? require(pgModule).native : require(pgModule)
  12. // Better support for BigInts
  13. // https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
  14. this.pg.types.setTypeParser(20, String);
  15. this.disconnectTimeoutId = null
  16. this.pendingQueries = 0
  17. this.clientDrained = true
  18. this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
  19. this.ConnectionParameters = require(pgModule + '/lib/connection-parameters')
  20. this.onProcessExit = function () {
  21. this.disconnect()
  22. }.bind(this);
  23. process.on('exit', this.onProcessExit)
  24. }
  25. Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
  26. ConnectorManager.prototype.endQuery = function() {
  27. var self = this
  28. self.pendingQueries--
  29. if (!self.pooling && self.pendingQueries === 0) {
  30. setTimeout(function() {
  31. self.pendingQueries === 0 && self.disconnect.call(self)
  32. }, 100)
  33. }
  34. }
  35. ConnectorManager.prototype.query = function(sql, callee, options) {
  36. var self = this
  37. self.pendingQueries++
  38. self.clientDrained = false
  39. return new Utils.CustomEventEmitter(function(emitter) {
  40. self.connect()
  41. .on('error', function(err) {
  42. emitter.emit('error', err)
  43. })
  44. .on('success', function(done) {
  45. var query = new Query(self.client, self.sequelize, callee, options || {})
  46. return query.run(sql)
  47. .complete(function(err) {
  48. self.endQuery.call(self)
  49. done && done(err) })
  50. .proxy(emitter)
  51. })
  52. }).run()
  53. }
  54. ConnectorManager.prototype.afterTransactionSetup = function(callback) {
  55. this.setTimezone(this.client, 'UTC', callback)
  56. }
  57. ConnectorManager.prototype.connect = function(callback) {
  58. var self = this
  59. var emitter = new (require('events').EventEmitter)()
  60. // in case database is slow to connect, prevent orphaning the client
  61. // TODO: We really need some sort of queue/flush/drain mechanism
  62. if (this.isConnecting && !this.pooling && this.client === null) {
  63. emitter.emit('success', null)
  64. return emitter
  65. }
  66. this.isConnecting = true
  67. this.isConnected = false
  68. var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
  69. , config = new this.ConnectionParameters(uri)
  70. // set pooling parameters if specified
  71. if (this.pooling) {
  72. config.poolSize = this.config.pool.maxConnections || 10
  73. config.poolIdleTimeout = this.config.pool.maxIdleTime || 30000
  74. config.reapIntervalMillis = this.config.pool.reapInterval || 1000
  75. config.uuid = this.config.uuid
  76. }
  77. var connectCallback = function(err, client, done) {
  78. self.isConnecting = false
  79. if (!!err) {
  80. // release the pool immediately, very important.
  81. done && done(err)
  82. self.client = null
  83. if (err.code) {
  84. switch(err.code) {
  85. case 'ECONNREFUSED':
  86. emitter.emit('error', new Error("Failed to authenticate for PostgresSQL. Please double check your settings."))
  87. break
  88. case 'ENOTFOUND':
  89. case 'EHOSTUNREACH':
  90. case 'EINVAL':
  91. emitter.emit('error', new Error("Failed to find PostgresSQL server. Please double check your settings."))
  92. break
  93. default:
  94. emitter.emit('error', err)
  95. break
  96. }
  97. } else {
  98. emitter.emit('error', new Error(err.message))
  99. }
  100. } else if (client) {
  101. var timezoneCallback = function() {
  102. self.isConnected = true
  103. self.client = client
  104. emitter.emit('success', done)
  105. }
  106. if (self.config.keepDefaultTimezone) {
  107. Utils.tick(timezoneCallback)
  108. } else {
  109. self.setTimezone(client, 'UTC', timezoneCallback)
  110. }
  111. } else if (self.config.native) {
  112. self.setTimezone(self.client, 'UTC', function() {
  113. self.isConnected = true
  114. emitter.emit('success', done)
  115. })
  116. } else {
  117. done && done()
  118. self.client = null
  119. emitter.emit('success')
  120. }
  121. }
  122. if (this.pooling) {
  123. // acquire client from pool
  124. this.pg.connect(config, connectCallback)
  125. } else {
  126. if (!!this.client) {
  127. connectCallback(null, this.client)
  128. } else {
  129. //create one-off client
  130. var responded = false
  131. this.client = new this.pg.Client(config)
  132. this.client.connect(function(err, client, done) {
  133. responded = true
  134. connectCallback(err, client || self.client, done)
  135. })
  136. // If we didn't ever hear from the client.connect() callback the connection timeout, node-postgres does not treat this as an error since no active query was ever emitted
  137. this.client.on('end', function () {
  138. if (!responded) {
  139. connectCallback(new Error('Connection timed out'))
  140. }
  141. })
  142. // Closes a client correctly even if we have backed up queries
  143. // https://github.com/brianc/node-postgres/pull/346
  144. this.client.on('drain', function() {
  145. self.clientDrained = true
  146. })
  147. }
  148. }
  149. return emitter
  150. }
  151. ConnectorManager.prototype.setTimezone = function(client, timezone, callback) {
  152. client.query("SET TIME ZONE '" + (timezone || "UTC") + "'").on('end', callback)
  153. }
  154. ConnectorManager.prototype.disconnect = function() {
  155. if (this.client) {
  156. if (this.clientDrained) {
  157. this.client.end()
  158. }
  159. this.client = null
  160. }
  161. this.isConnecting = false
  162. this.isConnected = false
  163. }
  164. return ConnectorManager
  165. })()