From c78934a448d0126709fccec3d5a636b3baa87da4 Mon Sep 17 00:00:00 2001 From: Charles McGarvey Date: Sat, 15 May 2010 01:10:43 -0600 Subject: [PATCH] sockets documentation and cleanup --- src/Moof/Dispatch.cc | 8 +- src/Moof/Dispatch.hh | 16 +- src/Moof/Service.cc | 13 +- src/Moof/Service.hh | 55 ++- src/Moof/Socket.hh | 781 +++++++++++++++++++++++++++++++------------ src/Moof/Thread.hh | 504 ++++++++++++++++++++++------ 6 files changed, 1034 insertions(+), 343 deletions(-) diff --git a/src/Moof/Dispatch.cc b/src/Moof/Dispatch.cc index 83fecae..af0faea 100644 --- a/src/Moof/Dispatch.cc +++ b/src/Moof/Dispatch.cc @@ -72,7 +72,7 @@ public: mHandles.erase(id); } - void dispatch(const std::string& event, const Message* message) + void dispatch(const std::string& event) { std::pair callbacks(mCallbacks.equal_range(event)); @@ -80,7 +80,7 @@ public: for (CallbackIt it = callbacks.first; it != callbacks.second; ++it) { Function callback = (*it).second.second; - callback(message); + callback(); } } @@ -131,10 +131,10 @@ void Dispatch::removeTarget(unsigned id) } -void Dispatch::dispatch(const std::string& event, const Message* message) +void Dispatch::dispatch(const std::string& event) { // pass through - mImpl->dispatch(event, message); + mImpl->dispatch(event); } diff --git a/src/Moof/Dispatch.hh b/src/Moof/Dispatch.hh index 4911c12..e357f02 100644 --- a/src/Moof/Dispatch.hh +++ b/src/Moof/Dispatch.hh @@ -34,18 +34,8 @@ class Dispatch void removeTarget(unsigned id); -public: - - /** - * Interface for a notification class. - */ - - class Message - { - public: - virtual ~Message() {}; - }; +public: class Handle { @@ -92,7 +82,7 @@ public: mutable unsigned mId; }; - typedef boost::function Function; + typedef boost::function Function; Dispatch(); @@ -101,7 +91,7 @@ public: Handle addTarget(const std::string& event, const Function& callback, Handle handle); - void dispatch(const std::string& event, const Message* message = 0); + void dispatch(const std::string& event); static Dispatch& global(); }; diff --git a/src/Moof/Service.cc b/src/Moof/Service.cc index 43689af..a5fae0f 100644 --- a/src/Moof/Service.cc +++ b/src/Moof/Service.cc @@ -15,20 +15,11 @@ namespace Mf { -ServiceBroadcaster::ServiceBroadcaster(const std::string& name) +ServiceFinder::ServiceFinder(const std::string& service, int type) { } -void ServiceBroadcaster::update(Scalar t, Scalar dt) -{ -} - - -ServiceLocator::ServiceLocator(const std::string& name) -{ -} - -void ServiceLocator::update(Scalar t, Scalar dt) +void ServiceFinder::update(Scalar t, Scalar dt) { } diff --git a/src/Moof/Service.hh b/src/Moof/Service.hh index 5870d57..7c5e6b7 100644 --- a/src/Moof/Service.hh +++ b/src/Moof/Service.hh @@ -12,29 +12,76 @@ #ifndef _MOOF_SERVICE_HH_ #define _MOOF_SERVICE_HH_ +#include + #include +#include namespace Mf { -class ServiceBroadcaster +/** + * Class representing a network service. + */ +class Service { public: - ServiceBroadcaster(const std::string& name); + /** + * Construct a network service. + * \param address The address of the host. + * \param text The service information. + */ + Service(const SocketAddress& address, const std::string& text); + + + /** + * Get the host address. + * \return The address. + */ + const SocketAddress& address() const; + + /** + * Get the service information. + * \return The service information as a string. + */ + const std::string& text() const; + + + /** + * Publish the service on the network. + */ + void publish(); void update(Scalar t, Scalar dt); + + +private: + + SocketAddress mAddress; + std::string mText; }; -class ServiceLocator +class ServiceFinder { public: - ServiceLocator(const std::string& name); + ServiceFinder(const std::string& service, int type = SOCK_STREAM); void update(Scalar t, Scalar dt); + + + const std::vector& services() const + { + return mServices; + } + + +private: + + std::vector mServices; }; diff --git a/src/Moof/Socket.hh b/src/Moof/Socket.hh index fcf1a3e..dd43274 100644 --- a/src/Moof/Socket.hh +++ b/src/Moof/Socket.hh @@ -47,10 +47,17 @@ namespace Mf { +/** + * A class to represent the address of a remote host, including the type of + * service and socket communication. + */ class SocketAddress { public: + /** + * Construct an unspecified address. + */ SocketAddress() : mSize(0), mType(0) @@ -59,36 +66,70 @@ public: mAddr.in.sin_port = 0; } - SocketAddress(const std::string& service, const std::string& name, - int type = SOCK_STREAM, int family = AF_UNSPEC) + /** + * Construct an address with a specified host. The address can be used + * to connect to a host. + * \param service The service name or port number. + * \param host The numeric IP address of the host. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + */ + SocketAddress(const std::string& service, + const std::string& host, + int type = SOCK_STREAM, + int family = AF_UNSPEC) { - init(service, name, type, family); + init(service, host, type, family); } + /** + * Construct an address without a specified host. The address can be + * used to accept on a local port. + * \param service The service name or port number. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + */ SocketAddress(const std::string& service, - int type = SOCK_STREAM, int family = AF_UNSPEC) + int type = SOCK_STREAM, + int family = AF_UNSPEC) { init(service, type, family); } - SocketAddress(const struct addrinfo* addr, const std::string& name) + /** + * Construct an address from the information in an addrinfo structure. + * \param addr The addrinfo structure. + */ + SocketAddress(const struct addrinfo* addr) : + mSize(addr->ai_addrlen), + mType(addr->ai_socktype) { - mType = addr->ai_socktype; memcpy(&mAddr.sa, addr->ai_addr, addr->ai_addrlen); - mName = name; - mSize = addr->ai_addrlen; + getServiceAndHostName(mService, mHost); } - SocketAddress(const struct sockaddr* addr, size_t size, - int type = SOCK_STREAM) + /** + * Construct an address from a sockaddr structure. + * \param addr The sockaddr structure. + * \param size The size of the sockaddr structure. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + */ + SocketAddress(const struct sockaddr* addr, + size_t size, + int type = SOCK_STREAM) : + mSize(size), + mType(type) { - mType = type; memcpy(&mAddr.sa, addr, size); - mSize = size; - setNameFromAddress(); + getServiceAndHostName(mService, mHost); } + /** + * Get an IPv4 broadcast address. + * \param service The service name or port number. + * \return The socket address. + */ static SocketAddress broadcast(const std::string& service) { std::istringstream stream(service); @@ -104,35 +145,35 @@ public: } - void init(const std::string& service, const std::string& name = "", - int type = SOCK_STREAM, int family = AF_UNSPEC) - { - ASSERT(type == SOCK_STREAM || type == SOCK_DGRAM); - ASSERT(family == AF_INET || family == AF_INET6 || family == AF_UNSPEC); - - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = family; - hints.ai_socktype = type; - hints.ai_flags = AI_PASSIVE; - - struct addrinfo* addr; - int status = getaddrinfo(name.length() > 0 ? name.c_str() : 0, - service.c_str(), &hints, &addr); - if (status == 0) + /** + * Initialize the address with a specified host. The address can be + * used to connect to a host. + * \param service The service name or port number. + * \param host The numeric IP address of the host. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + */ + void init(const std::string& service, + const std::string& host, + int type = SOCK_STREAM, + int family = AF_UNSPEC) + { + struct addrinfo* addr = resolve(service.c_str(), host.c_str(), + type, family, + AI_ADDRCONFIG | AI_NUMERICHOST | AI_V4MAPPED); + if (addr) { + mSize = addr->ai_addrlen; mType = addr->ai_socktype; memcpy(&mAddr.sa, addr->ai_addr, addr->ai_addrlen); - mSize = addr->ai_addrlen; - if (name != "") mName = name; - else setNameFromAddress(); + mService = service; + mHost = host; freeaddrinfo(addr); } else { - Mf::logWarning(gai_strerror(status)); mType = 0; mSize = 0; mAddr.sa.sa_family = AF_UNSPEC; @@ -140,99 +181,242 @@ public: } } + /** + * Initialize the address without a specified host. The address can be + * used to accept on a local port. + * \param service The service name or port number. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + */ void init(const std::string& service, - int type = SOCK_STREAM, int family = AF_UNSPEC) + int type = SOCK_STREAM, + int family = AF_UNSPEC) { - init(service, "", type, family); + struct addrinfo* addr = resolve(service.c_str(), 0, + type, family, + AI_PASSIVE); + if (addr) + { + mSize = addr->ai_addrlen; + mType = addr->ai_socktype; + memcpy(&mAddr.sa, addr->ai_addr, addr->ai_addrlen); + + mService = service; + getHost(mHost); + + freeaddrinfo(addr); + } + else + { + mType = 0; + mSize = 0; + mAddr.sa.sa_family = AF_UNSPEC; + mAddr.in.sin_port = 0; + } } - const std::string& name() const + /** + * Get the name of the service. This could also be a port number if + * there is no service name associated with the number. + * \return The service. + */ + const std::string& service() const { - return mName; + return mService; } - void setName(const std::string& name) + /** + * Get the name of the host. This may be the host used to construct + * the address, or a resolved numeric host if none was used. + * \return The host. + */ + const std::string& host() const { - mName = name; + return mHost; } + /** + * Get the port number of the address service. + * \return Port number. + */ unsigned short port() const { return ntohs(mAddr.in.sin_port); } + /** + * Get the type of socket associated with the service of this address. + * \return Socket type; either SOCK_STREAM or SOCK_DGRAM. + */ int type() const { return mType; } + /** + * Get the family of the protocol associated with the address. + * \return Protocol family; either AF_INET, AF_INET6, or AF_UNSPEC. + */ int family() const { return mAddr.sa.sa_family; } + /** + * Get the sockaddr structure of the address. + * \return The sockaddr structure. + */ const struct sockaddr* address() const { return mSize != 0 ? &mAddr.sa : 0; } + /** + * Get the size of the sockaddr structure of the address. + * \return The size of the sockaddr structure. + */ size_t size() const { return mSize; } - static int resolve(const std::string& service, const std::string& name, - int type, int family, std::vector& resolved) + /** + * Get a list of addresses resolved to by the given search criteria. + * This can be used to perform lookups for name resolution, so this + * method may take some time to return. Use the ResolveTask class to + * resolve addresses asynchronously. + * \param service The service name or port number. + * \param host The name of the local or remote host. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + * \param resolved The list to be filled with addresses. + * \return 0 on success, -1 on error. + */ + static int resolve(const std::string& service, + const std::string& host, + int type, + int family, + std::vector& resolved) + { + struct addrinfo* list = resolve(service.c_str(), host.c_str(), + type, family, + AI_ADDRCONFIG | AI_V4MAPPED); + int result = collectAddresses(list, resolved); + freeaddrinfo(list); + return result; + } + + /** + * Get a list of addresses resolved to by the given search criteria. + * The addresses will be suitable for accepting on a local port. + * \param service The service name or port number. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + * \param resolved The list to be filled with addresses. + * \return 0 on success, -1 on error. + */ + static int resolve(const std::string& service, + int type, + int family, + std::vector& resolved) + { + struct addrinfo* list = resolve(service.c_str(), 0, + type, family, + AI_PASSIVE); + int result = collectAddresses(list, resolved); + freeaddrinfo(list); + return result; + } + + +private: + + static struct addrinfo* resolve(const char* service, + const char* node, + int type, + int family, + int flags) { ASSERT(type == SOCK_STREAM || type == SOCK_DGRAM); ASSERT(family == AF_INET || family == AF_INET6 || family == AF_UNSPEC); - resolved.clear(); - struct addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = family; hints.ai_socktype = type; - hints.ai_flags = AI_PASSIVE; + hints.ai_flags = flags; + + struct addrinfo* addr; + int status = getaddrinfo(node, service, &hints, &addr); - struct addrinfo* list; - int status = getaddrinfo(name.length() > 0 ? name.c_str() : 0, - service.length() > 0 ? service.c_str() : 0, &hints, &list); if (status == 0) { - for (struct addrinfo* addr = list; - addr != 0; addr = addr->ai_next) - { - resolved.push_back(SocketAddress(addr, name)); - } - - freeaddrinfo(list); + return addr; } else { Mf::logWarning(gai_strerror(status)); - return -1; + return 0; } + } + + static int collectAddresses(struct addrinfo* addresses, + std::vector& resolved) + { + if (addresses) + { + resolved.clear(); + + for (struct addrinfo* addr = addresses; + addr != 0; + addr = addr->ai_next) + { + resolved.push_back(SocketAddress(addr)); + } - return 0; + return 0; + } + else return -1; } -private: + void getService(std::string& service) + { + char value[64] = {'\0'}; + int result = getnameinfo(&mAddr.sa, mSize, + 0, 0, + value, sizeof(value), + (mType == SOCK_DGRAM) ? NI_DGRAM : 0); + if (result == 0) service.assign(value); + } - void setNameFromAddress() + void getHost(std::string& host) { -#if defined(_WIN32) - // inet_ntop was introduced in Vista - mName = inet_ntoa(mAddr.in.sin_addr); -#else - char name[INET6_ADDRSTRLEN] = {'\0'}; - inet_ntop(mAddr.sa.sa_family, &mAddr.sa, name, sizeof(name)); - mName = name; -#endif + char value[256] = {'\0'}; + int result = getnameinfo(&mAddr.sa, mSize, + value, sizeof(value), + 0, 0, + NI_NUMERICHOST); + if (result == 0) host.assign(value); + } + + void getServiceAndHostName(std::string& service, std::string& host) + { + char serv[64] = {'\0'}; + char node[256] = {'\0'}; + int result = getnameinfo(&mAddr.sa, mSize, + node, sizeof(node), + serv, sizeof(serv), + NI_NUMERICHOST | + (mType == SOCK_DGRAM) ? NI_DGRAM : 0); + if (result == 0) + { + service.assign(serv); + host.assign(node); + } } @@ -243,228 +427,320 @@ private: sockaddr_storage storage; } mAddr; size_t mSize; - std::string mName; int mType; + + std::string mHost; + std::string mService; }; +/** + * The socket class represents a connection or between this node and a + * remote node. + */ class Socket { struct Impl { + SocketAddress address; int fd; bool isConnected; - SocketAddress address; - }; + Impl() : + fd(-1), + isConnected(false) {} -public: + Impl(const SocketAddress& address, int flags = 0) : + address(address), + fd(::socket(address.family(), address.type(), flags)), + isConnected(false) {} + } mImpl; - Socket() : - mFd(-1), - mIsConnected(false) {} - Socket(const SocketAddress& address) : - mFd(-1), - mIsConnected(false), - mAddress(address) {} +public: + /** + * Construct a socket with no associated peer. + */ + Socket() {} + + /** + * Construct a socket with an address. + * \param address The address. + * \param flags The socket options. + */ + Socket(const SocketAddress& address, int flags = 0) : + mImpl(address, flags) {} + + /** + * Construct a socket with a specified host. The socket can be used to + * connect to a host. + * \param service The service name or port number. + * \param host The numeric IP address of the host. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + * \param flags The socket options. + */ Socket(const std::string& service, const std::string& name, - int type = SOCK_STREAM, int family = AF_UNSPEC) : - mFd(-1), - mIsConnected(false), - mAddress(SocketAddress(service, name, type, family)) {} - - Socket(const std::string& service, - int type = SOCK_STREAM, int family = AF_UNSPEC) : - mFd(-1), - mIsConnected(false), - mAddress(SocketAddress(service, type, family)) {} - - - Socket(Socket& move) : - mFd(move.mFd), - mIsConnected(move.mIsConnected), - mAddress(move.mAddress) - { - move.mFd = -1; - move.mIsConnected = false; - } - - Socket(Impl move) : - mFd(move.fd), - mIsConnected(move.isConnected), - mAddress(move.address) {} - - Socket& operator=(Socket& move) - { -#if defined(_WIN32) - if (mFd != -1) closesocket(mFd); -#else - if (mFd != -1) close(mFd); -#endif - mFd = move.mFd; - mIsConnected = move.mIsConnected; - mAddress = move.mAddress; - move.mFd = -1; - move.mIsConnected = false; - return *this; - } - - Socket& operator=(Impl move) - { -#if defined(_WIN32) - if (mFd != -1) closesocket(mFd); -#else - if (mFd != -1) close(mFd); -#endif - mFd = move.fd; - mIsConnected = move.isConnected; - mAddress = move.address; - return *this; - } - - operator Impl() - { - Impl impl; - impl.fd = mFd; - impl.isConnected = mIsConnected; - impl.address = mAddress; - mFd = -1; - mIsConnected = false; - return impl; - } - - + int type = SOCK_STREAM, int family = AF_UNSPEC, + int flags = 0) : + mImpl(SocketAddress(service, name, type, family), flags) {} + + /** + * Construct a socket without a specified host. The socket can be used + * to accept sockets on a local port. + * \param service The service name or port number. + * \param type The type of socket; either SOCK_STREAM or SOCK_DGRAM. + * \param family The family; can be AF_INET or AF_INET6. + * \param flags The socket options. + */ + Socket(const std::string& service, int type = SOCK_STREAM, + int family = AF_UNSPEC, int flags = 0) : + mImpl(SocketAddress(service, type, family), flags) {} + + + /** + * Deconstruct the socket, closing it. + */ ~Socket() { -#if defined(_WIN32) - if (mFd != -1) closesocket(mFd); -#else - if (mFd != -1) close(mFd); -#endif + close(); } + /** + * Get whether or not the socket is connected. + * \return True if the socket is connected, false otherwise. + */ bool isConnected() const { - return mIsConnected; + return mImpl.isConnected; } + /** + * Get the address associated with the socket. + */ const SocketAddress& address() const { - return mAddress; + return mImpl.address; } + /** + * Connect the socket to its peer. + * \return 0 on success, -1 on failure. + */ int connect() { - if (mFd == -1) mFd = socket(mAddress.family(), mAddress.type(), 0); - int result = ::connect(mFd, mAddress.address(), mAddress.size()); - mIsConnected = result != -1; + int result = ::connect(mImpl.fd, + mImpl.address.address(), + mImpl.address.size()); + mImpl.isConnected = result != -1; return result; } + /** + * Disconnect a connected socket from its peer. + * \param flags Specify the socket directions to close. + * \return 0 on success, -1 on failure. + */ + int disconnect(int flags = SHUT_RDWR) + { + return shutdown(mImpl.fd, flags); + } + + + /** + * Bind the socket to interface and port number specified in the + * address. + * \return 0 on success, -1 on failure. + */ int bind() { - if (mFd == -1) mFd = socket(mAddress.family(), mAddress.type(), 0); - return ::bind(mFd, mAddress.address(), mAddress.size()); + return ::bind(mImpl.fd, + mImpl.address.address(), + mImpl.address.size()); } + /** + * Listen on the socket for incoming connections. This is only useful + * for sockets of type SOCK_STREAM. + * \param backlog The number of unaccepted connections to queue. + * \return 0 on success, -1 on failure. + */ int listen(int backlog = SOMAXCONN) { - return ::listen(mFd, backlog > 0 ? backlog : SOMAXCONN); + return ::listen(mImpl.fd, backlog > 0 ? backlog : SOMAXCONN); } + /** + * Accept a new connection on the socket. This is only useful for + * sockets of type SOCK_STREAM. + * \param socket Set to the new socket on return. + * \return 0 on success, -1 on failure. + */ int accept(Socket& socket) { - Socket temp = Socket(mFd); - if (temp.mFd != -1) + Socket temp = Socket(mImpl.fd); + if (temp.mImpl.fd != -1) { socket = temp; - return socket.mFd; + return socket.mImpl.fd; } return -1; } + /** + * Set an integer socket option. + * \param option The option to set. + * \param value The new value. + * \return 0 on success, -1 on failure. + */ int set(int option, int value = 0) { - if (mFd == -1) mFd = socket(mAddress.family(), mAddress.type(), 0); if (option == SO_NONBLOCK) { #ifdef HAVE_FCNTL - int flags = fcntl(mFd, F_GETFL); - return fcntl(mFd, F_SETFL, (value ? O_NONBLOCK : 0) | flags); + int flags = fcntl(mImpl.fd, F_GETFL); + return fcntl(mImpl.fd, + F_SETFL, + flags | (value ? O_NONBLOCK : 0)); #else - return ioctl(mFd, FIONBIO, value); + return ioctl(mImpl.fd, FIONBIO, value); #endif } - return setsockopt(mFd, SOL_SOCKET, option, &value, sizeof(value)); + return setsockopt(mImpl.fd, SOL_SOCKET, option, + &value, sizeof(value)); } + /** + * Set a string socket option. + * \param option The option to set. + * \param value The new value. + * \return 0 on success, -1 on failure. + */ int set(int option, const std::string& value) { - if (mFd == -1) mFd = socket(mAddress.family(), mAddress.type(), 0); - return setsockopt(mFd, SOL_SOCKET, option, - value.data(), value.length()); + return setsockopt(mImpl.fd, SOL_SOCKET, option, + value.data(), value.length()); } + /** + * Get an integer socket option. + * \param option The option to set. + * \param value The new value. + * \return 0 on success, -1 on failure. + */ int get(int option, int& value) { - if (mFd == -1) mFd = socket(mAddress.family(), mAddress.type(), 0); if (option == SO_NONBLOCK) { #ifdef HAVE_FCNTL - int flags = fcntl(mFd, F_GETFL); + int flags = fcntl(mImpl.fd, F_GETFL); return flags & O_NONBLOCK; #else - return ioctl(mFd, FIONBIO, &value); + return ioctl(mImpl.fd, FIONBIO, &value); #endif } socklen_t optlen = sizeof(value); - return getsockopt(mFd, SOL_SOCKET, option, &value, &optlen); + return getsockopt(mImpl.fd, SOL_SOCKET, option, &value, &optlen); } + /** + * Get a string socket option. + * \param option The option to set. + * \param value The new value. + * \return 0 on success, -1 on failure. + */ int get(int option, std::string& value) { - if (mFd == -1) mFd = socket(mAddress.family(), mAddress.type(), 0); - char str[64] = {'\0'}; + char str[256] = {'\0'}; socklen_t optlen = sizeof(str); - int result = getsockopt(mFd, SOL_SOCKET, option, &str, &optlen); + int result = getsockopt(mImpl.fd, SOL_SOCKET, option, + &str, &optlen); value = str; return result; } - ssize_t write(const void* bytes, size_t size) + /** + * Write some bytes to the socket. Use this for connected sockets. + * \param bytes The bytes. + * \param size The number of bytes. + * \param flags The send options. + * \return The number of bytes written. + */ + ssize_t write(const void* bytes, size_t size, int flags = 0) { - return send(mFd, bytes, size, 0); + return send(mImpl.fd, bytes, size, flags); } + + /** + * Write some bytes to the socket using the given address. Use this + * for unconnected sockets. + * \param bytes The bytes. + * \param size The number of bytes. + * \param address The address to send to. + * \param flags The send options. + * \return The number of bytes written. + */ ssize_t write(const void* bytes, size_t size, - const SocketAddress& address) + const SocketAddress& address, int flags = 0) { - return sendto(mFd, bytes, size, 0, + return sendto(mImpl.fd, bytes, size, flags, address.address(), address.size()); } - ssize_t write(const Packet& packet) + /** + * Write a packet to the socket. Use this for connected sockets. + * \param packet The packet. + * \param flags The send options. + * \return The number of bytes written. + */ + ssize_t write(const Packet& packet, int flags = 0) { - return write(packet.bytes(), packet.size()); + return write(packet.bytes(), packet.size(), flags); } - ssize_t write(const Packet& packet, const SocketAddress& address) + /** + * Write a packet to the socket using the given address. Use this for + * unconnected sockets. + * \param packet The packet. + * \param address The address to send to. + * \param flags The send options. + * \return The number of bytes written. + */ + ssize_t write(const Packet& packet, const SocketAddress& address, + int flags = 0) { - return write(packet.bytes(), packet.size(), address); + return write(packet.bytes(), packet.size(), address, flags); } - ssize_t read(void* bytes, size_t size) + /** + * Read some bytes from the socket. Use this for connected sockets. + * \param bytes The buffer to store the bytes. + * \param size The size of the buffer. + * \param flags The recv options. + * \return The number of bytes read. + */ + ssize_t read(void* bytes, size_t size, int flags = 0) { - return recv(mFd, bytes, size, 0); + return recv(mImpl.fd, bytes, size, flags); } - ssize_t read(void* bytes, size_t size, SocketAddress& address) + /** + * Read some bytes from the socket using the given address. Use this + * for unconnected sockets. + * \param bytes The buffer to store the bytes. + * \param size The size of the buffer. + * \param address The address to read from. + * \param flags The recv options. + * \return The number of bytes read. + */ + ssize_t read(void* bytes, size_t size, SocketAddress& address, + int flags = 0) { union { @@ -473,31 +749,84 @@ public: } addr; socklen_t length = sizeof(addr); - ssize_t result = recvfrom(mFd, bytes, size, 0, &addr.sa, &length); + ssize_t result = recvfrom(mImpl.fd, bytes, size, flags, + &addr.sa, &length); if (result != -1) { - address = SocketAddress(&addr.sa, length, mAddress.type()); + address = SocketAddress(&addr.sa, length, mImpl.address.type()); } return result; } - ssize_t read(Packet& packet) + /** + * Read a packet from the socket. Use this for connected sockets. + * \param packet Set to the packet read on return. + * \param flags The recv options. + * \return The number of bytes read. + */ + ssize_t read(Packet& packet, int flags = 0) { char buffer[65536]; - ssize_t result = read(buffer, sizeof(buffer)); + ssize_t result = read(buffer, sizeof(buffer), flags); if (result != -1) packet = Packet(buffer, result); return result; } - ssize_t read(Packet& packet, SocketAddress& address) + /** + * Read a packet from the socket using the given address. Use this for + * unconnected sockets. + * \param packet Set to the packet read on return. + * \param address The address to read from. + * \param flags The recv options. + * \return The number of bytes read. + */ + ssize_t read(Packet& packet, SocketAddress& address, int flags = 0) { char buffer[65536]; - ssize_t result = read(buffer, sizeof(buffer), address); + ssize_t result = read(buffer, sizeof(buffer), address, flags); if (result != -1) packet = Packet(buffer, result); return result; } + // The rest of this junk is used to implement the "move" semantics + // correctly, since it makes no sense for socket objects to be copied. + + Socket(Socket& move) : + mImpl(move.mImpl) + { + move.mImpl.fd = -1; + move.mImpl.isConnected = false; + } + + Socket(Impl move) : + mImpl(move) {} + + Socket& operator=(Socket& move) + { + close(); + mImpl = move.mImpl; + move.mImpl.fd = -1; + move.mImpl.isConnected = false; + return *this; + } + + Socket& operator=(Impl move) + { + close(); + mImpl = move; + return *this; + } + + operator Impl() + { + Impl impl(mImpl); + mImpl.fd = -1; + mImpl.isConnected = false; + return impl; + } + + private: Socket(int fd) @@ -510,45 +839,78 @@ private: } addr; socklen_t length = sizeof(addr); - mFd = ::accept(fd, &addr.sa, &length); - if (mFd != -1) + mImpl.fd = ::accept(fd, &addr.sa, &length); + if (mImpl.fd != -1) { - mIsConnected = true; - mAddress = SocketAddress(&addr.sa, length); + mImpl.isConnected = true; + mImpl.address = SocketAddress(&addr.sa, length); } } - - int mFd; - bool mIsConnected; - SocketAddress mAddress; + void close() + { +#if defined(_WIN32) + if (mImpl.fd != -1) closesocket(mImpl.fd); +#else + if (mImpl.fd != -1) ::close(mImpl.fd); +#endif + } }; +/** + * An asynchronous task to resolve addresses. + */ class ResolverTask : public ThreadedTask { public: - ResolverTask(const std::string& service, const std::string& name, - int type = SOCK_STREAM, int family = AF_UNSPEC) : + /** + * Construct a resolver task from a service and hostname. + * \param service Server name or port number. + * \param host The hostname or numeric address. + * \param type The type of communication. + * \param family The requested protocol family. + */ + ResolverTask(const std::string& service, + const std::string& host, + int type = SOCK_STREAM, + int family = AF_UNSPEC) : mIsDone(false) { mFunction = boost::bind(&ResolverTask::resolve, - this, service, name, type, family); + this, service, host, type, family); } + /** + * Get whether or not the task is done. + * \return True if the task has finished, false otherwise. + */ bool isDone() const { return mIsDone; } + /** + * Start the task. This does nothing if the task was already run or is + * currently running. + */ void run() { - if (!mThread) mThread = Mf::detachFunction(mFunction); + if (!isDone() && !mThread.isValid()) + { + mThread = Thread::detach(mFunction); + } } + /** + * Get the addresses resolved. This is filled and safe to access after + * the task finishes. + * \return List of addresses. + * \see isDone() + */ const std::vector& addresses() const { return mAddressList; @@ -557,11 +919,14 @@ public: private: - int resolve(const std::string& service, const std::string& name, - int type, int family) + int resolve(const std::string& service, + const std::string& host, + int type, + int family) { - int status = SocketAddress::resolve(service, name, - type, family, mAddressList); + int status = SocketAddress::resolve(service, host, + type, family, + mAddressList); mIsDone = true; return status; } @@ -569,7 +934,7 @@ private: std::vector mAddressList; bool mIsDone; - Function mFunction; + Thread::Function mFunction; }; diff --git a/src/Moof/Thread.hh b/src/Moof/Thread.hh index 3e0bd82..4888ab6 100644 --- a/src/Moof/Thread.hh +++ b/src/Moof/Thread.hh @@ -9,104 +9,180 @@ * **************************************************************************/ -#ifndef _MOOF_THREAD_HH_ -#define _MOOF_THREAD_HH_ - /** - * @file Thread.hh + * \file Thread.hh * Light C++ wrapper around the SDL threads API. */ +#ifndef _MOOF_THREAD_HH_ +#define _MOOF_THREAD_HH_ + +#include #include #include +#include + namespace Mf { -// -// The detach function detaches a separate thread by calling 'func' with -// the 'arg' parameter. -// +/** + * Represents a thread which may be running. You cannot instantiate a + * thread object directly; new threads are created by detaching functions + * using the detach() method. Once a thread is detached, it will continue + * running until the function returns. You don't need to keep the thread + * object you want to wait() or kill() the thread later. + */ +class Thread +{ +public: -typedef SDL_Thread* Thread; + typedef boost::function Function; -typedef boost::function Function; + /** + * Construct an invalid thread object which has no association with any + * real thread. + */ + Thread() : + mThread(0) {} -inline int detach_(void* arg) -{ - //Function function = *(Function*)arg; - int code = (*(Function*)arg)(); + /** + * Execute a function in a new thread. + * \param function The function to execute. + * \return The new thread, or an invalid thread if an error occurred. + */ + static Thread detach(const Function& function) + { + Function* fcopy = new Function(function); + SDL_Thread* thread = SDL_CreateThread(&Thread::run, (void*)fcopy); + if (thread == 0) delete fcopy; + return Thread(thread); + } - delete (Function*)arg; - return code; -} -inline Thread detachFunction(const Function& function) -{ - Function* fcopy = new Function(function); - Thread thread = SDL_CreateThread(detach_, (void*)fcopy); + /** + * Wait for the thread to terminate, getting its return value. The + * thread will be invalidated. + * \return The integer value returned by the detached function. + */ + int wait() + { + int i; + SDL_WaitThread(mThread, &i); + mThread = 0; + return i; + } - if (thread == 0) delete fcopy; - return thread; -} + /** + * Forcefully kill the thread without giving it a chance to clean up + * after itself. The thread will be invalidated. Don't use this. + */ + void kill() + { + SDL_KillThread(mThread); + mThread = 0; + } + /** + * Get whether or not the thread object is associated with a real + * thread. + * \return True if the thread is valid, false otherwise. + */ + bool isValid() const + { + return mThread != 0; + } -inline int waitOnThread(Thread thread) -{ - int i; - SDL_WaitThread(thread, &i); - return i; -} -inline void killThread(Thread thread) -{ - SDL_KillThread(thread); -} + /** + * Get a unique identifier for this thread, if it is valid. + * \return The identifier. + */ + uint32_t identifier() const + { + return SDL_GetThreadID(mThread); + } + /** + * Get the unique identifier of the calling thread. + * \return The identifier. + */ + static uint32_t currentIdentifier() + { + return SDL_ThreadID(); + } -// -// The identifier function returns a unique integer for the calling thread. -// -inline unsigned getThreadIdentifier() -{ - return SDL_ThreadID(); -} +private: -inline unsigned getThreadIdentifier(Thread thread) -{ - return SDL_GetThreadID(thread); -} + Thread(SDL_Thread* thread) : + mThread(thread) {} + static int run(void* arg) + { + int code = (*(Function*)arg)(); + delete (Function*)arg; + return code; + } + + SDL_Thread* mThread; +}; + + +/** + * An abstract class representing some task that is to be run + * asynchronously. + */ class AsyncTask { public: + /** + * Deconstruct the task. + */ virtual ~AsyncTask() {} + /** + * Get whether or not the task is done. + * \return True if the task is done, false otherwise. + */ virtual bool isDone() const = 0; + /** + * Begin the task. + */ virtual void run() = 0; + + /** + * Block the current thread until the task is finished. + * \return A value representing the state of the finished task. + */ virtual int wait() = 0; }; +/** + * An asynchronous task that is run to be executed in a separated thread. + */ class ThreadedTask { public: - ThreadedTask() : - mThread(0) {} - - Thread thread() const { return mThread; } + /** + * Get the thread object the task is executing in. + * \return The thread. + */ + const Thread& thread() const { return mThread; } + /** + * Block the current thread until the task thread is finished. + * \return The integer value returned by the task function. + */ int wait() { - int code = waitOnThread(mThread); - mThread = 0; - return code; + return mThread.wait(); } @@ -116,205 +192,427 @@ protected: }; -//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - +/** + * A mutex to protect sensitive sections of code from threads which might + * otherwise cause unpredictable results. + */ class Mutex { - friend class Condition; - public: - Mutex() - { - mMutex = SDL_CreateMutex(); - } + /** + * Construct a mutex. + */ + Mutex() : + mMutex(SDL_CreateMutex()) {} + + /** + * Deconstruct a mutex. + */ ~Mutex() { SDL_DestroyMutex(mMutex); } + + /** + * Block until the calling thread can secure exclusive access to the + * code protected by the mutex. + * \return True if the lock was acquired, false otherwise. + * \see Lock + */ bool acquireLock() { return (SDL_LockMutex(mMutex) == 0); } + + /** + * Unlock the mutex. Call this after the sensitive block of code to + * allow another thread to acquire the lock. + * \return True if the mutex was unlocked, false otherwise. + * \see Lock + */ bool releaseLock() { return (SDL_UnlockMutex(mMutex) == 0); } + + /** + * As an alternative method for locking, objects of this class will + * automagically release the lock if it is still locked at + * deconstruction. Therefore, it's generally safer to use this method + * since it makes it much more difficult to forget to unlock a mutex. + */ class Lock { - friend class Condition; - public: - Lock(Mutex& mutex) - { - mMutex = &mutex; - mIsLocked = false; - } + /** + * Construct a lock. + * \param mutex The mutex. + */ + Lock(Mutex& mutex) : + mMutex(mutex), + mIsLocked(false) {} + + /** + * Deconstruct a lock. The lock is automagically released if it is + * still locked. + */ ~Lock() { if (mIsLocked) release(); } + + /** + * Try to acquire a lock on the mutex. + * \return True if the mutex was locked, false otherwise. + */ bool acquire() { - return (mIsLocked = mMutex->acquireLock()); + return (mIsLocked = mMutex.acquireLock()); } + + /** + * Release the lock. + * \return True if the mutex was unlocked, false otherwise. + */ bool release() { - return mMutex->releaseLock(); + bool result = mMutex.releaseLock(); mIsLocked = false; + return result; } + + /** + * Get whether or not the mutex is locked. + * \return True if the mutex is locked, false otherwise. + */ bool isLocked() const { return mIsLocked; } + protected: - Mutex* mMutex; + Mutex& mMutex; bool mIsLocked; + + friend class Condition; }; - class ScopedLock : public Lock + /** + * This type of lock tries to acquire a lock on the mutex during + * construction and releases the lock on deconstruction. + */ + class ScopedLock : private Lock { public: + /** + * Construct a lock. + * \param mutex The mutex. + */ ScopedLock(Mutex& mutex) : Lock(mutex) { acquire(); } + + /** + * Get whether or not the mutex is locked. + * \return True if the mutex is locked, false otherwise. + */ + bool isLocked() const + { + return Lock::isLocked(); + } }; + private: SDL_mutex* mMutex; + + friend class Condition; }; +/** + * A class representing a condition variable. + */ class Condition { public: + /** + * Construct a condition. + */ Condition() { - condition_ = SDL_CreateCond(); + mCondition = SDL_CreateCond(); } + + /** + * Deconstruct a condition. + */ ~Condition() { - SDL_DestroyCond(condition_); + SDL_DestroyCond(mCondition); } + + /** + * Unlock the mutex and wait for another thread to notify the thread, + * at which point the mutex will be re-locked and the method will + * return. + * \param mutex The mutex. + * \return True if the thread was notified, false otherwise. + */ + bool wait(Mutex& mutex) + { + return (SDL_CondWait(mCondition, mutex.mMutex) == 0); + } + + /** + * Unlock the mutex associated with a lock and wait for another thread + * to notify the thread, at which point the lock will be re-locked and + * the method will return. + * \param lock The lock. + * \return True if the thread was notified, false otherwise. + */ bool wait(Mutex::Lock& lock) { - return (SDL_CondWait(condition_, lock.mMutex->mMutex) == 0); + return (SDL_CondWait(mCondition, lock.mMutex.mMutex) == 0); } - bool wait(Mutex::Lock& lock, unsigned ms) + + /** + * Unlock the mutex and wait for another thread to notify the thread, + * at which point the mutex will be re-locked and the method will + * return. If the thread was not notified before a certain number of + * seconds, the method will return anyway. + * \param mutex The mutex. + * \param timeout Number of seconds to wait. + * \return True if the thread was notified, false otherwise. + */ + bool wait(Mutex& mutex, Scalar timeout) { - // TODO for consistency, this function should take seconds - return (SDL_CondWaitTimeout(condition_, - lock.mMutex->mMutex, ms) == 0); + Uint32 ms = timeout * SCALAR(1000.0); + return (SDL_CondWaitTimeout(mCondition, mutex.mMutex, ms) == 0); } + /** + * Unlock the mutex associated with a lock and wait for another thread + * to notify the thread, at which point the lock will be re-locked and + * the method will return. If the thread was not notified before a + * certain number of seconds, the method will return anyway. + * \param lock The lock. + * \param timeout Number of seconds to wait. + * \return True if the thread was notified, false otherwise. + */ + bool wait(Mutex::Lock& lock, Scalar timeout) + { + Uint32 ms = timeout * SCALAR(1000.0); + return (SDL_CondWaitTimeout(mCondition, + lock.mMutex.mMutex, ms) == 0); + } + + + /** + * Notify one other thread that is waiting on the condition. + * \return True on success, false otherwise. + */ bool notify() { - return (SDL_CondSignal(condition_) == 0); + return (SDL_CondSignal(mCondition) == 0); } + + /** + * Notify all other threads that are waiting on the condition. + * \return True on success, false otherwise. + */ bool notifyAll() { - return (SDL_CondBroadcast(condition_) == 0); + return (SDL_CondBroadcast(mCondition) == 0); } + private: - SDL_cond* condition_; + SDL_cond* mCondition; }; +/** + * A semaphore class. + */ class Semaphore { public: - Semaphore(unsigned int value) + /** + * Construct a semaphore. + * \param value The initial value of the semaphore. + */ + Semaphore(uint32_t value) { - semaphore_ = SDL_CreateSemaphore(value); + mSemaphore = SDL_CreateSemaphore(value); } + + /** + * Deconstruct a semaphore. + */ ~Semaphore() { - SDL_DestroySemaphore(semaphore_); + SDL_DestroySemaphore(mSemaphore); } + + /** + * Block until the calling thread can secure exclusive access to the + * code protected by the semaphore. + * \return True if the lock was acquired, false otherwise. + * \see Lock + */ bool acquireLock() { - return (SDL_SemWait(semaphore_) == 0); + return (SDL_SemWait(mSemaphore) == 0); } - bool releaseLock() + + /** + * Block until the calling thread can secure exclusive access to the + * code protected by the semaphore, or until the timeout expires. + * \param timeout Number of seconds to try. + * \return True if the lock was acquired, false otherwise. + */ + bool acquireLock(Scalar timeout) { - return (SDL_SemPost(semaphore_) == 0); + Uint32 ms = timeout * SCALAR(1000.0); + return (SDL_SemWaitTimeout(mSemaphore, ms) == 0); } - bool tryLock() + /** + * Unlock the semaphore. Call this after the sensitive block of code + * to allow another thread to acquire the lock. + * \return True if the semaphore was unlocked, false otherwise. + * \see Lock + */ + bool releaseLock() { - return (SDL_SemTryWait(semaphore_) == 0); + return (SDL_SemPost(mSemaphore) == 0); } - bool tryLock(unsigned ms) + + /** + * Try to lock the semaphore, but don't block if the lock is not + * immediately available. + * \return True if the semaphore was locked, false otherwise. + */ + bool tryLock() { - // TODO for consistency, this function should take seconds - return (SDL_SemWaitTimeout(semaphore_, ms) == 0); + return (SDL_SemTryWait(mSemaphore) == 0); } - + + /** + * As an alternative method for locking, objects of this class will + * automagically release the lock if it is still locked at + * deconstruction. Therefore, it's generally safer to use this method + * since it makes it much more difficult to forget to unlock a + * semaphore. + */ class Lock { public: - Lock(Semaphore& semaphore) - { - semaphore_ = &semaphore; - mIsLocked = false; - } + /** + * Construct a lock. + * \param semaphore The semaphore. + */ + Lock(Semaphore& semaphore) : + mSemaphore(semaphore), + mIsLocked(false) {} + + /** + * Deconstruct a lock. The lock is automagically released if it is + * still locked. + */ ~Lock() { if (mIsLocked) release(); } + + /** + * Try to acquire a lock on the semaphore. + * \return True if the semaphore was locked, false otherwise. + */ bool acquire() { - return (mIsLocked = semaphore_->acquireLock()); + return (mIsLocked = mSemaphore.acquireLock()); } + + /** + * Release the lock. + * \return True if the semaphore was unlocked, false otherwise. + */ bool release() { - return semaphore_->releaseLock(); mIsLocked = false; + bool result = mSemaphore.releaseLock(); + mIsLocked = false; + return result; } + /** + * Get whether or not the semaphore is locked. + * \return True if the semaphore is locked, false otherwise. + */ bool isLocked() const { return mIsLocked; } + protected: - Semaphore* semaphore_; + Semaphore& mSemaphore; bool mIsLocked; }; - class ScopedLock : public Lock + /** + * This type of lock tries to acquire a lock on the semaphore during + * construction and releases the lock on deconstruction. + */ + class ScopedLock : private Lock { public: + /** + * Construct a lock. + * \param semaphore The semaphore. + */ ScopedLock(Semaphore& semaphore) : Lock(semaphore) { acquire(); } + + /** + * Get whether or not the semaphore is locked. + * \return True if the semaphore is locked, false otherwise. + */ + bool isLocked() const + { + return Lock::isLocked(); + } }; + private: - SDL_sem* semaphore_; + SDL_sem* mSemaphore; }; -- 2.45.2