From f3f1d484498c41ac4b296b3edec7debc44510294 Mon Sep 17 00:00:00 2001 From: Will Taylor-Jackson Date: Wed, 3 Jan 2018 12:11:47 +0000 Subject: [PATCH] feat: initial commit --- README.md | 2 +- composer.json | 31 + .../Elasticsearch/Connection.php | 389 +++++++++ .../ElasticsearchServiceProvider.php | 19 + .../Elasticsearch/EloquentBuilder.php | 121 +++ .../Elasticsearch/QueryBuilder.php | 402 ++++++++++ .../Elasticsearch/QueryGrammar.php | 756 ++++++++++++++++++ .../Elasticsearch/QueryProcessor.php | 77 ++ 8 files changed, 1796 insertions(+), 1 deletion(-) create mode 100644 composer.json create mode 100644 src/DesignMyNight/Elasticsearch/Connection.php create mode 100644 src/DesignMyNight/Elasticsearch/ElasticsearchServiceProvider.php create mode 100644 src/DesignMyNight/Elasticsearch/EloquentBuilder.php create mode 100644 src/DesignMyNight/Elasticsearch/QueryBuilder.php create mode 100644 src/DesignMyNight/Elasticsearch/QueryGrammar.php create mode 100644 src/DesignMyNight/Elasticsearch/QueryProcessor.php diff --git a/README.md b/README.md index 2546dc0..fa4b716 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# eloquent-elasticsearch +# laravel-elasticsearch Use Elasticsearch as a database in Laravel to retrieve Eloquent models and perform aggregations. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..422aedc --- /dev/null +++ b/composer.json @@ -0,0 +1,31 @@ +{ + "name": "designmynight/laravel-elasticsearch", + "description": "Use Elasticsearch as a database in Laravel to retrieve Eloquent models and perform aggregations.", + "keywords": ["laravel","eloquent","elasticsearch","database","model"], + "homepage": "https://github.com/jenssegers/laravel-mongodb", + "authors": [ + { + "name": "DesignMyNight team", + "homepage": "https://designmynight.com" + } + ], + "license" : "MIT", + "require": { + "php": "^7.0.0", + "illuminate/support": "^5.5", + "illuminate/database": "^5.5", + "elasticsearch/elasticsearch": "^5.2" + }, + "autoload": { + "psr-0": { + "DesignMyNight\\Elasticsearch": "src/" + } + }, + "extra": { + "laravel": { + "providers": [ + "DesignMyNight\\Elasticsearch\\ElasticsearchServiceProvider" + ] + } + } +} \ No newline at end of file diff --git a/src/DesignMyNight/Elasticsearch/Connection.php b/src/DesignMyNight/Elasticsearch/Connection.php new file mode 100644 index 0000000..22e278e --- /dev/null +++ b/src/DesignMyNight/Elasticsearch/Connection.php @@ -0,0 +1,389 @@ +config = $config; + + $this->indexSuffix = isset($config['suffix']) ? $config['suffix'] : ''; + + // Extract the hosts from config + $hosts = explode(',', $config['host']); + + // You can pass options directly to the client + $options = array_get($config, 'options', []); + + // Create the connection + $this->connection = $this->createConnection($hosts, $config, $options); + + $this->useDefaultQueryGrammar(); + $this->useDefaultPostProcessor(); + } + + /** + * Create a new Elasticsearch connection. + * + * @param array $hosts + * @param array $config + * + * @return \Elasticsearch\Client + */ + protected function createConnection($hosts, array $config, array $options) + { + return ClientBuilder::create() + ->setHosts($hosts) + ->setSelector('\Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector') + ->build(); + } + + /** + * Get the default query grammar instance. + * + * @return \Illuminate\Database\Query\Grammars\Grammar + */ + protected function getDefaultQueryGrammar() + { + return $this->withIndexSuffix(new QueryGrammar); + } + + /** + * Set the table prefix and return the grammar. + * + * @param \Illuminate\Database\Grammar $grammar + * @return \Illuminate\Database\Grammar + */ + public function withIndexSuffix(BaseGrammar $grammar) + { + $grammar->setIndexSuffix($this->indexSuffix); + + return $grammar; + } + + /** + * Get the default post processor instance. + * + * @return Processor + */ + protected function getDefaultPostProcessor() + { + return new QueryProcessor(); + } + + /** + * Get the table prefix for the connection. + * + * @return string + */ + public function getTablePrefix() + { + return $this->indexSuffix; + } + + /** + * Set the table prefix in use by the connection. + * + * @param string $prefix + * @return void + */ + public function setIndexSuffix($suffix) + { + $this->indexSuffix = $suffix; + + $this->getQueryGrammar()->setIndexSuffix($suffix); + } + + /** + * Begin a fluent query against a database table. + * + * @param string $table + * @return \Illuminate\Database\Query\Builder + */ + public function table($table) + { + + } + + /** + * Get a new raw query expression. + * + * @param mixed $value + * @return \Illuminate\Database\Query\Expression + */ + public function raw($value) + { + + } + + /** + * Run a select statement and return a single result. + * + * @param string $query + * @param array $bindings + * @return mixed + */ + public function selectOne($query, $bindings = []) + { + + } + + /** + * Run a select statement against the database. + * + * @param array $params + * @param array $bindings + * @return array + */ + public function select($params, $bindings = []) + { + return $this->connection->search($params); + } + + /** + * Run a select statement against the database using an Elasticsearch scroll cursor. + * + * @param array $params + * @param array $bindings + * @return array + */ + public function scrollSelect($params, $bindings = []) + { + $scrollTimeout = '30s'; + + $scrollParams = array( + 'scroll' => $scrollTimeout, + 'size' => min($params['body']['size'], 5000), + 'index' => $params['index'], + 'body' => $params['body'] + ); + + $results = $this->select($scrollParams); + + $scrollId = $results['_scroll_id']; + + $numFound = $results['hits']['total']; + + $numResults = count($results['hits']['hits']); + + if ( $params['body']['size'] > $numResults ){ + $results['scrollCursor'] = $this->scroll($scrollId, $scrollTimeout, $params['body']['size'] - $numResults); + } + + return $results; + } + + /** + * Run a select statement against the database using an Elasticsearch scroll cursor. + */ + public function scroll($scrollId, $scrollTimeout, $limit){ + $numResults = 0; + + // Loop until the scroll 'cursors' are exhausted or we have enough results + while ($numResults < $limit) { + // Execute a Scroll request + $results = $this->connection->scroll(array( + 'scroll_id' => $scrollId, + 'scroll' => $scrollTimeout, + )); + + // Get new scroll ID in case it's changed + $scrollId = $results['_scroll_id']; + + // Break if no results + if (empty($results['hits']['hits'])) { + break; + } + + foreach ( $results['hits']['hits'] as $result ){ + $numResults++; + + if ( $numResults > $limit){ + break; + } + + yield $result; + } + } + } + + /** + * Run an insert statement against the database. + * + * @param array $query + * @param array $bindings + * @return bool + */ + public function insert($params, $bindings = []) + { + return $this->connection->bulk($params); + } + + /** + * Run an update statement against the database. + * + * @param string $query + * @param array $bindings + * @return int + */ + public function update($query, $bindings = []) + { + return $this->connection->index($params); + } + + /** + * Run a delete statement against the database. + * + * @param string $query + * @param array $bindings + * @return int + */ + public function delete($query, $bindings = []) + { + + } + + /** + * Execute an SQL statement and return the boolean result. + * + * @param string $query + * @param array $bindings + * @return bool + */ + public function statement($query, $bindings = []) + { + + } + + /** + * Run an SQL statement and get the number of rows affected. + * + * @param string $query + * @param array $bindings + * @return int + */ + public function affectingStatement($query, $bindings = []) + { + + } + + /** + * Run a raw, unprepared query against the PDO connection. + * + * @param string $query + * @return bool + */ + public function unprepared($query) + { + + } + + /** + * Prepare the query bindings for execution. + * + * @param array $bindings + * @return array + */ + public function prepareBindings(array $bindings) + { + + } + + /** + * Execute a Closure within a transaction. + * + * @param \Closure $callback + * @param int $attempts + * @return mixed + * + * @throws \Throwable + */ + public function transaction(Closure $callback, $attempts = 1) + { + + } + + /** + * Start a new database transaction. + * + * @return void + */ + public function beginTransaction() + { + + } + + /** + * Commit the active database transaction. + * + * @return void + */ + public function commit() + { + + } + + /** + * Rollback the active database transaction. + * + * @return void + */ + public function rollBack() + { + + } + + /** + * Get the number of active transactions. + * + * @return int + */ + public function transactionLevel() + { + + } + + /** + * Execute the given callback in "dry run" mode. + * + * @param \Closure $callback + * @return array + */ + public function pretend(Closure $callback) + { + + } + + /** + * Dynamically pass methods to the connection. + * + * @param string $method + * @param array $parameters + * + * @return mixed + */ + public function __call($method, $parameters) + { + return call_user_func_array([$this->connection, $method], $parameters); + } +} diff --git a/src/DesignMyNight/Elasticsearch/ElasticsearchServiceProvider.php b/src/DesignMyNight/Elasticsearch/ElasticsearchServiceProvider.php new file mode 100644 index 0000000..b8620a0 --- /dev/null +++ b/src/DesignMyNight/Elasticsearch/ElasticsearchServiceProvider.php @@ -0,0 +1,19 @@ +app->resolving('db', function ($db) { + $db->extend('elasticsearch', function ($config) { + return new Connection($config); + }); + }); + } +} diff --git a/src/DesignMyNight/Elasticsearch/EloquentBuilder.php b/src/DesignMyNight/Elasticsearch/EloquentBuilder.php new file mode 100644 index 0000000..a5ebe03 --- /dev/null +++ b/src/DesignMyNight/Elasticsearch/EloquentBuilder.php @@ -0,0 +1,121 @@ +model = $model; + + $this->query->from($model->getTable()); + + $this->query->type($model->getSearchType()); + + return $this; + } + + /** + * Execute the query as a "select" statement. + * + * @param array $columns + * @return \Illuminate\Database\Eloquent\Collection|static[] + */ + public function get($columns = ['*']) + { + $builder = $this->applyScopes(); + + $models = $builder->getModels($columns); + + // If we got a generator response then we'll return it without eager loading + if ($models instanceof Generator){ + // Throw an exception if relations were supposed to be eager loaded + if ($this->eagerLoad){ + throw new Exception('Eager loading relations is not currently supported with Generator responses from a scroll search'); + } + + return $models; + } + // If we actually found models we will also eager load any relationships that + // have been specified as needing to be eager loaded, which will solve the + // n+1 query issue for the developers to avoid running a lot of queries. + else if (count($models) > 0) { + $models = $builder->eagerLoadRelations($models); + } + + return $builder->getModel()->newCollection($models); + } + + /** + * Get the hydrated models without eager loading. + * + * @param array $columns + * @return \Illuminate\Database\Eloquent\Model[]|Generator + */ + public function getModels($columns = ['*']) + { + $results = $this->query->get($columns); + + if ($results instanceof Generator){ + return $this->yieldResults($results); + } + else { + return $this->model->hydrate( + $results->all() + )->all(); + } + } + + /** + * Return new models as a generator + */ + protected function yieldResults($results) + { + $instance = $this->newModelInstance(); + + foreach ( $results as $result ){ + yield $instance->newFromBuilder($result); + } + } + + /** + * Paginate the given query. + * + * @param int $perPage + * @param array $columns + * @param string $pageName + * @param int|null $page + * @return \Illuminate\Contracts\Pagination\LengthAwarePaginator + * + * @throws \InvalidArgumentException + */ + public function paginate($perPage = null, $columns = ['*'], $pageName = 'page', $page = null) + { + $page = $page ?: Paginator::resolveCurrentPage($pageName); + + $perPage = $perPage ?: $this->model->getPerPage(); + + $results = $this->forPage($page, $perPage)->get($columns); + + $total = $this->toBase()->getCountForPagination(); + + return new LengthAwarePaginator($results, $total, $perPage, $page, [ + 'path' => Paginator::resolveCurrentPath(), + 'pageName' => $pageName, + ]); + } +} diff --git a/src/DesignMyNight/Elasticsearch/QueryBuilder.php b/src/DesignMyNight/Elasticsearch/QueryBuilder.php new file mode 100644 index 0000000..729fa28 --- /dev/null +++ b/src/DesignMyNight/Elasticsearch/QueryBuilder.php @@ -0,0 +1,402 @@ +type = $type; + + return $this; + } + + /** + * Add a where between statement to the query. + * + * @param string $column + * @param array $values + * @param string $boolean + * @param bool $not + * @return $this + */ + public function whereBetween($column, array $values, $boolean = 'and', $not = false) + { + $type = 'Between'; + + $this->wheres[] = compact('column', 'values', 'type', 'boolean', 'not'); + + return $this; + } + + /** + * Add a 'distance from point' statement to the query. + * + * @param string $column + * @param array $coords + * @param string $distance + * @param string $boolean + * @param bool $not + * @return $this + */ + public function whereGeoDistance($column, array $location, $distance, $boolean = 'and', $not = false) + { + $type = 'GeoDistance'; + + $this->wheres[] = compact('column', 'location', 'distance', 'type', 'boolean', 'not'); + + return $this; + } + + /** + * Add a 'distance from point' statement to the query. + * + * @param string $column + * @param array $bounds + * @return $this + */ + public function whereGeoBoundsIn($column, array $bounds) + { + $type = 'GeoBoundsIn'; + + $this->wheres[] = [ + 'column' => $column, + 'bounds' => $bounds, + 'type' => 'GeoBoundsIn', + 'boolean' => 'and', + 'not' => false + ]; + + return $this; + } + + /** + * Add a "where date" statement to the query. + * + * @param string $column + * @param string $operator + * @param mixed $value + * @param string $boolean + * @return \Illuminate\Database\Query\Builder|static + */ + public function whereDate($column, $operator, $value = null, $boolean = 'and', $not = false) + { + list($value, $operator) = $this->prepareValueAndOperator( + $value, $operator, func_num_args() == 2 + ); + + $type = 'Date'; + + $this->wheres[] = compact('column', 'operator', 'value', 'type', 'boolean', 'not'); + + return $this; + } + + /** + * Add a 'nested document' statement to the query. + * + * @param string $column + * @param \Illuminate\Database\Query\Builder|static $query + * @param string $boolean + * @return $this + */ + public function whereNestedDoc($column, $query, $boolean = 'and') + { + $type = 'NestedDoc'; + + if ( $query instanceof Closure ){ + call_user_func($query, $query = $this->newQuery()); + } + + $this->wheres[] = compact('column', 'query', 'type', 'boolean'); + + return $this; + } + + /** + * Add another query builder as a nested where to the query builder. + * + * @param \Illuminate\Database\Query\Builder|static $query + * @param string $boolean + * @return $this + */ + public function addNestedWhereQuery($query, $boolean = 'and') + { + $type = 'Nested'; + + $compiled = compact('type', 'query', 'boolean'); + + if (count($query->wheres)) { + $this->wheres[] = $compiled; + } + + if (count($query->filters)) { + $this->filters[] = $compiled; + } + + return $this; + } + + /** + * Add any where clause with given options. + * + * @return $this + */ + public function whereWithOptions(...$args) + { + $options = array_pop($args); + $type = array_shift($args); + $method = $type == 'Basic' ? 'where' : 'where' . $type; + + $this->$method(...$args); + + $this->wheres[count($this->wheres) -1]['options'] = $options; + + return $this; + } + + /** + * Add a filter query by calling the required 'where' method + * and capturing the added where as a filter + */ + public function dynamicFilter($method, $args){ + $method = lcfirst(substr($method, 6)); + + $numWheres = count($this->wheres); + + $this->$method(...$args); + + $filterType = array_pop($args) === 'postFilter' ? 'postFilters' : 'filters'; + + if ( count($this->wheres) > $numWheres ){ + $this->$filterType[] = array_pop($this->wheres); + } + + return $this; + } + + /** + * Add a text search clause to the query. + * + * @param string $query + * @param array $options + * @param string $boolean + * @return $this + */ + public function search($query, $options = [], $boolean = 'and') + { + $this->wheres[] = [ + 'type' => 'Search', + 'value' => $query, + 'boolean' => $boolean, + 'options' => $options, + ]; + + return $this; + } + + /** + * Add a parent where statement to the query. + * + * @param \Closure $callback + * @param string $boolean + * @return \Illuminate\Database\Query\Builder|static + */ + public function whereParent($documentType, $query, $boolean = 'and') + { + $this->wheres[] = [ + 'type' => 'Parent', + 'documentType' => $documentType, + 'value' => $query, + 'boolean' => $boolean + ]; + + return $this; + } + + /** + * Add a child where statement to the query. + * + * @param \Closure $callback + * @param \Illuminate\Database\Query\Builder $query + * @param string $boolean + * + * @return \Illuminate\Database\Query\Builder|static + */ + public function whereChild($documentType, $query, $boolean = 'and') + { + $this->wheres[] = [ + 'type' => 'Child', + 'documentType' => $documentType, + 'value' => $query, + 'boolean' => $boolean + ]; + + return $this; + } + + /** + * @param $key + * @param $type + * @param null $args + * @param null $aggregations + */ + public function aggregation($key, $type, $args = null, $aggregations = null) + { + if ( $args instanceof Closure ){ + call_user_func($args, $args = $this->newQuery()); + } + + if ( $aggregations instanceof Closure ){ + call_user_func($aggregations, $aggregations = $this->newQuery()); + } + + $this->aggregations[] = compact( + 'key', 'type', 'args', 'aggregations' + ); + } + + public function orderBy($column, $direction = 1, $options = null) + { + if (is_string($direction)) { + $direction = strtolower($direction) == 'asc' ? 1 : -1; + } + + $type = isset($options['type']) ? $options['type'] : 'basic'; + + $this->orders[] = compact('column', 'direction', 'type', 'options'); + + return $this; + } + + public function getAggregationResults(){ + return $this->processor->getAggregationResults(); + } + + /** + * Execute the query as a "select" statement. + * + * @param array $columns + * @return \Illuminate\Support\Collection|Generator + */ + public function get($columns = ['*']) + { + $original = $this->columns; + + if (is_null($original)) { + $this->columns = $columns; + } + + $results = $this->processor->processSelect($this, $this->runSelect()); + + $this->columns = $original; + + return $this->shouldUseScroll() ? $results : collect($results); + } + + /** + * Run the query as a "select" statement against the connection. + * + * @return array + */ + protected function runSelect() + { + if ($this->shouldUseScroll()){ + $this->rawResponse = $this->connection->scrollSelect($this->toCompiledQuery()); + } + else { + $this->rawResponse = $this->connection->select($this->toCompiledQuery()); + } + + return $this->rawResponse; + } + + /** + * Determine whether to use an Elasticsearch scroll cursor for the query. + * + * @return self + */ + public function usingScroll(bool $useScroll = true): self + { + $this->scrollSelect = $useScroll; + + return $this; + } + + /** + * Determine whether to use an Elasticsearch scroll cursor for the query. + * + * @return bool + */ + public function shouldUseScroll(): bool + { + return !!$this->scrollSelect; + } + + /** + * Get the count of the total records for the paginator. + * + * @param array $columns + * @return int + */ + public function getCountForPagination($columns = ['*']) + { + if ($rawResponse = $this->processor->getRawResponse()) { + return $rawResponse['hits']['total']; + } + } + + /** + * Get the time it took Elasticsearch to perform the query + * + * @return int time in milliseconds + */ + public function getSearchDuration() + { + if ($rawResponse = $this->processor->getRawResponse()) { + return $rawResponse['took']; + } + } + + /** + * Get the Elasticsearch representation of the query. + * + * @return string + */ + public function toCompiledQuery() + { + return $this->toSql(); + } + + public function __call($method, $parameters) + { + if (Str::startsWith($method, 'filterWhere')) { + return $this->dynamicFilter($method, $parameters); + } + + return parent::__call($method, $parameters); + } +} diff --git a/src/DesignMyNight/Elasticsearch/QueryGrammar.php b/src/DesignMyNight/Elasticsearch/QueryGrammar.php new file mode 100644 index 0000000..e9bb2b3 --- /dev/null +++ b/src/DesignMyNight/Elasticsearch/QueryGrammar.php @@ -0,0 +1,756 @@ +compileWheres($builder); + + $params = [ + 'index' => $builder->from . $this->indexSuffix, + 'body' => [ + '_source' => $builder->columns && !in_array('*', $builder->columns) ? $builder->columns : true, + 'query' => $query['query'] + ], + ]; + + if ($query['filter']){ + $params['body']['query']['bool']['filter'] = $query['filter']; + } + + if ($query['postFilter']){ + $params['body']['post_filter'] = $query['postFilter']; + } + + if ($builder->aggregations) { + $params['body']['aggregations'] = $this->compileAggregations($builder); + } + + // Apply order, offset and limit + if ($builder->orders) { + $params['body']['sort'] = $this->compileOrders($builder, $builder->orders); + } + + if ($builder->offset) { + $params['body']['from'] = $builder->offset; + } + + if (isset($builder->limit)) { + $params['body']['size'] = $builder->limit; + } + + if (!$params['body']['query']) { + unset($params['body']['query']); + } + + // print "
";
+        // print str_replace('    ', '  ', json_encode($params, JSON_PRETTY_PRINT));
+        // exit;
+
+        return $params;
+    }
+
+    protected function compileWheres(Builder $builder)
+    {
+        $queryParts = [
+            'query' => 'wheres',
+            'filter' => 'filters',
+            'postFilter' => 'postFilters'
+        ];
+
+        $compiled = [];
+
+        foreach ( $queryParts as $queryPart => $builderVar ){
+            $clauses = $builder->$builderVar ?: [];
+
+            $compiled[$queryPart] = $this->compileClauses($builder, $clauses);
+        }
+
+        return $compiled;
+    }
+
+    protected function compileClauses(Builder $builder, array $clauses)
+    {
+        $query = [];
+        $isOr  = false;
+
+        foreach ($clauses as $where) {
+            // We use different methods to compile different wheres
+            $method = 'compileWhere' . $where['type'];
+            $result = $this->{$method}($builder, $where);
+
+            // Wrap the result with a bool to make nested wheres work
+            if (count($clauses) > 0 && $where['boolean'] !== 'or') {
+                $result = ['bool' => ['must' => [$result]]];
+            }
+
+            // If this is an 'or' query then add all previous parts to a 'should'
+            if (!$isOr && $where['boolean'] == 'or') {
+                $isOr = true;
+
+                if ($query) {
+                    $query = ['bool' => ['should' => [$query]]];
+                } else {
+                    $query['bool']['should'] = [];
+                }
+            }
+
+            // Add the result to the should clause if this is an Or query
+            if ($isOr) {
+                $query['bool']['should'][] = $result;
+            } else {
+                // Merge the compiled where with the others
+                $query = array_merge_recursive($query, $result);
+            }
+        }
+
+        return $query;
+    }
+
+    protected function compileWhereBasic(Builder $builder, $where)
+    {
+        $value = $this->getValueForWhere($builder, $where);
+
+        $operatorsMap = [
+            '>'  => 'gt',
+            '>=' => 'gte',
+            '<'  => 'lt',
+            '<=' => 'lte',
+        ];
+
+        if (is_null($value)) {
+            $query = [
+                'exists' => [
+                    'field' => $where['column'],
+                ],
+            ];
+        } else if (in_array($where['operator'], array_keys($operatorsMap))) {
+            $operator = $operatorsMap[$where['operator']];
+            $query    = [
+                'range' => [
+                    $where['column'] => [
+                        $operator => $value,
+                    ],
+                ],
+            ];
+        } else {
+            $query = [
+                'term' => [
+                    $where['column'] => $value,
+                ],
+            ];
+        }
+
+        $query = $this->applyOptionsToClause($query, $where);
+
+        if (($where['operator'] == '!=' && !is_null($value)) || ($where['operator'] == '=' && is_null($value))) {
+            $query = [
+                'bool' => [
+                    'must_not' => [
+                        $query,
+                    ],
+                ],
+            ];
+        }
+
+        return $query;
+    }
+
+    protected function compileWhereDate($builder, $where)
+    {
+        if ( $where['operator'] == '=' ){
+            $value = $this->getValueForWhere($builder, $where);
+
+            $where['value'] = [$value, $value];
+
+            return $this->compileWhereBetween($builder, $where);
+        }
+        else {
+            return $this->compileWhereBasic($builder, $where);
+        }
+    }
+
+    protected function compileWhereNested($builder, $where)
+    {
+        $compiled = $this->compileWheres($where['query']);
+
+        foreach ( $compiled as $queryPart => $clauses ){
+            $compiled[$queryPart] = array_map(function($clause) use ($where){
+                if ($clause){
+                    $this->applyOptionsToClause($clause, $where);
+                }
+
+                return $clause;
+            }, $clauses);
+        }
+
+        $compiled = array_filter($compiled);
+
+        return reset($compiled);
+    }
+
+    protected function applyWhereRelationship($builder, $where, $relationship)
+    {
+        $compiled = $this->compileWheres($where['value']);
+
+        $relationshipFilter = 'has_' . $relationship;
+
+        $query = [
+            $relationshipFilter => [
+                'type'  => $where['documentType'],
+                'query' => $compiled['query'],
+            ],
+        ];
+
+        $query = $this->applyOptionsToClause($query, $where);
+
+        return $query;
+    }
+
+    protected function compileWhereParent($builder, $where)
+    {
+        return $this->applyWhereRelationship($builder, $where, 'parent');
+    }
+
+    protected function compileWhereChild($builder, $where)
+    {
+        return $this->applyWhereRelationship($builder, $where, 'child');
+    }
+
+    protected function compileWhereIn($builder, $where, $not = false)
+    {
+        $column = $where['column'];
+        $values = $this->getValueForWhere($builder, $where);
+
+        $query = [
+            'terms' => [
+                $column => array_values($values),
+            ],
+        ];
+
+        $query = $this->applyOptionsToClause($query, $where);
+
+        if ($not) {
+            $query = [
+                'bool' => [
+                    'must_not' => [
+                        $query,
+                    ],
+                ],
+            ];
+        }
+
+        return $query;
+    }
+
+    protected function compileWhereNotIn($builder, $where)
+    {
+        return $this->compileWhereIn($builder, $where, true);
+    }
+
+    protected function compileWhereNull($builder, $where)
+    {
+        $where['operator'] = '=';
+
+        return $this->compileWhereBasic($builder, $where);
+    }
+
+    protected function compileWhereNotNull($builder, $where)
+    {
+        $where['operator'] = '!=';
+
+        return $this->compileWhereBasic($builder, $where);
+    }
+
+    protected function compileWhereBetween($builder, $where)
+    {
+        $column = $where['column'];
+        $values = $this->getValueForWhere($builder, $where);
+
+        if ($where['not']) {
+            $query = [
+                'bool' => [
+                    'should' => [
+                        [
+                            'range' => [
+                                $column => [
+                                    'lte' => $values[0],
+                                ],
+                            ],
+                        ],
+                        [
+                            'range' => [
+                                $column => [
+                                    'gte' => $values[1],
+                                ],
+                            ],
+                        ],
+                    ],
+                ],
+            ];
+        } else {
+            $query = [
+                'range' => [
+                    $column => [
+                        'gte' => $values[0],
+                        'lte' => $values[1]
+                    ],
+                ],
+            ];
+        }
+
+        return $query;
+    }
+
+    protected function compileWhereExists($builder, $where, $not = false)
+    {
+        $query = [
+            'exists' => [
+                'field' => $where['query']->columns[0],
+            ],
+        ];
+
+        $query = $this->applyOptionsToClause($query, $where);
+
+        if ($not) {
+            $query = [
+                'bool' => [
+                    'must_not' => [
+                        $query,
+                    ],
+                ],
+            ];
+        }
+
+        return $query;
+    }
+
+    protected function compileWhereNotExists($builder, $where)
+    {
+        return $this->compileWhereExists($builder, $where, true);
+    }
+
+    protected function compileWhereSearch($builder, $where)
+    {
+        $fields = '_all';
+
+        if (!empty($where['options']['fields'])) {
+            $fields = $where['options']['fields'];
+        }
+
+        if (is_array($fields) && !is_numeric(array_keys($fields)[0])) {
+            $fieldsWithBoosts = [];
+
+            foreach ($fields as $field => $boost) {
+                $fieldsWithBoosts[] = "{$field}^{$boost}";
+            }
+
+            $fields = $fieldsWithBoosts;
+        }
+
+        if (is_array($fields) && count($fields) > 1) {
+            $type = isset($where['options']['matchType']) ? $where['options']['matchType'] : 'most_fields';
+
+            $query = array(
+                'multi_match' => array(
+                    'query'  => $where['value'],
+                    'type'   => $type,
+                    'fields' => $fields,
+                ),
+            );
+        } else {
+            $fields = is_array($fields) ? reset($fields) : $fields;
+
+            $query = array(
+                'match' => array(
+                    $fields => $where['value'],
+                ),
+            );
+        }
+
+        if (!empty($where['options']['constant_score'])) {
+            $query = [
+                'constant_score' => [
+                    'query' => $query,
+                ],
+            ];
+        }
+
+        return $query;
+    }
+
+    protected function compileWhereGeoDistance($builder, $where)
+    {
+        $query = [
+            'geo_distance' => [
+                'distance'       => $where['distance'],
+                $where['column'] => $where['location'],
+            ],
+        ];
+
+        return $query;
+    }
+
+    protected function compileWhereGeoBoundsIn($builder, $where)
+    {
+        $query = [
+            'geo_bounding_box' => [
+                $where['column'] => $where['bounds'],
+            ],
+        ];
+
+        return $query;
+    }
+
+    protected function compileWhereNestedDoc($builder, $where)
+    {
+        $wheres = $this->compileWheres($where['query']);
+
+        $query = [
+            'nested' => [
+                'path' => $where['column']
+            ],
+        ];
+
+        $query['nested'] = array_merge($query['nested'], array_filter($wheres));
+
+        return $query;
+    }
+
+    protected function getValueForWhere($builder, $where)
+    {
+        switch ($where['type']) {
+            case 'In':
+            case 'NotIn':
+            case 'Between':
+                $value = $where['values'];
+                break;
+
+            case 'Null':
+            case 'NotNull':
+                $value = null;
+                break;
+
+            default:
+                $value = $where['value'];
+        }
+
+        // Convert DateTime values to UTCDateTime.
+        if ($value instanceof DateTime) {
+            $value = $this->convertDateTime($value);
+        }
+        // Convert DateTime values to UTCDateTime.
+        else if ($value instanceof ObjectID) {
+            $value = $this->convertKey($value);
+        } else if (is_array($value)) {
+            foreach ($value as &$val) {
+                if ($val instanceof DateTime) {
+                    $val = $this->convertDateTime($val);
+                } else if ($val instanceof ObjectID) {
+                    $val = $this->convertKey($val);
+                }
+            }
+        }
+
+        return $value;
+    }
+
+    protected function applyOptionsToClause($clause, $where)
+    {
+        if (!isset($where['options'])) {
+            return $clause;
+        }
+
+        $optionsToApply = ['boost'];
+        $options        = array_intersect_key($where['options'], array_flip($optionsToApply));
+
+        foreach ($options as $option => $value) {
+            $funcName = "apply" . ucfirst($option) . "Option";
+
+            if (method_exists($this, $funcName)){
+                $this->$funcName($clause, $value);
+            }
+        }
+
+        return $clause;
+    }
+
+    protected function applyBoostOption($clause, $value)
+    {
+        $firstKey = key($clause);
+
+        if ($firstKey !== 'term'){
+            return $clause[$firstKey]['boost'] = $value;
+        }
+
+        $clause['term'] = [
+            'type' => [
+                'value' => $clause['term']['type'],
+                'boost' => $value
+            ]
+        ];
+
+        return  $clause;
+    }
+
+    protected function compileAggregations(Builder $builder)
+    {
+        $aggregations = [];
+
+        foreach ($builder->aggregations as $aggregation) {
+            $result = $this->compileAggregation($builder, $aggregation);
+
+            $aggregations = array_merge($aggregations, $result);
+        }
+
+        return $aggregations;
+    }
+
+    protected function compileAggregation(Builder $builder, $aggregation)
+    {
+        $key = $aggregation['key'];
+
+        $method = 'compile' . ucfirst(camel_case($aggregation['type'])) . 'Aggregation';
+
+        $compiled = [
+            $key => $this->$method($aggregation)
+        ];
+
+        if ( isset($aggregation['aggregations']) ){
+            $compiled[$key]['aggregations'] = $this->compileAggregations($aggregation['aggregations']);
+        }
+
+        return $compiled;
+    }
+
+    protected function compileFilterAggregation($aggregation)
+    {
+        $compiled = [];
+
+        $filter = $this->compileWheres($aggregation['args']);
+
+        $compiled = [
+            'filter' => $filter['filter'] ?: (object) []
+        ];
+
+        return $compiled;
+    }
+
+    protected function compileNestedAggregation($aggregation)
+    {
+        $path = is_array($aggregation['args']) ? $aggregation['args']['path'] : $aggregation['args'];
+
+        return [
+            'nested' => [
+                'path' => $path
+            ]
+        ];
+    }
+
+    protected function compileTermsAggregation($aggregation)
+    {
+        $field = is_array($aggregation['args']) ? $aggregation['args']['field'] : $aggregation['args'];
+
+        $compiled = [
+            'terms' => [
+                'field' => $field
+            ]
+        ];
+
+        if ( is_array($aggregation['args']) && isset($aggregation['args']['size']) ){
+            $compiled['terms']['size'] = $aggregation['args']['size'];
+        }
+
+        return $compiled;
+    }
+
+    protected function compileDateHistogramAggregation($aggregation)
+    {
+        $field = is_array($aggregation['args']) ? $aggregation['args']['field'] : $aggregation['args'];
+
+        $compiled = [
+            'date_histogram' => [
+                'field' => $field
+            ]
+        ];
+
+        if ( is_array($aggregation['args']) && isset($aggregation['args']['interval']) ){
+            $compiled['date_histogram']['interval'] = $aggregation['args']['interval'];
+        }
+
+        return $compiled;
+    }
+
+    protected function compileExistsAggregation($aggregation)
+    {
+        $field = is_array($aggregation['args']) ? $aggregation['args']['field'] : $aggregation['args'];
+
+        $compiled = [
+            'exists' => [
+                'field' => $field
+            ]
+        ];
+
+        return $compiled;
+    }
+
+    protected function compileReverseNestedAggregation($aggregation)
+    {
+        return [
+            'reverse_nested' => (object) []
+        ];
+    }
+
+    protected function compileSumAggregation($aggregation)
+    {
+        return $this->compileMetricAggregation($aggregation);
+    }
+
+    protected function compileMetricAggregation($aggregation)
+    {
+        $metric = $aggregation['type'];
+
+        $field = is_array($aggregation['args']) ? $aggregation['args']['field'] : $aggregation['args'];
+
+        return [
+            $metric => [
+                'field' => $field
+            ]
+        ];
+    }
+
+    protected function compileChildrenAggregation($aggregation)
+    {
+        $type = is_array($aggregation['args']) ? $aggregation['args']['type'] : $aggregation['args'];
+
+        return [
+            'children' => [
+                'type' => $type
+            ]
+        ];
+    }
+
+    protected function compileOrders(Builder $builder, $orders = [])
+    {
+        $compiledOrders = [];
+
+        foreach ($orders as $order) {
+            $column = $order['column'];
+
+            $type = $order['type'] ?? 'basic';
+
+            switch ($type) {
+                case 'geoDistance' :
+                    $orderSettings = [
+                        $column         => $order['options']['coordinates'],
+                        'order'         => $order['direction'] < 0 ? 'desc' : 'asc',
+                        'unit'          => $order['options']['unit'] ?? 'km',
+                        'distance_type' => $order['options']['distanceType'] ?? 'plane',
+                    ];
+
+                    $column = '_geo_distance';
+                    break;
+
+                default :
+                    $orderSettings = [
+                        'order' => $order['direction'] < 0 ? 'desc' : 'asc'
+                    ];
+
+                    $allowedOptions = ['missing', 'mode'];
+
+                    $options = isset($order['options']) ? array_intersect_key($order['options'], array_flip($allowedOptions)) : [];
+
+                    $orderSettings = array_merge($options, $orderSettings);
+            }
+
+            $compiledOrders[] = [
+                $column => $orderSettings,
+            ];
+        }
+
+        return $compiledOrders;
+    }
+
+    public function compileInsert(Builder $builder, array $values)
+    {
+        $params = [];
+
+        foreach ($values as $doc) {
+            if (isset($doc['child_documents'])) {
+                foreach ($doc['child_documents']['documents'] as $childDoc) {
+                    $params['body'][] = [
+                        'index' => [
+                            '_index' => $builder->from . $this->indexSuffix,
+                            '_type'  => isset($doc['child_documents']['type']) ? $doc['child_documents']['type'] : $builder->type,
+                            '_id'    => $childDoc['id'],
+                            'parent' => $doc['id'],
+                        ],
+                    ];
+
+                    $params['body'][] = $childDoc;
+                }
+
+                unset($doc['child_documents']);
+            }
+
+            $params['body'][] = [
+                'index' => [
+                    '_index' => $builder->from . $this->indexSuffix,
+                    '_type'  => $builder->type,
+                    '_id'    => $doc['id'],
+                ],
+            ];
+
+            $params['body'][] = $doc;
+        }
+
+        return $params;
+    }
+
+    protected function convertKey($value)
+    {
+        return (string) $value;
+    }
+
+    protected function convertDateTime($value)
+    {
+        return $value->format('Y-m-d\TH:i:s');
+    }
+
+    /**
+     * Get the grammar's index suffix.
+     *
+     * @return string
+     */
+    public function getIndexSuffix()
+    {
+        return $this->indexSuffix;
+    }
+
+    /**
+     * Set the grammar's table suffix.
+     *
+     * @param  string  $suffix
+     * @return $this
+     */
+    public function setIndexSuffix($suffix)
+    {
+        $this->indexSuffix = $suffix;
+
+        return $this;
+    }
+}
diff --git a/src/DesignMyNight/Elasticsearch/QueryProcessor.php b/src/DesignMyNight/Elasticsearch/QueryProcessor.php
new file mode 100644
index 0000000..fdd9c28
--- /dev/null
+++ b/src/DesignMyNight/Elasticsearch/QueryProcessor.php
@@ -0,0 +1,77 @@
+rawResponse = $results;
+
+        $this->aggregations = $results['aggregations'] ?? [];
+
+        // Return a generator if we got a scroll cursor in the results
+        if (isset($results['_scroll_id'])){
+            return $this->yieldResults($results);
+        }
+        else {
+            $documents = [];
+
+            foreach ($results['hits']['hits'] as $result) {
+                $documents[] = $this->documentFromResult($result);
+            }
+
+            return $documents;
+        }
+    }
+
+    protected function yieldResults($results)
+    {
+        // First yield each result from the initial request
+        foreach ($results['hits']['hits'] as $result){
+            yield $this->documentFromResult($result);
+        }
+
+        // Then go through the scroll cursor getting one result at a time
+        if (isset($results['scrollCursor'])){
+            foreach ($results['scrollCursor'] as $result){
+                // TODO: add _id to result
+                $document = $this->documentFromResult($result);
+                yield $document;
+            }
+        }
+    }
+
+    protected function documentFromResult($result)
+    {
+        $document = $result['_source'];
+        $document['_id'] = $result['_id'];
+
+        return $document;
+    }
+
+    public function getRawResponse()
+    {
+        return $this->rawResponse;
+    }
+
+    public function getAggregationResults()
+    {
+        return $this->aggregations;
+    }
+}