.NET(C#):线程安全集合的阻塞BlockingCollection的使用
目录
• 1. 限制最大容量:BoundedCapacity
• 2. 禁止加入:CompleteAdding和IsCompleted
• 3. 枚举:GetConsumingEnumerable和BlockingCollection本身
• 4. GetConsumingEnumerable和CompleteAdding
返回目录
1. 限制最大容量:BoundedCapacity
BoundedCapacity属性和CompleteAdding方法,它们都可以从某种方式上限制元素被加入到集合中。但BoundedCapacity是用来限制集合的最大容量,当容量已满后,后续的添加操作会被阻塞,一旦有元素被移除,那么阻塞的添加操作会成功执行。
比如下面代码,试图将1-50加入到BlockingCollection,此时默认内部是ConcurrentBag,当然你可以指定任意IProducerConsumerCollection。我们把BoundedCapacity设置成2。
var bcollec = new BlockingCollection<int>(2);
//试图添加1-50
Task.Run(() =>
{
Parallel.For(1, 51, i =>
{
bcollec.Add(i);
Console.WriteLine("加入:" + i);
});
});
Thread.Sleep(1000);
Console.WriteLine("调用一次Take");
bcollec.Take();
Thread.Sleep(Timeout.Infinite);
可能的输出:
加入:37
加入:13
调用一次Take
加入:25
只有最多两个可以加入,然后调用Take后,下一个元素才会被加入。(注意此时Parallel.For中会有多个线程处于阻塞状态,因为无法加入数据)。
返回目录
2. 禁止加入:CompleteAdding和IsCompleted
CompleteAdding方法则是直接不允许任何元素被加入集合,即使是当前元素的数量小于BoundedCapacity属性。
代码:
var bcollec = new BlockingCollection<int>(5);
//试图添加1-50
Task.Run(() =>
{
Parallel.For(1, 51, i =>
{
Console.WriteLine("准备加入:" + i);
bcollec.Add(i);
Console.WriteLine("== 成功加入:" + i);
Thread.Sleep(1000);
});
});
//等待一小段时间后马上调用CompleteAdding
Thread.Sleep(500);
Console.WriteLine("调用CompleteAdding");
bcollec.CompleteAdding();
Thread.Sleep(Timeout.Infinite);
上述代码可能的输出:
准备加入:1
准备加入:13
准备加入:25
准备加入:37
== 成功加入:13
== 成功加入:1
== 成功加入:37
== 成功加入:25
调用CompleteAdding
准备加入:2
准备加入:38
准备加入:26
准备加入:14
可以看到,虽然BlockingCollection的BoundedCapacity为5,但是由于提前调用了CompleteAdding,即使当前集合只有4个元素,也不会再同意新的加入操作了。
那么CompleteAdding有什么用?当使用了CompleteAdding方法后且集合内没有元素的时候,另一个属性IsCompleted此时会为True,这个属性可以用来判断是否当前集合内的所有元素都被处理完,而BlockingCollection背后的IProducerConsumerCollection恰恰常用来处理此类生产者-消费者问题的。
下面我们首先在多个线程中试图往BlockingCollection中加入元素,然后中途调用CompleteAdding,接着通过IsCompleted属性逐个处理被成功加入的元素。
如下代码:
var bcollec = new BlockingCollection<int>();
//试图添加1-50
Task.Run(() =>
{
Parallel.For(1, 51, i =>
{
bcollec.Add(i);
Console.WriteLine("成功加入:" + i);
Thread.Sleep(1000);
});
});
//等待一小段时间后马上调用CompleteAdding
Thread.Sleep(700);
Console.WriteLine("调用CompleteAdding");
bcollec.CompleteAdding();
Console.WriteLine("容器元素数量:" + bcollec.Count);
while (!bcollec.IsCompleted)
{
var res = bcollec.Take();
Console.WriteLine("取出:" + res);
}
Console.WriteLine("操作完成");
Thread.Sleep(Timeout.Infinite);
可能的输出:
成功加入:37
成功加入:25
成功加入:13
成功加入:1
调用CompleteAdding
容器元素数量:4
取出:1
取出:37
取出:25
取出:13
操作完成
返回目录
3. 枚举:GetConsumingEnumerable和BlockingCollection本身
BlockingCollection有两种枚举方法,首先BlockingCollection本身继承自IEnumerable<T>,所以它自己就可以被foreach枚举,首先BlockingCollection包装了一个线程安全集合,那么它自己也是线程安全的,而当多个线程在同时修改或访问线程安全容器时,BlockingCollection自己作为IEnumerable会返回一个一定时间内的集合片段,也就是只会枚举在那个时间点上内部集合的元素。
看下面代码:
var bcollec = new BlockingCollection<int>();
//试图添加1-10
Task.Run(() =>
{
var forOpt = new ParallelOptions()
{
//防止在某些硬件上并发数太多
MaxDegreeOfParallelism = 2
};
Parallel.For(1, 11, forOpt, i =>
{
bcollec.Add(i);
Console.WriteLine("成功加入:" + i);
Thread.Sleep(500);
});
});
Thread.Sleep(700);
//开始枚举www.zzzyk.com
Task.Run(() =>
{
foreach (var i in bcollec)
Console.WriteLine("输出:" + i);
});
Thread.Sleep(Timeout.Infinite);
我们边加入元素边进行枚举(直接在BlockingCollection上foreach),可能的输出:
成功加入:1
成功加入:6
成功加入:2
成功加入:7
输出:1
输出:6
输出:2
输出:7
成功加入:8
成功加入:3
成功加入:4
成功加入:9
成功加入:5
成功加入:10
可以看到,BlockingCollection本身的迭代器只能反映出一时的容器内容。
而BlockingCollection还有一个GetConsumingEnumerable方法,同样返回一个IEnumerable<T>,这个可枚举的集合背后的迭代器不同于BlockingCollection本身的迭代器,它可以返回最新的加入的元素,如果当前时间段没有元素被加入,它会阻塞然后等新加进来的元素。
我们把上面的使用BlockingCollection本身枚举代码中的枚举Task改成这样:
//开始枚举
Task.Run(() =>
{
foreach (var i in bcollec.GetConsumingEnumerable())
Console.WriteLine("输出:" + i);
Console.WriteLine("完成枚举");
});
可能的输出:
成功加入:6
成功加入:1
成功加入:2
成功加入:7<
补充:Web开发 , ASP.Net ,