讀取WCF傳送的MSMQ Message

因為工作中的專案有二台不同地點的主機,有資料同步的需求,而這個專案中我們是用WCF + MSMQ來處理資料交換,為了確保資料的完整性,有使用交易性的MSMQ,在WCF的預設機制下一個Message交易失敗十幾次後會把Message移到Poison的目錄下,可以知道有那一些Messqge處理失敗,但是用Windows的工具非常不易閱讀Message的內容,而且某些設定還會使得Message經過編碼,這時就很難看出Message的內容,本篇主要介紹如何用程式把Messge讀出並解碼。

因為工作中的專案有二台不同地點的主機,有資料同步的需求,而這個專案中我們是用WCF + MSMQ來處理資料交換,為了確保資料的完整性,有使用交易性的MSMQ,在WCF的預設機制下一個Message交易失敗十幾次後會把Message移到Poison的目錄下,可以知道有那一些Messqge處理失敗,但是用Windows的工具非常不易閱讀Message的內容,而且某些設定還會使得Message經過編碼,這時就很難看出Message的內容,本篇主要介紹如何用程式把Messge讀出並解碼。

影像 1

圖一 編碼過的Message非常不容易閱讀

 

WCF中Message的編碼格式

在WCF中屬於MSMQ的Binding有二個netMsmqBinding與msmqIntegrationBinding,這樣二個Binding的使用的編碼格式也不同。

netMsmqBinding的編碼格式沒法設定只有一種

  • application/soap+msbin1

 

msmqIntegrationBinding的可以設定serializationFormat屬性來改變編碼格式,格式如下:

  • Xml
  • Binary
  • ActivX
  • ByteArray
  • Steam

NOTE: netMsmqBinding與msmqIntegrationBinding的差異

netMsmqBinding是Client與Server都要是WCF,因為是使用application/soap+msbin1 格式傳過訊息,其他的程式沒辦法直接讀取,且MSMQ的版本要在4.0以上,在寫程式時與MSMQ是弱耦合,與其他Binding寫法沒什麼不同,可以直接的切換到其他Binding,但Message比較大。

msmqIntegrationBinding是Client與Server不一定要是WCF,如果編碼格式是Xml,任何的程式都可以直接讀取,不限MSMQ的版本,在寫程式時與MSMQ是強耦合,無法直接切換到其他Binding。

相關參考:佇列訊息的疑難排解

 

編碼結果

為了讓大家更了解各編碼格式的差異,列出在各編碼格式傳送 [TEST] 的Message內容。

 

編碼格式 application/soap+msbin1 編碼格式 Xml 編碼格式 Binary
內容 影像 7 內容 影像 5 內容 影像 9
大小 124 大小 44 大小 28

 

 

編碼格式 ActivX 編碼格式 ByteArray 編碼格式 Steam
內容 影像 8 內容 影像 10 內容 影像 10
大小 8 大小 4 大小 4

 

讀取WCF傳送的MSMQ Message

netMsmqBinding

因為的netMsmqBinding的application/soap+msbin1格式是,WCF特有的所以在讀取時要比較麻煩,要做一些處理

void Main()
{
    var queue = new System.Messaging.MessageQueue(@".\private$\{YourQueue}\poison$");
    var queueMessage = queue.PeekById(@"{id}");
    
    var stream= GetEnvolopeSteam(queueMessage.BodyStream);    
    
    //new application/soap+msbin1 的Encoder
    var binding = new BinaryMessageEncodingBindingElement();    
    var encoder = binding.CreateMessageEncoderFactory();
    var message =encoder.Encoder.ReadMessage(stream,int.MaxValue);
    
    //message.ToString()可以取得整個SOAP,但Body被省略了
    //Console.Write(message.ToString());
    //取SOAP中的Body
    Console.Write(message.GetReaderAtBodyContents().ReadOuterXml());    
}

//取得Message的屬於SOAP的部分
private Stream GetEnvolopeSteam(Stream originStream)
{        
    //因為Message.BodySteam前面是一些WCF定義,不是SOAP的部分所以要過濾掉
    int prevByte = originStream.ReadByte();    
    int curByte = 0;    
    for (int i = 0; i < originStream.Length; i++)    
    {    
        curByte = originStream.ReadByte();
        //Envolope是從0x56 0x02開始
        if (curByte == 0x02 && prevByte == 0x56)    
        {
            originStream.Position-=2;
            break;    
        }
        
        prevByte = curByte;    
    }    
    
    //產生新的Stream
    byte[] buffer=new byte[4096];
    int count=0;
    var envolopeStream = new MemoryStream();
    while ((count = originStream.Read(buffer,0,buffer.Length)) > 0)
    {            
        envolopeStream.Write(buffer,0,count);
    }            

    envolopeStream.Position=0;
    return envolopeStream;
}
//執行結果,是不是好閱讀許多
<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://www.w3.org/2005/08/addressing">
  <s:Header>
    <a:To s:mustUnderstand="1">net.msmq://./private/test</a:To>
  </s:Header>
  <s:Body>
    <Send xmlns="http://tempuri.org/"><value>TEST</value></Send>
  </s:Body>
</s:Envelope>

 

msmqIntegrationBinding

msmqIntegrationBinding的讀取就相對簡單許多

var queue = new System.Messaging.MessageQueue(@".\private$\{YourQueue}\poison$");
var queueMessage = queue.PeekById(@"{id}");

// XML
message.Formatter =new XmlMessageFormatter(new Type[]{typeof(string)});    
Console.Write(message.Body);

// Binary
message.Formatter =new BinaryMessageFormatter();
Console.Write(message.Body);

// ActiveX
message.Formatter =new ActiveXMessageFormatter();
Console.Write(message.Body);

// ByteArray 與 Stream
//Method 1
message.Formatter =new ActiveXMessageFormatter();    
byte[] bytes =(byte[])message.Body;
Console.Write(Encoding.UTF8.GetString(bytes));

//Method 2
Console.Write(new StreamReader(message.BodyStream).ReadToEnd());