Imported Upstream version 1.2.4

This commit is contained in:
didyouexpectthat 2018-01-12 18:20:00 -08:00
commit 4722a0b75a
398 changed files with 38633 additions and 24919 deletions

View file

@ -44,6 +44,7 @@
#include "Packet.hpp"
#include "Switch.hpp"
#include "Node.hpp"
#include "Network.hpp"
#include "Array.hpp"
namespace ZeroTier {
@ -254,7 +255,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey[ZT_POLY1305_KEY_LEN];
memset(polykey,0,sizeof(polykey));
s20.encrypt12(polykey,polykey,sizeof(polykey));
s20.crypt12(polykey,polykey,sizeof(polykey));
// Compute 16-byte MAC
char mac[ZT_POLY1305_MAC_LEN];
@ -266,7 +267,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
// Decrypt!
dmsg.setSize(len - 24);
s20.decrypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
s20.crypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
}
if (dmsg.size() < 4)
@ -341,17 +342,20 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
Identity id;
ptr += id.deserialize(dmsg,ptr);
if (id) {
RR->topology->saveIdentity(id);
{
Mutex::Lock _l(_remotePeers_m);
_remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
_RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)];
if (!rp.lastHavePeerReceived) {
RR->topology->saveIdentity((void *)0,id);
RR->identity.agree(id,rp.key,ZT_PEER_SECRET_KEY_LENGTH);
}
rp.lastHavePeerReceived = RR->node->now();
}
_ClusterSendQueueEntry *q[16384]; // 16384 is "tons"
unsigned int qc = _sendQueue->getByDest(id.address(),q,16384);
for(unsigned int i=0;i<qc;++i)
this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
this->relayViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
_sendQueue->returnToPool(q,qc);
TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);
@ -361,7 +365,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
case CLUSTER_MESSAGE_WANT_PEER: {
const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
if ( (peer) && (peer->hasClusterOptimalPath(RR->node->now())) ) {
if ( (peer) && (peer->hasLocalClusterOptimalPath(RR->node->now())) ) {
Buffer<1024> buf;
peer->identity().serialize(buf);
Mutex::Lock _l2(_members[fromMemberId].lock);
@ -396,7 +400,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress));
if ((localPeer)&&(numRemotePeerPaths > 0)) {
InetAddress bestLocalV4,bestLocalV6;
localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6);
localPeer->getRendezvousAddresses(now,bestLocalV4,bestLocalV6);
InetAddress bestRemoteV4,bestRemoteV6;
for(unsigned int i=0;i<numRemotePeerPaths;++i) {
@ -455,7 +459,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
}
RR->sw->send(rendezvousForLocal,true,0);
RR->sw->send((void *)0,rendezvousForLocal,true);
}
}
} break;
@ -466,9 +470,18 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
Packet outp(rcpt,RR->identity.address(),verb);
outp.append(dmsg.field(ptr,len),len); ptr += len;
RR->sw->send(outp,true,0);
RR->sw->send((void *)0,outp,true);
//TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break;
case CLUSTER_MESSAGE_NETWORK_CONFIG: {
const SharedPtr<Network> network(RR->node->network(dmsg.at<uint64_t>(ptr)));
if (network) {
// Copy into a Packet just to conform to Network API. Eventually
// will want to refactor.
network->handleConfigChunk((void *)0,0,Address(),Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(dmsg),ptr);
}
} break;
}
} catch ( ... ) {
TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
@ -494,7 +507,84 @@ void Cluster::broadcastHavePeer(const Identity &id)
}
}
void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len)
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_NETWORK_CONFIG,chunk,len);
}
}
int Cluster::checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret)
{
const uint64_t now = RR->node->now();
mostRecentTs = 0;
int mostRecentMemberId = -1;
{
Mutex::Lock _l2(_remotePeers_m);
std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
for(;;) {
if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
break;
else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
mostRecentTs = rpe->second.lastHavePeerReceived;
memcpy(peerSecret,rpe->second.key,ZT_PEER_SECRET_KEY_LENGTH);
mostRecentMemberId = (int)rpe->first.second;
}
++rpe;
}
}
const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
if (ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)
mostRecentMemberId = -1;
bool sendWantPeer = true;
{
Mutex::Lock _l(_remotePeers_m);
_RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
rp.lastSentWantPeer = now;
} else {
sendWantPeer = false; // don't flood WANT_PEER
}
}
if (sendWantPeer) {
char tmp[ZT_ADDRESS_LENGTH];
toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
}
}
}
}
return mostRecentMemberId;
}
bool Cluster::sendViaCluster(int mostRecentMemberId,const Address &toPeerAddress,const void *data,unsigned int len)
{
if ((mostRecentMemberId < 0)||(mostRecentMemberId >= ZT_CLUSTER_MAX_MEMBERS)) // sanity check
return false;
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
if (i1->ss_family == i2->ss_family) {
TRACE("sendViaCluster sending %u bytes to %s by way of %u (%s->%s)",len,toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
RR->node->putPacket((void *)0,*i1,*i2,data,len);
return true;
}
}
}
return false;
}
void Cluster::relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
{
if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
return;
@ -502,87 +592,101 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
const uint64_t now = RR->node->now();
uint64_t mostRecentTs = 0;
unsigned int mostRecentMemberId = 0xffffffff;
int mostRecentMemberId = -1;
{
Mutex::Lock _l2(_remotePeers_m);
std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
for(;;) {
if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
break;
else if (rpe->second > mostRecentTs) {
mostRecentTs = rpe->second;
mostRecentMemberId = rpe->first.second;
else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
mostRecentTs = rpe->second.lastHavePeerReceived;
mostRecentMemberId = (int)rpe->first.second;
}
++rpe;
}
}
const uint64_t age = now - mostRecentTs;
if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
const bool enqueueAndWait = ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff));
const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
// Enqueue and wait if peer seems alive, but do WANT_PEER to refresh homing
const bool enqueueAndWait = ((ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId < 0));
// Poll everyone with WANT_PEER if the age of our most recent entry is
// approaching expiration (or has expired, or does not exist).
char tmp[ZT_ADDRESS_LENGTH];
toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
bool sendWantPeer = true;
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
Mutex::Lock _l(_remotePeers_m);
_RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
rp.lastSentWantPeer = now;
} else {
sendWantPeer = false; // don't flood WANT_PEER
}
}
if (sendWantPeer) {
char tmp[ZT_ADDRESS_LENGTH];
toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
}
}
}
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
if (enqueueAndWait) {
TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
TRACE("relayViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
_sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);
return;
}
}
Buffer<1024> buf;
if (unite) {
InetAddress v4,v6;
if (fromPeerAddress) {
SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
if (fromPeer)
fromPeer->getBestActiveAddresses(now,v4,v6);
}
uint8_t addrCount = 0;
if (v4)
++addrCount;
if (v6)
++addrCount;
if (addrCount) {
toPeerAddress.appendTo(buf);
fromPeerAddress.appendTo(buf);
buf.append(addrCount);
if (mostRecentMemberId >= 0) {
Buffer<1024> buf;
if (unite) {
InetAddress v4,v6;
if (fromPeerAddress) {
SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
if (fromPeer)
fromPeer->getRendezvousAddresses(now,v4,v6);
}
uint8_t addrCount = 0;
if (v4)
v4.serialize(buf);
++addrCount;
if (v6)
v6.serialize(buf);
}
}
{
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
if (buf.size() > 0)
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
if (i1->ss_family == i2->ss_family) {
TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
RR->node->putPacket(*i1,*i2,data,len);
return;
}
++addrCount;
if (addrCount) {
toPeerAddress.appendTo(buf);
fromPeerAddress.appendTo(buf);
buf.append(addrCount);
if (v4)
v4.serialize(buf);
if (v6)
v6.serialize(buf);
}
}
TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
return;
{
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
if (buf.size() > 0)
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
if (i1->ss_family == i2->ss_family) {
TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
RR->node->putPacket((void *)0,*i1,*i2,data,len);
return;
}
}
}
TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
}
}
}
@ -644,8 +748,8 @@ void Cluster::doPeriodicTasks()
_lastCleanedRemotePeers = now;
Mutex::Lock _l(_remotePeers_m);
for(std::map< std::pair<Address,unsigned int>,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT)
for(std::map< std::pair<Address,unsigned int>,_RemotePeer >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
if ((now - rp->second.lastHavePeerReceived) >= ZT_PEER_ACTIVITY_TIMEOUT)
_remotePeers.erase(rp++);
else ++rp;
}
@ -719,7 +823,9 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
std::vector<InetAddress> best;
const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
double bestDistance = (offload ? 2147483648.0 : currentDistance);
#ifdef ZT_TRACE
unsigned int bestMember = _id;
#endif
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@ -731,7 +837,9 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
const double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
if (mdist < bestDistance) {
bestDistance = mdist;
#ifdef ZT_TRACE
bestMember = *mid;
#endif
best = m.zeroTierPhysicalEndpoints;
}
}
@ -754,6 +862,19 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
}
}
bool Cluster::isClusterPeerFrontplane(const InetAddress &ip) const
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
for(std::vector<InetAddress>::const_iterator i2(_members[*mid].zeroTierPhysicalEndpoints.begin());i2!=_members[*mid].zeroTierPhysicalEndpoints.end();++i2) {
if (ip == *i2)
return true;
}
}
return false;
}
void Cluster::status(ZT_ClusterStatus &status) const
{
const uint64_t now = RR->node->now();
@ -833,10 +954,10 @@ void Cluster::_flush(uint16_t memberId)
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey[ZT_POLY1305_KEY_LEN];
memset(polykey,0,sizeof(polykey));
s20.encrypt12(polykey,polykey,sizeof(polykey));
s20.crypt12(polykey,polykey,sizeof(polykey));
// Encrypt m.q in place
s20.encrypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
s20.crypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
// Add MAC for authentication (encrypt-then-MAC)
char mac[ZT_POLY1305_MAC_LEN];
@ -860,7 +981,7 @@ void Cluster::_flush(uint16_t memberId)
void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
{
if (remotep.payloadLength() >= ZT_ADDRESS_LENGTH) {
Identity queried(RR->topology->getIdentity(Address(remotep.payload(),ZT_ADDRESS_LENGTH)));
Identity queried(RR->topology->getIdentity((void *)0,Address(remotep.payload(),ZT_ADDRESS_LENGTH)));
if (queried) {
Buffer<1024> routp;
remotep.source().appendTo(routp);