Skip to content

Commit

Permalink
Replace olivere to officical elasticsearch-go
Browse files Browse the repository at this point in the history
  • Loading branch information
huangyingting committed Oct 8, 2023
1 parent bb3f575 commit d42af02
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 127 deletions.
217 changes: 101 additions & 116 deletions go/app/bs/internal/search/es.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,70 @@
package search

import (
"bytes"
"context"
"encoding/json"
"fmt"
"html/template"
"strconv"
"time"

"bingo/app/bs/internal/conf"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/refresh"
"github.com/go-kratos/kratos/v2/log"
"github.com/olivere/elastic/v7"
)

const (
MAPPING = `
{
"settings":{
"number_of_shards": {{.NumberOfShards}},
"number_of_replicas": {{.NumberOfReplicas}},
"analysis": {
"normalizer": {
"lowercase": {
"type": "custom",
"filter": [
"lowercase"
]
}
}
}
},
"mappings":{
"_source": {
"enabled": true
},
"properties":{
"alias": {
"type": "keyword"
},
"oid": {
"type": "keyword"
},
"tags":{
"type":"keyword",
"normalizer": "lowercase",
"fields": {
"suggest": {
"type": "completion"
}
}
}
}
}
}
`
)

type ElasticSearch struct {
es *elastic.Client
es *elasticsearch.TypedClient
indexName string
h *log.Helper
}

func NewElasticSearch(c *conf.Search, h *log.Helper) (*ElasticSearch, error) {
es, err := elastic.NewClient(
elastic.SetURL(c.Addrs...),
elastic.SetBasicAuth(c.Username, c.Password),
elastic.SetSniff(c.Sniff),
)

cfg := elasticsearch.Config{
Addresses: c.Addrs,
Username: c.Username,
Password: c.Password,
DiscoverNodesOnStart: true,
DiscoverNodesInterval: time.Duration(5 * time.Second),
}

es, err := elasticsearch.NewTypedClient(cfg)
if err != nil {
h.Errorf("connect to elasticsearch error: %v", err)
return nil, err
}

// Dynamically create the index with the specified number of shards/replicas
tmpl, err := template.New("T").Parse(MAPPING)
if err != nil {
h.Errorf("parse mapping template error: %v", err)
return nil, err
}
var body bytes.Buffer
err = tmpl.ExecuteTemplate(&body, "T", struct {
NumberOfShards uint32
NumberOfReplicas uint32
}{
NumberOfShards: c.NumberOfShards,
NumberOfReplicas: c.NumberOfReplicas,
})
if err != nil {
h.Errorf("execute template error: %v", err)
return nil, err
}
// Settings
settings := types.NewIndexSettings()
settingsAnalysis := types.NewIndexSettingsAnalysis()
settingsAnalysis.Normalizer = map[string]types.Normalizer{"lowercase": types.CustomNormalizer{Filter: []string{"lowercase"}}}
settings.Analysis = settingsAnalysis
settings.NumberOfShards = strconv.FormatInt(int64(c.NumberOfShards), 10)
settings.NumberOfReplicas = strconv.FormatInt(int64(c.NumberOfReplicas), 10)

// Mappings
mappings := types.NewTypeMapping()
mappings.Properties["alias"] = types.NewKeywordProperty()
mappings.Properties["oid"] = types.NewKeywordProperty()
tags := types.NewKeywordProperty()
normalizer := "lowercase"
tags.Normalizer = &normalizer
tags.Fields["suggest"] = types.NewCompletionProperty()
mappings.Properties["tags"] = tags

// Create index if not created before
ctx := context.Background()
exists, err := es.IndexExists(c.IndexName).Do(ctx)
exists, err := es.Indices.Exists(c.IndexName).Do(ctx)
if err != nil {
h.Errorf("check index exist error: %v", err)
return nil, err
}
if !exists {
_, err = es.CreateIndex(c.IndexName).BodyString(body.String()).Do(ctx)
_, err = es.Indices.Create(c.IndexName).Settings(settings).Mappings(mappings).Do(ctx)
if err != nil {
h.Errorf("create index error: %v", err)
return nil, err
Expand All @@ -116,11 +80,7 @@ func NewElasticSearch(c *conf.Search, h *log.Helper) (*ElasticSearch, error) {

func (es *ElasticSearch) Index(alias Alias) error {
es.h.Debugf("index alias: %v", alias)
_, err := es.es.Index().
Index(es.indexName).Id(alias.Alias).
BodyJson(alias).
Refresh("true").
Do(context.TODO())
_, err := es.es.Index(es.indexName).Id(alias.Alias).Document(alias).Refresh(refresh.True).Do(context.Background())
if err != nil {
es.h.Errorf("index error: %v", alias)
}
Expand All @@ -129,24 +89,33 @@ func (es *ElasticSearch) Index(alias Alias) error {

func (es *ElasticSearch) SearchOr(oid string, tags []string) ([]Alias, error) {
es.h.Debugf("search or oid: %s tags: %v", oid, tags)
tagsInterface := make([]interface{}, len(tags))
var queries []types.Query
tagsFieldValue := make([]types.FieldValue, len(tags))
for i, s := range tags {
tagsInterface[i] = s
tagsFieldValue[i] = s
}
resp, err := es.es.Search().
Index(es.indexName).Query(
elastic.NewBoolQuery().
Filter(elastic.NewTermsQuery("tags", tagsInterface...),
elastic.NewTermsQuery("oid", oid))).
Do(context.TODO())

queries = append(queries, types.Query{Terms: &types.TermsQuery{
TermsQuery: map[string]types.TermsQueryField{"tags": tagsFieldValue}}})

queries = append(queries, types.Query{Term: map[string]types.TermQuery{"oid": {Value: oid}}})

b := types.NewBoolQuery()
b.Filter = queries

resp, err := es.es.Search().Index(es.indexName).Query(&types.Query{
Bool: b,
}).Do(context.Background())

if err != nil {
es.h.Errorf("search or error: %v", err)
return nil, err
}

var aliases []Alias
for _, v := range resp.Hits.Hits {
var alias Alias
if err = json.Unmarshal(v.Source, &alias); err != nil {
if err = json.Unmarshal(v.Source_, &alias); err != nil {
es.h.Errorf("search or unmarshal error: %v", err)
return nil, err
}
Expand All @@ -157,25 +126,31 @@ func (es *ElasticSearch) SearchOr(oid string, tags []string) ([]Alias, error) {

func (es *ElasticSearch) SearchAnd(oid string, tags []string) ([]Alias, error) {
es.h.Debugf("search and oid: %s tags: %v", oid, tags)
var queries []elastic.Query

var queries []types.Query

for _, v := range tags {
t := elastic.NewTermQuery("tags", v)
queries = append(queries, t)
queries = append(queries, types.Query{Term: map[string]types.TermQuery{"tags": {Value: v}}})
}
queries = append(queries, elastic.NewTermsQuery("oid", oid))
resp, err := es.es.Search().
Index(es.indexName).Query(
elastic.NewBoolQuery().
Filter(queries...)).
Do(context.TODO())

queries = append(queries, types.Query{Term: map[string]types.TermQuery{"oid": {Value: oid}}})

b := types.NewBoolQuery()
b.Filter = queries

resp, err := es.es.Search().Index(es.indexName).Query(&types.Query{
Bool: b,
}).Do(context.Background())

if err != nil {
es.h.Errorf("search and error: %v", err)
return nil, err
}

var aliases []Alias
for _, v := range resp.Hits.Hits {
var alias Alias
if err = json.Unmarshal(v.Source, &alias); err != nil {
if err = json.Unmarshal(v.Source_, &alias); err != nil {
es.h.Errorf("search and unmarshal error: %v", err)
return nil, err
}
Expand All @@ -186,50 +161,60 @@ func (es *ElasticSearch) SearchAnd(oid string, tags []string) ([]Alias, error) {

func (es *ElasticSearch) Suggest(text string) ([]string, error) {
es.h.Debugf("suggest: %s", text)
suggester := elastic.NewCompletionSuggester("tags-suggest").
Text(text).Field("tags.suggest").SkipDuplicates(true).Size(5)
searchSource := elastic.NewSearchSource().
Suggester(suggester).FetchSource(false)

result, err := es.es.Search().
Index(es.indexName).
SearchSource(searchSource).
Do(context.TODO())
prefix := "se"
size := 5
skipDuplicates := true
filedSuggester := types.NewFieldSuggester()
filedSuggester.Prefix = &prefix
filedSuggester.Completion = types.NewCompletionSuggester()
filedSuggester.Completion.Field = "tags.suggest"
filedSuggester.Completion.Size = &size
filedSuggester.Completion.SkipDuplicates = &skipDuplicates
suggester := types.NewSuggester()
suggester.Suggesters["tags-suggest"] = *filedSuggester

result, err := es.es.Search().Index(es.indexName).Suggest(suggester).Source_(false).
Do(context.Background())

if err != nil {
es.h.Errorf("suggest error: %v", err)
return nil, err
}

results := make([]string, 0)
suggest := result.Suggest["tags-suggest"]
for _, options := range suggest {
for _, option := range options.Options {
for _, tags_suggest := range result.Suggest["tags-suggest"] {
completionSuggest := tags_suggest.(*types.CompletionSuggest)
for _, option := range completionSuggest.Options {
results = append(results, option.Text)
}
}

es.h.Debugf("suggest results: %v", results)
return results, nil
}

func (es *ElasticSearch) Delete(alias string, oid string) error {
es.h.Debugf("delete alias: %s oid: %s", alias, oid)
resp, err := es.es.Search().
Index(es.indexName).Query(
elastic.NewBoolQuery().
Filter(elastic.NewTermsQuery("alias", alias),
elastic.NewTermsQuery("oid", oid),
),
).FetchSource(false).
Do(context.TODO())
var queries []types.Query

queries = append(queries, types.Query{Term: map[string]types.TermQuery{"alias": {Value: alias}}})

queries = append(queries, types.Query{Term: map[string]types.TermQuery{"oid": {Value: oid}}})

b := types.NewBoolQuery()
b.Filter = queries

resp, err := es.es.Search().Index(es.indexName).Query(&types.Query{
Bool: b,
}).Source_(false).Do(context.Background())

if err != nil {
es.h.Errorf("search error: %v", err)
return err
}

if len(resp.Hits.Hits) > 0 {
_, err := es.es.Delete().Index(es.indexName).Id(alias).Refresh("true").Do(context.TODO())
_, err := es.es.Delete(es.indexName, alias).Refresh(refresh.True).Do(context.Background())
if err != nil {
es.h.Errorf("delete error: %v", err)
return err
Expand Down
5 changes: 2 additions & 3 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.0
require (
github.com/Depado/ginprom v1.7.11
github.com/denisenkom/go-mssqldb v0.12.3
github.com/elastic/go-elasticsearch/v8 v8.10.0
github.com/gin-contrib/cors v1.4.0
github.com/gin-contrib/multitemplate v0.0.0-20230212012517-45920c92c271
github.com/gin-contrib/static v0.0.1
Expand All @@ -19,7 +20,6 @@ require (
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.17
github.com/miekg/dns v1.1.56
github.com/olivere/elastic/v7 v7.0.32
github.com/prometheus/client_golang v1.16.0
github.com/rabbitmq/amqp091-go v1.8.1
github.com/shirou/gopsutil/v3 v3.23.8
Expand Down Expand Up @@ -47,6 +47,7 @@ require (
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand All @@ -67,7 +68,6 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
Expand All @@ -79,7 +79,6 @@ require (
github.com/lestrrat-go/jwx v1.2.26 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
Loading

0 comments on commit d42af02

Please sign in to comment.