I'm writing this as (eventually) part of a larger program. It will handle the bulk insertion of data from the .csv files we parse.
I'm primarily looking for parts that are:
- breaking Go's formatting, styling, etc. rules
- redundant and/or dangerous
Right now the code works and all 4 million rows of the .csv file were properly inserted into the database.
package main
import (
    "encoding/csv"
    "fmt"
    "gopkg.in/mgo.v2"
    "io"
    "log"
    "os"
    "runtime"
    "sync"
    "time"
)
// Set up primarily for ease of changing hosts if necessary
const (
    MongoDBHost = "127.0.0.1:27017"
)
// TODO: Check if channel's buffer size is necessary
// ctr increments for each row of the csv inserted (allows us to keep track)
var (
    channel = make(chan []string, 4000000)
    ctr     int
)
// Person is the model struct for inserting into a mongodb
// Each member corresponds with a field in the VRDB, sans the last member
// The last member is for updating entries
// Parsing the csv files returns a []string per row
type Person struct {
    StateVoterId          string
    CountyVoterId         string
    Title                 string
    FName                 string
    MName                 string
    LName                 string
    NameSuffix            string
    Birthdate             string
    Gender                string
    RegStNum              string
    RegStFrac             string
    RegStName             string
    RegStType             string
    RegUnitType           string
    RegStPreDirection     string
    RegStPostDirection    string
    RegUnitNum            string
    RegCity               string
    RegState              string
    RegZipCode            string
    CountyCode            string
    PrecinctCode          string
    PrecinctPart          string
    LegislativeDistrict   string
    CongressionalDistrict string
    Mail1                 string
    Mail2                 string
    Mail3                 string
    Mail4                 string
    MailCity              string
    MailZip               string
    MailState             string
    MailCountry           string
    RegistrationDate      string
    AbsenteeType          string
    StatusCode            string
    LastVoted             string
    Updated               time.Time
}
func main() {
    runtime.GOMAXPROCS(4) // Supposedly makes things faster
    var wg sync.WaitGroup

    // Start our mongodb session
    mongoSession, err := mgo.Dial(MongoDBHost)
    if err != nil {
        log.Fatalf("CreateSession: %s\n", err)
    }
    mongoSession.SetMode(mgo.Monotonic, true)

    // Start loading and parsing our .csv file
    // testing file: csv.csv
    // current file: big-huge-csv-file.csv
    csvfile, err := os.Open("../data/big-huge-csv-file.csv")
    if err != nil {
        fmt.Println(err)
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)
    reader.Comma = '\t'             // It's a tab-delimited file
    reader.LazyQuotes = true        // Some fields are like \t"F" ST.\t
    reader.FieldsPerRecord = 0      // -1 is variable #, 0 is [0]th line's #
    reader.TrimLeadingSpace = false // Keep the fields' whitespace how it is

Loop:
    for {
        // Add another goroutine to our wait group
        wg.Add(1)
        // Increment our counter per row
        ctr++
        data, err := reader.Read()
        switch err {
        case io.EOF:
            fmt.Println("Finished reading CSV (hit EOF)")
            break Loop
        case nil:
            channel <- data
            go InsertPerson(channel, mongoSession, &wg, ctr)
        // Handles all cases where err != io.EOF && err != nil
        default:
            fmt.Printf("Error while reading %s: %s\n", csvfile, err)
            log.Fatal(err)
        }
        wg.Wait()
    }
    fmt.Println("Done")
}
func InsertPerson(c chan []string, mongoSession *mgo.Session, wg *sync.WaitGroup, ctr int) {
    // Decrement wg counter when func finishes
    defer wg.Done()

    // Shows us our progress in increments of 5,000
    if ctr%5000 == 0 {
        fmt.Println(ctr)
    }

    // Receive from our channel
    row := <-c

    // Setting up our db connections
    sessionCopy := mongoSession.Copy()
    defer sessionCopy.Close()
    // TODO: Change from "test" to prod db
    collection := mongoSession.DB("test").C("People")

    index := mgo.Index{
        Key:        []string{"StateVoterId"},
        Unique:     true,
        DropDups:   true,
        Background: true,
        Sparse:     true,
        Name:       "SoS Voter IDs",
    }
    ensureIndexErr := collection.EnsureIndex(index)
    if ensureIndexErr != nil {
        log.Fatal(ensureIndexErr)
    }

    // Insert people into mongodb
    // TODO: Find a less ugly way to do this
    // According to #go-nuts the only other way uses reflection, and I'd
    // rather not use reflection if I don't have to
    insertErr := collection.Insert(Person{row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], row[13], row[14], row[15], row[16], row[17], row[18], row[19], row[20], row[21], row[22], row[23], row[24], row[25], row[26], row[27], row[28], row[29], row[30], row[31], row[32], row[33], row[34], row[35], row[36], time.Now()})
    // We refuse to gracefully fail as any incorrect inserts need to be
    // rectified manually to ensure our data is correct
    if insertErr != nil {
        log.Printf("InsertPerson : ERROR : %s\n", insertErr)
        log.Fatal(insertErr)
    }
}
1 Answer
I don't have lots of experience with Go, but here are some suggestions:
- Avoid global variables
First, avoid global variables as much as you can: channel and ctr could be declared in the main() function.
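A minimal sketch of that change (the buffer size and the demo goroutine are just placeholders, not recommendations):
package main

import "fmt"

func main() {
    // Declared inside main instead of at package scope.
    rows := make(chan []string, 100)
    ctr := 0

    go func() {
        rows <- []string{"id-1", "Jane"} // stand-in for a parsed csv row
        close(rows)
    }()

    for row := range rows {
        ctr++
        fmt.Println(ctr, row)
    }
}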
- Wrong use of channel and goroutines
This line creates a buffered channel large enough to hold every row of the file:
channel = make(chan []string, 4000000)
and for each row you create a goroutine to perform a single insert. This is highly inefficient and makes the code harder to understand. I guess you wanted to read the file and insert documents in parallel to speed up the process, but that is definitely premature optimisation here, as you don't know which part of the code is the bottleneck.
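If parallel inserts do turn out to help, the usual shape is a small, fixed pool of workers draining a modestly buffered channel, rather than one goroutine and one buffer slot per row. A sketch, where insertOne is a hypothetical stand-in for the actual mgo insert:
package main

import "sync"

// insertOne is a hypothetical stand-in for the real mgo insert call.
func insertOne(row []string) {}

func main() {
    rows := make(chan []string, 100) // small buffer, not one slot per row
    var wg sync.WaitGroup

    // A fixed number of workers instead of a goroutine per row.
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for row := range rows {
                insertOne(row)
            }
        }()
    }

    // The csv-reading loop would send rows here, then close the channel.
    rows <- []string{"example"}
    close(rows)
    wg.Wait()
}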
- Performance
I didn't even try to run the provided code; I don't get how it manages to finish: every time a Person is inserted, the index is ensured again (i.e. 4,000,000 times!). This is very expensive and totally useless.
If the collection is empty at the beginning, just create the index once all the records have been inserted; this will speed up the insertion.
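The fix is simply to hoist the index creation out of the per-row path. A sketch that isolates just that change (the complete program below does the same thing):
package main

import (
    "log"

    "gopkg.in/mgo.v2"
)

func main() {
    session, err := mgo.Dial("127.0.0.1:27017")
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()
    collection := session.DB("test").C("People")

    // ... insert all 4,000,000 documents first ...

    // Then ensure the index exactly once, after the bulk load.
    index := mgo.Index{
        Key:      []string{"StateVoterId"},
        Unique:   true,
        DropDups: true,
        Name:     "SoS Voter IDs",
    }
    if err := collection.EnsureIndex(index); err != nil {
        log.Fatal(err)
    }
}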
Some other ideas (a combined sketch follows this list):
- Make sure that the collection is empty before importing, with collection.DropCollection().
- Some fields from the CSV could be stored as int to reduce their size in the database.
- Use bulk.Unordered() to speed up the inserts, with the caveat that a failed insert no longer stops the run.
- Avoid hard-coded paths; use flag to provide arguments like the file or the database.
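A sketch combining those ideas; the flag names here are made up, and the csv-reading loop from the full example below is elided:
package main

import (
    "flag"
    "log"

    "gopkg.in/mgo.v2"
)

func main() {
    // Hypothetical flags instead of hard-coded values.
    file := flag.String("file", "../data/big-huge-csv-file.csv", "path to the tab-delimited input")
    host := flag.String("host", "127.0.0.1:27017", "mongodb host")
    db := flag.String("db", "test", "database name")
    flag.Parse()
    log.Printf("importing %s", *file)

    session, err := mgo.Dial(*host)
    if err != nil {
        log.Fatalf("dial: %s", err)
    }
    defer session.Close()

    collection := session.DB(*db).C("People")

    // Start from an empty collection; the error is ignored because the
    // collection may not exist yet on a first run.
    _ = collection.DropCollection()

    bulk := collection.Bulk()
    bulk.Unordered() // keep going past individual insert errors instead of stopping
    // ... bulk.Insert(...) each parsed row here, then bulk.Run() ...
}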
So the code could be:
package main
import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    "gopkg.in/mgo.v2"
)
// Set up primarily for ease of changing hosts if necessary
const (
    MongoDBHost = "127.0.0.1:27017"
)
// Person is the model struct for inserting into a mongodb
// Each member corresponds with a field in the VRDB, sans the last member
// The last member is for updating entries
// Parsing the csv files returns a []string per row
type Person struct {
    StateVoterId          string
    CountyVoterId         string
    Title                 string
    FName                 string
    MName                 string
    LName                 string
    NameSuffix            string
    Birthdate             string
    Gender                string
    RegStNum              string
    RegStFrac             string
    RegStName             string
    RegStType             string
    RegUnitType           string
    RegStPreDirection     string
    RegStPostDirection    string
    RegUnitNum            string
    RegCity               string
    RegState              string
    RegZipCode            string
    CountyCode            string
    PrecinctCode          string
    PrecinctPart          string
    LegislativeDistrict   string
    CongressionalDistrict string
    Mail1                 string
    Mail2                 string
    Mail3                 string
    Mail4                 string
    MailCity              string
    MailZip               string
    MailState             string
    MailCountry           string
    RegistrationDate      string
    AbsenteeType          string
    StatusCode            string
    LastVoted             string
    Updated               time.Time
}
func main() {
    // Start our mongodb session
    mongoSession, err := mgo.Dial(MongoDBHost)
    if err != nil {
        log.Fatalf("CreateSession: %s\n", err.Error())
    }
    defer mongoSession.Close()
    mongoSession.SetMode(mgo.Monotonic, true)

    collection := mongoSession.DB("test").C("People")
    // Start loading and parsing our .csv file
    csvfile, err := os.Open("./file.csv")
    if err != nil {
        log.Fatalf("could not open csv file: %s\n", err.Error())
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)
    reader.Comma = '\t'             // It's a tab-delimited file
    reader.LazyQuotes = true        // Some fields are like \t"F" ST.\t
    reader.FieldsPerRecord = 0      // -1 is variable #, 0 is [0]th line's #
    reader.TrimLeadingSpace = false // Keep the fields' whitespace how it is

    bulk := collection.Bulk()
    count := 0
    for {
        row, err := reader.Read()
        if err == io.EOF {
            // Flush whatever is left in the final, partial batch
            _, err = bulk.Run()
            if err != nil {
                log.Fatalf("bulk insert failed: %s\n", err.Error())
            }
            break
        }
        if err != nil {
            log.Fatalf("error while reading csv: %s\n", err.Error())
        }
        bulk.Insert(Person{row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], row[13], row[14], row[15], row[16], row[17], row[18], row[19], row[20], row[21], row[22], row[23], row[24], row[25], row[26], row[27], row[28], row[29], row[30], row[31], row[32], row[33], row[34], row[35], row[36], time.Now()})
        count++
        if count%1000 == 0 {
            _, err = bulk.Run()
            if err != nil {
                log.Fatalf("bulk insert failed: %s\n", err.Error())
            }
            // reset the bulk, since each Bulk can only be run once
            bulk = collection.Bulk()
            fmt.Println(count)
        }
    }
    // create the index once all documents have been inserted
    index := mgo.Index{
        Key:        []string{"StateVoterId"},
        Unique:     true,
        DropDups:   true,
        Background: true,
        Sparse:     true,
        Name:       "SoS Voter IDs",
    }
    err = collection.EnsureIndex(index)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Done")
}
or simply use mongoimport:
mongoimport --db test --collection People --type tsv --headerline --file ../data/big-huge-csv-file.csv
(mongoimport is a tool provided by MongoDB to import data from different sources, and it is written in Go. Since your file is tab-delimited, --type tsv matches it better than --type csv.)