Skip to content

Commit

Permalink
WIP - Add elasticsearch index configuration for settings, mapping and…
Browse files Browse the repository at this point in the history
… aliases
  • Loading branch information
fabianaromagnoli authored and mmenozzi committed Nov 22, 2024
1 parent 86bd7da commit fe7f537
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 26 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,18 @@ flows:
initial_polling_interval: 1000 # Optional: initial polling delay that a worker waits when it has to wait for a dependency that is not idle
maximum_polling_interval: 60000 # Optional: maximum polling delay that a worker waits when it has to wait for a dependency that is not idle
polling_interval_multiplier: 2 # Optional: polling delay increase factor whenever a worker is waiting for a dependency that is not idle
es_index: # Optional: the create/update ElasticSearch index API body (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html). This is useful if you want to control index mapping, settings and aliases.
settings: # For example, you can set the total_fields limit to a higher (or lower) value:
index:
es_index_settings: # Optional: the update ElasticSearch index API settings (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html#update-index-settings-api-request-body). This is useful if you want to control index settings.
index: # For example, you can set the total_fields limit to a higher (or lower) value
mapping:
total_fields:
limit: 2000
es_index_mapping: # Optional: the update ElasticSearch index API mapping (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#put-mapping-api-request-body). This is useful if you want to control index mapping.
properties:
title:
type: text
es_index_aliases: # Optional: the update ElasticSearch index API aliases (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-add-alias.html#add-alias-api-request-body). This is useful if you want to control index aliases.
my_alias:
is_hidden: true

other_flow_1:
# ...
Expand Down
18 changes: 13 additions & 5 deletions esb.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ services:
flows:
sample_flow: # The flow "code", which will also be used as the Beanstalkd tube name
description: Sample Flow # The flow description
es_index: # Optional: the create/update ElasticSearch index API body (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html). This is useful if you want to control index mapping, settings and aliases.
settings: # For example, you can set the total_fields limit to a higher (or lower) value:
index:
total_fields:
limit: 2000
es_index_settings: # Optional: the update ElasticSearch index API settings (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html#update-index-settings-api-request-body). This is useful if you want to control index settings.
index:
mapping:
total_fields:
limit: 2000
es_index_mapping: # Optional: the update ElasticSearch index API mapping (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#put-mapping-api-request-body). This is useful if you want to control index mapping.
properties:
title:
type: text
es_index_aliases: # Optional: the update ElasticSearch index API aliases (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-add-alias.html#add-alias-api-request-body). This is useful if you want to control index aliases.
my_alias:
is_hidden: true

producer:
service: My\Esb\Producer # A producer service ID defined above
batch_size: 1200 # Jobs are produced in batches of 1200 jobs. Optional: default is 1000
Expand Down
4 changes: 3 additions & 1 deletion src/FlowConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public function getConfigTreeBuilder(): TreeBuilder
->arrayPrototype()
->children()
->scalarNode('description')->isRequired()->cannotBeEmpty()->end()
->variableNode('es_index')->defaultNull()->end()
->variableNode('es_index_settings')->defaultNull()->end()
->variableNode('es_index_mapping')->defaultNull()->end()
->variableNode('es_index_aliases')->defaultNull()->end()
->arrayNode('producer')
->children()
->scalarNode('service')->isRequired()->end()
Expand Down
27 changes: 25 additions & 2 deletions src/Model/FlowConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,31 @@ public function getProducerBatchSize(): int
/**
* @return array<array-key, mixed>|null
*/
public function getElasticSearchIndexCreateOrUpdateBody(): ?array
public function getElasticSearchIndexUpdateSettingsBody(): ?array
{
return $this->config['es_index'] ?? null;
return $this->config['es_index_settings'] ?? null;
}

/**
 * Returns the value stored under the "es_index_mapping" key of this flow's configuration.
 *
 * @return array<array-key, mixed>|null The configured mapping body, or null when the key is missing or set to null.
 */
public function getElasticSearchIndexUpdateMappingBody(): ?array
{
return $this->config['es_index_mapping'] ?? null;
}

/**
 * Returns the value stored under the "es_index_aliases" key of this flow's configuration.
 *
 * @return array<array-key, mixed>|null The configured aliases body, or null when the key is missing or set to null.
 */
public function getElasticSearchIndexUpdateAliasesBody(): ?array
{
return $this->config['es_index_aliases'] ?? null;
}

/**
 * Tells whether at least one of the optional index configuration bodies
 * (settings, mapping or aliases) is non-null for this flow.
 *
 * @return bool False only when the settings, mapping and aliases bodies are all null.
 */
public function hasAdditionalIndexConfiguration(): bool
{
    // Checked in the same order as the getters are declared: settings, mapping, aliases.
    $nothingConfigured = $this->getElasticSearchIndexUpdateSettingsBody() === null
        && $this->getElasticSearchIndexUpdateMappingBody() === null
        && $this->getElasticSearchIndexUpdateAliasesBody() === null;

    return !$nothingConfigured;
}
}
44 changes: 41 additions & 3 deletions src/Service/ElasticSearch.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,49 @@ public function getClient(): Client
}

/**
* @param array<array-key, mixed>|null $createOrUpdateBody
* @return Amp\Promise<bool>
*/
public function setElasticSearchIndex(string $indexName, array $createOrUpdateBody = null): Amp\Promise
public function indexExists(string $indexName): Amp\Promise
{
return $this->client->createOrUpdateIndex($indexName, $createOrUpdateBody);
return Amp\call(function () use ($indexName) {
try {
$response = yield $this->client->existsIndex($indexName);
if ($response->getStatusCode() === 200) {
return true;
}
return false;
} catch (Error $error) {
if ($error->getCode() === 404) {
return false;
}
throw $error;
}
});
}

/**
 * Asks the underlying client to create the index with the given name.
 *
 * @param string $indexName Name of the index to create.
 *
 * @return Amp\Promise The promise returned by the client's createIndex() call.
 */
public function setElasticSearchIndex(string $indexName): Amp\Promise
{
return $this->client->createIndex($indexName);
}


/**
 * Forwards an update-settings request for the given index to the underlying client.
 *
 * @param string $indexName Name of the index whose settings are updated.
 * @param array<array-key, mixed>|null $updateSettingsBody Settings body, passed to the client unchanged
 *                                                         (null is forwarded as-is).
 *
 * @return Amp\Promise The promise returned by the client's updateIndexSettings() call.
 */
public function setElasticSearchIndexSettings(string $indexName, ?array $updateSettingsBody = null): Amp\Promise
{
    // The nullable type is spelled out: relying on "= null" to make the parameter nullable is deprecated.
    return $this->client->updateIndexSettings($indexName, $updateSettingsBody);
}


/**
 * Forwards an update-mapping request for the given index to the underlying client.
 *
 * @param string $indexName Name of the index whose mapping is updated.
 * @param array<array-key, mixed>|null $updateMappingBody Mapping body, passed to the client unchanged
 *                                                        (null is forwarded as-is).
 *
 * @return Amp\Promise The promise returned by the client's updateMappings() call.
 */
public function setElasticSearchIndexMapping(string $indexName, ?array $updateMappingBody = null): Amp\Promise
{
    // The nullable type is spelled out: relying on "= null" to make the parameter nullable is deprecated.
    return $this->client->updateMappings($indexName, $updateMappingBody);
}

/**
 * Forwards a create-or-update request for a single alias of the given index to the underlying client.
 *
 * @param string $indexName Name of the index the alias belongs to.
 * @param string $aliasName Name of the alias to create or update.
 * @param array<array-key, mixed>|null $aliasBody Alias body, passed to the client unchanged
 *                                                (null is forwarded as-is).
 *
 * @return Amp\Promise The promise returned by the client's createOrUpdateAlias() call.
 */
public function setElasticSearchIndexAlias(string $indexName, string $aliasName, ?array $aliasBody = null): Amp\Promise
{
    // The nullable type is spelled out: relying on "= null" to make the parameter nullable is deprecated.
    return $this->client->createOrUpdateAlias($indexName, $aliasName, $aliasBody);
}

/**
Expand Down
68 changes: 56 additions & 12 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,62 @@ public function __construct(
public function boot(): Promise
{
return call(function () {
if ($this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() !== null) {
yield $this->elasticSearch->setElasticSearchIndex(
$this->flowConfig->getTube(),
$this->flowConfig->getElasticSearchIndexCreateOrUpdateBody()
);
$this->logger->debug(
'Successfully set ElasticSearch index',
[
'index' => $this->flowConfig->getTube(),
'body' => $this->flowConfig->getElasticSearchIndexCreateOrUpdateBody()
]
);
if ($this->flowConfig->hasAdditionalIndexConfiguration()) {
$indexExists = yield $this->elasticSearch->indexExists($this->flowConfig->getTube());
if (!$indexExists) {
yield $this->elasticSearch->setElasticSearchIndex($this->flowConfig->getTube());
$this->logger->debug(
'Successfully created ElasticSearch index',
['index' => $this->flowConfig->getTube()]
);
}

if ($this->flowConfig->getElasticSearchIndexUpdateSettingsBody()) {
yield $this->elasticSearch->setElasticSearchIndexSettings(
$this->flowConfig->getTube(),
$this->flowConfig->getElasticSearchIndexUpdateSettingsBody()
);
$this->logger->debug(
'Successfully set ElasticSearch index settings',
[
'index' => $this->flowConfig->getTube(),
'body' => $this->flowConfig->getElasticSearchIndexUpdateSettingsBody()
]
);
}

if ($this->flowConfig->getElasticSearchIndexUpdateMappingBody()) {
yield $this->elasticSearch->setElasticSearchIndexMapping(
$this->flowConfig->getTube(),
$this->flowConfig->getElasticSearchIndexUpdateMappingBody()
);
$this->logger->debug(
'Successfully set ElasticSearch index mapping',
[
'index' => $this->flowConfig->getTube(),
'body' => $this->flowConfig->getElasticSearchIndexUpdateMappingBody()
]
);
}

if ($this->flowConfig->getElasticSearchIndexUpdateAliasesBody()) {
$elasticSearchIndexUpdateAliases = $this->flowConfig->getElasticSearchIndexUpdateAliasesBody();
foreach ($elasticSearchIndexUpdateAliases as $aliasName => $aliasBody) {
yield $this->elasticSearch->setElasticSearchIndexAlias(
$this->flowConfig->getTube(),
$aliasName,
$aliasBody
);
$this->logger->debug(
'Successfully set ElasticSearch index alias',
[
'index' => $this->flowConfig->getTube(),
'alias' => $aliasName,
'body' => $aliasBody
]
);
}
}
}
//Producer
yield $this->beanstalkClient->use($this->flowConfig->getTube());
Expand Down
55 changes: 55 additions & 0 deletions tests/Integration/ElasticSearchIndexingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,61 @@ function ($log) {
$this->assertCount(1, $successfullyIndexedLog);
}

/**
 * Boots a flow whose "es_index_settings" set the index total_fields limit to 2000, feeds it one job
 * holding a 1001-field JSON document plus one empty job, and checks that no indexing error is logged.
 *
 * @test
 */
public function itUpdatesIndexSettingsAccordingToFlowConfigTest()
{
    $jobsDirectory = vfsStream::url('root/producer_dir');
    $workerOutputFile = vfsStream::url('root/worker.data');

    $flowDefinition = [
        'description' => 'ElasticSearch Indexing Test Repeat Flow',
        'es_index_settings' => ['index' => ['mapping' => ['total_fields' => ['limit' => 2000]]]],
        'producer' => ['service' => DummyFilesystemRepeatProducer::class],
        'worker' => ['service' => DummyFilesystemWorker::class],
    ];
    self::createKernel([
        'services' => [
            DummyFilesystemRepeatProducer::class => ['arguments' => [$jobsDirectory]],
            DummyFilesystemWorker::class => ['arguments' => [$workerOutputFile]],
        ],
        'flows' => [self::FLOW_CODE => $flowDefinition],
    ]);
    mkdir($jobsDirectory);

    // Counts the log records collected so far whose message contains the given text.
    $countLogsContaining = function (string $needle): int {
        $matchingRecords = array_filter(
            $this->logHandler()->getRecords(),
            function ($record) use ($needle) {
                return strpos($record['message'], $needle) !== false;
            }
        );

        return count($matchingRecords);
    };

    // The job files are written from a delayed callback, i.e. only once the loop is running.
    Loop::delay(
        200,
        function () use ($jobsDirectory) {
            // 1001 top-level fields; NOTE(review): presumably chosen to exceed ElasticSearch's
            // default total_fields limit, so indexing only succeeds with the configured 2000.
            $manyFieldsPayload = json_encode(array_fill_keys(range(1, 1001), 'value'));
            file_put_contents($jobsDirectory . DIRECTORY_SEPARATOR . 'job1', $manyFieldsPayload);
            touch($jobsDirectory . DIRECTORY_SEPARATOR . 'job2');
        }
    );
    // Stop as soon as both jobs have been reported as worked.
    $this->stopWhen(function () use ($countLogsContaining) {
        return $countLogsContaining('Successfully worked a Job') >= 2;
    });
    self::$kernel->boot();

    Promise\wait($this->esClient->refresh());
    $searchResult = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, ''));
    $this->assertCount(1, $searchResult['hits']['hits']);
    $this->assertFalse($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch'));

    $enqueuedLogs = array_filter(
        $this->logHandler()->getRecords(),
        function ($record) {
            return strpos($record['message'], 'Successfully enqueued a new Job') !== false;
        }
    );
    $this->assertCount(2, $enqueuedLogs);
}

private function assertForEachJob(callable $callable, array $jobsData)
{
/** @var Serializer $serializer */
Expand Down

0 comments on commit fe7f537

Please sign in to comment.