diff --git a/src/DotnetSpider.Common/Request.cs b/src/DotnetSpider.Common/Request.cs
index 8e4f2a728..0368f5faf 100644
--- a/src/DotnetSpider.Common/Request.cs
+++ b/src/DotnetSpider.Common/Request.cs
@@ -132,13 +132,13 @@ public override bool Equals(object obj)
Request request = (Request)obj;
- if (!Depth.Equals(request.Depth)) return false;
- if (!CycleTriedTimes.Equals(request.CycleTriedTimes)) return false;
- if (!Referer.Equals(request.Referer)) return false;
- if (!Origin.Equals(request.Origin)) return false;
- if (!Method.Equals(request.Method)) return false;
- if (!Priority.Equals(request.Priority)) return false;
- if (!Content.Equals(request.Content)) return false;
+ if (!Equals(Depth, request.Depth)) return false;
+ if (!Equals(CycleTriedTimes, request.CycleTriedTimes)) return false;
+ if (!Equals(Referer, request.Referer)) return false;
+ if (!Equals(Origin, request.Origin)) return false;
+ if (!Equals(Method, request.Method)) return false;
+ if (!Equals(Priority, request.Priority)) return false;
+ if (!Equals(Content, request.Content)) return false;
if (Properties == null)
{
diff --git a/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs b/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs
index 15eff67c1..37f49a3f6 100644
--- a/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs
+++ b/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs
@@ -96,7 +96,7 @@ public int Depth
/// 添加请求对象到队列
/// </summary>
/// <param name="request">请求对象</param>
- public void Push(Request request, Func<Request, bool> shouldReserved)
+ public void Push(Request request, Func<Request, bool> shouldReserved = null)
{
var action = new Action(() =>
{
diff --git a/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs b/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs
index 989793112..f1f1c13a8 100644
--- a/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs
+++ b/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs
@@ -165,5 +165,19 @@ public override void Dispose()
}
base.Dispose();
}
+
+ /// <summary>
+ /// 取得队列中所有的请求对象
+ /// </summary>
+ internal Request[] All
+ {
+ get
+ {
+ lock (_lock)
+ {
+ return _queue.ToArray();
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/DotnetSpider.Core/Spider.cs b/src/DotnetSpider.Core/Spider.cs
index b39cafe3f..5e809c24c 100644
--- a/src/DotnetSpider.Core/Spider.cs
+++ b/src/DotnetSpider.Core/Spider.cs
@@ -737,9 +737,12 @@ protected override void Execute(params string[] arguments)
var downloader = Downloader.Clone();
while (Status == Status.Running)
{
- // 从队列中取出一个请求
+ // 从队列中取出一个请求, 因为 Site 是共享对象, 每个Request都保留了引用, 序列化存到Redis或其它数据库中浪费带宽和空间, 因此 Site对象不保存到数据库中
Request request = Scheduler.Poll();
-
+ if (request != null && request.Site == null)
+ {
+ request.Site = Site;
+ }
// 如果队列中没有需要处理的请求, 则开始等待, 一直到设定的 EmptySleepTime 结束, 则认为爬虫应该结束了
if (request == null)
{
diff --git a/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs b/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs
index 732dc7d5b..fa9ec4a5b 100644
--- a/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs
+++ b/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs
@@ -1,306 +1,307 @@
-//using System;
-//using System.Collections.Generic;
-//using DotnetSpider.Core;
-//using DotnetSpider.Core.Scheduler;
-//using Xunit;
-//using DotnetSpider.Core.Downloader;
-//using DotnetSpider.Core.Pipeline;
-//using DotnetSpider.Core.Processor;
-//using DotnetSpider.Core.Monitor;
-
-//namespace DotnetSpider.Extension.Test.Scheduler
-//{
-
-// public class RedisSchedulerTest
-// {
-// private Extension.Scheduler.RedisScheduler GetRedisScheduler()
-// {
-// return new Extension.Scheduler.RedisScheduler("127.0.0.1:6379,serviceName=Scheduler.NET,keepAlive=8,allowAdmin=True,connectTimeout=10000,password=,abortConnect=True,connectRetry=20");
-// }
-
-// [Fact(DisplayName = "PushAndPoll1")]
-// public void PushAndPoll1()
-// {
-// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-
-// ISpider spider = new DefaultSpider();
-// scheduler.Init(spider);
-// scheduler.Dispose();
-
-// Request request = new Request("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", null) { Site = spider.Site };
-// request.PutExtra("1", "2");
-// scheduler.Push(request);
-// Request result = scheduler.Poll();
-// Assert.Equal("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", result.Url.ToString());
-// Assert.Equal("2", request.GetExtra("1"));
-// Request result1 = scheduler.Poll();
-// Assert.Null(result1);
-// scheduler.Dispose();
-// }
-
-// [Fact(DisplayName = "RedisScheduler_PushAndPollBreadthFirst")]
-// public void PushAndPollBreadthFirst()
-// {
-// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-// scheduler.TraverseStrategy = TraverseStrategy.Bfs;
-// ISpider spider = new DefaultSpider();
-// scheduler.Init(spider);
-// scheduler.Dispose();
-
-// Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site };
-// Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site };
-// Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site };
-// Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site };
-// scheduler.Push(request1);
-// scheduler.Push(request2);
-// scheduler.Push(request3);
-// scheduler.Push(request4);
-
-// Request result = scheduler.Poll();
-// Assert.Equal("http://www.ibm.com/1", result.Url.ToString());
-// Request result1 = scheduler.Poll();
-// Assert.Equal("http://www.ibm.com/2", result1.Url.ToString());
-// scheduler.Dispose();
-// scheduler.Dispose();
-// }
-
-// [Fact(DisplayName = "RedisScheduler_PushAndPollDepthFirst")]
-// public void PushAndPollDepthFirst()
-// {
-// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-// scheduler.TraverseStrategy = TraverseStrategy.Dfs;
-// ISpider spider = new DefaultSpider();
-// scheduler.Init(spider);
-// scheduler.Dispose();
-// Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site };
-// Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site };
-// Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site };
-// Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site };
-// scheduler.Push(request1);
-// scheduler.Push(request2);
-// scheduler.Push(request3);
-// scheduler.Push(request4);
-
-// Request result = scheduler.Poll();
-// Assert.Equal("http://www.ibm.com/4", result.Url.ToString());
-// Request result1 = scheduler.Poll();
-// Assert.Equal("http://www.ibm.com/3", result1.Url.ToString());
-// scheduler.Dispose();
-// scheduler.Dispose();
-// }
-
-// [Fact(DisplayName = "LoadPerformace")]
-// public void LoadPerformace()
-// {
-// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-// Spider spider = new DefaultSpider("test", new Site());
-// spider.Monitor = new LogMonitor();
-// scheduler.Init(spider);
-// scheduler.Dispose();
-// var start = DateTime.Now;
-// for (int i = 0; i < 40000; i++)
-// {
-// scheduler.Push(new Request("http://www.a.com/" + i, null) { Site = spider.Site });
-// }
-
-// var end = DateTime.Now;
-// double seconds = (end - start).TotalSeconds;
-// scheduler.Dispose();
-
-// var start1 = DateTime.Now;
-// HashSet<Request> list = new HashSet<Request>();
-// for (int i = 0; i < 40000; i++)
-// {
-// list.Add(new Request("http://www.a.com/" + i, null));
-// }
-// scheduler.Import(list);
-// var end1 = DateTime.Now;
-// double seconds1 = (end1 - start1).TotalSeconds;
-// Assert.True(seconds1 < seconds);
-// scheduler.Dispose();
-// }
-
-// [Fact(DisplayName = "RedisScheduler_Load")]
-// public void Load()
-// {
-// QueueDuplicateRemovedScheduler scheduler = new QueueDuplicateRemovedScheduler();
-// ISpider spider = new DefaultSpider("test", new Site());
-// scheduler.Init(spider);
-
-// scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site });
-// scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site });
-// scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site });
-// scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site });
-
-// Extension.Scheduler.RedisScheduler redisScheduler = GetRedisScheduler();
-// redisScheduler.Init(spider);
-
-// redisScheduler.Dispose();
-
-// redisScheduler.Import(scheduler.All);
-
-// Assert.Equal("http://www.d.com/", redisScheduler.Poll().Url.ToString());
-// Assert.Equal("http://www.c.com/", redisScheduler.Poll().Url.ToString());
-// Assert.Equal("http://www.b.com/", redisScheduler.Poll().Url.ToString());
-// Assert.Equal("http://www.a.com/", redisScheduler.Poll().Url.ToString());
-
-// redisScheduler.Dispose();
-// }
-
-// [Fact(DisplayName = "RedisScheduler_Status")]
-// public void Status()
-// {
-// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-// ISpider spider = new DefaultSpider("test", new Site());
-// scheduler.Init(spider);
-
-// scheduler.Dispose();
-
-// scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site });
-// scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site });
-// scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site });
-// scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site });
-
-// Assert.Equal(0, scheduler.ErrorRequestsCount);
-// Assert.Equal(4, scheduler.LeftRequestsCount);
-// Assert.Equal(4, scheduler.TotalRequestsCount);
-// scheduler.IncreaseErrorCount();
-// Assert.Equal(1, scheduler.ErrorRequestsCount);
-// Assert.Equal(0, scheduler.SuccessRequestsCount);
-// scheduler.IncreaseSuccessCount();
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-
-// scheduler.Poll();
-// Assert.Equal(3, scheduler.LeftRequestsCount);
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-// Assert.Equal(1, scheduler.ErrorRequestsCount);
-// Assert.Equal(4, scheduler.TotalRequestsCount);
-
-// scheduler.Poll();
-// Assert.Equal(2, scheduler.LeftRequestsCount);
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-// Assert.Equal(1, scheduler.ErrorRequestsCount);
-// Assert.Equal(4, scheduler.TotalRequestsCount);
-
-// scheduler.Poll();
-// Assert.Equal(1, scheduler.LeftRequestsCount);
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-// Assert.Equal(1, scheduler.ErrorRequestsCount);
-// Assert.Equal(4, scheduler.TotalRequestsCount);
-
-// scheduler.Poll();
-// Assert.Equal(0, scheduler.LeftRequestsCount);
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-// Assert.Equal(1, scheduler.ErrorRequestsCount);
-// Assert.Equal(4, scheduler.TotalRequestsCount);
-
-// scheduler.Poll();
-// scheduler.Poll();
-// Assert.Equal(0, scheduler.LeftRequestsCount);
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-// Assert.Equal(1, scheduler.ErrorRequestsCount);
-// Assert.Equal(4, scheduler.TotalRequestsCount);
-
-// scheduler.Dispose();
-// }
-
-// //[Fact]
-// //public void MultiInit()
-// //{
-// // Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-
-// // ISpider spider = new DefaultSpider();
-// // scheduler.Init(spider);
-// // string queueKey = scheduler.GetQueueKey();
-// // string setKey = scheduler.GetSetKey();
-// // string itemKey = scheduler.GetItemKey();
-// // string errorCountKey = scheduler.GetErrorCountKey();
-// // string successCountKey = scheduler.GetSuccessCountKey();
-// // scheduler.Init(spider);
-// // Assert.Equal(queueKey, scheduler.GetQueueKey());
-// // Assert.Equal(setKey, scheduler.GetSetKey());
-// // Assert.Equal(itemKey, scheduler.GetItemKey());
-// // Assert.Equal(errorCountKey, scheduler.GetErrorCountKey());
-// // Assert.Equal(successCountKey, scheduler.GetSuccessCountKey());
-
-// // scheduler.Dispose();
-// // scheduler.Dispose();
-// //}
-
-// [Fact(DisplayName = "RedisScheduler_Clear")]
-// public void Clear()
-// {
-// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
-
-// ISpider spider = new DefaultSpider();
-// scheduler.Init(spider);
-// scheduler.Dispose();
-// Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site };
-// Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site };
-// Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site };
-// Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site };
-// scheduler.Push(request1);
-// scheduler.Push(request2);
-// scheduler.Push(request3);
-// scheduler.Push(request4);
-
-// Request result = scheduler.Poll();
-// Assert.Equal("http://www.ibm.com/4", result.Url.ToString());
-
-// scheduler.Dispose();
-// }
-
-// [Fact(DisplayName = "RedisScheduler_RetryRequest")]
-// public void RetryRequest()
-// {
-// var site = new Site { EncodingName = "UTF-8" };
-
-// var scheduler = new QueueDuplicateRemovedScheduler();
-
-// site.AddStartUrl("http://www.baidu.com");
-// site.AddStartUrl("http://www.163.com/");
-
-// Spider spider = Spider.Create(site,
-// // crawler identity
-// "cnblogs_" + DateTime.Now.ToString("yyyyMMddhhmmss"),
-// // use memoery queue scheduler
-// scheduler,
-// // default page processor will save whole html, and extract urls to target urls via regex
-// new TestPageProcessor())
-// // save crawler result to file in the folder: \{running directory}\data\{crawler identity}\{guid}.dsd
-// .AddPipeline(new FilePipeline());
-// spider.Monitor = new LogMonitor();
-// // dowload html by http client
-// spider.Downloader = new HttpClientDownloader();
-
-// spider.ThreadNum = 1;
-// // traversal deep 遍历深度
-// spider.Scheduler.Depth = 3;
-// spider.ClearSchedulerAfterCompleted = false;
-// spider.EmptySleepTime = 6000;
-// // start crawler 启动爬虫
-// spider.Run();
-
-// Assert.Equal(5, spider.RetriedTimes.Value);
-// Assert.Equal(0, scheduler.LeftRequestsCount);
-// Assert.Equal(1, scheduler.SuccessRequestsCount);
-// // 重试次数应该包含
-// Assert.Equal(5, scheduler.ErrorRequestsCount);
-// }
-
-// class TestPageProcessor : BasePageProcessor
-// {
-// protected override void Handle(Page page)
-// {
-// if (page.Request.Url.ToString() == "http://www.163.com/")
-// {
-// throw new SpiderException("");
-// }
-// else
-// {
-// page.AddTargetRequest("http://www.163.com/", 0, false);
-// }
-// }
-// }
-// }
-//}
\ No newline at end of file
+using System;
+using System.Collections.Generic;
+using DotnetSpider.Core;
+using DotnetSpider.Core.Scheduler;
+using Xunit;
+using DotnetSpider.Core.Downloader;
+using DotnetSpider.Core.Pipeline;
+using DotnetSpider.Core.Processor;
+using DotnetSpider.Core.Monitor;
+using DotnetSpider.Common;
+using DotnetSpider.Downloader;
+
+namespace DotnetSpider.Extension.Test.Scheduler
+{
+
+ public class RedisSchedulerTest
+ {
+ private Extension.Scheduler.RedisScheduler GetRedisScheduler(string identity)
+ {
+ return new Extension.Scheduler.RedisScheduler(identity, "127.0.0.1:6379,serviceName=Scheduler.NET,keepAlive=8,allowAdmin=True,connectTimeout=10000,password=,abortConnect=True,connectRetry=20");
+ }
+
+ [Fact(DisplayName = "PushAndPoll1")]
+ public void PushAndPoll1()
+ {
+ ISpider spider = new DefaultSpider();
+ Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity);
+ scheduler.Dispose();
+
+ Request request = new Request("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", null) { Site = spider.Site };
+ request.Properties.Add("1", "2");
+ scheduler.Push(request, null);
+ Request result = scheduler.Poll();
+ Assert.Equal("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", result.Url.ToString());
+ Assert.Equal("2", request.Properties["1"]);
+ Request result1 = scheduler.Poll();
+ Assert.Null(result1);
+ scheduler.Dispose();
+ }
+
+ [Fact(DisplayName = "RedisScheduler_PushAndPollBreadthFirst")]
+ public void PushAndPollBreadthFirst()
+ {
+ ISpider spider = new DefaultSpider();
+ Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity);
+ scheduler.TraverseStrategy = TraverseStrategy.Bfs;
+
+
+ scheduler.Dispose();
+
+ Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site };
+ Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site };
+ Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site };
+ Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site };
+ scheduler.Push(request1);
+ scheduler.Push(request2);
+ scheduler.Push(request3);
+ scheduler.Push(request4);
+
+ Request result = scheduler.Poll();
+ Assert.Equal("http://www.ibm.com/1", result.Url.ToString());
+ Request result1 = scheduler.Poll();
+ Assert.Equal("http://www.ibm.com/2", result1.Url.ToString());
+ scheduler.Dispose();
+ scheduler.Dispose();
+ }
+
+ [Fact(DisplayName = "RedisScheduler_PushAndPollDepthFirst")]
+ public void PushAndPollDepthFirst()
+ {
+ ISpider spider = new DefaultSpider();
+ Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity);
+ scheduler.TraverseStrategy = TraverseStrategy.Dfs;
+
+
+ scheduler.Dispose();
+ Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site };
+ Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site };
+ Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site };
+ Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site };
+ scheduler.Push(request1);
+ scheduler.Push(request2);
+ scheduler.Push(request3);
+ scheduler.Push(request4);
+
+ Request result = scheduler.Poll();
+ Assert.Equal("http://www.ibm.com/4", result.Url.ToString());
+ Request result1 = scheduler.Poll();
+ Assert.Equal("http://www.ibm.com/3", result1.Url.ToString());
+ scheduler.Dispose();
+ scheduler.Dispose();
+ }
+
+ [Fact(DisplayName = "LoadPerformace")]
+ public void LoadPerformace()
+ {
+ Spider spider = new DefaultSpider("test", new Site());
+ spider.Monitor = new LogMonitor();
+ Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity);
+ scheduler.Dispose();
+ var start = DateTime.Now;
+ for (int i = 0; i < 40000; i++)
+ {
+ scheduler.Push(new Request("http://www.a.com/" + i, null) { Site = spider.Site });
+ }
+
+ var end = DateTime.Now;
+ double seconds = (end - start).TotalSeconds;
+ scheduler.Dispose();
+
+ var start1 = DateTime.Now;
+ HashSet<Request> list = new HashSet<Request>();
+ for (int i = 0; i < 40000; i++)
+ {
+ list.Add(new Request("http://www.a.com/" + i, null));
+ }
+ scheduler.Reload(list);
+ var end1 = DateTime.Now;
+ double seconds1 = (end1 - start1).TotalSeconds;
+ Assert.True(seconds1 < seconds);
+ scheduler.Dispose();
+ }
+
+ [Fact(DisplayName = "RedisScheduler_Load")]
+ public void Load()
+ {
+ QueueDuplicateRemovedScheduler scheduler = new QueueDuplicateRemovedScheduler();
+ ISpider spider = new DefaultSpider("test", new Site());
+
+
+ scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site });
+ scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site });
+ scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site });
+ scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site });
+
+ Extension.Scheduler.RedisScheduler redisScheduler = GetRedisScheduler(spider.Identity);
+
+ redisScheduler.Dispose();
+
+ redisScheduler.Reload(scheduler.All);
+
+ Assert.Equal("http://www.d.com/", redisScheduler.Poll().Url.ToString());
+ Assert.Equal("http://www.c.com/", redisScheduler.Poll().Url.ToString());
+ Assert.Equal("http://www.b.com/", redisScheduler.Poll().Url.ToString());
+ Assert.Equal("http://www.a.com/", redisScheduler.Poll().Url.ToString());
+
+ redisScheduler.Dispose();
+ }
+
+ [Fact(DisplayName = "RedisScheduler_Status")]
+ public void Status()
+ {
+ ISpider spider = new DefaultSpider("test", new Site());
+
+ Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity);
+
+ scheduler.Dispose();
+
+ scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site });
+ scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site });
+ scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site });
+ scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site });
+
+ Assert.Equal(0, scheduler.ErrorRequestsCount);
+ Assert.Equal(4, scheduler.LeftRequestsCount);
+ Assert.Equal(4, scheduler.TotalRequestsCount);
+ scheduler.IncreaseErrorCount();
+ Assert.Equal(1, scheduler.ErrorRequestsCount);
+ Assert.Equal(0, scheduler.SuccessRequestsCount);
+ scheduler.IncreaseSuccessCount();
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+
+ scheduler.Poll();
+ Assert.Equal(3, scheduler.LeftRequestsCount);
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+ Assert.Equal(1, scheduler.ErrorRequestsCount);
+ Assert.Equal(4, scheduler.TotalRequestsCount);
+
+ scheduler.Poll();
+ Assert.Equal(2, scheduler.LeftRequestsCount);
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+ Assert.Equal(1, scheduler.ErrorRequestsCount);
+ Assert.Equal(4, scheduler.TotalRequestsCount);
+
+ scheduler.Poll();
+ Assert.Equal(1, scheduler.LeftRequestsCount);
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+ Assert.Equal(1, scheduler.ErrorRequestsCount);
+ Assert.Equal(4, scheduler.TotalRequestsCount);
+
+ scheduler.Poll();
+ Assert.Equal(0, scheduler.LeftRequestsCount);
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+ Assert.Equal(1, scheduler.ErrorRequestsCount);
+ Assert.Equal(4, scheduler.TotalRequestsCount);
+
+ scheduler.Poll();
+ scheduler.Poll();
+ Assert.Equal(0, scheduler.LeftRequestsCount);
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+ Assert.Equal(1, scheduler.ErrorRequestsCount);
+ Assert.Equal(4, scheduler.TotalRequestsCount);
+
+ scheduler.Dispose();
+ }
+
+ //[Fact]
+ //public void MultiInit()
+ //{
+ // Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler();
+
+ // ISpider spider = new DefaultSpider();
+ // scheduler.Init(spider);
+ // string queueKey = scheduler.GetQueueKey();
+ // string setKey = scheduler.GetSetKey();
+ // string itemKey = scheduler.GetItemKey();
+ // string errorCountKey = scheduler.GetErrorCountKey();
+ // string successCountKey = scheduler.GetSuccessCountKey();
+ // scheduler.Init(spider);
+ // Assert.Equal(queueKey, scheduler.GetQueueKey());
+ // Assert.Equal(setKey, scheduler.GetSetKey());
+ // Assert.Equal(itemKey, scheduler.GetItemKey());
+ // Assert.Equal(errorCountKey, scheduler.GetErrorCountKey());
+ // Assert.Equal(successCountKey, scheduler.GetSuccessCountKey());
+
+ // scheduler.Dispose();
+ // scheduler.Dispose();
+ //}
+
+ [Fact(DisplayName = "RedisScheduler_Clear")]
+ public void Clear()
+ {
+ ISpider spider = new DefaultSpider();
+ Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity);
+
+
+
+ scheduler.Dispose();
+ Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site };
+ Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site };
+ Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site };
+ Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site };
+ scheduler.Push(request1);
+ scheduler.Push(request2);
+ scheduler.Push(request3);
+ scheduler.Push(request4);
+
+ Request result = scheduler.Poll();
+ Assert.Equal("http://www.ibm.com/4", result.Url.ToString());
+
+ scheduler.Dispose();
+ }
+
+ [Fact(DisplayName = "RedisScheduler_RetryRequest")]
+ public void RetryRequest()
+ {
+ var site = new Site { EncodingName = "UTF-8" };
+
+ var scheduler = new QueueDuplicateRemovedScheduler();
+
+ site.AddRequests("http://www.baidu.com");
+ site.AddRequests("http://www.163.com/");
+
+ Spider spider = Spider.Create(site,
+ // crawler identity
+ "cnblogs_" + DateTime.Now.ToString("yyyyMMddhhmmss"),
+ // use memoery queue scheduler
+ scheduler,
+ // default page processor will save whole html, and extract urls to target urls via regex
+ new TestPageProcessor())
+ // save crawler result to file in the folder: \{running directory}\data\{crawler identity}\{guid}.dsd
+ .AddPipeline(new FilePipeline());
+ spider.Monitor = new LogMonitor();
+ // dowload html by http client
+ spider.Downloader = new HttpClientDownloader();
+
+ spider.ThreadNum = 1;
+ // traversal deep 遍历深度
+ spider.Scheduler.Depth = 3;
+ spider.ClearSchedulerAfterCompleted = false;
+ spider.EmptySleepTime = 6000;
+ // start crawler 启动爬虫
+ spider.Run();
+
+ Assert.Equal(5, spider.RetriedTimes.Value);
+ Assert.Equal(0, scheduler.LeftRequestsCount);
+ Assert.Equal(1, scheduler.SuccessRequestsCount);
+ // 重试次数应该包含
+ Assert.Equal(6, scheduler.ErrorRequestsCount);
+ }
+
+ class TestPageProcessor : BasePageProcessor
+ {
+ protected override void Handle(Page page)
+ {
+ if (page.Request.Url.ToString() == "http://www.163.com/")
+ {
+ throw new SpiderException("");
+ }
+ else
+ {
+ page.AddTargetRequest("http://www.163.com/", 0, false);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs b/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs
index 80088a962..039523101 100644
--- a/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs
+++ b/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs
@@ -1,483 +1,413 @@
-//using DotnetSpider.Core;
-//using DotnetSpider.Core.Scheduler;
-//using DotnetSpider.Core.Scheduler.Component;
-//using DotnetSpider.Core.Infrastructure;
-//using Newtonsoft.Json;
-//using System.Collections.Generic;
-//using StackExchange.Redis;
-//using DotnetSpider.Extension.Infrastructure;
-//using DotnetSpider.Core.Redial;
-//using System;
-//using Polly;
-//using Polly.Retry;
-//using System.Linq;
-//using DotnetSpider.Common;
-
-//namespace DotnetSpider.Extension.Scheduler
-//{
-// ///
-// /// Use Redis as url scheduler for distributed crawlers.
-// ///
-// public class RedisScheduler : DuplicateRemovedScheduler, IDuplicateRemover
-// {
-// private readonly object _locker = new object();
-// private const string TasksKey = "dotnetspider:tasks";
-// private readonly RetryPolicy _retryPolicy = Policy.Handle().Retry(30);
-// private string _queueKey;
-// private string _setKey;
-// private string _itemKey;
-// private string _errorCountKey;
-// private string _successCountKey;
-// private string _identityMd5;
-// private readonly AutomicLong _successCounter = new AutomicLong(0);
-// private readonly AutomicLong _errorCounter = new AutomicLong(0);
-// private readonly string _connectString;
-// private RedisConnection _redisConnection;
-
-// ///
-// /// 批量加载时的每批次加载数
-// ///
-// public int BatchCount { get; set; } = 1000;
-
-// public override bool IsDistributed => true;
-
-// ///
-// /// RedisScheduler是否会使用互联网
-// ///
-// protected override bool UseInternet { get; set; } = true;
-
-// ///
-// /// 构造方法
-// ///
-// /// Redis连接字符串
-// public RedisScheduler(string connectString)
-// {
-// if (string.IsNullOrWhiteSpace(connectString))
-// {
-// throw new SpiderException("Redis connect string should not be empty");
-// }
-// _connectString = connectString;
-// DuplicateRemover = this;
-// }
-
-// ///
-// /// 构造方法
-// ///
-// public RedisScheduler()
-// {
-// _connectString = Env.RedisConnectString;
-// DuplicateRemover = this;
-// }
-
-// ///
-// /// 初始化队列
-// ///
-// /// 爬虫对象
-// public override void Init(ISpider spider)
-// {
-// base.Init(spider);
-
-// if (string.IsNullOrWhiteSpace(_connectString))
-// {
-// throw new SpiderException("Redis connect string should not be null or empty");
-// }
-
-// if (string.IsNullOrWhiteSpace(_identityMd5))
-// {
-// var md5 = CryptoUtil.Md5Encrypt(spider.Identity);
-// _itemKey = $"dotnetspider:scheduler:{md5}:items";
-// _setKey = $"dotnetspider:scheduler:{md5}:set";
-// _queueKey = $"dotnetspider:scheduler:{md5}:queue";
-// _errorCountKey = $"dotnetspider:scheduler:{md5}:numberOfFailures";
-// _successCountKey = $"dotnetspider:scheduler:{md5}:numberOfSuccessful";
-
-// _identityMd5 = md5;
-
-// var action = new Action(() =>
-// {
-// _redisConnection = Cache.Instance.Get(_connectString);
-// if (_redisConnection == null)
-// {
-// _redisConnection = new RedisConnection(_connectString);
-// Cache.Instance.Set(_connectString, _redisConnection);
-// }
-// _redisConnection.Database.SortedSetAdd(TasksKey, spider.Identity, (long)DateTimeUtil.GetCurrentUnixTimeNumber());
-// });
-
-// if (UseInternet)
-// {
-// NetworkCenter.Current.Execute("rds-init", action);
-// }
-// else
-// {
-// action();
-// }
-// }
-// }
-
-// ///
-// /// Reset duplicate check.
-// ///
-// public override void ResetDuplicateCheck()
-// {
-// var action = new Action(() =>
-// {
-// _redisConnection.Database.KeyDelete(_setKey);
-// });
-// if (UseInternet)
-// {
-// NetworkCenter.Current.Execute("rds-reset", action);
-// }
-// else
-// {
-// action();
-// }
-// }
-
-// ///
-// /// Check whether the request is duplicate.
-// ///
-// /// Request
-// /// Whether the request is duplicate.
-// public virtual bool IsDuplicate(Request request)
-// {
-// return _retryPolicy.Execute(() =>
-// {
-// bool isDuplicate = _redisConnection.Database.SetContains(_setKey, request.Identity);
-// if (!isDuplicate)
-// {
-// _redisConnection.Database.SetAdd(_setKey, request.Identity);
-// }
-// return isDuplicate;
-// });
-// }
-
-// ///
-// /// 取得一个需要处理的请求对象
-// ///
-// /// 请求对象
-// public override Request Poll()
-// {
-// if (UseInternet)
-// {
-// return NetworkCenter.Current.Execute("rds-poll", ImplPollRequest);
-// }
-// else
-// {
-// return ImplPollRequest();
-// }
-// }
-
-// ///
-// /// 剩余链接数
-// ///
-// public override long LeftRequestsCount
-// {
-// get
-// {
-// if (UseInternet)
-// {
-// return NetworkCenter.Current.Execute("rds-left", () => _redisConnection.Database.ListLength(_queueKey));
-// }
-// else
-// {
-// return _redisConnection.Database.ListLength(_queueKey);
-// }
-// }
-// }
-
-// ///
-// /// 总的链接数
-// ///
-// public override long TotalRequestsCount
-// {
-// get
-// {
-// if (UseInternet)
-// {
-// return NetworkCenter.Current.Execute("rds-total", () => _redisConnection.Database.SetLength(_setKey));
-// }
-// else
-// {
-// return _redisConnection.Database.SetLength(_setKey);
-// }
-// }
-// }
-
-// ///
-// /// 采集成功的链接数
-// ///
-// public override long SuccessRequestsCount => _successCounter.Value;
-
-// ///
-// /// 采集失败的次数, 不是链接数, 如果一个链接采集多次都失败会记录多次
-// ///
-// public override long ErrorRequestsCount => _errorCounter.Value;
-
-// ///
-// /// 采集成功的链接数加 1
-// ///
-// public override void IncreaseSuccessCount()
-// {
-// _successCounter.Inc();
-// }
-
-// ///
-// /// 采集失败的次数加 1
-// ///
-// public override void IncreaseErrorCount()
-// {
-// _errorCounter.Inc();
-// }
-
-// ///
-// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
-// ///
-// public override void Dispose()
-// {
-// if (UseInternet)
-// {
-// NetworkCenter.Current.Execute("rds-inc-clear", () =>
-// {
-// _redisConnection.Database.KeyDelete(_queueKey);
-// _redisConnection.Database.KeyDelete(_setKey);
-// _redisConnection.Database.KeyDelete(_itemKey);
-// _redisConnection.Database.KeyDelete(_successCountKey);
-// _redisConnection.Database.KeyDelete(_errorCountKey);
-// });
-// }
-// else
-// {
-// _redisConnection.Database.KeyDelete(_queueKey);
-// _redisConnection.Database.KeyDelete(_setKey);
-// _redisConnection.Database.KeyDelete(_itemKey);
-// _redisConnection.Database.KeyDelete(_successCountKey);
-// _redisConnection.Database.KeyDelete(_errorCountKey);
-// }
-// }
-
-// ///
-// /// 批量导入
-// ///
-// /// 请求对象
-// public override void Import(IEnumerable<Request> requests)
-// {
-// var action = new Action(() =>
-// {
-// lock (_locker)
-// {
-// int batchCount = BatchCount;
-
-// var count = requests.Count();
-// int cacheSize = count > batchCount ? batchCount : count;
-// RedisValue[] identities = new RedisValue[cacheSize];
-// HashEntry[] items = new HashEntry[cacheSize];
-// int i = 0;
-// int j = count % batchCount;
-// int n = count / batchCount;
-
-// foreach (var request in requests)
-// {
-// identities[i] = request.Identity;
-// items[i] = new HashEntry(request.Identity, JsonConvert.SerializeObject(request));
-// ++i;
-// if (i == batchCount)
-// {
-// --n;
-
-// _redisConnection.Database.SetAdd(_setKey, identities);
-// _redisConnection.Database.ListRightPush(_queueKey, identities);
-// _redisConnection.Database.HashSet(_itemKey, items, CommandFlags.HighPriority);
-
-// i = 0;
-// if (n != 0)
-// {
-// identities = new RedisValue[batchCount];
-// items = new HashEntry[batchCount];
-// }
-// else
-// {
-// identities = new RedisValue[j];
-// items = new HashEntry[j];
-// }
-// }
-// }
-
-// if (i > 0)
-// {
-// _redisConnection.Database.SetAdd(_setKey, identities);
-// _redisConnection.Database.ListRightPush(_queueKey, identities);
-// _redisConnection.Database.HashSet(_itemKey, items);
-// }
-// }
-// });
-// if (UseInternet)
-// {
-// NetworkCenter.Current.Execute("rds-import", action);
-// }
-// else
-// {
-// action();
-// }
-// }
-
-// ///
-// /// 把队列中的请求对象转换成List
-// ///
-// /// 请求对象的List
-// public HashSet ToList()
-// {
-// HashSet requests = new HashSet();
-// Request request;
-// while ((request = Poll()) != null)
-// {
-// requests.Add(request);
-// }
-// return requests;
-// }
-
-// //public override bool IsExited
-// //{
-// // get
-// // {
-// // try
-// // {
-// // if (UseInternet)
-// // {
-// // return NetworkCenter.Current.Execute("rds-isexited", () =>
-// // {
-// // var result = _redisConnection.Database.HashGet(TaskStatsKey, _identityMd5);
-// // if (result.HasValue)
-// // {
-// // return result == 1;
-// // }
-// // else
-// // {
-// // return false;
-// // }
-// // });
-// // }
-// // else
-// // {
-// // var result = _redisConnection.Database.HashGet(TaskStatsKey, _identityMd5);
-// // if (result.HasValue)
-// // {
-// // return result == 1;
-// // }
-// // else
-// // {
-// // return false;
-// // }
-// // }
-
-// // }
-// // catch
-// // {
-// // return false;
-// // }
-// // }
-// // set
-// // {
-// // var action = new Action(() =>
-// // {
-// // _redisConnection.Database.HashSet(TaskStatsKey, _identityMd5, value ? 1 : 0);
-// // });
-// // if (UseInternet)
-// // {
-// // NetworkCenter.Current.Execute("rds-isexited", action);
-// // }
-// // else
-// // {
-// // action();
-// // }
-// // }
-// //}
-
-// ///
-// /// 如果链接不是重复的就添加到队列中
-// ///
-// /// 请求对象
-// protected override void PushWhenNoDuplicate(Request request)
-// {
-// request.Site = request.Site ?? Spider.Site;
-// _retryPolicy.Execute(() =>
-// {
-// _redisConnection.Database.ListRightPush(_queueKey, request.Identity);
-// string field = request.Identity;
-// string value = JsonConvert.SerializeObject(request);
-
-// _redisConnection.Database.HashSet(_itemKey, field, value);
-// });
-// }
-
-// private Request ImplPollRequest()
-// {
-// return _retryPolicy.Execute(() =>
-// {
-// RedisValue value;
-// switch (TraverseStrategy)
-// {
-// case TraverseStrategy.Dfs:
-// {
-// value = _redisConnection.Database.ListRightPop(_queueKey);
-// break;
-// }
-// case TraverseStrategy.Bfs:
-// {
-// value = _redisConnection.Database.ListLeftPop(_queueKey);
-// break;
-// }
-// default:
-// {
-// throw new NotImplementedException();
-// }
-// }
-// if (!value.HasValue)
-// {
-// return null;
-// }
-// string field = value.ToString();
-
-// string json = _redisConnection.Database.HashGet(_itemKey, field);
-
-// if (!string.IsNullOrEmpty(json))
-// {
-// var result = JsonConvert.DeserializeObject(json);
-// _redisConnection.Database.HashDelete(_itemKey, field);
-// result.Site = Spider.Site;
-// return result;
-// }
-// return null;
-// });
-// }
-
-// #region For Test
-
-// internal string GetQueueKey()
-// {
-// return _queueKey;
-// }
-
-// internal string GetSetKey()
-// {
-// return _setKey;
-// }
-
-// internal string GetItemKey()
-// {
-// return _itemKey;
-// }
-
-// internal string GetErrorCountKey()
-// {
-// return _errorCountKey;
-// }
-
-// internal string GetSuccessCountKey()
-// {
-// return _successCountKey;
-// }
-
-// #endregion
-// }
-//}
+using DotnetSpider.Core;
+using DotnetSpider.Core.Scheduler;
+using DotnetSpider.Core.Scheduler.Component;
+using DotnetSpider.Core.Infrastructure;
+using Newtonsoft.Json;
+using System.Collections.Generic;
+using StackExchange.Redis;
+using DotnetSpider.Extension.Infrastructure;
+using System;
+using Polly;
+using Polly.Retry;
+using System.Linq;
+using DotnetSpider.Common;
+using DotnetSpider.Downloader;
+
+namespace DotnetSpider.Extension.Scheduler
+{
/// <summary>
/// Use Redis as url scheduler for distributed crawlers.
/// </summary>
public class RedisScheduler : DuplicateRemovedScheduler, IDuplicateRemover
{
	private readonly object _locker = new object();

	// Global sorted set registering every known task identity (score = registration time).
	private const string TasksKey = "dotnetspider:tasks";

	// Retries transient Redis failures up to 30 times.
	private readonly RetryPolicy _retryPolicy = Policy.Handle().Retry(30);

	// Per-task Redis key names; all derived from the MD5 of the identity in the constructor.
	private string _queueKey;          // list: pending request identities
	private string _setKey;            // set: identities already seen (duplicate check)
	private string _itemKey;           // hash: identity -> serialized Request
	private string _errorCountKey;
	private string _successCountKey;
	private string _identityMd5;

	// NOTE(review): these counters are in-process only, not shared through Redis —
	// confirm whether per-node counting is intended for a distributed scheduler.
	private readonly AutomicLong _successCounter = new AutomicLong(0);
	private readonly AutomicLong _errorCounter = new AutomicLong(0);

	private readonly string _connectString;
	private RedisConnection _redisConnection;
	private readonly string _identity;

	/// <summary>
	/// Number of requests written per batch during bulk import (see Reload).
	/// </summary>
	public int BatchCount { get; set; } = 1000;

	// Backed by shared Redis state, so multiple nodes can cooperate.
	public override bool IsDistributed => true;

	/// <summary>
	/// Whether Redis operations are routed through NetworkCenter (internet access).
	/// </summary>
	protected override bool UseInternet { get; set; } = true;
+
	/// <summary>
	/// Constructor. Uses the Redis connection string from Env.RedisConnectString.
	/// </summary>
	/// <param name="identity">Queue identity.</param>
	public RedisScheduler(string identity) : this(identity, Env.RedisConnectString)
	{
	}
+
+ ///
+ /// 构造方法
+ ///
+ /// 对列标识
+ /// Redis连接字符串
+ public RedisScheduler(string identity, string connectString)
+ {
+ if (string.IsNullOrWhiteSpace(identity))
+ {
+ throw new ArgumentNullException("identity should not be empty");
+ }
+ if (string.IsNullOrWhiteSpace(connectString))
+ {
+ throw new ArgumentNullException("connectString should not be empty");
+ }
+ _identity = identity;
+ _connectString = connectString;
+ DuplicateRemover = this;
+
+ var md5 = CryptoUtil.Md5Encrypt(identity);
+ _itemKey = $"dotnetspider:scheduler:{md5}:items";
+ _setKey = $"dotnetspider:scheduler:{md5}:set";
+ _queueKey = $"dotnetspider:scheduler:{md5}:queue";
+ _errorCountKey = $"dotnetspider:scheduler:{md5}:countOfFailures";
+ _successCountKey = $"dotnetspider:scheduler:{md5}:countOfSuccess";
+
+ _identityMd5 = md5;
+
+ var action = new Action(() =>
+ {
+ _redisConnection = (RedisConnection)Cache.Instance.Get(_connectString);
+ if (_redisConnection == null)
+ {
+ _redisConnection = new RedisConnection(_connectString);
+ Cache.Instance.Set(_connectString, _redisConnection);
+ }
+ _redisConnection.Database.SortedSetAdd(TasksKey, _identity, (long)DateTimeUtil.GetCurrentUnixTimeNumber());
+ });
+
+ if (UseInternet)
+ {
+ NetworkCenter.Current.Execute("rds-init", action);
+ }
+ else
+ {
+ action();
+ }
+ }
+
+ ///
+ /// Reset duplicate check.
+ ///
+ public override void ResetDuplicateCheck()
+ {
+ var action = new Action(() =>
+ {
+ _redisConnection.Database.KeyDelete(_setKey);
+ });
+ if (UseInternet)
+ {
+ NetworkCenter.Current.Execute("rds-reset", action);
+ }
+ else
+ {
+ action();
+ }
+ }
+
+ ///
+ /// Check whether the request is duplicate.
+ ///
+ /// Request
+ /// Whether the request is duplicate.
+ public virtual bool IsDuplicate(Request request)
+ {
+ return _retryPolicy.Execute(() =>
+ {
+ bool isDuplicate = _redisConnection.Database.SetContains(_setKey, request.Identity);
+ if (!isDuplicate)
+ {
+ _redisConnection.Database.SetAdd(_setKey, request.Identity);
+ }
+ return isDuplicate;
+ });
+ }
+
+ ///
+ /// 取得一个需要处理的请求对象
+ ///
+ /// 请求对象
+ public override Request Poll()
+ {
+ if (UseInternet)
+ {
+ return NetworkCenter.Current.Execute("rds-poll", ImplPollRequest);
+ }
+ else
+ {
+ return ImplPollRequest();
+ }
+ }
+
+ ///
+ /// 剩余链接数
+ ///
+ public override long LeftRequestsCount
+ {
+ get
+ {
+ if (UseInternet)
+ {
+ return NetworkCenter.Current.Execute("rds-left", () => _redisConnection.Database.ListLength(_queueKey));
+ }
+ else
+ {
+ return _redisConnection.Database.ListLength(_queueKey);
+ }
+ }
+ }
+
+ ///
+ /// 总的链接数
+ ///
+ public override long TotalRequestsCount
+ {
+ get
+ {
+ if (UseInternet)
+ {
+ return NetworkCenter.Current.Execute("rds-total", () => _redisConnection.Database.SetLength(_setKey));
+ }
+ else
+ {
+ return _redisConnection.Database.SetLength(_setKey);
+ }
+ }
+ }
+
+ ///
+ /// 采集成功的链接数
+ ///
+ public override long SuccessRequestsCount => _successCounter.Value;
+
+ ///
+ /// 采集失败的次数, 不是链接数, 如果一个链接采集多次都失败会记录多次
+ ///
+ public override long ErrorRequestsCount => _errorCounter.Value;
+
+ ///
+ /// 采集成功的链接数加 1
+ ///
+ public override void IncreaseSuccessCount()
+ {
+ _successCounter.Inc();
+ }
+
+ ///
+ /// 采集失败的次数加 1
+ ///
+ public override void IncreaseErrorCount()
+ {
+ _errorCounter.Inc();
+ }
+
+ ///
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+ ///
+ public override void Dispose()
+ {
+ if (UseInternet)
+ {
+ NetworkCenter.Current.Execute("rds-inc-clear", () =>
+ {
+ _redisConnection.Database.KeyDelete(_queueKey);
+ _redisConnection.Database.KeyDelete(_setKey);
+ _redisConnection.Database.KeyDelete(_itemKey);
+ _redisConnection.Database.KeyDelete(_successCountKey);
+ _redisConnection.Database.KeyDelete(_errorCountKey);
+ });
+ }
+ else
+ {
+ _redisConnection.Database.KeyDelete(_queueKey);
+ _redisConnection.Database.KeyDelete(_setKey);
+ _redisConnection.Database.KeyDelete(_itemKey);
+ _redisConnection.Database.KeyDelete(_successCountKey);
+ _redisConnection.Database.KeyDelete(_errorCountKey);
+ }
+ }
+
	/// <summary>
	/// Bulk-import requests into Redis in batches of BatchCount.
	/// </summary>
	/// <param name="requests">Requests to import.</param>
	public override void Reload(IEnumerable requests)
	{
		var action = new Action(() =>
		{
			lock (_locker)
			{
				// Snapshot the batch size so a concurrent setter change cannot
				// desynchronize the index arithmetic below.
				int batchCount = BatchCount;

				// NOTE(review): requests is enumerated twice (Count() here, foreach
				// below) — materialize lazy sequences at the caller; confirm.
				var count = requests.Count();
				int cacheSize = count > batchCount ? batchCount : count;
				RedisValue[] identities = new RedisValue[cacheSize];
				HashEntry[] items = new HashEntry[cacheSize];
				int i = 0;                    // index within the current batch buffer
				int j = count % batchCount;   // size of the trailing partial batch
				int n = count / batchCount;   // full batches remaining

				foreach (var request in requests)
				{
					identities[i] = request.Identity;
					items[i] = new HashEntry(request.Identity, JsonConvert.SerializeObject(request));
					++i;
					if (i == batchCount)
					{
						--n;

						// Flush one full batch: dedup set, pending queue, payload hash.
						_redisConnection.Database.SetAdd(_setKey, identities);
						_redisConnection.Database.ListRightPush(_queueKey, identities);
						_redisConnection.Database.HashSet(_itemKey, items, CommandFlags.HighPriority);

						i = 0;
						// Resize the buffers: keep the full batch size while full
						// batches remain, otherwise shrink to the trailing remainder.
						if (n != 0)
						{
							identities = new RedisValue[batchCount];
							items = new HashEntry[batchCount];
						}
						else
						{
							identities = new RedisValue[j];
							items = new HashEntry[j];
						}
					}
				}

				// Flush the trailing partial batch, if any.
				if (i > 0)
				{
					_redisConnection.Database.SetAdd(_setKey, identities);
					_redisConnection.Database.ListRightPush(_queueKey, identities);
					// NOTE(review): this final HashSet omits CommandFlags.HighPriority
					// used for full batches above — confirm whether intentional.
					_redisConnection.Database.HashSet(_itemKey, items);
				}
			}
		});
		if (UseInternet)
		{
			NetworkCenter.Current.Execute("rds-import", action);
		}
		else
		{
			action();
		}
	}
+
+ ///
+ /// 把队列中的请求对象转换成List
+ ///
+ /// 请求对象的List
+ public HashSet ToList()
+ {
+ HashSet requests = new HashSet();
+ Request request;
+ while ((request = Poll()) != null)
+ {
+ requests.Add(request);
+ }
+ return requests;
+ }
+
+ ///
+ /// 如果链接不是重复的就添加到队列中
+ ///
+ /// 请求对象
+ protected override void PushWhenNoDuplicate(Request request)
+ {
+ _retryPolicy.Execute(() =>
+ {
+ _redisConnection.Database.ListRightPush(_queueKey, request.Identity);
+ string field = request.Identity;
+ string value = JsonConvert.SerializeObject(request);
+
+ _redisConnection.Database.HashSet(_itemKey, field, value);
+ });
+ }
+
+ private Request ImplPollRequest()
+ {
+ return _retryPolicy.Execute(() =>
+ {
+ RedisValue value;
+ switch (TraverseStrategy)
+ {
+ case TraverseStrategy.Dfs:
+ {
+ value = _redisConnection.Database.ListRightPop(_queueKey);
+ break;
+ }
+ case TraverseStrategy.Bfs:
+ {
+ value = _redisConnection.Database.ListLeftPop(_queueKey);
+ break;
+ }
+ default:
+ {
+ throw new NotImplementedException();
+ }
+ }
+ if (!value.HasValue)
+ {
+ return null;
+ }
+ string field = value.ToString();
+
+ string json = _redisConnection.Database.HashGet(_itemKey, field);
+
+ if (!string.IsNullOrEmpty(json))
+ {
+ var result = JsonConvert.DeserializeObject(json);
+ _redisConnection.Database.HashDelete(_itemKey, field);
+ return result;
+ }
+ return null;
+ });
+ }
+
+ #region For Test
+
+ internal string GetQueueKey()
+ {
+ return _queueKey;
+ }
+
+ internal string GetSetKey()
+ {
+ return _setKey;
+ }
+
+ internal string GetItemKey()
+ {
+ return _itemKey;
+ }
+
+ internal string GetErrorCountKey()
+ {
+ return _errorCountKey;
+ }
+
+ internal string GetSuccessCountKey()
+ {
+ return _successCountKey;
+ }
+
+ #endregion
+ }
+}