分享
golang连接activemq
chen yuwen · · 12223 次点击 · · 开始浏览这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。
config.ini 配置文件
[stomp] ;activemq的IP地址 host:192.168.7.85 ;activemq的端口 port:61613 ;activemq的队列 queue:/queue/bbg_ordercache [php] ;php的执行路径 phpbin:php.exe ;被执行的文件的路径 filepath:D:/jianguo/command/application/cli ;传递给被执行文件的参数 params:show
main.go 代码文件:
package main
import (
"bytes"
"fmt"
"github.com/gmallard/stompngo"
"github.com/unknwon/goconfig"
"log"
"net"
"os"
"os/exec"
)
// 存储配置信息的变量
var config *goconfig.ConfigFile
// 存储日志信息的变量
var mylog *log.Logger
// 启动初始化
func init() {
// 加载配置文件
configPath := "./config.ini"
conf, err := goconfig.LoadConfigFile(configPath)
if err != nil {
fmt.Println(err)
}
config = conf
// @todo 强化按日期写日志文件
file := "./log.txt"
//t := time.Now()
//file := "./log_" + strings.Replace(t.String()[:19], ":", "_", 3) + ".txt"
hander, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
log.Println(err)
}
mylog = log.New(hander, "\r\n", log.Ldate|log.Ltime|log.Llongfile)
}
// 主程序
func main() {
host, _ := config.GetValue("stomp", "host")
port, _ := config.GetValue("stomp", "port")
n, e := net.Dial("tcp", net.JoinHostPort(host, port))
if e != nil {
fmt.Println(e)
}
// STOMP 1.0 的标准头
//h := stompngo.Headers{}
// STOMP 1.1 的标准头
h := stompngo.Headers{"accept-version", "1.1"}
// @todo 强化网络断开之后重试
c, e := stompngo.Connect(n, h)
if e != nil {
fmt.Println(e)
}
// 必须客户端响应才可以删除MQ队列数据
f := stompngo.Headers{"destination", "/queue/bbg_ordercache", "ack", "client"}
// 自动删除MQ队列的数据
//f := stompngo.Headers{"destination", "/queue/bbg_ordercache"}
s, e := c.Subscribe(f)
if e != nil {
fmt.Println(e)
}
// 设置通道的容量
//fmt.Println(c.SubChanCap())
//c.SetSubChanCap(1)
for {
//r := <-s
//fmt.Println(r)
run(c, s)
}
}
// 外部shell脚本调用,成功处理删除相应队列
func run(c *stompngo.Connection, s <-chan stompngo.MessageData) {
phproot, _ := config.GetValue("php", "phpbin")
filepath, _ := config.GetValue("php", "filepath")
params, _ := config.GetValue("php", "params")
r := <-s
// 记录结果
mylog.Println(r)
order_id := r.Message.Headers.Value("order_id")
//fmt.Println(order_id)
cmd := exec.Command(phproot, filepath, params, "order_id", order_id)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
mylog.Println("Waiting for command to finish...")
err = cmd.Wait()
if err != nil {
mylog.Printf("Command finished with error: %v", err)
}
str := out.String()
//fmt.Println(str)
if str == "success" {
e := c.Ack(r.Message.Headers)
if e != nil {
mylog.Println(e)
}
}
}
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信12223 次点击
上一篇:golang的cmd包使用例子
添加一条新回复
(您需要 后才能回复 没有账号 ?)
- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码` - 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传
收入到我管理的专栏 新建专栏
config.ini 配置文件
[stomp] ;activemq的IP地址 host:192.168.7.85 ;activemq的端口 port:61613 ;activemq的队列 queue:/queue/bbg_ordercache [php] ;php的执行路径 phpbin:php.exe ;被执行的文件的路径 filepath:D:/jianguo/command/application/cli ;传递给被执行文件的参数 params:show
main.go 代码文件:
package main
import (
"bytes"
"fmt"
"github.com/gmallard/stompngo"
"github.com/unknwon/goconfig"
"log"
"net"
"os"
"os/exec"
)
// 存储配置信息的变量
var config *goconfig.ConfigFile
// 存储日志信息的变量
var mylog *log.Logger
// 启动初始化
func init() {
// 加载配置文件
configPath := "./config.ini"
conf, err := goconfig.LoadConfigFile(configPath)
if err != nil {
fmt.Println(err)
}
config = conf
// @todo 强化按日期写日志文件
file := "./log.txt"
//t := time.Now()
//file := "./log_" + strings.Replace(t.String()[:19], ":", "_", 3) + ".txt"
hander, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
log.Println(err)
}
mylog = log.New(hander, "\r\n", log.Ldate|log.Ltime|log.Llongfile)
}
// 主程序
func main() {
host, _ := config.GetValue("stomp", "host")
port, _ := config.GetValue("stomp", "port")
n, e := net.Dial("tcp", net.JoinHostPort(host, port))
if e != nil {
fmt.Println(e)
}
// STOMP 1.0 的标准头
//h := stompngo.Headers{}
// STOMP 1.1 的标准头
h := stompngo.Headers{"accept-version", "1.1"}
// @todo 强化网络断开之后重试
c, e := stompngo.Connect(n, h)
if e != nil {
fmt.Println(e)
}
// 必须客户端响应才可以删除MQ队列数据
f := stompngo.Headers{"destination", "/queue/bbg_ordercache", "ack", "client"}
// 自动删除MQ队列的数据
//f := stompngo.Headers{"destination", "/queue/bbg_ordercache"}
s, e := c.Subscribe(f)
if e != nil {
fmt.Println(e)
}
// 设置通道的容量
//fmt.Println(c.SubChanCap())
//c.SetSubChanCap(1)
for {
//r := <-s
//fmt.Println(r)
run(c, s)
}
}
// 外部shell脚本调用,成功处理删除相应队列
func run(c *stompngo.Connection, s <-chan stompngo.MessageData) {
phproot, _ := config.GetValue("php", "phpbin")
filepath, _ := config.GetValue("php", "filepath")
params, _ := config.GetValue("php", "params")
r := <-s
// 记录结果
mylog.Println(r)
order_id := r.Message.Headers.Value("order_id")
//fmt.Println(order_id)
cmd := exec.Command(phproot, filepath, params, "order_id", order_id)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
mylog.Println("Waiting for command to finish...")
err = cmd.Wait()
if err != nil {
mylog.Printf("Command finished with error: %v", err)
}
str := out.String()
//fmt.Println(str)
if str == "success" {
e := c.Ack(r.Message.Headers)
if e != nil {
mylog.Println(e)
}
}
}