use shared mutexes for read/write locks

This commit is contained in:
Grant Limberg 2023-05-16 13:34:17 -07:00
commit 2771461334
No known key found for this signature in database
GPG key ID: 8F2F97D3BE8D7735
6 changed files with 53 additions and 48 deletions

View file

@ -111,14 +111,14 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network)
Metrics::db_get_network++;
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_lock<std::shared_mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
std::shared_lock<std::shared_mutex> l2(nw->lock);
network = nw->config;
}
return true;
@ -130,14 +130,14 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t mem
Metrics::db_get_network_and_member++;
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_lock<std::shared_mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
std::shared_lock<std::shared_mutex> l2(nw->lock);
network = nw->config;
auto m = nw->members.find(memberId);
if (m == nw->members.end())
@ -153,14 +153,14 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t mem
Metrics::db_get_network_and_member_and_summary++;
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_lock<std::shared_mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
std::shared_lock<std::shared_mutex> l2(nw->lock);
network = nw->config;
_fillSummaryInfo(nw,info);
auto m = nw->members.find(memberId);
@ -177,14 +177,14 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohma
Metrics::db_get_member_list++;
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_lock<std::shared_mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
std::shared_lock<std::shared_mutex> l2(nw->lock);
network = nw->config;
for(auto m=nw->members.begin();m!=nw->members.end();++m) {
members.push_back(m->second);
@ -197,6 +197,7 @@ void DB::networks(std::set<uint64_t> &networks)
{
waitForReady();
Metrics::db_get_member_list++;
std::shared_lock<std::shared_mutex> l(_networks_l);
for(auto n=_networks.begin();n!=_networks.end();++n)
networks.insert(n->first);
}
@ -215,14 +216,14 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool no
networkId = OSUtils::jsonIntHex(old["nwid"],0ULL);
if ((memberId)&&(networkId)) {
{
std::lock_guard<std::mutex> l(_networks_l);
std::unique_lock<std::shared_mutex> l(_networks_l);
auto nw2 = _networks.find(networkId);
if (nw2 != _networks.end()) {
nw = nw2->second;
}
}
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
std::unique_lock<std::shared_mutex> l(nw->lock);
if (OSUtils::jsonBool(old["activeBridge"],false)) {
nw->activeBridgeMembers.erase(memberId);
}
@ -252,7 +253,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool no
networkId = OSUtils::jsonIntHex(memberConfig["nwid"],0ULL);
if ((!memberId)||(!networkId))
return;
std::lock_guard<std::mutex> l(_networks_l);
std::unique_lock<std::shared_mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[networkId];
if (!nw2)
nw2.reset(new _Network);
@ -260,7 +261,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool no
}
{
std::lock_guard<std::mutex> l(nw->lock);
std::unique_lock<std::shared_mutex> l(nw->lock);
nw->members[memberId] = memberConfig;
@ -293,18 +294,18 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool no
}
if (notifyListeners) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
std::unique_lock<std::shared_mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
}
}
} else if (memberId) {
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
std::unique_lock<std::shared_mutex> l(nw->lock);
nw->members.erase(memberId);
}
if (networkId) {
std::lock_guard<std::mutex> l(_networks_l);
std::unique_lock<std::shared_mutex> l(_networks_l);
auto er = _networkByMember.equal_range(memberId);
for(auto i=er.first;i!=er.second;++i) {
if (i->second == networkId) {
@ -334,7 +335,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool no
}
if ((notifyListeners)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
std::unique_lock<std::shared_mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
}
@ -360,18 +361,18 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
if (networkId) {
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::unique_lock<std::shared_mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[networkId];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
std::unique_lock<std::shared_mutex> l2(nw->lock);
nw->config = networkConfig;
}
if (notifyListeners) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
std::unique_lock<std::shared_mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkUpdate(this,networkId,networkConfig);
}
@ -381,7 +382,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
const std::string ids = old["id"];
const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
if (networkId) {
std::lock_guard<std::mutex> l(_networks_l);
std::unique_lock<std::shared_mutex> l(_networks_l);
_networks.erase(networkId);
}
}

View file

@ -29,7 +29,7 @@
#include <unordered_set>
#include <vector>
#include <atomic>
#include <mutex>
#include <shared_mutex>
#include <set>
#include <map>
@ -109,7 +109,7 @@ public:
inline bool hasNetwork(const uint64_t networkId) const
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_lock<std::shared_mutex> l(_networks_l);
return (_networks.find(networkId) != _networks.end());
}
@ -124,7 +124,7 @@ public:
inline void each(F f)
{
nlohmann::json nullJson;
std::lock_guard<std::mutex> lck(_networks_l);
std::unique_lock<std::shared_mutex> lck(_networks_l);
for(auto nw=_networks.begin();nw!=_networks.end();++nw) {
f(nw->first,nw->second->config,0,nullJson); // first provide network with 0 for member ID
for(auto m=nw->second->members.begin();m!=nw->second->members.end();++m) {
@ -142,7 +142,7 @@ public:
inline void addListener(DB::ChangeListener *const listener)
{
std::lock_guard<std::mutex> l(_changeListeners_l);
std::unique_lock<std::shared_mutex> l(_changeListeners_l);
_changeListeners.push_back(listener);
}
@ -178,7 +178,7 @@ protected:
std::unordered_set<uint64_t> authorizedMembers;
std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps;
int64_t mostRecentDeauthTime;
std::mutex lock;
std::shared_mutex lock;
};
virtual void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners);
@ -188,8 +188,8 @@ protected:
std::vector<DB::ChangeListener *> _changeListeners;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
mutable std::mutex _changeListeners_l;
mutable std::mutex _networks_l;
mutable std::shared_mutex _changeListeners_l;
mutable std::shared_mutex _networks_l;
};
} // namespace ZeroTier

View file

@ -32,7 +32,7 @@ DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener)
std::vector< std::shared_ptr<DB> > dbs;
{
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
if (_dbs.size() <= 1)
continue; // no need to do this if there's only one DB, so skip the iteration
dbs = _dbs;
@ -79,7 +79,7 @@ DBMirrorSet::~DBMirrorSet()
bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->hasNetwork(networkId))
return true;
@ -89,7 +89,7 @@ bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network)) {
return true;
@ -100,7 +100,7 @@ bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network,memberId,member))
return true;
@ -110,7 +110,7 @@ bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uin
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network,memberId,member,info))
return true;
@ -120,7 +120,7 @@ bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uin
bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if ((*d)->get(networkId,network,members))
return true;
@ -130,7 +130,7 @@ bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vect
AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json &member, const std::string &redirectURL)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
AuthInfo info = (*d)->getSSOAuthInfo(member, redirectURL);
if (info.enabled) {
@ -142,7 +142,7 @@ AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json &member, const std::st
void DBMirrorSet::networks(std::set<uint64_t> &networks)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->networks(networks);
}
@ -151,7 +151,7 @@ void DBMirrorSet::networks(std::set<uint64_t> &networks)
bool DBMirrorSet::waitForReady()
{
bool r = false;
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
r |= (*d)->waitForReady();
}
@ -160,7 +160,7 @@ bool DBMirrorSet::waitForReady()
bool DBMirrorSet::isReady()
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (!(*d)->isReady())
return false;
@ -172,7 +172,7 @@ bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
{
std::vector< std::shared_ptr<DB> > dbs;
{
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
dbs = _dbs;
}
if (notifyListeners) {
@ -192,7 +192,7 @@ bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
void DBMirrorSet::eraseNetwork(const uint64_t networkId)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->eraseNetwork(networkId);
}
@ -200,7 +200,7 @@ void DBMirrorSet::eraseNetwork(const uint64_t networkId)
void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->eraseMember(networkId,memberId);
}
@ -208,7 +208,7 @@ void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
{
std::lock_guard<std::mutex> l(_dbs_l);
std::shared_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->nodeIsOnline(networkId,memberId,physicalAddress);
}
@ -217,7 +217,7 @@ void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,
void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
{
nlohmann::json record(network);
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (d->get() != db) {
(*d)->save(record,false);
@ -229,7 +229,7 @@ void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohma
void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
nlohmann::json record(member);
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (d->get() != db) {
(*d)->save(record,false);

View file

@ -18,7 +18,7 @@
#include <vector>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <set>
#include <thread>
@ -56,7 +56,7 @@ public:
inline void addDB(const std::shared_ptr<DB> &db)
{
db->addListener(this);
std::lock_guard<std::mutex> l(_dbs_l);
std::unique_lock<std::shared_mutex> l(_dbs_l);
_dbs.push_back(db);
}
@ -65,7 +65,7 @@ private:
std::atomic_bool _running;
std::thread _syncCheckerThread;
std::vector< std::shared_ptr< DB > > _dbs;
mutable std::mutex _dbs_l;
mutable std::shared_mutex _dbs_l;
};
} // namespace ZeroTier

View file

@ -1176,6 +1176,8 @@ void EmbeddedNetworkController::_request(
const Identity &identity,
const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData)
{
Metrics::network_config_request++;
char nwids[24];
DB::NetworkSummaryInfo ns;
json network,member;
@ -1770,6 +1772,7 @@ void EmbeddedNetworkController::_startThreads()
const long hwc = std::max((long)std::thread::hardware_concurrency(),(long)1);
for(long t=0;t<hwc;++t) {
_threads.emplace_back([this]() {
Metrics::network_config_request_threads++;
for(;;) {
_RQEntry *qe = (_RQEntry *)0;
Metrics::network_config_request_queue_size = _queue.size();
@ -1790,6 +1793,7 @@ void EmbeddedNetworkController::_startThreads()
}
}
}
Metrics::network_config_request_threads--;
});
}
}

View file

@ -776,7 +776,7 @@ void PostgreSQL::initializeMembers()
if (_redisMemberStatus) {
fprintf(stderr, "Initialize Redis for members...\n");
std::lock_guard<std::mutex> l(_networks_l);
std::unique_lock<std::shared_mutex> l(_networks_l);
std::unordered_set<std::string> deletes;
for ( auto it : _networks) {
uint64_t nwid_i = it.first;
@ -1812,7 +1812,7 @@ uint64_t PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &con
sw::redis::RightBoundedInterval<double>(expireOld,
sw::redis::BoundType::LEFT_OPEN));
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_lock<std::shared_mutex> l(_networks_l);
for (const auto &it : _networks) {
uint64_t nwid_i = it.first;
char nwidTmp[64];