I wrote a camera client that will read a video stream and write to a player for playback.
I have some basic keyboard controls for recording the stream.
I coded the client with a single video stream in mind.
I refactored the code so I can have playback from multiple streams but I am having trouble with the keyboard controls.
I am really struggling with figuring out a model for properly communicating to the client from which camera it should record.
Below the code.
All of it works, but with the current model the client records from a random camera.
main.go
package main
import (
"apps/piClient/auth"
"apps/piClient/client"
"bufio"
"fmt"
"log"
"os"
"os/exec"
"sync"
"syscall"
"github.com/BurntSushi/toml"
)
func main() {
var config client.Configuration
_, err := toml.DecodeFile("config.toml", &config)
if err != nil {
log.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(1) //<- Extra points if you can tell me how to handle
//my goroutines. All are never ending for loops
record := make(chan string)
for id, camera := range config.Cameras {
h264 := make(chan client.Frame)
num := fmt.Sprintf("%v", id)
stream := "stream" + num
fromClientPort := ":808" + num
err = syscall.Mkfifo(stream, 0775)
if err != nil {
log.Println(err)
}
pipe, err := os.OpenFile(stream, os.O_RDWR, os.ModeNamedPipe)
if err != nil {
log.Fatal(err)
}
w := bufio.NewWriter(pipe)
conn, err := auth.Login(camera.Address, camera.Password, fromClientPort, camera.Retries)
if err != nil {
log.Println(err)
continue
}
defer conn.Close()
go client.Read(conn, h264)
go client.Write(w, h264, record)
go playerRun(stream)
fmt.Printf("Press 'r + return' to start recording.\nPress 's + return' to stop recording.\n")
go func() {
reader := bufio.NewReader(os.Stdin)
for {
char, _, err := reader.ReadRune()
if err != nil {
log.Fatal(err)
}
switch char {
case 'r':
record <- "start"
char = '\x00'
case 's':
record <- "stop"
char = '\x00'
}
}
}()
}
wg.Wait()
}
func playerRun(stream string) {
cmd := exec.Command("./play", stream)
//cmd.Stderr = os.Stdout
//cmd.Stdout = os.Stdout
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
}
client.go
package client
import (
"bufio"
"fmt"
"log"
"net"
"os"
"strings"
"time"
)
type Configuration struct {
Cameras []Camera
}
type Camera struct {
Address string
Retries int
Password string
}
type state bool
type Frame []byte
func Read(conn *net.UDPConn, output chan<- Frame) {
for {
buf := make([]byte, 65507)
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
if n > 0 {
output <- buf[0:n]
}
}
}
func Write(w *bufio.Writer, input <-chan Frame, record <-chan string) {
var recording state
var fw *bufio.Writer
var file *os.File
for {
select {
case action := <-record:
switch action {
case "start":
if !recording {
fw, file = newFileWriter()
recording = true
fmt.Println("recording...")
} else {
fmt.Println("client is already recording")
}
case "stop":
if recording {
fw.Flush()
file.Close()
recording = false
fmt.Println("recording stopped")
} else {
fmt.Println("client is not recording")
}
}
case h264 := <-input:
_, err := w.Write(h264)
if err != nil {
log.Fatal(err)
}
if recording {
_, err = fw.Write(h264)
if err != nil {
log.Fatal(err)
}
}
}
}
}
func newFileWriter() (*bufio.Writer, *os.File) {
now := time.Now()
dir := "video/"
fname := fmt.Sprintf("%v.h264", now)
fname = strings.Replace(fname, " ", "-", -1)
fname = strings.Replace(fname, ":", "-", -1)
file, err := os.Create(dir + fname)
if err != nil {
log.Fatal(err)
}
fw := bufio.NewWriter(file)
return fw, file
}
1 Answer 1
- I got rid of WaitGroup and instead used a channel that all goroutines listen to in case there is an explicit kill.
- You can specify which camera to record from cmd line itself
package main
import (
"apps/piClient/auth"
"apps/piClient/client"
"bufio"
"fmt"
"log"
"os"
"os/exec"
"syscall"
"strconv"
"strings"
"os/signal"
"time"
"github.com/BurntSushi/toml"
)
func main() {
var config client.Configuration
_, err := toml.DecodeFile("config.toml", &config)
if err != nil {
log.Fatal(err)
}
done := make(chan bool)
recorder := make(map[int]chan<- string)
for id, camera := range config.Cameras {
stream := fmt.Sprintf("stream%d", id)
fromClientPort := fmt.Sprintf(":808%d", id)
err = syscall.Mkfifo(stream, 0775)
if err != nil {
log.Println(err)
continue
}
pipe, err := os.OpenFile(stream, os.O_RDWR, os.ModeNamedPipe)
if err != nil {
log.Println(err)
continue
}
w := bufio.NewWriter(pipe)
conn, err := auth.Login(camera.Address, camera.Password, fromClientPort, camera.Retries)
if err != nil {
log.Println(err)
continue
}
h264 := make(chan client.Frame)
c := client.NewClient(id, camera)
record := make(chan string)
go c.Read(conn, h264, done)
go c.Write(w, h264, record, done)
go playerRun(stream)
recorder[id] = record
fmt.Printf(
"Press 'r%d + return' to start recording.\nPress 's%d + return' to stop recording.\n",
id, id)
}
go func(done <-chan bool) {
reader := bufio.NewReader(os.Stdin)
for {
select {
case <-done:
return
default:
line, _, _ := reader.ReadLine()
data := strings.Split(string(line), "")
id, err := strconv.Atoi(data[1])
if err != nil {
log.Println(err)
continue
}
recordChan := recorder[id]
switch data[0] {
case 'r':
recordChan <- "start"
case 's':
recordChan <- "stop"
}
}
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
for range c {
close(done)
break
}
// wait for clean up
time.Sleep(2 * time.Second)
}
func playerRun(stream string) {
cmd := exec.Command("./play", stream)
//cmd.Stderr = os.Stdout
//cmd.Stdout = os.Stdout
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
}
package client
import (
"bufio"
"fmt"
"log"
"net"
"os"
"strings"
"time"
)
type Client struct {
ID int
Camera Camera
recording state
}
func NewClient(id int, c Camera) *Client {
return &Client{
ID: id,
Camera: c,
}
}
type Configuration struct {
Cameras []Camera
}
type Camera struct {
Address string
Retries int
Password string
}
type state bool
type Frame []byte
func (client *Client) Read(conn *net.UDPConn, output chan<- Frame, done <-chan bool) {
for {
select {
case <-done:
conn.Close()
return
default:
buf := make([]byte, 65507)
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
conn.Close()
log.Fatalf("Client %d failed with error %v", client.ID, err)
}
if n > 0 {
output <- buf[0:n]
}
}
}
}
func (client *Client) Write(w *bufio.Writer, input <-chan Frame, record <-chan string, done <-chan bool) {
var fw *bufio.Writer
var file *os.File
for {
select {
case <-done:
w.Flush()
if client.recording {
client.recording = false
fw.Flush()
file.Close()
}
return
case action := <-record:
switch action {
case "start":
if !client.recording {
fw, file = newFileWriter()
client.recording = true
fmt.Println("recording...")
} else {
fmt.Println("client is already recording")
}
case "stop":
if client.recording {
fw.Flush()
file.Close()
client.recording = false
fmt.Println("recording stopped")
} else {
fmt.Println("client is not recording")
}
}
case h264 := <-input:
_, err := w.Write(h264)
if err != nil {
log.Fatal(err)
}
if client.recording {
_, err = fw.Write(h264)
if err != nil {
log.Fatal(err)
}
}
}
}
}
func newFileWriter() (*bufio.Writer, *os.File) {
now := time.Now()
dir := "video/"
fname := fmt.Sprintf("%v.h264", now)
fname = strings.Replace(fname, " ", "-", -1)
fname = strings.Replace(fname, ":", "-", -1)
file, err := os.Create(dir + fname)
if err != nil {
log.Fatal(err)
}
fw := bufio.NewWriter(file)
return fw, file
}