Skip to content

Commit

Permalink
修复测试用例
Browse files Browse the repository at this point in the history
  • Loading branch information
zlzforever committed Feb 18, 2021
1 parent 3b35033 commit aae494e
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions src/DotnetSpider.MySql/Scheduler/MySqlQueueScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ public async Task InitializeAsync(string spiderId)

await using var conn = new MySqlConnection(_options.ConnectionString);
await conn.ExecuteAsync($@"
CREATE TABLE IF NOT EXISTS {spiderId}_set
CREATE TABLE IF NOT EXISTS `{spiderId}_set`
(
hash varchar(32) not null primary key,
request mediumblob not null,
state char(1),
creation_time timestamp default CURRENT_TIMESTAMP not null
);");
await conn.ExecuteAsync($@"
CREATE TABLE IF NOT EXISTS {spiderId}_queue
CREATE TABLE IF NOT EXISTS `{spiderId}_queue`
(
id int auto_increment primary key,
hash varchar(32) not null,
request mediumblob not null,
constraint {spiderId}_queue_hash_uindex unique (hash)
);");
_totalSql = $"SELECT COUNT(*) FROM {_spiderId}_set";
_resetDuplicateCheckSql = $"TRUNCATE {_spiderId}_set; TRUNCATE {_spiderId}_queue;";
_insertSetSql = $"INSERT IGNORE INTO {_spiderId}_set (hash, request) VALUES (@Hash, @Request); ";
_insertQueueSql = $"INSERT IGNORE INTO {_spiderId}_queue (hash, request) VALUES (@Hash, @Request);";
_successSql = $"UPDATE {_spiderId}_set SET state = 'S' WHERE hash = @Hash;";
_failSql = $"UPDATE {_spiderId}_set SET state = 'E' WHERE hash = @Hash;";
_totalSql = $"SELECT COUNT(*) FROM `{_spiderId}_set`";
_resetDuplicateCheckSql = $"TRUNCATE `{_spiderId}_set`; TRUNCATE {_spiderId}_queue;";
_insertSetSql = $"INSERT IGNORE INTO `{_spiderId}_set` (hash, request) VALUES (@Hash, @Request); ";
_insertQueueSql = $"INSERT IGNORE INTO `{_spiderId}_queue` (hash, request) VALUES (@Hash, @Request);";
_successSql = $"UPDATE `{_spiderId}_set` SET state = 'S' WHERE hash = @Hash;";
_failSql = $"UPDATE `{_spiderId}_set` SET state = 'E' WHERE hash = @Hash;";
}

public async Task<IEnumerable<Request>> DequeueAsync(int count = 1)
Expand Down Expand Up @@ -106,7 +106,7 @@ public async Task<IEnumerable<Request>> DequeueAsync(int count = 1)
var ids = string.Join(",", rows.Select(x => $"'{x["id"]}'"));
var hashes = string.Join(",", rows.Select(x => $"'{x["hash"]}'"));
await conn.ExecuteAsync(
$"DELETE FROM {_spiderId}_queue WHERE id IN ({ids}); UPDATE {_spiderId}_set SET state = 'P' WHERE hash IN ({hashes});",
$"DELETE FROM `{_spiderId}_queue` WHERE id IN ({ids}); UPDATE `{_spiderId}_set` SET state = 'P' WHERE hash IN ({hashes});",
null, transaction);
await transaction.CommitAsync();
var list = new List<Request>();
Expand Down Expand Up @@ -189,15 +189,15 @@ public virtual async Task ResetDuplicateCheckAsync()
public async Task CleanAsync()
{
await using var conn = new MySqlConnection(_options.ConnectionString);
await conn.ExecuteAsync($"DROP table {_spiderId}_set");
await conn.ExecuteAsync($"DROP table {_spiderId}_queue");
await conn.ExecuteAsync($"DROP table `{_spiderId}_set`");
await conn.ExecuteAsync($"DROP table `{_spiderId}_queue`");
}

public async Task ReloadAsync()
{
await using var conn = new MySqlConnection(_options.ConnectionString);
await conn.ExecuteAsync(
$@"INSERT IGNORE INTO {_spiderId}_queue (hash, request) SELECT hash, request FROM {_spiderId}_set where state = 'P'");
$@"INSERT IGNORE INTO `{_spiderId}_queue` (hash, request) SELECT hash, request FROM `{_spiderId}_set` where state = 'P'");
}

public async Task SuccessAsync(Request request)
Expand Down

0 comments on commit aae494e

Please sign in to comment.