I am trying to write a script that reads a CSV file containing 1 million domain names, looks each domain up, and stores the results in another CSV file. I am using the code below, but the number of records I can process in a given time is quite low. I want to optimize it to process more records in less time while keeping CPU utilization under control. What should I be focusing on, or where can I get help with this?
package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "net"
    "os"
    "strings"
    "time"
)

func main() {
    resc, errc := make(chan string), make(chan error)
    fmt.Println("start time", time.Now())
    concurrency := 1000
    sem := make(chan bool, concurrency)
    csvfile, err := os.Open("1-million-rows.csv")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer csvfile.Close()
    reader := csv.NewReader(csvfile)
    reader.FieldsPerRecord = -1 // see the Reader struct information below
    row_count := 0
    for {
        sem <- true
        record, err := reader.Read()
        // Stop at EOF.
        if err == io.EOF {
            break
        }
        row_count += 1
        domain_name := record[1]
        go func(domain_name string) {
            defer func() { <-sem }()
            body, err := lookup(domain_name)
            if err != nil {
                errc <- err
                return
            }
            resc <- string(body)
        }(domain_name)
    }
    for i := 0; i < cap(sem); i++ {
        sem <- true
    }
    for i := 0; i < row_count; i++ {
        select {
        case res := <-resc:
            fmt.Println(res)
        case err := <-errc:
            fmt.Println(err)
        }
    }
}

func lookup(domain_name string) (string, error) {
    ip, err := net.LookupIP(domain_name)
    if err != nil {
        return "", err
    }
    var ip_addresses []string
    for i := range ip {
        address := ip[i]
        ip_addresses = append(ip_addresses, address.String())
    }
    row := domain_name + ",[" + strings.Join(ip_addresses, ":") + "]," + time.Now().String()
    fmt.Println(row)
    return row, nil
}
I am running the script and redirecting the output to another file (which will contain the output records I need). Another problem I am facing is that the program almost stops (zero CPU utilization) after a few seconds, after printing roughly 600 lines. Is there anything wrong with the way I am controlling concurrency?
1 Answer
Bugs
Your code does have a problem with the concurrency, and it's a big one: as written, it can never finish. The issue is the unbuffered resc and errc channels:

    resc, errc := make(chan string), make(chan error)

Your code attempts to loop through a million input records, but your concurrency is set relatively low at 1000. After you have looped a thousand times, you have 1000 goroutines all waiting to write a result to either the resc or errc channel... but, because those channels are unbuffered, something must be reading from them before a write can complete...

... and you don't start reading from those channels until all 1,000,000 CSV records have been queued.

Because those goroutines never complete (they are blocked writing to the resc channel), they never release a slot in sem either... and so the main loop never makes progress.
Your code is buggy.
A solution would be to put the entire CSV-reading code into a separate goroutine, so that you can start reading from the resc/errc channels immediately.
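A minimal sketch of that fix, keeping the rest of the original code and names unchanged (rowCountc is a new, hypothetical channel so main knows how many results to drain):

    // Read the CSV in its own goroutine so the draining loop below runs concurrently.
    rowCountc := make(chan int, 1)
    go func() {
        count := 0
        for {
            sem <- true
            record, err := reader.Read()
            if err == io.EOF {
                break
            }
            count++
            go func(domainName string) {
                defer func() { <-sem }()
                body, err := lookup(domainName)
                if err != nil {
                    errc <- err
                    return
                }
                resc <- body
            }(record[1])
        }
        rowCountc <- count
    }()

    // Drain results while the reader is still queuing work; stop once the
    // reader has reported its total and that many results/errors have arrived.
    received, total := 0, -1
    for total < 0 || received < total {
        select {
        case res := <-resc:
            fmt.Println(res)
            received++
        case err := <-errc:
            fmt.Println(err)
            received++
        case n := <-rowCountc:
            total = n
        }
    }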
Use a struct instead
Instead of having two return channels, errc and resc, it would be better to return a simple struct:
type dnsLookup struct {
    domain string
    ips    []string
    err    error
}
Then you can monitor just a single channel, and you can correlate each result, and each error, with its domain name.
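For example, lookup could be reworked to return that struct instead of (string, error) — a sketch, assuming the dnsLookup type above:

    // A lookup that never "fails" outright: any error travels inside the struct.
    func lookup(domain string) dnsLookup {
        ips, err := net.LookupIP(domain)
        if err != nil {
            return dnsLookup{domain: domain, err: err}
        }
        addresses := make([]string, 0, len(ips))
        for _, ip := range ips {
            addresses = append(addresses, ip.String())
        }
        return dnsLookup{domain: domain, ips: addresses}
    }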
Concurrency
The use of the sem channel is OK for concurrency, but for a job like this I would prefer a more explicit worker-pool mechanism. Have a channel that you push domain names onto from the CSV parser, and have a fixed number of concurrent goroutines reading from it. Closing the channel indicates there is no more data to process. Use a wait-group to monitor completion:
func lookupRoutine(source <-chan string, wg *sync.WaitGroup, results chan dnsLookup) {
    defer wg.Done()
    for name := range source {
        results <- lookup(name) // lookup reworked to return a dnsLookup value
    }
}
The above loop will process any and all available names from the source channel (until it is closed), sending each result to the results channel.
Then, in your main function, you can have a channel to send the CSV parse results to:
names := make(chan string, 1000)
results := make(chan dnsLookup, 1000)

// parse names in a goroutine
go parseCSVData(csvfile, names)

wg := new(sync.WaitGroup)
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
    // parallel routine for lookups
    go lookupRoutine(names, wg, results)
}

// close the results when all lookup routines complete:
go func() {
    wg.Wait()
    close(results)
}()

for r := range results {
    // print the results out here
    ...
}
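parseCSVData is not spelled out above; a possible sketch (assuming, as in the original code, that the domain name is in the second column) could look like this:

    // Reads every record, pushes the domain column onto names, and closes the
    // channel so the lookup workers know there is no more data to process.
    func parseCSVData(csvfile io.Reader, names chan<- string) {
        defer close(names)
        reader := csv.NewReader(csvfile)
        reader.FieldsPerRecord = -1
        for {
            record, err := reader.Read()
            if err == io.EOF {
                break
            }
            if err != nil {
                fmt.Println(err)
                continue
            }
            names <- record[1] // domain name is in the second column
        }
    }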
- Wow! That worked perfectly. I am quite new to this language, so I had to work by patching together code from the web, and I still need to dig deeper into the subject to fully understand it. I tested with a sample of 20k records and, varying the concurrency from 50 to 500, I could see a reduction in time but an increase in the number of i/o timeout errors. Also, it looks like I will have to divide the file into chunks and run on multiple servers in parallel to achieve the desired performance. – Mandeep Singh, Apr 26, 2016 at 22:17
- Currently I am also working on a similar project, but as a beginner I am still confused by the answer above. Could you share the modified code with me, if you don't mind? Great thanks. – Jennifer Dai, Oct 29, 2017 at 10:53