Last bit of new cluster code, ready to test.

This commit is contained in:
Adam Ierymenko 2017-06-06 16:11:19 -07:00
commit 951d911531
2 changed files with 416 additions and 334 deletions

View file

@ -66,11 +66,7 @@
#include "Phy.hpp"
#include "OSUtils.hpp"
/**
* Period between binder rescans/refreshes
*
* OneService also does this on detected restarts.
*/
// Period between refreshes of bindings
#define ZT_BINDER_REFRESH_PERIOD 30000
namespace ZeroTier {
@ -105,10 +101,7 @@ public:
Binder() {}
/**
* Close all bound ports
*
* This should be called on shutdown. It closes listen sockets and UDP ports
* but not TCP connections from any TCP listen sockets.
* Close all bound ports, should be called on shutdown
*
* @param phy Physical interface
*/
@ -116,9 +109,9 @@ public:
void closeAll(Phy<PHY_HANDLER_TYPE> &phy)
{
Mutex::Lock _l(_lock);
for(typename std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) {
phy.close(i->udpSock,false);
phy.close(i->tcpListenSock,false);
for(std::vector<_Binding>::iterator b(_bindings.begin());b!=_bindings.end();++b) {
phy.close(b->udpSock,false);
phy.close(b->tcpListenSock,false);
}
}
@ -129,7 +122,7 @@ public:
* changes, on startup, or periodically (e.g. every 30-60s).
*
* @param phy Physical interface
* @param port Port to bind to on all interfaces (TCP and UDP)
* @param ports Ports to bind on all interfaces
* @param ignoreInterfacesByName Ignore these interfaces by name
* @param ignoreInterfacesByNamePrefix Ignore these interfaces by name-prefix (starts-with, e.g. zt ignores zt*)
* @param ignoreInterfacesByAddress Ignore these interfaces by address
@ -137,11 +130,10 @@ public:
* @tparam INTERFACE_CHECKER Type for class containing shouldBindInterface() method
*/
template<typename PHY_HANDLER_TYPE,typename INTERFACE_CHECKER>
void refresh(Phy<PHY_HANDLER_TYPE> &phy,unsigned int port,INTERFACE_CHECKER &ifChecker)
void refresh(Phy<PHY_HANDLER_TYPE> &phy,unsigned int *ports,unsigned int portCount,INTERFACE_CHECKER &ifChecker)
{
std::map<InetAddress,std::string> localIfAddrs;
PhySocket *udps;
//PhySocket *tcps;
PhySocket *udps,*tcps;
Mutex::Lock _l(_lock);
#ifdef __WINDOWS__
@ -161,8 +153,10 @@ public:
case InetAddress::IP_SCOPE_GLOBAL:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_PRIVATE:
ip.setPort(port);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string()));
for(int x=0;x<portCount;++x) {
ip.setPort(ports[x]);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string()));
}
break;
}
}
@ -231,8 +225,10 @@ public:
case InetAddress::IP_SCOPE_GLOBAL:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_PRIVATE:
ip.setPort(port);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(devname)));
for(int x=0;x<portCount;++x) {
ip.setPort(ports[x]);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(devname)));
}
break;
}
}
@ -249,11 +245,8 @@ public:
configuration.ifc_buf = nullptr;
if (controlfd < 0) goto ip4_address_error;
if (ioctl(controlfd, SIOCGIFCONF, &configuration) < 0) goto ip4_address_error;
configuration.ifc_buf = (char*)malloc(configuration.ifc_len);
if (ioctl(controlfd, SIOCGIFCONF, &configuration) < 0) goto ip4_address_error;
for (int i=0; i < (int)(configuration.ifc_len / sizeof(ifreq)); i ++) {
@ -262,9 +255,8 @@ public:
if (addr->sa_family != AF_INET) continue;
std::string ifname = request.ifr_ifrn.ifrn_name;
// name can either be just interface name or interface name followed by ':' and arbitrary label
if (ifname.find(':') != std::string::npos) {
if (ifname.find(':') != std::string::npos)
ifname = ifname.substr(0, ifname.find(':'));
}
InetAddress ip(&(((struct sockaddr_in *)addr)->sin_addr),4,0);
if (ifChecker.shouldBindInterface(ifname.c_str(), ip)) {
@ -274,8 +266,10 @@ public:
case InetAddress::IP_SCOPE_GLOBAL:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_PRIVATE:
ip.setPort(port);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip, ifname));
for(int x=0;x<portCount;++x) {
ip.setPort(ports[x]);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,ifname));
}
break;
}
}
@ -306,8 +300,10 @@ public:
case InetAddress::IP_SCOPE_GLOBAL:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_PRIVATE:
ip.setPort(port);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(ifa->ifa_name)));
for(int x=0;x<portCount;++x) {
ip.setPort(ports[x]);
localIfAddrs.insert(std::pair<InetAddress,std::string>(ip,std::string(ifa->ifa_name)));
}
break;
}
}
@ -322,59 +318,57 @@ public:
// Default to binding to wildcard if we can't enumerate addresses
if (localIfAddrs.empty()) {
localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((uint32_t)0,port),std::string()));
localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((const void *)"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,port),std::string()));
}
// Close any old bindings to anything that doesn't exist anymore
for(typename std::vector<_Binding>::const_iterator bi(_bindings.begin());bi!=_bindings.end();++bi) {
if (localIfAddrs.find(bi->address) == localIfAddrs.end()) {
phy.close(bi->udpSock,false);
phy.close(bi->tcpListenSock,false);
for(int x=0;x<portCount;++x) {
localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((uint32_t)0,ports[x]),std::string()));
localIfAddrs.insert(std::pair<InetAddress,std::string>(InetAddress((const void *)"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",16,ports[x]),std::string()));
}
}
std::vector<_Binding> newBindings;
// Save bindings that are still valid, close those that are not
for(std::vector<_Binding>::iterator b(_bindings.begin());b!=_bindings.end();++b) {
if (localIfAddrs.find(b->address) != localIfAddrs.end()) {
newBindings.push_back(*b);
} else {
phy.close(b->udpSock,false);
phy.close(b->tcpListenSock,false);
}
}
// Create new bindings for those not already bound
for(std::map<InetAddress,std::string>::const_iterator ii(localIfAddrs.begin());ii!=localIfAddrs.end();++ii) {
typename std::vector<_Binding>::const_iterator bi(_bindings.begin());
while (bi != _bindings.end()) {
if (bi->address == ii->first) {
newBindings.push_back(*bi);
typename std::vector<_Binding>::const_iterator bi(newBindings.begin());
while (bi != newBindings.end()) {
if (bi->address == ii->first)
break;
}
++bi;
}
if (bi == _bindings.end()) {
if (bi == newBindings.end()) {
udps = phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(ii->first)),(void *)0,ZT_UDP_DESIRED_BUF_SIZE);
if (udps) {
//tcps = phy.tcpListen(reinterpret_cast<const struct sockaddr *>(&ii),(void *)0);
//if (tcps) {
tcps = phy.tcpListen(reinterpret_cast<const struct sockaddr *>(&(ii->first)),(void *)0);
if ((udps)&&(tcps)) {
#ifdef __LINUX__
// Bind Linux sockets to their device so routes tha we manage do not override physical routes (wish all platforms had this!)
if (ii->second.length() > 0) {
int fd = (int)Phy<PHY_HANDLER_TYPE>::getDescriptor(udps);
char tmp[256];
Utils::scopy(tmp,sizeof(tmp),ii->second.c_str());
if (fd >= 0) {
if (setsockopt(fd,SOL_SOCKET,SO_BINDTODEVICE,tmp,strlen(tmp)) != 0) {
fprintf(stderr,"WARNING: unable to set SO_BINDTODEVICE to bind %s to %s\n",ii->first.toIpString().c_str(),ii->second.c_str());
}
}
}
// Bind Linux sockets to their device so routes tha we manage do not override physical routes (wish all platforms had this!)
if (ii->second.length() > 0) {
char tmp[256];
Utils::scopy(tmp,sizeof(tmp),ii->second.c_str());
int fd = (int)Phy<PHY_HANDLER_TYPE>::getDescriptor(udps);
if (fd >= 0)
setsockopt(fd,SOL_SOCKET,SO_BINDTODEVICE,tmp,strlen(tmp));
fd = (int)Phy<PHY_HANDLER_TYPE>::getDescriptor(tcps);
if (fd >= 0)
setsockopt(fd,SOL_SOCKET,SO_BINDTODEVICE,tmp,strlen(tmp));
}
#endif // __LINUX__
newBindings.push_back(_Binding());
newBindings.back().udpSock = udps;
//newBindings.back().tcpListenSock = tcps;
newBindings.back().address = ii->first;
//} else {
// phy.close(udps,false);
//}
newBindings.push_back(_Binding());
newBindings.back().udpSock = udps;
newBindings.back().tcpListenSock = tcps;
newBindings.back().address = ii->first;
}
}
}
// Swapping pointers and then letting the old one fall out of scope is faster than copying again
_bindings.swap(newBindings);
}
@ -402,58 +396,79 @@ public:
* @param data Data to send
* @param len Length of data
* @param v4ttl If non-zero, send this packet with the specified IP TTL (IPv4 only)
* @return -1 == local doesn't match any bound address, 0 == send failure, 1 == send successful
*/
template<typename PHY_HANDLER_TYPE>
inline bool udpSend(Phy<PHY_HANDLER_TYPE> &phy,const InetAddress &local,const InetAddress &remote,const void *data,unsigned int len,unsigned int v4ttl = 0) const
inline int udpSend(Phy<PHY_HANDLER_TYPE> &phy,const InetAddress &local,const InetAddress &remote,const void *data,unsigned int len,unsigned int v4ttl = 0) const
{
PhySocket *s;
typename std::vector<_Binding>::const_iterator i;
int result;
Mutex::Lock _l(_lock);
if (local) {
for(typename std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) {
if (i->address == local) {
if ((v4ttl)&&(local.ss_family == AF_INET))
phy.setIp4UdpTtl(i->udpSock,v4ttl);
const bool result = phy.udpSend(i->udpSock,reinterpret_cast<const struct sockaddr *>(&remote),data,len);
if ((v4ttl)&&(local.ss_family == AF_INET))
phy.setIp4UdpTtl(i->udpSock,255);
return result;
if (remote.ss_family == AF_INET) {
if (local) {
for(i=_bindings.begin();i!=_bindings.end();++i) {
if (
(i->address.ss_family == AF_INET) &&
(reinterpret_cast<const struct sockaddr_in *>(&(i->address))->sin_port == reinterpret_cast<const struct sockaddr_in *>(&local)->sin_port) &&
(reinterpret_cast<const struct sockaddr_in *>(&(i->address))->sin_addr.s_addr == reinterpret_cast<const struct sockaddr_in *>(&local)->sin_addr.s_addr)
)
{
s = i->udpSock;
goto Binder_send_packet;
}
}
} else {
for(i=_bindings.begin();i!=_bindings.end();++i) {
if (i->address.ss_family == AF_INET) {
s = i->udpSock;
goto Binder_send_packet;
}
}
}
return false;
} else {
bool result = false;
for(typename std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i) {
if (i->address.ss_family == remote.ss_family) {
if ((v4ttl)&&(remote.ss_family == AF_INET))
phy.setIp4UdpTtl(i->udpSock,v4ttl);
result |= phy.udpSend(i->udpSock,reinterpret_cast<const struct sockaddr *>(&remote),data,len);
if ((v4ttl)&&(remote.ss_family == AF_INET))
phy.setIp4UdpTtl(i->udpSock,255);
if (local) {
for(i=_bindings.begin();i!=_bindings.end();++i) {
if (
(i->address.ss_family == AF_INET6) &&
(reinterpret_cast<const struct sockaddr_in6 *>(&(i->address))->sin6_port == reinterpret_cast<const struct sockaddr_in6 *>(&local)->sin6_port) &&
(!memcmp(reinterpret_cast<const struct sockaddr_in6 *>(&(i->address))->sin6_addr.s6_addr,reinterpret_cast<const struct sockaddr_in6 *>(&local)->sin6_addr.s6_addr,16))
)
{
s = i->udpSock;
goto Binder_send_packet;
}
}
} else {
for(i=_bindings.begin();i!=_bindings.end();++i) {
if (i->address.ss_family == AF_INET6) {
s = i->udpSock;
goto Binder_send_packet;
}
}
}
return result;
}
return -1;
Binder_send_packet:
if (v4ttl) phy.setIp4UdpTtl(s,v4ttl);
result = (int)phy.udpSend(s,reinterpret_cast<const struct sockaddr *>(&remote),data,len);
if (v4ttl) phy.setIp4UdpTtl(s,255);
return result;
}
/**
* @return All currently bound local interface addresses
*/
inline std::vector<InetAddress> allBoundLocalInterfaceAddresses()
inline std::vector<InetAddress> allBoundLocalInterfaceAddresses() const
{
Mutex::Lock _l(_lock);
std::vector<InetAddress> aa;
for(std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i)
aa.push_back(i->address);
return aa;
}
/**
* @param aa Vector to append local interface addresses to
*/
inline void allBoundLocalInterfaceAddresses(std::vector<InetAddress> &aa)
{
Mutex::Lock _l(_lock);
for(std::vector<_Binding>::const_iterator i(_bindings.begin());i!=_bindings.end();++i)
aa.push_back(i->address);
for(std::vector<_Binding>::const_iterator b(_bindings.begin());b!=_bindings.end();++b)
aa.push_back(b->address);
return aa;
}
private: