-
mariadb 에서 elasticsearch 실시간 연동하기개발/golang 2021. 3. 30. 14:46
개요
상품테이블과 상점테이블이 존재하고 두테이블 조인한 검색조건이 30개가 넘어감으로써 마땅한 인덱스를 찾기 어렵고검색시 속도 저하가 발생하여 실시간으로 elasticsearch 와 연동할수 있는 방법을 다음과 같이 구성함
구조
코드
1. binary 로그를 읽어 message queue 에 저장한다.
import "github.com/siddontang/go-mysql/replication" var productAggregationPkMap = map[string]int{ "product": 0, "product_detail": 0, "product_memo": 1, } var shopAggregationPkMap = map[string]int{ "shop": 0, "shop_memo": 1, } func main () { syncer := replication.NewBinlogSyncer(config()) //name , pos 는 redis 에 저장해 읽어옴. name, pos := getPositions() //bin00001, 12345134 streamer, _ := syncer.StartSync(mysql.Position{ Name: name, Pos: pos, }) for { ev, _ := streamer.GetEvent(ctx) p := syncer.GetNextPosition() rowsEvent, ok := ev.Event.(*replication.RowsEvent) if !ok { continue } tblName := string(rowsEvent.Table.Table) productSeqIdx, productKeyExists := productAggregationPkMap[tblName] shopSeqIdx, shopKeyExists := shopAggregationPkMap[tblName] if productKeyExists { for _, row := range rowsEvent.Rows { publishing(fmt.Sprintf("%v", row[productSeqIdx]), "product-aggregation") } continue } if shopKeyExists { for _, row := range rowsEvent.Rows { publishing(fmt.Sprintf("%v", row[shopSeqIdx]), "shop-aggregation") } } } } func config() replication.BinlogSyncerConfig { port, _ := strconv.ParseUint(os.Getenv("MARIADB_PORT"), 10, 16) cfg := replication.BinlogSyncerConfig{ ServerID: 100, Flavor: "mariadb", Host: os.Getenv("MARIADB_HOST"), Port: uint16(port), User: os.Getenv("MARIADB_USER"), Password: os.Getenv("MARIADB_PW"), } return cfg }
2. aggregation 키를 가져다 es 에 들어갈 데이터셋을 만든후 es 에 넣는다.
3. 이때 상점과 상품이 1:N 관계이므로 elasticsearch 에 다음과 같이 설정한다
PUT /mariadb2es { "mappings": { "doc": { "properties" :{ "shop_join":{ "type":"join", "relations":{ "shop":"product" } } }, "date_detection": true, "dynamic_date_formats": [ "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss.SS", "strict_date_time" ], "dynamic_templates": [ { "strings": { "match_mapping_type": "string", "mapping": { "type": "text", "fields": { "raw": { "type": "keyword", "ignore_above": 256 } } } } } ] } } }
4. 상점저장
PUT /mariadb2es/doc/shop1?routing=shop1 { "shop" : { "name" : "testMall", "memo" : "this is test mall" }, "shop_join" : { "name" : "shop" } }
5. 상품저장
PUT /mariadb2es/doc/product1?routing=shop1 { "product": { "name": "test product", "price": 1000, "memo": "this is test product" }, "shop_join": { "name": "product", "parent": "shop1" } }
6. 조회
get /mariadb2es/_search { "query": { "bool": { "must": [ { "has_parent": { "parent_type": "shop", "query": { "bool": { "must": [ { "match_phrase": { "shop.name": "testMall" } } ] } } } }, { "match_phrase": { "product.memo": "this is test product" } } ] } } }
주의사항
1.바이너리 로그 포멧은 row 타입이여야하므로 설정을 변경해준다. (https://mariadb.com/kb/en/binary-log-formats/)
2. es 에 데이터를 넣을시 부모데이터 (shop) 를 먼저 넣고 자식데이터 (product) 를 넣어야한다. 그리고 라우팅키는 부모와 자식이 같아야한다 (부모키로 설정)
참고자료
1. github.com/elastic/go-elasticsearch
2. https://stackoverflow.com/questions/33313881/how-to-handle-ddl-statements-create-alter-drop-with-row-based-replication-for/33314483
3. http://kimjmin.net/2018/01/2018-01-parent-child-to-join/
4. http://small-dbtalk.blogspot.com/2016/12/5. https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_image
'개발 > golang' 카테고리의 다른 글
[ Golang ] Rest Api 스펙이 틀릴때 (타입이 틀릴때) Json decode(Unmarshal) 하기 (0) 2021.04.02 [ Golang ] JSON Object 키 넣은 순서대로 정렬하기 (0) 2021.04.02 [ Golang ] Go 루틴 최대갯수를 제한하기 (0) 2021.04.02 [ Golang ] Go 루틴을 테스트 하기 (gomock) (0) 2021.03.11