I've started learning Go and its channels. I decided to write a simple application: a recursive link checker. Given a URL, it retrieves the page, parses it for links, and goes deeper.
Here's the code of the first version.
Some questions:
1. I use the counter urlsInProcess to know when all tasks are done, but that looks a bit awkward, doesn't it? I also launch several parsers and fetchers by hand. Should I use WaitGroups here?
2. I'm not sure about the error handling. How could it be done better?
3. How efficient is passing variables between goroutines? Is there any unnecessary copying?
4. Are there any race conditions? For example, can the program receive a message from the channel chanTasksFinished before one from the channel chanTasksToFetch? In that case we would exit before all tasks are done.
    package main

    import (
        "fmt"
        "golang.org/x/net/html"
        "io/ioutil"
        "log"
        "net/http"
        "strings"
    )

    type url string

    type TaskStatus int

    const (
        _ TaskStatus = iota
        TaskStatusNew
        TaskStatusToParse
    )

    type Task struct {
        url
        depth int
        resp  *http.Response
        //body *[]byte
    }

    var chanTasksToFetch = make(chan Task)
    var chanTasksToParse = make(chan Task)
    var chanFetchersIn = make(chan Task)
    var chanTasksFinished = make(chan Task)
    var mainWait = make(chan interface{})

    var data = make(map[url]Task)

    func getHref(t html.Token) (ok bool, href string) {
        // iterate over all of the token's attribs
        for _, a := range t.Attr {
            if a.Key == "href" {
                return true, a.Val
            }
        }
        return
    }

    func extractLinks(page string) (urls []url) {
        // extracts links from page and sends them into the url_channel
        tokenizer := html.NewTokenizer(strings.NewReader(page))
        for {
            token_type := tokenizer.Next()
            switch {
            case token_type == html.ErrorToken:
                log.Println(fmt.Sprintf("Error token_type: %s. Error: %s", token_type, tokenizer.Err()))
                return
            case token_type == html.StartTagToken:
                token := tokenizer.Token()
                if token.Data == "a" { // it's a link
                    log.Println("Link token: ", token)
                    _, href := getHref(token)
                    log.Println("A href=", href)
                    urls = append(urls, url(href))
                    log.Println("A href sent")
                }
            }
        }
    }

    func parser(inChannel chan Task, toChannel chan Task) {
        for task := range inChannel {
            bytes, _ := ioutil.ReadAll(task.resp.Body)
            defer task.resp.Body.Close()
            var urls []url = extractLinks(string(bytes))
            for _, u := range urls {
                toChannel <- Task{url: u, depth: task.depth}
            }
            // after task is parsed we think it's done
            chanTasksFinished <- task
        }
    }

    func parsers(inChannel chan Task, toChannel chan Task) {
        for i := 0; i < 5; i++ {
            go parser(inChannel, toChannel)
        }
    }

    // Fetches given Task and passes to channel
    // inChannel - channel from which get new tasks
    // toChannel - channel to which send fetched tasks
    func fetcher(inChannel chan Task, toChannel chan Task) {
        for task := range inChannel {
            log.Println("Fetcher got task", task)
            resp, err := http.Get(string(task.url))
            if err != nil {
                log.Println(fmt.Sprintf("Got error on url '%s': %s", task.url, err))
            }
            task.resp = resp
            log.Println("Fetcher sends to channel", task)
            toChannel <- task
        }
    }

    // Launches fetcher tasks
    func fetchers(inChannel chan Task, toChannel chan Task) {
        //var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            go fetcher(inChannel, toChannel)
        }
    }

    func taskDispatcher() {
        log.Println("URL dispatcher started")
        var urlsInProcess int = 0
    LOOP:
        for {
            var task Task
            select {
            case task = <-chanTasksToFetch:
                log.Println("TaskDispatcher got task to fetch", task)
                if task.depth <= 0 {
                    log.Println("Depth is over for task", task)
                    continue
                }
                task.depth--
                if _, present := data[task.url]; present {
                    // already here
                    log.Println("Task is already in map", task)
                    continue
                }
                // add task to map and launch it
                data[task.url] = task
                urlsInProcess++
                chanFetchersIn <- task
            case task = <-chanTasksFinished:
                log.Println("Task finished: ", task)
                if _, present := data[task.url]; !present {
                    log.Println("Finished task was not found in map!", task)
                }
                data[task.url] = task
                urlsInProcess--
            }
            log.Println("Now urls in process", urlsInProcess)
            if urlsInProcess == 0 {
                log.Println("Time to exit from taskDispatcher")
                mainWait <- nil
                break LOOP
            }
        }
        log.Println("taskDispatcher finished")
    }

    func main() {
        go taskDispatcher()
        go fetchers(chanFetchersIn, chanTasksToParse)
        go parsers(chanTasksToParse, chanTasksToFetch)

        chanTasksToFetch <- Task{url: "http://www.ru", depth: 2}
        log.Println("wainting in mainWait")
        <-mainWait
        log.Println("Main exit")
    }
1 Answer
Instead of reading the whole body only to turn it back into a Reader again:

    bytes, _ := ioutil.ReadAll(task.resp.Body)
    defer task.resp.Body.Close()
    var urls []url = extractLinks(string(bytes))
    (...)
    func extractLinks(page string) (urls []url) {
        // extracts links from page and sends them into the url_channel
        tokenizer := html.NewTokenizer(strings.NewReader(page))

you could simply do:

    var urls []url = extractLinks(task.resp.Body)
    (...)
    func extractLinks(body io.ReadCloser) (urls []url) {
        defer body.Close()
        // extracts links from page and sends them into the url_channel
        tokenizer := html.NewTokenizer(body)
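(As a side benefit, this streams the page into the tokenizer instead of buffering it all, and each call to extractLinks closes its own body. In the original parser, the deferred Close calls only run when the goroutine returns, because defer is function-scoped rather than loop-scoped.)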
When you parse a page, you just take the raw href. If it is a relative link (or an absolute path like /question/123), you won't be able to fetch it further. You might want to use the url.Parse function (beware of the name collision with your url type).
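A minimal sketch of such resolution, assuming the base is the URL the page was fetched from; resolveHref is a hypothetical helper (not in the original code), and net/url is imported as neturl to dodge that collision:

    import neturl "net/url"

    // resolveHref turns a possibly relative href into an absolute URL
    // by resolving it against the page it was found on.
    // (url is the string type defined in the question's code.)
    func resolveHref(base url, href string) (url, bool) {
        b, err := neturl.Parse(string(base))
        if err != nil {
            return "", false
        }
        h, err := neturl.Parse(href)
        if err != nil {
            return "", false
        }
        return url(b.ResolveReference(h).String()), true
    }

In parser, each extracted href could then be resolved against task.url before being sent back to chanTasksToFetch.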
Instead of an int for urlsInProcess, you could use a WaitGroup, with wg.Add(1) and wg.Done() instead of ++ and --. You could then return this WaitGroup for the caller to call wg.Wait() on (but then you won't be able to know how many URLs are currently being worked on).
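For illustration, here is a minimal, self-contained sketch of that pattern, not a drop-in rewrite of taskDispatcher. Note that every Add must happen before the matching Done can possibly run, which is why the whole count is added up front here; in the crawler you would call wg.Add(1) in the dispatcher right before handing a task to the fetchers, and wg.Done() where chanTasksFinished is consumed today.

    package main

    import (
        "log"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        tasks := make(chan int)

        const n = 3
        wg.Add(n) // count every task before any worker can call Done

        // producer: hand out the tasks
        go func() {
            for i := 0; i < n; i++ {
                tasks <- i
            }
            close(tasks)
        }()

        // two workers: each Done replaces a urlsInProcess--
        for w := 0; w < 2; w++ {
            go func() {
                for t := range tasks {
                    log.Println("done with task", t)
                    wg.Done()
                }
            }()
        }

        wg.Wait() // replaces waiting on the mainWait channel
        log.Println("all tasks finished")
    }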
Comment (oliverpool, Apr 10, 2017): This blog post is also worth a read: appliedgo.net/flow2go. And in this style (flow-based), I particularly like the way Egon implemented it: play.golang.org/p/Mq_eROYm-X. He lets the New***() constructors create the output channels and connects them together (split.In = hello.Out) in main.
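For reference, a minimal sketch of that style, with hypothetical stage names: each New***() constructor allocates its own Out channel, and main wires one stage's Out to the next stage's In.

    package main

    import (
        "fmt"
        "strings"
    )

    // Hello is a source stage: it owns and creates its output channel.
    type Hello struct {
        Out chan string
    }

    func NewHello() *Hello { return &Hello{Out: make(chan string)} }

    func (h *Hello) Run() {
        h.Out <- "hello"
        h.Out <- "flow"
        close(h.Out)
    }

    // Upcase is a transform stage: its input is wired up by the caller.
    type Upcase struct {
        In  chan string
        Out chan string
    }

    func NewUpcase() *Upcase { return &Upcase{Out: make(chan string)} }

    func (u *Upcase) Run() {
        for s := range u.In {
            u.Out <- strings.ToUpper(s)
        }
        close(u.Out)
    }

    func main() {
        hello := NewHello()
        upcase := NewUpcase()
        upcase.In = hello.Out // wiring, in the spirit of split.In = hello.Out

        go hello.Run()
        go upcase.Run()

        for s := range upcase.Out {
            fmt.Println(s)
        }
    }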