当前位置:编程学习 > 网站相关 >>

关于ActiveMQ接收消息,以及事务,hibernat事务的处理方法

最新项目上应用到ActiveMQ开源框架,其中也发现了一些问题,总结了一些浅薄的经验,本着开源,其他程序员少走一些弯路的宗旨,特与人分享,有好的方法,好的经验可以互相分享。

上正文(因为项目使用grails框架groovy语言,但是其他语言大致想通):

[java]
class ChargeRespReceiverService { 
 
 // ConnectionFactory :连接工厂,JMS 用它创建连接 
    private static ConnectionFactory connectionFactory; 
    // Connection 
    private static Connection topicConn; 
    // Session: 一个响应发送或接收消息的线程 
    private static Session topicSession; 
    // Topic: 响应消息的目的地;消息发送给谁. 
    private static Topic topicPay = null; 
    // MessageConsumer:响应消息接收者 
    private static MessageConsumer topicConsumer; 
    //使用静态块初始化JMS连接 
    static { 
        initJms(); 
    } 
     
    /**
     * 初始化
     */ 
    static void initJms() { 
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现 Constants.JMS_URL--JMS链接 Constants.PAY_TOPIC--TOPIC名称 "myChargeClient" 为注册到MQ的客户端ID,取唯一值 
        try { 
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, Constants.JMS_URL); 
            topicConn = connectionFactory.createConnection(); 
            topicConn.setClientID("myChargeClient"); 
            //true为开启事务模式,onMessage方法完全执行成功之后签名,过程中如果出现异常自动回滚,消息还在,false为接收之后自动签名,没有回滚 
            //还有一点要注意,Session.AUTO_ACKNOWLEDGE为MQ自动签名机制,但是如果MQ中如果有之前已经序列化的内容,经测试MQ不会自动去签名,此时需要调用topicSession.commit()方法进行手动签名 
            topicSession = topicConn.createSession(true, Session.AUTO_ACKNOWLEDGE); 
            topicPay = topicSession.createTopic(Constants.PAY_TOPIC); 
            topicConn.start(); 
        } catch (JMSException e) { 
            e.printStackTrace(); 
            if (topicConn != null) { 
                try { 
                    topicConn.close(); 
                    topicConn = null; 
                } catch (JMSException e1) { 
                    e1.printStackTrace(); 
                } 
            } 
        } 
    } 
     
    /**
     * 接收消息
     * @param selector
     * @return
     * @throws Exception
     */ 
    static synchronized MessageConsumer receiveResp(String selector) throws Exception { 
            //"chargeClient"为注册到MQ中的用户名,这个建议不要重复,方便在管理后台查看消息数量 
        topicConsumer = topicSession.createDurableSubscriber(topicPay, "chargeClient", selector, Boolean.FALSE); 
        return topicConsumer; 
    } 
     
    /**
     * 监听MQ的消息
     */ 
    public void chargeRespReceive() { 
 
        println "充值服务监听启动..........."; 
 
 
        try { 
                //此处进行选择,过滤掉不需要的消息、这个机制非常好 
            String selector = "receiver = 'charge'"; 
            MessageConsumer consumer = receiveResp(selector); 
            String s = consumer.getMessageSelector(); 
            consumer.setMessageListener(new MessageListener() { 
                public void onMessage(Message arg0) { 
                    MapMessage msg = (MapMessage) arg0; 
                    try { 
                        String srvCode = msg.getString("srvCode"); 
                        String outOrder = msg.getString("outOrder"); 
                        String feedback = msg.getString("feedback"); 
                        String reason = msg.getString("reason"); 
                        String chanelId = msg.getString("chanelId"); 
                        Charge charge = Charge.get(outOrder as long); 
                     &nb

补充:综合编程 , 其他综合 ,
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,