Imports System.Configuration Imports System.Messaging Imports System.Runtime.Remoting Imports System.Runtime.Remoting.Channels Imports System.Runtime.Remoting.Channels.Tcp Imports System.IO Imports System.Runtime.Serialization.Formatters.Binary ''' ''' ''' Namespace Server ''' ''' Implements the batch queue service. ''' ''' ''' This class implements the server-side batch queue functionality. ''' It must be hosted within some process, typically a Windows Service. ''' It may also be hosted within a console application, which is ''' useful for testing and debugging. ''' Public Class BatchQueueService Private Shared mChannel As TcpServerChannel Private Shared mQueue As MessageQueue Private Shared mMonitor As Threading.Thread Private Shared WithEvents mTimer As New System.Timers.Timer Private Shared mRunning As Boolean Private Shared mActiveEntries As Hashtable = Hashtable.Synchronized(New Hashtable) Private Shared mSync As New Threading.AutoResetEvent(False) Private Shared mWaitToEnd As New Threading.ManualResetEvent(False) ''' ''' Returns the name of the batch queue server. ''' Public Shared ReadOnly Property Name() As String Get Return LISTENER_NAME() End Get End Property #Region " Start/Stop " ''' ''' Starts the batch queue. ''' ''' ''' Call this as your Windows Service starts up to ''' start the batch queue itself. This will cause ''' the queue to start listening for user requests ''' via remoting and to the MSMQ for queued jobs. ''' Public Shared Sub Start() mTimer.AutoReset = False ' open and/or create queue If MessageQueue.Exists(QUEUE_NAME) Then mQueue = New MessageQueue(QUEUE_NAME) Else mQueue = MessageQueue.Create(QUEUE_NAME) End If mQueue.MessageReadPropertyFilter.Extension = True ' start reading from queue mRunning = True mMonitor = New Threading.Thread(AddressOf MonitorQueue) mMonitor.Name = "MonitorQueue" mMonitor.Start() ' start remoting for Server.BatchQueue If mChannel Is Nothing Then ' set application name (virtual root name) RemotingConfiguration.ApplicationName = LISTENER_NAME() ' set up channel Dim properties As New Hashtable() properties("name") = "TcpBinary" properties("priority") = "2" properties("port") = CStr(PORT()) Dim svFormatter As New BinaryServerFormatterSinkProvider() 'TODO: uncomment the following line for .NET 1.1 'svFormatter.TypeFilterLevel = Runtime.Serialization.Formatters.TypeFilterLevel.Full mChannel = New TcpServerChannel(properties, svFormatter) Channels.ChannelServices.RegisterChannel(mChannel) ' register our class RemotingConfiguration.RegisterWellKnownServiceType( _ GetType(Server.BatchQueue), _ "BatchQueue.rem", _ WellKnownObjectMode.SingleCall) Else mChannel.StartListening(Nothing) End If Dim sb As New Text.StringBuilder() sb.Append("Batch queue processor started") sb.Append(vbCrLf) sb.AppendFormat("Name: {0}", Name) sb.Append(vbCrLf) sb.AppendFormat("Port: {0}", PORT) sb.Append(vbCrLf) sb.AppendFormat("Queue: {0}", QUEUE_NAME) sb.Append(vbCrLf) sb.AppendFormat("Max jobs: {0}", MAX_ENTRIES) sb.Append(vbCrLf) System.Diagnostics.EventLog.WriteEntry( _ Name, sb.ToString, EventLogEntryType.Information) End Sub ''' ''' Stops the batch queue. ''' ''' ''' ''' Call this as your Windows Service is stopping. It ''' stops the batch queue, causing it to stop listening ''' for user requests via remoting and to stop looking for ''' jobs in the MSMQ queue. ''' ''' NOTE that this method will not return until any ''' currently active (executing) batch jobs have completed. ''' ''' Public Shared Sub [Stop]() ' stop remoting for Server.BatchQueue mChannel.StopListening(Nothing) ' signal to stop working mRunning = False mSync.Set() mMonitor.Join() ' close the queue mQueue.Close() If mActiveEntries.Count > 0 Then ' wait for work to end mWaitToEnd.WaitOne() End If End Sub #End Region #Region " Process messages " ' this will be running on a background thread Private Shared Sub MonitorQueue() While mRunning ScanQueue() mSync.WaitOne() End While End Sub ' this runs on a threadpool thread Private Shared Sub mTimer_Elapsed(ByVal sender As Object, _ ByVal e As System.Timers.ElapsedEventArgs) Handles mTimer.Elapsed mTimer.Stop() mSync.Set() End Sub ' this is called by MonitorQueue Private Shared Sub ScanQueue() Dim msg As Message Dim holdUntil As Date Dim nextWake As Date = Date.MaxValue Dim en As MessageEnumerator = mQueue.GetMessageEnumerator While en.MoveNext() msg = en.Current holdUntil = CDate(Text.Encoding.ASCII.GetString(msg.Extension)) If holdUntil <= Now Then If mActiveEntries.Count < MAX_ENTRIES() Then ProcessEntry(mQueue.ReceiveById(msg.Id)) Else ' the queue is busy, go to sleep Exit Sub End If ElseIf holdUntil < nextWake Then ' find the minimum holduntil value nextWake = holdUntil End If End While If nextWake < Date.MaxValue AndAlso nextWake > Now Then ' we have at least one entry holding, so set the ' timer to wake us when it should be run mTimer.Interval = nextWake.Subtract(Now).TotalMilliseconds mTimer.Start() End If End Sub Private Shared Sub ProcessEntry(ByVal msg As Message) ' get entry from queue Dim entry As BatchEntry Dim formatter As New BinaryFormatter() entry = CType(formatter.Deserialize(msg.BodyStream), BatchEntry) ' make active entry.Info.SetStatus(BatchEntryStatus.Active) mActiveEntries.Add(entry.Info.ID, entry.Info) ' start processing entry on background thread Threading.ThreadPool.QueueUserWorkItem(AddressOf entry.Execute) End Sub ' called by BatchEntry when it is done processing so ' we know that it is complete and we can start another ' job if needed Friend Shared Sub Deactivate(ByVal entry As BatchEntry) mActiveEntries.Remove(entry.Info.ID) mSync.Set() If Not mRunning AndAlso mActiveEntries.Count = 0 Then ' indicate that there are no active workers mWaitToEnd.Set() End If End Sub #End Region #Region " Enqueue/Dequeue/LoadEntries " Friend Shared Sub Enqueue(ByVal Entry As BatchEntry) Dim msg As New Message() Dim f As New BinaryFormatter() With msg .Label = Entry.ToString .Priority = Entry.Info.Priority .Extension = Text.Encoding.ASCII.GetBytes(CStr(Entry.Info.HoldUntil)) Entry.Info.SetMessageID(.Id) f.Serialize(.BodyStream, Entry) End With mQueue.Send(msg) mSync.Set() End Sub Friend Shared Sub Dequeue(ByVal Entry As BatchEntryInfo) Dim label As String Dim msg As Message Dim msgID As String label = Entry.ToString Dim en As MessageEnumerator = mQueue.GetMessageEnumerator mQueue.MessageReadPropertyFilter.Label = True While en.MoveNext() If en.Current.Label = label Then ' we found a match msgID = en.Current.Id Exit While End If End While If Len(msgID) > 0 Then mQueue.ReceiveById(msgID) End If End Sub Friend Shared Sub LoadEntries(ByVal List As BatchEntries) ' load our list of BatchEntry objects Dim msgs() As Message Dim msg As Message Dim formatter As New BinaryFormatter() Dim de As DictionaryEntry Dim entry As Server.BatchEntry ' get all active entries SyncLock mActiveEntries.SyncRoot For Each de In Server.BatchQueueService.mActiveEntries List.Add(CType(de.Value, BatchEntryInfo)) Next End SyncLock ' get all queued entries msgs = mQueue.GetAllMessages For Each msg In msgs entry = CType(formatter.Deserialize(msg.BodyStream), Server.BatchEntry) entry.Info.SetMessageID(msg.Id) List.Add(entry.Info) Next End Sub #End Region #Region " Utility functions " Private Shared Function QUEUE_NAME() As String Return ".\private$\" & ConfigurationSettings.AppSettings("QueueName") End Function Private Shared Function LISTENER_NAME() As String Return ConfigurationSettings.AppSettings("ListenerName") End Function Private Shared Function PORT() As Integer Return CInt(ConfigurationSettings.AppSettings("ListenerPort")) End Function Private Shared Function MAX_ENTRIES() As Integer Return CInt(ConfigurationSettings.AppSettings("MaxActiveEntries")) End Function #End Region End Class End Namespace