From 874bdbc78722002e5fff299f56013cc32d8db630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E5=B5=A9?= Date: Fri, 20 Jul 2018 13:30:27 +0800 Subject: [PATCH] =?UTF-8?q?RedisScheduler=E9=87=8D=E6=96=B0=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/DotnetSpider.Common/Request.cs | 14 +- .../Scheduler/DuplicateRemovedScheduler.cs | 2 +- .../QueueDuplicateRemovedScheduler.cs | 14 + src/DotnetSpider.Core/Spider.cs | 7 +- .../Scheduler/RedisSchedulerTest.cs | 613 ++++++------ .../Scheduler/RedisScheduler.cs | 896 ++++++++---------- 6 files changed, 747 insertions(+), 799 deletions(-) diff --git a/src/DotnetSpider.Common/Request.cs b/src/DotnetSpider.Common/Request.cs index 8e4f2a728..0368f5faf 100644 --- a/src/DotnetSpider.Common/Request.cs +++ b/src/DotnetSpider.Common/Request.cs @@ -132,13 +132,13 @@ public override bool Equals(object obj) Request request = (Request)obj; - if (!Depth.Equals(request.Depth)) return false; - if (!CycleTriedTimes.Equals(request.CycleTriedTimes)) return false; - if (!Referer.Equals(request.Referer)) return false; - if (!Origin.Equals(request.Origin)) return false; - if (!Method.Equals(request.Method)) return false; - if (!Priority.Equals(request.Priority)) return false; - if (!Content.Equals(request.Content)) return false; + if (!Equals(Depth, request.Depth)) return false; + if (!Equals(CycleTriedTimes, request.CycleTriedTimes)) return false; + if (!Equals(Referer, request.Referer)) return false; + if (!Equals(Origin, request.Origin)) return false; + if (!Equals(Method, request.Method)) return false; + if (!Equals(Priority, request.Priority)) return false; + if (!Equals(Content, request.Content)) return false; if (Properties == null) { diff --git a/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs b/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs index 15eff67c1..37f49a3f6 100644 --- a/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs +++ b/src/DotnetSpider.Core/Scheduler/DuplicateRemovedScheduler.cs @@ -96,7 +96,7 @@ public int Depth /// 添加请求对象到队列 /// /// 请求对象 - public void Push(Request request, Func shouldReserved) + public void Push(Request request, Func shouldReserved = null) { var action = new Action(() => { diff --git a/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs b/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs index 989793112..f1f1c13a8 100644 --- a/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs +++ b/src/DotnetSpider.Core/Scheduler/QueueDuplicateRemovedScheduler.cs @@ -165,5 +165,19 @@ public override void Dispose() } base.Dispose(); } + + /// + /// 取得队列中所有的请求对象 + /// + internal Request[] All + { + get + { + lock (_lock) + { + return _queue.ToArray(); + } + } + } } } \ No newline at end of file diff --git a/src/DotnetSpider.Core/Spider.cs b/src/DotnetSpider.Core/Spider.cs index b39cafe3f..5e809c24c 100644 --- a/src/DotnetSpider.Core/Spider.cs +++ b/src/DotnetSpider.Core/Spider.cs @@ -737,9 +737,12 @@ protected override void Execute(params string[] arguments) var downloader = Downloader.Clone(); while (Status == Status.Running) { - // 从队列中取出一个请求 + // 从队列中取出一个请求, 因为 Site 是共享对象, 每个Request都保留了引用, 序列存到Redis或其它数据库中浪费带宽和空间, 因此 Site对象不保存到数据库中 Request request = Scheduler.Poll(); - + if (request != null && request.Site == null) + { + request.Site = Site; + } // 如果队列中没有需要处理的请求, 则开始等待, 一直到设定的 EmptySleepTime 结束, 则认为爬虫应该结束了 if (request == null) { diff --git a/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs b/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs index 732dc7d5b..fa9ec4a5b 100644 --- a/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs +++ b/src/DotnetSpider.Extension.Test/Scheduler/RedisSchedulerTest.cs @@ -1,306 +1,307 @@ -//using System; -//using System.Collections.Generic; -//using DotnetSpider.Core; -//using DotnetSpider.Core.Scheduler; -//using Xunit; -//using DotnetSpider.Core.Downloader; -//using DotnetSpider.Core.Pipeline; -//using DotnetSpider.Core.Processor; -//using DotnetSpider.Core.Monitor; - -//namespace DotnetSpider.Extension.Test.Scheduler -//{ - -// public class RedisSchedulerTest -// { -// private Extension.Scheduler.RedisScheduler GetRedisScheduler() -// { -// return new Extension.Scheduler.RedisScheduler("127.0.0.1:6379,serviceName=Scheduler.NET,keepAlive=8,allowAdmin=True,connectTimeout=10000,password=,abortConnect=True,connectRetry=20"); -// } - -// [Fact(DisplayName = "PushAndPoll1")] -// public void PushAndPoll1() -// { -// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); - -// ISpider spider = new DefaultSpider(); -// scheduler.Init(spider); -// scheduler.Dispose(); - -// Request request = new Request("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", null) { Site = spider.Site }; -// request.PutExtra("1", "2"); -// scheduler.Push(request); -// Request result = scheduler.Poll(); -// Assert.Equal("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", result.Url.ToString()); -// Assert.Equal("2", request.GetExtra("1")); -// Request result1 = scheduler.Poll(); -// Assert.Null(result1); -// scheduler.Dispose(); -// } - -// [Fact(DisplayName = "RedisScheduler_PushAndPollBreadthFirst")] -// public void PushAndPollBreadthFirst() -// { -// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); -// scheduler.TraverseStrategy = TraverseStrategy.Bfs; -// ISpider spider = new DefaultSpider(); -// scheduler.Init(spider); -// scheduler.Dispose(); - -// Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site }; -// Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site }; -// Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site }; -// Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site }; -// scheduler.Push(request1); -// scheduler.Push(request2); -// scheduler.Push(request3); -// scheduler.Push(request4); - -// Request result = scheduler.Poll(); -// Assert.Equal("http://www.ibm.com/1", result.Url.ToString()); -// Request result1 = scheduler.Poll(); -// Assert.Equal("http://www.ibm.com/2", result1.Url.ToString()); -// scheduler.Dispose(); -// scheduler.Dispose(); -// } - -// [Fact(DisplayName = "RedisScheduler_PushAndPollDepthFirst")] -// public void PushAndPollDepthFirst() -// { -// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); -// scheduler.TraverseStrategy = TraverseStrategy.Dfs; -// ISpider spider = new DefaultSpider(); -// scheduler.Init(spider); -// scheduler.Dispose(); -// Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site }; -// Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site }; -// Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site }; -// Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site }; -// scheduler.Push(request1); -// scheduler.Push(request2); -// scheduler.Push(request3); -// scheduler.Push(request4); - -// Request result = scheduler.Poll(); -// Assert.Equal("http://www.ibm.com/4", result.Url.ToString()); -// Request result1 = scheduler.Poll(); -// Assert.Equal("http://www.ibm.com/3", result1.Url.ToString()); -// scheduler.Dispose(); -// scheduler.Dispose(); -// } - -// [Fact(DisplayName = "LoadPerformace")] -// public void LoadPerformace() -// { -// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); -// Spider spider = new DefaultSpider("test", new Site()); -// spider.Monitor = new LogMonitor(); -// scheduler.Init(spider); -// scheduler.Dispose(); -// var start = DateTime.Now; -// for (int i = 0; i < 40000; i++) -// { -// scheduler.Push(new Request("http://www.a.com/" + i, null) { Site = spider.Site }); -// } - -// var end = DateTime.Now; -// double seconds = (end - start).TotalSeconds; -// scheduler.Dispose(); - -// var start1 = DateTime.Now; -// HashSet list = new HashSet(); -// for (int i = 0; i < 40000; i++) -// { -// list.Add(new Request("http://www.a.com/" + i, null)); -// } -// scheduler.Import(list); -// var end1 = DateTime.Now; -// double seconds1 = (end1 - start1).TotalSeconds; -// Assert.True(seconds1 < seconds); -// scheduler.Dispose(); -// } - -// [Fact(DisplayName = "RedisScheduler_Load")] -// public void Load() -// { -// QueueDuplicateRemovedScheduler scheduler = new QueueDuplicateRemovedScheduler(); -// ISpider spider = new DefaultSpider("test", new Site()); -// scheduler.Init(spider); - -// scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site }); -// scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site }); -// scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site }); -// scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site }); - -// Extension.Scheduler.RedisScheduler redisScheduler = GetRedisScheduler(); -// redisScheduler.Init(spider); - -// redisScheduler.Dispose(); - -// redisScheduler.Import(scheduler.All); - -// Assert.Equal("http://www.d.com/", redisScheduler.Poll().Url.ToString()); -// Assert.Equal("http://www.c.com/", redisScheduler.Poll().Url.ToString()); -// Assert.Equal("http://www.b.com/", redisScheduler.Poll().Url.ToString()); -// Assert.Equal("http://www.a.com/", redisScheduler.Poll().Url.ToString()); - -// redisScheduler.Dispose(); -// } - -// [Fact(DisplayName = "RedisScheduler_Status")] -// public void Status() -// { -// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); -// ISpider spider = new DefaultSpider("test", new Site()); -// scheduler.Init(spider); - -// scheduler.Dispose(); - -// scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site }); -// scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site }); -// scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site }); -// scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site }); - -// Assert.Equal(0, scheduler.ErrorRequestsCount); -// Assert.Equal(4, scheduler.LeftRequestsCount); -// Assert.Equal(4, scheduler.TotalRequestsCount); -// scheduler.IncreaseErrorCount(); -// Assert.Equal(1, scheduler.ErrorRequestsCount); -// Assert.Equal(0, scheduler.SuccessRequestsCount); -// scheduler.IncreaseSuccessCount(); -// Assert.Equal(1, scheduler.SuccessRequestsCount); - -// scheduler.Poll(); -// Assert.Equal(3, scheduler.LeftRequestsCount); -// Assert.Equal(1, scheduler.SuccessRequestsCount); -// Assert.Equal(1, scheduler.ErrorRequestsCount); -// Assert.Equal(4, scheduler.TotalRequestsCount); - -// scheduler.Poll(); -// Assert.Equal(2, scheduler.LeftRequestsCount); -// Assert.Equal(1, scheduler.SuccessRequestsCount); -// Assert.Equal(1, scheduler.ErrorRequestsCount); -// Assert.Equal(4, scheduler.TotalRequestsCount); - -// scheduler.Poll(); -// Assert.Equal(1, scheduler.LeftRequestsCount); -// Assert.Equal(1, scheduler.SuccessRequestsCount); -// Assert.Equal(1, scheduler.ErrorRequestsCount); -// Assert.Equal(4, scheduler.TotalRequestsCount); - -// scheduler.Poll(); -// Assert.Equal(0, scheduler.LeftRequestsCount); -// Assert.Equal(1, scheduler.SuccessRequestsCount); -// Assert.Equal(1, scheduler.ErrorRequestsCount); -// Assert.Equal(4, scheduler.TotalRequestsCount); - -// scheduler.Poll(); -// scheduler.Poll(); -// Assert.Equal(0, scheduler.LeftRequestsCount); -// Assert.Equal(1, scheduler.SuccessRequestsCount); -// Assert.Equal(1, scheduler.ErrorRequestsCount); -// Assert.Equal(4, scheduler.TotalRequestsCount); - -// scheduler.Dispose(); -// } - -// //[Fact] -// //public void MultiInit() -// //{ -// // Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); - -// // ISpider spider = new DefaultSpider(); -// // scheduler.Init(spider); -// // string queueKey = scheduler.GetQueueKey(); -// // string setKey = scheduler.GetSetKey(); -// // string itemKey = scheduler.GetItemKey(); -// // string errorCountKey = scheduler.GetErrorCountKey(); -// // string successCountKey = scheduler.GetSuccessCountKey(); -// // scheduler.Init(spider); -// // Assert.Equal(queueKey, scheduler.GetQueueKey()); -// // Assert.Equal(setKey, scheduler.GetSetKey()); -// // Assert.Equal(itemKey, scheduler.GetItemKey()); -// // Assert.Equal(errorCountKey, scheduler.GetErrorCountKey()); -// // Assert.Equal(successCountKey, scheduler.GetSuccessCountKey()); - -// // scheduler.Dispose(); -// // scheduler.Dispose(); -// //} - -// [Fact(DisplayName = "RedisScheduler_Clear")] -// public void Clear() -// { -// Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); - -// ISpider spider = new DefaultSpider(); -// scheduler.Init(spider); -// scheduler.Dispose(); -// Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site }; -// Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site }; -// Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site }; -// Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site }; -// scheduler.Push(request1); -// scheduler.Push(request2); -// scheduler.Push(request3); -// scheduler.Push(request4); - -// Request result = scheduler.Poll(); -// Assert.Equal("http://www.ibm.com/4", result.Url.ToString()); - -// scheduler.Dispose(); -// } - -// [Fact(DisplayName = "RedisScheduler_RetryRequest")] -// public void RetryRequest() -// { -// var site = new Site { EncodingName = "UTF-8" }; - -// var scheduler = new QueueDuplicateRemovedScheduler(); - -// site.AddStartUrl("http://www.baidu.com"); -// site.AddStartUrl("http://www.163.com/"); - -// Spider spider = Spider.Create(site, -// // crawler identity -// "cnblogs_" + DateTime.Now.ToString("yyyyMMddhhmmss"), -// // use memoery queue scheduler -// scheduler, -// // default page processor will save whole html, and extract urls to target urls via regex -// new TestPageProcessor()) -// // save crawler result to file in the folder: \{running directory}\data\{crawler identity}\{guid}.dsd -// .AddPipeline(new FilePipeline()); -// spider.Monitor = new LogMonitor(); -// // dowload html by http client -// spider.Downloader = new HttpClientDownloader(); - -// spider.ThreadNum = 1; -// // traversal deep 遍历深度 -// spider.Scheduler.Depth = 3; -// spider.ClearSchedulerAfterCompleted = false; -// spider.EmptySleepTime = 6000; -// // start crawler 启动爬虫 -// spider.Run(); - -// Assert.Equal(5, spider.RetriedTimes.Value); -// Assert.Equal(0, scheduler.LeftRequestsCount); -// Assert.Equal(1, scheduler.SuccessRequestsCount); -// // 重试次数应该包含 -// Assert.Equal(5, scheduler.ErrorRequestsCount); -// } - -// class TestPageProcessor : BasePageProcessor -// { -// protected override void Handle(Page page) -// { -// if (page.Request.Url.ToString() == "http://www.163.com/") -// { -// throw new SpiderException(""); -// } -// else -// { -// page.AddTargetRequest("http://www.163.com/", 0, false); -// } -// } -// } -// } -//} \ No newline at end of file +using System; +using System.Collections.Generic; +using DotnetSpider.Core; +using DotnetSpider.Core.Scheduler; +using Xunit; +using DotnetSpider.Core.Downloader; +using DotnetSpider.Core.Pipeline; +using DotnetSpider.Core.Processor; +using DotnetSpider.Core.Monitor; +using DotnetSpider.Common; +using DotnetSpider.Downloader; + +namespace DotnetSpider.Extension.Test.Scheduler +{ + + public class RedisSchedulerTest + { + private Extension.Scheduler.RedisScheduler GetRedisScheduler(string identity) + { + return new Extension.Scheduler.RedisScheduler(identity, "127.0.0.1:6379,serviceName=Scheduler.NET,keepAlive=8,allowAdmin=True,connectTimeout=10000,password=,abortConnect=True,connectRetry=20"); + } + + [Fact(DisplayName = "PushAndPoll1")] + public void PushAndPoll1() + { + ISpider spider = new DefaultSpider(); + Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity); + scheduler.Dispose(); + + Request request = new Request("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", null) { Site = spider.Site }; + request.Properties.Add("1", "2"); + scheduler.Push(request, null); + Request result = scheduler.Poll(); + Assert.Equal("http://www.ibm.com/developerworks/cn/java/j-javadev2-22/", result.Url.ToString()); + Assert.Equal("2", request.Properties["1"]); + Request result1 = scheduler.Poll(); + Assert.Null(result1); + scheduler.Dispose(); + } + + [Fact(DisplayName = "RedisScheduler_PushAndPollBreadthFirst")] + public void PushAndPollBreadthFirst() + { + ISpider spider = new DefaultSpider(); + Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity); + scheduler.TraverseStrategy = TraverseStrategy.Bfs; + + + scheduler.Dispose(); + + Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site }; + Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site }; + Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site }; + Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site }; + scheduler.Push(request1); + scheduler.Push(request2); + scheduler.Push(request3); + scheduler.Push(request4); + + Request result = scheduler.Poll(); + Assert.Equal("http://www.ibm.com/1", result.Url.ToString()); + Request result1 = scheduler.Poll(); + Assert.Equal("http://www.ibm.com/2", result1.Url.ToString()); + scheduler.Dispose(); + scheduler.Dispose(); + } + + [Fact(DisplayName = "RedisScheduler_PushAndPollDepthFirst")] + public void PushAndPollDepthFirst() + { + ISpider spider = new DefaultSpider(); + Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity); + scheduler.TraverseStrategy = TraverseStrategy.Dfs; + + + scheduler.Dispose(); + Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site }; + Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site }; + Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site }; + Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site }; + scheduler.Push(request1); + scheduler.Push(request2); + scheduler.Push(request3); + scheduler.Push(request4); + + Request result = scheduler.Poll(); + Assert.Equal("http://www.ibm.com/4", result.Url.ToString()); + Request result1 = scheduler.Poll(); + Assert.Equal("http://www.ibm.com/3", result1.Url.ToString()); + scheduler.Dispose(); + scheduler.Dispose(); + } + + [Fact(DisplayName = "LoadPerformace")] + public void LoadPerformace() + { + Spider spider = new DefaultSpider("test", new Site()); + spider.Monitor = new LogMonitor(); + Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity); + scheduler.Dispose(); + var start = DateTime.Now; + for (int i = 0; i < 40000; i++) + { + scheduler.Push(new Request("http://www.a.com/" + i, null) { Site = spider.Site }); + } + + var end = DateTime.Now; + double seconds = (end - start).TotalSeconds; + scheduler.Dispose(); + + var start1 = DateTime.Now; + HashSet list = new HashSet(); + for (int i = 0; i < 40000; i++) + { + list.Add(new Request("http://www.a.com/" + i, null)); + } + scheduler.Reload(list); + var end1 = DateTime.Now; + double seconds1 = (end1 - start1).TotalSeconds; + Assert.True(seconds1 < seconds); + scheduler.Dispose(); + } + + [Fact(DisplayName = "RedisScheduler_Load")] + public void Load() + { + QueueDuplicateRemovedScheduler scheduler = new QueueDuplicateRemovedScheduler(); + ISpider spider = new DefaultSpider("test", new Site()); + + + scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site }); + scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site }); + scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site }); + scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site }); + + Extension.Scheduler.RedisScheduler redisScheduler = GetRedisScheduler(spider.Identity); + + redisScheduler.Dispose(); + + redisScheduler.Reload(scheduler.All); + + Assert.Equal("http://www.d.com/", redisScheduler.Poll().Url.ToString()); + Assert.Equal("http://www.c.com/", redisScheduler.Poll().Url.ToString()); + Assert.Equal("http://www.b.com/", redisScheduler.Poll().Url.ToString()); + Assert.Equal("http://www.a.com/", redisScheduler.Poll().Url.ToString()); + + redisScheduler.Dispose(); + } + + [Fact(DisplayName = "RedisScheduler_Status")] + public void Status() + { + ISpider spider = new DefaultSpider("test", new Site()); + + Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity); + + scheduler.Dispose(); + + scheduler.Push(new Request("http://www.a.com/", null) { Site = spider.Site }); + scheduler.Push(new Request("http://www.b.com/", null) { Site = spider.Site }); + scheduler.Push(new Request("http://www.c.com/", null) { Site = spider.Site }); + scheduler.Push(new Request("http://www.d.com/", null) { Site = spider.Site }); + + Assert.Equal(0, scheduler.ErrorRequestsCount); + Assert.Equal(4, scheduler.LeftRequestsCount); + Assert.Equal(4, scheduler.TotalRequestsCount); + scheduler.IncreaseErrorCount(); + Assert.Equal(1, scheduler.ErrorRequestsCount); + Assert.Equal(0, scheduler.SuccessRequestsCount); + scheduler.IncreaseSuccessCount(); + Assert.Equal(1, scheduler.SuccessRequestsCount); + + scheduler.Poll(); + Assert.Equal(3, scheduler.LeftRequestsCount); + Assert.Equal(1, scheduler.SuccessRequestsCount); + Assert.Equal(1, scheduler.ErrorRequestsCount); + Assert.Equal(4, scheduler.TotalRequestsCount); + + scheduler.Poll(); + Assert.Equal(2, scheduler.LeftRequestsCount); + Assert.Equal(1, scheduler.SuccessRequestsCount); + Assert.Equal(1, scheduler.ErrorRequestsCount); + Assert.Equal(4, scheduler.TotalRequestsCount); + + scheduler.Poll(); + Assert.Equal(1, scheduler.LeftRequestsCount); + Assert.Equal(1, scheduler.SuccessRequestsCount); + Assert.Equal(1, scheduler.ErrorRequestsCount); + Assert.Equal(4, scheduler.TotalRequestsCount); + + scheduler.Poll(); + Assert.Equal(0, scheduler.LeftRequestsCount); + Assert.Equal(1, scheduler.SuccessRequestsCount); + Assert.Equal(1, scheduler.ErrorRequestsCount); + Assert.Equal(4, scheduler.TotalRequestsCount); + + scheduler.Poll(); + scheduler.Poll(); + Assert.Equal(0, scheduler.LeftRequestsCount); + Assert.Equal(1, scheduler.SuccessRequestsCount); + Assert.Equal(1, scheduler.ErrorRequestsCount); + Assert.Equal(4, scheduler.TotalRequestsCount); + + scheduler.Dispose(); + } + + //[Fact] + //public void MultiInit() + //{ + // Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(); + + // ISpider spider = new DefaultSpider(); + // scheduler.Init(spider); + // string queueKey = scheduler.GetQueueKey(); + // string setKey = scheduler.GetSetKey(); + // string itemKey = scheduler.GetItemKey(); + // string errorCountKey = scheduler.GetErrorCountKey(); + // string successCountKey = scheduler.GetSuccessCountKey(); + // scheduler.Init(spider); + // Assert.Equal(queueKey, scheduler.GetQueueKey()); + // Assert.Equal(setKey, scheduler.GetSetKey()); + // Assert.Equal(itemKey, scheduler.GetItemKey()); + // Assert.Equal(errorCountKey, scheduler.GetErrorCountKey()); + // Assert.Equal(successCountKey, scheduler.GetSuccessCountKey()); + + // scheduler.Dispose(); + // scheduler.Dispose(); + //} + + [Fact(DisplayName = "RedisScheduler_Clear")] + public void Clear() + { + ISpider spider = new DefaultSpider(); + Extension.Scheduler.RedisScheduler scheduler = GetRedisScheduler(spider.Identity); + + + + scheduler.Dispose(); + Request request1 = new Request("http://www.ibm.com/1", null) { Site = spider.Site }; + Request request2 = new Request("http://www.ibm.com/2", null) { Site = spider.Site }; + Request request3 = new Request("http://www.ibm.com/3", null) { Site = spider.Site }; + Request request4 = new Request("http://www.ibm.com/4", null) { Site = spider.Site }; + scheduler.Push(request1); + scheduler.Push(request2); + scheduler.Push(request3); + scheduler.Push(request4); + + Request result = scheduler.Poll(); + Assert.Equal("http://www.ibm.com/4", result.Url.ToString()); + + scheduler.Dispose(); + } + + [Fact(DisplayName = "RedisScheduler_RetryRequest")] + public void RetryRequest() + { + var site = new Site { EncodingName = "UTF-8" }; + + var scheduler = new QueueDuplicateRemovedScheduler(); + + site.AddRequests("http://www.baidu.com"); + site.AddRequests("http://www.163.com/"); + + Spider spider = Spider.Create(site, + // crawler identity + "cnblogs_" + DateTime.Now.ToString("yyyyMMddhhmmss"), + // use memoery queue scheduler + scheduler, + // default page processor will save whole html, and extract urls to target urls via regex + new TestPageProcessor()) + // save crawler result to file in the folder: \{running directory}\data\{crawler identity}\{guid}.dsd + .AddPipeline(new FilePipeline()); + spider.Monitor = new LogMonitor(); + // dowload html by http client + spider.Downloader = new HttpClientDownloader(); + + spider.ThreadNum = 1; + // traversal deep 遍历深度 + spider.Scheduler.Depth = 3; + spider.ClearSchedulerAfterCompleted = false; + spider.EmptySleepTime = 6000; + // start crawler 启动爬虫 + spider.Run(); + + Assert.Equal(5, spider.RetriedTimes.Value); + Assert.Equal(0, scheduler.LeftRequestsCount); + Assert.Equal(1, scheduler.SuccessRequestsCount); + // 重试次数应该包含 + Assert.Equal(6, scheduler.ErrorRequestsCount); + } + + class TestPageProcessor : BasePageProcessor + { + protected override void Handle(Page page) + { + if (page.Request.Url.ToString() == "http://www.163.com/") + { + throw new SpiderException(""); + } + else + { + page.AddTargetRequest("http://www.163.com/", 0, false); + } + } + } + } +} \ No newline at end of file diff --git a/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs b/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs index 80088a962..039523101 100644 --- a/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs +++ b/src/DotnetSpider.Extension/Scheduler/RedisScheduler.cs @@ -1,483 +1,413 @@ -//using DotnetSpider.Core; -//using DotnetSpider.Core.Scheduler; -//using DotnetSpider.Core.Scheduler.Component; -//using DotnetSpider.Core.Infrastructure; -//using Newtonsoft.Json; -//using System.Collections.Generic; -//using StackExchange.Redis; -//using DotnetSpider.Extension.Infrastructure; -//using DotnetSpider.Core.Redial; -//using System; -//using Polly; -//using Polly.Retry; -//using System.Linq; -//using DotnetSpider.Common; - -//namespace DotnetSpider.Extension.Scheduler -//{ -// /// -// /// Use Redis as url scheduler for distributed crawlers. -// /// -// public class RedisScheduler : DuplicateRemovedScheduler, IDuplicateRemover -// { -// private readonly object _locker = new object(); -// private const string TasksKey = "dotnetspider:tasks"; -// private readonly RetryPolicy _retryPolicy = Policy.Handle().Retry(30); -// private string _queueKey; -// private string _setKey; -// private string _itemKey; -// private string _errorCountKey; -// private string _successCountKey; -// private string _identityMd5; -// private readonly AutomicLong _successCounter = new AutomicLong(0); -// private readonly AutomicLong _errorCounter = new AutomicLong(0); -// private readonly string _connectString; -// private RedisConnection _redisConnection; - -// /// -// /// 批量加载时的每批次加载数 -// /// -// public int BatchCount { get; set; } = 1000; - -// public override bool IsDistributed => true; - -// /// -// /// RedisScheduler是否会使用互联网 -// /// -// protected override bool UseInternet { get; set; } = true; - -// /// -// /// 构造方法 -// /// -// /// Redis连接字符串 -// public RedisScheduler(string connectString) -// { -// if (string.IsNullOrWhiteSpace(connectString)) -// { -// throw new SpiderException("Redis connect string should not be empty"); -// } -// _connectString = connectString; -// DuplicateRemover = this; -// } - -// /// -// /// 构造方法 -// /// -// public RedisScheduler() -// { -// _connectString = Env.RedisConnectString; -// DuplicateRemover = this; -// } - -// /// -// /// 初始化队列 -// /// -// /// 爬虫对象 -// public override void Init(ISpider spider) -// { -// base.Init(spider); - -// if (string.IsNullOrWhiteSpace(_connectString)) -// { -// throw new SpiderException("Redis connect string should not be null or empty"); -// } - -// if (string.IsNullOrWhiteSpace(_identityMd5)) -// { -// var md5 = CryptoUtil.Md5Encrypt(spider.Identity); -// _itemKey = $"dotnetspider:scheduler:{md5}:items"; -// _setKey = $"dotnetspider:scheduler:{md5}:set"; -// _queueKey = $"dotnetspider:scheduler:{md5}:queue"; -// _errorCountKey = $"dotnetspider:scheduler:{md5}:numberOfFailures"; -// _successCountKey = $"dotnetspider:scheduler:{md5}:numberOfSuccessful"; - -// _identityMd5 = md5; - -// var action = new Action(() => -// { -// _redisConnection = Cache.Instance.Get(_connectString); -// if (_redisConnection == null) -// { -// _redisConnection = new RedisConnection(_connectString); -// Cache.Instance.Set(_connectString, _redisConnection); -// } -// _redisConnection.Database.SortedSetAdd(TasksKey, spider.Identity, (long)DateTimeUtil.GetCurrentUnixTimeNumber()); -// }); - -// if (UseInternet) -// { -// NetworkCenter.Current.Execute("rds-init", action); -// } -// else -// { -// action(); -// } -// } -// } - -// /// -// /// Reset duplicate check. -// /// -// public override void ResetDuplicateCheck() -// { -// var action = new Action(() => -// { -// _redisConnection.Database.KeyDelete(_setKey); -// }); -// if (UseInternet) -// { -// NetworkCenter.Current.Execute("rds-reset", action); -// } -// else -// { -// action(); -// } -// } - -// /// -// /// Check whether the request is duplicate. -// /// -// /// Request -// /// Whether the request is duplicate. -// public virtual bool IsDuplicate(Request request) -// { -// return _retryPolicy.Execute(() => -// { -// bool isDuplicate = _redisConnection.Database.SetContains(_setKey, request.Identity); -// if (!isDuplicate) -// { -// _redisConnection.Database.SetAdd(_setKey, request.Identity); -// } -// return isDuplicate; -// }); -// } - -// /// -// /// 取得一个需要处理的请求对象 -// /// -// /// 请求对象 -// public override Request Poll() -// { -// if (UseInternet) -// { -// return NetworkCenter.Current.Execute("rds-poll", ImplPollRequest); -// } -// else -// { -// return ImplPollRequest(); -// } -// } - -// /// -// /// 剩余链接数 -// /// -// public override long LeftRequestsCount -// { -// get -// { -// if (UseInternet) -// { -// return NetworkCenter.Current.Execute("rds-left", () => _redisConnection.Database.ListLength(_queueKey)); -// } -// else -// { -// return _redisConnection.Database.ListLength(_queueKey); -// } -// } -// } - -// /// -// /// 总的链接数 -// /// -// public override long TotalRequestsCount -// { -// get -// { -// if (UseInternet) -// { -// return NetworkCenter.Current.Execute("rds-total", () => _redisConnection.Database.SetLength(_setKey)); -// } -// else -// { -// return _redisConnection.Database.SetLength(_setKey); -// } -// } -// } - -// /// -// /// 采集成功的链接数 -// /// -// public override long SuccessRequestsCount => _successCounter.Value; - -// /// -// /// 采集失败的次数, 不是链接数, 如果一个链接采集多次都失败会记录多次 -// /// -// public override long ErrorRequestsCount => _errorCounter.Value; - -// /// -// /// 采集成功的链接数加 1 -// /// -// public override void IncreaseSuccessCount() -// { -// _successCounter.Inc(); -// } - -// /// -// /// 采集失败的次数加 1 -// /// -// public override void IncreaseErrorCount() -// { -// _errorCounter.Inc(); -// } - -// /// -// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. -// /// -// public override void Dispose() -// { -// if (UseInternet) -// { -// NetworkCenter.Current.Execute("rds-inc-clear", () => -// { -// _redisConnection.Database.KeyDelete(_queueKey); -// _redisConnection.Database.KeyDelete(_setKey); -// _redisConnection.Database.KeyDelete(_itemKey); -// _redisConnection.Database.KeyDelete(_successCountKey); -// _redisConnection.Database.KeyDelete(_errorCountKey); -// }); -// } -// else -// { -// _redisConnection.Database.KeyDelete(_queueKey); -// _redisConnection.Database.KeyDelete(_setKey); -// _redisConnection.Database.KeyDelete(_itemKey); -// _redisConnection.Database.KeyDelete(_successCountKey); -// _redisConnection.Database.KeyDelete(_errorCountKey); -// } -// } - -// /// -// /// 批量导入 -// /// -// /// 请求对象 -// public override void Import(IEnumerable requests) -// { -// var action = new Action(() => -// { -// lock (_locker) -// { -// int batchCount = BatchCount; - -// var count = requests.Count(); -// int cacheSize = count > batchCount ? batchCount : count; -// RedisValue[] identities = new RedisValue[cacheSize]; -// HashEntry[] items = new HashEntry[cacheSize]; -// int i = 0; -// int j = count % batchCount; -// int n = count / batchCount; - -// foreach (var request in requests) -// { -// identities[i] = request.Identity; -// items[i] = new HashEntry(request.Identity, JsonConvert.SerializeObject(request)); -// ++i; -// if (i == batchCount) -// { -// --n; - -// _redisConnection.Database.SetAdd(_setKey, identities); -// _redisConnection.Database.ListRightPush(_queueKey, identities); -// _redisConnection.Database.HashSet(_itemKey, items, CommandFlags.HighPriority); - -// i = 0; -// if (n != 0) -// { -// identities = new RedisValue[batchCount]; -// items = new HashEntry[batchCount]; -// } -// else -// { -// identities = new RedisValue[j]; -// items = new HashEntry[j]; -// } -// } -// } - -// if (i > 0) -// { -// _redisConnection.Database.SetAdd(_setKey, identities); -// _redisConnection.Database.ListRightPush(_queueKey, identities); -// _redisConnection.Database.HashSet(_itemKey, items); -// } -// } -// }); -// if (UseInternet) -// { -// NetworkCenter.Current.Execute("rds-import", action); -// } -// else -// { -// action(); -// } -// } - -// /// -// /// 把队列中的请求对象转换成List -// /// -// /// 请求对象的List -// public HashSet ToList() -// { -// HashSet requests = new HashSet(); -// Request request; -// while ((request = Poll()) != null) -// { -// requests.Add(request); -// } -// return requests; -// } - -// //public override bool IsExited -// //{ -// // get -// // { -// // try -// // { -// // if (UseInternet) -// // { -// // return NetworkCenter.Current.Execute("rds-isexited", () => -// // { -// // var result = _redisConnection.Database.HashGet(TaskStatsKey, _identityMd5); -// // if (result.HasValue) -// // { -// // return result == 1; -// // } -// // else -// // { -// // return false; -// // } -// // }); -// // } -// // else -// // { -// // var result = _redisConnection.Database.HashGet(TaskStatsKey, _identityMd5); -// // if (result.HasValue) -// // { -// // return result == 1; -// // } -// // else -// // { -// // return false; -// // } -// // } - -// // } -// // catch -// // { -// // return false; -// // } -// // } -// // set -// // { -// // var action = new Action(() => -// // { -// // _redisConnection.Database.HashSet(TaskStatsKey, _identityMd5, value ? 1 : 0); -// // }); -// // if (UseInternet) -// // { -// // NetworkCenter.Current.Execute("rds-isexited", action); -// // } -// // else -// // { -// // action(); -// // } -// // } -// //} - -// /// -// /// 如果链接不是重复的就添加到队列中 -// /// -// /// 请求对象 -// protected override void PushWhenNoDuplicate(Request request) -// { -// request.Site = request.Site ?? Spider.Site; -// _retryPolicy.Execute(() => -// { -// _redisConnection.Database.ListRightPush(_queueKey, request.Identity); -// string field = request.Identity; -// string value = JsonConvert.SerializeObject(request); - -// _redisConnection.Database.HashSet(_itemKey, field, value); -// }); -// } - -// private Request ImplPollRequest() -// { -// return _retryPolicy.Execute(() => -// { -// RedisValue value; -// switch (TraverseStrategy) -// { -// case TraverseStrategy.Dfs: -// { -// value = _redisConnection.Database.ListRightPop(_queueKey); -// break; -// } -// case TraverseStrategy.Bfs: -// { -// value = _redisConnection.Database.ListLeftPop(_queueKey); -// break; -// } -// default: -// { -// throw new NotImplementedException(); -// } -// } -// if (!value.HasValue) -// { -// return null; -// } -// string field = value.ToString(); - -// string json = _redisConnection.Database.HashGet(_itemKey, field); - -// if (!string.IsNullOrEmpty(json)) -// { -// var result = JsonConvert.DeserializeObject(json); -// _redisConnection.Database.HashDelete(_itemKey, field); -// result.Site = Spider.Site; -// return result; -// } -// return null; -// }); -// } - -// #region For Test - -// internal string GetQueueKey() -// { -// return _queueKey; -// } - -// internal string GetSetKey() -// { -// return _setKey; -// } - -// internal string GetItemKey() -// { -// return _itemKey; -// } - -// internal string GetErrorCountKey() -// { -// return _errorCountKey; -// } - -// internal string GetSuccessCountKey() -// { -// return _successCountKey; -// } - -// #endregion -// } -//} +using DotnetSpider.Core; +using DotnetSpider.Core.Scheduler; +using DotnetSpider.Core.Scheduler.Component; +using DotnetSpider.Core.Infrastructure; +using Newtonsoft.Json; +using System.Collections.Generic; +using StackExchange.Redis; +using DotnetSpider.Extension.Infrastructure; +using System; +using Polly; +using Polly.Retry; +using System.Linq; +using DotnetSpider.Common; +using DotnetSpider.Downloader; + +namespace DotnetSpider.Extension.Scheduler +{ + /// + /// Use Redis as url scheduler for distributed crawlers. + /// + public class RedisScheduler : DuplicateRemovedScheduler, IDuplicateRemover + { + private readonly object _locker = new object(); + private const string TasksKey = "dotnetspider:tasks"; + private readonly RetryPolicy _retryPolicy = Policy.Handle().Retry(30); + private string _queueKey; + private string _setKey; + private string _itemKey; + private string _errorCountKey; + private string _successCountKey; + private string _identityMd5; + private readonly AutomicLong _successCounter = new AutomicLong(0); + private readonly AutomicLong _errorCounter = new AutomicLong(0); + private readonly string _connectString; + private RedisConnection _redisConnection; + private readonly string _identity; + + /// + /// 批量加载时的每批次加载数 + /// + public int BatchCount { get; set; } = 1000; + + public override bool IsDistributed => true; + + /// + /// RedisScheduler是否会使用互联网 + /// + protected override bool UseInternet { get; set; } = true; + + /// + /// 构造方法 + /// + /// 对列标识 + public RedisScheduler(string identity) : this(identity, Env.RedisConnectString) + { + } + + /// + /// 构造方法 + /// + /// 对列标识 + /// Redis连接字符串 + public RedisScheduler(string identity, string connectString) + { + if (string.IsNullOrWhiteSpace(identity)) + { + throw new ArgumentNullException("identity should not be empty"); + } + if (string.IsNullOrWhiteSpace(connectString)) + { + throw new ArgumentNullException("connectString should not be empty"); + } + _identity = identity; + _connectString = connectString; + DuplicateRemover = this; + + var md5 = CryptoUtil.Md5Encrypt(identity); + _itemKey = $"dotnetspider:scheduler:{md5}:items"; + _setKey = $"dotnetspider:scheduler:{md5}:set"; + _queueKey = $"dotnetspider:scheduler:{md5}:queue"; + _errorCountKey = $"dotnetspider:scheduler:{md5}:countOfFailures"; + _successCountKey = $"dotnetspider:scheduler:{md5}:countOfSuccess"; + + _identityMd5 = md5; + + var action = new Action(() => + { + _redisConnection = (RedisConnection)Cache.Instance.Get(_connectString); + if (_redisConnection == null) + { + _redisConnection = new RedisConnection(_connectString); + Cache.Instance.Set(_connectString, _redisConnection); + } + _redisConnection.Database.SortedSetAdd(TasksKey, _identity, (long)DateTimeUtil.GetCurrentUnixTimeNumber()); + }); + + if (UseInternet) + { + NetworkCenter.Current.Execute("rds-init", action); + } + else + { + action(); + } + } + + /// + /// Reset duplicate check. + /// + public override void ResetDuplicateCheck() + { + var action = new Action(() => + { + _redisConnection.Database.KeyDelete(_setKey); + }); + if (UseInternet) + { + NetworkCenter.Current.Execute("rds-reset", action); + } + else + { + action(); + } + } + + /// + /// Check whether the request is duplicate. + /// + /// Request + /// Whether the request is duplicate. + public virtual bool IsDuplicate(Request request) + { + return _retryPolicy.Execute(() => + { + bool isDuplicate = _redisConnection.Database.SetContains(_setKey, request.Identity); + if (!isDuplicate) + { + _redisConnection.Database.SetAdd(_setKey, request.Identity); + } + return isDuplicate; + }); + } + + /// + /// 取得一个需要处理的请求对象 + /// + /// 请求对象 + public override Request Poll() + { + if (UseInternet) + { + return NetworkCenter.Current.Execute("rds-poll", ImplPollRequest); + } + else + { + return ImplPollRequest(); + } + } + + /// + /// 剩余链接数 + /// + public override long LeftRequestsCount + { + get + { + if (UseInternet) + { + return NetworkCenter.Current.Execute("rds-left", () => _redisConnection.Database.ListLength(_queueKey)); + } + else + { + return _redisConnection.Database.ListLength(_queueKey); + } + } + } + + /// + /// 总的链接数 + /// + public override long TotalRequestsCount + { + get + { + if (UseInternet) + { + return NetworkCenter.Current.Execute("rds-total", () => _redisConnection.Database.SetLength(_setKey)); + } + else + { + return _redisConnection.Database.SetLength(_setKey); + } + } + } + + /// + /// 采集成功的链接数 + /// + public override long SuccessRequestsCount => _successCounter.Value; + + /// + /// 采集失败的次数, 不是链接数, 如果一个链接采集多次都失败会记录多次 + /// + public override long ErrorRequestsCount => _errorCounter.Value; + + /// + /// 采集成功的链接数加 1 + /// + public override void IncreaseSuccessCount() + { + _successCounter.Inc(); + } + + /// + /// 采集失败的次数加 1 + /// + public override void IncreaseErrorCount() + { + _errorCounter.Inc(); + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public override void Dispose() + { + if (UseInternet) + { + NetworkCenter.Current.Execute("rds-inc-clear", () => + { + _redisConnection.Database.KeyDelete(_queueKey); + _redisConnection.Database.KeyDelete(_setKey); + _redisConnection.Database.KeyDelete(_itemKey); + _redisConnection.Database.KeyDelete(_successCountKey); + _redisConnection.Database.KeyDelete(_errorCountKey); + }); + } + else + { + _redisConnection.Database.KeyDelete(_queueKey); + _redisConnection.Database.KeyDelete(_setKey); + _redisConnection.Database.KeyDelete(_itemKey); + _redisConnection.Database.KeyDelete(_successCountKey); + _redisConnection.Database.KeyDelete(_errorCountKey); + } + } + + /// + /// 批量导入 + /// + /// 请求对象 + public override void Reload(IEnumerable requests) + { + var action = new Action(() => + { + lock (_locker) + { + int batchCount = BatchCount; + + var count = requests.Count(); + int cacheSize = count > batchCount ? batchCount : count; + RedisValue[] identities = new RedisValue[cacheSize]; + HashEntry[] items = new HashEntry[cacheSize]; + int i = 0; + int j = count % batchCount; + int n = count / batchCount; + + foreach (var request in requests) + { + identities[i] = request.Identity; + items[i] = new HashEntry(request.Identity, JsonConvert.SerializeObject(request)); + ++i; + if (i == batchCount) + { + --n; + + _redisConnection.Database.SetAdd(_setKey, identities); + _redisConnection.Database.ListRightPush(_queueKey, identities); + _redisConnection.Database.HashSet(_itemKey, items, CommandFlags.HighPriority); + + i = 0; + if (n != 0) + { + identities = new RedisValue[batchCount]; + items = new HashEntry[batchCount]; + } + else + { + identities = new RedisValue[j]; + items = new HashEntry[j]; + } + } + } + + if (i > 0) + { + _redisConnection.Database.SetAdd(_setKey, identities); + _redisConnection.Database.ListRightPush(_queueKey, identities); + _redisConnection.Database.HashSet(_itemKey, items); + } + } + }); + if (UseInternet) + { + NetworkCenter.Current.Execute("rds-import", action); + } + else + { + action(); + } + } + + /// + /// 把队列中的请求对象转换成List + /// + /// 请求对象的List + public HashSet ToList() + { + HashSet requests = new HashSet(); + Request request; + while ((request = Poll()) != null) + { + requests.Add(request); + } + return requests; + } + + /// + /// 如果链接不是重复的就添加到队列中 + /// + /// 请求对象 + protected override void PushWhenNoDuplicate(Request request) + { + _retryPolicy.Execute(() => + { + _redisConnection.Database.ListRightPush(_queueKey, request.Identity); + string field = request.Identity; + string value = JsonConvert.SerializeObject(request); + + _redisConnection.Database.HashSet(_itemKey, field, value); + }); + } + + private Request ImplPollRequest() + { + return _retryPolicy.Execute(() => + { + RedisValue value; + switch (TraverseStrategy) + { + case TraverseStrategy.Dfs: + { + value = _redisConnection.Database.ListRightPop(_queueKey); + break; + } + case TraverseStrategy.Bfs: + { + value = _redisConnection.Database.ListLeftPop(_queueKey); + break; + } + default: + { + throw new NotImplementedException(); + } + } + if (!value.HasValue) + { + return null; + } + string field = value.ToString(); + + string json = _redisConnection.Database.HashGet(_itemKey, field); + + if (!string.IsNullOrEmpty(json)) + { + var result = JsonConvert.DeserializeObject(json); + _redisConnection.Database.HashDelete(_itemKey, field); + return result; + } + return null; + }); + } + + #region For Test + + internal string GetQueueKey() + { + return _queueKey; + } + + internal string GetSetKey() + { + return _setKey; + } + + internal string GetItemKey() + { + return _itemKey; + } + + internal string GetErrorCountKey() + { + return _errorCountKey; + } + + internal string GetSuccessCountKey() + { + return _successCountKey; + } + + #endregion + } +}