BatchFileProcessing(3)-使用SSB与数据库通信
我们已经将数据从xls,csv,txt等文件中的解析出来,进行必须的数据验证,
然后将正确的数据以XML格式保存到磁盘,并将事务型数据更新到DB,
比如生成的磁盘文件名,CheckResult,CheckMemo等。
我们再一起回顾下数据库表设计:
USE SSB3
GO
CREATE TABLE [BatchInventoryQueue]
(
TransactionNumber INT IDENTITY(1,1) NOT NULL,
BatchFileID INT NOT NULL,
RowIndex INT NOT NULL,
ItemNumber INT NOT NULL,
[FileName] NVARCHAR(256) NULL,
HasCheck CHAR(1) NULL,
CheckResult CHAR(1) NULL,
CheckMemo NVARCHAR(2000) NULL,
HasSendSSB CHAR(1) NULL, --是否尝试发送过SSB
SSBSendResult CHAR(1) NULL,--发送SSB是否成功
SSBMemo NVARCHAR(2000) NULL,--SSB处理结果
CONSTRAINT PK_TransactionNumber_BatchInventoryQueue PRIMARY KEY CLUSTERED
(
TransactionNumber ASC
)
)
下面我们要将正确的数据以XML格式发送到数据库中。因为我们面临的数据量非常大,
所以我们需要考虑负载均衡,比如多台服务器部署。那么就可能会面临数据冲突的问题。
我这里的情况是要求多台服务器部署,那么如何给服务器分数据,又不造成冲突呢?
方案一:用表的主键TransactionNumber与服务器数据取模
1 SELECT TOP(@BatchSize) TransactionNumber,
2 [FileName]
3 FROM dbo.BatchInventoryQueue
4 WHERE HasCheck=Y AND CheckResult=S
5 AND [FileName] IS NOT NULL
6 AND HasSendSSB IS NULL AND TransactionNumber%@Throtting=@TrottingMod
优点:实现简单。
缺点:如果其中一台服务器失败,那么应该属于它处理的数据将一直得不到处理。
方案二:结合SQL Server的锁特性,在查询数据时避免冲突如下:
1 UPDATE TOP(@BatchSize) dbo.BatchInventoryQueue
2 SET HasSendSSB=I--inprocessing
3 OUTPUT DELETED.TransactionNumber,
4 DELETED.[FileName]
5 WHERE HasCheck=Y AND CheckResult=S
6 AND [FileName] IS NOT NULL
7 AND HasSendSSB IS NULL
将HasSendSSB更新为I,标示正在发送。由于在更新数据时,进程会获取UPDLOCK,那么下
一个服务器再查询数据时就必须等待直到第一个进程更新完毕。并发执行情况下,难免会有问题,
因此需要考虑容错机制。即用另一个Job定时监视(SSBSendResult IS NULL OR SSBSendResult=N)
AND HasSendSSB=I的数据,并将状态清空(SET HasSendSSB=NULL,SSBSendResult=NULL),
等待程序下次再次处理。不过这种情况应该非常少。
1 UPDATE dbo.BatchInventoryQueue
2 SET HasSendSSB=NULL,
3 SSBSendResult=NULL
4 WHERE (SSBSendResult IS NULL OR SSBSendResult=N)
5 AND HasSendSSB=I
接着我们要生成SSBMessage,这里我使用VTemplate模版引擎来生成.代码如下:
SSBMessageBase:
1 public abstract class SSBMessageBase
2 {
3 public string Subject { get; set; }
4
5 public string FromService { get; set; }
6
7 public string ToService { get; set; }
8
9 public SSBMessageHead Head { get; set; }
10 }
SSBMessageHead:
1 public class SSBMessageHead
2 {
3 public string Action { get; set; }
4
5 public string TransactionCode { get; set; }
6 }
SSBMessageFromFile:
1 public class SSBMessageFromFile : SSBMessageBase
2 {
3 public string FileName { get; set; }
4 }
VTemplate:
1 <vt:template>
2 <Publish>
3 <Subject>{$:ssb.Subject}/Subject>
4 <FromService>{$:ssb.FromService}</FromService>
5 <ToService>{$:ssb.ToService}</ToService>
6 <Message>
7 <Head>
8 <Action>{$:ssb.Action}</Action>
9 <TransactionCode>{$:ssb.TransactionCode}</TransactionCode>
10 </Head>
11 <Body>
12 <vt:output file="$ssb.FileName" charset="utf-8" />
13 </Body>
14 </Message>
15 </Publish>
16 </vt:template>
SSBUtility:
1 public class SSBUtility
2 {
3 private string VtSSBMessage(SSBMessageBase ssb)
4 {
5 string fileName = Path.Combine(AppDomain.CurrentDomain.BaseDirectory,@"TemplatesSSB.vt");
6
7 TemplateDocument doc =new TemplateDocument(fileName, Encoding.UTF8);
8 doc.SetValue("ssb", ssb);
9
10 StringBuilder sb = new StringBuilder();
11 StringWriter sw = new StringWriter(sb);
12 doc.Render(sw);
13 sw.Close();
14
15 return sb.ToString();
16 }
17
18 public void SendSSB(SSBMessageBase ssb)
19 {
20 string msg = VtSSBMessage(ssb);
21 using (SqlConnection conn = new SqlConnection(JobConfigs.SSBConnectionString))
22 {
23 using (SqlCommand cmd = new SqlCommand())
24 {
25 cmd.Connection = conn;
26 cmd.CommandType = CommandType.StoredProcedure;
27 cmd.CommandText = JobConfigs.SSBSendProc;//dbo.[UP_Send_Inventory]
28 SqlParameter p = new SqlParameter("@Message", SqlDbType.Xml);
29 p.Value = msg;
30 cmd.Parameters.Add(p);
31 conn.Open();
32 cmd.ExecuteNonQuery();
33 conn.Close();
34 }
35 }
36 }
37 }
再接着就是SSB创建部分,下面列举代码示例:
1 USE [master]
2 GO
3
4 IF EXISTS(SELECT * FROM sys.databases where [name]=SSB)
5 DROP DATABASE SSB
6
7 CREATE DATABASE SSB
8 GO
9
10 --enable service broker on database
11 ALTER DATABASE SSB
12 SET TRUSTWORTHY ON
13 GO
14
15 USE SSB
16 GO
17
18 --create a test table
19 CREATE TABLE dbo.Inventory
20 (
21 [ItemNumber] INT IDENTITY(1,1) PRIMARY KEY,
22 [Inventory] INT NOT NULL
23 )
24
25 INSERT INTO dbo.Inventory([Inventory])
26 VALUES(1)
27
28
补充:综合编程 , 其他综合 ,