Merge branch 'master' into cmake

This commit is contained in:
Grant Limberg 2019-08-01 13:49:30 -07:00
commit c2f9aab068
86 changed files with 4809 additions and 5863 deletions

View file

@ -104,8 +104,7 @@ void DB::cleanMember(nlohmann::json &member)
member.erase("lastRequestMetaData");
}
DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
_controller(nc),
DB::DB(const Identity &myId,const char *path) :
_myId(myId),
_myAddress(myId.address()),
_path((path) ? path : "")
@ -115,9 +114,7 @@ DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path
_myAddressStr = tmp;
}
DB::~DB()
{
}
DB::~DB() {}
bool DB::get(const uint64_t networkId,nlohmann::json &network)
{
@ -229,7 +226,7 @@ void DB::networks(std::vector<uint64_t> &networks)
networks.push_back(n->first);
}
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push)
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized)
{
uint64_t memberId = 0;
uint64_t networkId = 0;
@ -313,8 +310,12 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
}
}
if (push)
_controller->onNetworkMemberUpdate(networkId,memberId);
if (initialized) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig);
}
}
} else if (memberId) {
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
@ -332,20 +333,24 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
}
}
if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)))
_controller->onNetworkMemberDeauthorize(networkId,memberId);
if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberDeauthorize(networkId,memberId);
}
}
}
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push)
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized)
{
if (networkConfig.is_object()) {
const std::string ids = networkConfig["id"];
const uint64_t id = Utils::hexStrToU64(ids.c_str());
if (id) {
const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
if (networkId) {
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[id];
std::shared_ptr<_Network> &nw2 = _networks[networkId];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
@ -354,15 +359,19 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
std::lock_guard<std::mutex> l2(nw->lock);
nw->config = networkConfig;
}
if (push)
_controller->onNetworkUpdate(id);
if (initialized) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkUpdate(networkId,networkConfig);
}
}
}
} else if (old.is_object()) {
const std::string ids = old["id"];
const uint64_t id = Utils::hexStrToU64(ids.c_str());
if (id) {
const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
if (networkId) {
std::lock_guard<std::mutex> l(_networks_l);
_networks.erase(id);
_networks.erase(networkId);
}
}
}

View file

@ -40,20 +40,29 @@
#include <unordered_set>
#include <vector>
#include <atomic>
#include <mutex>
#include "../ext/json/json.hpp"
namespace ZeroTier
{
class EmbeddedNetworkController;
/**
* Base class with common infrastructure for all controller DB implementations
*/
class DB
{
public:
class ChangeListener
{
public:
ChangeListener() {}
virtual ~ChangeListener() {}
virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {}
virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {}
};
struct NetworkSummaryInfo
{
NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
@ -64,27 +73,12 @@ public:
int64_t mostRecentDeauthTime;
};
/**
* Ensure that all network fields are present
*/
static void initNetwork(nlohmann::json &network);
/**
* Ensure that all member fields are present
*/
static void initMember(nlohmann::json &member);
/**
* Remove old and temporary network fields
*/
static void cleanNetwork(nlohmann::json &network);
/**
* Remove old and temporary member fields
*/
static void cleanMember(nlohmann::json &member);
DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
DB(const Identity &myId,const char *path);
virtual ~DB();
virtual bool waitForReady() = 0;
@ -100,19 +94,20 @@ public:
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
void networks(std::vector<uint64_t> &networks);
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
inline void addListener(DB::ChangeListener *const listener)
{
std::lock_guard<std::mutex> l(_changeListeners_l);
_changeListeners.push_back(listener);
}
protected:
struct _Network
{
@ -126,18 +121,19 @@ protected:
std::mutex lock;
};
void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push);
void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push);
void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized);
void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized);
void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
EmbeddedNetworkController *const _controller;
const Identity _myId;
const Address _myAddress;
const std::string _path;
std::string _myAddressStr;
std::vector<DB::ChangeListener *> _changeListeners;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
mutable std::mutex _changeListeners_l;
mutable std::mutex _networks_l;
};

View file

@ -46,6 +46,11 @@
#include "../version.h"
#include "EmbeddedNetworkController.hpp"
#include "LFDB.hpp"
#include "FileDB.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ
#include "PostgreSQL.hpp"
#endif
#include "../node/Node.hpp"
#include "../node/CertificateOfMembership.hpp"
@ -488,12 +493,51 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
_signingId = signingId;
_sender = sender;
_signingIdAddressString = signingId.address().toString(tmp);
#ifdef ZT_CONTROLLER_USE_LIBPQ
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:"))
_db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
else // else use FileDB after endif
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) {
_db.reset(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
} else {
#endif
_db.reset(new FileDB(this,_signingId,_path.c_str()));
std::string lfJSON;
OSUtils::readFile((_path + ZT_PATH_SEPARATOR_S ".." ZT_PATH_SEPARATOR_S "local.conf").c_str(),lfJSON);
if (lfJSON.length() > 0) {
nlohmann::json lfConfig(OSUtils::jsonParse(lfJSON));
nlohmann::json &settings = lfConfig["settings"];
if (settings.is_object()) {
nlohmann::json &controllerDb = settings["controllerDb"];
if (controllerDb.is_object()) {
std::string type = controllerDb["type"];
if (type == "lf") {
std::string lfOwner = controllerDb["owner"];
std::string lfHost = controllerDb["host"];
int lfPort = controllerDb["port"];
bool storeOnlineState = controllerDb["storeOnlineState"];
if ((lfOwner.length())&&(lfHost.length())&&(lfPort > 0)&&(lfPort < 65536)) {
std::size_t pubHdrLoc = lfOwner.find("Public: ");
if ((pubHdrLoc > 0)&&((pubHdrLoc + 8) < lfOwner.length())) {
std::string lfOwnerPublic = lfOwner.substr(pubHdrLoc + 8);
std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t ");
if (pubHdrEnd != std::string::npos) {
lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd);
_db.reset(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState));
}
}
}
}
}
}
}
if (!_db)
_db.reset(new FileDB(_signingId,_path.c_str()));
_db->addListener(this);
#ifdef ZT_CONTROLLER_USE_LIBPQ
}
#endif
_db->waitForReady();
}
@ -1146,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
}
}
void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network)
{
// Send an update to all members of the network that are online
const int64_t now = OSUtils::now();
@ -1157,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
}
}
void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId)
void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member)
{
// Push update to member if online
try {

View file

@ -51,10 +51,6 @@
#include "../ext/json/json.hpp"
#include "DB.hpp"
#include "FileDB.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ
#include "PostgreSQL.hpp"
#endif
namespace ZeroTier {
@ -62,7 +58,7 @@ class Node;
struct MQConfig;
class EmbeddedNetworkController : public NetworkController
class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener
{
public:
/**
@ -105,10 +101,9 @@ public:
void handleRemoteTrace(const ZT_RemoteTrace &rt);
// Called on update via POST or by JSONDB on external update of network or network member records
void onNetworkUpdate(const uint64_t networkId);
void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId);
void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network);
virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member);
virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
private:
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);

View file

@ -29,10 +29,12 @@
namespace ZeroTier
{
FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
DB(nc,myId,path),
FileDB::FileDB(const Identity &myId,const char *path) :
DB(myId,path),
_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace")
_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
_onlineChanged(false),
_running(true)
{
OSUtils::mkdir(_path.c_str());
OSUtils::lockDownFile(_path.c_str(),true);
@ -69,9 +71,65 @@ FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const ch
} catch ( ... ) {}
}
}
_onlineUpdateThread = std::thread([this]() {
unsigned int cnt = 0;
while (this->_running) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
if ((++cnt % 20) == 0) { // 5 seconds
std::lock_guard<std::mutex> l(this->_online_l);
if (!this->_running) return;
if (this->_onlineChanged) {
char p[4096],atmp[64];
for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) {
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first);
FILE *f = fopen(p,"wb");
if (f) {
fprintf(f,"{");
const char *memberPrefix = "";
for(auto m=nw->second.begin();m!=nw->second.end();++m) {
fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first);
memberPrefix = ",";
InetAddress lastAddr;
const char *timestampPrefix = " ";
int cnt = 0;
for(auto ts=m->second.rbegin();ts!=m->second.rend();) {
if (cnt < 25) {
if (lastAddr != ts->second) {
lastAddr = ts->second;
fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp));
timestampPrefix = ",";
++cnt;
++ts;
} else {
ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
}
} else {
ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
}
}
fprintf(f,"}");
}
fprintf(f,"}" ZT_EOL_S);
fclose(f);
}
}
this->_onlineChanged = false;
}
}
}
});
}
FileDB::~FileDB() {}
FileDB::~FileDB()
{
try {
_online_l.lock();
_running = false;
_online_l.unlock();
_onlineUpdateThread.join();
} catch ( ... ) {}
}
bool FileDB::waitForReady() { return true; }
bool FileDB::isReady() { return true; }
@ -94,14 +152,10 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json.new",_networksPath.c_str(),nwid);
OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
OSUtils::rename(p1,p2);
_networkChanged(old,record,true);
}
}
@ -111,10 +165,9 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json.new",pb,(unsigned long long)id);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) {
OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx",_networksPath.c_str(),(unsigned long long)nwid);
OSUtils::mkdir(p2);
@ -122,9 +175,6 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
}
OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
OSUtils::rename(p1,p2);
_memberChanged(old,record,true);
}
}
@ -144,29 +194,39 @@ void FileDB::eraseNetwork(const uint64_t networkId)
get(networkId,network);
char p[16384];
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId);
if (OSUtils::fileExists(p,false)){
OSUtils::rm(p);
}
OSUtils::rm(p);
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId);
OSUtils::rm(p);
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId);
OSUtils::rmDashRf(p);
_networkChanged(network,nullJson,true);
std::lock_guard<std::mutex> l(this->_online_l);
this->_online.erase(networkId);
this->_onlineChanged = true;
}
void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
nlohmann::json network,member,nullJson;
get(networkId,network);
get(memberId,member);
char p[16384];
get(memberId,member);
char p[4096];
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),networkId,memberId);
if (OSUtils::fileExists(p,false)){
OSUtils::rm(p);
}
OSUtils::rm(p);
_memberChanged(member,nullJson,true);
std::lock_guard<std::mutex> l(this->_online_l);
this->_online[networkId].erase(memberId);
this->_onlineChanged = true;
}
void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
{
// Nothing to do here right now in the filesystem store mode since we can just get this from the peer list
char mid[32],atmp[64];
OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId);
physicalAddress.toString(atmp);
std::lock_guard<std::mutex> l(this->_online_l);
this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
this->_onlineChanged = true;
}
} // namespace ZeroTier

View file

@ -35,7 +35,7 @@ namespace ZeroTier
class FileDB : public DB
{
public:
FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
FileDB(const Identity &myId,const char *path);
virtual ~FileDB();
virtual bool waitForReady();
@ -48,6 +48,11 @@ public:
protected:
std::string _networksPath;
std::string _tracePath;
std::thread _onlineUpdateThread;
std::map< uint64_t,std::map<uint64_t,std::map<int64_t,InetAddress> > > _online;
std::mutex _online_l;
bool _onlineChanged;
bool _running;
};
} // namespace ZeroTier

400
controller/LFDB.cpp Normal file
View file

@ -0,0 +1,400 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#include "LFDB.hpp"
#include <thread>
#include <chrono>
#include <iostream>
#include <sstream>
#include "../osdep/OSUtils.hpp"
#include "../ext/cpp-httplib/httplib.h"
namespace ZeroTier
{
LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
DB(myId,path),
_myId(myId),
_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
_lfNodeHost((lfNodeHost) ? lfNodeHost : "127.0.0.1"),
_lfNodePort(((lfNodePort > 0)&&(lfNodePort < 65536)) ? lfNodePort : 9980),
_running(true),
_ready(false),
_storeOnlineState(storeOnlineState)
{
_syncThread = std::thread([this]() {
char controllerAddress[24];
const uint64_t controllerAddressInt = _myId.address().toInt();
_myId.address().toString(controllerAddress);
std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member");
// LF record masking key is the first 32 bytes of SHA512(controller private key) in hex,
// hiding record values from anything but the controller or someone who has its key.
uint8_t sha512pk[64];
_myId.sha512PrivateKey(sha512pk);
char maskingKey [128];
Utils::hex(sha512pk,32,maskingKey);
httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
int64_t timeRangeStart = 0;
while (_running) {
{
std::lock_guard<std::mutex> sl(_state_l);
for(auto ns=_state.begin();ns!=_state.end();++ns) {
if (ns->second.dirty) {
nlohmann::json network;
if (get(ns->first,network)) {
nlohmann::json newrec,selector0;
selector0["Name"] = networksSelectorName;
selector0["Ordinal"] = ns->first;
newrec["Selectors"].push_back(selector0);
newrec["Value"] = network.dump();
newrec["OwnerPrivate"] = _lfOwnerPrivate;
newrec["MaskingKey"] = maskingKey;
newrec["PulseIfUnchanged"] = true;
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ns->second.dirty = false;
printf("SET network %.16llx %s\n",ns->first,resp->body.c_str());
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
}
}
for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) {
if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)&&(ms->second.lastOnlineAddress)) {
nlohmann::json newrec,selector0,selector1,selectors,ip;
char tmp[1024],tmp2[128];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"com.zerotier.controller.lfdb:%s/network/%.16llx/online",controllerAddress,(unsigned long long)ns->first);
ms->second.lastOnlineAddress.toIpString(tmp2);
selector0["Name"] = tmp;
selector0["Ordinal"] = ms->first;
selector1["Name"] = tmp2;
selector1["Ordinal"] = 0;
selectors.push_back(selector0);
selectors.push_back(selector1);
newrec["Selectors"] = selectors;
const uint8_t *const rawip = (const uint8_t *)ms->second.lastOnlineAddress.rawIpData();
switch(ms->second.lastOnlineAddress.ss_family) {
case AF_INET:
for(int j=0;j<4;++j)
ip.push_back((unsigned int)rawip[j]);
break;
case AF_INET6:
for(int j=0;j<16;++j)
ip.push_back((unsigned int)rawip[j]);
break;
default:
ip = tmp2; // should never happen since only IP transport is currently supported
break;
}
newrec["Value"] = ip;
newrec["OwnerPrivate"] = _lfOwnerPrivate;
newrec["MaskingKey"] = maskingKey;
newrec["Timestamp"] = ms->second.lastOnlineTime;
newrec["PulseIfUnchanged"] = true;
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ms->second.lastOnlineDirty = false;
printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
}
if (ms->second.dirty) {
nlohmann::json network,member;
if (get(ns->first,network,ms->first,member)) {
nlohmann::json newrec,selector0,selector1,selectors;
selector0["Name"] = networksSelectorName;
selector0["Ordinal"] = ns->first;
selector1["Name"] = membersSelectorName;
selector1["Ordinal"] = ms->first;
selectors.push_back(selector0);
selectors.push_back(selector1);
newrec["Selectors"] = selectors;
newrec["Value"] = member.dump();
newrec["OwnerPrivate"] = _lfOwnerPrivate;
newrec["MaskingKey"] = maskingKey;
newrec["PulseIfUnchanged"] = true;
auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json");
if (resp) {
if (resp->status == 200) {
ms->second.dirty = false;
printf("SET member %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
} else {
fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
}
}
}
}
}
{
std::ostringstream query;
query
<< '{'
<< "\"Ranges\":[{"
<< "\"Name\":\"" << networksSelectorName << "\","
<< "\"Range\":[0,18446744073709551615]"
<< "}],"
<< "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
<< "\"MaskingKey\":\"" << maskingKey << "\","
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
<< '}';
auto resp = htcli.Post("/query",query.str(),"application/json");
if (resp) {
if (resp->status == 200) {
nlohmann::json results(OSUtils::jsonParse(resp->body));
if ((results.is_array())&&(results.size() > 0)) {
for(std::size_t ri=0;ri<results.size();++ri) {
nlohmann::json &rset = results[ri];
if ((rset.is_array())&&(rset.size() > 0)) {
nlohmann::json &result = rset[0];
if (result.is_object()) {
nlohmann::json &record = result["Record"];
if (record.is_object()) {
const std::string recordValue = result["Value"];
printf("GET network %s\n",recordValue.c_str());
nlohmann::json network(OSUtils::jsonParse(recordValue));
if (network.is_object()) {
const std::string idstr = network["id"];
const uint64_t id = Utils::hexStrToU64(idstr.c_str());
if ((id >> 24) == controllerAddressInt) { // sanity check
std::lock_guard<std::mutex> sl(_state_l);
_NetworkState &ns = _state[id];
if (!ns.dirty) {
nlohmann::json oldNetwork;
if (get(id,oldNetwork)) {
const uint64_t revision = network["revision"];
const uint64_t prevRevision = oldNetwork["revision"];
if (prevRevision < revision) {
_networkChanged(oldNetwork,network,timeRangeStart > 0);
}
} else {
nlohmann::json nullJson;
_networkChanged(nullJson,network,timeRangeStart > 0);
}
}
}
}
}
}
}
}
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
}
{
std::ostringstream query;
query
<< '{'
<< "\"Ranges\":[{"
<< "\"Name\":\"" << networksSelectorName << "\","
<< "\"Range\":[0,18446744073709551615]"
<< "},{"
<< "\"Name\":\"" << membersSelectorName << "\","
<< "\"Range\":[0,18446744073709551615]"
<< "}],"
<< "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
<< "\"MaskingKey\":\"" << maskingKey << "\","
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
<< '}';
auto resp = htcli.Post("/query",query.str(),"application/json");
if (resp) {
if (resp->status == 200) {
nlohmann::json results(OSUtils::jsonParse(resp->body));
if ((results.is_array())&&(results.size() > 0)) {
for(std::size_t ri=0;ri<results.size();++ri) {
nlohmann::json &rset = results[ri];
if ((rset.is_array())&&(rset.size() > 0)) {
nlohmann::json &result = rset[0];
if (result.is_object()) {
nlohmann::json &record = result["Record"];
if (record.is_object()) {
const std::string recordValue = result["Value"];
printf("GET member %s\n",recordValue.c_str());
nlohmann::json member(OSUtils::jsonParse(recordValue));
if (member.is_object()) {
const std::string nwidstr = member["nwid"];
const std::string idstr = member["id"];
const uint64_t nwid = Utils::hexStrToU64(nwidstr.c_str());
const uint64_t id = Utils::hexStrToU64(idstr.c_str());
if ((id)&&((nwid >> 24) == controllerAddressInt)) { // sanity check
std::lock_guard<std::mutex> sl(_state_l);
auto ns = _state.find(nwid);
if ((ns == _state.end())||(!ns->second.members[id].dirty)) {
nlohmann::json network,oldMember;
if (get(nwid,network,id,oldMember)) {
const uint64_t revision = member["revision"];
const uint64_t prevRevision = oldMember["revision"];
if (prevRevision < revision)
_memberChanged(oldMember,member,timeRangeStart > 0);
}
} else {
nlohmann::json nullJson;
_memberChanged(nullJson,member,timeRangeStart > 0);
}
}
}
}
}
}
}
}
} else {
fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
}
}
timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates
_ready = true;
for(int k=0;k<20;++k) { // 2s delay between queries for remotely modified networks or members
if (!_running)
return;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
});
}
LFDB::~LFDB()
{
_running = false;
_syncThread.join();
}
bool LFDB::waitForReady()
{
while (!_ready) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return true;
}
bool LFDB::isReady()
{
return (_ready);
}
void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
{
if (orig) {
if (*orig != record) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
}
} else {
record["revision"] = 1;
}
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
_networkChanged(old,record,true);
{
std::lock_guard<std::mutex> l(_state_l);
_state[nwid].dirty = true;
}
}
}
} else if (objtype == "member") {
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
_memberChanged(old,record,true);
{
std::lock_guard<std::mutex> l(_state_l);
_state[nwid].members[id].dirty = true;
}
}
}
}
}
void LFDB::eraseNetwork(const uint64_t networkId)
{
// TODO
}
void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
// TODO
}
void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
{
std::lock_guard<std::mutex> l(_state_l);
auto nw = _state.find(networkId);
if (nw != _state.end()) {
auto m = nw->second.members.find(memberId);
if (m != nw->second.members.end()) {
m->second.lastOnlineTime = OSUtils::now();
if (physicalAddress)
m->second.lastOnlineAddress = physicalAddress;
m->second.lastOnlineDirty = true;
}
}
}
} // namespace ZeroTier

102
controller/LFDB.hpp Normal file
View file

@ -0,0 +1,102 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#ifndef ZT_CONTROLLER_LFDB_HPP
#define ZT_CONTROLLER_LFDB_HPP
#include "DB.hpp"
#include <mutex>
#include <string>
#include <unordered_map>
#include <atomic>
namespace ZeroTier {
/**
* DB implementation for controller that stores data in LF
*/
class LFDB : public DB
{
public:
/**
* @param myId Identity of controller node (with secret)
* @param path Base path for ZeroTier node itself
* @param lfOwnerPrivate LF owner private in PEM format
* @param lfOwnerPublic LF owner public in @base62 format
* @param lfNodeHost LF node host
* @param lfNodePort LF node http (not https) port
* @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!)
*/
LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState);
virtual ~LFDB();
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
protected:
const Identity _myId;
std::string _lfOwnerPrivate,_lfOwnerPublic;
std::string _lfNodeHost;
int _lfNodePort;
struct _MemberState
{
_MemberState() :
lastOnlineAddress(),
lastOnlineTime(0),
dirty(false),
lastOnlineDirty(false) {}
InetAddress lastOnlineAddress;
int64_t lastOnlineTime;
bool dirty;
bool lastOnlineDirty;
};
struct _NetworkState
{
_NetworkState() :
members(),
dirty(false) {}
std::unordered_map<uint64_t,_MemberState> members;
bool dirty;
};
std::unordered_map<uint64_t,_NetworkState> _state;
std::mutex _state_l;
std::atomic_bool _running;
std::atomic_bool _ready;
std::thread _syncThread;
bool _storeOnlineState;
};
} // namespace ZeroTier
#endif

View file

@ -39,6 +39,8 @@
using json = nlohmann::json;
namespace {
static const int DB_MINIMUM_VERSION = 5;
static const char *_timestr()
{
time_t t = time(0);
@ -75,8 +77,8 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(nc, myId, path)
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(myId, path)
, _ready(0)
, _connected(1)
, _run(1)
@ -86,6 +88,36 @@ PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId
{
_connString = std::string(path) + " application_name=controller_" +_myAddressStr;
// Database Schema Version Check
PGconn *conn = getPgConn();
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
exit(1);
}
PGresult *res = PQexec(conn, "SELECT version FROM ztc_database");
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
fprintf(stderr, "Error determining database version");
exit(1);
}
if (PQntuples(res) != 1) {
fprintf(stderr, "Invalid number of db version tuples returned.");
exit(1);
}
int dbVersion = std::stoi(PQgetvalue(res, 0, 0));
if (dbVersion < DB_MINIMUM_VERSION) {
fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
exit(1);
}
PQclear(res);
res = NULL;
PQfinish(conn);
conn = NULL;
_readyLock.lock();
_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
_membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);

View file

@ -51,7 +51,7 @@ struct MQConfig;
class PostgreSQL : public DB
{
public:
PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL();
virtual bool waitForReady();
@ -78,7 +78,6 @@ private:
void _networksWatcher_Postgres(PGconn *conn);
void _networksWatcher_RabbitMQ();
void commitThread();
void onlineNotificationThread();
@ -93,7 +92,6 @@ private:
BlockingQueue<nlohmann::json *> _commitQueue;
std::thread _heartbeatThread;
std::thread _membersDbWatcher;
std::thread _networksDbWatcher;
@ -116,4 +114,4 @@ private:
#endif // ZT_CONTROLLER_LIBPQ_HPP
#endif // ZT_CONTROLLER_USE_LIBPQ
#endif // ZT_CONTROLLER_USE_LIBPQ