
I'm writing this as (eventually) part of a larger program; it will serve as the bulk data insertion step for the .csv files we parse.

I'm primarily looking for parts that are

  1. breaking Go's formatting, styling, etc. rules
  2. redundant and/or dangerous

Right now the code works and all 4 million rows of the .csv file were properly inserted into the database.

package main

import (
    "encoding/csv"
    "fmt"
    "gopkg.in/mgo.v2"
    "io"
    "log"
    "os"
    "runtime"
    "sync"
    "time"
)

// Set up primarily for ease of changing hosts if necessary
const (
    MongoDBHost = "127.0.0.1:27017"
)

// TODO: Check if channel's buffer size is necessary
// ctr increments for each row of the csv inserted (allows us to keep track)
var (
    channel = make(chan []string, 4000000)
    ctr     int
)

// Person is the model struct for inserting into a mongodb
// Each member corresponds with a field in the VRDB, sans the last member
// The last member is for updating entries
// Parsing the csv files returns a []string per row
type Person struct {
    StateVoterId string
    CountyVoterId string
    Title string
    FName string
    MName string
    LName string
    NameSuffix string
    Birthdate string
    Gender string
    RegStNum string
    RegStFrac string
    RegStName string
    RegStType string
    RegUnitType string
    RegStPreDirection string
    RegStPostDirection string
    RegUnitNum string
    RegCity string
    RegState string
    RegZipCode string
    CountyCode string
    PrecinctCode string
    PrecinctPart string
    LegislativeDistrict string
    CongressionalDistrict string
    Mail1 string
    Mail2 string
    Mail3 string
    Mail4 string
    MailCity string
    MailZip string
    MailState string
    MailCountry string
    RegistrationDate string
    AbsenteeType string
    StatusCode string
    LastVoted string
    Updated time.Time
}

func main() {
    runtime.GOMAXPROCS(4) // Supposedly makes things faster
    var wg sync.WaitGroup

    // Start our mongodb session
    mongoSession, err := mgo.Dial(MongoDBHost)
    if err != nil {
        log.Fatalf("CreateSession: %s\n", err)
    }
    mongoSession.SetMode(mgo.Monotonic, true)

    // Start loading and parsing our .csv file
    // testing file: csv.csv
    // current file: big-huge-csv-file.csv
    csvfile, err := os.Open("../data/big-huge-csv-file.csv")
    if err != nil {
        fmt.Println(err)
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)
    reader.Comma = '\t'             // It's a tab-delimited file
    reader.LazyQuotes = true        // Some fields are like \t"F" ST.\t
    reader.FieldsPerRecord = 0      // -1 is variable #, 0 is [0]th line's #
    reader.TrimLeadingSpace = false // Keep the fields' whitespace how it is

Loop:
    for {
        // Add another goroutine to our wait group
        wg.Add(1)
        // Increment our counter per row
        ctr++
        data, err := reader.Read()
        switch err {
        case io.EOF:
            fmt.Println("Finished reading CSV (hit EOF)")
            break Loop
        case nil:
            channel <- data
            go InsertPerson(channel, mongoSession, &wg, ctr)
        // Handles all cases where err is neither nil nor io.EOF
        default:
            fmt.Printf("Error while reading %s: %s\n", csvfile, err)
            log.Fatal(err)
        }
        wg.Wait()
    }
    fmt.Println("Done")
}

func InsertPerson(c chan []string, mongoSession *mgo.Session, wg *sync.WaitGroup, ctr int) {
    // Decrement wg counter when func finishes
    defer wg.Done()

    // Shows us our progress in increments of 5,000
    if ctr%5000 == 0 {
        fmt.Println(ctr)
    }

    // Receive from our channel
    row := <-c

    // Setting up our db connections
    sessionCopy := mongoSession.Copy()
    defer sessionCopy.Close()

    // TODO: Change from "test" to prod db
    collection := mongoSession.DB("test").C("People")
    index := mgo.Index{
        Key:        []string{"StateVoterId"},
        Unique:     true,
        DropDups:   true,
        Background: true,
        Sparse:     true,
        Name:       "SoS Voter IDs",
    }
    ensureIndexErr := collection.EnsureIndex(index)
    if ensureIndexErr != nil {
        log.Fatal(ensureIndexErr)
    }

    // Insert people into mongodb
    // TODO: Find a less ugly way to do this
    // According to #go-nuts the only other way uses reflection, and I'd
    // rather not use reflection if I don't have to
    insertErr := collection.Insert(Person{row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], row[13], row[14], row[15], row[16], row[17], row[18], row[19], row[20], row[21], row[22], row[23], row[24], row[25], row[26], row[27], row[28], row[29], row[30], row[31], row[32], row[33], row[34], row[35], row[36], time.Now()})
    // We refuse to gracefully fail as any incorrect inserts need to be
    // rectified manually to ensure our data is correct
    if insertErr != nil {
        log.Printf("InsertPerson : ERROR : %s\n", insertErr)
        log.Fatal(insertErr)
    }
}
asked Nov 19, 2014 at 7:50

1 Answer


I don't have lots of experience with Go, but here are some suggestions:

  1. Avoid global variables

First, avoid global variables as much as you can: channel and ctr could be declared inside the main() function.
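
For example, a minimal sketch of that change (assuming the rest of main() stays as in the question):

package main

func main() {
    // Locals instead of package-level state; everything that needs them
    // receives them explicitly.
    channel := make(chan []string, 4000000)
    ctr := 0

    // ... open the CSV, read rows, and launch the inserts exactly as before ...
    _, _ = channel, ctr // placeholder so this fragment compiles stand-alone
}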

  2. Wrong use of channels and goroutines

This line creates a buffered channel big enough to hold every row of the file in memory:

channel = make(chan []string, 4000000)

and for each row you create a goroutine to perform a single insert. This is highly inefficient and makes the code harder to understand. I guess you wanted to read the file and insert documents in parallel to speed up the process, but this is premature optimisation: you don't yet know which part of the code is the bottleneck...
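
If parallel inserts do turn out to help, the usual pattern is a small fixed pool of worker goroutines draining a modestly buffered channel, not one goroutine per row. A runnable sketch of the pattern, with a hypothetical insertRow standing in for the real mongo insert:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
)

// insertRow is a hypothetical stand-in for the real collection.Insert call.
func insertRow(row []string) {
    fmt.Println(row)
}

func main() {
    reader := csv.NewReader(strings.NewReader("a\tb\nc\td\n"))
    reader.Comma = '\t'

    rows := make(chan []string, 100) // small buffer: the producer blocks instead of holding the whole file
    var wg sync.WaitGroup

    // Fixed pool of workers.
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for row := range rows { // runs until the channel is closed
                insertRow(row)
            }
        }()
    }

    // Producer: read the CSV and feed the workers.
    for {
        row, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        rows <- row
    }
    close(rows) // lets the workers' range loops finish
    wg.Wait()   // wait for the last inserts to complete
}

This keeps memory bounded regardless of the file size, and caps the number of concurrent database operations at the pool size.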

  3. Performance

I didn't even try to run the provided code, and I don't see how it manages to finish: every time a Person is inserted, EnsureIndex is executed again (i.e. 4,000,000 times!). This is very expensive and completely unnecessary. If the collection is empty at the beginning, just create the index once all the records have been inserted; this will speed up the insertion.

  4. Some other ideas:

    • Make sure the collection is empty before importing, e.g. with collection.DropCollection().
    • Some fields from the CSV could be stored as int to reduce their size in the database.
    • Use bulk.Unordered() to speed up the inserts; the trade-off is that a failed insert no longer stops the rest of the batch...
    • Avoid hard-coded paths; use the flag package to pass arguments such as the file or the database (see the sketch after this list).
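
A minimal sketch of the last two ideas combined (the flag names here are illustrative, not prescribed by anything):

package main

import (
    "flag"
    "log"

    "gopkg.in/mgo.v2"
)

var (
    file = flag.String("file", "", "path to the tab-delimited voter file")
    db   = flag.String("db", "test", "database to import into")
)

func main() {
    flag.Parse()
    if *file == "" {
        log.Fatal("-file is required")
    }

    session, err := mgo.Dial("127.0.0.1:27017")
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    bulk := session.DB(*db).C("People").Bulk()
    bulk.Unordered() // a failed insert no longer aborts the rest of the batch
    // ... bulk.Insert(...) each row read from *file, then bulk.Run(), as in the full rewrite below ...
}

It would then be invoked as, say, go run importer.go -file ../data/voters.tsv -db test.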

So the code could be:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    "gopkg.in/mgo.v2"
)

// Set up primarily for ease of changing hosts if necessary
const (
    MongoDBHost = "127.0.0.1:27017"
)

// Person is the model struct for inserting into a mongodb
// Each member corresponds with a field in the VRDB, sans the last member
// The last member is for updating entries
// Parsing the csv files returns a []string per row
type Person struct {
    StateVoterId string
    CountyVoterId string
    Title string
    FName string
    MName string
    LName string
    NameSuffix string
    Birthdate string
    Gender string
    RegStNum string
    RegStFrac string
    RegStName string
    RegStType string
    RegUnitType string
    RegStPreDirection string
    RegStPostDirection string
    RegUnitNum string
    RegCity string
    RegState string
    RegZipCode string
    CountyCode string
    PrecinctCode string
    PrecinctPart string
    LegislativeDistrict string
    CongressionalDistrict string
    Mail1 string
    Mail2 string
    Mail3 string
    Mail4 string
    MailCity string
    MailZip string
    MailState string
    MailCountry string
    RegistrationDate string
    AbsenteeType string
    StatusCode string
    LastVoted string
    Updated time.Time
}

func main() {
    // Start our mongodb session
    mongoSession, err := mgo.Dial(MongoDBHost)
    if err != nil {
        log.Fatalf("CreateSession: %s\n", err.Error())
    }
    defer mongoSession.Close()
    mongoSession.SetMode(mgo.Monotonic, true)
    collection := mongoSession.DB("test").C("People")

    // Start loading and parsing our .csv file
    csvfile, err := os.Open("./file.csv")
    if err != nil {
        log.Fatalf("cannot open CSV file: %s\n", err.Error())
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)
    reader.Comma = '\t'             // It's a tab-delimited file
    reader.LazyQuotes = true        // Some fields are like \t"F" ST.\t
    reader.FieldsPerRecord = 0      // -1 is variable #, 0 is [0]th line's #
    reader.TrimLeadingSpace = false // Keep the fields' whitespace how it is

    bulk := collection.Bulk()
    count := 0
    for {
        row, err := reader.Read()
        if err == io.EOF {
            // flush the last (possibly partial) batch
            _, err = bulk.Run()
            if err != nil {
                log.Fatalf("bulk insert failed: %s\n", err.Error())
            }
            break
        }
        if err != nil {
            // any other read error is fatal
            log.Fatalf("error while reading CSV: %s\n", err.Error())
        }
        bulk.Insert(Person{row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], row[13], row[14], row[15], row[16], row[17], row[18], row[19], row[20], row[21], row[22], row[23], row[24], row[25], row[26], row[27], row[28], row[29], row[30], row[31], row[32], row[33], row[34], row[35], row[36], time.Now()})
        count++
        if count%1000 == 0 {
            _, err = bulk.Run()
            if err != nil {
                log.Fatalf("bulk insert failed: %s\n", err.Error())
            }
            // reset the bulk
            bulk = collection.Bulk()
            fmt.Println(count)
        }
    }

    // create the index once all documents have been inserted
    index := mgo.Index{
        Key:        []string{"StateVoterId"},
        Unique:     true,
        DropDups:   true,
        Background: true,
        Sparse:     true,
        Name:       "SoS Voter IDs",
    }
    err = collection.EnsureIndex(index)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Done")
}

or simply use

mongoimport --db test --collection People --type tsv --headerline --file ../data/big-huge-csv-file.csv

(mongoimport is a tool provided by MongoDB for importing data from various sources, and it is itself written in Go; --type tsv matches the tab-delimited input.)

answered Jul 24, 2017 at 14:56
