读取大文件日志做流量统计:目前处理 500M 的压缩包需要 6 分钟,请大家看看如何优化
- 需求描述:日志压缩包统计5分钟粒度的流量情况
- 问题描述:当前执行13G压缩包需1小时,最小化后的可用脚本执行500M需6分钟
- 最小化代码:
package main
import (
"bufio"
"compress/gzip"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
"fmt"
"github.com/Tropicana33/common/golog"
)
// DomainTfcDB holds traffic totals keyed first by domain and then by time
// bucket.
type DomainTfcDB struct {
	// db maps domain -> time -> traffic.
	db map[string]map[int64]int64
}

// domainTfcDB is the package-level table; it starts with an empty,
// ready-to-write outer map.
var domainTfcDB = DomainTfcDB{
	db: map[string]map[int64]int64{},
}
// DomainTfc holds traffic totals keyed by time bucket, together with the
// lock callers take before touching the map.
type DomainTfc struct {
	lock *sync.RWMutex
	// data maps time -> traffic.
	data map[int64]int64
}

// domainTfc is the package-level accumulator; both the lock and the map are
// allocated up front so it can be used without further setup.
var domainTfc = DomainTfc{
	lock: &sync.RWMutex{},
	data: map[int64]int64{},
}
// Entry is one parsed log record: when it happened and how many bytes it
// accounts for.
type Entry struct {
	// Time is the timestamp parsed from the log line.
	Time time.Time
	// Size is the integer value parsed from the traffic field of the line.
	Size int64
}
// main runs the minimal reproduction: it announces the start and then scans
// one gzip log, feeding every parsed record to ParseAndSave.
func main() {
	const (
		logPath   = "1.gz"
		domain    = "www.zheng.com"
		timeField = 4
		sizeField = 10
	)
	// NOTE(review): the sample line shown later in this post has the time at
	// field 1 and the size at field 3; confirm that 4 and 10 match the real
	// log format.
	fmt.Println("start")
	ReadGzip(logPath, domain, timeField, sizeField, ParseAndSave)
}
// ParseAndSave adds e.Size to the running total of the 5-minute bucket that
// contains e.Time, in the package-level domainTfc table.
//
// The table's lock is held while the map is updated. e must be non-nil.
func ParseAndSave(e *Entry) {
	// The bucket depends only on e, so compute it before taking the lock to
	// keep the critical section as short as possible.
	bucket := UnixToMinuteByGrad(e.Time.Unix(), 5) // 5-minute granularity by default

	domainTfc.lock.Lock()
	defer domainTfc.lock.Unlock()

	// A missing key reads as zero, so a single += handles both the first
	// record of a bucket and every later one.
	domainTfc.data[bucket] += e.Size
}
// ReadGzip streams the gzip-compressed log at fp line by line, parses a
// timestamp and a traffic value out of each line, and hands every
// successfully parsed record to f. When the scan ends it prints the totals
// held in the package-level domainTfc table and logs the elapsed time.
//
// Parameters:
//   - fp: path of the gzip file to read.
//   - domainStr: kept for call compatibility; this version does not use it.
//   - timeIndex: zero-based index of the whitespace-separated field holding
//     the first part of the timestamp; the field right after it is joined on
//     with a single space before parsing.
//   - tfcIndex: zero-based index of the field holding the traffic value.
//   - f: callback invoked with each parsed record; nil means no callback.
//
// Lines with too few fields, or whose size or time fail to parse, are
// skipped. Open, gzip and scan errors are logged rather than returned.
func ReadGzip(fp string, domainStr string, timeIndex int, tfcIndex int, f func(e *Entry)) {
	const (
		readBufBytes = 256 * 1024  // read buffer between the file and gzip
		maxLineBytes = 1024 * 1024 // longest single log line accepted
	)

	start := time.Now()
	golog.Info("开始解析文件",fp)

	file, err := os.Open(fp)
	if err != nil {
		golog.Error(err)
		return
	}
	defer file.Close()

	// Give gzip a large buffered reader so the compressed file is read in
	// big chunks instead of through the decompressor's small default buffer.
	gz, err := gzip.NewReader(bufio.NewReaderSize(file, readBufBytes))
	if err != nil {
		golog.Error(err)
		return
	}
	defer gz.Close()

	// The loop indexes es[tfcIndex] and es[timeIndex+1], so a usable line
	// needs strictly more fields than the larger of those two indices.
	need := timeIndex + 2
	if tfcIndex+1 > need {
		need = tfcIndex + 1
	}

	scanner := bufio.NewScanner(gz)
	// Raise the per-line limit above the scanner's 64 KiB default so one
	// long line does not end the scan early.
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)

	for scanner.Scan() {
		// Fields already ignores leading and trailing whitespace, so no
		// separate trim is needed.
		es := strings.Fields(scanner.Text())
		if len(es) < need {
			continue
		}

		size, err := strconv.ParseInt(es[tfcIndex], 10, 64)
		if err != nil {
			log.Println(err)
			continue
		}

		ts, err := time.Parse("[02/Jan/2006:15:4:5 -0700]", es[timeIndex]+" "+es[timeIndex+1])
		if err != nil {
			log.Println("parse time error:", err)
			continue
		}

		if f != nil {
			f(&Entry{Time: ts, Size: size})
		}
	}
	// Scan returns false on both EOF and failure; report a failure so that
	// truncated statistics are not mistaken for complete ones.
	if err := scanner.Err(); err != nil {
		golog.Error(err)
	}

	for bucket, trf := range domainTfc.data {
		fmt.Println("time: ", bucket, "trf: ", trf)
	}

	golog.Info("解析文件", fp, "耗费", time.Since(start).Seconds(), "s")
}
// UnixToMinuteByGrad rounds the Unix timestamp t (seconds) toward zero to a
// multiple of grad minutes and returns the result in seconds.
func UnixToMinuteByGrad(t int64, grad int64) int64 {
	step := grad * 60 // bucket width in seconds
	// Dropping the remainder is the same as dividing and multiplying back.
	return t - t%step
}
- 压缩文件生成:将如下内容复制几百遍写入文件,再用 gzip 压缩成 1.gz
www.zheng.com [21/Nov/2019:22:39:50 +0800] 525280 www.zheng.com [21/Nov/2019:22:39:50 +0800] 161841 www.zheng.com [21/Nov/2019:22:39:50 +0800] 82789 www.zheng.com [21/Nov/2019:22:39:50 +0800] 82555 www.zheng.com [21/Nov/2019:22:39:50 +0800] 162263 www.zheng.com [21/Nov/2019:22:39:50 +0800] 82767 www.zheng.com [21/Nov/2019:22:39:50 +0800] 163119 www.zheng.com [21/Nov/2019:22:39:50 +0800] 530505 www.zheng.com [21/Nov/2019:22:39:50 +0800] 162097 www.zheng.com [21/Nov/2019:22:39:50 +0800] 153962 www.zheng.com [21/Nov/2019:22:39:50 +0800] 162207 www.zheng.com [21/Nov/2019:22:39:50 +0800] 161769 www.zheng.com [21/Nov/2019:22:39:50 +0800] 577676 www.zheng.com [21/Nov/2019:22:39:50 +0800] 125212 www.zheng.com [21/Nov/2019:22:39:50 +0800] 82030 www.zheng.com [21/Nov/2019:22:39:50 +0800] 439327 www.zheng.com [21/Nov/2019:22:39:50 +0800] 525356 www.zheng.com [21/Nov/2019:22:39:50 +0800] 81038 www.zheng.com [21/Nov/2019:22:39:50 +0800] 81791 www.zheng.com [21/Nov/2019:22:39:50 +0800] 1364 www.zheng.com [21/Nov/2019:22:39:50 +0800] 269821 www.zheng.com [21/Nov/2019:22:39:50 +0800] 162072 www.zheng.com [21/Nov/2019:22:39:50 +0800] 81948 www.zheng.com [21/Nov/2019:22:39:50 +0800] 2310 www.zheng.com [21/Nov/2019:22:39:50 +0800] 81757 www.zheng.com [21/Nov/2019:22:39:50 +0800] 323314 www.zheng.com [21/Nov/2019:22:39:50 +0800] 0 www.zheng.com [21/Nov/2019:22:39:50 +0800] 535139 www.zheng.com [21/Nov/2019:22:39:50 +0800] 0 www.zheng.com [21/Nov/2019:22:39:50 +0800] 0 www.zheng.com [21/Nov/2019:22:39:50 +0800] 525279 www.zheng.com [21/Nov/2019:22:39:50 +0800] 81038 www.zheng.com [21/Nov/2019:22:39:50 +0800] 81708 www.zheng.com [21/Nov/2019:22:39:50 +0800] 525655 www.zheng.com [21/Nov/2019:22:39:50 +0800] 551774 www.zheng.com [21/Nov/2019:22:39:50 +0800] 339554 www.zheng.com [21/Nov/2019:22:39:50 +0800] 1058054 www.zheng.com [21/Nov/2019:22:39:50 +0800] 899987
- 困惑:如何才能提高效率?以下是最初版本的代码:
package main
import (
"bufio"
"compress/gzip"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/Tropicana33/common/golog"
)
// DomainTfcDB holds traffic totals keyed first by domain and then by time
// bucket, together with the lock callers take before touching the map.
type DomainTfcDB struct {
	lock *sync.RWMutex
	// db maps domain -> time -> traffic.
	db map[string]map[int64]int64
}

// domainTfcDB is the package-level table; the lock and the outer map are
// allocated up front so it can be used without further setup.
var domainTfcDB = DomainTfcDB{
	lock: &sync.RWMutex{},
	db:   map[string]map[int64]int64{},
}
// Entry is one parsed log record: which domain it belongs to, when it
// happened, and how many bytes it accounts for.
type Entry struct {
	// Domain is the domain the record is attributed to.
	Domain string
	// Time is the timestamp parsed from the log line.
	Time time.Time
	// Size is the integer value parsed from the traffic field of the line.
	Size int64
}
// ParseAndSave adds e.Size to the running total stored for e.Domain in the
// time bucket that contains e.Time. The bucket width in minutes comes from
// the package-level grad value.
//
// The table's lock is held for the whole update. e must be non-nil.
func ParseAndSave(e *Entry) {
	domainTfcDB.lock.Lock()
	defer domainTfcDB.lock.Unlock()

	bucket := UnixToMinuteByGrad(e.Time.Unix(), grad) // 5-minute granularity by default

	// Look the per-domain map up once and create it on first sight of the
	// domain, instead of hashing e.Domain again for every access.
	perDomain, ok := domainTfcDB.db[e.Domain]
	if !ok {
		perDomain = make(map[int64]int64)
		domainTfcDB.db[e.Domain] = perDomain
	}

	// A missing bucket reads as zero, so += covers both the first record of
	// a bucket and every later one.
	perDomain[bucket] += e.Size
}
// ReadGzip streams the gzip-compressed log at fp line by line, parses a
// timestamp and a traffic value out of each line, and hands every
// successfully parsed record (tagged with domainStr) to f. It calls
// wg.Done when it returns and logs the elapsed time after a completed scan.
//
// Parameters:
//   - fp: path of the gzip file to read.
//   - domainStr: domain stored in every Entry produced from this file.
//   - timeIndex: zero-based index of the whitespace-separated field holding
//     the timestamp. When the package-level time_style is false, the field
//     right after it is joined on with a single space before parsing; when
//     it is true, the field alone is parsed as "20060102150405".
//   - tfcIndex: zero-based index of the field holding the traffic value.
//   - f: callback invoked with each parsed record; nil means no callback.
//   - wg: wait group signalled on return; must be non-nil.
//
// Lines with too few fields, or whose size or time fail to parse, are
// skipped. Open, gzip and scan errors are logged rather than returned.
func ReadGzip(fp string, domainStr string, timeIndex int, tfcIndex int, f func(e *Entry), wg *sync.WaitGroup) {
	defer wg.Done()

	const (
		readBufBytes = 256 * 1024  // read buffer between the file and gzip
		maxLineBytes = 1024 * 1024 // longest single log line accepted
	)

	start := time.Now()

	file, err := os.Open(fp)
	if err != nil {
		golog.Error(err)
		return
	}
	defer file.Close()

	// Give gzip a large buffered reader so the compressed file is read in
	// big chunks instead of through the decompressor's small default buffer.
	gz, err := gzip.NewReader(bufio.NewReaderSize(file, readBufBytes))
	if err != nil {
		golog.Error(err)
		return
	}
	defer gz.Close()

	// Read the format switch once so the field-count guard and the parse
	// branch below always agree with each other.
	compact := time_style

	// The loop indexes es[tfcIndex] and es[timeIndex] (plus es[timeIndex+1]
	// for the two-field time format), so a usable line needs strictly more
	// fields than the largest index touched.
	need := timeIndex + 2
	if compact {
		need = timeIndex + 1
	}
	if tfcIndex+1 > need {
		need = tfcIndex + 1
	}

	scanner := bufio.NewScanner(gz)
	// Raise the per-line limit above the scanner's 64 KiB default so one
	// long line does not end the scan early.
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)

	for scanner.Scan() {
		// Fields already ignores leading and trailing whitespace, so no
		// separate trim is needed.
		es := strings.Fields(scanner.Text())
		if len(es) < need {
			continue
		}

		size, err := strconv.ParseInt(es[tfcIndex], 10, 64)
		if err != nil {
			log.Println(err)
			continue
		}

		var ts time.Time
		if compact {
			ts, err = time.Parse("20060102150405", es[timeIndex])
		} else {
			ts, err = time.Parse("[02/Jan/2006:15:4:5 -0700]", es[timeIndex]+" "+es[timeIndex+1])
		}
		if err != nil {
			log.Println("parse time error:", err)
			continue
		}

		if f != nil {
			f(&Entry{Domain: domainStr, Time: ts, Size: size})
		}
	}
	// Scan returns false on both EOF and failure; report a failure so that
	// truncated statistics are not mistaken for complete ones.
	if err := scanner.Err(); err != nil {
		golog.Error(err)
	}

	// The message labels the value "s", so report seconds rather than the
	// microsecond count the previous expression produced.
	golog.Info("解析文件", fp, "耗费", time.Since(start).Seconds(), "s")
}
// UnixToMinuteByGrad rounds the Unix timestamp t (seconds) toward zero to a
// multiple of grad minutes and returns the result in seconds.
func UnixToMinuteByGrad(t int64, grad int64) int64 {
	bucketSeconds := grad * 60
	// Integer division discards the part of t that lies past the last
	// bucket boundary; multiplying back yields that boundary.
	buckets := t / bucketSeconds
	return buckets * bucketSeconds
}
推荐文章: