Add multi-core concurrent packet processing

This commit is contained in:
Joseph Henry 2024-02-23 09:09:46 -08:00
commit 2d89f07156
No known key found for this signature in database
GPG key ID: C45B33FF5EBC9344
12 changed files with 400 additions and 190 deletions

View file

@ -69,6 +69,7 @@ static bool fethMaxMtuAdjusted = false;
MacEthernetTap::MacEthernetTap(
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
@ -77,6 +78,7 @@ MacEthernetTap::MacEthernetTap(
void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len),
void *arg) :
_handler(handler),
_concurrency(concurrency),
_arg(arg),
_nwid(nwid),
_homePath(homePath),
@ -286,6 +288,9 @@ MacEthernetTap::~MacEthernetTap()
}
Thread::join(_thread);
for (std::thread &t : _rxThreads) {
t.join();
}
}
void MacEthernetTap::setEnabled(bool en) { _enabled = en; }
@ -474,17 +479,25 @@ void MacEthernetTap::setMtu(unsigned int mtu)
void MacEthernetTap::threadMain()
throw()
{
Thread::sleep(250);
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i] {
fprintf(stderr, "starting thread %d\n", i);
char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE];
char agentStderrBuf[256];
fd_set readfds,nullfds;
MAC to,from;
Thread::sleep(250);
const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1;
long agentReadPtr = 0;
fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK);
fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK);
if (i == 0) {
fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK);
fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK);
}
FD_ZERO(&readfds);
FD_ZERO(&nullfds);
@ -533,6 +546,7 @@ void MacEthernetTap::threadMain()
*/
}
}
}));}
::close(_agentStdin);
::close(_agentStdout);