读取大文件日志,做流量统计,现在执行 500M 压缩包需 6 分钟,大家看如何优化

  1. 需求描述:日志压缩包统计5分钟粒度的流量情况
  2. 问题描述:当前处理 13G 的压缩包需要 1 小时;精简后的可运行脚本处理 500M 的压缩包也需要 6 分钟
  3. 最小化代码:
package main
import (
    "bufio"
    "compress/gzip"
    "log"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"
    "fmt"
    "github.com/Tropicana33/common/golog"
)
// DomainTfcDB stores traffic totals in a two-level map:
// domain -> time bucket -> traffic.
type DomainTfcDB struct {
	db map[string]map[int64]int64
}

// domainTfcDB is the package-level DomainTfcDB instance; it starts out with an
// empty, non-nil outer map.
var domainTfcDB = DomainTfcDB{
	db: map[string]map[int64]int64{},
}
// DomainTfc is a lock-guarded table of traffic totals keyed by time bucket.
type DomainTfc struct {
	lock *sync.RWMutex
	data map[int64]int64 // time bucket ==> traffic
}

// domainTfc is the package-level DomainTfc instance; both the lock and the
// map are allocated up front so it is usable without further setup.
var domainTfc = DomainTfc{
	lock: &sync.RWMutex{},
	data: map[int64]int64{},
}
// Entry is one parsed log record: when it happened and how much traffic it
// carried.
type Entry struct {
	Time time.Time // timestamp parsed from the log line
	Size int64     // traffic value parsed from the log line
}
// main prints a start marker and then aggregates the traffic of a single
// gzip-compressed log file through ParseAndSave.
func main() {
	const (
		logPath      = "1.gz"
		domain       = "www.zheng.com"
		timeField    = 4  // index of the timestamp field within a log line
		trafficField = 10 // index of the traffic field within a log line
	)

	fmt.Println("start")
	ReadGzip(logPath, domain, timeField, trafficField, ParseAndSave)
}
// ParseAndSave adds e.Size to the running total of the 5-minute bucket that
// contains e.Time. The shared map is protected by domainTfc.lock.
func ParseAndSave(e *Entry) {
	// Work out the bucket before taking the lock to keep the critical section
	// as short as possible.
	t := UnixToMinuteByGrad(e.Time.Unix(), 5) // fixed 5-minute granularity

	domainTfc.lock.Lock()
	defer domainTfc.lock.Unlock()

	// Reading a missing key yields zero, so a single += handles both the first
	// entry of a bucket and every later one with one map access.
	domainTfc.data[t] += e.Size
}
// ReadGzip streams the gzip-compressed log file fp line by line, splits each
// line on whitespace and hands one Entry per valid line to f.
//
// timeIndex is the index of the "[02/Jan/2006:15:4:5" field; the time zone
// field ("-0700]") is read from timeIndex+1. tfcIndex is the index of the
// traffic field. Lines that are too short, or whose traffic or time cannot be
// parsed, are skipped (parse failures are logged). domainStr is accepted for
// signature compatibility but is not used here. f may be nil.
//
// After the scan the contents of domainTfc.data are printed and the elapsed
// time is logged. Open, gzip and read errors are logged, not returned.
func ReadGzip(fp string, domainStr string, timeIndex int, tfcIndex int, f func(e *Entry)) {
	start := time.Now()
	golog.Info("开始解析文件", fp)

	file, err := os.Open(fp)
	if err != nil {
		golog.Error(err)
		return
	}
	defer file.Close()

	gz, err := gzip.NewReader(file)
	if err != nil {
		golog.Error(err)
		return
	}
	defer gz.Close()

	// A negative index would panic on the first line that is indexed.
	if timeIndex < 0 || tfcIndex < 0 {
		log.Println("invalid field index:", timeIndex, tfcIndex)
		return
	}

	// The loop reads es[timeIndex+1] and es[tfcIndex], so a line needs at
	// least max(timeIndex+1, tfcIndex)+1 fields.
	minFields := timeIndex + 2
	if tfcIndex >= minFields {
		minFields = tfcIndex + 1
	}

	// Consecutive log lines very often carry the same timestamp; remembering
	// the last successfully parsed one avoids a string concatenation and a
	// time.Parse call for every such line.
	var (
		lastDate, lastZone string
		lastTime           time.Time
		haveLast           bool
	)

	scanner := bufio.NewScanner(gz)
	for scanner.Scan() {
		// strings.Fields already ignores leading and trailing whitespace.
		es := strings.Fields(scanner.Text())
		if len(es) < minFields {
			continue
		}

		size, err := strconv.ParseInt(es[tfcIndex], 10, 64)
		if err != nil {
			log.Println(err)
			continue
		}

		date, zone := es[timeIndex], es[timeIndex+1]
		if !haveLast || date != lastDate || zone != lastZone {
			t, err := time.Parse("[02/Jan/2006:15:4:5 -0700]", date+" "+zone)
			if err != nil {
				log.Println("parse time error:", err)
				continue
			}
			lastDate, lastZone, lastTime, haveLast = date, zone, t, true
		}

		if f != nil {
			e := Entry{
				Time: lastTime,
				Size: size,
			}
			f(&e)
		}
	}
	// Scan returns false on a read error or an over-long line as well as on
	// EOF; report it instead of silently presenting truncated totals.
	if err := scanner.Err(); err != nil {
		golog.Error(err)
	}

	for t, trf := range domainTfc.data {
		fmt.Println("time: ", t, "trf: ", trf)
	}
	golog.Info("解析文件", fp, "耗费", time.Since(start).Seconds(), "s")
}
// UnixToMinuteByGrad aligns the Unix timestamp t (seconds) to a multiple of
// grad minutes by discarding the remainder. For non-negative t this is the
// start of the grad-minute bucket containing t; negative t is aligned toward
// zero. grad == 0 panics with a division by zero.
func UnixToMinuteByGrad(t int64, grad int64) int64 {
	window := grad * 60 // bucket width in seconds
	return t - t%window
}
  1. 压缩文件生成:将如下内容重复复制几百遍写入一个文本文件,再用 gzip 压缩成 1.gz
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 525280
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 161841
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 82789
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 82555
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 162263
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 82767
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 163119
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 530505
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 162097
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 153962
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 162207
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 161769
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 577676
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 125212
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 82030
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 439327
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 525356
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 81038
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 81791
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 1364
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 269821
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 162072
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 81948
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 2310
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 81757
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 323314
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 0
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 535139
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 0
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 0
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 525279
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 81038
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 81708
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 525655
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 551774
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 339554
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 1058054
    www.zheng.com [21/Nov/2019:22:39:50 +0800] 899987
  2. 困惑:如何才能提高效率?以下是最初版本的代码:
package main
import (
    "bufio"
    "compress/gzip"
    "log"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"
    "github.com/Tropicana33/common/golog"
)
// DomainTfcDB is a lock-guarded, two-level traffic table:
// domain -> time bucket -> traffic.
type DomainTfcDB struct {
	lock *sync.RWMutex
	db   map[string]map[int64]int64
}

// domainTfcDB is the package-level DomainTfcDB instance; both the lock and the
// outer map are allocated up front.
var domainTfcDB = DomainTfcDB{
	lock: &sync.RWMutex{},
	db:   map[string]map[int64]int64{},
}
// Entry is one parsed log record handed to the per-line callback.
type Entry struct {
    Domain string    // domain the record is attributed to (ReadGzip fills in its domainStr argument)
    Time   time.Time // timestamp parsed from the log line
    Size   int64     // traffic value parsed from the log line
}
// ParseAndSave adds e.Size to the total stored for e.Domain in the time bucket
// that contains e.Time. The bucket width in minutes comes from the
// package-level grad value (5 minutes by default, according to the original
// note). The shared table is protected by domainTfcDB.lock.
func ParseAndSave(e *Entry) {
	domainTfcDB.lock.Lock()
	defer domainTfcDB.lock.Unlock()

	t := UnixToMinuteByGrad(e.Time.Unix(), grad)

	// Look the per-domain map up once and create it on first use.
	buckets, ok := domainTfcDB.db[e.Domain]
	if !ok {
		buckets = make(map[int64]int64)
		domainTfcDB.db[e.Domain] = buckets
	}
	// A missing bucket reads as zero, so += covers first and later entries.
	buckets[t] += e.Size
}
// ReadGzip streams the gzip-compressed log file fp line by line, splits each
// line on whitespace and hands one Entry (tagged with domainStr) per valid
// line to f. wg.Done is called when the function returns.
//
// tfcIndex is the index of the traffic field. timeIndex is the index of the
// timestamp field: when the package-level time_style flag is set, the field is
// parsed with layout "20060102150405"; otherwise the field and the one after
// it are joined and parsed with layout "[02/Jan/2006:15:4:5 -0700]". Lines that
// are too short, or whose traffic or time cannot be parsed, are skipped (parse
// failures are logged). f may be nil.
//
// Open, gzip and read errors are logged, not returned.
func ReadGzip(fp string, domainStr string, timeIndex int, tfcIndex int, f func(e *Entry), wg *sync.WaitGroup) {
	defer wg.Done()
	start := time.Now()

	file, err := os.Open(fp)
	if err != nil {
		golog.Error(err)
		return
	}
	defer file.Close()

	gz, err := gzip.NewReader(file)
	if err != nil {
		golog.Error(err)
		return
	}
	defer gz.Close()

	// A negative index would panic on the first line that is indexed.
	if timeIndex < 0 || tfcIndex < 0 {
		log.Println("invalid field index:", timeIndex, tfcIndex)
		return
	}

	scanner := bufio.NewScanner(gz)
	for scanner.Scan() {
		// strings.Fields already ignores leading and trailing whitespace.
		es := strings.Fields(scanner.Text())

		// The highest index read below is tfcIndex, timeIndex, or, for the
		// bracketed layout, timeIndex+1; require that many fields up front.
		compact := time_style
		need := timeIndex + 1
		if !compact {
			need++
		}
		if tfcIndex >= need {
			need = tfcIndex + 1
		}
		if len(es) < need {
			continue
		}

		size, err := strconv.ParseInt(es[tfcIndex], 10, 64)
		if err != nil {
			log.Println(err)
			continue
		}

		var t time.Time
		if compact {
			t, err = time.Parse("20060102150405", es[timeIndex])
		} else {
			t, err = time.Parse("[02/Jan/2006:15:4:5 -0700]", es[timeIndex]+" "+es[timeIndex+1])
		}
		if err != nil {
			log.Println("parse time error:", err)
			continue
		}

		if f != nil {
			e := Entry{
				Domain: domainStr,
				Time:   t,
				Size:   size,
			}
			f(&e)
		}
	}
	// Scan returns false on a read error or an over-long line as well as on
	// EOF; report it instead of silently presenting truncated totals.
	if err := scanner.Err(); err != nil {
		golog.Error(err)
	}

	// The message labels the value "s", so report seconds.
	golog.Info("解析文件", fp, "耗费", time.Since(start).Seconds(), "s")
}
// UnixToMinuteByGrad truncates the Unix timestamp t (seconds) to a multiple of
// grad minutes. For non-negative t the result is the start of the grad-minute
// bucket containing t; negative t is truncated toward zero. grad == 0 panics
// with a division by zero.
func UnixToMinuteByGrad(t int64, grad int64) int64 {
	seconds := 60 * grad   // bucket width in seconds
	buckets := t / seconds // whole buckets elapsed (integer division)
	return buckets * seconds
}
讨论数量: 2

mapreduce

4年前 评论
Smart1man (楼主) 4年前

input (origin file) -> master (truncate file) -> worker (reduce func) -> output (result)

4年前 评论
Smart1man (楼主) 4年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!