Golang封装Elasticsearch常用功能

前言(为什么要写这篇文章)

首先看过我博客的都应该知道,我去年发了一篇Python封装Elasticsearch的文章。但那是去年了,今年我将我的检索服务后端用Golang全部重写了一波,相当于用Go重构了以前的Python代码,不过我个人感觉Golang的效率还是高于Python的,而且我还加了一些异常判断和处理,这次的代码只会比以前更好更牛逼,为了纪念这一个多月的重构历程,我将关键功能记录下来,方便自己复习和各位兄弟姐妹查看。

使用的Go包

我的Elasticsearch版本是6.3.2,6系列了,现在(2020-05-13)最新版本应该是7,不过新版本和旧版本应该就是少了Type,我的6版本代码,请各位自己自行斟酌使用。
安装指定的Go包 olivere/elastic,现在有官方驱动的包了,但是我这篇文章用的包是 olivere/elastic,所以一切的代码都是以olivere为主。

1
go get github.com/olivere/elastic

基础的使用(Simple的使用)

接下来我举几个例子来说下这个golang 如何驱动 elasticsearch的

连接Elasticserach

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package elasticdb

import (
"context"
"fmt"

"github.com/olivere/elastic"
)

func main() {
//连接127.0.0.1
client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
if err != nil {
fmt.Println(err)
return
}
//检查健康的状况,ping指定ip,不通报错
_, _, err = client.Ping(ip).Do(context.Background())
if err != nil {
fmt.Println(err)
return
}
}

创建一个index索引

这里我默认你们都有一定的Es基础,其实你把index想成Mysql里面的表就可以了。

1
2
3
4
5
6
7
8
9
//indexname 你可以想成表名
func CreateIndex(indexname string) {
client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
if err != nil {
fmt.Println(err)
return
}
client.CreateIndex(indexname).Do(context.Background())
}

删除一个index索引

想成删除一张表

1
2
3
4
5
6
7
8
9
//indexname 你可以想成表名
func CreateIndex(indexname string) {
client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
if err != nil {
fmt.Println(err)
return
}
client.DeleteIndex(indexname).Do(context.Background())
}

往指定的index当中导一条数据

想成往一张表里面导入一条数据,在Golang中,我们可以导入json的字符串,我们也可以导入golang的struct类型,例如

1
2
3
4
5
6
7
//结构体
type Task struct {
Taskid string `json:"taskid"`
Taskname string `json:"taskname"`
}
//字符串
jsonmsg := `{"taskid":"123456", "taskname":"lwb"}`
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//导入数据,你需要index名,index的type,导入的数据
func PutData(index string, typ string, bodyJSON interface{}) bool {
client, _ := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
_, err := client.Index().
Index(index).
Type(typ).
BodyJson(bodyJSON).
Do(context.Background())
if err != nil {
//验证是否导入成功
fmt.Sprintf("<Put> some error occurred when put. err:%s", err.Error())
return false
}
return true
}
func main() {
//json字符串导入
jsonmsg = `{"taskid":"123456", "taskname":"hahah"}`
status := PutData("test", "doc", jsonmsg)
//struct结构体导入
task := Task{Taskid: "123", Taskname: "hahah"}
status := PutData("test", "doc", task)
}

删除一条数据

删除数据需要ID,这个ID是个啥玩意儿呢。。。就是咱们不是刚导了一条数据进去么,你可以设置这数据的唯一ID,也可以让Elasticsearch帮你自动生成一个,一般没事儿干谁自己设置啊,还容易重复,一重复就报错。。我在这里把这个删除的方法教给大家,记住这个ID一定是唯一的

1
2
3
4
5
6
7
8
func DeleteData(index, typ, id string) {
client, _ := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
_, err := client.Delete().Index(index).Type(typ).Id(id).Do(context.Background())
if err != nil {
fmt.Println(err)
return
}
}

高阶使用(条件查询/封装等)

简单讲了一下增删改,现在我们来讲一下高阶用法,高级增删改查吧,其实官方的文档讲的还算是比较清楚,不过我等大中华程序狗的姿势水平。。。至少我是图样图森破,看英文也算是废了九牛二虎之力,算是捋出来了一些高阶用法,顺手自己造了个轮子,现在我就来挑几点来说下吧。

自动选择可用的Es节点

配合olivere的ping机制,可以做一个自动检测Es ip 是否可用的逻辑,这样可以增加我们put,updatge时候的稳定性

自动检测节点Elasticseach的IP是否可用

olivere的Elasticserach sdk 限定了只能用一个ip,类似“http://127.0.0.1:9200”这样,我对原本的逻辑进行了一点改造,改成支持一个ip list,依次检测Es ip 是否可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package elasticdb

import (
"context"
"fmt"

"github.com/olivere/elastic"
)

//Elastic es的连接,
type Elastic struct {
Client *elastic.Client
host string
}

//Connect 基础的连接代码
func Connect(ip string) (*Elastic, error) {
//引入IP
client, err := elastic.NewClient(elastic.SetURL(ip))
if err != nil {
return nil, err
}
//Ping 的方式检查是否可用
_, _, err = client.Ping(ip).Do(context.Background())
if err != nil {
return nil, err
}
//输出一个struct类型,可以被继承
es := &Elastic{
Client: client,
host: ip,
}
return es, nil
}

//InitES 初始化Es连接
func InitES() (*Elastic, error) {
//host是一个列表
host := []string{"http://10.0.6.245:9200","http://10.0.6.246:9200","http://10.0.6.247:9200"}
//统计host的数量
Eslistsnum := len(host)
//如果为零就不继续接下来的逻辑
if Eslistsnum == 0 {
return nil, fmt.Errorf("Cluster Not Es Node")
}
//创建新的连接
for i, ip := range host {
//判断是不是最后一个节点ip
if (Eslistsnum - 1) != i {
es, err := Connect(ip)
//如果连接出错,则跳过
if err != nil {
fmt.Println(err)
continue
}
return es, nil
//如果是最后一个节点
} else {
es, err := Connect(ip)
//输出错误
if err != nil {
return nil, err
}
return es, nil
}
}
return nil, nil
}

后续我们可以采用继承的方法调用Es的client的连接,这个在后面我就不详细说了,聪明的你,看代码一定能整明白,再整不明白,你就直接上Git拷贝我的代码就得了。

条件查询

以前我写过一个Python的Elasticsearch Sdk,那里面的查询基本都用了query,简单来说,就是你,给Es的api发一个query,es给你返回一个查询结果。这里我会举几个常用的条件查询例子,然后用golang封装一波。
这里我先定义一下数据结构,假设我们的Elasticsearch中,有一个叫做Task的index(索引),其中存储着很多task的运行日志,它们的数据格式如下:

1
2
3
4
5
6
7
8
{
"taskid": "081c255b-936c-11ea-8001-000000000000",
"starttime": "2020/05/13 18:38:21",
"endtime": "2020/05/13 18:38:47",
"name": "cifs01",
"status": 1,
"count": 365
}

我们现在要做的就是围绕task这个index和其中的数据做条件查找的例子,我说的很明白了吧?开工了!

查询时间范围/年龄大小的条件查询方法

在业务需求中,我们经常会检索各种各样的数据,其中,范围查找应该是用的比较多的,所以我把它放到了最前面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Task struct {
TaskID string `json:"taskid"`
StartTime string `json:"starttime"`
EndTime string `json:"endtime"`
Name string `json:"name"`
Status int `json:"status"`
Count int `json:"count"`
}
//查找时间范围大于2020/05/13 18:38:21,并且小于2020/05/14 18:38:21的数据
func (Es *Elastic) FindTime() {
var typ Task
boolQ := elastic.NewBoolQuery()
//生成查询语句,筛选starttime字段,查找大于2020/05/13 18:38:21,并且小于2020/05/14 18:38:21的数据
boolQ.Filter(elastic.NewRangeQuery("starttime").Gte("2020/05/13 18:38:21"), elastic.NewRangeQuery("starttime").Lte("2020/05/14 18:38:21"))
res, _ := Es.Client.Search("task").Type("doc").Query(boolQ).Do(context.Background())
//从搜索结果中取数据的方法
for _, item := range res.Each(reflect.TypeOf(typ)) {
if t, ok := item.(Task); ok {
fmt.Println(t)
}
}
}

查询包含关键字的查询方法

我们经常遇到那种,搜那么一两个字,让你展示所有包含这一两个字的结果,就像百度,你搜个”开发”,就能搜出来例如”软件开发”,”硬件开发”等。接下来咱们也实现一个这个功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//查找包含"cifs"的所有数据
func (Es *Elastic) FindKeyword() {
//因为不确定cifs如何出现,可能是cifs01,也可能是01cifs,所以采用这种方法
keyword := "cifs"
keys := fmt.Sprintf("name:*%s*", keyword)
boolQ.Filter(elastic.NewQueryStringQuery(keys))
res, _ := Es.Client.Search("task").Type("doc").Query(boolQ).Do(context.Background())
//从搜索结果中取数据的方法
for _, item := range res.Each(reflect.TypeOf(typ)) {
if t, ok := item.(Task); ok {
fmt.Println(t)
}
}
}

多条件查询

如果说我们现在不仅仅需要找到符合时间的,也需要找到符合关键字的查询,那么就需要在查询条件上做文章。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (Es *Elastic) FindAll() {
//因为不确定cifs如何出现,可能是cifs01,也可能是01cifs,所以采用这种方法
keyword := "cifs"
keys := fmt.Sprintf("name:*%s*", keyword)
boolQ.Filter(elastic.NewRangeQuery("starttime").Gte("2020/05/13 18:38:21"), elastic.NewRangeQuery("starttime").Lte("2020/05/14 18:38:21"), elastic.NewQueryStringQuery(keys))
res, err := Es.Client.Search("task").Type("doc").Query(boolQ).Do(context.Background())
//从搜索结果中取数据的方法
for _, item := range res.Each(reflect.TypeOf(typ)) {
if t, ok := item.(Task); ok {
fmt.Println(t)
}
}
}

统计数量/多条件统计数量

有些时候我们需要去统计符合查询条件的结果数量,做统计用,这里也有直接可用的Sdk

1
2
3
4
5
6
7
8
9
10
func (Es *Elastic) GetTaskLogCount() (int, error) {
boolQ := elastic.NewBoolQuery()
boolQ.Filter(elastic.NewRangeQuery("starttime").Gte("2020/05/13 18:38:21"), elastic.NewRangeQuery("starttime").Lte("2020/05/14 18:38:21"))
//统计count
count, err := Es.Client.Count("task").Type("doc").Query(boolQ).Do(context.Background())
if err != nil {
return 0, nil
}
return int(count), nil
}

总结

我这边完成了几个查询/导入的基础功能,当然,我的代码大部分都放置在了github当中
放置在: https://github.com/Alexanderklau/Go_poject/tree/master/Go-Elasticdb/Elasticsearch_sdk
最近项目比较忙,我打算月中写一篇我开发的时候使用的一些Go特性,或者高级用法。如果喜欢的话麻烦Star我!最近压力颇大,想要换一个地方生活,所以也要准备离开了。如果大家有什么问题,可以直接给我提问,我看到了就会帮助大家的。

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • © 2019-2024 Yemilice lau
  • Powered by Hexo Theme Ayer
  • PV: UV:

觉得帮到你了么?赏我点儿~

支付宝
微信