Reading a 16 GB File in Seconds with Golang

Any computer system today generates a huge amount of logs and data every day. As a system grows, it is not feasible to store this debugging data in a database: it is immutable and is only used for analytics and fault-resolution purposes. So organisations tend to store it in files that reside on local disk storage.

We are going to extract logs from a 16 GB .txt or .log file containing millions of lines, using Golang.

Let's code…!

Let's open the file first. We will use the standard Go os.File for all file I/O.

f, err := os.Open(fileName)
if err != nil {
	fmt.Println("cannot read the file", err)
	return
}
defer f.Close() // do not forget to close the file after checking the error

Once the file is opened, we have the following two options to proceed with:

  1. Read the file line by line. This helps reduce the strain on memory but costs more time in I/O.
  2. Read the entire file into memory at once and process it. This consumes much more memory but significantly reduces the time.

Since our file is too large, i.e. 16 GB, we can't load it into memory all at once. But the first option is not feasible for us either, as we want to process the file within seconds.
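For reference, the first option (reading line by line) would look roughly like the sketch below, using bufio.Scanner from the standard library on an already opened *os.File f. This is only an illustration of the trade-off, not the approach we end up using.

scanner := bufio.NewScanner(f)
// the default token buffer holds lines up to 64 KB; raise it if lines can be longer
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
for scanner.Scan() {
	line := scanner.Text()
	_ = line // filter or process each line here
}
if err := scanner.Err(); err != nil {
	fmt.Println("error while scanning the file", err)
}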

But guess what, there is a third option. Voila…! Instead of loading the entire file into memory, we will read it in chunks using bufio.NewReader(), available in Go.

r := bufio.NewReader(f)

for {
	buf := make([]byte, 4*1024) // the chunk size
	n, err := r.Read(buf)       // load a chunk into the buffer
	buf = buf[:n]

	if n == 0 {
		if err != nil {
			fmt.Println(err)
			break
		}
		if err == io.EOF {
			break
		}
		return err
	}
}

Once we have a chunk, we will fork a thread, i.e. a goroutine, to process it concurrently with the other chunks. The above code changes to:

// sync.Pools to reuse memory and decrease the pressure on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
	lines := make([]byte, 500*1024)
	return lines
}}

stringPool := sync.Pool{New: func() interface{} {
	lines := ""
	return lines
}}

slicePool := sync.Pool{New: func() interface{} {
	lines := make([]string, 100)
	return lines
}}

r := bufio.NewReader(f)

var wg sync.WaitGroup // wait group to keep track of all the spawned goroutines

for {
	buf := linesPool.Get().([]byte)

	n, err := r.Read(buf)
	buf = buf[:n]

	if n == 0 {
		if err != nil {
			fmt.Println(err)
			break
		}
		if err == io.EOF {
			break
		}
		return err
	}

	nextUntillNewline, err := r.ReadBytes('\n') // read until the end of the current line

	if err != io.EOF {
		buf = append(buf, nextUntillNewline...)
	}

	wg.Add(1)
	go func() {
		// process each chunk concurrently
		// start -> log start time, end -> log end time
		ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
		wg.Done()
	}()
}

wg.Wait()
}

The above code introduces two new optimizations:

  1. sync.Pool is a powerful instance pool that can be reused to reduce pressure on the garbage collector. We reuse the memory allocated for the various slices, which lowers memory consumption and speeds up our work significantly. (A minimal Get/Put sketch follows this list.)
  2. Goroutines, which let us process each buffer chunk concurrently and significantly increase the processing speed.
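As a standalone illustration of the sync.Pool mechanics (not part of the article's pipeline): Get hands back a previously released object when one is available, otherwise it calls New, and Put returns the object for later reuse.

bufPool := sync.Pool{New: func() interface{} {
	return make([]byte, 4*1024) // allocated only when the pool is empty
}}

buf := bufPool.Get().([]byte) // reuse an old buffer or allocate a fresh one
// ... fill and process buf ...
bufPool.Put(buf) // hand the buffer back instead of leaving it to the GC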

Now let's implement the ProcessChunk function, which will process the log lines. Each line has the format

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

We will extract the logs that fall between the timestamps provided at the command line.
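Parsing these timestamps relies on Go's reference-time layout ("Mon Jan 2 15:04:05 MST 2006"). A minimal sketch of the check that ProcessChunk performs on every line looks like this, where start and end are the times passed on the command line:

layout := "2006-01-02T15:04:05.0000Z"
logCreationTime, err := time.Parse(layout, "2020-01-31T20:12:38.1234Z")
if err != nil {
	fmt.Println("could not parse the timestamp:", err)
	return
}
if logCreationTime.After(start) && logCreationTime.Before(end) {
	// the log line falls inside the requested window, so keep it
}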

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {

	// another wait group to track the goroutines that process this chunk further
	var wg2 sync.WaitGroup

	logs := stringPool.Get().(string)
	logs = string(chunk)

	linesPool.Put(chunk) // put the chunk back into the pool

	// split the string by "\n", so that we have a slice of log lines
	logsSlice := strings.Split(logs, "\n")

	stringPool.Put(logs) // put the string back into the pool

	chunkSize := 100 // process a batch of 100 log lines per goroutine
	n := len(logsSlice)
	noOfThread := n / chunkSize

	if n%chunkSize != 0 { // account for the last, partial batch
		noOfThread++
	}

	// traverse the chunk, one batch per goroutine
	for i := 0; i < noOfThread; i++ {

		wg2.Add(1)
		// process each batch in a separate goroutine
		go func(s int, e int) {
			defer wg2.Done() // to avoid deadlocks if we return early
			for i := s; i < e; i++ {
				text := logsSlice[i]
				if len(text) == 0 {
					continue
				}

				logParts := strings.SplitN(text, ",", 2)
				logCreationTimeString := logParts[0]
				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
					return
				}
				// check if the log's timestamp falls within our desired period
				if logCreationTime.After(start) && logCreationTime.Before(end) {
					fmt.Println(text)
				}
			}
		}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
		// passing the batch boundaries for processing
	}

	wg2.Wait() // wait for all batches of this chunk to finish
	logsSlice = nil
}

The above code was benchmarked with a 16 GB log file.

The time taken to extract the logs is around 25 seconds.

The entire working code is given below.

func main() {

	s := time.Now()
	args := os.Args[1:]
	if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
		fmt.Println("Please give proper command line arguments")
		return
	}
	startTimeArg := args[1]
	finishTimeArg := args[3]
	fileName := args[5]

	file, err := os.Open(fileName)
	
	if err != nil {
		fmt.Println("cannot able to read the file", err)
		return
	}
	
	defer file.Close() //close after checking err
	
	queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
	if err != nil {
		fmt.Println("Could not able to parse the start time", startTimeArg)
		return
	}

	queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
	if err != nil {
		fmt.Println("Could not able to parse the finish time", finishTimeArg)
		return
	}

	filestat, err := file.Stat()
	if err != nil {
		fmt.Println("Could not able to get the file stat")
		return
	}

	fileSize := filestat.Size()
	offset := fileSize - 1
	lastLineSize := 0

	for {
		b := make([]byte, 1)
		n, err := file.ReadAt(b, offset)
		if err != nil {
			fmt.Println("Error reading file ", err)
			break
		}
		char := string(b[0])
		if char == "\n" {
			break
		}
		offset--
		lastLineSize += n
	}

	lastLine := make([]byte, lastLineSize)
	_, err = file.ReadAt(lastLine, offset+1)

	if err != nil {
		fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
		return
	}

	logSlice := strings.SplitN(string(lastLine), ",", 2)
	logCreationTimeString := logSlice[0]

	lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
	if err != nil {
		fmt.Println("can not able to parse time : ", err)
	}

	if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
		Process(file, queryStartTime, queryFinishTime)
	}

	fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File, start time.Time, end time.Time) error {

	linesPool := sync.Pool{New: func() interface{} {
		lines := make([]byte, 250*1024)
		return lines
	}}

	stringPool := sync.Pool{New: func() interface{} {
		lines := ""
		return lines
	}}

	r := bufio.NewReader(f)

	var wg sync.WaitGroup

	for {
		buf := linesPool.Get().([]byte)

		n, err := r.Read(buf)
		buf = buf[:n]

		if n == 0 {
			if err != nil {
				fmt.Println(err)
				break
			}
			if err == io.EOF {
				break
			}
			return err
		}

		nextUntillNewline, err := r.ReadBytes('\n')

		if err != io.EOF {
			buf = append(buf, nextUntillNewline...)
		}

		wg.Add(1)
		go func() {
			ProcessChunk(buf, &linesPool, &stringPool, start, end)
			wg.Done()
		}()

	}

	wg.Wait()
	return nil
}

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

	var wg2 sync.WaitGroup

	logs := stringPool.Get().(string)
	logs = string(chunk)

	linesPool.Put(chunk)

	logsSlice := strings.Split(logs, "\n")

	stringPool.Put(logs)

	chunkSize := 300
	n := len(logsSlice)
	noOfThread := n / chunkSize

	if n%chunkSize != 0 {
		noOfThread++
	}

	for i := 0; i < (noOfThread); i++ {

		wg2.Add(1)
		go func(s int, e int) {
			defer wg2.Done() // to avoid deadlocks
			for i := s; i < e; i++ {
				text := logsSlice[i]
				if len(text) == 0 {
					continue
				}
				logSlice := strings.SplitN(text, ",", 2)
				logCreationTimeString := logSlice[0]

				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
					return
				}

				if logCreationTime.After(start) && logCreationTime.Before(end) {
					//fmt.Println(text)
				}
			}
			

		}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
	}

	wg2.Wait()
	logsSlice = nil
}

Original article: https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2
