diff --git a/src/Connection.php b/src/Connection.php index 05d1723..92281bf 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -3,12 +3,16 @@ namespace DesignMyNight\Elasticsearch; use Closure; -use DesignMyNight\Elasticsearch\Exceptions\BulkInsertQueryException; -use DesignMyNight\Elasticsearch\Exceptions\QueryException; +use DesignMyNight\Elasticsearch\Database\Schema\Blueprint; +use DesignMyNight\Elasticsearch\Database\Schema\ElasticsearchBuilder; +use DesignMyNight\Elasticsearch\Database\Schema\Grammars\ElasticsearchGrammar; use Elasticsearch\ClientBuilder; use Illuminate\Database\Connection as BaseConnection; use Illuminate\Database\Events\QueryExecuted; use Illuminate\Database\Grammar as BaseGrammar; +use Illuminate\Database\QueryException; +use Illuminate\Database\Schema\Builder; +use Illuminate\Support\Arr; class Connection extends BaseConnection { @@ -23,6 +27,24 @@ class Connection extends BaseConnection protected $requestTimeout; + /** + * Map configuration array keys with ES ClientBuilder setters + * + * @var array + */ + protected $configMappings = [ + 'sslVerification' => 'setSSLVerification', + 'sniffOnStart' => 'setSniffOnStart', + 'retries' => 'setRetries', + 'httpHandler' => 'setHandler', + 'connectionPool' => 'setConnectionPool', + 'connectionSelector' => 'setSelector', + 'serializer' => 'setSerializer', + 'connectionFactory' => 'setConnectionFactory', + 'endpoint' => 'setEndpoint', + 'namespaces' => 'registerNamespace', + ]; + /** * Create a new Elasticsearch connection instance. * @@ -31,15 +53,13 @@ class Connection extends BaseConnection public function __construct(array $config) { $this->config = $config; - - $this->indexSuffix = isset($config['suffix']) ? $config['suffix'] : ''; + $this->indexSuffix = $config['suffix'] ?? ''; // Extract the hosts from config - $hostsConfig = $config['hosts'] ?? $config['host']; - $hosts = explode(',', $hostsConfig); + $hosts = explode(',', $config['hosts'] ?? $config['host']); // You can pass options directly to the client - $options = array_get($config, 'options', []); + $options = Arr::get($config, 'options', []); // Create the connection $this->connection = $this->createConnection($hosts, $config, $options); @@ -49,204 +69,257 @@ public function __construct(array $config) } /** - * Create a new Elasticsearch connection. + * Dynamically pass methods to the connection. * - * @param array $hosts - * @param array $config + * @param string $method + * @param array $parameters * - * @return \Elasticsearch\Client + * @return mixed */ - protected function createConnection($hosts, array $config, array $options) + public function __call($method, $parameters) { - // apply config to each host - $hosts = array_map(function($host) use ($config) { - $port = !empty($config['port']) ? $config['port'] : 9200; - - $scheme = !empty($config['scheme']) ? $config['scheme'] : 'http'; - - // force https for port 443 - $scheme = (int) $port === 443 ? 'https' : $scheme; - - return [ - 'host' => $host, - 'port' => $port, - 'scheme' => $scheme, - 'user' => !empty($config['username']) ? $config['username'] : null, - 'pass' => !empty($config['password']) ? $config['password'] : null, - ]; - }, $hosts); - - return ClientBuilder::create() - ->setHosts($hosts) - ->setSelector('\Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector') - ->build(); + return call_user_func_array([$this->connection, $method], $parameters); } /** - * Get the default query grammar instance. + * Run an SQL statement and get the number of rows affected. * - * @return \Illuminate\Database\Query\Grammars\Grammar + * @param string $query + * @param array $bindings + * + * @return int */ - protected function getDefaultQueryGrammar() + public function affectingStatement($query, $bindings = []) { - return $this->withIndexSuffix(new QueryGrammar); + // } /** - * Set the table prefix and return the grammar. + * Start a new database transaction. * - * @param \Illuminate\Database\Grammar $grammar - * @return \Illuminate\Database\Grammar + * @return void */ - public function withIndexSuffix(BaseGrammar $grammar) + public function beginTransaction() { - $grammar->setIndexSuffix($this->indexSuffix); - - return $grammar; + // } /** - * Get the default post processor instance. + * Commit the active database transaction. * - * @return Processor + * @return void */ - protected function getDefaultPostProcessor() + public function commit() { - return new QueryProcessor(); + // } /** - * Get the table prefix for the connection. - * - * @return string + * @param string $index + * @param string $name */ - public function getTablePrefix() + public function createAlias(string $index, string $name): void { - return $this->indexSuffix; + $this->indices()->putAlias(compact('index', 'name')); } /** - * Log a query in the connection's query log. + * @param string $index + * @param array $body + */ + public function createIndex(string $index, array $body): void + { + $this->indices()->create(compact('index', 'body')); + } + + /** + * Run a select statement against the database and return a generator. * - * @param string $query - * @param array $bindings - * @param float|null $time - * @return void + * @param string $query + * @param array $bindings + * @param bool $useReadPdo + * + * @return \Generator */ - public function logQuery($query, $bindings, $time = null) + public function cursor($query, $bindings = [], $useReadPdo = false) { - $this->event(new QueryExecuted(json_encode($query), $bindings, $time, $this)); + $scrollTimeout = '30s'; + $limit = $query['size'] ?? 0; - if ($this->loggingQueries) { - $this->queryLog[] = compact('query', 'bindings', 'time'); + $scrollParams = [ + 'scroll' => $scrollTimeout, + 'size' => 100, // Number of results per shard + 'index' => $query['index'], + 'body' => $query['body'], + ]; + + $results = $this->select($scrollParams); + + $scrollId = $results['_scroll_id']; + + $numResults = count($results['hits']['hits']); + + foreach ($results['hits']['hits'] as $result) { + yield $result; + } + + if (!$limit || $limit > $numResults) { + $limit = $limit ? $limit - $numResults : $limit; + + foreach ($this->scroll($scrollId, $scrollTimeout, $limit) as $result) { + yield $result; + } } } /** - * Set the table prefix in use by the connection. + * Run a delete statement against the database. * - * @param string $prefix - * @return void + * @param string $query + * @param array $bindings + * + * @return array */ - public function setIndexSuffix($suffix) + public function delete($query, $bindings = []) { - $this->indexSuffix = $suffix; + return $this->run( + $query, + $bindings, + Closure::fromCallable([$this->connection, 'deleteByQuery']) + ); + } - $this->getQueryGrammar()->setIndexSuffix($suffix); + /** + * @param string $index + */ + public function dropIndex(string $index): void + { + $this->indices()->delete(compact('index')); } /** - * Begin a fluent query against a database table. - * - * @param string $table - * @return \Illuminate\Database\Query\Builder + * Get the timeout for the entire Elasticsearch request + * @return float */ - public function table($table) + public function getRequestTimeout(): float { + return $this->requestTimeout; } /** - * Get a new raw query expression. - * - * @param mixed $value - * @return \Illuminate\Database\Query\Expression + * @return ElasticsearchBuilder|\Illuminate\Database\Schema\Builder */ - public function raw($value) + public function getSchemaBuilder() { + return new ElasticsearchBuilder($this); } /** - * Run a select statement and return a single result. + * @return ElasticsearchGrammar|\Illuminate\Database\Schema\Grammars\Grammar + */ + public function getSchemaGrammar() + { + return new ElasticsearchGrammar(); + } + + /** + * Get the table prefix for the connection. * - * @param string $query - * @param array $bindings - * @return mixed + * @return string */ - public function selectOne($query, $bindings = [], $useReadPdo = true) + public function getTablePrefix() { + return $this->indexSuffix; } /** - * Run a select statement against the database. + * Run an insert statement against the database. * - * @param array $params - * @param array $bindings - * @return array + * @param array $params + * @param array $bindings + * + * @return bool */ - public function select($params, $bindings = [], $useReadPdo = true) + public function insert($params, $bindings = []) { return $this->run( $this->addClientParams($params), $bindings, - Closure::fromCallable([$this->connection, 'search']) + Closure::fromCallable([$this->connection, 'bulk']) ); } /** - * Run a select statement against the database and return a generator. + * Log a query in the connection's query log. * - * @param string $query - * @param array $bindings - * @param bool $useReadPdo - * @return \Generator + * @param string $query + * @param array $bindings + * @param float|null $time + * + * @return void */ - public function cursor($query, $bindings = [], $useReadPdo = false) + public function logQuery($query, $bindings, $time = null) { - $scrollTimeout = '30s'; - $limit = $query['size'] ?? 0; - - $scrollParams = array( - 'scroll' => $scrollTimeout, - 'size' => 100, // Number of results per shard - 'index' => $query['index'], - 'body' => $query['body'] - ); - - $results = $this->select($scrollParams); + $this->event(new QueryExecuted(json_encode($query), $bindings, $time, $this)); - $scrollId = $results['_scroll_id']; + if ($this->loggingQueries) { + $this->queryLog[] = compact('query', 'bindings', 'time'); + } + } - $numResults = count($results['hits']['hits']); + /** + * Prepare the query bindings for execution. + * + * @param array $bindings + * + * @return array + */ + public function prepareBindings(array $bindings) + { + return $bindings; + } - foreach ($results['hits']['hits'] as $result) { - yield $result; - } + /** + * Execute the given callback in "dry run" mode. + * + * @param \Closure $callback + * + * @return array + */ + public function pretend(Closure $callback) + { + // + } - if (! $limit || $limit > $numResults) { - $limit = $limit ? $limit - $numResults : $limit; + /** + * Get a new raw query expression. + * + * @param mixed $value + * + * @return \Illuminate\Database\Query\Expression + */ + public function raw($value) + { + // + } - foreach ($this->scroll($scrollId, $scrollTimeout, $limit) as $result) { - yield $result; - } - } + /** + * Rollback the active database transaction. + * + * @return void + */ + public function rollBack($toLevel = null) + { + // } /** * Run a select statement against the database using an Elasticsearch scroll cursor. * - * @param string $scrollId - * @param string $scrollTimeout - * @param int $limit + * @param string $scrollId + * @param string $scrollTimeout + * @param int $limit + * * @return \Generator */ public function scroll(string $scrollId, string $scrollTimeout = '30s', int $limit = 0) @@ -254,12 +327,12 @@ public function scroll(string $scrollId, string $scrollTimeout = '30s', int $lim $numResults = 0; // Loop until the scroll 'cursors' are exhausted or we have enough results - while (! $limit || $numResults < $limit) { + while (!$limit || $numResults < $limit) { // Execute a Scroll request - $results = $this->connection->scroll(array( + $results = $this->connection->scroll([ 'scroll_id' => $scrollId, - 'scroll' => $scrollTimeout, - )); + 'scroll' => $scrollTimeout, + ]); // Get new scroll ID in case it's changed $scrollId = $results['_scroll_id']; @@ -282,228 +355,268 @@ public function scroll(string $scrollId, string $scrollTimeout = '30s', int $lim } /** - * Run an insert statement against the database. + * Run a select statement against the database. * * @param array $params * @param array $bindings - * @return bool - * @throws BulkInsertQueryException + * + * @return array */ - public function insert($params, $bindings = []) + public function select($params, $bindings = [], $useReadPdo = true) { - $result = $this->run( + return $this->run( $this->addClientParams($params), $bindings, - Closure::fromCallable([$this->connection, 'bulk']) + Closure::fromCallable([$this->connection, 'search']) ); - - if (empty($result['errors'])) { - throw new BulkInsertQueryException($result); - } - - return true; } /** - * Run an update statement against the database. + * Run a select statement and return a single result. * - * @param string $query - * @param array $bindings - * @return array + * @param string $query + * @param array $bindings + * + * @return mixed */ - public function update($query, $bindings = []) + public function selectOne($query, $bindings = [], $useReadPdo = true) { - return $this->run( - $query, - $bindings, - Closure::fromCallable([$this->connection, 'index']) - ); + // } /** - * Run a delete statement against the database. + * Get a new query builder instance. * - * @param string $query - * @param array $bindings - * @return array + * @return */ - public function delete($query, $bindings = []) + public function query() { - return $this->run( - $query, - $bindings, - Closure::fromCallable([$this->connection, 'delete']) + return new QueryBuilder( + $this, $this->getQueryGrammar(), $this->getPostProcessor() ); } /** - * Execute an SQL statement and return the boolean result. + * Set the table prefix in use by the connection. * - * @param string $query - * @param array $bindings - * @return bool + * @param string $prefix + * + * @return void */ - public function statement($query, $bindings = []) + public function setIndexSuffix($suffix) { + $this->indexSuffix = $suffix; + + $this->getQueryGrammar()->setIndexSuffix($suffix); } /** - * Run an SQL statement and get the number of rows affected. + * Get the timeout for the entire Elasticsearch request * - * @param string $query - * @param array $bindings - * @return int + * @param float $requestTimeout seconds + * + * @return self */ - public function affectingStatement($query, $bindings = []) + public function setRequestTimeout(float $requestTimeout): self { + $this->requestTimeout = $requestTimeout; + + return $this; } /** - * Run a raw, unprepared query against the PDO connection. + * Execute an SQL statement and return the boolean result. + * + * @param string $query + * @param array $bindings * - * @param string $query * @return bool */ - public function unprepared($query) + public function statement($query, $bindings = [], Blueprint $blueprint = null) { + // } /** - * Prepare the query bindings for execution. + * Execute a Closure within a transaction. * - * @param array $bindings - * @return array - */ - public function prepareBindings(array $bindings) - { - return $bindings; - } - - /** - * Run a search query. + * @param \Closure $callback + * @param int $attempts * - * @param array $query - * @param array $bindings - * @param \Closure $callback * @return mixed * - * @throws \DesignMyNight\Elasticsearch\Exceptions\QueryException + * @throws \Throwable */ - protected function runQueryCallback($query, $bindings, Closure $callback) + public function transaction(Closure $callback, $attempts = 1) { - try { - $result = $callback($query, $bindings); - } catch (Exception $e) { - throw new QueryException($query, null, $e); - } - - return $result; + // } /** - * Execute a Closure within a transaction. - * - * @param \Closure $callback - * @param int $attempts - * @return mixed + * Get the number of active transactions. * - * @throws \Throwable + * @return int */ - public function transaction(Closure $callback, $attempts = 1) + public function transactionLevel() { + // } /** - * Start a new database transaction. + * Run a raw, unprepared query against the PDO connection. * - * @return void + * @param string $query + * + * @return bool */ - public function beginTransaction() + public function unprepared($query) { + // } /** - * Commit the active database transaction. + * Run an update statement against the database. * - * @return void + * @param string $query + * @param array $bindings + * + * @return array */ - public function commit() + public function update($query, $bindings = []) { + $updateMethod = isset($query['body']['query']) ? 'updateByQuery' : 'update'; + return $this->run( + $query, + $bindings, + Closure::fromCallable([$this->connection, $updateMethod]) + ); } /** - * Rollback the active database transaction. - * - * @return void + * @param string $index + * @param array $body */ - public function rollBack($toLevel = null) + public function updateIndex(string $index, array $body): void { + $this->indices()->putMapping(compact('index', 'body')); } /** - * Get the number of active transactions. + * Set the table prefix and return the grammar. * - * @return int + * @param \Illuminate\Database\Grammar $grammar + * + * @return \Illuminate\Database\Grammar */ - public function transactionLevel() + public function withIndexSuffix(BaseGrammar $grammar) { + $grammar->setIndexSuffix($this->indexSuffix); + + return $grammar; } /** - * Execute the given callback in "dry run" mode. + * Add client-specific parameters to the request params + * + * @param array $params * - * @param \Closure $callback * @return array */ - public function pretend(Closure $callback) + protected function addClientParams(array $params): array { + if ($this->requestTimeout) { + $params['client']['timeout'] = $this->requestTimeout; + } + + return $params; } /** - * Get the timeout for the entire Elasticsearch request - * @return float + * Create a new Elasticsearch connection. + * + * @param array $hosts + * @param array $config + * + * @return \Elasticsearch\Client */ - public function getRequestTimeout(): float + protected function createConnection($hosts, array $config, array $options) { - return $this->requestTimeout; + // apply config to each host + $hosts = array_map(function ($host) use ($config) { + $port = !empty($config['port']) ? $config['port'] : 9200; + + $scheme = !empty($config['scheme']) ? $config['scheme'] : 'http'; + + // force https for port 443 + $scheme = (int) $port === 443 ? 'https' : $scheme; + + return [ + 'host' => $host, + 'port' => $port, + 'scheme' => $scheme, + 'user' => !empty($config['username']) ? $config['username'] : null, + 'pass' => !empty($config['password']) ? $config['password'] : null, + ]; + }, $hosts); + + $clientBuilder = ClientBuilder::create() + ->setHosts($hosts); + + $elasticConfig = config('elasticsearch.connections.' . config('elasticsearch.defaultConnection', 'default'), []); + // Set additional client configuration + foreach ($this->configMappings as $key => $method) { + $value = Arr::get($elasticConfig, $key); + if (is_array($value)) { + foreach ($value as $vItem) { + $clientBuilder->$method($vItem); + } + } elseif ($value !== null) { + $clientBuilder->$method($value); + } + } + + return $clientBuilder->build(); } /** - * Get the timeout for the entire Elasticsearch request - * @param float $requestTimeout seconds - * @return self + * Get the default post processor instance. + * + * @return Processor */ - public function setRequestTimeout(float $requestTimeout): self + protected function getDefaultPostProcessor() { - $this->requestTimeout = $requestTimeout; - - return $this; + return new QueryProcessor(); } /** - * Add client-specific parameters to the request params - * @param array $params - * @return array + * Get the default query grammar instance. + * + * @return \Illuminate\Database\Query\Grammars\Grammar */ - protected function addClientParams(array $params): array + protected function getDefaultQueryGrammar() { - if ($this->requestTimeout) { - $params['client']['timeout'] = $this->requestTimeout; - } - - return $params; + return $this->withIndexSuffix(new QueryGrammar); } /** - * Dynamically pass methods to the connection. + * Run a search query. * - * @param string $method - * @param array $parameters + * @param array $query + * @param array $bindings + * @param \Closure $callback * * @return mixed + * + * @throws \DesignMyNight\Elasticsearch\QueryException */ - public function __call($method, $parameters) + protected function runQueryCallback($query, $bindings, Closure $callback) { - return call_user_func_array([$this->connection, $method], $parameters); + try { + $result = $callback($query, $bindings); + } catch (\Exception $e) { + throw new QueryException($query, $bindings, $e); + } + + return $result; } }