Skip to content

Commit

Permalink
Merge pull request #36 from notpron/2.x-task-type-filter
Browse files Browse the repository at this point in the history
Filter tasks by task name
  • Loading branch information
tersmitten authored Aug 6, 2019
2 parents c68abc3 + 39b58fa commit e07aef6
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
language: php

services:
- mysql

php:
- 7.0
- 7.1
Expand Down Expand Up @@ -36,6 +39,7 @@ matrix:
branches:
only:
- master
- 2.x

before_script:
- git clone -b master https://github.com/Oefenweb/travis --depth 1 ../travis
Expand Down
37 changes: 35 additions & 2 deletions Console/Command/QueueShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
App::uses('Folder', 'Utility');
App::uses('QueuedTask', 'Model');
App::uses('AppShell', 'Console/Command');
App::uses('CakeText', 'Utility');

declare(ticks = 1);

Expand Down Expand Up @@ -119,7 +120,14 @@ public function getOptionParser() {
])->addSubcommand('runworker', [
'help' => __d('queue', 'Run a queue worker.'),
'parser' => [
'description' => [__d('queue', 'Run a queue worker, which will look for a pending task it can execute.')]
'description' => [__d('queue', 'Run a queue worker, which will look for a pending task it can execute.')],
'options' => [
'type' => [
'short' => 't',
'help' => 'Type (comma separated list possible)',
'default' => null
]
]
]
])->addSubcommand('stats', [
'help' => __d('queue', 'Display general statistics.'),
Expand Down Expand Up @@ -204,10 +212,14 @@ public function runworker() {
$this->__exit = false;

$workerStartTime = time();

$typesParam = $this->param('type');
$types = is_string($typesParam) ? $this->_stringToArray($typesParam) : [];

while (!$this->__exit) {
$this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);

$data = $this->QueuedTask->requestJob($this->_getTaskConf());
$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $types);
if ($this->QueuedTask->exit === true) {
$this->__exit = true;
} else {
Expand Down Expand Up @@ -381,4 +393,25 @@ public function signalHandler($signalNumber) {
}
}

/**
* Converts string to array
*
* @param string|null $param String to convert
* @return array
*/
protected function _stringToArray(string $param = null) : array {
if (!$param) {
return [];
}

$array = CakeText::tokenize($param);
if (is_string($array)) {
return [
$array
];
}

return array_filter($array);
}

}
38 changes: 36 additions & 2 deletions Model/QueuedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ public function createJob($taskName, $data, $notBefore = null) {
* Looks for a new job that can be processed with the current abilities
*
* @param array $capabilities Available queue worker tasks.
* @param array $types Request a job from these types (or exclude certain types), or any otherwise.
* @return mixed Job data or false.
*/
public function requestJob($capabilities) {
public function requestJob($capabilities, array $types = []) {
$idlist = [];
$wasFetched = [];

Expand All @@ -64,6 +65,10 @@ public function requestJob($capabilities) {
];
$limit = Configure::read('Queue.workers');

if ($types) {
$conditions = $this->_addFilter($conditions, 'task', $types);
}

// Generate the job specific conditions.
foreach ($capabilities as $task) {
list($plugin, $name) = pluginSplit($task['name']);
Expand Down Expand Up @@ -179,7 +184,7 @@ public function getLength($taskName = null) {
* @return array A list of task names
*/
public function getTypes() {
$fields = ['task'];
$fields = ['task', 'task'];
$group = ['task'];

return $this->find('list', compact('fields', 'group'));
Expand Down Expand Up @@ -246,4 +251,33 @@ public function cleanFailedJobs($capabilities) {
return $this->deleteAll($conditions, false);
}

/**
* Filters field `key` based on the provided values. Values prefixed with '-' are excluded.
*
* @param array $conditions Conditions
* @param string $key Key
* @param array $values Values
* @return array the conditions
*/
protected function _addFilter(array $conditions, $key, array $values) : array {
$include = [];
$exclude = [];
foreach ($values as $value) {
if (substr($value, 0, 1) === '-') {
$exclude[] = substr($value, 1);
} else {
$include[] = $value;
}
}

if ($include) {
$conditions[$key . ' IN'] = $include;
}
if ($exclude) {
$conditions[$key . ' NOT IN'] = $exclude;
}

return $conditions;
}

}
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,16 @@ Console/cake Queue.queue clean;
# Manually call cleanup_failed function to delete task data of failed tasks.
Console/cake Queue.queue clean_failed;
```

#### Running only specific tasks per worker
You can filter "running" by type:

```
Console/cake Queue.queue runworker -t MyType,AnotherType,-ThisOneToo
Console/cake Queue.queue runworker -t "-ThisOneNot"
```

Use `-` prefix to exclude. Note that you might need to use `""` around the value then to avoid it being seen as option key.

That can be helpful when migrating servers and you only want to execute certain ones on the new system or want to test specific servers.

0 comments on commit e07aef6

Please sign in to comment.