Camera streaming app for the pizero, hobby project.
I want the cam to be sending frames only if there is an active client on the other end.
For this purpose the cam is listening on a port for client keep alive messages. If no messages are received for 5 seconds, camera stops sending frames.
I built the detector but I am not confident I did a really good job. Mainly because timeout detection is not consistent. Cam will detect a timeout somewhere between 5 and 30s.
Advice on how to improve is very much appreciated.
EDIT:
It was pointed out that the code was not working as I intended. Feedback in the comment section helped me fix the inconsistency.
Code sample is now working.
package server
import (
"fmt"
"log"
"net"
"strconv"
"time"
)
type session struct {
Conn *net.UDPConn
Message chan string
Password string
Address chan *net.UDPAddr
}
type monitor struct {
Conn *net.UDPConn
IsLive bool
Kill chan bool
LastMsg time.Time
ListenTick *time.Ticker
MonitorTick *time.Ticker
}
func newMonitor(port string) *monitor {
return &monitor{
Conn: bindAddress(port),
IsLive: false,
Kill: make(chan bool),
ListenTick: time.NewTicker(time.Second),
MonitorTick: time.NewTicker(time.Second),
}
}
func (m *monitor) listen() {
//program hangs here if no ticker is used. why???
for range m.ListenTick.C {
if !m.IsLive {
continue
}
buffer := make([]byte, 10)
n, err := m.Conn.Read(buffer)
if err != nil {
log.Println(err)
}
if len(buffer[0:n]) > 0 {
m.LastMsg = time.Now()
}
}
}
func (m *monitor) detectTimeOut() {
for range m.MonitorTick.C {
if !m.IsLive {
continue
}
if time.Since(m.LastMsg) > time.Second*5 {
fmt.Println("No response")
m.IsLive = false
m.Kill <- true
}
}
}
func Run(port, password *string, frame <-chan []byte, done <-chan bool) {
session := newSession(port, password)
go session.listenForClients()
fmt.Println("Listening for clients on: ", session.Conn.LocalAddr())
defer session.Conn.Close()
heartBeatPort := stringPortToInt(*port)
monitor := newMonitor(fmt.Sprintf(":%v", heartBeatPort+1))
go monitor.listen()
go monitor.detectTimeOut()
defer monitor.Conn.Close()
var address *net.UDPAddr
for {
select {
case <-done:
fmt.Println("Stopping server")
return
case address = <-session.Address:
fmt.Println("New client authenticated from address: ", address)
monitor.IsLive = true
monitor.LastMsg = time.Now()
case f := <-frame:
switch address {
case nil:
continue
default:
_, err := session.Conn.WriteToUDP(f, address)
if err != nil {
log.Println(err)
}
}
case <-monitor.Kill:
fmt.Println("Client timed out")
address = nil
default:
continue
}
}
}
func newSession(port, password *string) *session {
return &session{
Conn: bindAddress(*port),
Message: make(chan string),
Password: *password,
Address: make(chan *net.UDPAddr),
}
}
func (s session) listenForClients() {
for {
buf := make([]byte, 1024)
n, addr, err := s.Conn.ReadFromUDP(buf)
if err != nil {
log.Println(err)
}
m := buf[0:n]
if s.Password == "" {
s.Address <- addr
continue
}
if s.authenticate(string(m), addr) {
s.Address <- addr
}
}
}
func (s *session) authenticate(message string, address *net.UDPAddr) bool {
if s.Password == message {
_, _ = s.Conn.WriteToUDP([]byte("ok"), address)
return true
}
return false
}
func bindAddress(port string) *net.UDPConn {
laddr, err := net.ResolveUDPAddr("udp", port)
if err != nil {
log.Fatal(err)
}
listenAddr, err := net.ListenUDP("udp", laddr)
if err != nil {
log.Fatal(err)
}
return listenAddr
}
func stringPortToInt(number string) int {
n, err := strconv.Atoi(number[1:])
if err != nil {
log.Fatal(err)
}
return n
}
1 Answer 1
For the timeout detection, you could simplify your logic, by making use of the SetReadDeadline
. Thanks to this, your timers are not needed anymore.
Your monitor would then look like this:
type monitor struct {
Conn *net.UDPConn
Kill chan bool
}
func newMonitor(port string) *monitor {
return &monitor{
Conn: bindAddress(port),
Kill: make(chan bool),
}
}
func (m *monitor) detectTimeOut(delay time.Duration) {
buffer := make([]byte, 10)
m.Conn.SetReadDeadline(time.Now().Add(delay))
for {
n, err := m.Conn.Read(buffer)
if err != nil {
log.Println(err)
}
if n > 0 {
// something was read before the deadline
// let's delay the deadline
m.Conn.SetReadDeadline(time.Now().Add(delay))
}
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
// Timeout error
fmt.Println("No response")
m.Kill <- true
return
}
}
}
And since this method returns on timeout, it needs to be launched every time an new address listens:
heartBeatPort := stringPortToInt(*port)
monitor := newMonitor(fmt.Sprintf(":%v", heartBeatPort+1))
defer monitor.Conn.Close()
var address *net.UDPAddr
for {
select {
case <-done:
fmt.Println("Stopping server")
return
case address = <-session.Address:
fmt.Println("New client authenticated from address: ", address)
go monitor.detectTimeOut(time.Second)
case f := <-frame:
switch address {
case nil:
continue
default:
_, err := session.Conn.WriteToUDP(f, address)
if err != nil {
log.Println(err)
}
}
case <-monitor.Kill:
fmt.Println("Client timed out")
address = nil
default:
continue
}
}
This code code should work (I can't test it), but the way the "address" is managed could be more elegant (for example to manage in case of multiple listeners).
Here is a proposition, where each listener is in charge of forwarding the frames to a particular address, via a given connection.
The main loop is in charge of maintaining a slice of all those listeners and forward every frame to all of them (without blocking).
With this code, we see clearly, that only 2 connections are used (monitor and session), which might not be the best idea (I don't know how they behave when concurrent goroutines write data at the same time). To fix this, you should simply adapt the main loop.
type listener struct {
frames chan []byte
}
func NewListener() listener {
return listener{
frames: make(chan []byte),
}
}
func (l listener) forwardFrames(conn *net.UDPConn, addr *net.UDPAddr) {
for f := range l.frames {
_, err := conn.WriteToUDP(f, addr)
if err != nil {
log.Println(err)
}
}
}
func (l listener) newFramesChannel() chan<- []byte {
return l.frames
}
func (l listener) detectTimeOut(conn net.Conn, delay time.Duration, deadListener chan chan []byte) {
buffer := make([]byte, 10)
conn.SetReadDeadline(time.Now().Add(delay))
for {
n, err := conn.Read(buffer)
if err != nil {
log.Println(err)
}
if n > 0 {
// something was read before the deadline
// let's delay the deadline
conn.SetReadDeadline(time.Now().Add(delay))
}
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
// Timeout error
fmt.Println("No response")
deadListener <- l.frames
return
}
}
}
func Run(port, password *string, frame <-chan []byte, done <-chan bool) {
session := newSession(port, password)
go session.listenForClients()
fmt.Println("Listening for clients on: ", session.Conn.LocalAddr())
defer session.Conn.Close()
heartBeatPort := stringPortToInt(*port)
monitor := newMonitor(fmt.Sprintf(":%v", heartBeatPort+1))
defer monitor.Conn.Close()
listeners := make([](chan<- []byte), 0)
deadListener := make(chan chan []byte, 0)
for {
select {
case <-done:
fmt.Println("Stopping server")
for _, l := range listeners {
close(l)
}
return
case address := <-session.Address:
fmt.Println("New client authenticated from address: ", address)
l := NewListener()
go l.forwardFrames(session.Conn, address)
go l.detectTimeOut(monitor.Conn, time.Second, deadListener)
listeners = append(listeners, l.newFramesChannel())
case f := <-frame:
for _, l := range listeners {
// Send frame to each listener, without waiting
select {
case l <- f:
default:
}
}
case l := <-deadListener:
// remove l from the listeners slice
for i := 0; i < len(listeners); i++ {
if listeners[i] == l {
listeners[i] = listeners[len(listeners)-1]
listeners = listeners[:len(listeners)-1]
close(l)
break
}
}
}
}
}
for range m.Tick.C
read from the same channel : only one of them will get the generated ticks) \$\endgroup\$detector
loop. I edited the code on my question to reflect it. Code is now working as expected. \$\endgroup\$