File Splitter

For one of the interfaces I was working on, we receive a flat file containing information for multiple identifiers. We needed to split this file so that each resulting file contains only one identifier. To do this, we create a new disassembler pipeline component. We will add two properties to the class. The first is StartIndex, an integer that defines the position of the unique identifier within each line, for example an identifier code. The second is Length, an integer that indicates how long the identifier is.
Also add a Queue to the class, which will hold the new messages.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// <summary>
/// Pipeline component that splits an incoming message into several
/// outgoing messages.
/// </summary>
public class FileSplitter : IBaseComponent,
    IDisassemblerComponent,
    IComponentUI,
    IPersistPropertyBag
{
    /// <summary>
    /// Backing field for <see cref="StartIndex"/>.
    /// </summary>
    private int _startIndex;

    /// <summary>
    /// Backing field for <see cref="Length"/>.
    /// </summary>
    private int _length;

    /// <summary>
    /// BizTalk system-properties namespace, used when promoting
    /// properties on the outgoing messages.
    /// </summary>
    private string systemPropertiesNamespace =
        @"http://schemas.microsoft.com/BizTalk/2003/system-properties";

    /// <summary>
    /// Holds the disassembled messages until they are requested.
    /// </summary>
    private System.Collections.Queue qOutputMsgs =
        new System.Collections.Queue();

    /// <summary>
    /// Gets or sets the position at which the identifier starts.
    /// </summary>
    public int StartIndex
    {
        get { return this._startIndex; }
        set { this._startIndex = value; }
    }

    /// <summary>
    /// Gets or sets the number of characters the identifier occupies.
    /// </summary>
    public int Length
    {
        get { return this._length; }
        set { this._length = value; }
    }
}

Now we will create the Disassemble method, which is called in the disassemble stage of the pipeline. Here we read the message line by line and check whether the identifier in each line is the same as the identifier in the previous line. If it is the same, we add the line to the current message; if not, we start a new message.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/// <summary>
/// Splits the incoming message into one outgoing message per run of
/// consecutive lines that share the same identifier, and queues each
/// of them through CreateOutgoingMessage.
/// </summary>
/// <param name="pContext">Pipeline context.</param>
/// <param name="pInMsg">Input message.</param>
/// <exception cref="ApplicationException">
/// Thrown when the original message cannot be read, or when splitting
/// or queueing the outgoing messages fails. The original exception is
/// available as the inner exception.
/// </exception>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
    // The namespace in which the new messages should be
    string namespaceURI =
        "http://www.company.com/BizTalk/Application/v1";

    // The root element for the new messages
    string rootElement = "File_Incoming";

    // Stringbuilder used to collect the lines of the current message
    StringBuilder messageString = new StringBuilder();

    // Stream that will hold the original message's data
    Stream originalMessageStream;

    // Get the original file name. Guard against a missing
    // ReceivedFileName property so that a null value does not surface
    // as a NullReferenceException; an empty base name is used instead.
    object receivedFileName = pInMsg.Context.Read("ReceivedFileName",
        "http://schemas.microsoft.com/BizTalk/2003/file-properties");
    string srcFileName = receivedFileName == null
        ? string.Empty
        : receivedFileName.ToString().Replace(".txt", "");

    // Counter to make the outgoing filename unique
    int count = 0;

    try
    {
        // Fetch original message's data
        originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception so the stack
        // trace is not lost
        throw new ApplicationException(
            "Error in reading original message: " + ex.Message, ex);
    }

    try
    {
        // Create a StreamReader to read the original message's data
        StreamReader sr = new StreamReader(originalMessageStream);

        // The identifier of the previous line
        string curIdentifier = string.Empty;

        // Go through all the lines in the original message. ReadLine
        // returns null at the end of the stream, which is more
        // reliable than Peek for detecting the end of the data.
        string line;
        while ((line = sr.ReadLine()) != null)
        {
            // Get the identifier in this line. Substring throws when
            // the line is shorter than _startIndex + _length; that is
            // reported through the catch block below.
            string identifier = line.Substring(_startIndex, _length);

            // Check if this is the same identifier as in the previous
            // line
            if (!identifier.Equals(curIdentifier))
            {
                // Flush the lines collected so far, if any. Checking
                // the buffer instead of the identifier avoids an empty
                // first message and still works when the identifier
                // itself is empty (for example when Length is 0).
                if (messageString.Length > 0)
                {
                    // Queue the message
                    CreateOutgoingMessage(pContext,
                        messageString.ToString(), namespaceURI,
                        rootElement,
                        String.Format("{0}_{1}", srcFileName, count));

                    // Clear the buffer for the next message
                    messageString.Length = 0;

                    // Next message will be in a unique file
                    count++;
                }

                // From now on we want to compare to this identifier
                curIdentifier = identifier;
            }

            // Add the line to the current message
            messageString.Append(line).Append(Environment.NewLine);
        }

        // Queue whatever is left in the buffer as the last message
        if (messageString.Length > 0)
        {
            CreateOutgoingMessage(pContext, messageString.ToString(),
                namespaceURI, rootElement,
                String.Format("{0}_{1}", srcFileName, count));
        }

        // Close the StreamReader
        sr.Close();
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception so the stack
        // trace is not lost
        throw new ApplicationException("Error in writing outgoing "
            + "messages: " + ex.Message, ex);
    }
}

Now we will implement the GetNext method. This method is used in the pipeline to pass the messages to the next stage.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// <summary>
/// Hands the next queued output message to the pipeline.
/// </summary>
/// <param name="pContext">Pipeline context (not used by this method).</param>
/// <returns>
/// The message that has been in the queue the longest, or null when
/// the queue is empty.
/// </returns>
public IBaseMessage GetNext(IPipelineContext pContext)
{
    // Nothing left to hand over
    if (qOutputMsgs.Count == 0)
    {
        return null;
    }

    // Take the message from the front of the queue
    return (IBaseMessage)qOutputMsgs.Dequeue();
}

Finally we have to create the method that puts the messages in the queue.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/// <summary>
/// Builds a new message from the given text, promotes its message type
/// and file name, and places it in the output queue.
/// </summary>
/// <param name="pContext">Pipeline context.</param>
/// <param name="messageString">The string with the new (debatched) message.</param>
/// <param name="namespaceURI">The namespace we want to use for the message.</param>
/// <param name="rootElement">The root element for the message.</param>
/// <param name="sourceFileName">The file name we want to use for the new message.</param>
/// <exception cref="ApplicationException">
/// Thrown when creating or queueing the message fails. The original
/// exception is available as the inner exception.
/// </exception>
private void CreateOutgoingMessage(IPipelineContext pContext, 
    String messageString, string namespaceURI, string rootElement, 
    string sourceFileName)
{
    // The message to be put in the queue for further processing in 
    // the pipeline
    IBaseMessage outMsg;

    try
    {
        // Fetch the factory once and use it for both the message and
        // its body part
        IBaseMessageFactory messageFactory = pContext.GetMessageFactory();

        // Create outgoing message
        outMsg = messageFactory.CreateMessage();

        // Add the body part
        outMsg.AddPart("Body", messageFactory.CreateMessagePart(), true);

        // Promote the message type as "<namespace>#<root element>",
        // with any "ns0:" prefix removed from the root element
        outMsg.Context.Promote("MessageType", 
            systemPropertiesNamespace, namespaceURI + "#" 
            + rootElement.Replace("ns0:", ""));

        // Set the filename to be used, this can be used in BizTalk
        // by using the %SourceFileName% identifier
        outMsg.Context.Promote("ReceivedFileName", 
            "http://schemas.microsoft.com/BizTalk/2003/file-properties", 
            sourceFileName);

        // Get the outgoing message as bytes.
        // NOTE(review): ASCII encoding replaces every non-ASCII
        // character with '?'. Confirm the input is pure ASCII, or
        // switch to an encoding that matches the source file.
        byte[] bufferOutgoingMessage = 
            System.Text.Encoding.ASCII.GetBytes(messageString);

        // Set the data of the outgoing message
        outMsg.BodyPart.Data = new MemoryStream(bufferOutgoingMessage);

        // Place the message in the queue
        qOutputMsgs.Enqueue(outMsg);
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception so the stack
        // trace is not lost
        throw new ApplicationException("Error in queueing outgoing " 
            + "messages: " + ex.Message, ex);
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *