mirror of
https://github.com/ZeroTier/ZeroTierOne
synced 2025-07-06 04:51:46 -07:00
Merge branch 'dev' into redisrection
This commit is contained in:
commit
c6fc3560f2
12804 changed files with 2946788 additions and 1833 deletions
|
@ -196,14 +196,6 @@ void DB::networks(std::set<uint64_t> &networks)
|
|||
networks.insert(n->first);
|
||||
}
|
||||
|
||||
void DB::networkMemberSSOHasExpired(uint64_t nwid, int64_t now) {
|
||||
std::lock_guard<std::mutex> l(_networks_l);
|
||||
auto nw = _networks.find(nwid);
|
||||
if (nw != _networks.end()) {
|
||||
nw->second->mostRecentDeauthTime = now;
|
||||
}
|
||||
}
|
||||
|
||||
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners)
|
||||
{
|
||||
uint64_t memberId = 0;
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
#include <set>
|
||||
#include <map>
|
||||
|
||||
#include "../ext/json/json.hpp"
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#define ZT_MEMBER_AUTH_TIMEOUT_NOTIFY_BEFORE 25000
|
||||
|
||||
|
@ -135,7 +135,6 @@ public:
|
|||
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
|
||||
|
||||
virtual AuthInfo getSSOAuthInfo(const nlohmann::json &member, const std::string &redirectURL) { return AuthInfo(); }
|
||||
virtual void networkMemberSSOHasExpired(uint64_t nwid, int64_t ts);
|
||||
|
||||
inline void addListener(DB::ChangeListener *const listener)
|
||||
{
|
||||
|
|
|
@ -137,14 +137,6 @@ AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json &member, const std::st
|
|||
return AuthInfo();
|
||||
}
|
||||
|
||||
void DBMirrorSet::networkMemberSSOHasExpired(uint64_t nwid, int64_t ts)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
|
||||
(*d)->networkMemberSSOHasExpired(nwid, ts);
|
||||
}
|
||||
}
|
||||
|
||||
void DBMirrorSet::networks(std::set<uint64_t> &networks)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_dbs_l);
|
||||
|
@ -248,47 +240,4 @@ void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,u
|
|||
_listener->onNetworkMemberDeauthorize(this,networkId,memberId);
|
||||
}
|
||||
|
||||
void DBMirrorSet::membersExpiring(std::set< std::pair<uint64_t, uint64_t> > &soon, std::set< std::pair<uint64_t, uint64_t> > &expired)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_membersExpiringSoon_l);
|
||||
int64_t now = OSUtils::now();
|
||||
for(auto next=_membersExpiringSoon.begin();next!=_membersExpiringSoon.end();) {
|
||||
if (next->first > now) {
|
||||
const uint64_t nwid = next->second.first;
|
||||
const uint64_t memberId = next->second.second;
|
||||
nlohmann::json network, member;
|
||||
if (this->get(nwid, network, memberId, member)) {
|
||||
try {
|
||||
const bool authorized = member["authorized"];
|
||||
const bool ssoExempt = member["ssoExempt"];
|
||||
const int64_t authenticationExpiryTime = member["authenticationExpiryTime"];
|
||||
if ((authenticationExpiryTime == next->first)&&(authorized)&&(!ssoExempt)) {
|
||||
if ((authenticationExpiryTime - now) > ZT_MEMBER_AUTH_TIMEOUT_NOTIFY_BEFORE) {
|
||||
// Stop when we get to entries too far in the future.
|
||||
break;
|
||||
} else {
|
||||
const bool ssoEnabled = network["ssoEnabled"];
|
||||
if (ssoEnabled)
|
||||
soon.insert(std::pair<uint64_t, uint64_t>(nwid, memberId));
|
||||
}
|
||||
} else {
|
||||
// Obsolete entry, no longer authorized, or SSO exempt.
|
||||
}
|
||||
} catch ( ... ) {
|
||||
// Invalid member object, erase.
|
||||
}
|
||||
} else {
|
||||
// Not found.
|
||||
}
|
||||
}
|
||||
_membersExpiringSoon.erase(next++);
|
||||
}
|
||||
}
|
||||
|
||||
void DBMirrorSet::memberWillExpire(int64_t expTime, uint64_t nwid, uint64_t memberId)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_membersExpiringSoon_l);
|
||||
_membersExpiringSoon.insert(std::pair< int64_t, std::pair< uint64_t, uint64_t > >(expTime, std::pair< uint64_t, uint64_t >(nwid, memberId)));
|
||||
}
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
|
|
@ -52,7 +52,6 @@ public:
|
|||
virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
|
||||
|
||||
AuthInfo getSSOAuthInfo(const nlohmann::json &member, const std::string &redirectURL);
|
||||
void networkMemberSSOHasExpired(uint64_t nwid, int64_t ts);
|
||||
|
||||
inline void addDB(const std::shared_ptr<DB> &db)
|
||||
{
|
||||
|
@ -61,17 +60,12 @@ public:
|
|||
_dbs.push_back(db);
|
||||
}
|
||||
|
||||
void membersExpiring(std::set< std::pair<uint64_t, uint64_t> > &soon, std::set< std::pair<uint64_t, uint64_t> > &expired);
|
||||
void memberWillExpire(int64_t expTime, uint64_t nwid, uint64_t memberId);
|
||||
|
||||
private:
|
||||
DB::ChangeListener *const _listener;
|
||||
std::atomic_bool _running;
|
||||
std::thread _syncCheckerThread;
|
||||
std::vector< std::shared_ptr< DB > > _dbs;
|
||||
mutable std::mutex _dbs_l;
|
||||
std::set< std::pair< int64_t, std::pair<uint64_t, uint64_t> > > _membersExpiringSoon;
|
||||
mutable std::mutex _membersExpiringSoon_l;
|
||||
};
|
||||
|
||||
} // namespace ZeroTier
|
||||
|
|
|
@ -1262,6 +1262,7 @@ void EmbeddedNetworkController::_request(
|
|||
}
|
||||
const bool newMember = ((!member.is_object())||(member.empty()));
|
||||
DB::initMember(member);
|
||||
_MemberStatusKey msk(nwid,identity.address().toInt());
|
||||
|
||||
{
|
||||
const std::string haveIdStr(OSUtils::jsonString(member["identity"],""));
|
||||
|
@ -1335,43 +1336,21 @@ void EmbeddedNetworkController::_request(
|
|||
// Should we check SSO Stuff?
|
||||
// If network is configured with SSO, and the member is not marked exempt: yes
|
||||
// Otherwise no, we use standard auth logic.
|
||||
AuthInfo info;
|
||||
int64_t authenticationExpiryTime = -1;
|
||||
bool networkSSOEnabled = OSUtils::jsonBool(network["ssoEnabled"], false);
|
||||
bool memberSSOExempt = OSUtils::jsonBool(member["ssoExempt"], false);
|
||||
AuthInfo info;
|
||||
if (networkSSOEnabled && !memberSSOExempt) {
|
||||
// TODO: Get expiry time if auth is still valid
|
||||
|
||||
// else get new auth info & stuff
|
||||
authenticationExpiryTime = (int64_t)OSUtils::jsonInt(member["authenticationExpiryTime"], 0);
|
||||
info = _db.getSSOAuthInfo(member, _ssoRedirectURL);
|
||||
assert(info.enabled == networkSSOEnabled);
|
||||
|
||||
std::string memberId = member["id"];
|
||||
//fprintf(stderr, "ssoEnabled && !ssoExempt %s-%s\n", nwids, memberId.c_str());
|
||||
uint64_t authenticationExpiryTime = (int64_t)OSUtils::jsonInt(member["authenticationExpiryTime"], 0);
|
||||
fprintf(stderr, "authExpiryTime: %lld\n", authenticationExpiryTime);
|
||||
if (authenticationExpiryTime < now) {
|
||||
fprintf(stderr, "Handling expired member\n");
|
||||
if (authenticationExpiryTime <= now) {
|
||||
if (info.version == 0) {
|
||||
if (!info.authenticationURL.empty()) {
|
||||
_db.networkMemberSSOHasExpired(nwid, now);
|
||||
onNetworkMemberDeauthorize(&_db, nwid, identity.address().toInt());
|
||||
|
||||
Dictionary<4096> authInfo;
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_VERSION, (uint64_t)0ULL);
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_AUTHENTICATION_URL, info.authenticationURL.c_str());
|
||||
//fprintf(stderr, "sending auth URL: %s\n", authenticationURL.c_str());
|
||||
|
||||
DB::cleanMember(member);
|
||||
_db.save(member,true);
|
||||
|
||||
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_AUTHENTICATION_REQUIRED, authInfo.data(), authInfo.sizeBytes());
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (info.version == 1) {
|
||||
_db.networkMemberSSOHasExpired(nwid, now);
|
||||
onNetworkMemberDeauthorize(&_db, nwid, identity.address().toInt());
|
||||
|
||||
Dictionary<4096> authInfo;
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_VERSION, (uint64_t)0ULL);
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_AUTHENTICATION_URL, info.authenticationURL.c_str());
|
||||
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_AUTHENTICATION_REQUIRED, authInfo.data(), authInfo.sizeBytes());
|
||||
} else if (info.version == 1) {
|
||||
Dictionary<8192> authInfo;
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_VERSION, info.version);
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_ISSUER_URL, info.issuerURL.c_str());
|
||||
|
@ -1379,20 +1358,11 @@ void EmbeddedNetworkController::_request(
|
|||
authInfo.add(ZT_AUTHINFO_DICT_KEY_NONCE, info.ssoNonce.c_str());
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_STATE, info.ssoState.c_str());
|
||||
authInfo.add(ZT_AUTHINFO_DICT_KEY_CLIENT_ID, info.ssoClientID.c_str());
|
||||
|
||||
DB::cleanMember(member);
|
||||
_db.save(member, true);
|
||||
|
||||
fprintf(stderr, "Sending NC_ERROR_AUTHENTICATION_REQUIRED\n");
|
||||
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_AUTHENTICATION_REQUIRED, authInfo.data(), authInfo.sizeBytes());
|
||||
return;
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "invalid sso info.version %llu\n", info.version);
|
||||
}
|
||||
} else if (authorized) {
|
||||
fprintf(stderr, "Setting member will expire to: %lld\n", authenticationExpiryTime);
|
||||
_db.memberWillExpire(authenticationExpiryTime, nwid, identity.address().toInt());
|
||||
DB::cleanMember(member);
|
||||
_db.save(member,true);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1411,8 +1381,8 @@ void EmbeddedNetworkController::_request(
|
|||
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_memberStatus_l);
|
||||
_MemberStatus &ms = _memberStatus[_MemberStatusKey(nwid,identity.address().toInt())];
|
||||
|
||||
_MemberStatus &ms = _memberStatus[msk];
|
||||
ms.authenticationExpiryTime = authenticationExpiryTime;
|
||||
ms.vMajor = (int)vMajor;
|
||||
ms.vMinor = (int)vMinor;
|
||||
ms.vRev = (int)vRev;
|
||||
|
@ -1420,9 +1390,13 @@ void EmbeddedNetworkController::_request(
|
|||
ms.lastRequestMetaData = metaData;
|
||||
ms.identity = identity;
|
||||
}
|
||||
}
|
||||
|
||||
if (authenticationExpiryTime > 0) {
|
||||
std::lock_guard<std::mutex> l(_expiringSoon_l);
|
||||
_expiringSoon.insert(std::pair<int64_t, _MemberStatusKey>(authenticationExpiryTime, msk));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
// If they are not authorized, STOP!
|
||||
DB::cleanMember(member);
|
||||
_db.save(member,true);
|
||||
|
@ -1434,18 +1408,13 @@ void EmbeddedNetworkController::_request(
|
|||
// If we made it this far, they are authorized (and authenticated).
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
int64_t credentialtmd = ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_MAX_MAX_DELTA;
|
||||
if (now > ns.mostRecentDeauthTime) {
|
||||
// If we recently de-authorized a member, shrink credential TTL/max delta to
|
||||
// be below the threshold required to exclude it. Cap this to a min/max to
|
||||
// prevent jitter or absurdly large values.
|
||||
const uint64_t deauthWindow = now - ns.mostRecentDeauthTime;
|
||||
if (deauthWindow < ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_MIN_MAX_DELTA) {
|
||||
credentialtmd = ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_MIN_MAX_DELTA;
|
||||
} else if (deauthWindow < (ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_MAX_MAX_DELTA + 5000ULL)) {
|
||||
credentialtmd = deauthWindow - 5000ULL;
|
||||
}
|
||||
// Default timeout: 15 minutes. Maximum: two hours. Can be specified by an optional field in the network config
|
||||
// if something longer than 15 minutes is desired. Minimum is 5 minutes since shorter than that would be flaky.
|
||||
int64_t credentialtmd = ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_DFL_MAX_DELTA;
|
||||
if (network.contains("certificateTimeoutWindowSize")) {
|
||||
credentialtmd = (int64_t)network["certificateTimeoutWindowSize"];
|
||||
}
|
||||
credentialtmd = std::max(std::min(credentialtmd, ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_MAX_MAX_DELTA), ZT_NETWORKCONFIG_DEFAULT_CREDENTIAL_TIME_MIN_MAX_DELTA);
|
||||
|
||||
std::unique_ptr<NetworkConfig> nc(new NetworkConfig());
|
||||
|
||||
|
@ -1460,7 +1429,7 @@ void EmbeddedNetworkController::_request(
|
|||
nc->mtu = std::max(std::min((unsigned int)OSUtils::jsonInt(network["mtu"],ZT_DEFAULT_MTU),(unsigned int)ZT_MAX_MTU),(unsigned int)ZT_MIN_MTU);
|
||||
nc->multicastLimit = (unsigned int)OSUtils::jsonInt(network["multicastLimit"],32ULL);
|
||||
|
||||
nc->ssoEnabled = OSUtils::jsonBool(network["ssoEnabled"], false);
|
||||
nc->ssoEnabled = networkSSOEnabled; //OSUtils::jsonBool(network["ssoEnabled"], false);
|
||||
nc->ssoVersion = info.version;
|
||||
|
||||
if (info.version == 0) {
|
||||
|
@ -1858,6 +1827,8 @@ void EmbeddedNetworkController::_startThreads()
|
|||
const long hwc = std::max((long)std::thread::hardware_concurrency(),(long)1);
|
||||
for(long t=0;t<hwc;++t) {
|
||||
_threads.emplace_back([this]() {
|
||||
std::vector<_MemberStatusKey> expired;
|
||||
nlohmann::json network, member;
|
||||
for(;;) {
|
||||
_RQEntry *qe = (_RQEntry *)0;
|
||||
auto timedWaitResult = _queue.get(qe, 1000);
|
||||
|
@ -1876,28 +1847,31 @@ void EmbeddedNetworkController::_startThreads()
|
|||
}
|
||||
}
|
||||
|
||||
std::set< std::pair<uint64_t, uint64_t> > soon;
|
||||
std::set< std::pair<uint64_t, uint64_t> > expired;
|
||||
_db.membersExpiring(soon, expired);
|
||||
|
||||
for(auto s=soon.begin();s!=soon.end();++s) {
|
||||
Identity identity;
|
||||
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> lastMetaData;
|
||||
{
|
||||
std::unique_lock<std::mutex> ll(_memberStatus_l);
|
||||
auto ms = _memberStatus.find(_MemberStatusKey(s->first, s->second));
|
||||
if (ms != _memberStatus.end()) {
|
||||
lastMetaData = ms->second.lastRequestMetaData;
|
||||
identity = ms->second.identity;
|
||||
expired.clear();
|
||||
int64_t now = OSUtils::now();
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_expiringSoon_l);
|
||||
for(auto s=_expiringSoon.begin();s!=_expiringSoon.end();) {
|
||||
const int64_t when = s->first;
|
||||
if (when <= now) {
|
||||
// The user may have re-authorized, so we must actually look it up and check.
|
||||
network.clear();
|
||||
member.clear();
|
||||
if (_db.get(s->second.networkId, network, s->second.nodeId, member)) {
|
||||
int64_t authenticationExpiryTime = (int64_t)OSUtils::jsonInt(member["authenticationExpiryTime"], 0);
|
||||
if (authenticationExpiryTime <= now) {
|
||||
expired.push_back(s->second);
|
||||
}
|
||||
}
|
||||
_expiringSoon.erase(s++);
|
||||
} else {
|
||||
// Don't bother going further into the future than necessary.
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (identity) {
|
||||
request(s->first,InetAddress(),0,identity,lastMetaData);
|
||||
}
|
||||
}
|
||||
|
||||
for(auto e=expired.begin();e!=expired.end();++e) {
|
||||
onNetworkMemberDeauthorize(nullptr, e->first, e->second);
|
||||
onNetworkMemberDeauthorize(nullptr, e->networkId, e->nodeId);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
#include "../osdep/Thread.hpp"
|
||||
#include "../osdep/BlockingQueue.hpp"
|
||||
|
||||
#include "../ext/json/json.hpp"
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
#include "DB.hpp"
|
||||
#include "DBMirrorSet.hpp"
|
||||
|
@ -109,6 +109,7 @@ private:
|
|||
RQENTRY_TYPE_REQUEST = 0
|
||||
} type;
|
||||
};
|
||||
|
||||
struct _MemberStatusKey
|
||||
{
|
||||
_MemberStatusKey() : networkId(0),nodeId(0) {}
|
||||
|
@ -116,11 +117,13 @@ private:
|
|||
uint64_t networkId;
|
||||
uint64_t nodeId;
|
||||
inline bool operator==(const _MemberStatusKey &k) const { return ((k.networkId == networkId)&&(k.nodeId == nodeId)); }
|
||||
inline bool operator<(const _MemberStatusKey &k) const { return (k.networkId < networkId) || ((k.networkId == networkId)&&(k.nodeId < nodeId)); }
|
||||
};
|
||||
struct _MemberStatus
|
||||
{
|
||||
_MemberStatus() : lastRequestTime(0),vMajor(-1),vMinor(-1),vRev(-1),vProto(-1) {}
|
||||
uint64_t lastRequestTime;
|
||||
_MemberStatus() : lastRequestTime(0),authenticationExpiryTime(-1),vMajor(-1),vMinor(-1),vRev(-1),vProto(-1) {}
|
||||
int64_t lastRequestTime;
|
||||
int64_t authenticationExpiryTime;
|
||||
int vMajor,vMinor,vRev,vProto;
|
||||
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> lastRequestMetaData;
|
||||
Identity identity;
|
||||
|
@ -152,6 +155,9 @@ private:
|
|||
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
|
||||
std::mutex _memberStatus_l;
|
||||
|
||||
std::set< std::pair<int64_t, _MemberStatusKey> > _expiringSoon;
|
||||
std::mutex _expiringSoon_l;
|
||||
|
||||
RedisConfig *_rc;
|
||||
std::string _ssoRedirectURL;
|
||||
};
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include <chrono>
|
||||
|
||||
|
||||
// #define ZT_TRACE 1
|
||||
// #define REDIS_TRACE 1
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
|
@ -783,6 +783,7 @@ void PostgreSQL::initializeMembers()
|
|||
std::string assignedAddresses = std::get<20>(row);
|
||||
|
||||
config["id"] = memberId;
|
||||
config["address"] = memberId;
|
||||
config["nwid"] = networkId;
|
||||
config["activeBridge"] = activeBridge.value_or(false);
|
||||
config["authorized"] = authorized.value_or(false);
|
||||
|
@ -942,30 +943,31 @@ void PostgreSQL::_membersWatcher_Postgres() {
|
|||
void PostgreSQL::_membersWatcher_Redis() {
|
||||
char buf[11] = {0};
|
||||
std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
|
||||
std::string lastID = "0";
|
||||
fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
|
||||
while (_run == 1) {
|
||||
try {
|
||||
json tmp;
|
||||
std::unordered_map<std::string, ItemStream> result;
|
||||
if (_rc->clusterMode) {
|
||||
_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
_cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
} else {
|
||||
_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
_redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
}
|
||||
if (!result.empty()) {
|
||||
for (auto element : result) {
|
||||
#ifdef ZT_TRACE
|
||||
#ifdef REDIS_TRACE
|
||||
fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
|
||||
#endif
|
||||
for (auto rec : element.second) {
|
||||
std::string id = rec.first;
|
||||
auto attrs = rec.second;
|
||||
#ifdef ZT_TRACE
|
||||
#ifdef REDIS_TRACE
|
||||
fprintf(stdout, "Record ID: %s\n", id.c_str());
|
||||
fprintf(stdout, "attrs len: %lu\n", attrs.size());
|
||||
#endif
|
||||
for (auto a : attrs) {
|
||||
#ifdef ZT_TRACE
|
||||
#ifdef REDIS_TRACE
|
||||
fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
|
||||
#endif
|
||||
try {
|
||||
|
@ -987,6 +989,7 @@ void PostgreSQL::_membersWatcher_Redis() {
|
|||
} else {
|
||||
_redis->xdel(key, id);
|
||||
}
|
||||
lastID = id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1029,31 +1032,31 @@ void PostgreSQL::_networksWatcher_Postgres() {
|
|||
void PostgreSQL::_networksWatcher_Redis() {
|
||||
char buf[11] = {0};
|
||||
std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
|
||||
|
||||
std::string lastID = "0";
|
||||
while (_run == 1) {
|
||||
try {
|
||||
json tmp;
|
||||
std::unordered_map<std::string, ItemStream> result;
|
||||
if (_rc->clusterMode) {
|
||||
_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
_cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
} else {
|
||||
_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
_redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
||||
}
|
||||
|
||||
if (!result.empty()) {
|
||||
for (auto element : result) {
|
||||
#ifdef ZT_TRACE
|
||||
#ifdef REDIS_TRACE
|
||||
fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
|
||||
#endif
|
||||
for (auto rec : element.second) {
|
||||
std::string id = rec.first;
|
||||
auto attrs = rec.second;
|
||||
#ifdef ZT_TRACE
|
||||
#ifdef REDIS_TRACE
|
||||
fprintf(stdout, "Record ID: %s\n", id.c_str());
|
||||
fprintf(stdout, "attrs len: %lu\n", attrs.size());
|
||||
#endif
|
||||
for (auto a : attrs) {
|
||||
#ifdef ZT_TRACE
|
||||
#ifdef REDIS_TRACE
|
||||
fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
|
||||
#endif
|
||||
try {
|
||||
|
@ -1075,6 +1078,7 @@ void PostgreSQL::_networksWatcher_Redis() {
|
|||
} else {
|
||||
_redis->xdel(key, id);
|
||||
}
|
||||
lastID = id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue