The TCP server in the code snippet below is designed to:
- Read data using one thread while writing data with another thread.
- Allow only one client to be connected at a time.
- Automatically close the connection with the client and re-establish it if the connection is lost.
std::mutex
is used to protected the socket fd which is read/written by two threads. Is it a better way to achieve this goal without ASIO or other third party libraries?
@Guntram Blohm said it's perfectly ok to read and write a socket from two threads at the same time.
However, m_socket_fd
is used by threads which both read and write it(i.e. TcpServer::send()
may be still called while the value of m_socket_fd
is modified after accept() sucessfully returns and TcpServer::close
may be called at the same time by either of the two threads without the mutex).
Here is the code snippet:
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <array>
#include <chrono>
#include <thread>
namespace Network {
bool is_valid_ip(const std::string& input_ip);
int set_nonblock(int sockfd);
int enb_reuse_addr_port(int listen_fd);
int set_send_timeout(int socket_fd, struct timeval timeout);
int set_recv_timeout(int socket_fd, struct timeval timeout);
int disable_nagle(int socket_fd);
} // namespace Network
bool Network::is_valid_ip(const std::string& input_ip) {
int num_point = 3;
int temp_num = -1;
if (input_ip.empty()) {
return false;
}
for (auto it = input_ip.begin(); it != input_ip.end(); it++) {
if (*it == '.') {
num_point--;
if (temp_num < 0 || temp_num > 255) {
return false;
}
temp_num = -1;
if (num_point >= 0) {
continue;
} else {
return false;
}
}
if (*it < '0' || *it > '9') {
return false;
}
temp_num == -1 ? temp_num = *it - '0' : temp_num = *it - '0' + temp_num * 10;
}
if (num_point != 0 || (temp_num < 0 || temp_num > 255)) {
return false;
}
return true;
}
int Network::set_nonblock(int sockfd) {
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
return 0;
}
int Network::enb_reuse_addr_port(int listen_fd) {
int opt = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
return -1;
}
return 0;
}
int Network::set_send_timeout(int socket_fd, struct timeval timeout) {
return setsockopt(socket_fd, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout, sizeof(struct timeval));
}
int Network::set_recv_timeout(int socket_fd, struct timeval timeout) {
return setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(struct timeval));
}
int Network::disable_nagle(int socket_fd) {
int nagle_status = 1;
return setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, (char*)&nagle_status, sizeof(int));
}
class TcpServer {
public:
TcpServer(const std::string& ip,
unsigned short port,
unsigned int send_timeout, //microseconds
unsigned int recv_timeout); //microseconds
AlgTcpServer(const AlgTcpServer&) = delete;
AlgTcpServer& operator=(const AlgTcpServer&) = delete;
int start();
int process();
int send(const std::vector<char>& data);
int close();
private:
const std::string m_ip;
const unsigned short m_port;
const unsigned m_recv_timeout;
const unsigned m_send_timeout;
int m_listen_fd;
int m_socket_fd;
std::vector<char> m_data;
std::mutex m_socket_mtx;
};
TcpServer::TcpServer(const std::string& ip,
unsigned short port,
unsigned int send_timeout,
unsigned int recv_timeout)
: m_ip(ip), m_port(port), m_recv_timeout(recv_timeout), m_send_timeout(send_timeout),
m_listen_fd(-1), m_socket_fd(-1) {}
int TcpServer::start() {
if (!Network::is_valid_ip(m_ip)) {
return -1;
}
while (true) {
m_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (m_listen_fd < 0) {
return -2;
}
Network::enb_reuse_addr_port(m_listen_fd);
struct sockaddr_in server_sockaddr;
memset(&server_sockaddr, 0, sizeof(server_sockaddr));
server_sockaddr.sin_family = AF_INET;
in_addr_t serviceIp = inet_addr(m_ip.c_str());
server_sockaddr.sin_addr.s_addr = serviceIp;
server_sockaddr.sin_port = htons(m_port);
if (bind(m_listen_fd, (struct sockaddr*)&server_sockaddr, sizeof(server_sockaddr)) < 0) {
perror("bind failed");
return -3;
}
if (listen(m_listen_fd, 1) < 0) {
perror("listen failed");
return -4;
}
while (true) {
struct sockaddr_in client_address;
socklen_t client_address_len = sizeof(client_address);
int socket_fd =
accept(m_listen_fd, (struct sockaddr*)&client_address, &client_address_len);
if (socket_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
} else {
perror("accept failed");
::close(m_listen_fd);
return -4;
}
} else {
Network::disable_nagle(socket_fd);
struct timeval timeout_recv;
timeout_recv.tv_sec = 0;
timeout_recv.tv_usec = m_recv_timeout;
Network::set_recv_timeout(socket_fd, timeout_recv);
struct timeval timeout_send;
timeout_send.tv_sec = 0;
timeout_send.tv_usec = m_send_timeout;
Network::set_send_timeout(socket_fd, timeout_send);
::close(m_listen_fd);
m_listen_fd = -1;
{
//m_socket_fd also used by another thread to send out data
std::lock_guard<std::mutex> lock(m_socket_mtx);
m_socket_fd = socket_fd;
}
break;
}
}
process();
}
return 0;
}
int TcpServer::process() {
int ret = 0;
std::cout << "enter " << __PRETTY_FUNCTION__ << std::endl;
std::array<char, 100> tmp;
while (true) {
tmp.fill('0円');
int ret = 0;
{
std::lock_guard<std::mutex> lock(m_socket_mtx);
if (m_socket_fd < 0) {
std::cout << "read: invalid fd" << std::endl;
return -1;
}
ret = recv(m_socket_fd, tmp.data(), tmp.size(), 0);
}
if (ret < 0) {
if (errno != EAGAIN || errno != EWOULDBLOCK) {
perror("recv failed");
ret = -2;
break;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
} else if (ret == 0) {
{
std::lock_guard<std::mutex> lock(m_socket_mtx);
this->close();
}
std::cout << "other side has been closed" << std::endl;
ret = -3;
break;
} else {
std::cout << tmp.data() << std::endl;
}
}
return ret;
}
int TcpServer::close() {
if (m_socket_fd > 0) {
::close(m_socket_fd);
shutdown(m_socket_fd, SHUT_RDWR);
m_socket_fd = -1;
}
return 0;
}
int TcpServer::send(const std::vector<char>& data) {
if (data.empty()) {
return -1;
}
const size_t len = data.size();
size_t num_sended = 0;
std::lock_guard<std::mutex> lock(m_socket_mtx);
if (m_socket_fd < 0) {
std::cout << "m_socket_fd is not valid" << std::endl;
return -2;
}
while (num_sended < len) {
ssize_t res = ::send(m_socket_fd, data.data() + num_sended, len - num_sended, MSG_NOSIGNAL);
if (res < 0) {
perror("send failed");
this->close();
break;
}
num_sended += res;
}
return 0;
}
int main() {
TcpServer server("127.0.0.1", 6666, 100, 50);
std::thread manager_and_reciver([&server]() { server.start(); });
std::thread sender([&server]() {
while (true) {
std::string str("hello, world!");
std::vector<char> data{str.begin(), str.end()};
server.send(data);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
};
});
manager_and_reciver.join();
sender.join();
}
2 Answers 2
It's perfectly ok to read and write a socket from two threads at the same time.
You don't need to mutex anything, just have one thread read and one thread write. But if the point of the program was learning about threads and mutexes, that's OK. If you have, for example, two writers and don't want them to interfere with each other, you might want to mutex them, or read up about which write operations of which size on which kind of socket are or aren't atomic.
If you use prefixes to variable names, explain them.
It seems like all your private variables have a m_
prefix. Why? Just so you can write something like m_ip(ip)
in your TcpServer
constructor? Why m_
, not p_
for private? If you use a convention like this, explain it somewhere. Preferably in a style guide, not your code, but we don't have your style guide here.
(Personally, I don't like prefixes at all if they aren't used to group variables (all a_xxx
are one thing, and all b_xxx
are some other thing.) But we don't need to go down the rabbithole of why they do or don't make sense here).
Don't parse IP addresses.
There's a ready made getaddrinfo
which can accept anything - ipv4 address, ipv6 address, hostname - and turn that into everything you need for bind
, listen
, accept
and connect
. Saves you a lot of time, code, and possible bugs. Except, of course, if "how do I turn an ip address into something I need for networking calls" is part of your learning goal.
Don't use busy loops when reading from the network.
Don't read/recv
, and sleep for a random amount of time in case of EAGAIN
or EWOULDBLOCK
. Use select
or poll
which will return once data is there.
Pay attention when to close
and when to shutdown
a socket.
shutdown
allows you to shutdown individual directions of a socket; close
implies a bidirectional shutdown
. You can't do anything with a socket after close
, your shutdown
in TcpServer::close
won't do anything. Except if you're really unlucky and some other part of the application, on some other thread, opens a new socket in between those operations, gets the same file descriptior, and promptly has its socket shutdown by you. Those are the sporadic errors that are extremely hard to find.
Your EAGAIN/EWOULDBLOCK
logic is wrong.
if (errno != EAGAIN || errno != EWOULDBLOCK) {
should have &&
, not ||
. In your version, even if you get EAGAIN
, the if
would be true because of the second half EAGAIN != EWOULDBLOCK
.
Consider failing hard if you detect what has to be a logic problem in your application.
Your TcpServer::send
checks if m_socket_fd
is initialized, writes some message out if it is not, and returns. From what I see (and I might be missing some mutex locks), this can happen if your sender thread starts quickly before the manager/receiver has initialized the socket, which will result in some log spam.
So, either, do not log this condition at all, and slap on some comment // this is ok to happen during initialization
, or if you think it should never happen at all, use exit
or abort
(which will mean it'll be much more prominent when it fails during testing) or throw an exception.
Imagine you're sending a file instead of repeating "hello world" over and over; some of your transmitted files might be missing the first (few) line(s), which you'll realize weeks or months later and might be very hard to debug. If your program crashes in those cases, you'll at least notice immediately and probably not lose data.
Is it a better way to achieve this goal without ASIO or other third party libraries?
Personally, I wouldn't have implemented this in separate threads at all; I'd have written a main loop that poll
s the listener and connection sockets, accept
s, recv
s and send
s depending on the result of poll
, and uses the poll
timeout to time the send
s to one every 100 ms. But that's far from your implementation so will probably miss your learning goals.
If you're trying to write a small application, using basic stuff like a mutex
is completely ok, and I consider that much better than pulling in a third party library, of which you use 1%, but which blows up your final code considerably, and gives you a headache every half year when you need to update it.
-
1\$\begingroup\$ sidenote:
: ip(ip)
actually works as expected(?), no need to name either any differently. \$\endgroup\$Remember Monica– Remember Monica2024年06月25日 23:11:32 +00:00Commented Jun 25, 2024 at 23:11 -
\$\begingroup\$ Thanke you. 1. You said it's perfectly ok to read and write a socket from two threads at the same time. However,
m_socket_fd
is used by threads which both read and write it(i.e.TcpServer::send()
may be still called while the value ofm_socket_fd
is modified afteraccept()
sucessfully returns andTcpServer::close
may be called by either of the two threads). 2. You said read up about which write operations of which size on which kind of socket are or aren't atomic. Sorry I can't get it.Could you please explain that for me? 3.m_
prefix is short for member. \$\endgroup\$John– John2024年06月26日 07:07:36 +00:00Commented Jun 26, 2024 at 7:07 -
\$\begingroup\$
TcpServer::send
is used to send message every 30ms. Losing some messages is acceptable. \$\endgroup\$John– John2024年06月26日 12:56:21 +00:00Commented Jun 26, 2024 at 12:56 -
\$\begingroup\$ @John: re your 2) If you have a UDP socket, and have two threads send smallish packets over it, you're guaranteed that packets from those threads stay intact. You won't get partial packets which are resumed after a packet from another thread. If it's a TCP socket, and you're writing large chunks, especially when buffers run full, there's a good chance your chunks will be split. When exactly depends on many factors, so you might need a mutex, might want a mutex just to be certain, or know you don't need a mutex. \$\endgroup\$Guntram Blohm– Guntram Blohm2024年06月26日 16:14:37 +00:00Commented Jun 26, 2024 at 16:14
-
\$\begingroup\$ @GuntramBlohm If you have a UDP socket, and have two threads send smallish packets over it, you're guaranteed that packets from those threads stay intact. What about long message via UDP? It's hard to say how many bytes could be smallish. So even for the UDP, mutex is still needed if it would be sent by multiple threads. Am I right? \$\endgroup\$John– John2024年06月27日 03:06:55 +00:00Commented Jun 27, 2024 at 3:06
fcntl()
can fail:
According to the Linux man page:
On error, -1 is returned, and
errno
is set to indicate the error.
You're assuming in the below code that it can never fail:
int Network::set_nonblock(int sockfd) {
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
return 0;
}
setsockopt()
can also fail:
Network::set_recv_timeout(socket_fd, timeout_recv);
...
Network::set_send_timeout(socket_fd, timeout_send);
What is the point of returning a value if you are simply going to ignore it?
I also do not see why these wrappers are returning an int
and not a bool
.
Simplify:
struct sockaddr_in server_sockaddr;
memset(&server_sockaddr, 0, sizeof(server_sockaddr));
is better as:
struct sockaddr_in server_sockaddr {0};
And:
temp_num == -1 ? temp_num = *it - '0' : temp_num = *it - '0' + temp_num * 10;
is better as:
temp_num = temp_num == -1 ? *it - '0' : *it - '0' + temp_num * 10;
And:
int Network::enb_reuse_addr_port(int listen_fd) {
int opt = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
return -1;
}
return 0;
}
is better as:
int Network::enb_reuse_addr_port(int listen_fd) {
int opt = 1;
return setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
According to setsockopt()
's man page:
Upon successful completion,
setsockopt()
shall return 0. Otherwise, -1 shall be returned anderrno
set to indicate the error.
Do not return magic negative values:
-1, -2, -3, etc. have zero context. Use an enumerated type; Network::accept_failed
is easier to read, understand, and memorise than some random signed integer. `Moreover, the class member functions should not be printing random error messages to standard error, especially undocumented error messages. Leave the decision to the caller.
Do not unnecessarily flush the output buffer:
std::cout << "other side has been closed" << std::endl;
std::endl
has two purposes, to print a newline, and to flush the underlying output buffer. You do not require manual flushing with each call to std::cout
. But I believe std::cerr
/std::clog
would be more appropriate to use here.
-
\$\begingroup\$ Thank you for the detailed explanation. How do you think about the usage of
mutex
which is mentioned in bold in the question? \$\endgroup\$John– John2024年06月25日 08:50:03 +00:00Commented Jun 25, 2024 at 8:50 -
\$\begingroup\$ @John You take me out of my country there. But C++ has plenty of reviewers, and that can be addressed by others. \$\endgroup\$Madagascar– Madagascar2024年06月25日 08:52:21 +00:00Commented Jun 25, 2024 at 8:52
-
\$\begingroup\$ How to properly handle the errors when
fcntl
andsetsockopt
fail? I need 1.set the socket as non-blocking and the address 2. set suitable timeout for reading and writing 3. reuse the address and port. \$\endgroup\$John– John2024年06月25日 09:07:19 +00:00Commented Jun 25, 2024 at 9:07 -
\$\begingroup\$ For
fcntl()
, you'd simply fail. Forsetsockopt()
, you may wish to retry onENOBUFS
, and fail onENOMEM
and other fatal errors. See the respective man pages. \$\endgroup\$Madagascar– Madagascar2024年06月25日 09:15:58 +00:00Commented Jun 25, 2024 at 9:15 -
\$\begingroup\$ On reading the man page, while
fcntl()
can fail, thatfcntl()
cannot. \$\endgroup\$Joshua– Joshua2024年06月26日 16:24:23 +00:00Commented Jun 26, 2024 at 16:24
Explore related questions
See similar questions with these tags.