Viewing and Sorting XEvents Efficiently (Code Samples) – XEProfiler


I was doing backups and clean-ups and ran across a couple of sample projects for XEvent and event_sequence processing I thought others might find helpful. – Enjoy!

The sample code is provided "as is" and any express or implied warranties, including the implied warranties of merchantability and fitness for a particular purpose, are disclaimed.


In my previous blog posts, I highlighted the importance of using Event Sequence as the sort key.

How It Works: XEvent Output and Visualization
SQL Server Management Studio Provides–“XE Profiler”
Use the SSMS XEvent Profiler

Some of the feedback has been that sorting can be a cumbersome operation because:

  • Using sys.fn_xe_file_target_read_file, the XML has to be queried and indexed, which can take time and space, and only then can the ORDER BY event_sequence query be run
  • Using SQL Server Management Studio (SSMS) and storing the events in a table requires reading the events, streaming to table storage followed by an order by event sequence query
  • Large traces can encounter the 32-bit SSMS memory limitations

Several years ago, I created XEProfiler, based loosely on the SSMS implementation. It is built as a 64-bit application to avoid the 32-bit memory limitations, and it uses parallel activities to speed up the processing.

image

The XEProfiler Sample

·         Creates an empty DataGrid control

·         Makes a pass over the files to obtain the Event Locators

·         Sorts the EventLocators

·         Leverages the display location to paint the information on the screen

Note: By tracking and using EventLocators, the overall memory is reduced, and the 64-bit address space allows for a larger set of trace data.

clip_image002

TSQL Sample Session

-- Sample session: captures batch starting/completed events, each carrying the
-- event_sequence action, and writes them to rollover files named c:\temp\EventSequence*.
CREATE EVENT SESSION [EventSequence] ON SERVER
ADD EVENT sqlserver.sql_batch_completed
(
    ACTION
    (
        package0.event_sequence,
        sqlserver.client_app_name,
        sqlserver.session_id
    )
),
ADD EVENT sqlserver.sql_batch_starting
(
    ACTION
    (
        package0.event_sequence,
        sqlserver.client_app_name,
        sqlserver.session_id
    )
)
ADD TARGET package0.event_file
(
    SET filename = N'c:\temp\EventSequence',
        max_file_size = (1024),
        max_rollover_files = (100)
)
WITH
(
    MAX_MEMORY = 1024 MB,
    EVENT_RETENTION_MODE = ALLOW_SINGLE_EVENT_LOSS,
    MAX_DISPATCH_LATENCY = 30 SECONDS,
    MAX_EVENT_SIZE = 0 KB,
    MEMORY_PARTITION_MODE = PER_CPU,
    TRACK_CAUSALITY = ON,
    STARTUP_STATE = OFF
)

Form1.cs

using System;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Microsoft.SqlServer.XEvent.Linq; 

namespace XEProfiler
{
    public partial class xeProfilerForm : Form
    {
        // Event locator index needs
        //
        private class LocationInfo
        {
            public QueryableXEventData Stream
            {
                get;
                private set;
            } 

            public EventLocator Location
            {
                get;
                private set;
            } 

            public ulong Sequence
            {
                get;
                private set;
            } 

            // File can consume memory - for sample purposes only
            //
            public string File
            {
                get;
                private set;
            } 

            public LocationInfo(string file, ulong sequence, EventLocator location, QueryableXEventData stream)
            {
                File = file;
                Sequence = sequence;
                Location = location;
                Stream = stream;
            }
        } 

        private SolidBrush textBrush;
        private Font textFont;
        private StringFormat stringFormat = new StringFormat();
        private LocationInfo[] sortedLocators = null; 

        // Constructor to make display loop a bit like SQL Profiler
        //
        public xeProfilerForm()
        {
            InitializeComponent();
            stringFormat.Alignment = StringAlignment.Near;
            stringFormat.LineAlignment = StringAlignment.Near;
            stringFormat.FormatFlags = StringFormatFlags.NoWrap; 

            textBrush = new SolidBrush(Color.Black);
            textFont = new System.Drawing.Font("Courier New", 8.25F, System.Drawing.FontStyle.Regular, System.Drawing.GraphicsUnit.Point, ((byte)(0))); 

            dataGrid.RowHeadersWidth = 10;
            dataGrid.AllowUserToAddRows = false;
            dataGrid.CellPainting += DataGrid_CellPainting;
        } 

        // TODO: Make this meta data driven with targets for fields vs actions to remove need for switch statement
        //
        string[] ColIndexNames = { "timestamp", "event_sequence", "name", "batch_text", "event_file" }; 

        // Draw the proper text for the current display region
        //
        private void DataGrid_CellPainting(object sender, DataGridViewCellPaintingEventArgs e)
        {
            if (e.ColumnIndex > -1 && e.RowIndex > -1)
            {
                e.PaintBackground(e.ClipBounds, false);
                LocationInfo info = sortedLocators[e.RowIndex];
                PublishedEvent pubEvent = info.Stream.EventProvider.RetrieveEvent(info.Location);
                PublishedAction action;
                PublishedEventField field;

                string strText = string.Empty; 

                switch (e.ColumnIndex)
                {
                    case 0:
                        strText = pubEvent.Timestamp.UtcDateTime.ToString();
                        break; 

                    case 1:
                        if (pubEvent.Actions.TryGetValue(ColIndexNames[e.ColumnIndex], out action))
                        {
                            strText = action.Value.ToString();
                        }
                        break; 

                    case 2:
                        strText = pubEvent.Name;
                        break; 

                    case 3:
                        if (pubEvent.Fields.TryGetValue(ColIndexNames[e.ColumnIndex], out field))
                        {
                            strText = field.Value.ToString();
                        }
                        break; 

                    case 4:
                        strText = info.File;
                        break;
                }              

                e.Graphics.DrawString(strText, textFont, textBrush, e.CellBounds, stringFormat);
                e.Handled = true;
            }
        } 

        // Load the event locators and activate the display
        //
        private void Form1_Load(object sender, EventArgs e)
        {

            dataGrid.Enabled = false;
            string[] files = Directory.GetFiles(@"c:\temp", @"EventSequence*", SearchOption.TopDirectoryOnly); 

            object locationLock = new object();
            int rows = 0;
            List<LocationInfo> eventLocators = new List<LocationInfo>(); 

            // Load the EventLocators in parallel
            //
            Parallel.ForEach (files, file  =>
            {
                LoadEvents(ref rows, ref locationLock, ref eventLocators, file);
            }); 

            // Get array sorted by sequence values, allowing parallel Linq activities
            //
            sortedLocators = (from locator in eventLocators select locator)
                                 .OrderBy(x => x.Sequence)
                                 .ToArray(); 

            SetRowCount(rows, true);
        } 

        // Load the events into the list to be sorted and used as index for display
        //
        private void LoadEvents(ref int rows, ref object locationLock, ref List<LocationInfo> eventLocators, string file)
        {
            string fileName = Path.GetFileNameWithoutExtension(file);

            // Open and track the stream so we can use later to access event data
            //
            QueryableXEventData stream = new QueryableXEventData(file); 

            foreach (PublishedEvent pubEvent in stream)
            {
                // Sequence may have holes due to options such as Event Loss
                // All events must be read before final sort can be closed.
                //
                ulong sequence = (ulong)pubEvent.Actions["event_sequence"].Value;
                EventLocator locator = pubEvent.Location;
                LocationInfo info = new LocationInfo(fileName, sequence, locator, stream); 

                lock (locationLock)
                {
                    eventLocators.Add(info);
                } 

                // Simple progress
                //
                int rowCount = Interlocked.Increment(ref rows);
                if (rowCount % 128 == 0)
                {
                    SetRowCount(rowCount, false);
                }
            }
        } 

        // Once row count is complete display headers (TODO: based on metadata)
        // and update the rowcount in status bar
        //
        private void SetRowCount(int rows, bool finished)
        {
            toolStripRowCount.Text = String.Format("{0}", rows);
            toolStripRowCount.Invalidate(); 

            if (finished)
            {
                textBoxDisplay.Text = String.Empty;
                dataGrid.Columns.Clear(); 

                DataGridViewColumn newColumn = new DataGridViewTextBoxColumn();
                newColumn.Name = "timestamp";
                newColumn.HeaderText = "timestamp";
                newColumn.ToolTipText = "Timestamp of the captured event";
                dataGrid.Columns.Add(newColumn); 

                newColumn = new DataGridViewTextBoxColumn();
                newColumn.Name = "sequence";
                newColumn.HeaderText = "sequence";
                newColumn.ToolTipText = "Event sequence of the captured event";
                dataGrid.Columns.Add(newColumn); 

                newColumn = new DataGridViewTextBoxColumn();
                newColumn.Name = "name";
                newColumn.HeaderText = "name";
                newColumn.ToolTipText = "Name of the captured event";
                dataGrid.Columns.Add(newColumn); 

                newColumn = new DataGridViewTextBoxColumn();
                newColumn.Name = "batch_text";
                newColumn.HeaderText = "batch_text";
                newColumn.ToolTipText = "Batch text";
                newColumn.Width = 200;
                dataGrid.Columns.Add(newColumn); 

                newColumn = new DataGridViewTextBoxColumn();
                newColumn.Name = "file";
                newColumn.HeaderText = "file";
                newColumn.ToolTipText = "Event source file";
                newColumn.Width = 200;
                dataGrid.Columns.Add(newColumn); 

                //  Change the dataGrid to an OwnerDraw (OnPaint) so we only have to
                //  draw the data and not really load it into the data grid rows
                //
                dataGrid.RowCount = rows;
                if (rows > 0)
                {
                    dataGrid.Rows[1].Selected = true;
                } 

                dataGrid.Enabled = true;
                dataGrid.Invalidate();
                dataGrid.Refresh();
            }
        } 

        // Batch text display
        //
        private void dataGrid_SelectionChanged(object sender, EventArgs e)
        {
            textBoxDisplay.Text = String.Empty; 

            if (0 != dataGrid.SelectedRows.Count)
            {
                List<DataGridViewRow> orderedSelection = (from DataGridViewRow row in dataGrid.SelectedRows select row)
                                                          .OrderBy(x=>x.Index)
                                                          .ToList(); 

                foreach (DataGridViewRow selectedRow in orderedSelection)
                {
                    if (textBoxDisplay.Text != String.Empty)
                    {
                        textBoxDisplay.Text += "\r\ngo\r\n";
                    } 

                    LocationInfo info = sortedLocators[selectedRow.Index];
                    PublishedEvent pubEvent = info.Stream.EventProvider.RetrieveEvent(info.Location);
                    PublishedEventField field;
                    string strText = string.Empty; 

                    if (pubEvent.Fields.TryGetValue("batch_text", out field))
                    {
                        strText = field.Value.ToString();
                    } 

                    textBoxDisplay.Text += strText;
                }
            }
        }
    }
}
 

Form1.Designer.cs

namespace XEProfiler
{
    partial class xeProfilerForm
    {
        /// <summary>
        /// Required designer variable.
        /// </summary>
        private System.ComponentModel.IContainer components = null;

        /// <summary>
        /// Clean up any resources being used.
        /// </summary>
        /// <param name="disposing">true if managed resources should be disposed; otherwise, false.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing && (components != null))
            {
                components.Dispose();
            }
            base.Dispose(disposing);
        }

        #region Windows Form Designer generated code

        /// <summary>
        /// Required method for Designer support - do not modify
        /// the contents of this method with the code editor.
        /// </summary>
        /// <remarks>
        /// Creates the grid, status bar, menu strip, text box and split container,
        /// sets their properties, and wires the grid's SelectionChanged handler and
        /// the form's Load handler.
        /// </remarks>
        private void InitializeComponent()
        {
            System.Windows.Forms.DataGridViewCellStyle dataGridViewCellStyle1 = new System.Windows.Forms.DataGridViewCellStyle();
            System.Windows.Forms.DataGridViewCellStyle dataGridViewCellStyle2 = new System.Windows.Forms.DataGridViewCellStyle();
            System.ComponentModel.ComponentResourceManager resources = new System.ComponentModel.ComponentResourceManager(typeof(xeProfilerForm));
            this.dataGrid = new System.Windows.Forms.DataGridView();
            this.statusBar = new System.Windows.Forms.StatusStrip();
            this.labelRowCount = new System.Windows.Forms.ToolStripStatusLabel();
            this.toolStripRowCount = new System.Windows.Forms.ToolStripStatusLabel();
            this.menuStrip1 = new System.Windows.Forms.MenuStrip();
            this.fileToolStripMenuItem = new System.Windows.Forms.ToolStripMenuItem();
            this.editToolStripMenuItem = new System.Windows.Forms.ToolStripMenuItem();
            this.viewToolStripMenuItem = new System.Windows.Forms.ToolStripMenuItem();
            this.toolsToolStripMenuItem = new System.Windows.Forms.ToolStripMenuItem();
            this.windowToolStripMenuItem = new System.Windows.Forms.ToolStripMenuItem();
            this.helpToolStripMenuItem = new System.Windows.Forms.ToolStripMenuItem();
            this.textBoxDisplay = new System.Windows.Forms.TextBox();
            this.splitContainer1 = new System.Windows.Forms.SplitContainer();
            // Begin initialization / suspend layout while properties are set;
            // each call is matched by an EndInit / ResumeLayout at the bottom of this method.
            ((System.ComponentModel.ISupportInitialize)(this.dataGrid)).BeginInit();
            this.statusBar.SuspendLayout();
            this.menuStrip1.SuspendLayout();
            ((System.ComponentModel.ISupportInitialize)(this.splitContainer1)).BeginInit();
            this.splitContainer1.Panel1.SuspendLayout();
            this.splitContainer1.Panel2.SuspendLayout();
            this.splitContainer1.SuspendLayout();
            this.SuspendLayout();
            //
            // dataGrid
            //
            this.dataGrid.AllowUserToAddRows = false;
            this.dataGrid.AllowUserToDeleteRows = false;
            this.dataGrid.AllowUserToOrderColumns = true;
            this.dataGrid.ColumnHeadersHeightSizeMode = System.Windows.Forms.DataGridViewColumnHeadersHeightSizeMode.AutoSize;
            dataGridViewCellStyle1.Alignment = System.Windows.Forms.DataGridViewContentAlignment.MiddleLeft;
            dataGridViewCellStyle1.BackColor = System.Drawing.SystemColors.Window;
            dataGridViewCellStyle1.Font = new System.Drawing.Font("Courier New", 8.25F, System.Drawing.FontStyle.Regular, System.Drawing.GraphicsUnit.Point, ((byte)(0)));
            dataGridViewCellStyle1.ForeColor = System.Drawing.SystemColors.ControlText;
            dataGridViewCellStyle1.SelectionBackColor = System.Drawing.SystemColors.Highlight;
            dataGridViewCellStyle1.SelectionForeColor = System.Drawing.SystemColors.HighlightText;
            dataGridViewCellStyle1.WrapMode = System.Windows.Forms.DataGridViewTriState.False;
            this.dataGrid.DefaultCellStyle = dataGridViewCellStyle1;
            this.dataGrid.Dock = System.Windows.Forms.DockStyle.Fill;
            this.dataGrid.Location = new System.Drawing.Point(0, 0);
            this.dataGrid.Name = "dataGrid";
            this.dataGrid.ReadOnly = true;
            dataGridViewCellStyle2.Alignment = System.Windows.Forms.DataGridViewContentAlignment.MiddleLeft;
            dataGridViewCellStyle2.BackColor = System.Drawing.SystemColors.Control;
            dataGridViewCellStyle2.Font = new System.Drawing.Font("Courier New", 8.25F, System.Drawing.FontStyle.Regular, System.Drawing.GraphicsUnit.Point, ((byte)(0)));
            dataGridViewCellStyle2.ForeColor = System.Drawing.SystemColors.WindowText;
            dataGridViewCellStyle2.SelectionBackColor = System.Drawing.SystemColors.Highlight;
            dataGridViewCellStyle2.SelectionForeColor = System.Drawing.SystemColors.HighlightText;
            dataGridViewCellStyle2.WrapMode = System.Windows.Forms.DataGridViewTriState.True;
            this.dataGrid.RowHeadersDefaultCellStyle = dataGridViewCellStyle2;
            this.dataGrid.RowTemplate.Height = 18;
            this.dataGrid.Size = new System.Drawing.Size(1089, 277);
            this.dataGrid.TabIndex = 0;
            this.dataGrid.SelectionChanged += new System.EventHandler(this.dataGrid_SelectionChanged);
            //
            // statusBar
            //
            this.statusBar.Items.AddRange(new System.Windows.Forms.ToolStripItem[] {
            this.labelRowCount,
            this.toolStripRowCount});
            this.statusBar.Location = new System.Drawing.Point(0, 479);
            this.statusBar.Name = "statusBar";
            this.statusBar.Size = new System.Drawing.Size(1089, 22);
            this.statusBar.TabIndex = 1;
            this.statusBar.Text = "statusBar";
            //
            // labelRowCount
            //
            this.labelRowCount.Name = "labelRowCount";
            this.labelRowCount.Size = new System.Drawing.Size(41, 17);
            this.labelRowCount.Text = "Rows: ";
            //
            // toolStripRowCount
            //
            this.toolStripRowCount.Name = "toolStripRowCount";
            this.toolStripRowCount.Size = new System.Drawing.Size(108, 17);
            this.toolStripRowCount.Text = "toolStripRowCount";
            //
            // menuStrip1
            //
            this.menuStrip1.Items.AddRange(new System.Windows.Forms.ToolStripItem[] {
            this.fileToolStripMenuItem,
            this.editToolStripMenuItem,
            this.viewToolStripMenuItem,
            this.toolsToolStripMenuItem,
            this.windowToolStripMenuItem,
            this.helpToolStripMenuItem});
            this.menuStrip1.Location = new System.Drawing.Point(0, 0);
            this.menuStrip1.Name = "menuStrip1";
            this.menuStrip1.Size = new System.Drawing.Size(1089, 24);
            this.menuStrip1.TabIndex = 2;
            this.menuStrip1.Text = "menuStrip1";
            //
            // fileToolStripMenuItem
            //
            this.fileToolStripMenuItem.Name = "fileToolStripMenuItem";
            this.fileToolStripMenuItem.Size = new System.Drawing.Size(37, 20);
            this.fileToolStripMenuItem.Text = "File";
            //
            // editToolStripMenuItem
            //
            this.editToolStripMenuItem.Name = "editToolStripMenuItem";
            this.editToolStripMenuItem.Size = new System.Drawing.Size(39, 20);
            this.editToolStripMenuItem.Text = "Edit";
            //
            // viewToolStripMenuItem
            //
            this.viewToolStripMenuItem.Name = "viewToolStripMenuItem";
            this.viewToolStripMenuItem.Size = new System.Drawing.Size(44, 20);
            this.viewToolStripMenuItem.Text = "View";
            //
            // toolsToolStripMenuItem
            //
            this.toolsToolStripMenuItem.Name = "toolsToolStripMenuItem";
            this.toolsToolStripMenuItem.Size = new System.Drawing.Size(47, 20);
            this.toolsToolStripMenuItem.Text = "Tools";
            //
            // windowToolStripMenuItem
            //
            this.windowToolStripMenuItem.Name = "windowToolStripMenuItem";
            this.windowToolStripMenuItem.Size = new System.Drawing.Size(63, 20);
            this.windowToolStripMenuItem.Text = "Window";
            //
            // helpToolStripMenuItem
            //
            this.helpToolStripMenuItem.Name = "helpToolStripMenuItem";
            this.helpToolStripMenuItem.Size = new System.Drawing.Size(44, 20);
            this.helpToolStripMenuItem.Text = "Help";
            //
            // textBoxDisplay
            //
            this.textBoxDisplay.Dock = System.Windows.Forms.DockStyle.Fill;
            this.textBoxDisplay.HideSelection = false;
            this.textBoxDisplay.Location = new System.Drawing.Point(0, 0);
            this.textBoxDisplay.Multiline = true;
            this.textBoxDisplay.Name = "textBoxDisplay";
            this.textBoxDisplay.ReadOnly = true;
            this.textBoxDisplay.ScrollBars = System.Windows.Forms.ScrollBars.Both;
            this.textBoxDisplay.Size = new System.Drawing.Size(1089, 174);
            this.textBoxDisplay.TabIndex = 3;
            //
            // splitContainer1
            //
            this.splitContainer1.Dock = System.Windows.Forms.DockStyle.Fill;
            this.splitContainer1.Location = new System.Drawing.Point(0, 24);
            this.splitContainer1.Name = "splitContainer1";
            this.splitContainer1.Orientation = System.Windows.Forms.Orientation.Horizontal;
            //
            // splitContainer1.Panel1
            //
            // Panel1 hosts the event grid
            this.splitContainer1.Panel1.Controls.Add(this.dataGrid);
            this.splitContainer1.Panel1MinSize = 150;
            //
            // splitContainer1.Panel2
            //
            // Panel2 hosts the read-only text box
            this.splitContainer1.Panel2.Controls.Add(this.textBoxDisplay);
            this.splitContainer1.Panel2MinSize = 150;
            this.splitContainer1.Size = new System.Drawing.Size(1089, 455);
            this.splitContainer1.SplitterDistance = 277;
            this.splitContainer1.TabIndex = 4;
            //
            // xeProfilerForm
            //
            this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F);
            this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font;
            this.ClientSize = new System.Drawing.Size(1089, 501);
            this.Controls.Add(this.splitContainer1);
            this.Controls.Add(this.statusBar);
            this.Controls.Add(this.menuStrip1);
            this.Icon = ((System.Drawing.Icon)(resources.GetObject("$this.Icon")));
            this.MainMenuStrip = this.menuStrip1;
            this.Name = "xeProfilerForm";
            this.Text = "XEProfiler";
            this.WindowState = System.Windows.Forms.FormWindowState.Maximized;
            this.Load += new System.EventHandler(this.Form1_Load);
            ((System.ComponentModel.ISupportInitialize)(this.dataGrid)).EndInit();
            this.statusBar.ResumeLayout(false);
            this.statusBar.PerformLayout();
            this.menuStrip1.ResumeLayout(false);
            this.menuStrip1.PerformLayout();
            this.splitContainer1.Panel1.ResumeLayout(false);
            this.splitContainer1.Panel2.ResumeLayout(false);
            this.splitContainer1.Panel2.PerformLayout();
            ((System.ComponentModel.ISupportInitialize)(this.splitContainer1)).EndInit();
            this.splitContainer1.ResumeLayout(false);
            this.ResumeLayout(false);
            this.PerformLayout();

        }

        #endregion

        // Controls created and configured in InitializeComponent.
        // dataGrid and textBoxDisplay are hosted in the two panels of splitContainer1;
        // labelRowCount and toolStripRowCount are the status bar items.
        private System.Windows.Forms.DataGridView dataGrid;
        private System.Windows.Forms.StatusStrip statusBar;
        private System.Windows.Forms.ToolStripStatusLabel labelRowCount;
        private System.Windows.Forms.ToolStripStatusLabel toolStripRowCount;
        private System.Windows.Forms.MenuStrip menuStrip1;
        private System.Windows.Forms.ToolStripMenuItem fileToolStripMenuItem;
        private System.Windows.Forms.ToolStripMenuItem editToolStripMenuItem;
        private System.Windows.Forms.ToolStripMenuItem viewToolStripMenuItem;
        private System.Windows.Forms.ToolStripMenuItem toolsToolStripMenuItem;
        private System.Windows.Forms.ToolStripMenuItem windowToolStripMenuItem;
        private System.Windows.Forms.ToolStripMenuItem helpToolStripMenuItem;
        private System.Windows.Forms.TextBox textBoxDisplay;
        private System.Windows.Forms.SplitContainer splitContainer1;
    }
}

MergeXEvents

MergeXEvents consumes XEvent files, using optimized parallel sort algorithms and outputting events into the physical file in event_sequence order.  This was a project I created in 2014 to help Microsoft SQL Server support process incoming XEvent files, including the ability to merge XEvent files from multiple servers such as replication or high availability XEvent capture.  The project uses parallel threads to perform scatter/gather activities as well as tournament sorting to optimize the sorting and merging activities.

Program.cs

/*
*      Designed to merge XEvents from multiple files into a single stream sorted by Event Sequence, Time and Server name
*     
*      This allows capture of large XEvent traces, using per CPU partitioning, and then post sorting in sequence order so the
*      stream is in physical sequence order.
*
*      Arrays are owned by ## of partitions and dynamically expanded as events are read and added to them
*      Once all data is read, each array is sorted in ascending or descending order (by a group of parallel workers)
*      Once sorted the tournament sort is used by a single reader to gather the array streams and write to the output file
*     
*      Output File[n..] <----- Parallel Task [Ordered Output Array Segment 1] |                                                           | <----- Array[1]       
*                                                                             | <---- FWriteEntry <-- Gather Streams  <-- Parallel sorts  | <----- Array[2] <----- Distribute streams | Reader
*      Output File[n..] <----- Parallel Task [Ordered Output Array Segment N] |                                                           | <----- Array[3]
*
*      NOTE:  Testing shows parallel Output files do not have performance gains.  The original EventProvider, which is not thread safe,
*              prevents you from opening and using a second copy of the input stream with a second EventProvider, and adding lock() activities
*              between the RetrieveEvent(EventLocator) and the Serialize causes more context switching than just doing the work on a single thread.
* */

#define SEPERATE_OUTPUT

using System;
using System.Reflection;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.SqlServer.XEvent.Linq;
using System.IO;
using VersionInfo;
using System.Security.AccessControl;
using System.Runtime.InteropServices;
using ScatterGatherParallelNS;
using System.Threading;

namespace SortManager
{
    abstract class ISortLogOutput
    {
        abstract public void LogOutputMessage(string strMsg);
    }

#region DynamicList
    public sealed class TDynamicList<_T> : IDisposable
    {
        internal TDynamicList()
        {
            m_Items = new List<_T>();
        }

        public void Dispose()
        {
            if (null != m_Items)
            {
                m_Items.Clear();
            }

            m_Items = null;
        }

        internal void Add(_T newEntry)
        {
            lock (m_Items)
            {
                m_Items.Add(newEntry);
            }
        }

        internal bool FMoveNext()
        {
            return m_Walker.MoveNext();
        }

        internal _T GetCurrentEntry
        {
            get
            {
                System.Diagnostics.Debug.Assert(null != m_Walker.Current);
                return m_Walker.Current;
            }
        }

        internal bool FHasData
        {
            get
            {
                return null != m_Walker.Current;
            }
        }

        internal Int32 StoredItems
        {
            get { return m_Items.Count; }
        }

        internal void Sort(IComparer<_T> comparer)
        {
            if (null != m_Items && m_Items.Count > 0)
            {
                m_Items.Sort(comparer);
            }

            m_Walker = m_Items.GetEnumerator();
        }

        //      Members
        List<_T>                  m_Items;
        List<_T>.Enumerator       m_Walker;
    }
#endregion

#region DynamicArray
    public sealed class TDynamicArray<_T> : IDisposable
    {
        internal TDynamicArray()
        {
            m_Items = ArrayList.Synchronized(new ArrayList());
            m_iCurrentReadPos = -1;
        }

        public void Dispose()
        {
            if (null != m_Items)
            {
                m_Items.Clear();
            }

            m_iCurrentReadPos = -1;
            m_Items = null;
        }

        internal void Add(_T newEntry)
        {
            m_Items.Add(newEntry);
        }

        internal bool FMoveNext()
        {
            m_iCurrentReadPos++;
            return m_iCurrentReadPos < m_Items.Count;
        }

        internal _T GetCurrentEntry
        {
            get
            {
                return (_T) m_Items[m_iCurrentReadPos];
            }
        }

        public _T Item(int iItem)
        {
            return (_T) m_Items[iItem];
        }
 
        internal Int32 StoredItems
        {
            get { return m_Items.Count;  }
        }

        internal int Count
        {
            get { return m_Items.Count;  }
        }
       
        //      Members
        ArrayList           m_Items;
        System.Int32        m_iCurrentReadPos;           //     Current reader position 
    }
#endregion

#region SortMatchTree
    internal class TSortMatchingTree<_T> : IDisposable
    {
        internal TSortMatchingTree(TDynamicList<_T>[] partitions, bool bAscending = true)
        {
            m_bAscending = bAscending;
            m_Partitions = partitions;

            m_bWinners = new Int16[MAX_LEVELS+1, MAX_PARTITIONS];
                     
            //      Lowest level is always self
            for(Int16 iPartition = 0; iPartition < MAX_PARTITIONS; iPartition++)
            {
                m_bWinners[MAX_LEVELS, iPartition] = iPartition;
            }
        }

        public void Dispose()
        {
            m_bWinners = null;
        }
       
        public void PrimeEntries(IComparer<_T> comparer)
        {
            m_Comparer = comparer;

            for (Int16 iSlot = 0; iSlot < MAX_PARTITIONS; iSlot++)
            {
                FTreeInsert(iSlot);
            }
        }

        /*=========================================================
               0     1            Parents
            0  1  2  3           
           01 23 45 56 ...        Pairs
        =========================================================*/   
        //    Given an index who are we playing this match agianst
        //
        internal Int16 GetMatchTargetIndex(Int16 iPartition)
        {
            Int16 iMatchIndex = iPartition;
           
            iMatchIndex += (0 == iPartition % 2) ? (Int16)1 : (Int16)(-1);

            return iMatchIndex;
        }

        internal Int16 GetParentIndex(Int16 iPartition)
        {
            Int16 iParent;

            iParent = (Int16) (iPartition / 2);

            return iParent;
        }

        internal bool FTreeInsert(Int16 iPartitionIn)
        {
            bool    bRC = true;
               
            //===========================================================================
            //           
http://en.wikipedia.org/wiki/Tournament_sort
            //            http://en.wikipedia.org/wiki/Heapsort
            //            http://en.wikipedia.org/wiki/Merge_sort
            //===========================================================================       
            Int16 iPartition = iPartitionIn;
           
            for (Int16 iCurrentLevel = MAX_LEVELS; iCurrentLevel > 0; iCurrentLevel--)
            {
                Int16 iPartnerPartition = GetMatchTargetIndex(iPartition);
                Int16 iParent = GetParentIndex(iPartition);

                Int16 iPartnerCompare = m_bWinners[iCurrentLevel, iPartnerPartition];
                Int16 iCurrentCompare = m_bWinners[iCurrentLevel, iPartition];

                Int16 iPrevWinner = m_bWinners[iCurrentLevel - 1, iParent];

                if (INVALID_PARTITION == iPartnerCompare || false == m_Partitions[iPartnerCompare].FHasData)
                {
                    m_bWinners[iCurrentLevel - 1, iParent] = iCurrentCompare;
                }
                else if (INVALID_PARTITION == iCurrentCompare || false == m_Partitions[iCurrentCompare].FHasData)
                {
                    m_bWinners[iCurrentLevel - 1, iParent] = iPartnerCompare;
                }
                else
                {
                    if (true == m_bAscending)
                    {
                        if(m_Comparer.Compare(m_Partitions[iCurrentCompare].GetCurrentEntry,  m_Partitions[iPartnerCompare].GetCurrentEntry) < 0)
                        {
                            m_bWinners[iCurrentLevel - 1, iParent] = iCurrentCompare;
                        }
                        else
                        {
                            m_bWinners[iCurrentLevel - 1, iParent] = iPartnerCompare;
                        }
                    }
                    else
                    {
                        if (m_Comparer.Compare(m_Partitions[iCurrentCompare].GetCurrentEntry, m_Partitions[iPartnerCompare].GetCurrentEntry) > 0)
                        {
                            m_bWinners[iCurrentLevel - 1, iParent] = iCurrentCompare;
                        }
                        else
                        {
                            m_bWinners[iCurrentLevel - 1, iParent] = iPartnerCompare;
                        }
                    }
                }

                if (iPrevWinner == m_bWinners[iCurrentLevel - 1, iParent] && iPrevWinner != iPartitionIn)        //     I changed so we have to recheck assumptions of previous matches
                {
                    break;
                }

                iPartition = iParent;
            }
           
            return bRC;
        }

        internal bool RemoveHead(out _T entry)
        {
            bool bRC = false;
            Int16 iWinner = m_bWinners[0, 0];

            if (INVALID_PARTITION == iWinner || false == m_Partitions[iWinner].FHasData)
            {
                ;
            }
            else
            {
                entry = m_Partitions[iWinner].GetCurrentEntry;

                m_Partitions[iWinner].FMoveNext();        //  FTreeInsert has to be called in true or false case
                FTreeInsert(iWinner);

                bRC = true;
                return bRC;
            }

            entry = default(_T);
            return bRC;
        }

        internal void ValidateAllProcessed()
        {
            for (int iPartition = 0; iPartition < MAX_PARTITIONS; iPartition++)
            {
                System.Diagnostics.Debug.Assert( false == m_Partitions[iPartition].FHasData);
            }
        }

        //      Members and Constants
        public const int MAX_PARTITIONS     = 32;           //     Has to be even number to allow matching to work properly - must be power of 2 to handle max levels
        public const int INVALID_PARTITION  = 255;          //     No further entries in the partition
        public const int MAX_LEVELS         = 5;
       
        TDynamicList<_T>[]          m_Partitions;           //  Reference to the partitions for actual values
        IComparer<_T>               m_Comparer;             //  Reference to comparision
        Int16[,]                    m_bWinners;             //  Tournament tree storage
        bool                        m_bAscending;           //  Sort direction
    }
   
    public abstract class ISortReaderWriter<_T>
    {
        abstract public bool FReadEntry(out _T entry, ref int iEnumerator, int iStepping);
        abstract public bool FWriteEntry(_T entry);

        abstract public void ClearReadAhead();
        abstract public void DoReadAhead(int iSourceFileSlot);
    }

    public class TSortManager<_T> : ISortReaderWriter<_T>, IDisposable
    {
        const Int32 OUTPUT_CHIMB = 250000;

        public override void ClearReadAhead()
        {
            throw new NotImplementedException();
        }
        public override void DoReadAhead(int iSourceFileSlot)
        {
            throw new NotImplementedException();
        }

        internal TSortManager(ISortLogOutput oSortOutput)
        {
            m_oSortOutput = oSortOutput;

            m_PartitionedItemStorage = new TDynamicList<_T>[TSortMatchingTree<_T>.MAX_PARTITIONS];

            for (int iSlot = 0; iSlot < TSortMatchingTree<_T>.MAX_PARTITIONS; iSlot++)
            {
                m_PartitionedItemStorage[iSlot] = new TDynamicList<_T>();
            }

            m_MatchingTree = new TSortMatchingTree<_T>(m_PartitionedItemStorage);
           
            m_i32ActivePartition = 0;
            m_ui64TotalReadEntries = 0;
            m_ui64TotalWriteEntries = 0;

            m_stopWatch = new System.Diagnostics.Stopwatch();
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool bDisposing)
        {
            if (true == bDisposing)
            {
                m_MatchingTree.Dispose();
                m_MatchingTree = null;

                if (null != m_PartitionedItemStorage)
                {
                    for (Int32 iSlot = 0; iSlot < TSortMatchingTree<_T>.MAX_PARTITIONS; iSlot++)
                    {
                        m_PartitionedItemStorage[iSlot] = null;
                    }
                }

                m_PartitionedItemStorage = null;
                m_stopWatch = null;
            }
        }

        public void LogOutputMessage(string strMsg)
        {
            m_oSortOutput.LogOutputMessage(strMsg);
        }

        internal int ReadCPS
        {
            get
            {
                return Math.Min(Math.Max(2, Environment.ProcessorCount - 1), 4);
            }
        }
       
        static int m_iReaderThreadId = -1;
        internal void ReaderThread()
        {
            int iEnumerator = System.Threading.Interlocked.Increment(ref m_iReaderThreadId);
            int iCPUs = ReadCPS;
          
            _T entry;

            while (true == FReadEntry(out entry, ref iEnumerator, iCPUs))
            {
                Add(entry);

                System.Int64 i64Read = System.Threading.Interlocked.Increment(ref m_ui64TotalReadEntries);
                if (0 == i64Read % OUTPUT_CHIMB)
                {
                    LogOutputMessage(String.Format("     Entries read: {0} : {1} per sec", i64Read, (UInt64)((double)i64Read / Math.Max(1, m_stopWatch.ElapsedMilliseconds / 1000.0))));
                }
            }
        }

        internal void DoSort(IComparer<_T> comparer)
        {
            _T entry;

            m_stopWatch.Start();

            LogOutputMessage("Loading partitions");

            int iCPUs = ReadCPS;
            System.Threading.Thread[] readThreads = new System.Threading.Thread[iCPUs];

            for (int iThread = 0; iThread < iCPUs; iThread++)
            {
                readThreads[iThread] = new System.Threading.Thread(ReaderThread);
                readThreads[iThread].Start();
            }

            foreach (System.Threading.Thread thread in readThreads)
            {
                thread.Join();
            }

            LogOutputMessage("Reads complete, invoking parallel sort of partitions");
            SetupSortForExternalConsumption(comparer);

            ClearReadAhead();
          
            LogOutputMessage("Gathering partitions and spooling output");

            Int64 ui64StartWriteMS = m_stopWatch.ElapsedMilliseconds;
           
            while(true == RemoveHead(out entry))
            {
                m_ui64TotalWriteEntries++;
                if (false == FWriteEntry(entry))
                {
                    break;
                }
               
                if (0 == m_ui64TotalWriteEntries % OUTPUT_CHIMB)
                {
                    LogOutputMessage(String.Format("     Entries streamed: {0} : {1} per sec  {2}%", m_ui64TotalWriteEntries,
                       (UInt64)((double)m_ui64TotalWriteEntries / Math.Max(1, (m_stopWatch.ElapsedMilliseconds-ui64StartWriteMS) / 1000.0)),
                       (Int32) (((m_ui64TotalWriteEntries / (double) m_ui64TotalReadEntries) * 100.0))) );
                }
            }

            m_stopWatch.Stop();

            ClearReadAhead();

             System.Diagnostics.Debug.Assert(m_ui64TotalReadEntries == m_ui64TotalWriteEntries);
            m_MatchingTree.ValidateAllProcessed();
        }

        override public bool FReadEntry(out _T entry, ref int iEnumerator, int iStepping)
        {
            throw new NotImplementedException();
            //return false;
        }

        override public bool FWriteEntry(_T entry)
        {
            throw new NotImplementedException();
         }

        internal void Add(_T newEntry)
        {
            Int32 iPartition = System.Threading.Interlocked.Increment(ref m_i32ActivePartition) % TSortMatchingTree<_T>.MAX_PARTITIONS;
            m_PartitionedItemStorage[iPartition].Add(newEntry);
        }

        internal void SetupSortForExternalConsumption(IComparer<_T> comparer)
        {
            Parallel.ForEach(m_PartitionedItemStorage, (partitionList) =>
                                {
                                    partitionList.Sort(comparer);
                                    partitionList.FMoveNext();       //  Position to first entry in list or EOF
                                }
                );
           
             m_MatchingTree.PrimeEntries(comparer);
        }

        internal bool RemoveHead(out _T entry)
        {
            return m_MatchingTree.RemoveHead(out entry);
        }

        internal Int64 Write_Stats
        {
            get { return m_ui64TotalWriteEntries;  }
        }

        internal Int64 Read_Stats
        {
            get { return m_ui64TotalReadEntries; }
        }

        internal System.Diagnostics.Stopwatch StopWatch
        {
            get { return m_stopWatch;  }
        }

        //      Members 
        TDynamicList<_T>[]                  m_PartitionedItemStorage;
        Int32                               m_i32ActivePartition;
        TSortMatchingTree<_T>               m_MatchingTree;
        long                                m_ui64TotalReadEntries;
        long                                m_ui64TotalWriteEntries;
        System.Diagnostics.Stopwatch        m_stopWatch;
        SortManager.ISortLogOutput          m_oSortOutput;
    }
#endregion
   
}           //      End of generic sort manager namespace


namespace MergeXEvents
{
#region SortEntry
    public class CXESortEntry
    {
        internal CXESortEntry()
        {
            m_timestamp = default(DateTimeOffset);
            m_ui64EventSequence = 0;       //  0 indicates NOT set
            m_ui64ServerNameHash = 0;
            m_iCookie = 0;
            m_EventLocator = null;         /  This is only unique at a file level so the cookie used to get the proper file is important to making sure we get back to the proper offset

            m_uiUniqueIdentity = (UInt64) System.Threading.Interlocked.Increment(ref s_uiUniqueIdentity);
        }

        internal CXESortEntry(DateTimeOffset timestamp, UInt64 ui64EventSeq, UInt64 uiServerNameHash)
        {
            m_timestamp = timestamp;
            m_ui64EventSequence = ui64EventSeq;
            m_ui64ServerNameHash = uiServerNameHash;
            m_iCookie = 0;
            m_EventLocator = null;

            //      Wanted to use memory address of the entry but with GC moving things we needed a stable value
            //
            //      This is added because the data could have duplicate entries.  For example if you merge files from
            //      2 different servers the event sequence could be repeated (1 from each trace)
            //
            m_uiUniqueIdentity = (UInt64) System.Threading.Interlocked.Increment(ref s_uiUniqueIdentity);
        }

        internal UInt64 EventSequence
        {
            get { return m_ui64EventSequence; }
            set { m_ui64EventSequence = value; }
        }

        internal UInt64 ServerNameHash
        {
            get { return m_ui64ServerNameHash; }
            set { m_ui64ServerNameHash = value; }
        }

        internal DateTimeOffset Timestamp
        {
            get { return m_timestamp; }
            set { m_timestamp = value; }
        }

        internal UInt64 UniqueIdentity
        {
            get { return m_uiUniqueIdentity;  }
        }

        public EventLocator EventLocator
        {
            get { return m_EventLocator; }
            set { m_EventLocator = value; }
        }

        public Int32 Cookie
        {
            get { return m_iCookie;  }
            set { m_iCookie = value;  }
        }

        //      Members
        EventLocator    m_EventLocator;         //  So we can use this to get the event later
        Int32           m_iCookie;              //  External cookie for sort reader and writer logic
        DateTimeOffset  m_timestamp;            //  timestamp on the event
        UInt64          m_ui64EventSequence;    //  event_sequence action
        UInt64          m_ui64ServerNameHash;   //  GetHashCode of the server_principle_name action
        UInt64          m_uiUniqueIdentity;     //  Used to make sure we have a final sort order if timestamps and server names match without event sequence

        static long s_uiUniqueIdentity;
    }
#endregion

#region Comparision
    class CXEComparerSeqTimeServer : IComparer<CXESortEntry>
    {
        int IComparer<CXESortEntry>.Compare(CXESortEntry xeLeft, CXESortEntry xeRight)
        {
            Int32 iRC = 0;      //  Note we have UInt64s so we can't do simple match or it can overflow the Int32 return value

            bool bDoTimestamp = false;

            //      Event sequence
            if (0 != xeLeft.EventSequence && 0 != xeRight.EventSequence)
            {
                if (xeLeft.EventSequence < xeRight.EventSequence)
                    iRC = -1;
                else if (xeLeft.EventSequence > xeRight.EventSequence)
                    iRC = 1;
                else
                    bDoTimestamp = true;
            }
            else
            {
                bDoTimestamp = true;
            }
           
            if (true == bDoTimestamp)
            {
                if (xeLeft.Timestamp < xeRight.Timestamp)
                    iRC = -1;
                else if (xeLeft.Timestamp > xeRight.Timestamp)
                    iRC = 1;
            }

            if (0 == iRC)
            {
                if (0 != xeLeft.ServerNameHash && 0 != xeRight.ServerNameHash)
                {
                    if (xeLeft.ServerNameHash < xeRight.ServerNameHash)
                        iRC = -1;
                    else if (xeLeft.ServerNameHash > xeRight.ServerNameHash)
                        iRC = 1;
                }
            }

            if (0 == iRC)
            {
                if (xeLeft.UniqueIdentity < xeRight.UniqueIdentity)
                    iRC = -1;
                else if (xeLeft.UniqueIdentity > xeRight.UniqueIdentity)
                    iRC = 1;
            }
           
            return iRC;
        }

    }
#endregion

#region StreamToFile
    internal class CSourceFileEntry :  IDisposable
    {
        public CSourceFileEntry(String strFileName)
        {
            m_strXELName = strFileName;
            m_XEventSourceFile = null;
            m_XEEventEnumerator = null;
            #if SEPERATE_OUTPUT
                m_XEventSourceFileDuplicateForOutput = null;
                m_DuplicateOpenSemaphore = new Semaphore(0, 1);
            #endif
        }

        public void Dispose()
        {
            m_strXELName = String.Empty;

            if(null != m_XEventSourceFile)
            {
                m_XEventSourceFile.Dispose();
                m_XEventSourceFile = null;
            }

            #if SEPERATE_OUTPUT
                if(null != m_XEventSourceFileDuplicateForOutput)
                {
                    m_XEventSourceFileDuplicateForOutput.Dispose();
                    m_XEventSourceFileDuplicateForOutput = null;
                }

                if (null != m_DuplicateOpenSemaphore)
                {
                    m_DuplicateOpenSemaphore.Close();
                    m_DuplicateOpenSemaphore = null;
                }

            #endif
        }

        public String FileName
        {
            get { return m_strXELName;  }
        }

        public QueryableXEventData XELFile
        {
            get { return m_XEventSourceFile;  }
        }

        public IEnumerator<PublishedEvent> EventEnumerator
        {
            get { return m_XEEventEnumerator;  }
        }

        #if SEPERATE_OUTPUT
            public QueryableXEventData XELFileForOutput
            {
                get { return m_XEventSourceFileDuplicateForOutput; }
            }

            public IEnumerator<PublishedEvent> EventEnumeratorForOutput
            {
                get { return m_XEEventEnumeratorDuplicateForOutput; }
            }

            public void WaitForOutputOpen()
            {
                if (true == m_DuplicateOpenSemaphore.WaitOne())
                {
                    m_DuplicateOpenSemaphore.Release();
                }
            }
        #endif

        public void Open()
        {
            #if SEPERATE_OUTPUT
                //  Found if we double read the file into another provider we can pass the published
                //  event across these providers (same exact metadata and buffers) so we can seperate
                //  the retreive from serialize and increase the output speed
                //
                try
                {
                    m_XEventSourceFileDuplicateForOutput = new QueryableXEventData(m_strXELName);
                    m_XEEventEnumeratorDuplicateForOutput = m_XEventSourceFileDuplicateForOutput.GetEnumerator();
                }
                finally
                {
                    m_DuplicateOpenSemaphore.Release();
                }
            #endif

            m_XEventSourceFile = new QueryableXEventData(m_strXELName);
            m_XEEventEnumerator = m_XEventSourceFile.GetEnumerator();
        }

        public bool FIsOpen()
        {
            return null != m_XEEventEnumerator;
        }

        private QueryableXEventData         m_XEventSourceFile;
        private IEnumerator<PublishedEvent> m_XEEventEnumerator;
       
        #if SEPERATE_OUTPUT
                private QueryableXEventData         m_XEventSourceFileDuplicateForOutput;
                private IEnumerator<PublishedEvent> m_XEEventEnumeratorDuplicateForOutput;
                private Semaphore                   m_DuplicateOpenSemaphore;
        #endif
       
        private String                      m_strXELName;
    }

    public class CXEventSortStream : SortManager.TSortManager<CXESortEntry>
    {
        internal CXEventSortStream(SortManager.ISortLogOutput oLogOutput) :
            base(oLogOutput)
        {
            m_XEventSourceFiles = new SortManager.TDynamicArray<CSourceFileEntry>();
        }

        internal void DoSort()
        {
            DoSort(new CXEComparerSeqTimeServer());
        }

        override public bool FReadEntry(out CXESortEntry entry, ref int iEnumerator, int iStepping)
        {
            bool bRC = false;

            PublishedEvent publishedEvent = null;

            do
            {
                if (iEnumerator < m_XEventSourceFiles.StoredItems)
                {
                    if(false == m_XEventSourceFiles.Item(iEnumerator).FIsOpen())
                    {
                        m_XEventSourceFiles.Item(iEnumerator).Open();
                    }

                    if (true == m_XEventSourceFiles.Item(iEnumerator).EventEnumerator.MoveNext())
                    {
                        publishedEvent = m_XEventSourceFiles.Item(iEnumerator).EventEnumerator.Current;
                    }
                    else
                    {
                        iEnumerator += iStepping;
                    }
                }

            } while (null == publishedEvent && iEnumerator < m_XEventSourceFiles.StoredItems);

            if (null != publishedEvent)
            {
                entry = default(CXESortEntry);
                entry = new CXESortEntry();

                entry.EventLocator = publishedEvent.Location;
                entry.Cookie = iEnumerator;
                entry.Timestamp = publishedEvent.Timestamp;

                PublishedAction eventSeq;
                if (true == publishedEvent.Actions.TryGetValue(@"event_sequence", out eventSeq))
                {
                    entry.EventSequence = (UInt64) eventSeq.Value;
                }

                PublishedAction serverName;
                if (true == publishedEvent.Actions.TryGetValue(@"server_principal_name", out serverName))
                {
                    entry.ServerNameHash = (UInt64)serverName.Value.GetHashCode();
                }

                bRC = true;
            }
            else
            {
                entry = default(CXESortEntry);
            }

          
            return bRC;
        }

        override public bool FWriteEntry(CXESortEntry entry)
        {
            return true;
        }

        internal void AddInputFile(string strXELFile)
        {
            CSourceFileEntry se = new CSourceFileEntry(strXELFile);
            lock (m_XEventSourceFiles)
            {
                m_XEventSourceFiles.Add(se);
            }
        }

        internal SortManager.TDynamicArray<CSourceFileEntry> SourceFiles
        {
            get { return m_XEventSourceFiles; }
        }

        //      Members
        private SortManager.TDynamicArray<CSourceFileEntry> m_XEventSourceFiles;
    }

    class CParallelPayload
    {
        public CParallelPayload(CXEventFileToFileSortManager sortFileMgr, CXESortEntry sortEntry)
        {
            m_sortFileMgr = sortFileMgr;
            m_sortEntry = sortEntry;
            m_publishedEvent = null;
        }

        public CXESortEntry SortEntry
        {
            get { return m_sortEntry; }
        }

        public CXEventFileToFileSortManager SortFileMgr
        {
            get { return m_sortFileMgr; }
        }

        public PublishedEvent PublishedEvent
        {
            get { return m_publishedEvent; }
            set { m_publishedEvent = value;  }
        }

        private CXEventFileToFileSortManager m_sortFileMgr;
        private CXESortEntry m_sortEntry;
        private PublishedEvent m_publishedEvent;
    }

    internal class CScatterGatherStream : ScatterGatherParallelMgr<CParallelPayload>
    {
        public CScatterGatherStream() :
            base(2)
        {
        }

        override public void DoUnOrderedWork(CParallelPayload pPayload)
        {
            pPayload.SortFileMgr.DoUnorderedWork(pPayload);
        }
        override public void DoOrderedWork(CParallelPayload pPayload)
        {
            pPayload.SortFileMgr.DoOrderedWork(pPayload);
        }
       
        override public void OrderedWorkerStalled(CParallelPayload pPayload)
        {
        }

        override public void OrderedWorkerNoStall()
        {
        }

    }


    internal class CXEventFileToFileSortManager : CXEventSortStream
    {
        internal CXEventFileToFileSortManager(SortManager.ISortLogOutput oLogOutput)  :
            base(oLogOutput)
        {
            m_iCancelReadAhead = 0;
            m_ScatterGatherOutputStream = new CScatterGatherStream();
            m_strLastOutputName = String.Empty;
            m_lFileCount = 1;
            m_strOutputFile = String.Empty;
            m_strOutputDir = String.Empty;
            m_XEOutputStream = null;
            m_ui64RolloverCount = 0;

            m_dictReadAhead = new Dictionary<int, Thread>();
        }

        ~CXEventFileToFileSortManager()
        {
            CloseFile();
        }

        new internal void DoSort()
        {
            base.DoSort();
            m_ScatterGatherOutputStream.CompleteAllWork();
            CloseFile();
        }
      
        private void CloseFile()
        {
            if(String.Empty != m_strLastOutputName && null != m_XEOutputStream)
            {
                m_XEOutputStream.Dispose();
                m_XEOutputStream = null;
              
                string strDirectory = System.IO.Path.GetDirectoryName(m_strLastOutputName);
                strDirectory = System.IO.Path.GetFullPath(strDirectory);

                string strFileSpec = System.IO.Path.GetFileName(m_strLastOutputName);

                //  Strip extention
                strFileSpec = strFileSpec.Substring(0, strFileSpec.Length - 4);
                strFileSpec += "*.xel";

                IEnumerable<string> strFiles = System.IO.Directory.EnumerateFiles(strDirectory, strFileSpec);
               
                int iLoops = 0;
                foreach(string strFile in strFiles)
                {
                    if (0 != iLoops++)
                    {
                        SetNewOutputFileName();
                        iLoops++;
                    }

                    FileAttributes fa = File.GetAttributes(strFile);
                    if (FileAttributes.Compressed == (FileAttributes.Compressed & fa))
                    {
                        LogOutputMessage(" WARN: Performance may be impacted as output file appears to be using compression.");
                    }

                    System.IO.Directory.Move(strFile, m_strLastOutputName);
                }
                   
                m_strLastOutputName = String.Empty;
            }
        }

        internal void SetNewOutputFileName(CXESortEntry entry = null)
        {

            if (null != entry)
            {
                m_strLastOutputName = String.Format(@"{0}\{1}_0 ymd({2}-{3}-{4}) hms({5}-{6}-{7}.{8})_{9}.xel", m_strOutputDir, m_strOutputFile,
                            entry.Timestamp.ToUniversalTime().Year,
                            entry.Timestamp.ToUniversalTime().Month,
                            entry.Timestamp.ToUniversalTime().Day,
                            entry.Timestamp.ToUniversalTime().Hour,
                            entry.Timestamp.ToUniversalTime().Minute,
                            entry.Timestamp.ToUniversalTime().Second,
                            entry.Timestamp.ToUniversalTime().Millisecond,
                            m_lFileCount++);
            }
            else   //   Rollover path
            {
                string strCount = (m_lFileCount -1).ToString();     //  File count is incremented when the name was built
                string strFile = m_strLastOutputName.Substring(0, m_strLastOutputName.Length - 4 - strCount.Length);

                m_strLastOutputName = String.Format(@"{0}{1}.xel", strFile, m_lFileCount++);
            }

            if (true == File.Exists(m_strLastOutputName))
            {
                FileAttributes fa = File.GetAttributes(m_strLastOutputName);
                File.SetAttributes(m_strLastOutputName, fa & ~FileAttributes.ReadOnly);
                System.IO.File.Delete(m_strLastOutputName);
            }
        }

        internal void DoUnorderedWork(CParallelPayload payload)
        {
            lock (SourceFiles.Item(payload.SortEntry.Cookie).XELFile.EventProvider)
            {
                payload.PublishedEvent = SourceFiles.Item(payload.SortEntry.Cookie).XELFile.EventProvider.RetrieveEvent(payload.SortEntry.EventLocator);
            }
        }


#if DEBUG
        //  DEBUG only: highest event sequence streamed so far, used to verify ordering
        UInt64 m_ui64SeqCheck = 0;
#endif
        /// <summary>
        /// Ordered phase of the pipeline: serializes the payload's published event to the
        /// current output file, rolling over to a new file when required.
        /// </summary>
        /// <param name="payload">Work item carrying the sort entry and the event retrieved by the unordered phase.</param>
        internal void DoOrderedWork(CParallelPayload payload)
        {
            //========================================================================
            //  The output file name is derived from the sort entry so it is easy to
            //  see which file you want to open.  Closing and re-opening the stream
            //  forces a numbered rollover.
            //
            //      1 million is the current SSMS display limit, but 1,000,000 events
            //      approaches 1GB too often, so fewer entries are written per file.
            //========================================================================
            bool bStartNewFile = (null == m_XEOutputStream) || (0 == (m_ui64RolloverCount + 1) % 500000);
            if (bStartNewFile)
            {
                CloseFile();
                SetNewOutputFileName(payload.SortEntry);

                //      Open the serializer on the newly chosen name
                m_XEOutputStream = new XEventFileSerializer(m_strLastOutputName);
            }

#if DEBUG
            //  Safety check: sequences must be strictly increasing
            if (m_ui64SeqCheck >= payload.SortEntry.EventSequence)
            {
                throw new Exception("UTILITY ERROR; Attempt to stream duplicate entry");
            }

            m_ui64SeqCheck = payload.SortEntry.EventSequence;
#endif
#if SEPERATE_OUTPUT
            SourceFiles.Item(payload.SortEntry.Cookie).XELFileForOutput.EventProvider.SerializeEvent(m_XEOutputStream, payload.PublishedEvent);
            m_ui64RolloverCount++;
#else
            var provider = SourceFiles.Item(payload.SortEntry.Cookie).XELFile.EventProvider;

            //  Same provider object that DoUnorderedWork locks while retrieving events
            lock (provider)
            {
                provider.SerializeEvent(m_XEOutputStream, payload.PublishedEvent);
                m_ui64RolloverCount++;
            }
#endif
        }

        override public bool FWriteEntry(CXESortEntry entry)
        {
            CParallelPayload payload = new CParallelPayload(this, entry);
             
            //      Do inline work
             #if SEPERATE_OUTPUT
                m_ScatterGatherOutputStream.FQueueWorkItem(payload);
            #else
                m_ScatterGatherOutputStream.DoUnOrderedWork(payload);
                m_ScatterGatherOutputStream.DoOrderedWork(payload);
            #endif
      
            return true;
        }

        public override void ClearReadAhead()
        {
            //  Let everyone know they are done
            Interlocked.Increment(ref m_iCancelReadAhead);

            lock(m_dictReadAhead)
            {
                //  Make sure all active threads are done
                foreach(KeyValuePair<int, Thread> oKV in m_dictReadAhead)
                {
                    oKV.Value.Join();
                }

                m_dictReadAhead.Clear();
            }

            //  Reset for next execution
            m_iCancelReadAhead = 0;
        }
        public override void DoReadAhead(int iFileEnumSlot)
        {
            int iCookie = -1;
            Thread tReadAhead = new Thread(DoReadAheadInternal);

            lock (m_dictReadAhead)
            {
                Thread tReadAheadDummy;
                if (false == m_dictReadAhead.TryGetValue(iFileEnumSlot, out tReadAheadDummy))
                {
                    iCookie = iFileEnumSlot;
                    m_dictReadAhead.Add(iFileEnumSlot, tReadAhead);
                }
            }

            //  Do we need to start the read-ahead to suck data into file system cache
            if (iCookie > -1)
            {
                tReadAhead.Start(iCookie);
            }
        }

        internal void DoReadAheadInternal(object oParms)
        {
            try
            {
                int iCookie = (int) oParms;
                string strName = SourceFiles.Item(iCookie).FileName;
#if DEBUG
                LogOutputMessage(String.Format(" INFO: Starting file system cache read-ahead for {0}",  strName));
#endif
                //      We just open the file and read in chunks, allowing caching to fill file system cache quickly
                const int c_iMaxReadSize = ((1024 * 1024) * 4);
               
                using (FileStream s = new FileStream(strName, FileMode.Open, FileAccess.Read, FileShare.Read | FileShare.Write | FileShare.Delete, c_iMaxReadSize, FileOptions.SequentialScan))
                {
                    byte[] bData = new byte[c_iMaxReadSize];

                    ulong ulLoops = 0;
                    int iBytesRead = s.Read(bData, 0, c_iMaxReadSize);
                    while (0 == m_iCancelReadAhead && iBytesRead > 0)
                    {
                        iBytesRead = 0;
                        ulLoops++;
                        if(0 == ulLoops % 100)
                        {
                            Thread.Sleep(10);
                        }

                        iBytesRead = s.Read(bData, 0, c_iMaxReadSize);
                    }
                }

                #if SEPERATE_OUTPUT
                    SourceFiles.Item(iCookie).WaitForOutputOpen();
                    if (null != SourceFiles.Item(iCookie).EventEnumeratorForOutput)
                    {
                        while (true == SourceFiles.Item(iCookie).EventEnumeratorForOutput.MoveNext()) ;
                    }
                #endif
            }
            catch(Exception e)
            {
                LogOutputMessage(String.Format(" INFO: read ahead encounted exection for {0} ignoring.", e.ToString()));
            }
        }

        internal void SetOutputFile(string strXELFile)
        {
            m_strOutputFile = System.IO.Path.GetFileNameWithoutExtension(strXELFile);
            m_strOutputDir = System.IO.Path.GetDirectoryName(strXELFile);
        }
     
        //      Members

        //  Read-ahead threads keyed by file enum slot; access is guarded by lock(m_dictReadAhead)
        Dictionary<int, Thread>                                 m_dictReadAhead;
        //  Stream that FWriteEntry routes each payload through
        CScatterGatherStream                                    m_ScatterGatherOutputStream;
        //  Output file name without extension (set by SetOutputFile)
        string                                                  m_strOutputFile;
        //  Output directory (set by SetOutputFile)
        string                                                  m_strOutputDir;
        //  Serializer for the current output file; created by DoOrderedWork when null or on rollover
        XEventFileSerializer                                    m_XEOutputStream;
        //  Name passed to the serializer for the current output file
        string                                                  m_strLastOutputName;
        //  NOTE(review): not referenced in the visible code; presumably the rollover file number - confirm
        int                                                     m_lFileCount;
        //  Non-zero tells the read-ahead loops to stop; raised and reset by ClearReadAhead
        int                                                     m_iCancelReadAhead;
        //  Incremented once per serialized event; drives the 500000-event rollover check
        UInt64                                                  m_ui64RolloverCount;
    }
#endregion

    class CSortOutput : SortManager.ISortLogOutput, IDisposable
    {
        public CSortOutput(string strOutputPath)
        {
            m_OutputFile = null;

            string strDirectory = System.IO.Path.GetDirectoryName(strOutputPath);

            Program.FCreateDirectoryWithACLs(strDirectory);

            m_OutputFile = new System.IO.StreamWriter(String.Format(@"{0}\MergeEvents.log", strDirectory));
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected void Dispose(bool bDisposing)
        {
            if(true == bDisposing)
            {
                m_OutputFile.Close();
                m_OutputFile = null;
            }
        }

        override public void LogOutputMessage(string strMsg)
        {
            Program.LogToConsole(strMsg);
            if (null != m_OutputFile)
            {
                m_OutputFile.WriteLine("{0} {1} {2}", DateTime.Now, System.Threading.Thread.CurrentThread.ManagedThreadId, strMsg);
                m_OutputFile.Flush();
            }
        }

        private System.IO.StreamWriter m_OutputFile;
    }

    class Program
    {
        static internal void LogToConsole(string strMsg)
        {
            Console.WriteLine("{0} {1} {2}", DateTime.Now, System.Threading.Thread.CurrentThread.ManagedThreadId, strMsg);
        }

        static internal void ShowUsage()
        {
            LogToConsole("");
            LogToConsole("Usage");
            LogToConsole("------------------------------------------------------------------------");
            LogToConsole("/? Present this help information");
            LogToConsole("");
            LogToConsole("EXAMPLE:  MergeXEvents.exe <input spec> <output spec>");
            LogToConsole("          MergeXEvents.exe \"C:\\temp\\XEvent_0_130494871596480000.xel\" \"C:\\temp\\Sorted.xel\" ");
            LogToConsole("");
            LogToConsole("<input spec> any valid directory and wildcard.   All files matching wildcard in the directory will be processed as input");
            LogToConsole("<output spec> is any valid file name where output for the merge can be stored, including rollover files.");
            LogToConsole("");
        }

#region AssemblyInformation
        static public string AssemblyTitle
        {
            get
            {
                object[] attributes = Assembly.GetExecutingAssembly().GetCustomAttributes(typeof(AssemblyTitleAttribute), false);
                if (attributes.Length > 0)
                {
                    AssemblyTitleAttribute titleAttribute = (AssemblyTitleAttribute)attributes[0];
                    if (false == String.IsNullOrEmpty(titleAttribute.Title))
                        return titleAttribute.Title;
                }

                return System.IO.Path.GetFileNameWithoutExtension(Assembly.GetExecutingAssembly().CodeBase);
            }
        }

        static public string AssemblyVersion
        {
            get
            {
                return VersionStrings.VERSION_MAJOR_STR + VersionStrings.VERSION_MINOR_STR + VersionStrings.VERSION_BUILD_STR;
            }
        }

       static  public string AssemblyDescription
        {
            get
            {
                object[] attributes = Assembly.GetExecutingAssembly().GetCustomAttributes(typeof(AssemblyDescriptionAttribute), false);
                if (attributes.Length == 0)
                    return "";

                return ((AssemblyDescriptionAttribute)attributes[0]).Description;
            }
        }

        static public string AssemblyProduct
        {
            get
            {
                object[] attributes = Assembly.GetExecutingAssembly().GetCustomAttributes(typeof(AssemblyProductAttribute), false);
                if (attributes.Length == 0)
                    return "";
               
                return ((AssemblyProductAttribute)attributes[0]).Product;
            }
        }

        static public string AssemblyCopyright
        {
            get
            {
                object[] attributes = Assembly.GetExecutingAssembly().GetCustomAttributes(typeof(AssemblyCopyrightAttribute), false);
                if (attributes.Length == 0)
                    return "";

                return ((AssemblyCopyrightAttribute)attributes[0]).Copyright;
            }
        }

        static public string AssemblyCompany
        {
            get
            {
                object[] attributes = Assembly.GetExecutingAssembly().GetCustomAttributes(typeof(AssemblyCompanyAttribute), false);
                if (attributes.Length == 0)
                    return "";

                return ((AssemblyCompanyAttribute)attributes[0]).Company;
            }
        }
        #endregion

#region DIR_ACLS
        //  File-system flag bit tested by FCreateDirectoryWithACLs to decide whether the
        //  target volume supports access controls
        const int FILE_PERSISTENT_ACLS = 0x00000008;

        //  P/Invoke declaration for the kernel32 GetVolumeInformation API.
        //  FCreateDirectoryWithACLs passes null/0 for both name buffers and only
        //  inspects the returned file-system flags.
        //  Returns false when the call fails.
        [DllImport("kernel32.dll", CharSet = CharSet.Unicode)]
        public static extern bool GetVolumeInformation(string lpRootPathName,
                                                            StringBuilder lpVolumeNameBuffer,
                                                            [MarshalAs(UnmanagedType.U4)] int nVolumeNameSize,
                                                            [MarshalAs(UnmanagedType.U4)] ref int lpVOlumeSerialNumber,
                                                            [MarshalAs(UnmanagedType.U4)] ref int lpMaxComponentLen,
                                                            [MarshalAs(UnmanagedType.U4)] ref int lpFileSytemFlags,
                                                            StringBuilder lpFileSystemNameBuffer,
                                                            [MarshalAs(UnmanagedType.U4)] int nFileSystemNameSize);

        static public bool FCreateDirectoryWithACLs(string strDirName)
        {
            bool bRC = false;

            System.IO.Directory.CreateDirectory(strDirName);
            DirectorySecurity dss = new DirectorySecurity();

            try
            {
                dss.AddAccessRule(new FileSystemAccessRule(@"NT AUTHORITY\ANONYMOUS LOGON",
                                                            FileSystemRights.FullControl,
                                                            AccessControlType.Deny));
            }
            catch (Exception e)
            {
                LogToConsole(String.Format("Exception encountered setting ANONYMOUS access rule {0}", e.ToString()));
                throw;
            }
       
            try
            {
                dss.AddAccessRule(new FileSystemAccessRule(System.Security.Principal.WindowsBuiltInRole.Guest.ToString(),
                                                            FileSystemRights.FullControl,
                                                            AccessControlType.Deny));
            }
            catch (Exception e)
            {
                LogToConsole(String.Format("Exception encountered setting GUEST access rule {0}", e.ToString()));
                throw;
            }

            try
            {
                dss.AddAccessRule(new FileSystemAccessRule(@"BuiltIn\Administrators",
                                                            FileSystemRights.ReadExtendedAttributes | FileSystemRights.ReadAttributes | FileSystemRights.Synchronize,
                                                            AccessControlType.Allow));
            }
            catch (Exception e)
            {
                LogToConsole(String.Format("Exception encountered setting Administrators access rule {0}", e.ToString()));
                throw;
            }

            String strCurrentUser = System.Security.Principal.WindowsIdentity.GetCurrent().Name;

            try
            {
                dss.AddAccessRule(new FileSystemAccessRule(strCurrentUser,
                                                           FileSystemRights.FullControl,
                                                           AccessControlType.Allow));
            }
            catch (Exception e)
            {
                LogToConsole(String.Format("Exception encountered setting current user access rule for: " + strCurrentUser, e));
                throw;
            }

            String strRoot = System.IO.Path.GetPathRoot(strDirName);
            if (false == strRoot.EndsWith("\\", StringComparison.Ordinal))
            {
                strRoot += "\\";
            }

            int iSerialNumber = 0;
            int iMaxComponent = 0;
            int iFlags = 0;

            bRC = GetVolumeInformation(strRoot, null, 0, ref iSerialNumber, ref iMaxComponent, ref iFlags, null, 0);
            if (false == bRC)
            {
                LogToConsole("Attempt to obtain volume information failed.");
            }
            else if (FILE_PERSISTENT_ACLS != (iFlags & FILE_PERSISTENT_ACLS))
            {
                LogToConsole("ERROR: Specified location does not support proper access controls.");
                bRC = false;
            }

            if(true == bRC)
            {
                System.IO.Directory.SetAccessControl(strDirName, dss);
                bRC = true;
            }

            return bRC;
        }
#endregion

        //==============================================================
        static int Main(string[] args)
        {
            LogToConsole(AssemblyTitle);
            LogToConsole(String.Format("Version {0}", AssemblyVersion));
            LogToConsole(AssemblyCopyright);
            LogToConsole("------------------------------------------------------------");
            LogToConsole("");
           
            if(args.Length != 2)
            {
                ShowUsage();
                return -1;
            }

            return DoWork(args[0], args[1]);
        }

        private static int DoWork(string strInput, string strOutput)
        {
            int iRC = -100;

            try
            {
                CXEventFileToFileSortManager xeSortMgr = new CXEventFileToFileSortManager(new CSortOutput(strOutput));
                string strDirectory = System.IO.Path.GetDirectoryName(strInput);
                strDirectory = System.IO.Path.GetFullPath(strDirectory);
                string strFileSpec = System.IO.Path.GetFileName(strInput);
                string[] strParts = strFileSpec.Split('_');

                strFileSpec = String.Empty;
               
                for (int iLoop = 0; iLoop < strParts.Length - 2; iLoop++ )
                {
                    strFileSpec += strParts[iLoop] + '_';
                }

                if(String.Empty != strFileSpec)
                {
                    strFileSpec += "*.xel";
                }

                int iFiles = 0;
                IEnumerable<string> strFiles = System.IO.Directory.EnumerateFiles(strDirectory, strFileSpec);
                Parallel.ForEach (strFiles, (strFile) =>
                {
                    LogToConsole(String.Format("VALIDATING INPUT SOURCE: {0}", strFile));
                
                    xeSortMgr.AddInputFile(strFile);
                    System.Threading.Interlocked.Increment(ref iFiles);

                    //      Since the XE Linq reader uses file mapping compression defeats the alignment attempts on sector boundaries
                    //
                    FileAttributes fa = File.GetAttributes(strFile);
                    if (FileAttributes.Compressed == (FileAttributes.Compressed & fa))
                    {
                        LogToConsole(" WARN: Performance may be impacted as input file appears to be using compression.");
                    }
                });

                xeSortMgr.ClearReadAhead();

                if(iFiles > 0)
                {
                    strDirectory = System.IO.Path.GetDirectoryName(strOutput);
                    strDirectory = System.IO.Path.GetFullPath(strDirectory);

                    if (false == FCreateDirectoryWithACLs(strDirectory))
                    {
                        LogToConsole(String.Format("ERROR: Unable to create or set ALCs on {0}", strDirectory));
                        iRC = -6;
                    }
                    else
                    {
                        strFileSpec = System.IO.Path.GetFullPath(strOutput);

                        LogToConsole(String.Format("      OUTPUT: {0}", strFileSpec));
                        {
                            LogToConsole("Removing files in output location with matching file name pattern.");

                            string strFileNameNoExt = System.IO.Path.GetFileNameWithoutExtension(strFileSpec);
                            string strLastOutputNamePattern = String.Format(@"{0}_0 ymd(*) hms(*)_*.xel", strFileNameNoExt);
                            strFiles = System.IO.Directory.EnumerateFiles(strDirectory, strLastOutputNamePattern);

                            foreach(string strFile in strFiles)
                            {
                                FileAttributes fa = File.GetAttributes(strFile);
                                File.SetAttributes(strFile, fa & ~FileAttributes.ReadOnly);
                                System.IO.File.Delete(strFile);
                            }
                        }

                        xeSortMgr.SetOutputFile(strFileSpec);

                        LogToConsole("");
                        LogToConsole("Beginning sort action ...");

                        xeSortMgr.DoSort();
                       
                        LogToConsole(String.Format("Sort action has completed (Elapsed {0} sec)", xeSortMgr.StopWatch.ElapsedMilliseconds / 1000.0));
                        LogToConsole("");

                        LogToConsole(String.Format("    Events read: {0}", xeSortMgr.Read_Stats));
                        LogToConsole(String.Format("Events streamed: {0}", xeSortMgr.Write_Stats));

                        if (xeSortMgr.Read_Stats != xeSortMgr.Write_Stats)
                            iRC = -3;
                        else if (0 == xeSortMgr.Read_Stats)
                            iRC = -4;
                        else
                            iRC = 0;
                    }
                }
                else
                {
                    LogToConsole("ERROR: Could not locate any input files.");
                    iRC = -2;
                }

            }
            catch (Exception e)
            {
                LogToConsole("");
                LogToConsole("Error / Exception encountered");
                LogToConsole("==========================================================");
                LogToConsole(e.ToString());
                iRC = -99;
            }

            return iRC;
        }

    }
}

ScatterGather.cs

using System;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;

namespace ScatterGatherParallelNS
{
    #region ScatterGatherParallelMgr_IMPL
    public abstract class ScatterGatherParallelMgr<_TPayload> : IDisposable
    {
        private BlockingCollection<CMsg<_TPayload>> m_FreeQueue;        //  Msg has been processed, it can handle other payload
        private BlockingCollection<CMsg<_TPayload>> m_UnOrderedQueue;   //  Msg is dirty and needs processed when on this queue
        private BlockingCollection<CMsg<_TPayload>> m_OrderedQueue;     //  Msg is ready for final processing when on this queue
        //  Thread objects running UnOrderedEntryPoint
        private ArrayList m_alUnOrderedThreads;
        //  Single thread running OrderedEntryPoint
        private Thread m_OrderedThread;
        //  True once StartWorkersIfNecessary has created the queues and threads
        private bool m_bInitialized;
        //  Number of unordered worker threads to start
        private Int32 m_TotalUnOrderedThreads;
        //  Number of CMsg instances pre-allocated onto the free queue
        private Int32 m_TotalMessages;
        //  Set by the ordered thread when it had to wait on a message; cleared after a run of non-stalled messages
        private bool m_bOrderedIsCurrentlyStalled;
        //  Count of messages the ordered thread had to wait for
        private Int64 m_iTotalOrderStalls;

        public ScatterGatherParallelMgr(int iUnorderedLimit)
        {
            m_TotalUnOrderedThreads = Math.Max(2, Environment.ProcessorCount-1);
            m_TotalUnOrderedThreads = Math.Min(iUnorderedLimit, m_TotalUnOrderedThreads);
            m_TotalMessages = m_TotalUnOrderedThreads * 128;           //  ## per CPU
            m_bInitialized = false;
            m_FreeQueue = null;
            m_UnOrderedQueue = null;
            m_OrderedQueue = null;
            m_alUnOrderedThreads = null;
            m_OrderedThread = null;
            m_bOrderedIsCurrentlyStalled = false;
            m_iTotalOrderStalls = 0;
        }

        public bool FIsOrderedWorkerStalled
        {
            get { return m_bOrderedIsCurrentlyStalled; }
        }

        public Int64 TotalOrderedWorkerStalls
        {
            get { return m_iTotalOrderStalls; }
        }

        virtual public void AbortAllWorkers()
        {
            if (null != m_alUnOrderedThreads)
            {
                foreach (Thread t in m_alUnOrderedThreads)
                {
                    try
                    {
                        t.Abort();
                    }
                    catch (Exception)
                    {
                    }
                }
            }

            if (null != m_OrderedThread)
            {
                try
                {
                    m_OrderedThread.Abort();
                }
                catch (Exception)
                {
                }
            }
        }

        private void StartWorkersIfNecessary()
        {
            if (false == m_bInitialized)
            {
                m_bOrderedIsCurrentlyStalled = false;
                m_iTotalOrderStalls = 0;

                m_FreeQueue = new BlockingCollection<CMsg<_TPayload>>();            //  Order not required
                m_UnOrderedQueue = new BlockingCollection<CMsg<_TPayload>>();       //  Order not required

                ConcurrentQueue<CMsg<_TPayload>> q = new ConcurrentQueue<CMsg<_TPayload>>();  //  Order required
                m_OrderedQueue = new BlockingCollection<CMsg<_TPayload>>(q);

                for (Int32 iMsg = 0; iMsg < m_TotalMessages; iMsg++)
                {
                    m_FreeQueue.Add(new CMsg<_TPayload>());
                }

                m_alUnOrderedThreads = new ArrayList();

                for (Int32 iUnOrdered = 0; iUnOrdered < m_TotalUnOrderedThreads; iUnOrdered++)
                {
                    ThreadStart ts = new ThreadStart(UnOrderedEntryPoint);
                    Thread t = new Thread(ts);
                    m_alUnOrderedThreads.Add(t);
                    t.Start();
                }

                {
                    ThreadStart ts = new ThreadStart(OrderedEntryPoint);
                    m_OrderedThread = new Thread(ts);
                    m_OrderedThread.Start();
                }

                m_bInitialized = true;
            }
        }

        protected virtual void Dispose(bool bDispose)
        {
            CompleteAllWork();  //  If not shutdown do so now
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1063")]
        public virtual void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        public void CompleteAllWork()
        {
            if(null != m_UnOrderedQueue)
            {
                m_UnOrderedQueue.CompleteAdding();
            }

            if (null != m_alUnOrderedThreads)
            {
                foreach (Thread t in m_alUnOrderedThreads)
                {
                    t.Join();
                }

                m_alUnOrderedThreads = null;
            }

            if (null != m_OrderedQueue)
            {
                m_OrderedQueue.CompleteAdding();
            }

            if (null != m_OrderedThread && null != m_OrderedQueue)
            {
                m_OrderedThread.Join();
                m_OrderedThread = null;
            }

            if(null != m_FreeQueue)
            {
                m_FreeQueue = null;
            }

            if(null != m_UnOrderedQueue)
            {
                m_UnOrderedQueue = null;
            }

            if(null != m_OrderedQueue)
            {
                m_OrderedQueue = null;
            }

            m_bInitialized = false;
        }

        public abstract void DoUnOrderedWork(_TPayload pPayload);

        private void UnOrderedEntryPoint()
        {
            try
            {
                foreach (CMsg<_TPayload> cMsg in m_UnOrderedQueue.GetConsumingEnumerable())
                {
                    DoUnOrderedWork(cMsg.Payload);
                    cMsg.MarkReadyForOrderedProcessing();
                }
            }
            catch (ThreadAbortException)        //  Silent, main would have issued this
            {
            }
            catch (OperationCanceledException)
            {
            }
        }

        public abstract void DoOrderedWork(_TPayload pPayload);
        public abstract void OrderedWorkerStalled(_TPayload pPayload);
        public abstract void OrderedWorkerNoStall();

        private void OrderedEntryPoint()
        {
            Int32 iContigiousNoStalls   =   0;

            try
            {
                foreach (CMsg<_TPayload> cMsg in m_OrderedQueue.GetConsumingEnumerable())
                {
                    bool bAcquired = false;

                    bAcquired = cMsg.WaitForUnOrderedProcessing(0);
                    if (false == bAcquired)        //  Immediate grab attempt
                    {
                        m_iTotalOrderStalls++;
                        iContigiousNoStalls = 0;
                        m_bOrderedIsCurrentlyStalled = true;
                        OrderedWorkerStalled(cMsg.Payload); //  Signal we are going to a wait state on the specific payload
                    }
                    else
                    {
                        if (0 == (++iContigiousNoStalls % (m_TotalMessages * 0.25)))
                        {
                            OrderedWorkerNoStall(); //  Signal that prediction could feed us more
                            m_bOrderedIsCurrentlyStalled = false;
                        }
                    }

                    if (false == bAcquired)
                    {
                        cMsg.WaitForUnOrderedProcessing(-1);
                    }

                    DoOrderedWork(cMsg.Payload);

                    cMsg.MakeReadyForFreeQueue();
                    m_FreeQueue.Add(cMsg);
                }
            }
            catch (ThreadAbortException)        //  Silent, main would have issued this
            {
            }
            catch (OperationCanceledException)
            {
            }
        }

        public bool FQueueWorkItem(_TPayload pPayload)
        {
            bool bRC = false;
            StartWorkersIfNecessary();

            if (true == m_bInitialized)
            {
                CMsg<_TPayload> cMsg = m_FreeQueue.Take();
                cMsg.Payload = pPayload;

                //  Place on both queues
                m_UnOrderedQueue.Add(cMsg);
                m_OrderedQueue.Add(cMsg);
                bRC = true;
            }

            return bRC;
        }

        #region CMsg_IMPL
        public sealed class CMsg<_TPayloadType> : IDisposable
        {
            public CMsg()
            {
                m_Payload = default(_TPayloadType);
                m_OrderedReadyLock = new AutoResetEvent(false);
                MakeReadyForFreeQueue();
            }

            public void Dispose()
            {
                if (null != m_OrderedReadyLock)
                {
                    m_OrderedReadyLock.Close();
                    m_OrderedReadyLock.Dispose();
                    m_OrderedReadyLock = null;
                }

                GC.SuppressFinalize(this);
            }

            public void MakeReadyForFreeQueue()
            {
                m_Payload = default(_TPayloadType);
            }

            private _TPayloadType m_Payload;
            public _TPayloadType Payload
            {
                get { return m_Payload; }
                set { m_Payload = value; }
            }

            private AutoResetEvent m_OrderedReadyLock;
            public void MarkReadyForOrderedProcessing()
            {
                m_OrderedReadyLock.Set();
            }

            public bool WaitForUnOrderedProcessing(Int32 iTimeout)
            {
                return m_OrderedReadyLock.WaitOne(iTimeout);
            }
        }
        #endregion

    }
    #endregion

}

PowerShell Test Driver

# Test driver: loads the XEvent assemblies and MergeXEvents.exe, then calls
# [MergeXEvents.Exporter]::GetIList on a sample trace.
$core =   [Reflection.Assembly]::LoadFrom('Microsoft.SqlServer.XE.Core.dll');
$xelinq = [Reflection.Assembly]::LoadFrom('Microsoft.SqlServer.XEvent.Linq.dll');

$MergeXeventAssembly = [Reflection.Assembly]::LoadFrom('.\bin\x64\Release\MergeXEvents.exe');
if($null -eq $MergeXeventAssembly)
{
    throw 'ERROR: Unable to load assembly';
}

# $true  : emit whatever GetIList returns straight to the pipeline
# $false : print one formatted line per event
$bDumpRaw = $true;

if($true -eq $bDumpRaw)
{
    [MergeXEvents.Exporter]::GetIList('C:\temp\NewTestTraces\BigXEL\Test_0_130494871596480000.xel', 'C:\temp\NewTestTraces\BigXEL');
}
else
{
    $iRow = 0;

    # NOTE(review): this path uses 'eEst_0_...' while the branch above uses 'Test_0_...' - confirm which is intended
    $events = [MergeXEvents.Exporter]::GetIList('C:\temp\NewTestTraces\BigXEL\eEst_0_130494871596480000.xel', 'C:\temp\NewTestTraces\BigXEL');

    # The loop variable is deliberately not named $event: that is a PowerShell automatic variable
    foreach($xeEvent in $events)
    {
        $iRow.ToString() + ' Event: ' + $xeEvent.Event_name + '|Package: ' + $xeEvent.Event_package_name + '|Timestamp: ' + $xeEvent.Event_timestamp  | format-table -auto;
        $iRow++;
    }
}

Bob Dorr - Principal Software Engineer SQL Server

Comments (0)

Skip to main content