[分享] .Net实现 WebSphere MQ与Oracle数据库的XA事务管理
WebSphere MQ以下简称为WMQ.
在通讯项目中,有这样的一个应用场景,简单描述如下:
1. 程序A需定时从MQ中取出消息(XML)
2. 将XML还原为DataSet
3. 将DataSet持久化到数据库
该场景在总线型的消息传输框架中较为常见。在一切正常的情况下，程序工作正常，数据不会发生错误或丢失。但程序A介于WMQ与数据库之间，程序两端的网络因素，或者任意一端服务停止，均有可能导致消息丢失。因此比较稳妥的做法是将以上步骤纳入XA事务进行全局托管。
实现一个MqGet类:
- C# code
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Transactions;
using IBM.WMQ;

namespace WMQClient_WithXA
{
    /// <summary>
    /// Reads messages from a WebSphere MQ queue, either under a local MQ
    /// syncpoint (<c>NORMAL_TRANSACTION</c>) or with the connection configured
    /// for XA (<c>XA_TRANSACTION</c>).
    /// </summary>
    /// <remarks>
    /// The queue manager connection and the queue handle are created lazily on
    /// the first <see cref="GetMessage"/> call and re-created after a failure.
    /// NOTE(review): <c>MyMqObject</c> is declared elsewhere; this class never
    /// instantiates it, so it presumably is a struct (or is otherwise non-null
    /// by default) - confirm.
    /// </remarks>
    public class MqGet
    {
        // Connection settings captured by the constructor.
        private String _host = "127.0.0.1";
        private int _port;
        private String _channelName = "SYSTEM.DEF.SVRCONN";
        private String _queueManagerName = null;
        private String _queueName = null;
        private int _charSet;
        private WMQTransactionType _transactionType;
        private String _transportMode = "managed";

        // Holder for the live queue manager (_qMg) and queue (_q) handles.
        private MyMqObject myMqObj;
        // Connection properties handed to the MQQueueManager constructor.
        private Hashtable properties;
        // Options reused for every Get call.
        private MQGetMessageOptions getMessageOptions;

        /// <summary>
        /// Stores the connection settings and prepares the get-message options.
        /// No connection is opened here.
        /// </summary>
        /// <param name="sMqQmgrName">Queue manager name.</param>
        /// <param name="sQueueName">Name of the queue to read from.</param>
        /// <param name="sChannelName">Channel name.</param>
        /// <param name="sHost">Host name or address of the MQ server.</param>
        /// <param name="iPort">Listener port.</param>
        /// <param name="iCharacterSet">
        /// Character set identifier; passed to <c>CodePageTrans.getMsCodePage</c>
        /// when a message body is decoded.
        /// </param>
        /// <param name="TransactionType">Transaction mode used for gets.</param>
        /// <param name="sTransportMode">
        /// For XA transactions, "managed" selects the managed transport; any
        /// other value selects the XA client transport.
        /// </param>
        public MqGet(string sMqQmgrName, string sQueueName, string sChannelName, string sHost, int iPort, int iCharacterSet, WMQTransactionType TransactionType, string sTransportMode = "managed")
        {
            _queueManagerName = sMqQmgrName;
            _queueName = sQueueName;
            _channelName = sChannelName;
            _host = sHost;
            _port = iPort;
            _charSet = iCharacterSet;
            _transactionType = TransactionType;
            _transportMode = sTransportMode;

            getMessageOptions = new MQGetMessageOptions();
            // Options is a bit mask: OR the flags in rather than adding them,
            // so a flag that is already set can never corrupt the mask.
            getMessageOptions.Options |= MQC.MQGMO_WAIT;
            getMessageOptions.WaitInterval = 20000; // 20 seconds wait

            properties = new Hashtable();
            properties.Add(MQC.HOST_NAME_PROPERTY, _host);
            properties.Add(MQC.PORT_PROPERTY, _port);
            properties.Add(MQC.CHANNEL_PROPERTY, _channelName);

            switch (TransactionType)
            {
                case WMQTransactionType.NORMAL_TRANSACTION:
                    getMessageOptions.Options |= MQC.MQGMO_SYNCPOINT;
                    break;

                case WMQTransactionType.XA_TRANSACTION:
                    if (_transportMode == "managed")
                    {
                        // for managed mode
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);
                    }
                    else
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_XACLIENT);
                    }
                    getMessageOptions.Options |= MQC.MQGMO_SYNCPOINT;
                    break;
            }
        }

        /// <summary>
        /// Connects to the configured queue manager.
        /// </summary>
        /// <returns>The connected queue manager, or null when connecting fails.</returns>
        private MQQueueManager ConnectQueueManager()
        {
            try
            {
                return new MQQueueManager(_queueManagerName, properties);
            }
            catch (Exception err)
            {
                Console.WriteLine(err.Message);
                return null;
            }
        }

        /// <summary>
        /// Opens the configured queue for input on the given queue manager.
        /// </summary>
        /// <returns>The opened queue, or null when opening fails.</returns>
        private MQQueue OpenQueue(MQQueueManager queueManager)
        {
            try
            {
                return queueManager.AccessQueue(_queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);
            }
            catch (Exception err)
            {
                Console.WriteLine(err.Message);
                return null;
            }
        }

        /// <summary>
        /// Ensures <paramref name="mqObj"/> holds a connected queue manager and
        /// an open queue, creating whichever is missing. If the queue cannot be
        /// obtained, everything is closed so both handles end up null.
        /// </summary>
        /// <returns>The same holder that was passed in.</returns>
        private MyMqObject GetMqObj(ref MyMqObject mqObj)
        {
            if (mqObj._qMg == null)
            {
                mqObj._qMg = ConnectQueueManager();
                // A queue handle from a previous connection is no longer usable.
                mqObj._q = null;
            }

            if (mqObj._qMg != null && mqObj._q == null)
            {
                mqObj._q = OpenQueue(mqObj._qMg);
            }

            if (mqObj._q == null)
            {
                CloseMQConn(ref mqObj);
            }

            return mqObj;
        }

        /// <summary>
        /// Closes the queue and disconnects the queue manager held by
        /// <paramref name="mqObj"/>, then sets both handles to null.
        /// Failures while closing are logged and otherwise ignored
        /// (best-effort cleanup).
        /// </summary>
        public void CloseMQConn(ref MyMqObject mqObj)
        {
            if (mqObj._q != null)
            {
                try
                {
                    mqObj._q.Close();
                }
                catch (Exception err)
                {
                    Console.WriteLine(err.Message);
                }
                mqObj._q = null;
            }

            if (mqObj._qMg != null)
            {
                try
                {
                    mqObj._qMg.Disconnect();
                }
                catch (Exception err)
                {
                    Console.WriteLine(err.Message);
                }
                mqObj._qMg = null;
            }
        }

        /// <summary>
        /// Waits (up to the configured wait interval) for the next message on
        /// the queue and returns its decoded body and put time.
        /// </summary>
        /// <returns>
        /// The message, or null when no connection could be established, no
        /// message arrived within the wait interval, or the get failed.
        /// </returns>
        public MyMessage GetMessage()
        {
            myMqObj = GetMqObj(ref myMqObj);
            if (myMqObj._qMg == null)
            {
                return null;
            }

            MQMessage message = new MQMessage();
            MyMessage msg = new MyMessage();
            try
            {
                myMqObj._q.Get(message, getMessageOptions);
                byte[] buff = message.ReadBytes(message.MessageLength);
                msg.MsgBody = System.Text.Encoding.GetEncoding(CodePageTrans.getMsCodePage(_charSet)).GetString(buff);
                // NOTE(review): fixed +8h offset, presumably converting the MQ
                // put time to UTC+8 local time - confirm before reusing in
                // another time zone.
                msg.PutTime = message.PutDateTime.AddHours(8);
                message.ClearMessage();
                return msg;
            }
            catch (MQException mqe)
            {
                if (mqe.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE)
                {
                    // The wait interval expired on an empty queue. This is the
                    // normal idle case for a polling reader, not a failure:
                    // keep the connection open instead of reconnecting on
                    // every poll.
                    return null;
                }

                Console.WriteLine("获取消息失败. 原因: " + mqe.ToString());
                CloseMQConn(ref myMqObj);
                return null;
            }
        }

        /// <summary>
        /// Commits the current MQ unit of work when running in
        /// <c>NORMAL_TRANSACTION</c> mode. In any other mode nothing is done
        /// here and true is returned.
        /// </summary>
        /// <returns>true on success; false when there is no open connection or the commit fails.</returns>
        public bool Commit()
        {
            try
            {
                if (_transactionType == WMQTransactionType.NORMAL_TRANSACTION)
                {
                    if (myMqObj._qMg == null)
                    {
                        return false;
                    }
                    myMqObj._qMg.Commit();
                }
                return true;
            }
            catch (Exception err)
            {
                // Keep the "never throws" contract, but do not hide the cause.
                Console.WriteLine(err.Message);
                return false;
            }
        }

        /// <summary>
        /// Backs out the current MQ unit of work when running in
        /// <c>NORMAL_TRANSACTION</c> mode. In any other mode nothing is done
        /// here and true is returned.
        /// </summary>
        /// <returns>true on success; false when there is no open connection or the backout fails.</returns>
        public bool Rollback()
        {
            try
            {
                if (_transactionType == WMQTransactionType.NORMAL_TRANSACTION)
                {
                    if (myMqObj._qMg == null)
                    {
                        return false;
                    }
                    myMqObj._qMg.Backout();
                }
                return true;
            }
            catch (Exception err)
            {
                // Keep the "never throws" contract, but do not hide the cause.
                Console.WriteLine(err.Message);
                return false;
            }
        }
    }
}
Oracle使用 ODP.NET 驱动, 测试代码使用显式事务方式, 代码如下:
- C# code
/// <summary>
/// Demo handler: inside one explicit transaction, takes a message from the
/// source queue, inserts a row into Oracle, forwards the message body to the
/// second queue, and commits.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event arguments (unused).</param>
private void btnGetMessage_Click(object sender, EventArgs e)
{
    DbProviderFactory dbFactory = DbProviderFactories.GetFactory("Oracle.DataAccess.Client");

    // using blocks release the connection, command and transaction on every
    // exit path, including exceptions.
    using (DbConnection dbConn = dbFactory.CreateConnection())
    {
        dbConn.ConnectionString = sConnStr;
        dbConn.Open();

        using (CommittableTransaction transScope = new CommittableTransaction())
        {
            // Make the transaction ambient before touching MQ, and always put
            // the previous ambient transaction back afterwards so a failure
            // cannot leave a disposed transaction on this thread.
            Transaction previousAmbient = Transaction.Current;
            Transaction.Current = transScope;
            try
            {
                dbConn.EnlistTransaction(transScope);

                // Take a message from queue 1.
                WMQClient_WithXA.MyMessage ss = mqGet.GetMessage();
                if (ss != null)
                {
                    // NOTE(review): this modal dialog keeps the transaction
                    // open until the user dismisses it - fine for a demo, but
                    // it can run into the transaction timeout.
                    MessageBox.Show(ss.MsgBody);

                    using (DbCommand ocmd = dbFactory.CreateCommand())
                    {
                        ocmd.Connection = dbConn;
                        ocmd.CommandText = string.Format("insert into HY (NAME) VALUES ('{0}')", Guid.NewGuid().ToString());
                        ocmd.ExecuteNonQuery();
                    }

                    // Put the message onto queue 2.
                    mqPut.PutMessage(ss.MsgBody);

                    transScope.Commit();
                }
                // When no message was received, or an exception escapes,
                // Commit is never called, so the work is not committed.
                // NOTE(review): disposing the uncommitted transaction is
                // expected to abort it - confirm.
            }
            finally
            {
                Transaction.Current = previousAmbient;
            }
        }
    }
}
完整DEMO, 免积分下载地址: http://download.csdn.net/detail/hyblusea/4529167
[解决办法]
查水表了
[解决办法]
查水表了
[解决办法]
全部是高手?
[解决办法]
真的是高手啊
[解决办法]
高手啊
[解决办法]
好东西呀
[解决办法]
好东东
[解决办法]
不错,楼主辛苦了
[解决办法]
高手啊
[解决办法]
你的命名看的很爽
[解决办法]
高手高手啊
[解决办法]
高手高手
[解决办法]
学习共享
[解决办法]
好一个实现
[解决办法]
楼主高手
[解决办法]
高点积分下东西,谢谢