123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- var Query = require("./query")
- , Utils = require("../../utils")
- module.exports = (function() {
- var ConnectorManager = function(sequelize, config) {
- var pgModule = config.dialectModulePath || 'pg'
- this.sequelize = sequelize
- this.client = null
- this.config = config || {}
- this.config.port = this.config.port || 5432
- this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0))
- this.pg = this.config.native ? require(pgModule).native : require(pgModule)
- // Better support for BigInts
- // https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
- this.pg.types.setTypeParser(20, String);
- this.disconnectTimeoutId = null
- this.pendingQueries = 0
- this.clientDrained = true
- this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
- this.ConnectionParameters = require(pgModule + '/lib/connection-parameters')
- this.onProcessExit = function () {
- this.disconnect()
- }.bind(this);
- process.on('exit', this.onProcessExit)
- }
- Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
/**
 * Marks one query as finished.
 *
 * Decrements `pendingQueries`. When pooling is off and the counter reaches
 * zero, a 100 ms timer is started; if the counter is still zero when the
 * timer fires, `disconnect()` is called.
 */
ConnectorManager.prototype.endQuery = function() {
  var manager = this

  manager.pendingQueries--

  // Nothing to schedule while pooling or while other queries are in flight.
  if (manager.pooling || manager.pendingQueries !== 0) {
    return
  }

  setTimeout(function disconnectIfStillIdle() {
    // A query may have started during the grace period; re-check.
    if (manager.pendingQueries === 0) {
      manager.disconnect()
    }
  }, 100)
}
/**
 * Runs a SQL statement on a connection obtained through `connect()`.
 *
 * The query is counted in `pendingQueries` and `clientDrained` is cleared
 * before connecting. The count is released through `endQuery()` exactly once,
 * either when the query completes or when the connection attempt fails.
 *
 * @param {String} sql        Statement handed to `Query#run`.
 * @param {Object} callee     Passed through to the `Query` constructor.
 * @param {Object} [options]  Passed through to the `Query` constructor; defaults to `{}`.
 * @returns {Utils.CustomEventEmitter} The emitter, after `run()` has been called on it.
 *          Connection errors are emitted on it as 'error'; query events are proxied to it.
 */
ConnectorManager.prototype.query = function(sql, callee, options) {
  var self = this
    , released = false

  // Balances the increment below exactly once, whichever path finishes first.
  var release = function() {
    if (!released) {
      released = true
      self.endQuery()
    }
  }

  self.pendingQueries++
  self.clientDrained = false

  return new Utils.CustomEventEmitter(function(emitter) {
    self.connect()
      .on('error', function(err) {
        // The query never ran, but it was already counted as pending; without
        // releasing it here the counter would never return to zero.
        release()
        emitter.emit('error', err)
      })
      .on('success', function(done) {
        var query = new Query(self.client, self.sequelize, callee, options || {})

        return query.run(sql)
          .complete(function(err) {
            release()
            // `done` is only provided when connect() passes one along.
            done && done(err)
          })
          .proxy(emitter)
      })
  }).run()
}
/**
 * Hook invoked once a transaction has been set up on this manager's client.
 *
 * Switches the session to UTC unless `config.keepDefaultTimezone` is set, in
 * which case the callback is merely deferred with `Utils.tick` - the same rule
 * `connect()` applies to freshly acquired clients.
 *
 * @param {Function} callback  Invoked when the timezone step has finished (or was skipped).
 */
ConnectorManager.prototype.afterTransactionSetup = function(callback) {
  if (this.config && this.config.keepDefaultTimezone) {
    // Respect the opt-out: leave the server's default timezone untouched.
    Utils.tick(callback)
    return
  }

  this.setTimezone(this.client, 'UTC', callback)
}
/**
 * Converts a driver/socket error into the error reported to callers.
 * Known socket error codes get a fixed message, other coded errors are passed
 * through untouched, and errors without a code are re-wrapped by message.
 */
function translateConnectError(err) {
  if (!err.code) {
    return new Error(err.message)
  }

  switch (err.code) {
    case 'ECONNREFUSED':
      return new Error("Failed to authenticate for PostgresSQL. Please double check your settings.")
    case 'ENOTFOUND':
    case 'EHOSTUNREACH':
    case 'EINVAL':
      return new Error("Failed to find PostgresSQL server. Please double check your settings.")
    default:
      return err
  }
}

/**
 * Obtains a client: from the driver's pool when pooling is enabled, otherwise
 * by reusing `this.client` or creating a one-off client.
 *
 * Unless `config.keepDefaultTimezone` is set, the session timezone is set to
 * UTC before success is reported.
 *
 * @param {Function} [callback]  Accepted for interface compatibility; not used here.
 * @returns {EventEmitter} Emits 'success' (with the pool's release function
 *          when the driver supplied one) or 'error' (see translateConnectError).
 */
ConnectorManager.prototype.connect = function(callback) {
  var self = this
    , emitter = new (require('events').EventEmitter)()

  // in case database is slow to connect, prevent orphaning the client
  // TODO: We really need some sort of queue/flush/drain mechanism
  // NOTE(review): this 'success' is emitted synchronously, before the caller
  // can attach a listener to the returned emitter - confirm that is intended.
  if (this.isConnecting && !this.pooling && this.client === null) {
    emitter.emit('success', null)
    return emitter
  }

  this.isConnecting = true
  this.isConnected = false

  var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
    , config = new this.ConnectionParameters(uri)

  // set pooling parameters if specified
  if (this.pooling) {
    config.poolSize = this.config.pool.maxConnections || 10
    config.poolIdleTimeout = this.config.pool.maxIdleTime || 30000
    config.reapIntervalMillis = this.config.pool.reapInterval || 1000
    config.uuid = this.config.uuid
  }

  var connectCallback = function(err, client, done) {
    self.isConnecting = false

    if (!!err) {
      // release the pool immediately, very important.
      done && done(err)
      self.client = null
      emitter.emit('error', translateConnectError(err))
    } else if (client) {
      var timezoneCallback = function() {
        self.isConnected = true
        self.client = client
        emitter.emit('success', done)
      }

      if (self.config.keepDefaultTimezone) {
        // Deferred so both branches report success through the same callback.
        Utils.tick(timezoneCallback)
      } else {
        self.setTimezone(client, 'UTC', timezoneCallback)
      }
    } else if (self.config.native) {
      self.setTimezone(self.client, 'UTC', function() {
        self.isConnected = true
        emitter.emit('success', done)
      })
    } else {
      done && done()
      self.client = null
      emitter.emit('success')
    }
  }

  if (this.pooling) {
    // acquire client from pool
    this.pg.connect(config, connectCallback)
  } else if (!!this.client) {
    connectCallback(null, this.client)
  } else {
    // create one-off client
    var responded = false

    this.client = new this.pg.Client(config)

    this.client.connect(function(err, client, done) {
      if (responded) {
        // Already reported (as a timeout); never report one attempt twice.
        return
      }
      responded = true
      connectCallback(err, client || self.client, done)
    })

    // If the client ends before its connect() callback was ever invoked,
    // report that as a connection timeout ourselves; the connect callback
    // is the only other place a failure is reported from.
    this.client.on('end', function () {
      if (!responded) {
        responded = true
        connectCallback(new Error('Connection timed out'))
      }
    })

    // Closes a client correctly even if we have backed up queries
    // https://github.com/brianc/node-postgres/pull/346
    this.client.on('drain', function() {
      self.clientDrained = true
    })
  }

  return emitter
}
/**
 * Issues `SET TIME ZONE '<timezone>'` on the given client.
 *
 * @param {Object}   client      Object exposing `query(sql)`; the value it
 *                               returns must expose `on(event, listener)`.
 * @param {String}   [timezone]  Zone name; a falsy value falls back to "UTC".
 * @param {Function} callback    Registered as the 'end' listener of the query.
 */
ConnectorManager.prototype.setTimezone = function(client, timezone, callback) {
  // The value is spliced into a SQL string literal, so embedded single quotes
  // are doubled to keep it inside the literal.
  var zone = String(timezone || "UTC").replace(/'/g, "''")

  client.query("SET TIME ZONE '" + zone + "'").on('end', callback)
}
/**
 * Drops the current client, if any, and resets the connection flags.
 *
 * `end()` is only called on the client when `clientDrained` is set; the
 * reference is cleared either way.
 */
ConnectorManager.prototype.disconnect = function() {
  var current = this.client

  if (current) {
    this.clientDrained && current.end()
    this.client = null
  }

  this.isConnecting = false
  this.isConnected = false
}
- return ConnectorManager
- })()
|