分享
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。
说明:
网上找了一圈好像没有rabbmitmq连接池的维护比较好的包,索性按照整理出来的自己写了一个简易版的,希望各位大神继续完善,目前只是实现了连接的一部分,具体tabbmitmq 的操作 按照 amqp的说明操作即可
这里有大部分的ampq的实例
https://blog.csdn.net/lastsweetop/article/details/91038836
/***************************************************
* @Time : 2019年11月21日 6:46 下午
* @Author : ccoding
* @File : rabbmitmq
* @Software: GoLand
**************************************************/
package rabbitmqPool
import (
"errors"
"github.com/streadway/amqp"
"sync"
"time"
)
var (
ErrInvalidConfig = errors.New("invalid pool config")
ErrPoolClosed = errors.New("pool closed")
)
type PoolConfig struct {
MaxOpen int // 池中最大资源数
NumOpen int // 当前池中资源数
MinOpen int // 池中最少资源数
Closed bool // 池是否已关闭
IdleTimeout time.Duration //空闲连接连接超时时间
WaitTimeOut time.Duration //等待获取连接超时时间
}
type NewConnection func() (*amqp.Connection, error)
type RabbitmqPool struct {
mu sync.Mutex
conns chan *amqp.Connection
newConnection func() (*amqp.Connection, error)
poolConfig *PoolConfig
}
func NewPool(config *PoolConfig, newConnection NewConnection) (*RabbitmqPool, error) {
if config.MaxOpen <= 0 || config.MinOpen > config.MaxOpen {
return nil, ErrInvalidConfig
}
p := &RabbitmqPool{
conns: make(chan *amqp.Connection, config.MaxOpen),
newConnection: newConnection,
poolConfig: config,
}
for i := 0; i < config.MinOpen; i++ {
conn, err := newConnection()
if err != nil {
continue
}
config.NumOpen++
p.conns <- conn
}
return p, nil
}
func (p *RabbitmqPool) Get() (*amqp.Connection, error) {
if p.poolConfig.Closed {
return nil, ErrPoolClosed
}
for {
conn, err := p.connection()
if err != nil {
return nil, err
}
// todo maxLifttime处理
return conn, nil
}
}
func (p *RabbitmqPool) connection() (*amqp.Connection, error) {
select {
case conn := <-p.conns:
return conn, nil
default:
p.mu.Lock()
if p.poolConfig.NumOpen >= p.poolConfig.MaxOpen {
conn := <-p.conns
p.mu.Unlock()
return conn, nil
}
// 新建连接
conn, err := p.newConnection()
if err != nil {
p.mu.Unlock()
return nil, err
}
p.poolConfig.NumOpen++
p.mu.Unlock()
return conn, nil
}
}
// 释放单个资源到连接池
func (p *RabbitmqPool) Release(conn *amqp.Connection) error {
if p.poolConfig.Closed {
return ErrPoolClosed
}
p.mu.Lock()
p.conns <- conn
p.mu.Unlock()
return nil
}
// 关闭单个资源
func (p *RabbitmqPool) Close(conn *amqp.Connection) error {
p.mu.Lock()
conn.Close()
p.poolConfig.NumOpen--
p.mu.Unlock()
return nil
}
// 关闭连接池,释放所有资源
func (p *RabbitmqPool) ClosePool() error {
if p.poolConfig.Closed {
return ErrPoolClosed
}
p.mu.Lock()
close(p.conns)
for conn := range p.conns {
conn.Close()
p.poolConfig.NumOpen--
}
p.poolConfig.Closed = true
p.mu.Unlock()
return nil
}
//打开通道
func OpenChannel (conn *amqp.Connection) (*amqp.Channel,error){
ch,err := conn.Channel()
return ch,err
}
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信1649 次点击
添加一条新回复
(您需要 后才能回复 没有账号 ?)
- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码` - 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传
收入到我管理的专栏 新建专栏
说明:
网上找了一圈好像没有rabbmitmq连接池的维护比较好的包,索性按照整理出来的自己写了一个简易版的,希望各位大神继续完善,目前只是实现了连接的一部分,具体tabbmitmq 的操作 按照 amqp的说明操作即可
这里有大部分的ampq的实例
https://blog.csdn.net/lastsweetop/article/details/91038836
/***************************************************
* @Time : 2019年11月21日 6:46 下午
* @Author : ccoding
* @File : rabbmitmq
* @Software: GoLand
**************************************************/
package rabbitmqPool
import (
"errors"
"github.com/streadway/amqp"
"sync"
"time"
)
var (
ErrInvalidConfig = errors.New("invalid pool config")
ErrPoolClosed = errors.New("pool closed")
)
type PoolConfig struct {
MaxOpen int // 池中最大资源数
NumOpen int // 当前池中资源数
MinOpen int // 池中最少资源数
Closed bool // 池是否已关闭
IdleTimeout time.Duration //空闲连接连接超时时间
WaitTimeOut time.Duration //等待获取连接超时时间
}
type NewConnection func() (*amqp.Connection, error)
type RabbitmqPool struct {
mu sync.Mutex
conns chan *amqp.Connection
newConnection func() (*amqp.Connection, error)
poolConfig *PoolConfig
}
func NewPool(config *PoolConfig, newConnection NewConnection) (*RabbitmqPool, error) {
if config.MaxOpen <= 0 || config.MinOpen > config.MaxOpen {
return nil, ErrInvalidConfig
}
p := &RabbitmqPool{
conns: make(chan *amqp.Connection, config.MaxOpen),
newConnection: newConnection,
poolConfig: config,
}
for i := 0; i < config.MinOpen; i++ {
conn, err := newConnection()
if err != nil {
continue
}
config.NumOpen++
p.conns <- conn
}
return p, nil
}
func (p *RabbitmqPool) Get() (*amqp.Connection, error) {
if p.poolConfig.Closed {
return nil, ErrPoolClosed
}
for {
conn, err := p.connection()
if err != nil {
return nil, err
}
// todo maxLifttime处理
return conn, nil
}
}
func (p *RabbitmqPool) connection() (*amqp.Connection, error) {
select {
case conn := <-p.conns:
return conn, nil
default:
p.mu.Lock()
if p.poolConfig.NumOpen >= p.poolConfig.MaxOpen {
conn := <-p.conns
p.mu.Unlock()
return conn, nil
}
// 新建连接
conn, err := p.newConnection()
if err != nil {
p.mu.Unlock()
return nil, err
}
p.poolConfig.NumOpen++
p.mu.Unlock()
return conn, nil
}
}
// 释放单个资源到连接池
func (p *RabbitmqPool) Release(conn *amqp.Connection) error {
if p.poolConfig.Closed {
return ErrPoolClosed
}
p.mu.Lock()
p.conns <- conn
p.mu.Unlock()
return nil
}
// 关闭单个资源
func (p *RabbitmqPool) Close(conn *amqp.Connection) error {
p.mu.Lock()
conn.Close()
p.poolConfig.NumOpen--
p.mu.Unlock()
return nil
}
// 关闭连接池,释放所有资源
func (p *RabbitmqPool) ClosePool() error {
if p.poolConfig.Closed {
return ErrPoolClosed
}
p.mu.Lock()
close(p.conns)
for conn := range p.conns {
conn.Close()
p.poolConfig.NumOpen--
}
p.poolConfig.Closed = true
p.mu.Unlock()
return nil
}
//打开通道
func OpenChannel (conn *amqp.Connection) (*amqp.Channel,error){
ch,err := conn.Channel()
return ch,err
}