For one of the interfaces I was working on, we receive a flat file that contains information for multiple identifiers. We needed to split this file so that each resulting file contains only one identifier. To do this, we create a new disassembler pipeline component. We will add two properties to the class. The first is StartIndex, an integer that defines the position of the identifier within each line, for example an identifier code. The second is Length, an integer that indicates how long the identifier is.
Also add a Queue to the class, which will hold the new messages.
/// <summary>
/// Pipeline component that splits an incoming message.
/// </summary>
public class FileSplitter : IBaseComponent, IDisassemblerComponent, IComponentUI, IPersistPropertyBag
{
    /// <summary>
    /// Namespace under which the promoted system properties are written.
    /// </summary>
    private string systemPropertiesNamespace =
        @"http://schemas.microsoft.com/BizTalk/2003/system-properties";

    /// <summary>
    /// Queue holding the disassembled messages.
    /// </summary>
    private System.Collections.Queue qOutputMsgs =
        new System.Collections.Queue();

    /// <summary>
    /// Backing field for <see cref="StartIndex"/>.
    /// </summary>
    private int _startIndex;

    /// <summary>
    /// Backing field for <see cref="Length"/>.
    /// </summary>
    private int _length;

    /// <summary>
    /// Gets or sets the start index of the identifier.
    /// </summary>
    public int StartIndex
    {
        get { return this._startIndex; }
        set { this._startIndex = value; }
    }

    /// <summary>
    /// Gets or sets the length of the identifier.
    /// </summary>
    public int Length
    {
        get { return this._length; }
        set { this._length = value; }
    }
}
Now we will create the Disassemble method, which is called in the disassemble stage of the pipeline. Here we read the message line by line and check whether the identifier in each line is the same as the identifier in the previous line. If it is the same, we add the line to the current message; if not, we start a new message.
/// <summary>
/// Splits the incoming message into one outgoing message per run of
/// consecutive lines that share the same identifier, and queues each
/// outgoing message through <c>CreateOutgoingMessage</c>.
/// </summary>
/// <remarks>
/// The identifier of a line is the substring of <c>_length</c> characters
/// starting at <c>_startIndex</c>. Only adjacent lines are compared, so an
/// identifier that reappears later in the file starts another message.
/// A line too short to contain the identifier makes <c>Substring</c> throw,
/// which surfaces as the <see cref="ApplicationException"/> described below.
/// </remarks>
/// <param name="pContext">Pipeline context.</param>
/// <param name="pInMsg">Input message.</param>
/// <exception cref="ApplicationException">
/// Thrown when the original data stream cannot be obtained, or when reading
/// the lines or queueing an outgoing message fails. The original exception
/// is attached as the inner exception.
/// </exception>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
    // The namespace in which the new messages should be
    string namespaceURI = "http://www.company.com/BizTalk/Application/v1";

    // The root element for the new messages
    string rootElement = "File_Incoming";

    // Collects the lines of the message that is currently being built
    StringBuilder messageString = new StringBuilder();

    // Get the original file name with every ".txt" removed. The property
    // may be absent from the context, in which case Read returns null;
    // fall back to an empty base name instead of dereferencing null.
    object receivedFileName = pInMsg.Context.Read("ReceivedFileName",
        "http://schemas.microsoft.com/BizTalk/2003/file-properties");
    string srcFileName = receivedFileName == null
        ? string.Empty
        : receivedFileName.ToString().Replace(".txt", "");

    // Counter to make the outgoing filename unique
    int count = 0;

    // Stream that will hold the original message's data
    Stream originalMessageStream;

    try
    {
        // Fetch original message's data
        originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception for diagnostics
        throw new ApplicationException(
            "Error in reading original message: " + ex.Message, ex);
    }

    try
    {
        // Create a StreamReader to read the original message's data
        StreamReader sr = new StreamReader(originalMessageStream);

        // Identifier of the previous line; null until the first line is read
        string curIdentifier = null;

        // ReadLine returns null only at the end of the data. Peek is not
        // used as loop condition because it can also return -1 when no
        // characters are currently available.
        string line;
        while ((line = sr.ReadLine()) != null)
        {
            // Get the identifier in this line
            string identifier = line.Substring(_startIndex, _length);

            // A different identifier than the previous line starts a new message
            if (!string.Equals(identifier, curIdentifier, StringComparison.Ordinal))
            {
                // Flush the message built so far, if any. Checking the buffer
                // (rather than the identifier text) prevents an empty message
                // on the first line and still works for an empty identifier.
                if (messageString.Length > 0)
                {
                    // Queue the message
                    CreateOutgoingMessage(pContext, messageString.ToString(),
                        namespaceURI, rootElement,
                        String.Format("{0}_{1}", srcFileName, count));

                    // Clear the buffer for the next message
                    messageString.Length = 0;

                    // Next message will be in a unique file
                    count++;
                }

                // From now on we want to compare to this identifier
                curIdentifier = identifier;
            }

            // Add the line to the current message
            messageString.Append(line).Append(Environment.NewLine);
        }

        // Queue whatever is left in the buffer as the last message
        if (messageString.Length > 0)
        {
            CreateOutgoingMessage(pContext, messageString.ToString(),
                namespaceURI, rootElement,
                String.Format("{0}_{1}", srcFileName, count));
        }

        // Close the StreamReader (success path only, as before).
        // NOTE(review): on failure the reader and stream are left open;
        // confirm whether the messaging engine still needs the original
        // stream in that case before changing this to a using block.
        sr.Close();
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception for diagnostics
        throw new ApplicationException(
            "Error in writing outgoing messages: " + ex.Message, ex);
    }
}
Now we will implement the GetNext method. This method is used in the pipeline to pass the messages to the next stage.
/// <summary>
/// Hands the next queued output message to the following pipeline stage.
/// </summary>
/// <param name="pContext">Pipeline context (not used by this method).</param>
/// <returns>
/// The message at the front of the queue, or <c>null</c> when the queue
/// is empty.
/// </returns>
public IBaseMessage GetNext(IPipelineContext pContext)
{
    // Nothing left to hand over
    if (qOutputMsgs.Count <= 0)
    {
        return null;
    }

    // Take the oldest queued message
    object nextMessage = qOutputMsgs.Dequeue();
    return (IBaseMessage)nextMessage;
}
Finally we have to create the method that puts the messages in the queue.
/// <summary>
/// Builds a new message from the given text and places it in the output
/// queue.
/// </summary>
/// <param name="pContext">Pipeline context, used to obtain the message factory.</param>
/// <param name="messageString">The string with the new (debatched) message.</param>
/// <param name="namespaceURI">The namespace we want to use for the message.</param>
/// <param name="rootElement">The root element for the message; any "ns0:" in it is removed.</param>
/// <param name="sourceFileName">The file name we want to use for the new message.</param>
/// <exception cref="ApplicationException">
/// Thrown when creating or queueing the message fails. The original
/// exception is attached as the inner exception.
/// </exception>
private void CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement, string sourceFileName)
{
    // The message to be put in the queue for further processing in
    // the pipeline
    IBaseMessage outMsg;

    try
    {
        // Create outgoing message
        outMsg = pContext.GetMessageFactory().CreateMessage();

        // Add the body part
        outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);

        // Promote the message type as "<namespace>#<root element>"
        outMsg.Context.Promote("MessageType", systemPropertiesNamespace,
            namespaceURI + "#" + rootElement.Replace("ns0:", ""));

        // Set the filename to be used, this can be used in BizTalk
        // by using the %SourceFileName% identifier
        outMsg.Context.Promote("ReceivedFileName",
            "http://schemas.microsoft.com/BizTalk/2003/file-properties",
            sourceFileName);

        // Encode as UTF-8 so characters outside the ASCII range are kept;
        // ASCII encoding would replace each of them with '?'. Text that is
        // pure ASCII yields exactly the same bytes as before.
        byte[] bufferOutgoingMessage =
            System.Text.Encoding.UTF8.GetBytes(messageString);

        // Set the data of the outgoing message
        outMsg.BodyPart.Data = new MemoryStream(bufferOutgoingMessage);

        // Place the message in the queue
        qOutputMsgs.Enqueue(outMsg);
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception for diagnostics
        throw new ApplicationException(
            "Error in queueing outgoing messages: " + ex.Message, ex);
    }
}