当前位置:编程学习 > asp >>

C#消息队列应用程序 -2

在这个数组内部,CWorker 类创建了 CWorkerThread类的一个实现版
本。CWorkerThread 类(将在下面讨论)是一个必须继承的抽象类。导出
类定义了消息的处理方式:
aThreads = new ArrayList();
for (int idx=0; idx〈sfWorker.NumberThreads; idx++)
{
  WorkerThreadFormatter wfThread = new WorkerThreadFormatter();
  wfThread.ProcessName = sfWorker.ProcessName;
  wfThread.ProcessDesc = sfWorker.ProcessDesc;
  wfThread.ThreadNumber = idx;
  wfThread.InputQueue = sfWorker.InputQueue;
  wfThread.ErrorQueue = sfWorker.ErrorQueue;
  wfThread.OutputName = sfWorker.OutputName;
  // 定义辅助类型,并将其插入辅助线程结构
  CWorkerThread wtBase;
  switch (sfWorker.ProcessType)
  {
   case WorkerFormatter.SFProcessType.ProcessRoundRobin:
     wtBase = new CWorkerThreadRoundRobin(this, wfThread);
     break;
   case WorkerFormatter.SFProcessType.ProcessAppSpecific:
     wtBase = new CWorkerThreadAppSpecific(this, wfThread);
     break;
   case WorkerFormatter.SFProcessType.ProcessAssembly:
     wtBase = new CWorkerThreadAssembly(this, wfThread);
     break;
   default:
     throw new Exception("Unknown Processing Type");
  }
  // 添加对数组的调用
  aThreads.Insert(idx, wtBase);
}

  一旦所有的对象都已创建,就可以通过调用每个线程对象的 Start方
法来启动它们:
foreach(CWorkerThread cThread in aThreads)
  cThread.Start();

  Stop、Pause 和 Continue 方法在 foreach循环里执行的操作类似。
Stop方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this);

  在类析构函数中将调用 Stop 方法,这样,在没有显式调用 Stop 方
法的情况下也可以正确地终止对象。如果调用了 Stop 方法,将不需要析
构函数。SuppressFinalize方法能够防止调用对象的 Finalize 方法(析
构函数的实际实现)。

CWorkerThread 抽象类

  CWorkerThread 是一个由 CWorkerThreadAppSpecifc、CWorkerThread
RoundRobin 和 CWorkerThreadAssembly继承的抽象类。无论如何处理消
息,队列的大部分处理是相同的,所以 CWorkerThread类提供了这一功能。
这个类提供了抽象方法(必须被实际方法替代)以管理资源和处理消息。

  类的工作再一次通过 Start、Stop、Pause 和 Continue 方法来实现。
在 Start方法中引用了输入和错误队列。在 .NET 框架中,消息由 System.
Messaging 名称空间处理:
// 尝试打开队列,并设置默认的读写属性
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// 如果使用 MSMQ COM,则将格式化程序设置为 ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();

  一旦定义了消息队列引用,即会创建一个线程用于实际的处理函数
(称为 ProcessMessages)。在 .NET 框架中,使用 System.Threading
名称空间很容易实现线程处理:
procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();

  ProcessMessages 函数是基于 Boolean值的处理循环。当数值设为
False,处理循环将终止。因此,线程对象的 Stop 方法只设置这一Boolean
值,然后关闭打开的消息队列,并加入带有主线程的线程:
// 加入服务线程和处理线程
bRun = false;
procMessage.Join();
// 关闭打开的消息队列
mqInput.Close();
mqError.Close();

Pause 方法只设置一个 Boolean 值,使处理线程休眠半秒钟:

if (bPause)
  Thread.Sleep(500);

  最后,每一个 Start、Stop、Pause 和 Continue 方法将调用抽象的
OnStart 、OnStop、OnPause 和 OnContinue 方法。这些抽象方法为实现
的类提供了挂钩,以捕获和释放所需的资源。

  ProcessMessages 循环具有如下基本结构:
●接收Message。
●如果Message具有成功的Receive,则调用抽象ProcessMessage方法。
●如果Receive或ProcessMessage失败,将Message发送至错误队列中。

Message mInput;
try
{
  // 从队列中读取,并等候 1 秒
  mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
  // 将消息设置为 null
  mInput = null;
  // 查看错误代码,了解是否超时
  if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
  {
   // 如果未超时,发出一个错误并记录错误号
   LogError("Error: " + mqe.Message);
   throw mqe;
  }
}
if (mInput != null)
{
  // 得到一个要处理的消息,调用处理消息抽象方法
  try
  {
   ProcessMessage(mInput);
  }
  // 捕获已知异常状态的错误
  catch (CWorkerThreadException ex)
  {
   ProcessError(mInput, ex.Terminate);
  }
  // 捕获未知异常,并调用 Terminate
  catch
  {
   ProcessError(mInput, true);
  }
}

  ProcessError方法将错误的消息发送至错误队列。另外,它也可能引
发异常来终止线程。如果ProcessMessage方法引发了终止错误或 CWorker
ThreadException类型,它将执行此操作。

CworkerThread 导出类

  任何从 CWorkerThread中继承的类都必须提供 OnStart、OnStop、On
Pause、OnContinue和 ProcessMessage 方法。OnStart 和 OnStop方法获
取并释放处理资源。OnPause 和 OnContinue 方法允许临时释放和重新获
取这些资源。ProcessMessage方法应该处理消息,并在出现失败事件时引
发 CWorkerThreadException 异常。

  由于 CWorkerThread构造函数定义运行时参数,导出类必须调用基类
构造函数:
public CWorkerThreadDerived(CWorker v_cParent, WorkerThread
Formatter v_wfThread)
  : base (v_cParent, v_wfThread) {}

  导出类提供了两种类型的处理:将消息发送至另一队列,或者调用组
件方法。接收和发送消息的两种实现使用了循环技术或应用程序偏移(保
留在消息 AppSpecific属性中),作为使用哪一队列的决定因素。此方案
中的配置文件应该包括队列路径的列表。实现的 OnStart和 OnStop 方法
应该打开和关闭对这些队列的引用:
iQueues = wfThread.OutputName.Length;
mqOutput = new MessageQueue[iQueues];
for (int idx=0; idx〈iQueues; idx++)
{
  mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);
  mqOutput[idx].Formatter = new ActiveXMessageFormatter();
}

  在这些方案中,消息的处理很简单:将消息发送必要的输出队列。在
循环情况下,这个进程为:
try
{
  mqOutput[iNextQueue].Send(v_mInput);
}
catch (Exception ex)
{
  // 如果错误强制终止异常
  throw new CWorkerThreadException(ex.Message, true);
}
// 计算下一个队列号
iNextQueue++;
iNextQueue %= iQueues;

  后一种调用带消息参数的组件的实现方法比较有趣。ProcessMessage
方法使用 IWebMessage接口调入一个 .NET 组件。OnStart 和 OnStop 方
法获取和释放此组件的引用。

  此方案中的配置文件应该包含两个项目:完整的类名和类所在文件的
位置。按照 IWebMessage接口中的定义,在组件上调用 Process方法。

  要获取对象引用,需要使用 Activator.CreateInstance 方法。此函
数需要一个程序集类型。在这里,它是从程序集文件路径和类名中导出的。
一旦获取对象引用,它将被放入合适的接口:
private IWebMessage iwmSample;
private string sFilePath, sTypeName;
// 保存程序集路径和类型名称
sFilePath = wfThread.OutputName[0];
sTypeName = wfThread.OutputName[1];
// 获取对必要对象的引用
Assembly asmSample = Assembly.LoadFrom(sFilePath);
Type typSample = asmSample.GetType(sTypeName);
object objSample = Activator.CreateInstance(typSample);
// 定义给对象的必要接口
iwmSample = (IWebMessage)objSample;

  获取对象引用后,ProcessMessage方法将在 IWebMessage接口上调用
Process 方法:
WebMessageReturn wbrSample;
try
{
  // 定义方法调用的参数
  string sLabel = v_mInput.Label;
  string sBody = (string)v_mInput.Body;
  int iAppSpecific = v_mInput.AppSpecific;
  // 调用方法并捕捉返回代码
  wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);
}
catch (InvalidCastException ex)
{
  // 如果在消息内容中发生错误,则强制发出一个非终止异常

补充:Web开发 , ASP.Net ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,