From 7291ac209311593e77e5be0754a1d8349dd028db Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 10 Jan 2020 20:40:14 -0800 Subject: [PATCH] A bunch more cleanup and build fixes, and remove old version of multipath code to prep for new version. --- include/ZeroTierCore.h | 84 ------ node/CMakeLists.txt | 1 + node/Constants.hpp | 233 +---------------- node/IncomingPacket.cpp | 210 +-------------- node/Mutex.hpp | 42 ++- node/NetworkConfig.hpp | 23 +- node/Node.cpp | 13 - node/Path.cpp | 31 +++ node/Path.hpp | 564 ++-------------------------------------- node/Peer.cpp | 516 ++---------------------------------- node/Peer.hpp | 359 +++++-------------------- node/ScopedPtr.hpp | 12 + node/SharedPtr.hpp | 5 + node/Switch.cpp | 352 ++----------------------- node/Switch.hpp | 127 +-------- node/Topology.cpp | 153 +++++++++++ node/Topology.hpp | 238 +++++------------ 17 files changed, 454 insertions(+), 2509 deletions(-) create mode 100644 node/Topology.cpp diff --git a/include/ZeroTierCore.h b/include/ZeroTierCore.h index 41022a40a..e58e6eaeb 100644 --- a/include/ZeroTierCore.h +++ b/include/ZeroTierCore.h @@ -321,35 +321,6 @@ enum ZT_ResultCode */ #define ZT_ResultCode_isFatal(x) ((((int)(x)) >= 100)&&(((int)(x)) < 1000)) -/** - * The multipath algorithm in use by this node. - */ -enum ZT_MultipathMode -{ - /** - * No active multipath. - * - * Traffic is merely sent over the strongest path. That being - * said, this mode will automatically failover in the event that a link goes down. - */ - ZT_MULTIPATH_NONE = 0, - - /** - * Traffic is randomly distributed among all active paths. - * - * Will cease sending traffic over links that appear to be stale. - */ - ZT_MULTIPATH_RANDOM = 1, - - /** - * Traffic is allocated across all active paths in proportion to their strength and - * reliability. - * - * Will cease sending traffic over links that appear to be stale. 
- */ - ZT_MULTIPATH_PROPORTIONALLY_BALANCED = 2, -}; - /** * Status codes sent to status update callback when things happen */ @@ -1012,56 +983,6 @@ typedef struct */ uint64_t trustedPathId; - /** - * One-way latency - */ - float latency; - - /** - * How much latency varies over time - */ - float packetDelayVariance; - - /** - * How much observed throughput varies over time - */ - float throughputDisturbCoeff; - - /** - * Packet Error Ratio (PER) - */ - float packetErrorRatio; - - /** - * Packet Loss Ratio (PLR) - */ - float packetLossRatio; - - /** - * Stability of the path - */ - float stability; - - /** - * Current throughput (moving average) - */ - uint64_t throughput; - - /** - * Maximum observed throughput for this path - */ - uint64_t maxThroughput; - - /** - * Percentage of traffic allocated to this path - */ - float allocation; - - /** - * Name of physical interface (for monitoring) - */ - char *ifname; - /** * Is path alive? */ @@ -1127,11 +1048,6 @@ typedef struct */ unsigned int pathCount; - /** - * Whether this peer was ever reachable via an aggregate link - */ - int hadAggregateLink; - /** * Known network paths to peer */ diff --git a/node/CMakeLists.txt b/node/CMakeLists.txt index e5ee34e95..399e602c0 100644 --- a/node/CMakeLists.txt +++ b/node/CMakeLists.txt @@ -71,6 +71,7 @@ set(core_src SelfAwareness.cpp SHA512.cpp Switch.cpp + Topology.cpp Utils.cpp ) diff --git a/node/Constants.hpp b/node/Constants.hpp index f66da43ee..3393a5da9 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -117,202 +117,29 @@ #define ZT_RELAY_MAX_HOPS 4 /** - * Expire time for multicast 'likes' and indirect multicast memberships in ms - */ -#define ZT_MULTICAST_LIKE_EXPIRE 600000 - -/** - * Period for multicast LIKE re-announcements to connected nodes - */ -#define ZT_MULTICAST_ANNOUNCE_PERIOD 60000 - -/** - * Packets are only used for QoS/ACK statistical sampling if their packet ID is divisible by - * this integer. 
This is to provide a mechanism for both peers to agree on which packets need - * special treatment without having to exchange information. Changing this value would be - * a breaking change and would necessitate a protocol version upgrade. Since each incoming and - * outgoing packet ID is checked against this value its evaluation is of the form: - * (id & (divisor - 1)) == 0, thus the divisor must be a power of 2. - * - * This value is set at (16) so that given a normally-distributed RNG output we will sample - * 1/16th (or ~6.25%) of packets. - */ -#define ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR 0x10 - -/** - * Time horizon for VERB_QOS_MEASUREMENT and VERB_ACK packet processing cutoff - */ -#define ZT_PATH_QOS_ACK_CUTOFF_TIME 30000 - -/** - * Maximum number of VERB_QOS_MEASUREMENT and VERB_ACK packets allowed to be - * processed within cutoff time. Separate totals are kept for each type but - * the limit is the same for both. - * - * This limits how often this peer will compute statistical estimates - * of various QoS measures from a VERB_QOS_MEASUREMENT or VERB_ACK packets to - * CUTOFF_LIMIT times per CUTOFF_TIME milliseconds per peer to prevent - * this from being useful for DOS amplification attacks. - */ -#define ZT_PATH_QOS_ACK_CUTOFF_LIMIT 128 - -/** - * Path choice history window size. This is used to keep track of which paths were - * previously selected so that we can maintain a target allocation over time. - */ -#define ZT_MULTIPATH_PROPORTION_WIN_SZ 128 - -/** - * Interval used for rate-limiting the computation of path quality estimates. - */ -#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 1000 - -/** - * Number of samples to consider when computing real-time path statistics - */ -#define ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ 128 - -/** - * Number of samples to consider when computing performing long-term path quality analysis. 
- * By default this value is set to ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ but can - * be set to any value greater than that to observe longer-term path quality behavior. - */ -#define ZT_PATH_QUALITY_METRIC_WIN_SZ ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ - -/** - * Maximum acceptable Packet Delay Variance (PDV) over a path - */ -#define ZT_PATH_MAX_PDV 1000 - -/** - * Maximum acceptable time interval between expectation and receipt of at least one ACK over a path - */ -#define ZT_PATH_MAX_AGE 30000 - -/** - * Maximum acceptable mean latency over a path - */ -#define ZT_PATH_MAX_MEAN_LATENCY 1000 - -/** - * How much each factor contributes to the "stability" score of a path - */ -#define ZT_PATH_CONTRIB_PDV (1.0 / 3.0) -#define ZT_PATH_CONTRIB_LATENCY (1.0 / 3.0) -#define ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE (1.0 / 3.0) - -/** - * How much each factor contributes to the "quality" score of a path - */ -#define ZT_PATH_CONTRIB_STABILITY (0.75 / 3.0) -#define ZT_PATH_CONTRIB_THROUGHPUT (1.50 / 3.0) -#define ZT_PATH_CONTRIB_SCOPE (0.75 / 3.0) - -/** - * How often a QoS packet is sent - */ -#define ZT_PATH_QOS_INTERVAL 3000 - -/** - * Min and max acceptable sizes for a VERB_QOS_MEASUREMENT packet - */ -#define ZT_PATH_MIN_QOS_PACKET_SZ 8 + 1 -#define ZT_PATH_MAX_QOS_PACKET_SZ 1400 - -/** - * How many ID:sojourn time pairs in a single QoS packet - */ -#define ZT_PATH_QOS_TABLE_SIZE ((ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 16)) - -/** - * Maximum number of outgoing packets we monitor for QoS information - */ -#define ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS 128 - -/** - * Timeout for QoS records - */ -#define ZT_PATH_QOS_TIMEOUT (ZT_PATH_QOS_INTERVAL * 2) - -/** - * How often the service tests the path throughput - */ -#define ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL (ZT_PATH_ACK_INTERVAL * 8) - -/** - * Minimum amount of time between each ACK packet - */ -#define ZT_PATH_ACK_INTERVAL 1000 - -/** - * How often an aggregate link statistics report is 
emitted into this tracing system - */ -#define ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL 60000 - -/** - * How much an aggregate link's component paths can vary from their target allocation - * before the link is considered to be in a state of imbalance. - */ -#define ZT_PATH_IMBALANCE_THRESHOLD 0.20 - -/** - * Max allowable time spent in any queue - */ -#define ZT_QOS_TARGET 5 // ms - -/** - * Time period where the time spent in the queue by a packet should fall below - * target at least once - */ -#define ZT_QOS_INTERVAL 100 // ms - -/** - * The number of bytes that each queue is allowed to send during each DRR cycle. - * This approximates a single-byte-based fairness queuing scheme - */ -#define ZT_QOS_QUANTUM ZT_DEFAULT_MTU - -/** - * The maximum total number of packets that can be queued among all - * active/inactive, old/new queues - */ -#define ZT_QOS_MAX_ENQUEUED_PACKETS 1024 - -/** - * Number of QoS queues (buckets) - */ -#define ZT_QOS_NUM_BUCKETS 9 - -/** - * All unspecified traffic is put in this bucket. Anything in a bucket with a smaller - * value is de-prioritized. Anything in a bucket with a higher value is prioritized over - * other traffic. - */ -#define ZT_QOS_DEFAULT_BUCKET 0 - -/** - * Delay between full-fledge pings of directly connected peers + * Period between keepalives sent to paths if no other traffic has been sent * * See https://conferences.sigcomm.org/imc/2010/papers/p260.pdf for * some real world data on NAT UDP timeouts. From the paper: "the * lowest measured timeout when a binding has seen bidirectional * traffic is 54 sec." 30 seconds is faster than really necessary. 
*/ -#define ZT_PEER_PING_PERIOD 30000 +#define ZT_PATH_KEEPALIVE_PERIOD 30000 /** - * Delay between refreshes of locators via DNS or other methods + * Timeout for path aliveness (measured from last receive) */ -#define ZT_DYNAMIC_ROOT_UPDATE_PERIOD 120000 +#define ZT_PATH_ACTIVITY_TIMEOUT ((ZT_PATH_KEEPALIVE_PERIOD * 2) + 5000) + +/** + * Delay between full HELLO messages between peers + */ +#define ZT_PEER_PING_PERIOD 60000 /** * Timeout for overall peer activity (measured from last receive) */ -#ifndef ZT_SDK -#define ZT_PEER_ACTIVITY_TIMEOUT 500000 -#else -#define ZT_PEER_ACTIVITY_TIMEOUT 30000 -#endif +#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_PING_PERIOD * 2) + 5000) /** * Delay between requests for updated network autoconf information @@ -322,15 +149,6 @@ */ #define ZT_NETWORK_AUTOCONF_DELAY 60000 -/** - * Minimum interval between attempts by relays to unite peers - * - * When a relay gets a packet destined for another peer, it sends both peers - * a RENDEZVOUS message no more than this often. This instructs the peers - * to attempt NAT-t and gives each the other's corresponding IP:port pair. - */ -#define ZT_MIN_UNITE_INTERVAL 30000 - /** * Sanity limit on maximum bridge routes * @@ -357,34 +175,10 @@ */ #define ZT_DIRECT_PATH_PUSH_INTERVAL_HAVEPATH 120000 -/** - * Time horizon for push direct paths cutoff - */ -#define ZT_PUSH_DIRECT_PATHS_CUTOFF_TIME 30000 - -/** - * Maximum number of direct path pushes within cutoff time - * - * This limits response to PUSH_DIRECT_PATHS to CUTOFF_LIMIT responses - * per CUTOFF_TIME milliseconds per peer to prevent this from being - * useful for DOS amplification attacks. - */ -#define ZT_PUSH_DIRECT_PATHS_CUTOFF_LIMIT 8 - /** * Maximum number of paths per IP scope (e.g. global, link-local) and family (e.g. 
v4/v6) */ -#define ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY 8 - -/** - * Time horizon for VERB_NETWORK_CREDENTIALS cutoff - */ -#define ZT_PEER_CREDENTIALS_CUTOFF_TIME 60000 - -/** - * Maximum number of VERB_NETWORK_CREDENTIALS within cutoff time - */ -#define ZT_PEER_CREDEITIALS_CUTOFF_LIMIT 15 +#define ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY 4 /** * WHOIS rate limit (we allow these to be pretty fast) @@ -423,11 +217,6 @@ */ #define ZT_SIGNATURE_BUFFER_SIZE 96 -/** - * Desired / recommended min stack size for threads (used on some platforms to reset thread stack size) - */ -#define ZT_THREAD_MIN_STACK_SIZE 1048576 - // Internal cryptographic algorithm IDs (these match relevant identity types) #define ZT_CRYPTO_ALG_C25519 0 #define ZT_CRYPTO_ALG_P384 1 diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index bf77874c4..9d8acdf73 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -181,14 +181,6 @@ ZT_ALWAYS_INLINE bool _doHELLO(IncomingPacket &pkt,const RuntimeEnvironment *con return true; } -ZT_ALWAYS_INLINE bool _doACK(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr &peer,const SharedPtr &path) -{ -} - -ZT_ALWAYS_INLINE bool _doQOS_MEASUREMENT(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr &peer,const SharedPtr &path) -{ -} - ZT_ALWAYS_INLINE bool _doERROR(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr &peer,const SharedPtr &path) { const Packet::Verb inReVerb = (Packet::Verb)pkt[ZT_PROTO_VERB_ERROR_IDX_IN_RE_VERB]; @@ -272,7 +264,6 @@ ZT_ALWAYS_INLINE bool _doOK(IncomingPacket &pkt,const RuntimeEnvironment *const return true; if (pkt.hops() == 0) { - path->updateLatency((unsigned int)latency,RR->node->now()); if ((ZT_PROTO_VERB_HELLO__OK__IDX_REVISION + 2) < pkt.size()) { InetAddress externalSurfaceAddress; externalSurfaceAddress.deserialize(pkt,ZT_PROTO_VERB_HELLO__OK__IDX_REVISION + 2); @@ -281,6 
+272,7 @@ ZT_ALWAYS_INLINE bool _doOK(IncomingPacket &pkt,const RuntimeEnvironment *const } } + peer->updateLatency((unsigned int)latency); peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision); } break; @@ -461,7 +453,7 @@ ZT_ALWAYS_INLINE bool _doEXT_FRAME(IncomingPacket &pkt,const RuntimeEnvironment } } - if ((flags & 0x10) != 0) { // ACK requested + if ((flags & 0x10U) != 0) { // ACK requested Packet outp(peer->address(),RR->identity.address(),Packet::VERB_OK); outp.append((uint8_t)Packet::VERB_EXT_FRAME); outp.append((uint64_t)pkt.packetId()); @@ -497,9 +489,6 @@ ZT_ALWAYS_INLINE bool _doECHO(IncomingPacket &pkt,const RuntimeEnvironment *cons ZT_ALWAYS_INLINE bool _doNETWORK_CREDENTIALS(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr &peer,const SharedPtr &path) { - if (!peer->rateGateCredentialsReceived(RR->node->now())) - return true; - CertificateOfMembership com; Capability cap; Tag tag; @@ -674,7 +663,7 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi { const int64_t now = RR->node->now(); - if (peer->rateGatePushDirectPaths(now)) { + if (peer->rateGateInboundPushDirectPaths(now)) { uint8_t countPerScope[ZT_INETADDRESS_MAX_SCOPE+1][2]; // [][0] is v4, [][1] is v6 memset(countPerScope,0,sizeof(countPerScope)); @@ -689,7 +678,6 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi unsigned int addrLen = pkt[ptr++]; switch(addrType) { - case 4: { const InetAddress a(pkt.field(ptr,4),4,pkt.at(ptr + 4)); if ((!peer->hasActivePathTo(now,a)) && // not already known @@ -699,7 +687,6 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi peer->sendHELLO(tPtr,-1,a,now); } } break; - case 6: { const InetAddress a(pkt.field(ptr,16),16,pkt.at(ptr + 16)); if ((!peer->hasActivePathTo(now,a)) && // not already known @@ -709,8 +696,8 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi 
peer->sendHELLO(tPtr,-1,a,now); } } break; - } + ptr += addrLen; } } @@ -766,7 +753,6 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr) if (!trusted) { if (!dearmor(peer->key())) { RR->t->incomingPacketMessageAuthenticationFailure(tPtr,_path,packetId(),sourceAddress,hops(),"invalid MAC"); - _path->recordInvalidPacket(); return true; } } @@ -784,8 +770,6 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr) peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,0); break; case Packet::VERB_HELLO: r = _doHELLO(*this,RR,tPtr,true,_path); break; - case Packet::VERB_ACK: r = _doACK(*this,RR,tPtr,peer,_path); break; - case Packet::VERB_QOS_MEASUREMENT: r = _doQOS_MEASUREMENT(*this,RR,tPtr,peer,_path); break; case Packet::VERB_ERROR: r = _doERROR(*this,RR,tPtr,peer,_path); break; case Packet::VERB_OK: r = _doOK(*this,RR,tPtr,peer,_path); break; case Packet::VERB_WHOIS: r = _doWHOIS(*this,RR,tPtr,peer,_path); break; @@ -814,190 +798,4 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr) } } -#if 0 - -bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer) -{ - if (!peer->rateGateACK(RR->node->now())) - return true; - /* Dissect incoming ACK packet. From this we can estimate current throughput of the path, establish known - * maximums and detect packet loss. */ - if (peer->localMultipathSupport()) { - int32_t ackedBytes; - if (payloadLength() != sizeof(ackedBytes)) { - return true; // ignore - } - memcpy(&ackedBytes, payload(), sizeof(ackedBytes)); - _path->receivedAck(RR->node->now(), Utils::ntoh(ackedBytes)); - peer->inferRemoteMultipathEnabled(); - } - - return true; -} - -bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer) -{ - if (!peer->rateGateQoS(RR->node->now())) - return true; - - /* Dissect incoming QoS packet. From this we can compute latency values and their variance. 
- * The latency variance is used as a measure of "jitter". */ - if (peer->localMultipathSupport()) { - if (payloadLength() > ZT_PATH_MAX_QOS_PACKET_SZ || payloadLength() < ZT_PATH_MIN_QOS_PACKET_SZ) { - return true; // ignore - } - const int64_t now = RR->node->now(); - uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE]; - uint16_t rx_ts[ZT_PATH_QOS_TABLE_SIZE]; - char *begin = (char *)payload(); - char *ptr = begin; - int count = 0; - int len = payloadLength(); - // Read packet IDs and latency compensation intervals for each packet tracked by this QoS packet - while (ptr < (begin + len) && (count < ZT_PATH_QOS_TABLE_SIZE)) { - memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t)); - ptr+=sizeof(uint64_t); - memcpy((void*)&rx_ts[count], ptr, sizeof(uint16_t)); - ptr+=sizeof(uint16_t); - count++; - } - _path->receivedQoS(now, count, rx_id, rx_ts); - peer->inferRemoteMultipathEnabled(); - } - - return true; -} - -bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer) -{ - unsigned int offset = ZT_PACKET_IDX_PAYLOAD; - const uint64_t nwid = at(offset); offset += 8; - const unsigned int flags = (*this)[offset]; ++offset; - - const SharedPtr network(RR->node->network(nwid)); - if (network) { - if ((flags & 0x01) != 0) { - // This is deprecated but may still be sent by old peers - CertificateOfMembership com; - offset += com.deserialize(*this,offset); - if (com) - network->addCredential(tPtr,com); - } - - if (!network->gate(tPtr,peer)) { - _sendErrorNeedCredentials(RR,tPtr,peer,nwid); - return false; - } - - unsigned int gatherLimit = 0; - if ((flags & 0x02) != 0) { - gatherLimit = at(offset); offset += 4; - } - - MAC from; - if ((flags & 0x04) != 0) { - from.setTo(field(offset,6),6); offset += 6; - } else { - from.fromAddress(peer->address(),nwid); - } - - const unsigned int recipientsOffset = offset; - std::list
recipients; - if ((flags & 0x08) != 0) { - const unsigned int rc = at(offset); offset += 2; - for(unsigned int i=0;iaddress())&&(a != RR->identity.address())) { - recipients.push_back(a); - } - offset += 5; - } - } - const unsigned int afterRecipientsOffset = offset; - - const MulticastGroup to(MAC(field(offset,6),6),at(offset + 6)); offset += 10; - const unsigned int etherType = at(offset); offset += 2; - const unsigned int frameLen = size() - offset; - - if (network->config().multicastLimit == 0) { - RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"multicast disabled"); - peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid); - return true; - } - if (!to.mac().isMulticast()) { - RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"destination not multicast"); - peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid); - return true; - } - if ((!from)||(from.isMulticast())||(from == network->mac())) { - RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"invalid source MAC"); - peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid); - return true; - } - - if ((frameLen > 0)&&(frameLen <= ZT_MAX_MTU)) { - const uint8_t *const frameData = ((const uint8_t *)unsafeData()) + offset; - if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),from,to.mac(),frameData,frameLen,etherType,0) > 0) { - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to.mac(),etherType,0,(const void *)frameData,frameLen); - } - } - - if (!recipients.empty()) { - // TODO - /* - const std::vector
anchors = network->config().anchors(); - const bool amAnchor = (std::find(anchors.begin(),anchors.end(),RR->identity.address()) != anchors.end()); - - for(std::list
::iterator ra(recipients.begin());ra!=recipients.end();) { - SharedPtr recipient(RR->topology->get(*ra)); - if ((recipient)&&((recipient->remoteVersionProtocol() < 10)||(amAnchor))) { - Packet outp(*ra,RR->identity.address(),Packet::VERB_MULTICAST_FRAME); - outp.append(field(ZT_PACKET_IDX_PAYLOAD,recipientsOffset - ZT_PACKET_IDX_PAYLOAD),recipientsOffset - ZT_PACKET_IDX_PAYLOAD); - outp.append(field(afterRecipientsOffset,size() - afterRecipientsOffset),size() - afterRecipientsOffset); - RR->sw->send(tPtr,outp,true); - recipients.erase(ra++); - } else ++ra; - } - - if (!recipients.empty()) { - Packet outp(recipients.front(),RR->identity.address(),Packet::VERB_MULTICAST_FRAME); - recipients.pop_front(); - outp.append(field(ZT_PACKET_IDX_PAYLOAD,recipientsOffset - ZT_PACKET_IDX_PAYLOAD),recipientsOffset - ZT_PACKET_IDX_PAYLOAD); - if (!recipients.empty()) { - outp.append((uint16_t)recipients.size()); - for(std::list
::iterator ra(recipients.begin());ra!=recipients.end();++ra) - ra->appendTo(outp); - } - outp.append(field(afterRecipientsOffset,size() - afterRecipientsOffset),size() - afterRecipientsOffset); - RR->sw->send(tPtr,outp,true); - } - */ - } - - if (gatherLimit) { // DEPRECATED but still supported - /* - Packet outp(source(),RR->identity.address(),Packet::VERB_OK); - outp.append((unsigned char)Packet::VERB_MULTICAST_FRAME); - outp.append(packetId()); - outp.append(nwid); - to.mac().appendTo(outp); - outp.append((uint32_t)to.adi()); - outp.append((unsigned char)0x02); // flag 0x02 = contains gather results - if (RR->mc->gather(peer->address(),nwid,to,outp,gatherLimit)) { - outp.armor(peer->key(),true); - _path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now()); - } - */ - } - - peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid); - return true; - } else { - _sendErrorNeedCredentials(RR,tPtr,peer,nwid); - return false; - } -} - -#endif - } // namespace ZeroTier diff --git a/node/Mutex.hpp b/node/Mutex.hpp index 69b6d4584..439ebe45a 100644 --- a/node/Mutex.hpp +++ b/node/Mutex.hpp @@ -25,12 +25,12 @@ namespace ZeroTier { -// libpthread based mutex lock class Mutex { public: - ZT_ALWAYS_INLINE Mutex() { pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0); } + ZT_ALWAYS_INLINE Mutex() { pthread_mutex_init(&_mh,0); } ZT_ALWAYS_INLINE ~Mutex() { pthread_mutex_destroy(&_mh); } + ZT_ALWAYS_INLINE void lock() const { pthread_mutex_lock(&((const_cast (this))->_mh)); } ZT_ALWAYS_INLINE void unlock() const { pthread_mutex_unlock(&((const_cast (this))->_mh)); } @@ -51,6 +51,43 @@ private: pthread_mutex_t _mh; }; +class RWMutex +{ +public: + ZT_ALWAYS_INLINE RWMutex() { pthread_rwlock_init(&_mh,0); } + ZT_ALWAYS_INLINE ~RWMutex() { pthread_rwlock_destroy(&_mh); } + + ZT_ALWAYS_INLINE void lock() const { pthread_rwlock_wrlock(&((const_cast (this))->_mh)); } + ZT_ALWAYS_INLINE void rlock() const { 
pthread_rwlock_rdlock(&((const_cast (this))->_mh)); } + ZT_ALWAYS_INLINE void unlock() const { pthread_rwlock_unlock(&((const_cast (this))->_mh)); } + + class RLock + { + public: + ZT_ALWAYS_INLINE RLock(RWMutex &m) : _m(&m) { m.rlock(); } + ZT_ALWAYS_INLINE RLock(const RWMutex &m) : _m(const_cast(&m)) { _m->rlock(); } + ZT_ALWAYS_INLINE ~RLock() { _m->unlock(); } + private: + RWMutex *const _m; + }; + + class Lock + { + public: + ZT_ALWAYS_INLINE Lock(RWMutex &m) : _m(&m) { m.lock(); } + ZT_ALWAYS_INLINE Lock(const RWMutex &m) : _m(const_cast(&m)) { _m->lock(); } + ZT_ALWAYS_INLINE ~Lock() { _m->unlock(); } + private: + RWMutex *const _m; + }; + +private: + ZT_ALWAYS_INLINE RWMutex(const RWMutex &) {} + ZT_ALWAYS_INLINE const RWMutex &operator=(const RWMutex &) { return *this; } + + pthread_rwlock_t _mh; +}; + } // namespace ZeroTier #endif @@ -61,7 +98,6 @@ private: namespace ZeroTier { -// Windows critical section based lock class Mutex { public: diff --git a/node/NetworkConfig.hpp b/node/NetworkConfig.hpp index 2d0e590be..efcc4fcff 100644 --- a/node/NetworkConfig.hpp +++ b/node/NetworkConfig.hpp @@ -72,11 +72,6 @@ namespace ZeroTier { */ #define ZT_NETWORKCONFIG_FLAG_RULES_RESULT_OF_UNSUPPORTED_MATCH 0x0000000000000008ULL -/** - * Flag: disable frame compression - */ -#define ZT_NETWORKCONFIG_FLAG_DISABLE_COMPRESSION 0x0000000000000010ULL - /** * Device can bridge to other Ethernet networks and gets unknown recipient multicasts */ @@ -88,7 +83,7 @@ namespace ZeroTier { #define ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR 0x0000040000000000ULL /** - * Device that can probe and receive remote trace info about this network + * Device that is allowed to remotely debug connectivity on this network */ #define ZT_NETWORKCONFIG_SPECIALIST_TYPE_DIAGNOSTICIAN 0x0000080000000000ULL @@ -223,22 +218,6 @@ struct NetworkConfig */ inline bool ndpEmulation() const { return ((this->flags & ZT_NETWORKCONFIG_FLAG_ENABLE_IPV6_NDP_EMULATION) != 0); } - /** - * @return True 
if frames should not be compressed - */ - inline bool disableCompression() const - { -#ifndef ZT_DISABLE_COMPRESSION - return ((this->flags & ZT_NETWORKCONFIG_FLAG_DISABLE_COMPRESSION) != 0); -#else - /* Compression is disabled for libzt builds since it causes non-obvious chaotic - interference with lwIP's TCP congestion algorithm. Compression is also disabled - for some NAS builds due to the usage of low-performance processors in certain - older and budget models. */ - return true; -#endif - } - /** * @return Network type is public (no access control) */ diff --git a/node/Node.cpp b/node/Node.cpp index 2ccbc7a1d..c5c979f7f 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -298,7 +298,6 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr) { Mutex::Lock _l(_networks_m); SharedPtr *nw = _networks.get(nwid); - RR->sw->removeNetworkQoSControlBlock(nwid); if (!nw) return ZT_RESULT_OK; if (uptr) @@ -400,7 +399,6 @@ ZT_PeerList *Node::peers() const p->address = (*pi)->address().toInt(); identities[pl->peerCount] = (*pi)->identity(); // need to make a copy in case peer gets deleted p->identity = &identities[pl->peerCount]; - p->hadAggregateLink = 0; if ((*pi)->remoteVersionKnown()) { p->versionMajor = (int)(*pi)->remoteVersionMajor(); p->versionMinor = (int)(*pi)->remoteVersionMinor(); @@ -417,7 +415,6 @@ ZT_PeerList *Node::peers() const std::vector< SharedPtr > paths((*pi)->paths(now)); SharedPtr bestp((*pi)->getAppropriatePath(now,false)); - p->hadAggregateLink |= (*pi)->hasAggregateLink(); p->pathCount = 0; for(std::vector< SharedPtr >::iterator path(paths.begin());path!=paths.end();++path) { memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage)); @@ -426,16 +423,6 @@ ZT_PeerList *Node::peers() const p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address()); p->paths[p->pathCount].alive = (*path)->alive(now) ? 1 : 0; p->paths[p->pathCount].preferred = ((*path) == bestp) ? 
1 : 0; - p->paths[p->pathCount].latency = (float)(*path)->latency(); - p->paths[p->pathCount].packetDelayVariance = (*path)->packetDelayVariance(); - p->paths[p->pathCount].throughputDisturbCoeff = (*path)->throughputDisturbanceCoefficient(); - p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio(); - p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio(); - p->paths[p->pathCount].stability = (*path)->lastComputedStability(); - p->paths[p->pathCount].throughput = (*path)->meanThroughput(); - p->paths[p->pathCount].maxThroughput = (*path)->maxLifetimeThroughput(); - p->paths[p->pathCount].allocation = (float)(*path)->allocation() / (float)255; - p->paths[p->pathCount].ifname = (*path)->getName(); ++p->pathCount; } diff --git a/node/Path.cpp b/node/Path.cpp index e209e9c04..aa420459c 100644 --- a/node/Path.cpp +++ b/node/Path.cpp @@ -26,4 +26,35 @@ bool Path::send(const RuntimeEnvironment *RR,void *tPtr,const void *data,unsigne return false; } +bool Path::isAddressValidForPath(const InetAddress &a) +{ + if ((a.ss_family == AF_INET)||(a.ss_family == AF_INET6)) { + switch(a.ipScope()) { + /* Note: we don't do link-local at the moment. Unfortunately these + * cause several issues. The first is that they usually require a + * device qualifier, which we don't handle yet and can't portably + * push in PUSH_DIRECT_PATHS. The second is that some OSes assign + * these very ephemerally or otherwise strangely. So we'll use + * private, pseudo-private, shared (e.g. carrier grade NAT), or + * global IP addresses. */ + case InetAddress::IP_SCOPE_PRIVATE: + case InetAddress::IP_SCOPE_PSEUDOPRIVATE: + case InetAddress::IP_SCOPE_SHARED: + case InetAddress::IP_SCOPE_GLOBAL: + if (a.ss_family == AF_INET6) { + // TEMPORARY HACK: for now, we are going to blacklist he.net IPv6 + // tunnels due to very spotty performance and low MTU issues over + // these IPv6 tunnel links. 
+ const uint8_t *ipd = reinterpret_cast(reinterpret_cast(&a)->sin6_addr.s6_addr); + if ((ipd[0] == 0x20)&&(ipd[1] == 0x01)&&(ipd[2] == 0x04)&&(ipd[3] == 0x70)) + return false; + } + return true; + default: + return false; + } + } + return false; +} + } // namespace ZeroTier diff --git a/node/Path.hpp b/node/Path.hpp index d2fb10710..a4ffa6e38 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -26,15 +26,8 @@ #include "SharedPtr.hpp" #include "AtomicCounter.hpp" #include "Utils.hpp" -#include "RingBuffer.hpp" -#include "Packet.hpp" #include "Mutex.hpp" -/** - * Maximum return value of preferenceRank() - */ -#define ZT_PATH_MAX_PREFERENCE_RANK ((ZT_INETADDRESS_MAX_SCOPE << 1) | 1) - namespace ZeroTier { class RuntimeEnvironment; @@ -54,7 +47,6 @@ public: { public: ZT_ALWAYS_INLINE HashKey() {} - ZT_ALWAYS_INLINE HashKey(const int64_t l,const InetAddress &r) { if (r.ss_family == AF_INET) { @@ -79,81 +71,15 @@ public: uint64_t _k[3]; }; - inline Path() : - _lastOut(0), + ZT_ALWAYS_INLINE Path(const int64_t l,const InetAddress &r) : + _localSocket(l), _lastIn(0), - _lastPathQualityComputeTime(0), - _localSocket(-1), - _latency(0xffff), - _addr(), - _ipScope(InetAddress::IP_SCOPE_NONE), - _lastAck(0), - _lastThroughputEstimation(0), - _lastQoSMeasurement(0), - _lastQoSRecordPurge(0), - _unackedBytes(0), - _expectingAckAsOf(0), - _packetsReceivedSinceLastAck(0), - _packetsReceivedSinceLastQoS(0), - _maxLifetimeThroughput(0), - _lastComputedMeanThroughput(0), - _bytesAckedSinceLastThroughputEstimation(0), - _lastComputedMeanLatency(0.0), - _lastComputedPacketDelayVariance(0.0), - _lastComputedPacketErrorRatio(0.0), - _lastComputedPacketLossRatio(0), - _lastComputedStability(0.0), - _lastComputedRelativeQuality(0), - _lastComputedThroughputDistCoeff(0.0), - _lastAllocation(0) - { - memset(_ifname, 0, 16); - memset(_addrString, 0, sizeof(_addrString)); - } - - inline Path(const int64_t localSocket,const InetAddress &addr) : _lastOut(0), - _lastIn(0), - 
_lastPathQualityComputeTime(0), - _localSocket(localSocket), - _latency(0xffff), - _addr(addr), - _ipScope(addr.ipScope()), - _lastAck(0), - _lastThroughputEstimation(0), - _lastQoSMeasurement(0), - _lastQoSRecordPurge(0), - _unackedBytes(0), - _expectingAckAsOf(0), - _packetsReceivedSinceLastAck(0), - _packetsReceivedSinceLastQoS(0), - _maxLifetimeThroughput(0), - _lastComputedMeanThroughput(0), - _bytesAckedSinceLastThroughputEstimation(0), - _lastComputedMeanLatency(0.0), - _lastComputedPacketDelayVariance(0.0), - _lastComputedPacketErrorRatio(0.0), - _lastComputedPacketLossRatio(0), - _lastComputedStability(0.0), - _lastComputedRelativeQuality(0), - _lastComputedThroughputDistCoeff(0.0), - _lastAllocation(0) + _addr(r), + __refCount() { - memset(_ifname, 0, 16); - memset(_addrString, 0, sizeof(_addrString)); - if (_localSocket != -1) { - // TODO: add localInterface alongside localSocket - //_phy->getIfName((PhySocket *) ((uintptr_t) _localSocket), _ifname, 16); - } } - /** - * Called when a packet is received from this remote path, regardless of content - * - * @param t Time of receive - */ - inline void received(const uint64_t t) { _lastIn = t; } - /** * Send a packet via this path (last out time is also updated) * @@ -167,53 +93,38 @@ public: bool send(const RuntimeEnvironment *RR,void *tPtr,const void *data,unsigned int len,int64_t now); /** - * Manually update last sent time + * Called when a packet is received from this remote path, regardless of content * - * @param t Time of send + * @param t Time of receive */ - inline void sent(const int64_t t) { _lastOut = t; } + ZT_ALWAYS_INLINE void received(const uint64_t t) { _lastIn = t; } /** - * Update path latency with a new measurement + * Check path aliveness * - * @param l Measured latency + * @param now Current time */ - inline void updateLatency(const unsigned int l, int64_t now) - { - unsigned int pl = _latency; - if (pl < 0xffff) { - _latency = (pl + l) / 2; - } - else { - _latency = l; - } - 
_latencySamples.push(l); - } - - /** - * @return Local socket as specified by external code - */ - inline int64_t localSocket() const { return _localSocket; } + ZT_ALWAYS_INLINE bool alive(const int64_t now) const { return ((now - _lastIn) < ZT_PATH_ACTIVITY_TIMEOUT); } /** * @return Physical address */ - inline const InetAddress &address() const { return _addr; } + ZT_ALWAYS_INLINE const InetAddress &address() const { return _addr; } /** - * @return IP scope -- faster shortcut for address().ipScope() + * @return Local socket as specified by external code */ - inline InetAddress::IpScope ipScope() const { return _ipScope; } + ZT_ALWAYS_INLINE int64_t localSocket() const { return _localSocket; } /** - * @return Preference rank, higher == better + * @return Last time we received anything */ - inline unsigned int preferenceRank() const - { - // This causes us to rank paths in order of IP scope rank (see InetAdddress.hpp) but - // within each IP scope class to prefer IPv6 over IPv4. - return ( ((unsigned int)_ipScope << 1) | (unsigned int)(_addr.ss_family == AF_INET6) ); - } + ZT_ALWAYS_INLINE int64_t lastIn() const { return _lastIn; } + + /** + * @return Last time we sent something + */ + ZT_ALWAYS_INLINE int64_t lastOut() const { return _lastOut; } /** * Check whether this address is valid for a ZeroTier path @@ -224,443 +135,14 @@ public: * @param a Address to check * @return True if address is good for ZeroTier path use */ - static inline bool isAddressValidForPath(const InetAddress &a) - { - if ((a.ss_family == AF_INET)||(a.ss_family == AF_INET6)) { - switch(a.ipScope()) { - /* Note: we don't do link-local at the moment. Unfortunately these - * cause several issues. The first is that they usually require a - * device qualifier, which we don't handle yet and can't portably - * push in PUSH_DIRECT_PATHS. The second is that some OSes assign - * these very ephemerally or otherwise strangely. So we'll use - * private, pseudo-private, shared (e.g. 
carrier grade NAT), or - * global IP addresses. */ - case InetAddress::IP_SCOPE_PRIVATE: - case InetAddress::IP_SCOPE_PSEUDOPRIVATE: - case InetAddress::IP_SCOPE_SHARED: - case InetAddress::IP_SCOPE_GLOBAL: - if (a.ss_family == AF_INET6) { - // TEMPORARY HACK: for now, we are going to blacklist he.net IPv6 - // tunnels due to very spotty performance and low MTU issues over - // these IPv6 tunnel links. - const uint8_t *ipd = reinterpret_cast(reinterpret_cast(&a)->sin6_addr.s6_addr); - if ((ipd[0] == 0x20)&&(ipd[1] == 0x01)&&(ipd[2] == 0x04)&&(ipd[3] == 0x70)) - return false; - } - return true; - default: - return false; - } - } - return false; - } - - /** - * @return Latency or 0xffff if unknown - */ - inline unsigned int latency() const { return _latency; } - - /** - * @return Path quality -- lower is better - */ - inline long quality(const int64_t now) const - { - const long l = (long)_latency; - const long age = (long)std::min((long)(now - _lastIn),(long)(ZT_PEER_PING_PERIOD * 10)); // set an upper sanity limit to avoid overflow - return ( ( (age < (ZT_PEER_PING_PERIOD + 5000)) ? l : (l + 65535 + age) ) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1)); - } - - /** - * Record statistics on outgoing packets. Used later to estimate QoS metrics. - * - * @param now Current time - * @param packetId ID of packet - * @param payloadLength Length of payload - * @param verb Packet verb - */ - inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb) - { - Mutex::Lock _l(_statistics_m); - if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) { - if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) { - _unackedBytes += payloadLength; - // Take note that we're expecting a VERB_ACK on this path as of a specific time - _expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? 
_expectingAckAsOf : now; - if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) { - _outQoSRecords[packetId] = now; - } - } - } - } - - /** - * Record statistics on incoming packets. Used later to estimate QoS metrics. - * - * @param now Current time - * @param packetId ID of packet - * @param payloadLength Length of payload - * @param verb Packet verb - */ - inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb) - { - Mutex::Lock _l(_statistics_m); - if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) { - if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) { - _inACKRecords[packetId] = payloadLength; - _packetsReceivedSinceLastAck++; - _inQoSRecords[packetId] = now; - _packetsReceivedSinceLastQoS++; - } - _packetValiditySamples.push(true); - } - } - - /** - * Record that we've received a VERB_ACK on this path, also compute throughput if required. - * - * @param now Current time - * @param ackedBytes Number of bytes acknowledged by other peer - */ - inline void receivedAck(int64_t now, int32_t ackedBytes) - { - _expectingAckAsOf = 0; - _unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes; - int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation); - if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) { - uint64_t throughput = (uint64_t)((float)(_bytesAckedSinceLastThroughputEstimation * 8) / ((float)timeSinceThroughputEstimate / (float)1000)); - _throughputSamples.push(throughput); - _maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? 
throughput : _maxLifetimeThroughput; - _lastThroughputEstimation = now; - _bytesAckedSinceLastThroughputEstimation = 0; - } else { - _bytesAckedSinceLastThroughputEstimation += ackedBytes; - } - } - - /** - * @return Number of bytes this peer is responsible for ACKing since last ACK - */ - inline int32_t bytesToAck() - { - Mutex::Lock _l(_statistics_m); - int32_t bytesToAck = 0; - std::map::iterator it = _inACKRecords.begin(); - while (it != _inACKRecords.end()) { - bytesToAck += it->second; - it++; - } - return bytesToAck; - } - - /** - * @return Number of bytes thus far sent that have not been acknowledged by the remote peer - */ - inline int64_t unackedSentBytes() - { - return _unackedBytes; - } - - /** - * Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers - * - * @param Current time - */ - inline void sentAck(int64_t now) - { - Mutex::Lock _l(_statistics_m); - _inACKRecords.clear(); - _packetsReceivedSinceLastAck = 0; - _lastAck = now; - } - - /** - * Receive QoS data, match with recorded egress times from this peer, compute latency - * estimates. - * - * @param now Current time - * @param count Number of records - * @param rx_id table of packet IDs - * @param rx_ts table of holding times - */ - inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts) - { - Mutex::Lock _l(_statistics_m); - // Look up egress times and compute latency values for each record - std::map::iterator it; - for (int j=0; jsecond); - uint16_t rtt_compensated = rtt - rx_ts[j]; - uint16_t latency = rtt_compensated / 2; - updateLatency(latency, now); - _outQoSRecords.erase(it); - } - } - } - - /** - * Generate the contents of a VERB_QOS_MEASUREMENT packet. 
- * - * @param now Current time - * @param qosBuffer destination buffer - * @return Size of payload - */ - inline int32_t generateQoSPacket(int64_t now, char *qosBuffer) - { - Mutex::Lock _l(_statistics_m); - int32_t len = 0; - std::map::iterator it = _inQoSRecords.begin(); - int i=0; - while (i<_packetsReceivedSinceLastQoS && it != _inQoSRecords.end()) { - uint64_t id = it->first; - memcpy(qosBuffer, &id, sizeof(uint64_t)); - qosBuffer+=sizeof(uint64_t); - uint16_t holdingTime = (uint16_t)(now - it->second); - memcpy(qosBuffer, &holdingTime, sizeof(uint16_t)); - qosBuffer+=sizeof(uint16_t); - len+=sizeof(uint64_t)+sizeof(uint16_t); - _inQoSRecords.erase(it++); - i++; - } - return len; - } - - /** - * Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers. - * - * @param Current time - */ - inline void sentQoS(int64_t now) { - _packetsReceivedSinceLastQoS = 0; - _lastQoSMeasurement = now; - } - - /** - * @param now Current time - * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time - */ - inline bool needsToSendAck(int64_t now) { - return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL || - (_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck; - } - - /** - * @param now Current time - * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time - */ - inline bool needsToSendQoS(int64_t now) { - return ((_packetsReceivedSinceLastQoS >= ZT_PATH_QOS_TABLE_SIZE) || - ((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastQoS; - } - - /** - * How much time has elapsed since we've been expecting a VERB_ACK on this path. This value - * is used to determine a more relevant path "age". This lets us penalize paths which are no - * longer ACKing, but not those that simple aren't being used to carry traffic at the - * current time. - */ - inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? 
now - _expectingAckAsOf : 0; } - - /** - * The maximum observed throughput (in bits/s) for this path - */ - inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; } - - /** - * @return The mean throughput (in bits/s) of this link - */ - inline uint64_t meanThroughput() { return _lastComputedMeanThroughput; } - - /** - * Assign a new relative quality value for this path in the aggregate link - * - * @param rq Quality of this path in comparison to other paths available to this peer - */ - inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; } - - /** - * @return Quality of this path compared to others in the aggregate link - */ - inline float relativeQuality() { return _lastComputedRelativeQuality; } - - /** - * Assign a new allocation value for this path in the aggregate link - * - * @param allocation Percentage of traffic to be sent over this path to a peer - */ - inline void updateComponentAllocationOfAggregateLink(unsigned char allocation) { _lastAllocation = allocation; } - - /** - * @return Percentage of traffic allocated to this path in the aggregate link - */ - inline unsigned char allocation() { return _lastAllocation; } - - /** - * @return Stability estimates can become expensive to compute, we cache the most recent result. 
- */ - inline float lastComputedStability() { return _lastComputedStability; } - - /** - * @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to - */ - inline char *getName() { return _ifname; } - - /** - * @return Packet delay variance - */ - inline float packetDelayVariance() { return _lastComputedPacketDelayVariance; } - - /** - * @return Previously-computed mean latency - */ - inline float meanLatency() { return _lastComputedMeanLatency; } - - /** - * @return Packet loss rate (PLR) - */ - inline float packetLossRatio() { return _lastComputedPacketLossRatio; } - - /** - * @return Packet error ratio (PER) - */ - inline float packetErrorRatio() { return _lastComputedPacketErrorRatio; } - - /** - * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now - * contribute to a Packet Error Ratio (PER). - */ - inline void recordInvalidPacket() { _packetValiditySamples.push(false); } - - /** - * @return A pointer to a cached copy of the address string for this Path (For debugging only) - */ - inline char *getAddressString() { return _addrString; } - - /** - * @return The current throughput disturbance coefficient - */ - inline float throughputDisturbanceCoefficient() { return _lastComputedThroughputDistCoeff; } - - /** - * Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved" - * this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality". 
- * - * @param now Current time - */ - inline void processBackgroundPathMeasurements(const int64_t now) - { - if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) { - Mutex::Lock _l(_statistics_m); - _lastPathQualityComputeTime = now; - address().toString(_addrString); - _lastComputedMeanLatency = _latencySamples.mean(); - _lastComputedPacketDelayVariance = _latencySamples.stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689) - _lastComputedMeanThroughput = (uint64_t)_throughputSamples.mean(); - - // If no packet validity samples, assume PER==0 - _lastComputedPacketErrorRatio = 1 - (_packetValiditySamples.count() ? _packetValiditySamples.mean() : 1); - - // Compute path stability - // Normalize measurements with wildly different ranges into a reasonable range - float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10); - float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10); - float throughput_cv = _throughputSamples.mean() > 0 ? _throughputSamples.stddev() / _throughputSamples.mean() : 1; - - // Form an exponential cutoff and apply contribution weights - float pdv_contrib = expf((-1.0f)*normalized_pdv) * (float)ZT_PATH_CONTRIB_PDV; - float latency_contrib = expf((-1.0f)*normalized_la) * (float)ZT_PATH_CONTRIB_LATENCY; - - // Throughput Disturbance Coefficient - float throughput_disturbance_contrib = expf((-1.0f)*throughput_cv) * (float)ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE; - _throughputDisturbanceSamples.push(throughput_cv); - _lastComputedThroughputDistCoeff = _throughputDisturbanceSamples.mean(); - - // Obey user-defined ignored contributions - pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1; - latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1; - throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? 
throughput_disturbance_contrib : 1; - - // Stability - _lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib; - _lastComputedStability *= 1 - _lastComputedPacketErrorRatio; - - // Prevent QoS records from sticking around for too long - std::map::iterator it = _outQoSRecords.begin(); - while (it != _outQoSRecords.end()) { - // Time since egress of tracked packet - if ((now - it->second) >= ZT_PATH_QOS_TIMEOUT) { - _outQoSRecords.erase(it++); - } else { it++; } - } - } - } - - /** - * @return True if this path is alive (receiving data) - */ - inline bool alive(const int64_t now) const { return ((now - _lastIn) < ((ZT_PEER_PING_PERIOD * 2) + 5000)); } - - /** - * @return Last time we sent something - */ - inline int64_t lastOut() const { return _lastOut; } - - /** - * @return Last time we received anything - */ - inline int64_t lastIn() const { return _lastIn; } + static bool isAddressValidForPath(const InetAddress &a); private: - Mutex _statistics_m; - - volatile int64_t _lastOut; - volatile int64_t _lastIn; - volatile int64_t _lastPathQualityComputeTime; int64_t _localSocket; - volatile unsigned int _latency; + int64_t _lastIn; + int64_t _lastOut; InetAddress _addr; - InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often AtomicCounter __refCount; - - std::map _outQoSRecords; // id:egress_time - std::map _inQoSRecords; // id:now - std::map _inACKRecords; // id:len - - int64_t _lastAck; - int64_t _lastThroughputEstimation; - int64_t _lastQoSMeasurement; - int64_t _lastQoSRecordPurge; - - int64_t _unackedBytes; - int64_t _expectingAckAsOf; - int16_t _packetsReceivedSinceLastAck; - int16_t _packetsReceivedSinceLastQoS; - - uint64_t _maxLifetimeThroughput; - uint64_t _lastComputedMeanThroughput; - uint64_t _bytesAckedSinceLastThroughputEstimation; - - float _lastComputedMeanLatency; - float _lastComputedPacketDelayVariance; - - float _lastComputedPacketErrorRatio; - float _lastComputedPacketLossRatio; - 
- // cached estimates - float _lastComputedStability; - float _lastComputedRelativeQuality; - float _lastComputedThroughputDistCoeff; - unsigned char _lastAllocation; - - // cached human-readable strings for tracing purposes - char _ifname[16]; - char _addrString[256]; - - RingBuffer _throughputSamples; - RingBuffer _latencySamples; - RingBuffer _packetValiditySamples; - RingBuffer _throughputDisturbanceSamples; }; } // namespace ZeroTier diff --git a/node/Peer.cpp b/node/Peer.cpp index e098b865b..116d3e0f4 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -20,42 +20,23 @@ #include "Packet.hpp" #include "Trace.hpp" #include "InetAddress.hpp" -#include "RingBuffer.hpp" -#include "Utils.hpp" -#include "ScopedPtr.hpp" namespace ZeroTier { Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) : RR(renv), _lastReceive(0), - _lastDirectPathPushSent(0), - _lastDirectPathPushReceive(0), - _lastCredentialRequestSent(0), _lastWhoisRequestReceived(0), _lastEchoRequestReceived(0), - _lastCredentialsReceived(0), - _lastACKWindowReset(0), - _lastQoSWindowReset(0), - _lastMultipathCompatibilityCheck(0), + _lastPushDirectPathsReceived(0), _lastTriedStaticPath(0), - _uniqueAlivePathCount(0), - _localMultipathSupported(false), - _remoteMultipathSupported(false), - _canUseMultipath(false), - _freeRandomByte((uint8_t)Utils::random()), + _latency(0xffff), + _pathCount(0), + _id(peerIdentity), _vProto(0), _vMajor(0), _vMinor(0), - _vRevision(0), - _id(peerIdentity), - _directPathPushCutoffCount(0), - _credentialsCutoffCount(0), - _linkIsBalanced(false), - _linkIsRedundant(false), - _remotePeerMultipathEnabled(false), - _lastAggregateStatsReport(0), - _lastAggregateAllocation(0) + _vRevision(0) { if (!myIdentity.agree(peerIdentity,_key)) throw ZT_EXCEPTION_INVALID_ARGUMENT; @@ -76,23 +57,7 @@ void Peer::received( _lastReceive = now; - { - Mutex::Lock _l(_paths_m); - - recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); - - if 
(_canUseMultipath) { - if (path->needsToSendQoS(now)) { - sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now); - } - for(unsigned int i=0;iprocessBackgroundPathMeasurements(now); - } - } - } - } - + /* if (hops == 0) { // If this is a direct packet (no hops), update existing paths or learn new ones bool havePath = false; @@ -164,9 +129,11 @@ void Peer::received( RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb); } } + */ // Periodically push direct paths to the peer, doing so more often if we do not // currently have a direct path. + /* const int64_t sinceLastPush = now - _lastDirectPathPushSent; if (sinceLastPush >= ((hops == 0) ? ZT_DIRECT_PATH_PUSH_INTERVAL_HAVEPATH : ZT_DIRECT_PATH_PUSH_INTERVAL)) { _lastDirectPathPushSent = now; @@ -219,452 +186,13 @@ void Peer::received( } } } + */ } -void Peer::recordOutgoingPacket(const SharedPtr &path, const uint64_t packetId, - uint16_t payloadLength, const Packet::Verb verb, int64_t now) +bool Peer::hasActivePathTo(int64_t now,const InetAddress &addr) const { - _freeRandomByte += (unsigned char)(packetId >> 8); // grab entropy to use in path selection logic for multipath - if (_canUseMultipath) { - path->recordOutgoingPacket(now, packetId, payloadLength, verb); - } -} - -void Peer::recordIncomingPacket(void *tPtr, const SharedPtr &path, const uint64_t packetId, - uint16_t payloadLength, const Packet::Verb verb, int64_t now) -{ - if (_canUseMultipath) { - if (path->needsToSendAck(now)) { - sendACK(tPtr, path, path->localSocket(), path->address(), now); - } - path->recordIncomingPacket(now, packetId, payloadLength, verb); - } -} - -void Peer::computeAggregateProportionalAllocation(int64_t now) -{ - float maxStability = 0; - float totalRelativeQuality = 0; - float maxThroughput = 1; - float maxScope = 0; - float relStability[ZT_MAX_PEER_NETWORK_PATHS]; - float relThroughput[ZT_MAX_PEER_NETWORK_PATHS]; - memset(&relStability, 0, sizeof(relStability)); - memset(&relThroughput, 0, 
sizeof(relThroughput)); - // Survey all paths - for(unsigned int i=0;ilastComputedStability(); - relThroughput[i] = (float)_paths[i]->maxLifetimeThroughput(); - maxStability = relStability[i] > maxStability ? relStability[i] : maxStability; - maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput; - maxScope = _paths[i]->ipScope() > maxScope ? _paths[i]->ipScope() : maxScope; - } - } - // Convert to relative values - for(unsigned int i=0;iackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10); - float age_contrib = exp((-1)*normalized_ma); - float relScope = ((float)(_paths[i]->ipScope()+1) / (maxScope + 1)); - float relQuality = - (relStability[i] * (float)ZT_PATH_CONTRIB_STABILITY) - + (fmaxf(1.0f, relThroughput[i]) * (float)ZT_PATH_CONTRIB_THROUGHPUT) - + relScope * (float)ZT_PATH_CONTRIB_SCOPE; - relQuality *= age_contrib; - // Arbitrary cutoffs - relQuality = relQuality > (1.00f / 100.0f) ? relQuality : 0.0f; - relQuality = relQuality < (99.0f / 100.0f) ? relQuality : 1.0f; - totalRelativeQuality += relQuality; - _paths[i]->updateRelativeQuality(relQuality); - } - } - // Convert set of relative performances into an allocation set - for(uint16_t i=0;iupdateComponentAllocationOfAggregateLink((unsigned char)((_paths[i]->relativeQuality() / totalRelativeQuality) * 255)); - } - } -} - -int Peer::computeAggregateLinkPacketDelayVariance() -{ - float pdv = 0.0; - for(unsigned int i=0;irelativeQuality() * _paths[i]->packetDelayVariance(); - } - } - return (int)pdv; -} - -int Peer::computeAggregateLinkMeanLatency() -{ - int ml = 0; - int pathCount = 0; - for(unsigned int i=0;irelativeQuality() * _paths[i]->meanLatency()); - } - } - return ml / pathCount; -} - -int Peer::aggregateLinkPhysicalPathCount() -{ - std::map ifnamemap; - int pathCount = 0; - int64_t now = RR->node->now(); - for(unsigned int i=0;ialive(now)) { - if (!ifnamemap[_paths[i]->getName()]) { - ifnamemap[_paths[i]->getName()] = true; - pathCount++; - } - } - } - return pathCount; -} - -int 
Peer::aggregateLinkLogicalPathCount() -{ - int pathCount = 0; - int64_t now = RR->node->now(); - for(unsigned int i=0;ialive(now)) { - pathCount++; - } - } - return pathCount; -} - -SharedPtr Peer::getAppropriatePath(int64_t now, bool includeExpired) -{ - Mutex::Lock _l(_paths_m); - unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS; - - /** - * Send traffic across the highest quality path only. This algorithm will still - * use the old path quality metric from protocol version 9. - */ - if (!_canUseMultipath) { - long bestPathQuality = 2147483647; - for(unsigned int i=0;ialive(now))) { - const long q = _paths[i]->quality(now); - if (q <= bestPathQuality) { - bestPathQuality = q; - bestPath = i; - } - } - } else break; - } - if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) { - return _paths[bestPath]; - } - return SharedPtr(); - } - - for(unsigned int i=0;iprocessBackgroundPathMeasurements(now); - } - } - - /** - * Randomly distribute traffic across all paths - */ - int numAlivePaths = 0; - int numStalePaths = 0; - if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) { - int alivePaths[ZT_MAX_PEER_NETWORK_PATHS]; - int stalePaths[ZT_MAX_PEER_NETWORK_PATHS]; - memset(&alivePaths, -1, sizeof(alivePaths)); - memset(&stalePaths, -1, sizeof(stalePaths)); - for(unsigned int i=0;ialive(now)) { - alivePaths[numAlivePaths] = i; - numAlivePaths++; - } - else { - stalePaths[numStalePaths] = i; - numStalePaths++; - } - } - } - unsigned int r = _freeRandomByte; - if (numAlivePaths > 0) { - int rf = r % numAlivePaths; - return _paths[alivePaths[rf]]; - } - else if(numStalePaths > 0) { - // Resort to trying any non-expired path - int rf = r % numStalePaths; - return _paths[stalePaths[rf]]; - } - } - - /** - * Proportionally allocate traffic according to dynamic path quality measurements - */ - if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) { - if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { - _lastAggregateAllocation = now; - 
computeAggregateProportionalAllocation(now); - } - // Randomly choose path according to their allocations - float rf = _freeRandomByte; - for(int i=0;iallocation()) { - bestPath = i; - _pathChoiceHist.push(bestPath); // Record which path we chose - break; - } - rf -= _paths[i]->allocation(); - } - } - if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) { - return _paths[bestPath]; - } - } - return SharedPtr(); -} - -char *Peer::interfaceListStr() -{ - std::map ifnamemap; - char tmp[32]; - const int64_t now = RR->node->now(); - char *ptr = _interfaceListStr; - bool imbalanced = false; - memset(_interfaceListStr, 0, sizeof(_interfaceListStr)); - int alivePathCount = aggregateLinkLogicalPathCount(); - for(unsigned int i=0;ialive(now)) { - int ipv = _paths[i]->address().isV4(); - // If this is acting as an aggregate link, check allocations - float targetAllocation = 1.0f / (float)alivePathCount; - float currentAllocation = 1.0f; - if (alivePathCount > 1) { - currentAllocation = (float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count(); - if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) { - imbalanced = true; - } - } - char *ipvStr = ipv ? 
(char*)"ipv4" : (char*)"ipv6"; - sprintf(tmp, "(%s, %s, %.3f)", _paths[i]->getName(), ipvStr, currentAllocation); - // Prevent duplicates - if(ifnamemap[_paths[i]->getName()] != ipv) { - memcpy(ptr, tmp, strlen(tmp)); - ptr += strlen(tmp); - *ptr = ' '; - ptr++; - ifnamemap[_paths[i]->getName()] = ipv; - } - } - } - ptr--; // Overwrite trailing space - if (imbalanced) { - sprintf(tmp, ", is asymmetrical"); - memcpy(ptr, tmp, sizeof(tmp)); - } else { - *ptr = '\0'; - } - return _interfaceListStr; -} - -void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr &other) const -{ - unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - unsigned int myBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - long myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - long myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - unsigned int theirBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - unsigned int theirBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - long theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - long theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1]; - for(int i=0;i<=ZT_INETADDRESS_MAX_SCOPE;++i) { - myBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS; - myBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS; - myBestV4QualityByScope[i] = 2147483647; - myBestV6QualityByScope[i] = 2147483647; - theirBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS; - theirBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS; - theirBestV4QualityByScope[i] = 2147483647; - theirBestV6QualityByScope[i] = 2147483647; - } - - Mutex::Lock _l1(_paths_m); - - for(unsigned int i=0;iquality(now); - const unsigned int s = (unsigned int)_paths[i]->ipScope(); - switch(_paths[i]->address().ss_family) { - case AF_INET: - if (q <= myBestV4QualityByScope[s]) { - myBestV4QualityByScope[s] = q; - myBestV4ByScope[s] = i; - } - break; - case AF_INET6: - if (q <= myBestV6QualityByScope[s]) { - myBestV6QualityByScope[s] = q; - myBestV6ByScope[s] = i; - } - break; - } - } else break; - } - - Mutex::Lock 
_l2(other->_paths_m); - - for(unsigned int i=0;i_paths[i]) { - const long q = other->_paths[i]->quality(now); - const unsigned int s = (unsigned int)other->_paths[i]->ipScope(); - switch(other->_paths[i]->address().ss_family) { - case AF_INET: - if (q <= theirBestV4QualityByScope[s]) { - theirBestV4QualityByScope[s] = q; - theirBestV4ByScope[s] = i; - } - break; - case AF_INET6: - if (q <= theirBestV6QualityByScope[s]) { - theirBestV6QualityByScope[s] = q; - theirBestV6ByScope[s] = i; - } - break; - } - } else break; - } - - unsigned int mine = ZT_MAX_PEER_NETWORK_PATHS; - unsigned int theirs = ZT_MAX_PEER_NETWORK_PATHS; - - for(int s=ZT_INETADDRESS_MAX_SCOPE;s>=0;--s) { - if ((myBestV6ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)&&(theirBestV6ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)) { - mine = myBestV6ByScope[s]; - theirs = theirBestV6ByScope[s]; - break; - } - if ((myBestV4ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)&&(theirBestV4ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)) { - mine = myBestV4ByScope[s]; - theirs = theirBestV4ByScope[s]; - break; - } - } - - if (mine != ZT_MAX_PEER_NETWORK_PATHS) { - unsigned int alt = (unsigned int)Utils::random() & 1; // randomize which hint we send first for black magickal NAT-t reasons - const unsigned int completed = alt + 2; - while (alt != completed) { - if ((alt & 1) == 0) { - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - outp.append((uint8_t)0); - other->_id.address().appendTo(outp); - outp.append((uint16_t)other->_paths[theirs]->address().port()); - if (other->_paths[theirs]->address().ss_family == AF_INET6) { - outp.append((uint8_t)16); - outp.append(other->_paths[theirs]->address().rawIpData(),16); - } else { - outp.append((uint8_t)4); - outp.append(other->_paths[theirs]->address().rawIpData(),4); - } - outp.armor(_key,true); - _paths[mine]->send(RR,tPtr,outp.data(),outp.size(),now); - } else { - Packet outp(other->_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - 
outp.append((uint8_t)0); - _id.address().appendTo(outp); - outp.append((uint16_t)_paths[mine]->address().port()); - if (_paths[mine]->address().ss_family == AF_INET6) { - outp.append((uint8_t)16); - outp.append(_paths[mine]->address().rawIpData(),16); - } else { - outp.append((uint8_t)4); - outp.append(_paths[mine]->address().rawIpData(),4); - } - outp.armor(other->_key,true); - other->_paths[theirs]->send(RR,tPtr,outp.data(),outp.size(),now); - } - ++alt; - } - } -} - -inline void Peer::processBackgroundPeerTasks(const int64_t now) -{ - // Determine current multipath compatibility with other peer - if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { - // - // Cache number of available paths so that we can short-circuit multipath logic elsewhere - // - // We also take notice of duplicate paths (same IP only) because we may have - // recently received a direct path push from a peer and our list might contain - // a dead path which hasn't been fully recognized as such. In this case we - // don't want the duplicate to trigger execution of multipath code prematurely. - // - // This is done to support the behavior of auto multipath enable/disable - // without user intervention. 
- // - int currAlivePathCount = 0; - int duplicatePathsFound = 0; - for (unsigned int i=0;iaddress().ipsEqual2(_paths[j]->address()) && i != j) { - duplicatePathsFound+=1; - break; - } - } - } - } - _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2)); - _lastMultipathCompatibilityCheck = now; - _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); - _remoteMultipathSupported = _vProto > 9; - // If both peers support multipath and more than one path exist, we can use multipath logic - _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1); - } -} - -void Peer::sendACK(void *tPtr,const SharedPtr &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) -{ - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK); - uint32_t bytesToAck = path->bytesToAck(); - outp.append(bytesToAck); - if (atAddress) { - outp.armor(_key,false); - RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); - } else { - RR->sw->send(tPtr,outp,false); - } - path->sentAck(now); -} - -void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) -{ - const int64_t _now = RR->node->now(); - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT); - char qosData[ZT_PATH_MAX_QOS_PACKET_SZ]; - int16_t len = path->generateQoSPacket(_now,qosData); - outp.append(qosData,len); - if (atAddress) { - outp.armor(_key,false); - RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); - } else { - RR->sw->send(tPtr,outp,false); - } - path->sentQoS(now); + // TODO + return false; } void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now) @@ -691,24 +219,9 @@ void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atA void Peer::ping(void *tPtr,int64_t now,unsigned int 
&v4SendCount,unsigned int &v6SendCount) { + /* Mutex::Lock _l(_paths_m); - if (_canUseMultipath) { - int alivePathCount = aggregateLinkPhysicalPathCount(); - if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) { - _lastAggregateStatsReport = now; - if (alivePathCount) { - RR->t->peerLinkAggregateStatistics(NULL,*this); - } - } if (alivePathCount < 2 && _linkIsRedundant) { - _linkIsRedundant = !_linkIsRedundant; - RR->t->peerLinkNoLongerRedundant(NULL,*this); - } if (alivePathCount > 1 && !_linkIsRedundant) { - _linkIsRedundant = !_linkIsRedundant; - RR->t->peerLinkNowRedundant(NULL,*this); - } - } - unsigned int j = 0; for(unsigned int i=0;ialive(now))) { @@ -729,10 +242,12 @@ void Peer::ping(void *tPtr,int64_t now,unsigned int &v4SendCount,unsigned int &v _paths[j].zero(); ++j; } + */ } void Peer::resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddressFamily,int64_t now) { + /* Mutex::Lock _l(_paths_m); for(unsigned int i=0;i - #include "Constants.hpp" #include "RuntimeEnvironment.hpp" #include "Node.hpp" @@ -30,6 +28,8 @@ #include "Hashtable.hpp" #include "Mutex.hpp" +#include + namespace ZeroTier { /** @@ -40,7 +40,7 @@ class Peer friend class SharedPtr; private: - inline Peer() {} // disabled to prevent bugs -- should not be constructed uninitialized + inline Peer() {} public: ZT_ALWAYS_INLINE ~Peer() { Utils::burn(_key,sizeof(_key)); } @@ -98,124 +98,7 @@ public: * @param addr Remote address * @return True if we have an active path to this destination */ - inline bool hasActivePathTo(int64_t now,const InetAddress &addr) const - { - Mutex::Lock _l(_paths_m); - for(unsigned int i=0;iaddress() == addr)&&(_paths[i]->alive(now))) - return true; - } else break; - } - return false; - } - - /** - * Send via best direct path - * - * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call - * @param data Packet data - * @param len Packet length - * @param now Current time - * @param 
force If true, send even if path is not alive - * @return True if we actually sent something - */ - ZT_ALWAYS_INLINE bool sendDirect(void *tPtr,const void *data,unsigned int len,int64_t now,bool force) - { - SharedPtr bp(getAppropriatePath(now,force)); - if (bp) - return bp->send(RR,tPtr,data,len,now); - return false; - } - - /** - * Record statistics on outgoing packets - * - * @param path Path over which packet was sent - * @param id Packet ID - * @param len Length of packet payload - * @param verb Packet verb - * @param now Current time - */ - void recordOutgoingPacket(const SharedPtr &path, uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now); - - /** - * Record statistics on incoming packets - * - * @param path Path over which packet was sent - * @param id Packet ID - * @param len Length of packet payload - * @param verb Packet verb - * @param now Current time - */ - void recordIncomingPacket(void *tPtr, const SharedPtr &path, uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now); - - /** - * Send an ACK to peer for the most recent packets received - * - * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call - * @param localSocket Raw socket the ACK packet will be sent over - * @param atAddress Destination for the ACK packet - * @param now Current time - */ - void sendACK(void *tPtr, const SharedPtr &path, int64_t localSocket,const InetAddress &atAddress,int64_t now); - - /** - * Send a QoS packet to peer so that it can evaluate the quality of this link - * - * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call - * @param localSocket Raw socket the QoS packet will be sent over - * @param atAddress Destination for the QoS packet - * @param now Current time - */ - void sendQOS_MEASUREMENT(void *tPtr, const SharedPtr &path, int64_t localSocket,const InetAddress &atAddress,int64_t now); - - /** - * Compute relative 
quality values and allocations for the components of the aggregate link - * - * @param now Current time - */ - void computeAggregateProportionalAllocation(int64_t now); - - /** - * @return The aggregate link Packet Delay Variance (PDV) - */ - int computeAggregateLinkPacketDelayVariance(); - - /** - * @return The aggregate link mean latency - */ - int computeAggregateLinkMeanLatency(); - - /** - * @return The number of currently alive "physical" paths in the aggregate link - */ - int aggregateLinkPhysicalPathCount(); - - /** - * @return The number of currently alive "logical" paths in the aggregate link - */ - int aggregateLinkLogicalPathCount(); - - /** - * Get the most appropriate direct path based on current multipath and QoS configuration - * - * @param now Current time - * @param includeExpired If true, include even expired paths - * @return Best current path or NULL if none - */ - SharedPtr getAppropriatePath(int64_t now, bool includeExpired); - - /** - * Generate a human-readable string of interface names making up the aggregate link, also include - * moving allocation and IP version number for each (for tracing) - */ - char *interfaceListStr(); - - /** - * Send VERB_RENDEZVOUS to this and another peer via the best common IP scope and path - */ - void introduce(void *tPtr,int64_t now,const SharedPtr &other) const; + bool hasActivePathTo(int64_t now,const InetAddress &addr) const; /** * Send a HELLO to this peer at a specified physical address @@ -256,21 +139,6 @@ public: */ void resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddressFamily,int64_t now); - /** - * @param now Current time - * @return All known paths to this peer - */ - inline std::vector< SharedPtr > paths(const int64_t now) const - { - std::vector< SharedPtr > pp; - Mutex::Lock _l(_paths_m); - for(unsigned int i=0;i bp(getAppropriatePath(now,false)); - if (bp) - return bp->latency(); - return 0xffff; + if ((l > 0)&&(l < 0xffff)) { + unsigned int lat = _latency; + if (lat < 
0xffff) + _latency = (l + lat) / 2; + else _latency = l; } } @@ -323,77 +198,10 @@ public: ZT_ALWAYS_INLINE unsigned int remoteVersionRevision() const { return _vRevision; } ZT_ALWAYS_INLINE bool remoteVersionKnown() const { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); } - /** - * Periodically update known multipath activation constraints. This is done so that we know when and when - * not to use multipath logic. Doing this once every few seconds is sufficient. - * - * @param now Current time - */ - void processBackgroundPeerTasks(const int64_t now); - - /** - * Record that the remote peer does have multipath enabled. As is evident by the receipt of a VERB_ACK - * or a VERB_QOS_MEASUREMENT packet at some point in the past. Until this flag is set, the local client - * shall assume that multipath is not enabled and should only use classical Protocol 9 logic. - */ - inline void inferRemoteMultipathEnabled() { _remotePeerMultipathEnabled = true; } - - /** - * @return Whether the local client supports and is configured to use multipath - */ - inline bool localMultipathSupport() { return _localMultipathSupported; } - - /** - * @return Whether the remote peer supports and is configured to use multipath - */ - inline bool remoteMultipathSupport() { return _remoteMultipathSupported; } - - /** - * @return Whether this client can use multipath to communicate with this peer. True if both peers are using - * the correct protocol and if both peers have multipath enabled. False if otherwise. 
- */ - inline bool canUseMultipath() { return _canUseMultipath; } - - /** - * Rate limit gate for VERB_PUSH_DIRECT_PATHS - */ - inline bool rateGatePushDirectPaths(const int64_t now) - { - if ((now - _lastDirectPathPushReceive) <= ZT_PUSH_DIRECT_PATHS_CUTOFF_TIME) - ++_directPathPushCutoffCount; - else _directPathPushCutoffCount = 0; - _lastDirectPathPushReceive = now; - return (_directPathPushCutoffCount < ZT_PUSH_DIRECT_PATHS_CUTOFF_LIMIT); - } - - /** - * Rate limit gate for VERB_NETWORK_CREDENTIALS - */ - inline bool rateGateCredentialsReceived(const int64_t now) - { - if ((now - _lastCredentialsReceived) <= ZT_PEER_CREDENTIALS_CUTOFF_TIME) - ++_credentialsCutoffCount; - else _credentialsCutoffCount = 0; - _lastCredentialsReceived = now; - return (_directPathPushCutoffCount < ZT_PEER_CREDEITIALS_CUTOFF_LIMIT); - } - - /** - * Rate limit gate for sending of ERROR_NEED_MEMBERSHIP_CERTIFICATE - */ - inline bool rateGateRequestCredentials(const int64_t now) - { - if ((now - _lastCredentialRequestSent) >= ZT_PEER_GENERAL_RATE_LIMIT) { - _lastCredentialRequestSent = now; - return true; - } - return false; - } - /** * Rate limit gate for inbound WHOIS requests */ - inline bool rateGateInboundWhoisRequest(const int64_t now) + ZT_ALWAYS_INLINE bool rateGateInboundWhoisRequest(const int64_t now) { if ((now - _lastWhoisRequestReceived) >= ZT_PEER_WHOIS_RATE_LIMIT) { _lastWhoisRequestReceived = now; @@ -402,10 +210,22 @@ public: return false; } + /** + * Rate limit gate for inbound PUSH_DIRECT_PATHS requests + */ + ZT_ALWAYS_INLINE bool rateGateInboundPushDirectPaths(const int64_t now) + { + if ((now - _lastPushDirectPathsReceived) >= ZT_DIRECT_PATH_PUSH_INTERVAL) { + _lastPushDirectPathsReceived = now; + return true; + } + return false; + } + /** * Rate limit gate for inbound ECHO requests */ - inline bool rateGateEchoRequest(const int64_t now) + ZT_ALWAYS_INLINE bool rateGateEchoRequest(const int64_t now) { if ((now - _lastEchoRequestReceived) >= 
ZT_PEER_GENERAL_RATE_LIMIT) { _lastEchoRequestReceived = now; @@ -414,38 +234,10 @@ public: return false; } - /** - * Rate limit gate for VERB_ACK - */ - inline bool rateGateACK(const int64_t now) - { - if ((now - _lastACKWindowReset) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) { - _lastACKWindowReset = now; - _ACKCutoffCount = 0; - } else { - ++_ACKCutoffCount; - } - return (_ACKCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT); - } - - /** - * Rate limit gate for VERB_QOS_MEASUREMENT - */ - inline bool rateGateQoS(const int64_t now) - { - if ((now - _lastQoSWindowReset) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) { - _lastQoSWindowReset = now; - _QoSCutoffCount = 0; - } else { - ++_QoSCutoffCount; - } - return (_QoSCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT); - } - /** * Rate limit gate for trying externally defined or static path */ - inline bool rateGateTryStaticPath(const int64_t now) + ZT_ALWAYS_INLINE bool rateGateTryStaticPath(const int64_t now) { if ((now - _lastTriedStaticPath) >= ZT_PEER_PING_PERIOD) { _lastTriedStaticPath = now; @@ -455,11 +247,35 @@ public: } /** - * @return Whether this peer is reachable via an aggregate link + * Send directly if a direct path exists + * + * @param tPtr Thread pointer supplied by user + * @param data Data to send + * @param len Length of data + * @param now Current time + * @return True if packet appears to have been sent, false if no path or send failed */ - inline bool hasAggregateLink() const + ZT_ALWAYS_INLINE bool sendDirect(void *tPtr,const void *data,const unsigned int len,const int64_t now) { - return _localMultipathSupported && _remoteMultipathSupported && _remotePeerMultipathEnabled; + _paths_l.rlock(); + if (_pathCount == 0) { + _paths_l.unlock(); + return false; + } + const bool r = _paths[0]->send(RR,tPtr,data,len,now); + _paths_l.unlock(); + return r; + } + + /** + * @return Current best path + */ + ZT_ALWAYS_INLINE SharedPtr path() + { + RWMutex::RLock l(_paths_l); + if (_pathCount == 0) + return SharedPtr(); + return _paths[0]; } 
private: @@ -467,60 +283,27 @@ private: const RuntimeEnvironment *RR; - int64_t _lastReceive; // direct or indirect - int64_t _lastDirectPathPushSent; - int64_t _lastDirectPathPushReceive; - int64_t _lastCredentialRequestSent; + int64_t _lastReceive; int64_t _lastWhoisRequestReceived; int64_t _lastEchoRequestReceived; - int64_t _lastCredentialsReceived; - int64_t _lastACKWindowReset; - int64_t _lastQoSWindowReset; - int64_t _lastMultipathCompatibilityCheck; + int64_t _lastPushDirectPathsReceived; int64_t _lastTriedStaticPath; + unsigned int _latency; - int _uniqueAlivePathCount; + AtomicCounter __refCount; - bool _localMultipathSupported; - bool _remoteMultipathSupported; - bool _canUseMultipath; - uint8_t _freeRandomByte; + unsigned int _pathCount; + SharedPtr _paths[ZT_MAX_PEER_NETWORK_PATHS]; + RWMutex _paths_l; + + Identity _id; uint16_t _vProto; uint16_t _vMajor; uint16_t _vMinor; uint16_t _vRevision; - - SharedPtr _paths[ZT_MAX_PEER_NETWORK_PATHS]; - Mutex _paths_m; - - Identity _id; - - unsigned int _directPathPushCutoffCount; - unsigned int _credentialsCutoffCount; - unsigned int _QoSCutoffCount; - unsigned int _ACKCutoffCount; - - RingBuffer _pathChoiceHist; - - bool _linkIsBalanced; - bool _linkIsRedundant; - bool _remotePeerMultipathEnabled; - - int64_t _lastAggregateStatsReport; - int64_t _lastAggregateAllocation; - - char _interfaceListStr[256]; // 16 characters * 16 paths in a link - - AtomicCounter __refCount; }; } // namespace ZeroTier -// Add a swap() for shared ptr's to peers to speed up peer sorts -namespace std { - template<> - inline void swap(ZeroTier::SharedPtr &a,ZeroTier::SharedPtr &b) { a.swap(b); } -} - #endif diff --git a/node/ScopedPtr.hpp b/node/ScopedPtr.hpp index d1671d94b..09f408a93 100644 --- a/node/ScopedPtr.hpp +++ b/node/ScopedPtr.hpp @@ -35,6 +35,13 @@ public: explicit ZT_ALWAYS_INLINE operator bool() const { return (_p != (T *)0); } ZT_ALWAYS_INLINE T *ptr() const { return _p; } + ZT_ALWAYS_INLINE void swap(ScopedPtr &p) 
+ { + T *const tmp = _p; + _p = p._p; + p._p = tmp; + } + ZT_ALWAYS_INLINE bool operator==(const ScopedPtr &p) const { return (_p == p._p); } ZT_ALWAYS_INLINE bool operator!=(const ScopedPtr &p) const { return (_p != p._p); } ZT_ALWAYS_INLINE bool operator==(T *const p) const { return (_p == p); } @@ -49,4 +56,9 @@ private: } // namespace ZeroTier +namespace std { +template +ZT_ALWAYS_INLINE void swap(ZeroTier::ScopedPtr &a,ZeroTier::ScopedPtr &b) { a.swap(b); } +} + #endif diff --git a/node/SharedPtr.hpp b/node/SharedPtr.hpp index e719acf49..56ecf5ec6 100644 --- a/node/SharedPtr.hpp +++ b/node/SharedPtr.hpp @@ -131,4 +131,9 @@ private: } // namespace ZeroTier +namespace std { +template +ZT_ALWAYS_INLINE void swap(ZeroTier::SharedPtr &a,ZeroTier::SharedPtr &b) { a.swap(b); } +} + #endif diff --git a/node/Switch.cpp b/node/Switch.cpp index 5bdbab58a..6c853e486 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -35,8 +35,7 @@ namespace ZeroTier { Switch::Switch(const RuntimeEnvironment *renv) : RR(renv), - _lastCheckedQueues(0), - _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine + _lastCheckedQueues(0) { } @@ -58,15 +57,11 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre if (destination != RR->identity.address()) { if (fragment.hops() < ZT_RELAY_MAX_HOPS) { fragment.incrementHops(); - - // Note: we don't bother initiating NAT-t for fragments, since heads will set that off. - // It wouldn't hurt anything, just redundant and unnecessary. 
SharedPtr relayTo = RR->topology->get(destination); - if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,false))) { - // Don't know peer or no direct path -- so relay via someone upstream + if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now))) { relayTo = RR->topology->findRelayTo(now,destination); if (relayTo) - relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,true); + relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now); } } } else { @@ -131,21 +126,10 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre if (packet.hops() < ZT_RELAY_MAX_HOPS) { packet.incrementHops(); SharedPtr relayTo = RR->topology->get(destination); - if ((relayTo)&&(relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,false))) { - if ((source != RR->identity.address())&&(_shouldUnite(now,source,destination))) { - const SharedPtr sourcePeer(RR->topology->get(source)); - if (sourcePeer) - relayTo->introduce(tPtr,now,sourcePeer); - } - } else { + if ((!relayTo)||(!relayTo->sendDirect(tPtr,packet.data(),packet.size(),now))) { relayTo = RR->topology->findRelayTo(now,destination); - if ((relayTo)&&(relayTo->address() != source)) { - if (relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true)) { - const SharedPtr sourcePeer(RR->topology->get(source)); - if (sourcePeer) - relayTo->introduce(tPtr,now,sourcePeer); - } - } + if ((relayTo)&&(relayTo->address() != source)) + relayTo->sendDirect(tPtr,packet.data(),packet.size(),now); } } @@ -153,13 +137,13 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre // Packet is the head of a fragmented packet series ---------------- const uint64_t packetId = ( - (((uint64_t)reinterpret_cast(data)[0]) << 56) | - (((uint64_t)reinterpret_cast(data)[1]) << 48) | - (((uint64_t)reinterpret_cast(data)[2]) << 40) | - (((uint64_t)reinterpret_cast(data)[3]) << 32) | - (((uint64_t)reinterpret_cast(data)[4]) << 24) | - 
(((uint64_t)reinterpret_cast(data)[5]) << 16) | - (((uint64_t)reinterpret_cast(data)[6]) << 8) | + (((uint64_t)reinterpret_cast(data)[0]) << 56U) | + (((uint64_t)reinterpret_cast(data)[1]) << 48U) | + (((uint64_t)reinterpret_cast(data)[2]) << 40U) | + (((uint64_t)reinterpret_cast(data)[3]) << 32U) | + (((uint64_t)reinterpret_cast(data)[4]) << 24U) | + (((uint64_t)reinterpret_cast(data)[5]) << 16U) | + (((uint64_t)reinterpret_cast(data)[6]) << 8U) | ((uint64_t)reinterpret_cast(data)[7]) ); @@ -234,7 +218,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const } } - uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET; + uint8_t qosBucket = 0; if (to.isMulticast()) { MulticastGroup multicastGroup(to,0); @@ -287,8 +271,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const break; } } else if (sipNetmaskBits == 40) { // ZT-6PLANE /40 ??? - const uint32_t nwid32 = (uint32_t)((network->id() ^ (network->id() >> 32)) & 0xffffffff); - if ( (my6[0] == 0xfc) && (my6[1] == (uint8_t)((nwid32 >> 24) & 0xff)) && (my6[2] == (uint8_t)((nwid32 >> 16) & 0xff)) && (my6[3] == (uint8_t)((nwid32 >> 8) & 0xff)) && (my6[4] == (uint8_t)(nwid32 & 0xff))) { + const uint32_t nwid32 = (uint32_t)((network->id() ^ (network->id() >> 32U)) & 0xffffffffU); + if ( (my6[0] == 0xfc) && (my6[1] == (uint8_t)((nwid32 >> 24U) & 0xffU)) && (my6[2] == (uint8_t)((nwid32 >> 16U) & 0xffU)) && (my6[3] == (uint8_t)((nwid32 >> 8U) & 0xffU)) && (my6[4] == (uint8_t)(nwid32 & 0xffU))) { unsigned int ptr = 0; while (ptr != 5) { if (pkt6[ptr] != my6[ptr]) @@ -328,10 +312,10 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const for(int i=0;i<32;++i) pseudo[40 + i] = adv[40 + i]; uint32_t checksum = 0; for(int i=0;i<36;++i) checksum += Utils::hton(pseudo_[i]); - while ((checksum >> 16)) checksum = (checksum & 0xffff) + (checksum >> 16); + while ((checksum >> 16U)) checksum = (checksum & 0xffffU) + (checksum >> 16U); checksum = ~checksum; - adv[42] = (checksum >> 8) & 
0xff; - adv[43] = checksum & 0xff; + adv[42] = (checksum >> 8U) & 0xffU; + adv[43] = checksum & 0xffU; RR->node->putFrame(tPtr,network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72); return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query. @@ -397,17 +381,11 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const from.appendTo(outp); outp.append((uint16_t)etherType); outp.append(data,len); - if (!network->config().disableCompression()) - outp.compress(); - aqm_enqueue(tPtr,network,outp,true,qosBucket); } else { Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME); outp.append(network->id()); outp.append((uint16_t)etherType); outp.append(data,len); - if (!network->config().disableCompression()) - outp.compress(); - aqm_enqueue(tPtr,network,outp,true,qosBucket); } } else { // Destination is bridged behind a remote peer --------------------------- @@ -465,9 +443,6 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const from.appendTo(outp); outp.append((uint16_t)etherType); outp.append(data,len); - if (!network->config().disableCompression()) - outp.compress(); - aqm_enqueue(tPtr,network,outp,true,qosBucket); } else { RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)"); } @@ -475,263 +450,6 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const } } -void Switch::aqm_enqueue(void *tPtr, const SharedPtr &network, Packet &packet,bool encrypt,int qosBucket) -{ - if(!network->qosEnabled()) { - send(tPtr, packet, encrypt); - return; - } - NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()]; - if (!nqcb) { - // DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id()); - nqcb = new NetworkQoSControlBlock(); - _netQueueControlBlock[network->id()] = nqcb; - // Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list - // These queues 
will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm - for (int i=0; iinactiveQueues.push_back(new ManagedQueue(i)); - } - } - - if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) { - // DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb()); - // just send packet normally, no QoS for ZT protocol traffic - send(tPtr, packet, encrypt); - } - - _aqm_m.lock(); - - // Enqueue packet and move queue to appropriate list - - const Address dest(packet.destination()); - TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt); - - ManagedQueue *selectedQueue = nullptr; - for (size_t i=0; ioldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue) - if (nqcb->oldQueues[i]->id == qosBucket) { - selectedQueue = nqcb->oldQueues[i]; - } - } if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues) - if (nqcb->newQueues[i]->id == qosBucket) { - selectedQueue = nqcb->newQueues[i]; - } - } if (i < nqcb->inactiveQueues.size()) { // search inactive queues - if (nqcb->inactiveQueues[i]->id == qosBucket) { - selectedQueue = nqcb->inactiveQueues[i]; - // move queue to end of NEW queue list - selectedQueue->byteCredit = ZT_QOS_QUANTUM; - // DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue); - nqcb->newQueues.push_back(selectedQueue); - nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i); - } - } - } - if (!selectedQueue) { - return; - } - - selectedQueue->q.push_back(txEntry); - selectedQueue->byteLength+=txEntry->packet.payloadLength(); - nqcb->_currEnqueuedPackets++; - - // DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue); - - // Drop a packet if necessary - ManagedQueue *selectedQueueToDropFrom = nullptr; 
- if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS) - { - // DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets); - int maxQueueLength = 0; - for (size_t i=0; ioldQueues.size()) { - if (nqcb->oldQueues[i]->byteLength > maxQueueLength) { - maxQueueLength = nqcb->oldQueues[i]->byteLength; - selectedQueueToDropFrom = nqcb->oldQueues[i]; - } - } if (i < nqcb->newQueues.size()) { - if (nqcb->newQueues[i]->byteLength > maxQueueLength) { - maxQueueLength = nqcb->newQueues[i]->byteLength; - selectedQueueToDropFrom = nqcb->newQueues[i]; - } - } if (i < nqcb->inactiveQueues.size()) { - if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) { - maxQueueLength = nqcb->inactiveQueues[i]->byteLength; - selectedQueueToDropFrom = nqcb->inactiveQueues[i]; - } - } - } - if (selectedQueueToDropFrom) { - // DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength); - int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength(); - delete selectedQueueToDropFrom->q.front(); - selectedQueueToDropFrom->q.pop_front(); - selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket; - nqcb->_currEnqueuedPackets--; - } - } - _aqm_m.unlock(); - aqm_dequeue(tPtr); -} - -uint64_t Switch::control_law(uint64_t t, int count) -{ - return (uint64_t)(t + ZT_QOS_INTERVAL / sqrt(count)); -} - -Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now) -{ - dqr r; - r.ok_to_drop = false; - r.p = q->q.front(); - - if (r.p == NULL) { - q->first_above_time = 0; - return r; - } - uint64_t sojourn_time = now - r.p->creationTime; - if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) { - // went below - stay below for at least interval - q->first_above_time = 0; - } else { - if (q->first_above_time == 0) { - // just went above from below. if still above at - // first_above_time, will say it's ok to drop. 
- q->first_above_time = now + ZT_QOS_INTERVAL; - } else if (now >= q->first_above_time) { - r.ok_to_drop = true; - } - } - return r; -} - -Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now) -{ - dqr r = dodequeue(q, now); - - if (q->dropping) { - if (!r.ok_to_drop) { - q->dropping = false; - } - while (now >= q->drop_next && q->dropping) { - q->q.pop_front(); // drop - r = dodequeue(q, now); - if (!r.ok_to_drop) { - // leave dropping state - q->dropping = false; - } else { - ++(q->count); - // schedule the next drop. - q->drop_next = control_law(q->drop_next, q->count); - } - } - } else if (r.ok_to_drop) { - q->q.pop_front(); // drop - r = dodequeue(q, now); - q->dropping = true; - q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)? - q->count - 2 : 1; - q->drop_next = control_law(now, q->count); - } - return r.p; -} - -void Switch::aqm_dequeue(void *tPtr) -{ - // Cycle through network-specific QoS control blocks - for(std::map::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) { - if (!(*nqcb).second->_currEnqueuedPackets) { - return; - } - - uint64_t now = RR->node->now(); - TXQueueEntry *entryToEmit = nullptr; - std::vector *currQueues = &((*nqcb).second->newQueues); - std::vector *oldQueues = &((*nqcb).second->oldQueues); - std::vector *inactiveQueues = &((*nqcb).second->inactiveQueues); - - _aqm_m.lock(); - - // Attempt dequeue from queues in NEW list - bool examiningNewQueues = true; - while (currQueues->size()) { - ManagedQueue *queueAtFrontOfList = currQueues->front(); - if (queueAtFrontOfList->byteCredit < 0) { - queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM; - // Move to list of OLD queues - // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList); - oldQueues->push_back(queueAtFrontOfList); - currQueues->erase(currQueues->begin()); - } else { - entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now); - if (!entryToEmit) { - // Move to end 
of list of OLD queues - // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList); - oldQueues->push_back(queueAtFrontOfList); - currQueues->erase(currQueues->begin()); - } - else { - int len = entryToEmit->packet.payloadLength(); - queueAtFrontOfList->byteLength -= len; - queueAtFrontOfList->byteCredit -= len; - // Send the packet! - queueAtFrontOfList->q.pop_front(); - send(tPtr, entryToEmit->packet, entryToEmit->encrypt); - (*nqcb).second->_currEnqueuedPackets--; - } - if (queueAtFrontOfList) { - //DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit); - } - break; - } - } - - // Attempt dequeue from queues in OLD list - examiningNewQueues = false; - currQueues = &((*nqcb).second->oldQueues); - while (currQueues->size()) { - ManagedQueue *queueAtFrontOfList = currQueues->front(); - if (queueAtFrontOfList->byteCredit < 0) { - queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM; - oldQueues->push_back(queueAtFrontOfList); - currQueues->erase(currQueues->begin()); - } else { - entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now); - if (!entryToEmit) { - //DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList); - // Move to inactive list of queues - inactiveQueues->push_back(queueAtFrontOfList); - currQueues->erase(currQueues->begin()); - } - else { - int len = entryToEmit->packet.payloadLength(); - queueAtFrontOfList->byteLength -= len; - queueAtFrontOfList->byteCredit -= len; - queueAtFrontOfList->q.pop_front(); - send(tPtr, entryToEmit->packet, entryToEmit->encrypt); - (*nqcb).second->_currEnqueuedPackets--; - } - if (queueAtFrontOfList) { - //DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit); - } - break; - } - } - nqcb++; - _aqm_m.unlock(); - } -} - -void Switch::removeNetworkQoSControlBlock(uint64_t nwid) -{ - 
NetworkQoSControlBlock *nq = _netQueueControlBlock[nwid]; - if (nq) { - _netQueueControlBlock.erase(nwid); - delete nq; - nq = NULL; - } -} - void Switch::send(void *tPtr,Packet &packet,bool encrypt) { const Address dest(packet.destination()); @@ -763,12 +481,12 @@ void Switch::requestWhois(void *tPtr,const int64_t now,const Address &addr) else last = now; } - const SharedPtr root(RR->topology->root(now)); + const SharedPtr root(RR->topology->root()); if (root) { Packet outp(root->address(),RR->identity.address(),Packet::VERB_WHOIS); addr.appendTo(outp); RR->node->expectReplyTo(outp.packetId()); - root->sendDirect(tPtr,outp.data(),outp.size(),now,true); + root->sendDirect(tPtr,outp.data(),outp.size(),now); } } @@ -845,17 +563,6 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now) } } - { - Mutex::Lock _l(_lastUniteAttempt_m); - Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt); - _LastUniteKey *k = (_LastUniteKey *)0; - uint64_t *v = (uint64_t *)0; - while (i.next(k,v)) { - if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 8)) - _lastUniteAttempt.erase(*k); - } - } - { Mutex::Lock _l(_lastSentWhoisRequest_m); Hashtable< Address,int64_t >::Iterator i(_lastSentWhoisRequest); @@ -870,17 +577,6 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now) return ZT_WHOIS_RETRY_DELAY; } -bool Switch::_shouldUnite(const int64_t now,const Address &source,const Address &destination) -{ - Mutex::Lock _l(_lastUniteAttempt_m); - uint64_t &ts = _lastUniteAttempt[_LastUniteKey(source,destination)]; - if ((now - ts) >= ZT_MIN_UNITE_INTERVAL) { - ts = now; - return true; - } - return false; -} - bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) { SharedPtr viaPath; @@ -889,7 +585,7 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) const SharedPtr peer(RR->topology->get(destination)); if (peer) { - viaPath = peer->getAppropriatePath(now,false); + viaPath = peer->path(); if (!viaPath) { if (peer->rateGateTryStaticPath(now)) { 
InetAddress tryAddr; @@ -905,7 +601,7 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) const SharedPtr relay(RR->topology->findRelayTo(now,destination)); if (relay) { - viaPath = relay->getAppropriatePath(now,true); + viaPath = relay->path(); if (!viaPath) return false; } @@ -923,8 +619,6 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt) unsigned int chunkSize = std::min(packet.size(),mtu); packet.setFragmented(chunkSize < packet.size()); - peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now); - if (trustedPathId) { packet.setTrusted(trustedPathId); } else { diff --git a/node/Switch.hpp b/node/Switch.hpp index 6b36960fa..3bb6c0454 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -46,16 +46,8 @@ class Peer; */ class Switch { - struct ManagedQueue; - struct TXQueueEntry; - - typedef struct { - TXQueueEntry *p; - bool ok_to_drop; - } dqr; - public: - Switch(const RuntimeEnvironment *renv); + explicit Switch(const RuntimeEnvironment *renv); /** * Called when a packet is received from the real network @@ -82,62 +74,6 @@ public: */ void onLocalEthernet(void *tPtr,const SharedPtr &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); - /** - * Determines the next drop schedule for packets in the TX queue - * - * @param t Current time - * @param count Number of packets dropped this round - */ - uint64_t control_law(uint64_t t, int count); - - /** - * Selects a packet eligible for transmission from a TX queue. According to the control law, multiple packets - * may be intentionally dropped before a packet is returned to the AQM scheduler. - * - * @param q The TX queue that is being dequeued from - * @param now Current time - */ - dqr dodequeue(ManagedQueue *q, uint64_t now); - - /** - * Presents a packet to the AQM scheduler. 
- * - * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call - * @param network Network that the packet shall be sent over - * @param packet Packet to be sent - * @param encrypt Encrypt packet payload? (always true except for HELLO) - * @param qosBucket Which bucket the rule-system determined this packet should fall into - */ - void aqm_enqueue(void *tPtr, const SharedPtr &network, Packet &packet,bool encrypt,int qosBucket); - - /** - * Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks - * - * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call - */ - void aqm_dequeue(void *tPtr); - - /** - * Calls the dequeue mechanism and adjust queue state variables - * - * @param q The TX queue that is being dequeued from - * @param isNew Whether or not this queue is in the NEW list - * @param now Current time - */ - Switch::TXQueueEntry * CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now); - - /** - * Removes QoS Queues and flow state variables for a specific network. These queues are created - * automatically upon the transmission of the first packet from this peer to another peer on the - * given network. - * - * The reason for existence of queues and flow state variables specific to each network is so that - * each network's QoS rules function independently. 
- * - * @param nwid Network ID - */ - void removeNetworkQoSControlBlock(uint64_t nwid); - /** * Send a packet to a ZeroTier address (destination in packet) * @@ -163,7 +99,7 @@ public: * @param now Current time * @param addr Address to look up */ - void requestWhois(void *tPtr,const int64_t now,const Address &addr); + void requestWhois(void *tPtr,int64_t now,const Address &addr); /** * Run any processes that are waiting for this peer's identity @@ -188,7 +124,6 @@ public: unsigned long doTimerTasks(void *tPtr,int64_t now); private: - bool _shouldUnite(const int64_t now,const Address &source,const Address &destination); bool _trySend(void *tPtr,Packet &packet,bool encrypt); // packet is modified if return is true const RuntimeEnvironment *const RR; @@ -201,7 +136,7 @@ private: // Packets waiting for WHOIS replies or other decode info or missing fragments struct RXQueueEntry { - RXQueueEntry() : timestamp(0) {} + ZT_ALWAYS_INLINE RXQueueEntry() : timestamp(0) {} volatile int64_t timestamp; // 0 if entry is not in use volatile uint64_t packetId; IncomingPacket frag0; // head of packet @@ -236,8 +171,8 @@ private: // ZeroTier-layer TX queue entry struct TXQueueEntry { - TXQueueEntry() {} - TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc) : + ZT_ALWAYS_INLINE TXQueueEntry() {} + ZT_ALWAYS_INLINE TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc) : dest(d), creationTime(ct), packet(p), @@ -250,58 +185,6 @@ private: }; std::list< TXQueueEntry > _txQueue; Mutex _txQueue_m; - Mutex _aqm_m; - - // Tracks sending of VERB_RENDEZVOUS to relaying peers - struct _LastUniteKey - { - _LastUniteKey() : x(0),y(0) {} - _LastUniteKey(const Address &a1,const Address &a2) - { - if (a1 > a2) { - x = a2.toInt(); - y = a1.toInt(); - } else { - x = a1.toInt(); - y = a2.toInt(); - } - } - inline unsigned long hashCode() const { return ((unsigned long)x ^ (unsigned long)y); } - inline bool operator==(const _LastUniteKey &k) const { return ((x == k.x)&&(y == k.y)); } - 
inline bool operator!=(const _LastUniteKey &k) const { return ((x != k.x)||(y != k.y)); } - uint64_t x,y; - }; - Hashtable< _LastUniteKey,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior - Mutex _lastUniteAttempt_m; - - // Queue with additional flow state variables - struct ManagedQueue - { - ManagedQueue(int id) : - id(id), - byteCredit(ZT_QOS_QUANTUM), - byteLength(0), - dropping(false) - {} - int id; - int byteCredit; - int byteLength; - uint64_t first_above_time; - uint32_t count; - uint64_t drop_next; - bool dropping; - uint64_t drop_next_time; - std::list< TXQueueEntry *> q; - }; - // To implement fq_codel we need to maintain a queue of queues - struct NetworkQoSControlBlock - { - int _currEnqueuedPackets; - std::vector newQueues; - std::vector oldQueues; - std::vector inactiveQueues; - }; - std::map _netQueueControlBlock; }; } // namespace ZeroTier diff --git a/node/Topology.cpp b/node/Topology.cpp new file mode 100644 index 000000000..50a112a0a --- /dev/null +++ b/node/Topology.cpp @@ -0,0 +1,153 @@ +/* + * Copyright (c)2019 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2023-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. 
+ */ +/****/ + +#include "Topology.hpp" + +namespace ZeroTier { + +struct _RootSortComparisonOperator +{ + ZT_ALWAYS_INLINE _RootSortComparisonOperator(const int64_t now) : _now(now) {} + ZT_ALWAYS_INLINE bool operator()(const SharedPtr &a,const SharedPtr &b) + { + const int64_t now = _now; + if (a->alive(now)) { + if (b->alive(now)) + return (a->latency() < b->latency()); + return true; + } + return false; + } + const int64_t _now; +}; + +Topology::Topology(const RuntimeEnvironment *renv,const Identity &myId) : + RR(renv), + _myIdentity(myId), + _numConfiguredPhysicalPaths(0), + _peers(128), + _paths(256) +{ +} + +Topology::~Topology() +{ +} + +void Topology::getAllPeers(std::vector< SharedPtr > &allPeers) const +{ + RWMutex::RLock l(_peers_l); + allPeers.clear(); + allPeers.reserve(_peers.size()); + Hashtable< Address,SharedPtr >::Iterator i(*(const_cast > *>(&_peers))); + Address *a = (Address *)0; + SharedPtr *p = (SharedPtr *)0; + while (i.next(a,p)) + allPeers.push_back(*p); +} + +void Topology::setPhysicalPathConfiguration(const struct sockaddr_storage *pathNetwork,const ZT_PhysicalPathConfiguration *pathConfig) +{ + if (!pathNetwork) { + _numConfiguredPhysicalPaths = 0; + } else { + std::map cpaths; + for(unsigned int i=0,j=_numConfiguredPhysicalPaths;i ZT_MAX_PHYSMTU) + pc.mtu = ZT_MAX_PHYSMTU; + + cpaths[*(reinterpret_cast(pathNetwork))] = pc; + } else { + cpaths.erase(*(reinterpret_cast(pathNetwork))); + } + + unsigned int cnt = 0; + for(std::map::const_iterator i(cpaths.begin());((i!=cpaths.end())&&(cntfirst; + _physicalPathConfig[cnt].second = i->second; + ++cnt; + } + _numConfiguredPhysicalPaths = cnt; + } +} + +void Topology::addRoot(const Identity &id) +{ + if (id == _myIdentity) return; // sanity check + RWMutex::Lock l1(_peers_l); + std::pair< std::set::iterator,bool > ir(_roots.insert(id)); + if (ir.second) { + SharedPtr &p = _peers[id.address()]; + if (!p) + p.set(new Peer(RR,_myIdentity,id)); + _rootPeers.push_back(p); + } +} + +bool 
Topology::removeRoot(const Identity &id) +{ + RWMutex::Lock l1(_peers_l); + std::set::iterator r(_roots.find(id)); + if (r != _roots.end()) { + for(std::vector< SharedPtr >::iterator p(_rootPeers.begin());p!=_rootPeers.end();++p) { + if ((*p)->identity() == id) { + _rootPeers.erase(p); + break; + } + } + _roots.erase(r); + return true; + } + return false; +} + +void Topology::rankRoots(const int64_t now) +{ + RWMutex::Lock l1(_peers_l); + std::sort(_rootPeers.begin(),_rootPeers.end(),_RootSortComparisonOperator(now)); +} + +void Topology::doPeriodicTasks(const int64_t now) +{ + { + RWMutex::Lock l1(_peers_l); + Hashtable< Address,SharedPtr >::Iterator i(_peers); + Address *a = (Address *)0; + SharedPtr *p = (SharedPtr *)0; + while (i.next(a,p)) { + if ( (!(*p)->alive(now)) && (_roots.count((*p)->identity()) == 0) ) + _peers.erase(*a); + } + } + { + RWMutex::Lock l1(_paths_l); + Hashtable< Path::HashKey,SharedPtr >::Iterator i(_paths); + Path::HashKey *k = (Path::HashKey *)0; + SharedPtr *p = (SharedPtr *)0; + while (i.next(k,p)) { + if (p->references() <= 1) + _paths.erase(*k); + } + } +} + +} // namespace ZeroTier diff --git a/node/Topology.hpp b/node/Topology.hpp index 62ce38f46..77f26cd44 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -34,7 +34,6 @@ #include "Hashtable.hpp" #include "SharedPtr.hpp" #include "ScopedPtr.hpp" -#include "Str.hpp" namespace ZeroTier { @@ -46,14 +45,8 @@ class RuntimeEnvironment; class Topology { public: - ZT_ALWAYS_INLINE Topology(const RuntimeEnvironment *renv,const Identity &myId) : - RR(renv), - _myIdentity(myId), - _numConfiguredPhysicalPaths(0), - _peers(128), - _paths(256) - { - } + Topology(const RuntimeEnvironment *renv,const Identity &myId); + ~Topology(); /** * Add a peer to database @@ -66,7 +59,7 @@ public: */ ZT_ALWAYS_INLINE SharedPtr add(const SharedPtr &peer) { - Mutex::Lock _l(_peers_l); + RWMutex::Lock _l(_peers_l); SharedPtr &hp = _peers[peer->address()]; if (!hp) hp = peer; @@ -80,9 +73,9 @@ public: * 
@param zta ZeroTier address of peer * @return Peer or NULL if not found */ - ZT_ALWAYS_INLINE SharedPtr get(const Address &zta) + ZT_ALWAYS_INLINE SharedPtr get(const Address &zta) const { - Mutex::Lock l1(_peers_l); + RWMutex::RLock l1(_peers_l); const SharedPtr *const ap = _peers.get(zta); return (ap) ? *ap : SharedPtr(); } @@ -92,12 +85,12 @@ public: * @param zta ZeroTier address of peer * @return Identity or NULL identity if not found */ - ZT_ALWAYS_INLINE Identity getIdentity(void *tPtr,const Address &zta) + ZT_ALWAYS_INLINE Identity getIdentity(void *tPtr,const Address &zta) const { if (zta == _myIdentity.address()) { return _myIdentity; } else { - Mutex::Lock _l(_peers_l); + RWMutex::RLock _l(_peers_l); const SharedPtr *const ap = _peers.get(zta); if (ap) return (*ap)->identity(); @@ -110,45 +103,57 @@ public: * * @param l Local socket * @param r Remote address - * @return Pointer to canonicalized Path object + * @return Pointer to canonicalized Path object or NULL on error */ ZT_ALWAYS_INLINE SharedPtr getPath(const int64_t l,const InetAddress &r) { - Mutex::Lock _l(_paths_l); - SharedPtr &p = _paths[Path::HashKey(l,r)]; - if (!p) - p.set(new Path(l,r)); + const Path::HashKey k(l,r); + + _paths_l.rlock(); + SharedPtr p(_paths[k]); + _paths_l.unlock(); + if (p) + return p; + + _paths_l.lock(); + SharedPtr &p2 = _paths[k]; + if (p2) { + p = p2; + } else { + try { + p.set(new Path(l,r)); + } catch ( ... 
) { + _paths_l.unlock(); + return SharedPtr(); + } + p2 = p; + } + _paths_l.unlock(); + return p; } + /** + * @return Current best root server + */ + ZT_ALWAYS_INLINE SharedPtr root() const + { + RWMutex::RLock l(_peers_l); + if (_rootPeers.empty()) + return SharedPtr(); + return _rootPeers.front(); + } + /** * @param id Identity to check * @return True if this identity corresponds to a root */ ZT_ALWAYS_INLINE bool isRoot(const Identity &id) const { - Mutex::Lock l(_peers_l); + RWMutex::RLock l(_peers_l); return (_roots.count(id) > 0); } - /** - * @param now Current time - * @return Number of peers with active direct paths - */ - ZT_ALWAYS_INLINE unsigned long countActive(const int64_t now) const - { - unsigned long cnt = 0; - Mutex::Lock _l(_peers_l); - Hashtable< Address,SharedPtr >::Iterator i(const_cast(this)->_peers); - Address *a = (Address *)0; - SharedPtr *p = (SharedPtr *)0; - while (i.next(a,p)) { - if ((*p)->getAppropriatePath(now,false)) - ++cnt; - } - return cnt; - } - /** * Apply a function or function object to all peers * @@ -159,10 +164,10 @@ public: * @tparam F Function or function object type */ template - ZT_ALWAYS_INLINE void eachPeer(F f) + ZT_ALWAYS_INLINE void eachPeer(F f) const { - Mutex::Lock l(_peers_l); - Hashtable< Address,SharedPtr >::Iterator i(_peers); + RWMutex::RLock l(_peers_l); + Hashtable< Address,SharedPtr >::Iterator i(const_cast(this)->_peers); Address *a = (Address *)0; SharedPtr *p = (SharedPtr *)0; while (i.next(a,p)) { @@ -181,16 +186,16 @@ public: * @tparam F Function or function object type */ template - ZT_ALWAYS_INLINE void eachPeerWithRoot(F f) + ZT_ALWAYS_INLINE void eachPeerWithRoot(F f) const { - Mutex::Lock l(_peers_l); + RWMutex::RLock l(_peers_l); std::vector rootPeerPtrs; - for(std::vector< SharedPtr >::iterator i(_rootPeers.begin());i!=_rootPeers.end();++i) + for(std::vector< SharedPtr >::const_iterator i(_rootPeers.begin());i!=_rootPeers.end();++i) rootPeerPtrs.push_back((uintptr_t)i->ptr()); 
std::sort(rootPeerPtrs.begin(),rootPeerPtrs.end()); - Hashtable< Address,SharedPtr >::Iterator i(_peers); + Hashtable< Address,SharedPtr >::Iterator i(const_cast(this)->_peers); Address *a = (Address *)0; SharedPtr *p = (SharedPtr *)0; while (i.next(a,p)) { @@ -208,7 +213,7 @@ public: */ ZT_ALWAYS_INLINE SharedPtr findRelayTo(const int64_t now,const Address &toAddr) { - Mutex::Lock l(_peers_l); + RWMutex::RLock l(_peers_l); if (_rootPeers.empty()) return SharedPtr(); return _rootPeers[0]; @@ -217,17 +222,7 @@ public: /** * @param allPeers vector to fill with all current peers */ - ZT_ALWAYS_INLINE void getAllPeers(std::vector< SharedPtr > &allPeers) const - { - Mutex::Lock l(_peers_l); - allPeers.clear(); - allPeers.reserve(_peers.size()); - Hashtable< Address,SharedPtr >::Iterator i(*(const_cast > *>(&_peers))); - Address *a = (Address *)0; - SharedPtr *p = (SharedPtr *)0; - while (i.next(a,p)) - allPeers.push_back(*p); - } + void getAllPeers(std::vector< SharedPtr > &allPeers) const; /** * Get info about a path @@ -249,21 +244,6 @@ public: } } - /** - * Get the payload MTU for an outbound physical path (returns default if not configured) - * - * @param physicalAddress Physical endpoint address - * @return MTU - */ - ZT_ALWAYS_INLINE unsigned int getOutboundPathMtu(const InetAddress &physicalAddress) - { - for(unsigned int i=0,j=_numConfiguredPhysicalPaths;i cpaths; - for(unsigned int i=0,j=_numConfiguredPhysicalPaths;i ZT_MAX_PHYSMTU) - pc.mtu = ZT_MAX_PHYSMTU; - - cpaths[*(reinterpret_cast(pathNetwork))] = pc; - } else { - cpaths.erase(*(reinterpret_cast(pathNetwork))); - } - - unsigned int cnt = 0; - for(std::map::const_iterator i(cpaths.begin());((i!=cpaths.end())&&(cntfirst; - _physicalPathConfig[cnt].second = i->second; - ++cnt; - } - _numConfiguredPhysicalPaths = cnt; - } - } + void setPhysicalPathConfiguration(const struct sockaddr_storage *pathNetwork,const ZT_PhysicalPathConfiguration *pathConfig); /** * Add a root server's identity to the root server 
set * * @param id Root server identity */ - inline void addRoot(const Identity &id) - { - if (id == _myIdentity) return; // sanity check - Mutex::Lock l1(_peers_l); - std::pair< std::set::iterator,bool > ir(_roots.insert(id)); - if (ir.second) { - SharedPtr &p = _peers[id.address()]; - if (!p) - p.set(new Peer(RR,_myIdentity,id)); - _rootPeers.push_back(p); - } - } + void addRoot(const Identity &id); /** * Remove a root server's identity from the root server set @@ -355,83 +292,26 @@ public: * @param id Root server identity * @return True if root found and removed, false if not found */ - inline bool removeRoot(const Identity &id) - { - Mutex::Lock l1(_peers_l); - std::set::iterator r(_roots.find(id)); - if (r != _roots.end()) { - for(std::vector< SharedPtr >::iterator p(_rootPeers.begin());p!=_rootPeers.end();++p) { - if ((*p)->identity() == id) { - _rootPeers.erase(p); - break; - } - } - _roots.erase(r); - return true; - } - return false; - } + bool removeRoot(const Identity &id); /** * Sort roots in asecnding order of apparent latency * * @param now Current time */ - ZT_ALWAYS_INLINE void rankRoots(const int64_t now) - { - Mutex::Lock l1(_peers_l); - std::sort(_rootPeers.begin(),_rootPeers.end(),_RootSortComparisonOperator(now)); - } + void rankRoots(const int64_t now); /** * Do periodic tasks such as database cleanup */ - ZT_ALWAYS_INLINE void doPeriodicTasks(const int64_t now) - { - { - Mutex::Lock l1(_peers_l); - Hashtable< Address,SharedPtr >::Iterator i(_peers); - Address *a = (Address *)0; - SharedPtr *p = (SharedPtr *)0; - while (i.next(a,p)) { - if ( (!(*p)->alive(now)) && (_roots.count((*p)->identity()) == 0) ) - _peers.erase(*a); - } - } - { - Mutex::Lock l1(_paths_l); - Hashtable< Path::HashKey,SharedPtr >::Iterator i(_paths); - Path::HashKey *k = (Path::HashKey *)0; - SharedPtr *p = (SharedPtr *)0; - while (i.next(k,p)) { - if (p->references() <= 1) - _paths.erase(*k); - } - } - } + void doPeriodicTasks(const int64_t now); private: - struct 
_RootSortComparisonOperator - { - ZT_ALWAYS_INLINE _RootSortComparisonOperator(const int64_t now) : _now(now) {} - ZT_ALWAYS_INLINE bool operator()(const SharedPtr &a,const SharedPtr &b) - { - const int64_t now = _now; - if (a->alive(now)) { - if (b->alive(now)) - return (a->latency(now) < b->latency(now)); - return true; - } - return false; - } - const int64_t _now; - }; - const RuntimeEnvironment *const RR; const Identity _myIdentity; - Mutex _peers_l; - Mutex _paths_l; + RWMutex _peers_l; + RWMutex _paths_l; std::pair< InetAddress,ZT_PhysicalPathConfiguration > _physicalPathConfig[ZT_MAX_CONFIGURABLE_PATHS]; unsigned int _numConfiguredPhysicalPaths;