forked from lowleveldesign/diagnostics-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MessagePeeker.cs
108 lines (95 loc) · 4.81 KB
/
MessagePeeker.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
using System;
using System.Configuration;
using System.IO;
using System.Messaging;
using NDesk.Options;
namespace MessagePeeker
{
public class Program
{
private static readonly byte[] header = new byte[] { 0x1, 0x2 };
public static void Main(String[] args)
{
String outfile = null;
String queue = null;
bool showhelp = false;
int batchSize = 1000, numberOfFiles = 0;
var p = new OptionSet() {
{ "b|batch=", "How many messages should be read into one file.", (int v) => batchSize = v },
{ "o|output=", "Base name for the output file. It will have numbers appended", v => outfile = v },
{ "n|filecnt=", "Number of files after which the program should stop. If 0 (default) it will finish after queue is empty", (int v) => numberOfFiles = v },
{ "q|queue=", "Queue address.", v => queue = v },
{ "h|help", "show this message and exit", v => showhelp = v != null }
};
try {
p.Parse (args);
} catch (OptionException e) {
Console.WriteLine(e.Message);
Console.WriteLine("Try --help for more information.");
return;
}
if (showhelp || String.IsNullOrEmpty(outfile) || String.IsNullOrEmpty(queue) || batchSize <= 0) {
Console.WriteLine("Options:");
p.WriteOptionDescriptions(Console.Out);
return;
}
if (!MessageQueue.Exists(queue)) {
Console.WriteLine("Cannot connect to the provided queue. Please provide a queue name in format: MachineName\\Private$\\QueueName or any other accepted by MessageQueue class.");
return;
}
int fcnt = 0;
var msmq = new MessageQueue(queue);
using (var cursor = msmq.CreateCursor()) {
bool first = true;
using (var msgHeadersWriter = new StreamWriter(String.Format("{0}.headers", outfile))) {
while (numberOfFiles == 0 || fcnt < numberOfFiles) {
fcnt++;
var dumpFileName = String.Format("{0}.{1}", outfile, fcnt);
using (var fileStream = new FileStream(dumpFileName, FileMode.OpenOrCreate)) {
using (var memoryStream = new MemoryStream()) {
int cnt = 0;
while (cnt++ < batchSize) {
try {
var msg = msmq.Peek(TimeSpan.FromSeconds(10), cursor, first ? PeekAction.Current : PeekAction.Next);
// log message header (for now only id)
msgHeadersWriter.Write("FILE='{0}', OFFSET='{1}', MSGID='{2}' - ",
dumpFileName, fileStream.Position, msg.Id);
first = false;
WriteTo(msg.BodyStream, memoryStream);
WriteMessageHeader(fileStream, (int)memoryStream.Length);
memoryStream.Position = 0; // set position to 0
WriteTo(memoryStream, fileStream);
memoryStream.SetLength(0);
msgHeadersWriter.WriteLine("SUCCESS");
} catch (Exception ex) {
var mex = ex as MessageQueueException;
if (mex != null && mex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) {
return; // everything is ok
}
msgHeadersWriter.WriteLine("FAILED");
throw;
}
}
}
}
}
}
}
}
private static void WriteMessageHeader(FileStream stream, int length)
{
stream.Write(header, 0, header.Length);
byte[] b = BitConverter.GetBytes(length);
if (BitConverter.IsLittleEndian)
Array.Reverse(b);
stream.Write(b, 0, b.Length);
}
private static byte[] buffer = new byte[0x10000];
public static void WriteTo(Stream sourceStream, Stream targetStream)
{
int n;
while ((n = sourceStream.Read(buffer, 0, buffer.Length)) != 0)
targetStream.Write(buffer, 0, n);
}
}
}