diff --git a/.clangd b/.clangd new file mode 100644 index 000000000..0605ccdb7 --- /dev/null +++ b/.clangd @@ -0,0 +1,6 @@ +CompileFlags: + Add: + - "-std=c++17" + - "-I../ext" + - "-I../ext/prometheus-cpp-lite-1.0/core/include" + - "-I../ext/prometheus-cpp-lite-1.0/simpleapi/include" diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 82d0207af..b9e1f4904 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -9,13 +9,12 @@ jobs: git config --global core.autocrlf input # git config --global core.eol lf - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Rust - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@stable with: toolchain: stable - target: x86_64-unknown-linux-gnu - override: true + targets: x86_64-unknown-linux-gnu components: rustfmt, clippy - name: Set up cargo cache @@ -33,6 +32,14 @@ jobs: run: | make selftest ./zerotier-selftest + - name: 'Tar files' # keeps permissions (execute) + run: tar -cvf zerotier-one.tar zerotier-one + - name: Archive production artifacts + uses: actions/upload-artifact@v4 + with: + name: zerotier-one-ubuntu-x64 + path: zerotier-one.tar + retention-days: 7 build_macos: runs-on: macos-latest @@ -42,20 +49,18 @@ jobs: git config --global core.autocrlf input # git config --global core.eol lf - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Rust aarch64 - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@stable with: toolchain: stable target: aarch64-apple-darwin - override: true components: rustfmt, clippy - name: Install Rust x86_64 - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@stable with: toolchain: stable target: x86_64-apple-darwin - override: true components: rustfmt, clippy - name: Set up cargo cache uses: Swatinem/rust-cache@v2 @@ -65,13 +70,21 @@ jobs: shared-key: ${{ runner.os }}-cargo- workspaces: | rustybits/ - - name: make run: make - name: selftest run: | make selftest ./zerotier-selftest + - name: 'Tar files' # keeps permissions (execute) + run: tar -cvf zerotier-one.tar zerotier-one + - name: Archive production artifacts + uses: actions/upload-artifact@v4 + with: + name: zerotier-one-mac + path: zerotier-one.tar + retention-days: 7 + build_windows: runs-on: windows-latest @@ -81,13 +94,12 @@ jobs: git config --global core.autocrlf true # git config --global core.eol lf - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Rust - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@stable with: toolchain: stable target: aarch64-apple-darwin - override: true components: rustfmt, clippy - name: Set up cargo cache uses: Swatinem/rust-cache@v2 @@ -99,7 +111,13 @@ jobs: rustybits/ - name: setup msbuild - uses: microsoft/setup-msbuild@v1.1.3 + uses: microsoft/setup-msbuild@v2 - name: msbuild run: | - msbuild windows\ZeroTierOne.sln /m /p:Configuration=Release /property:Platform=x64 /t:ZeroTierOne + msbuild windows\ZeroTierOne.sln /m /p:Configuration=Release /property:Platform=x64 /t:ZeroTierOne + - name: Archive production artifacts + uses: actions/upload-artifact@v4 + with: + name: zerotier-one-windows + path: windows/Build + retention-days: 7 diff --git a/.gitignore b/.gitignore index fd2f7a9a1..ba8b4afca 100755 --- a/.gitignore +++ b/.gitignore @@ -124,6 +124,7 @@ attic/world/mkworld workspace/ workspace2/ zeroidc/target/ +tcp-proxy/target #snapcraft specifics /parts/ diff --git a/README.md b/README.md index 42eecdda0..e881ce810 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ To build on Mac and Linux just type `make`. On FreeBSD and OpenBSD `gmake` (GNU - Xcode command line tools for macOS 10.13 or newer are required. - Rust for x86_64 and ARM64 targets *if SSO is enabled in the build*. - **Linux** - - The minimum compiler versions required are GCC/G++ 4.9.3 or CLANG/CLANG++ 3.4.2. (Install `clang` on CentOS 7 as G++ is too old.) + - The minimum compiler versions required are GCC/G++ 8.x or CLANG/CLANG++ 5.x. - Linux makefiles automatically detect and prefer clang/clang++ if present as it produces smaller and slightly faster binaries in most cases. You can override by supplying CC and CXX variables on the make command line. - Rust for x86_64 and ARM64 targets *if SSO is enabled in the build*. - **Windows** diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index bcf1acbd2..14c37250b 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -315,12 +315,14 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) return true; } else if (t == "MATCH_MAC_SOURCE") { rule.t |= ZT_NETWORK_RULE_MATCH_MAC_SOURCE; - const std::string mac(OSUtils::jsonString(r["mac"],"0")); + std::string mac(OSUtils::jsonString(r["mac"],"0")); + Utils::cleanMac(mac); Utils::unhex(mac.c_str(),(unsigned int)mac.length(),rule.v.mac,6); return true; } else if (t == "MATCH_MAC_DEST") { rule.t |= ZT_NETWORK_RULE_MATCH_MAC_DEST; - const std::string mac(OSUtils::jsonString(r["mac"],"0")); + std::string mac(OSUtils::jsonString(r["mac"],"0")); + Utils::cleanMac(mac); Utils::unhex(mac.c_str(),(unsigned int)mac.length(),rule.v.mac,6); return true; } else if (t == "MATCH_IPV4_SOURCE") { diff --git a/make-linux.mk b/make-linux.mk index 97ff6215e..efc1badf6 100644 --- a/make-linux.mk +++ b/make-linux.mk @@ -364,7 +364,7 @@ override CFLAGS+=-fPIC -fPIE override CXXFLAGS+=-fPIC -fPIE # Non-executable stack -override ASFLAGS+=--noexecstack +override LDFLAGS+=-Wl,-z,noexecstack .PHONY: all all: one diff --git a/node/Bond.cpp b/node/Bond.cpp index f5124a6a2..2a061796d 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -373,6 +373,7 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) */ if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && _paths[_abPathIdx].p) { + //fprintf(stderr, "trying to send via (_abPathIdx=%d) %s\n", _abPathIdx, pathToStr(_paths[_abPathIdx].p).c_str()); return _paths[_abPathIdx].p; } } @@ -1032,6 +1033,13 @@ void Bond::curateBond(int64_t now, bool rebuildBond) bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay; // How long since the last QoS was received (Must be less than ZT_PEER_PATH_EXPIRATION since the remote peer's _qosSendInterval isn't known) bool acceptableQoSAge = (_paths[i].lastQoSReceived == 0 && inTrial) || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD); + + // Allow active-backup to operate without the receipt of QoS records + // This may be expanded to the other modes as an option + if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { + acceptableQoSAge = true; + } + currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay && acceptableQoSAge) || inTrial); if (currEligibility) { @@ -1043,12 +1051,11 @@ void Bond::curateBond(int64_t now, bool rebuildBond) */ if (currEligibility != _paths[i].eligible) { if (currEligibility == 0) { - log("link %s is no longer eligible", pathToStr(_paths[i].p).c_str()); + log("link %s is no longer eligible (reason: allowed=%d, age=%d, ud=%d, qos=%d, trial=%d)", pathToStr(_paths[i].p).c_str(), _paths[i].allowed(), acceptableAge, satisfiedUpDelay, acceptableQoSAge, inTrial); } if (currEligibility == 1) { log("link %s is eligible", pathToStr(_paths[i].p).c_str()); } - debug("\t[%d] allowed=%d, age=%d, qa=%d, ud=%d, trial=%d", i, _paths[i].allowed(), acceptableAge, acceptableQoSAge, satisfiedUpDelay, inTrial); dumpPathStatus(now, i); if (currEligibility) { rebuildBond = true; @@ -1496,7 +1503,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) { int prevActiveBackupPathIdx = _abPathIdx; int nonPreferredPathIdx = ZT_MAX_PEER_NETWORK_PATHS; - bool bFoundPrimaryLink = false; + bool foundPathOnPrimaryLink = false; + bool foundPreferredPath = false; if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && ! _paths[_abPathIdx].p) { _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; @@ -1559,15 +1567,16 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) if (! _paths[i].preferred()) { // Found path on primary link, take note in case we don't find a preferred path nonPreferredPathIdx = i; - bFoundPrimaryLink = true; + foundPathOnPrimaryLink = true; } if (_paths[i].preferred()) { _abPathIdx = i; - bFoundPrimaryLink = true; + foundPathOnPrimaryLink = true; if (_paths[_abPathIdx].p) { SharedPtr abLink = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket()); if (abLink) { - log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str()); + log("found preferred primary link (_abPathIdx=%d), %s", _abPathIdx, pathToStr(_paths[_abPathIdx].p).c_str()); + foundPreferredPath = true; } break; // Found preferred path on primary link } @@ -1575,8 +1584,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } } - if (bFoundPrimaryLink && (nonPreferredPathIdx != ZT_MAX_PEER_NETWORK_PATHS)) { - log("found non-preferred primary link"); + if (!foundPreferredPath && foundPathOnPrimaryLink && (nonPreferredPathIdx != ZT_MAX_PEER_NETWORK_PATHS)) { + log("found non-preferred primary link (_abPathIdx=%d)", _abPathIdx); _abPathIdx = nonPreferredPathIdx; } } @@ -1614,10 +1623,10 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } if (_paths[(*it)].p && ! _paths[(*it)].eligible) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[(*it)].p->localSocket()); - it = _abFailoverQueue.erase(it); if (link) { - log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); + log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[(*it)].p).c_str(), _abFailoverQueue.size()); } + it = _abFailoverQueue.erase(it); continue; } else { @@ -1684,7 +1693,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } if (! bFoundPathInQueue) { - _abFailoverQueue.push_front(i); + _abFailoverQueue.push_back(i); log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size()); addPathToBond(i, 0); } @@ -1734,13 +1743,14 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } if (! bFoundPathInQueue) { - _abFailoverQueue.push_front(i); + _abFailoverQueue.push_back(i); log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size()); addPathToBond(i, 0); } } } } + /* // Sort queue based on performance if (! _abFailoverQueue.empty()) { for (int i = 0; i < _abFailoverQueue.size(); i++) { @@ -1752,7 +1762,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } _abFailoverQueue[hole_position] = value_to_insert; } - } + }*/ /** * Short-circuit if we have no queued paths @@ -1902,7 +1912,7 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT * Policy defaults */ _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; - _abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_OPTIMIZE; + _abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_ALWAYS; _rrPacketsSentOnCurrLink = 0; _rrIdx = 0; _packetsPerLink = 64; @@ -2021,7 +2031,8 @@ void Bond::dumpInfo(int64_t now, bool force) _lastSummaryDump = now; float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f); _overheadBytes = 0; - log("bond: bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)", + log("bond: ready=%d, bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)", + isReady(), _policy, _failoverInterval, _monitorInterval, diff --git a/node/Bond.hpp b/node/Bond.hpp index 408c1e125..d5d3f673e 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -1144,6 +1144,7 @@ class Bond { __attribute__((format(printf, 2, 3))) #endif { + //if (_peerId != 0x0 && _peerId != 0x0) { return; } #ifdef ZT_TRACE time_t rawtime; struct tm* timeinfo; @@ -1175,6 +1176,7 @@ class Bond { __attribute__((format(printf, 2, 3))) #endif { + //if (_peerId != 0x0 && _peerId != 0x0) { return; } #ifdef ZT_DEBUG time_t rawtime; struct tm* timeinfo; diff --git a/node/Constants.hpp b/node/Constants.hpp index e310cc947..95f093b3e 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -202,6 +202,72 @@ #define ZT_PACKED_STRUCT(D) D __attribute__((packed)) #endif +#if defined(_WIN32) +#define ZT_PLATFORM_NAME "windows" // Windows +#elif defined(_WIN64) +#define ZT_PLATFORM_NAME "windows" // Windows +#elif defined(__CYGWIN__) +#define ZT_PLATFORM_NAME "windows" // Windows (Cygwin POSIX under Microsoft Window) +#elif defined(__ANDROID__) +#define ZT_PLATFORM_NAME "android" // Android (implies Linux, so it must come first) +#elif defined(__linux__) +#define ZT_PLATFORM_NAME "linux" // Debian, Ubuntu, Gentoo, Fedora, openSUSE, RedHat, Centos and other +#elif defined(__unix__) || !defined(__APPLE__) && defined(__MACH__) +#include +#if defined(BSD) +#define ZT_PLATFORM_NAME "bsd" // FreeBSD, NetBSD, OpenBSD, DragonFly BSD +#endif +#elif defined(__hpux) +#define ZT_PLATFORM_NAME "hp-ux" // HP-UX +#elif defined(_AIX) +#define ZT_PLATFORM_NAME "aix" // IBM AIX +#elif defined(__APPLE__) && defined(__MACH__) // Apple OSX and iOS (Darwin) +#include +#if defined(TARGET_IPHONE_SIMULATOR) && TARGET_IPHONE_SIMULATOR == 1 +#define ZT_PLATFORM_NAME "ios_sim" // Apple iOS +#elif defined(TARGET_OS_IPAD) && TARGET_OS_IPAD == 1 +#define ZT_PLATFORM_NAME "ios_ipad" +#elif defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE == 1 +#define ZT_PLATFORM_NAME "ios_iphone" // Apple iOS +#elif defined(TARGET_OS_MAC) && TARGET_OS_MAC == 1 +#define ZT_PLATFORM_NAME "macos" // Apple OSX +#endif +#elif defined(__sun) && defined(__SVR4) +#define ZT_PLATFORM_NAME "solaris" // Oracle Solaris, Open Indiana +#else +#define ZT_PLATFORM_NAME "unknown" +#endif +#ifndef ZT_PLATFORM_NAME +#define ZT_PLATFORM_NAME "unknown" +#endif + +#if defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__) || defined(_M_X64) || defined(_M_AMD64) +#define ZT_ARCH_NAME "x86_64" +#elif defined(__i386__) || defined(__i486__) || defined(__i586__) || defined(__i686__) || defined(_X86_) || defined(_M_IX86) || defined(__X86__) || defined(__I86__) || defined(_M_I86) +#define ZT_ARCH_NAME "x86" +#elif defined(__aarch64__) || defined(__AARCH64EL__) || defined(_M_ARM64) +#define ZT_ARCH_NAME "arm64" +#elif defined(__arm__) || defined(__TARGET_ARCH_ARM) || defined(_ARM) || defined(_M_ARM) || defined(_M_ARMT) || defined(__arm) || defined(__thumb__) +#define ZT_ARCH_NAME "arm" +#elif defined(__loongarch__) || defined(_LOONGARCH_ARCH) +#define ZT_ARCH_NAME "loongarch" +#elif defined(__mips__) || defined(__MIPS__) +#define ZT_ARCH_NAME "mips" +#elif defined(__riscv) || defined(__riscv_xlen) +#define ZT_ARCH_NAME "riscv" +#elif defined(__powerpc__) || defined(__powerpc64__) || defined(__ppc__) || defined(__ppc64__) || defined (_M_PPC) +#define ZT_ARCH_NAME "powerpc" +#elif defined(__s390__) || defined(__s390x__) || defined(__zarch__) +#define ZT_ARCH_NAME "s390" +#else +#define ZT_ARCH_NAME "unknown" +#endif +#ifndef ZT_ARCH_NAME +#define ZT_ARCH_NAME "unknown" +#endif + +#define ZT_TARGET_NAME (ZT_PLATFORM_NAME "/" ZT_ARCH_NAME) + /** * Length of a ZeroTier address in bytes */ diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index d939fa0a1..2537c0fb6 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -38,6 +38,7 @@ #include "Path.hpp" #include "Bond.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" namespace ZeroTier { @@ -334,7 +335,6 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const Shar bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr& peer) { Metrics::pkt_qos_in++; - SharedPtr bond = peer->bond(); if (! peer->rateGateQoS(RR->node->now(), _path)) { return true; } @@ -793,66 +793,65 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - if (peer->flowHashingSupported()) { - if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { - const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); - const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - uint8_t proto = (reinterpret_cast(frameData)[9]); - const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); - switch(proto) { - case 0x01: // ICMP - //flowId = 0x01; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (headerLen + 4)) { - unsigned int pos = headerLen + 0; - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - } + if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { + const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); + const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + + if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + uint8_t proto = (reinterpret_cast(frameData)[9]); + const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); + switch(proto) { + case 0x01: // ICMP + //flowId = 0x01; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (headerLen + 4)) { + unsigned int pos = headerLen + 0; + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; } + } - if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - unsigned int pos; - unsigned int proto; - _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); - switch(proto) { - case 0x3A: // ICMPv6 - //flowId = 0x3A; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (pos + 4)) { - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - default: - break; - } + if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + unsigned int pos; + unsigned int proto; + _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); + switch(proto) { + case 0x3A: // ICMPv6 + //flowId = 0x3A; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (pos + 4)) { + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; + default: + break; } } } @@ -869,7 +868,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); } } } else { @@ -942,7 +941,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); break; } } diff --git a/node/Network.cpp b/node/Network.cpp index 2e03b2482..1643487fe 100644 --- a/node/Network.cpp +++ b/node/Network.cpp @@ -1313,6 +1313,7 @@ void Network::requestConfiguration(void *tPtr) rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_MAX_NETWORK_TAGS,(uint64_t)ZT_MAX_NETWORK_TAGS); rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_FLAGS,(uint64_t)0); rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_RULES_ENGINE_REV,(uint64_t)ZT_RULES_ENGINE_REVISION); + rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_OS_ARCH,ZT_TARGET_NAME); RR->t->networkConfigRequestSent(tPtr,*this,ctrl); diff --git a/node/NetworkConfig.hpp b/node/NetworkConfig.hpp index 416bbfd78..65e4c8365 100644 --- a/node/NetworkConfig.hpp +++ b/node/NetworkConfig.hpp @@ -105,6 +105,8 @@ namespace ZeroTier { // Network config version #define ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION "v" +// Network config version +#define ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_OS_ARCH "o" // Protocol version (see Packet.hpp) #define ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION "pv" // Software vendor @@ -687,7 +689,7 @@ public: /** * Time current authentication expires or 0 if external authentication is disabled - * + * * Not used if authVersion >= 1 */ uint64_t authenticationExpiryTime; diff --git a/node/Node.cpp b/node/Node.cpp index 5c561fedc..1f377c545 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -35,6 +35,7 @@ #include "Network.hpp" #include "Trace.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" // FIXME: remove this suppression and actually fix warnings #ifdef __GNUC__ @@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0); const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0); const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0); - const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0); - m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bc)); + m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms)); if (!m) { throw std::bad_alloc(); } @@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 RR->sa = new (m) SelfAwareness(RR); m += sas; RR->bc = new (m) Bond(RR); + m += bcs; + RR->pm = new (m) PacketMultiplexer(RR); } catch ( ... ) { if (RR->sa) { RR->sa->~SelfAwareness(); @@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(m); throw; } @@ -191,6 +198,9 @@ Node::~Node() if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(RR->rtmem); } @@ -230,6 +240,11 @@ ZT_ResultCode Node::processVirtualNetworkFrame( } } +void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled) +{ + RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); +} + // Closure used to ping upstream and active/online peers class _PingPeersThatNeedPing { diff --git a/node/Node.hpp b/node/Node.hpp index 81c78249b..f9d05483a 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,7 +283,10 @@ public: return _lowBandwidthMode; } -private: + void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled); + + +public: RuntimeEnvironment _RR; RuntimeEnvironment *RR; void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp new file mode 100644 index 000000000..a1dc835a1 --- /dev/null +++ b/node/PacketMultiplexer.cpp @@ -0,0 +1,122 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#include "PacketMultiplexer.hpp" + +#include "Node.hpp" +#include "RuntimeEnvironment.hpp" +#include "Constants.hpp" + +#include +#include + +namespace ZeroTier { + +PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +{ + RR = renv; +}; + +void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) +{ +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; +#endif + + if (!_enabled) { + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; + } + + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->tPtr = tPtr; + packet->nwid = nwid; + packet->nuptr = nuptr; + packet->source = source.toInt(); + packet->dest = dest.toInt(); + packet->etherType = etherType; + packet->vlanId = vlanId; + packet->len = len; + packet->flowId = flowId; + memcpy(packet->data, data, len); + + int bucket = flowId % _concurrency; + _rxPacketQueues[bucket]->postLimit(packet, 2048); +} + +void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) +{ +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _enabled = true; + _concurrency = concurrency; + bool _enablePinning = cpuPinningEnabled; + + for (unsigned int i = 0; i < _concurrency; ++i) { + fprintf(stderr, "Reserved queue for thread %d\n", i); + _rxPacketQueues.push_back(new BlockingQueue()); + } + + // Each thread picks from its own queue to feed into the core + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning]() { + fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i); + + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueues[i]->get(packet)) { + break; + } + if (! packet) { + break; + } + + // fprintf(stderr, "popped packet from queue %d\n", i); + + MAC sourceMac = MAC(packet->source); + MAC destMac = MAC(packet->dest); + + RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + /* + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + */ + } + })); + } +} + +} // namespace ZeroTier \ No newline at end of file diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp new file mode 100644 index 000000000..4753180ed --- /dev/null +++ b/node/PacketMultiplexer.hpp @@ -0,0 +1,65 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#ifndef ZT_PACKET_MULTIPLEXER_HPP +#define ZT_PACKET_MULTIPLEXER_HPP + +#include "../osdep/BlockingQueue.hpp" +#include "MAC.hpp" +#include "Mutex.hpp" +#include "RuntimeEnvironment.hpp" + +#include +#include + +namespace ZeroTier { + +struct PacketRecord { + void* tPtr; + uint64_t nwid; + void** nuptr; + uint64_t source; + uint64_t dest; + unsigned int etherType; + unsigned int vlanId; + uint8_t data[ZT_MAX_MTU]; + unsigned int len; + unsigned int flowId; +}; + +class PacketMultiplexer { + public: + const RuntimeEnvironment* RR; + + PacketMultiplexer(const RuntimeEnvironment* renv); + + void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled); + + void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); + + std::vector*> _rxPacketQueues; + + unsigned int _concurrency; + // pool + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m, _rxPacketThreads_m; + + std::vector _rxThreads; + unsigned int _rxThreadCount; + bool _enabled; +}; + +} // namespace ZeroTier + +#endif // ZT_PACKET_MULTIPLEXER_HPP \ No newline at end of file diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index eefa2eeda..6b14f771c 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -31,6 +31,7 @@ class NetworkController; class SelfAwareness; class Trace; class Bond; +class PacketMultiplexer; /** * Holds global state for an instance of ZeroTier::Node @@ -77,6 +78,7 @@ public: Topology *topology; SelfAwareness *sa; Bond *bc; + PacketMultiplexer *pm; // This node's identity and string representations thereof Identity identity; diff --git a/node/Switch.cpp b/node/Switch.cpp index 4fea0d0e9..7664f7a48 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72); }).detach(); - + return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query. } // else no NDP emulation } // else no NDP emulation diff --git a/node/Utils.hpp b/node/Utils.hpp index 67aa77739..41809cba0 100644 --- a/node/Utils.hpp +++ b/node/Utils.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #if defined(__FreeBSD__) #include @@ -849,6 +850,19 @@ public: * Hexadecimal characters 0-f */ static const char HEXCHARS[16]; + + /* + * Remove `-` and `:` from a MAC address (in-place). + * + * @param mac The MAC address + */ + static inline void cleanMac(std::string& mac) + { + auto start = mac.begin(); + auto end = mac.end(); + auto new_end = std::remove_if(start, end, [](char c) { return c == 45 || c == 58; }); + mac.erase(new_end, end); + } }; } // namespace ZeroTier diff --git a/objects.mk b/objects.mk index d07578fb3..1d8a6c0a5 100644 --- a/objects.mk +++ b/objects.mk @@ -29,7 +29,8 @@ CORE_OBJS=\ node/Topology.o \ node/Trace.o \ node/Utils.o \ - node/Bond.o + node/Bond.o \ + node/PacketMultiplexer.o ONE_OBJS=\ controller/EmbeddedNetworkController.o \ diff --git a/osdep/BSDEthernetTap.cpp b/osdep/BSDEthernetTap.cpp index 3114306ee..1a240c1a1 100644 --- a/osdep/BSDEthernetTap.cpp +++ b/osdep/BSDEthernetTap.cpp @@ -39,7 +39,9 @@ #include #include #include +#include +#include #include #include #include @@ -53,6 +55,7 @@ #include "BSDEthernetTap.hpp" #define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv" +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -61,6 +64,8 @@ namespace ZeroTier { BSDEthernetTap::BSDEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -69,6 +74,8 @@ BSDEthernetTap::BSDEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int), void *arg) : _handler(handler), + _concurrency(concurrency), + _pinning(pinning), _arg(arg), _nwid(nwid), _mtu(mtu), @@ -195,11 +202,9 @@ BSDEthernetTap::BSDEthernetTap( BSDEthernetTap::~BSDEthernetTap() { ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit - Thread::join(_thread); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); - long cpid = (long)vfork(); if (cpid == 0) { #ifdef ZT_TRACE @@ -211,6 +216,10 @@ BSDEthernetTap::~BSDEthernetTap() int exitcode = -1; ::waitpid(cpid,&exitcode,0); } + Thread::join(_thread); + for (std::thread &t : _rxThreads) { + t.join(); + } } void BSDEthernetTap::setEnabled(bool en) @@ -418,53 +427,75 @@ void BSDEthernetTap::setMtu(unsigned int mtu) void BSDEthernetTap::threadMain() throw() { - fd_set readfds,nullfds; - MAC to,from; - int n,nfds,r; - char getBuf[ZT_MAX_MTU + 64]; - // Wait for a moment after startup -- wait for Network to finish // constructing itself. Thread::sleep(500); - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _pinning] { - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread - break; - - if (FD_ISSET(_fd,&readfds)) { - n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r); - if (n < 0) { - if ((errno != EINTR)&&(errno != ETIMEDOUT)) - break; - } else { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - to.setTo(getBuf,6); - from.setTo(getBuf + 6,6); - unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]); - _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14); - } - - r = 0; + if (_pinning) { + int pinCore = i % _concurrency; + fprintf(stderr, "Pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + //int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "Failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); } } - } + + uint8_t b[ZT_TAP_BUF_SIZE]; + MAC to, from; + fd_set readfds, nullfds; + int n, nfds, r; + + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + + r = 0; + + for(;;) { + FD_SET(_shutdownSignalPipe[0],&readfds); + FD_SET(_fd,&readfds); + select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + + if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread + break; + + if (FD_ISSET(_fd,&readfds)) { + n = (int)::read(_fd,b + r,sizeof(b) - r); + if (n < 0) { + if ((errno != EINTR)&&(errno != ETIMEDOUT)) + break; + } else { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + to.setTo(b,6); + from.setTo(b + 6,6); + unsigned int etherType = ntohs(((const uint16_t *)b)[6]); + _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14); + } + + r = 0; + } + } + } + } + })); } } diff --git a/osdep/BSDEthernetTap.hpp b/osdep/BSDEthernetTap.hpp index 9700fb365..50e2e6e8b 100644 --- a/osdep/BSDEthernetTap.hpp +++ b/osdep/BSDEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MulticastGroup.hpp" @@ -34,6 +35,8 @@ class BSDEthernetTap : public EthernetTap public: BSDEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -62,6 +65,8 @@ public: private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; + unsigned int _concurrency; + bool _pinning; uint64_t _nwid; Thread _thread; std::string _dev; @@ -73,6 +78,7 @@ private: volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index fdda8fe15..0be209ecd 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -57,6 +57,8 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default + unsigned int concurrency, + bool pinning, const char *homePath, const MAC &mac, unsigned int mtu, @@ -92,7 +94,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ @@ -130,7 +132,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __WINDOWS__ #ifdef __FreeBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __FreeBSD__ #ifdef __NetBSD__ diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index e6833c33d..1d97f1256 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -32,6 +32,8 @@ class EthernetTap public: static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default + unsigned int concurrency, + bool pinning, const char *homePath, const MAC &mac, unsigned int mtu, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 11dd8c0cf..14929d176 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -60,7 +60,7 @@ #define IFNAMSIZ 16 #endif -#define ZT_TAP_BUF_SIZE 16384 +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -68,7 +68,7 @@ static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC namespace ZeroTier { // determine if we're running a really old linux kernel. -// Kernels in the 2.6.x series don't behave the same when bringing up +// Kernels in the 2.6.x series don't behave the same when bringing up // the tap devices. // // Returns true if the kernel major version is < 3 @@ -111,6 +111,8 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -220,135 +222,155 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); - _tapReaderThread = std::thread([this]{ - uint8_t b[ZT_TAP_BUF_SIZE]; - fd_set readfds,nullfds; - int n,nfds,r; - std::vector buffers; - struct ifreq ifr; + for (unsigned int i = 0; i < concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, concurrency, pinning] { - memset(&ifr,0,sizeof(ifr)); - strcpy(ifr.ifr_name,_dev.c_str()); + if (pinning) { + int pinCore = i % concurrency; + fprintf(stderr, "Pinning tap thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "Failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } + } - const int sock = socket(AF_INET,SOCK_DGRAM,0); - if (sock <= 0) - return; + uint8_t b[ZT_TAP_BUF_SIZE]; + fd_set readfds, nullfds; + int n, nfds, r; + if (i == 0) { + struct ifreq ifr; + memset(&ifr, 0, sizeof(ifr)); + strcpy(ifr.ifr_name, _dev.c_str()); - if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + const int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock <= 0) + return; - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + if (ioctl(sock, SIOCGIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - usleep(100000); + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } + + usleep(100000); + + if (isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } + + usleep(100000); + } + + ifr.ifr_flags |= IFF_MULTICAST; + ifr.ifr_flags |= IFF_UP; + if (ioctl(sock, SIOCSIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } + + usleep(100000); + + if (! isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } + + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } + } + + fcntl(_fd, F_SETFL, O_NONBLOCK); - if (isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + } + + if (! _run) { return; } - usleep(100000); - } - + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0], _fd) + 1; - ifr.ifr_flags |= IFF_MULTICAST; - ifr.ifr_flags |= IFF_UP; - if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + r = 0; + for (;;) { + FD_SET(_shutdownSignalPipe[0], &readfds); + FD_SET(_fd, &readfds); + select(nfds, &readfds, &nullfds, &nullfds, (struct timeval*)0); - usleep(100000); + if (FD_ISSET(_shutdownSignalPipe[0], &readfds)) { + break; + } + if (FD_ISSET(_fd, &readfds)) { + for (;;) { + // read until there are no more packets, then return to outer select() loop + n = (int)::read(_fd, b + r, ZT_TAP_BUF_SIZE - r); + if (n > 0) { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; - if (!isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + if (_enabled) { + MAC to(b, 6), from(b + 6, 6); + unsigned int etherType = Utils::ntoh(((const uint16_t*)b)[6]); + _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void*)(b + 14), (unsigned int)(r - 14)); + } - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); - return; - } - } - - fcntl(_fd,F_SETFL,O_NONBLOCK); - - ::close(sock); - - if (!_run) - return; - - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; - - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) - break; - - if (FD_ISSET(_fd,&readfds)) { - for(;;) { // read until there are no more packets, then return to outer select() loop - n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r); - if (n > 0) { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - //_tapq.post(std::pair(buf,r)); - //buf = nullptr; - MAC to(b, 6),from(b + 6, 6); - unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); - _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14)); + r = 0; } - - r = 0; } - } else { - r = 0; - break; + else { + r = 0; + break; + } } } } - } - }); + })); + } } LinuxEthernetTap::~LinuxEthernetTap() { _run = false; (void)::write(_shutdownSignalPipe[1],"\0",1); - _tapReaderThread.join(); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); + for (std::thread &t : _rxThreads) { + t.join(); + } } void LinuxEthernetTap::setEnabled(bool en) diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index 6353f8661..41e299823 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -26,6 +26,7 @@ #include #include "../node/MulticastGroup.hpp" #include "EthernetTap.hpp" +#include "BlockingQueue.hpp" namespace ZeroTier { @@ -34,6 +35,8 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -57,9 +60,6 @@ public: virtual void setMtu(unsigned int mtu); virtual void setDns(const char *domain, const std::vector &servers) {} - - - private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; @@ -73,9 +73,9 @@ private: int _shutdownSignalPipe[2]; std::atomic_bool _enabled; std::atomic_bool _run; - std::thread _tapReaderThread; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index 0b2fcb85b..e74c66940 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -32,7 +32,7 @@ * All this stuff is basically undocumented. A lot of tracing through * the Darwin/XNU kernel source was required to figure out how to make * this actually work. - * + * * We hope to develop a DriverKit-based driver in the near-mid future to * replace this weird hack, but it works for now through Big Sur in our * testing. diff --git a/osdep/MacKextEthernetTap.cpp b/osdep/MacKextEthernetTap.cpp index d52d67947..e06560300 100644 --- a/osdep/MacKextEthernetTap.cpp +++ b/osdep/MacKextEthernetTap.cpp @@ -447,7 +447,9 @@ MacKextEthernetTap::~MacKextEthernetTap() ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit Thread::join(_thread); - + for (std::thread &t : _rxThreads) { + t.join(); + } ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); diff --git a/osdep/MacKextEthernetTap.hpp b/osdep/MacKextEthernetTap.hpp index 46b0915f1..aede92867 100644 --- a/osdep/MacKextEthernetTap.hpp +++ b/osdep/MacKextEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MAC.hpp" @@ -75,6 +76,7 @@ private: int _fd; int _shutdownSignalPipe[2]; volatile bool _enabled; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/service/OneService.cpp b/service/OneService.cpp index 88a516ebd..298b2b26e 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -16,7 +16,6 @@ #include #include #include - #include #include #include @@ -26,6 +25,11 @@ #include #include +#ifdef __FreeBSD__ +#include +#include +#endif + #include "../version.h" #include "../include/ZeroTierOne.h" @@ -42,6 +46,7 @@ #include "../node/SHA512.hpp" #include "../node/Bond.hpp" #include "../node/Peer.hpp" +#include "../node/PacketMultiplexer.hpp" #include "../osdep/Phy.hpp" #include "../osdep/OSUtils.hpp" @@ -759,7 +764,7 @@ struct TcpConnection Mutex writeq_m; }; -struct OneServiceIncomingPacket +struct PacketRecord { uint64_t now; int64_t sock; @@ -786,14 +791,22 @@ public: SoftwareUpdater *_updater; bool _updateAutoApply; - httplib::Server _controlPlane; + httplib::Server _controlPlane; httplib::Server _controlPlaneV6; - std::thread _serverThread; + std::thread _serverThread; std::thread _serverThreadV6; bool _serverThreadRunning; bool _serverThreadRunningV6; - bool _allowTcpFallbackRelay; + BlockingQueue _rxPacketQueue; + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m,_rxPacketThreads_m; + bool _multicoreEnabled; + bool _cpuPinningEnabled; + unsigned int _concurrency; + + bool _allowTcpFallbackRelay; bool _forceTcpRelay; bool _allowSecondaryPort; bool _enableWebServer; @@ -844,8 +857,6 @@ public: // Deadline for the next background task service function volatile int64_t _nextBackgroundTaskDeadline; - - std::map _nets; Mutex _nets_m; @@ -892,9 +903,9 @@ public: ,_node((Node *)0) ,_updater((SoftwareUpdater *)0) ,_updateAutoApply(false) - ,_controlPlane() + ,_controlPlane() ,_controlPlaneV6() - ,_serverThread() + ,_serverThread() ,_serverThreadV6() ,_serverThreadRunning(false) ,_serverThreadRunningV6(false) @@ -928,9 +939,9 @@ public: _ports[1] = 0; _ports[2] = 0; - prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); - prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); - prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); + prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); + prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); + prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -942,20 +953,34 @@ public: #ifdef __WINDOWS__ WinFWHelper::removeICMPRules(); #endif + + _rxPacketQueue.stop(); + _rxPacketThreads_m.lock(); + for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) { + t->join(); + } + _rxPacketThreads_m.unlock(); _binder.closeAll(_phy); #if ZT_VAULT_SUPPORT curl_global_cleanup(); #endif - _controlPlane.stop(); + _controlPlane.stop(); if (_serverThreadRunning) { - _serverThread.join(); + _serverThread.join(); } _controlPlaneV6.stop(); if (_serverThreadRunningV6) { _serverThreadV6.join(); } + _rxPacketVector_m.lock(); + while (!_rxPacketVector.empty()) { + delete _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + #ifdef ZT_USE_MINIUPNPC delete _portMapper; @@ -964,6 +989,15 @@ public: delete _rc; } + void setUpMultithreading() + { +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _node->initMultithreading(_concurrency, _cpuPinningEnabled); + bool pinning = _cpuPinningEnabled; + } + virtual ReasonForTermination run() { try { @@ -1272,6 +1306,9 @@ public: const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500; clockShouldBe = now + (int64_t)delay; _phy.poll(delay); + + + } } catch (std::exception &e) { Mutex::Lock _l(_termReason_m); @@ -2510,7 +2547,7 @@ public: } _node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,mtu,capacity,enabled,linkMode,failoverToStr)); } - std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize")); + std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"always")); if (linkSelectMethodStr == "always") { newTemplateBond->setLinkSelectMethod(ZT_BOND_RESELECTION_POLICY_ALWAYS); } @@ -2562,7 +2599,25 @@ public: fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); } _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); - _node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false)); +#if defined(__LINUX__) || defined(__FreeBSD__) + _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); + _concurrency = OSUtils::jsonInt(settings["concurrency"],1); + _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); + if (_multicoreEnabled) { + unsigned int maxConcurrency = std::thread::hardware_concurrency(); + if (_concurrency <= 1 || _concurrency >= maxConcurrency) { + unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1); + fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault); + _concurrency = conservativeDefault; + } + setUpMultithreading(); + } + else { + // Force values in case the user accidentally defined them with multicore disabled + _concurrency = 1; + _cpuPinningEnabled = false; + } +#endif #ifndef ZT_SDK const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); @@ -2877,16 +2932,19 @@ public: // Handlers for Node and Phy<> callbacks // ========================================================================= - inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) + + + + inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len) { if (_forceTcpRelay) { return; } - Metrics::udp_recv += len; + Metrics::udp_recv += len; const uint64_t now = OSUtils::now(); - if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { + if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; - } + } const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; @@ -2898,6 +2956,7 @@ public: } } + inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { if (!success) { @@ -3116,6 +3175,8 @@ public: n.setTap(EthernetTap::newInstance( nullptr, + _concurrency, + _cpuPinningEnabled, _homePath.c_str(), MAC(nwc->mac), nwc->mtu, @@ -3630,8 +3691,9 @@ public: inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { NetworkState *n = reinterpret_cast(*nuptr); - if ((!n)||(!n->tap())) + if ((!n)||(!n->tap())) { return; + } n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len); } diff --git a/windows/ZeroTierOne/ZeroTierOne.vcxproj b/windows/ZeroTierOne/ZeroTierOne.vcxproj index fcd8b56e2..2dd05aa57 100644 --- a/windows/ZeroTierOne/ZeroTierOne.vcxproj +++ b/windows/ZeroTierOne/ZeroTierOne.vcxproj @@ -88,6 +88,7 @@ +