我们知道TiDB的binlog记录了所有已经执行成功的dml语句,类似mysql binlog row模式
,TiDB官方也提供了reparo可以进行解析binlog,如下所示:
[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:153] ["Parsed start TSO"] [ts=449217508147200000]
[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:160] ["Parsed stop TSO"] [ts=449222855884800000]
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 1
merchant_id(varchar): 2
coupon_code(varchar): 4
customer_id(varchar): 4
customer_mobile_no(varchar): 5
trade_channel(varchar): 03
trade_store_id(varchar): 1440040
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 6
external_order_id(varchar): <nil>
receive_channel_code(varchar): 105
coupon_card_type(tinyint): 0
coupon_type(tinyint): 1
receive_type(smallint): 6
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 2
merchant_id(varchar): <nil>
coupon_code(varchar): 5
customer_id(varchar): 6
customer_mobile_no(varchar): 7
trade_channel(varchar): 03
trade_store_id(varchar): 1420452
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 8
external_order_id(varchar): <nil>
receive_channel_code(varchar): 03
coupon_card_type(tinyint): 0
coupon_type(tinyint): 0
receive_type(smallint): 4
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1
另外reparo 支持print 和mysql 两种模式,print只做解析打印到标准输出,不执行 SQL,mysql:是直接再下游数据库执行SQL
但是我们有的时候需要进去反解析,比如我们误删除了一些数据,我们要把误删除的数据解析成INSERT语句,这怎么办呢,TIDB目前不提供这种反解析工具,于是自己写了一个工具进行解析,代码如下:
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"regexp"
"strings"
"time"
)
var (
binlogFile string
schema string
table string
sqlType string
whereSql string
logPath string
filedRe = regexp.MustCompile(`\(.*?\)+:.*`)
logOutFile *log.Logger
)
type filed struct {
Name string `json:"name"`
Value string `json:"value"`
}
type Parser struct {
lines []filed
}
// type lists struct {
// filed []filed
// }
func compressStr(str string) string {
str = strings.ReplaceAll(str, " ", "")
r := strings.NewReplacer("\r", "", "\n", "")
str = r.Replace(str)
//匹配一个或多个空白符的正则表达式
reg := regexp.MustCompile("\\s+")
return reg.ReplaceAllString(str, "")
}
func main() {
flag.StringVar(&binlogFile, "binlogFile", "", "binlog日志文件路径")
flag.StringVar(&schema, "schema", "", "要解析的数据库名称")
flag.StringVar(&table, "table", "", "要解析的表名称")
flag.StringVar(&sqlType, "sqlType", "", "要解析的dml类型")
flag.StringVar(&logPath, "logPath", "", "输出文件路径名称")
flag.Parse()
if binlogFile == "" {
log.Println("请输入binlog日志文件路径...")
return
}
if schema == "" {
log.Println("请输入要解析的数据库名称...")
return
}
if table == "" {
log.Println("请输入要解析的表名称...")
return
}
if sqlType == "" {
log.Println("请输入要解析的dml类型...")
return
}
if logPath == "" {
log.Println("请输入输出日志文件路径...")
return
}
outSlowLogFile(logPath)
file, err := os.Open(binlogFile)
if err != nil {
log.Println("读取文件失败...")
return
}
schemaRe := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, sqlType)
//schemaRe2 := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, "Insert")
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 1024*1024), 1024*1024*10)
inHeader := false
var f filed
var array []string
var time2, _ = time.Parse("2006-01-02 15:04:05", "2024-04-01 00:00:00")
fields := ""
for scanner.Scan() {
line := scanner.Text()
if compressStr(line) == compressStr(schemaRe) {
if fields != "" {
fields = fmt.Sprintf("{%v}", fields)
array = append(array, fields)
}
fields = ""
inHeader = true
continue
} else if strings.Contains(line, "sync binlog success") || strings.Contains(line, "read file end") {
inHeader = false
continue
} else {
if inHeader {
if filedRe.MatchString(line) {
f.Name = strings.Split(line, "(")[0]
tmpValue := strings.Split(line, ": ")
if len(tmpValue) == 1 {
inHeader = false
continue
} else {
f.Value = strings.Split(line, ": ")[1]
}
if fields == "" {
fields = fmt.Sprintf("\"%s\":\"%s\" ", f.Name, f.Value)
} else {
fields += fmt.Sprintf(",\"%s\":\"%s\"", f.Name, f.Value)
}
} else {
inHeader = false
continue
}
}
}
}
if fields != "" {
fields = fmt.Sprintf("{%v}", fields)
array = append(array, fields)
}
if err := scanner.Err(); err != nil {
log.Println(err)
}
for i := 0; i < len(array); i++ {
//decoder := json.NewDecoder(strings.NewReader(array[i]))
//
//for key, value := range JsonToMap(array[i]) {
// fmt.Printf("键:%v,值:%d\n", key, value)
//}
m := make(map[string]string)
err = json.Unmarshal([]byte(array[i]), &m)
if err != nil {
fmt.Printf("Unmarshal with error: %+v", err)
}
keys := ""
values := ""
isExec := false
//newKeys := make([]string, 0, len(m))
//for k := range m {
// newKeys = append(newKeys, k)
//}
对切片进行排序
//sort.Strings(newKeys)
for key, value := range m {
if keys == "" {
keys = fmt.Sprintf("%v", key)
if value == "<nil>" {
values = fmt.Sprintf("%v", "NULL")
} else {
values = fmt.Sprintf("'%v'", value)
}
} else {
keys += fmt.Sprintf(",%v", key)
if value == "<nil>" {
values += fmt.Sprintf(",%v", "NULL")
} else {
values += fmt.Sprintf(",'%v'", value)
}
}
if key == "updated_date" {
isExec = true
updateTime, _ := time.Parse("2006-01-02 15:04:05", value)
if updateTime.After(time2) {
isExec = true
} else {
isExec = false
}
}
}
if isExec {
inSql := fmt.Sprintf("insert into coupon_trade_record(%v) values(%v);", keys, values)
logOutFile.Println(inSql)
}
}
}
func outSlowLogFile(outFile string) {
outFilePath, err := os.OpenFile(outFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
if err != nil {
log.Println(fmt.Sprintf("创建输出文件失败:%s", err))
return
}
logOutFile = log.New(outFilePath, "", log.Lmsgprefix)
}
通过代码可以将DELETE sql直接转成insert 语句:
至此完成将数据重新插入到业务库里面,即可完成恢复