use set_alert_dispatch instead of timer to get an alerts from libtorrent

libtorrent allows setting a custom dispatch handler that is invoked in
libtorrent thread when new alerts are incoming. QAlertDispatcher is a
class that allows to translate these alerts to UI thread.

The concept is very simple:

1. On initialization QAlertDispatcher constructor calls set_alert_dispatch() passing
 QAlertDispatcher::dispatch as argument.

2. On deinitialization destructor calls set_alert_dispatch() passing a empty
 function. (line 25) libtorrent handles thos and switches back to queuing
 alerts in queue.

3. QAlertDispatcher::dispatch() adds alert to queue and notifies UI thread that new
 alerts are incoming. Enqueuing is done in function enqueueToMainThread().
 The invariant of class is the following:

    if alert queue is not empty, in message loop of UI thread contains a queued
    invocation of deliverSignal().

4. When message loop is pumped UI thread execute deliverSignal() function.
 It emit appropriate signal and if queue is still not empty (for example
 if slot doesn't grab alerts) rewind enqueuing to main thread.

This is a idea. But here is some details.

1. When QAlertDispatcher is destoyed, libtorrent still can call
QAlertDispatcher::dispatch a few times after destruction. This is
handled by passing a "tag". A tag is a object that references QAlertDispatch.
Tag could be invalidated. So on destruction QAlertDispatcher invalidates a tag
and then unsubscribes from alerts. When QAlertDispatcher::dispatch is called
with invalid tag it simply discard an alert.

    Therefore we could drop a few alerts during unsubscription. So we unsubscribe
    only at exit when missing some alerts is not a problem.

2. Another problem is in QBtSession::saveFastResumeData(). It pumps alert
queue synchronously. My first attempt was to destroy QAlertDispatcher
and then pump libtorrent queue. But as I was afraid of losing alerts I
supported synchronous querying of alerts in QAlertDispatcher.
(QAlertDispatcher::getPendingAlerts)

Conflicts:
	src/qtlibtorrent/qbtsession.cpp
This commit is contained in:
Ivan Sorokin 2014-05-17 13:04:33 +04:00
parent 329b754197
commit eb46326d23
5 changed files with 632 additions and 487 deletions

View file

@ -0,0 +1,93 @@
#include "alertdispatcher.h"
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <QMutexLocker>
QAlertDispatcher::QAlertDispatcher(libtorrent::session *session, QObject* parent)
: QObject(parent)
, session(session)
, current_tag(new QAtomicPointer<QAlertDispatcher>(this))
, event_posted(false) {
session->set_alert_dispatch(boost::bind(&QAlertDispatcher::dispatch, current_tag, _1));
}
QAlertDispatcher::~QAlertDispatcher() {
// When QAlertDispatcher is destoyed, libtorrent still can call
// QAlertDispatcher::dispatch a few times after destruction. This is
// handled by passing a "tag". A tag is a object that references QAlertDispatch.
// Tag could be invalidated. So on destruction QAlertDispatcher invalidates a tag
// and then unsubscribes from alerts. When QAlertDispatcher::dispatch is called
// with invalid tag it simply discard an alert.
{
QMutexLocker lock(&(alerts_mutex));
*current_tag = 0;
current_tag.clear();
}
typedef boost::function<void (std::auto_ptr<libtorrent::alert>)> dispatch_function_t;
session->set_alert_dispatch(dispatch_function_t());
}
void QAlertDispatcher::getPendingAlertsNoWait(std::deque<libtorrent::alert*>& out) {
Q_ASSERT(out.empty());
QMutexLocker lock(&(alerts_mutex));
std::swap(alerts, out);
event_posted = false;
}
void QAlertDispatcher::getPendingAlerts(std::deque<libtorrent::alert*>& out) {
assert(out.empty());
QMutexLocker lock(&(alerts_mutex));
while (alerts.empty())
alerts_condvar.wait(&(alerts_mutex));
std::swap(alerts, out);
event_posted = false;
}
void QAlertDispatcher::dispatch(QSharedPointer<QAtomicPointer<QAlertDispatcher> > tag,
std::auto_ptr<libtorrent::alert> alert_ptr) {
QAlertDispatcher* that = *tag;
if (!that)
return;
QMutexLocker lock(&(that->alerts_mutex));
that = *tag;
if (!that)
return;
bool was_empty = that->alerts.empty();
that->alerts.push_back(alert_ptr.get());
alert_ptr.release();
if (was_empty)
that->alerts_condvar.wakeAll();
that->enqueueToMainThread();
Q_ASSERT(that->current_tag == tag);
}
void QAlertDispatcher::enqueueToMainThread() {
if (!event_posted) {
event_posted = true;
QMetaObject::invokeMethod(this, "deliverSignal", Qt::QueuedConnection);
}
}
void QAlertDispatcher::deliverSignal() {
emit alertsReceived();
QMutexLocker lock(&(alerts_mutex));
event_posted = false;
if (!alerts.empty())
enqueueToMainThread();
}

View file

@ -0,0 +1,43 @@
#ifndef ALERTDISPATCHER_H
#define ALERTDISPATCHER_H
#include <QObject>
#include <QMutex>
#include <QWaitCondition>
#include <QAtomicPointer>
#include <QSharedPointer>
#include <libtorrent/session.hpp>
class QAlertDispatcher : public QObject
{
Q_OBJECT
Q_DISABLE_COPY(QAlertDispatcher)
public:
QAlertDispatcher(libtorrent::session *session, QObject* parent);
~QAlertDispatcher();
void getPendingAlertsNoWait(std::deque<libtorrent::alert*>&);
void getPendingAlerts(std::deque<libtorrent::alert*>&);
signals:
void alertsReceived();
private:
static void dispatch(QSharedPointer<QAtomicPointer<QAlertDispatcher> >,
std::auto_ptr<libtorrent::alert>);
void enqueueToMainThread();
private slots:
void deliverSignal();
private:
libtorrent::session *session;
QMutex alerts_mutex;
QWaitCondition alerts_condvar;
std::deque<libtorrent::alert*> alerts;
QSharedPointer<QAtomicPointer<QAlertDispatcher> > current_tag;
bool event_posted;
};
#endif // ALERTDISPATCHER_H

View file

@ -114,6 +114,7 @@ QBtSession::QBtSession()
, m_upnp(0), m_natpmp(0)
#endif
, m_dynDNSUpdater(0)
, m_alertDispatcher(0)
{
BigRatioTimer = new QTimer(this);
BigRatioTimer->setInterval(10000);
@ -147,9 +148,8 @@ QBtSession::QBtSession()
PeXEnabled = false;
}
s->add_extension(&create_smart_ban_plugin);
timerAlerts = new QTimer(this);
connect(timerAlerts, SIGNAL(timeout()), SLOT(readAlerts()));
timerAlerts->start(1000);
m_alertDispatcher = new QAlertDispatcher(s, this);
connect(m_alertDispatcher, SIGNAL(alertsReceived()), SLOT(readAlerts()));
appendLabelToSavePath = pref.appendTorrentLabel();
appendqBExtension = pref.useIncompleteFilesExtension();
connect(m_scanFolders, SIGNAL(torrentsAdded(QStringList&)), SLOT(addTorrentsFromScanFolder(QStringList&)));
@ -179,7 +179,6 @@ QBtSession::~QBtSession() {
// Delete our objects
if (m_tracker)
delete m_tracker;
delete timerAlerts;
if (BigRatioTimer)
delete BigRatioTimer;
if (filterParser)
@ -190,6 +189,7 @@ QBtSession::~QBtSession() {
// HTTP Server
if (httpServer)
delete httpServer;
delete m_alertDispatcher;
qDebug("Deleting the session");
delete s;
qDebug("BTSession destructor OUT");
@ -1608,7 +1608,6 @@ void QBtSession::saveFastResumeData() {
qDebug("Saving fast resume data...");
// Stop listening for alerts
resumeDataTimer.stop();
timerAlerts->stop();
int num_resume_data = 0;
// Pause session
s->pause();
@ -1633,17 +1632,16 @@ void QBtSession::saveFastResumeData() {
} catch(libtorrent::invalid_handle&) {}
}
while (num_resume_data > 0) {
alert const* a = s->wait_for_alert(seconds(30));
if (a == 0) {
std::cerr << " aborting with " << num_resume_data << " outstanding "
"torrents to save resume data for" << std::endl;
break;
}
std::deque<alert*> alerts;
m_alertDispatcher->getPendingAlerts(alerts);
for (std::deque<alert*>::const_iterator i = alerts.begin(), end = alerts.end(); i != end; ++i)
{
alert const* a = *i;
// Saving fastresume data can fail
save_resume_data_failed_alert const* rda = dynamic_cast<save_resume_data_failed_alert const*>(a);
if (rda) {
--num_resume_data;
s->pop_alert();
try {
// Remove torrent from session
if (rda->handle.is_valid())
@ -1653,7 +1651,6 @@ void QBtSession::saveFastResumeData() {
}
save_resume_data_alert const* rd = dynamic_cast<save_resume_data_alert const*>(a);
if (!rd) {
s->pop_alert();
continue;
}
// Saving fast resume data was successful
@ -1677,8 +1674,10 @@ void QBtSession::saveFastResumeData() {
}
// Remove torrent from session
s->remove_torrent(rd->handle);
s->pop_alert();
} catch(libtorrent::invalid_handle&) {}
delete a;
}
}
}
@ -2117,11 +2116,20 @@ void QBtSession::sendNotificationEmail(const QTorrentHandle &h) {
// Read alerts sent by the Bittorrent session
void QBtSession::readAlerts() {
// look at session alerts and display some infos
std::auto_ptr<alert> a = s->pop_alert();
while (a.get()) {
typedef std::deque<alert*> alerts_t;
alerts_t alerts;
m_alertDispatcher->getPendingAlertsNoWait(alerts);
for (alerts_t::const_iterator i = alerts.begin(), end = alerts.end(); i != end; ++i) {
handleAlert(*i);
delete *i;
}
}
void QBtSession::handleAlert(libtorrent::alert* a) {
try {
if (torrent_finished_alert* p = dynamic_cast<torrent_finished_alert*>(a.get())) {
if (torrent_finished_alert* p = dynamic_cast<torrent_finished_alert*>(a)) {
QTorrentHandle h(p->handle);
if (h.is_valid()) {
const QString hash = h.hash();
@ -2230,7 +2238,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (save_resume_data_alert* p = dynamic_cast<save_resume_data_alert*>(a.get())) {
else if (save_resume_data_alert* p = dynamic_cast<save_resume_data_alert*>(a)) {
const QDir torrentBackup(fsutils::BTBackupLocation());
const QTorrentHandle h(p->handle);
if (h.is_valid() && p->resume_data) {
@ -2248,7 +2256,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (file_renamed_alert* p = dynamic_cast<file_renamed_alert*>(a.get())) {
else if (file_renamed_alert* p = dynamic_cast<file_renamed_alert*>(a)) {
QTorrentHandle h(p->handle);
if (h.is_valid()) {
if (h.num_files() > 1) {
@ -2271,7 +2279,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (torrent_deleted_alert* p = dynamic_cast<torrent_deleted_alert*>(a.get())) {
else if (torrent_deleted_alert* p = dynamic_cast<torrent_deleted_alert*>(a)) {
qDebug("A torrent was deleted from the hard disk, attempting to remove the root folder too...");
QString hash = misc::toQString(p->info_hash);
if (!hash.isEmpty()) {
@ -2293,7 +2301,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (storage_moved_alert* p = dynamic_cast<storage_moved_alert*>(a.get())) {
else if (storage_moved_alert* p = dynamic_cast<storage_moved_alert*>(a)) {
QTorrentHandle h(p->handle);
if (h.is_valid()) {
// Attempt to remove old folder if empty
@ -2313,7 +2321,7 @@ void QBtSession::readAlerts() {
//h.force_recheck();
}
}
else if (metadata_received_alert* p = dynamic_cast<metadata_received_alert*>(a.get())) {
else if (metadata_received_alert* p = dynamic_cast<metadata_received_alert*>(a)) {
QTorrentHandle h(p->handle);
Preferences pref;
if (h.is_valid()) {
@ -2362,7 +2370,7 @@ void QBtSession::readAlerts() {
}
}
else if (file_error_alert* p = dynamic_cast<file_error_alert*>(a.get())) {
else if (file_error_alert* p = dynamic_cast<file_error_alert*>(a)) {
QTorrentHandle h(p->handle);
if (h.is_valid()) {
h.pause();
@ -2376,7 +2384,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (file_completed_alert* p = dynamic_cast<file_completed_alert*>(a.get())) {
else if (file_completed_alert* p = dynamic_cast<file_completed_alert*>(a)) {
QTorrentHandle h(p->handle);
qDebug("A file completed download in torrent %s", qPrintable(h.name()));
if (appendqBExtension) {
@ -2390,7 +2398,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (torrent_paused_alert* p = dynamic_cast<torrent_paused_alert*>(a.get())) {
else if (torrent_paused_alert* p = dynamic_cast<torrent_paused_alert*>(a)) {
if (p->handle.is_valid()) {
QTorrentHandle h(p->handle);
if (!HiddenData::hasData(h.hash())) {
@ -2400,7 +2408,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (tracker_error_alert* p = dynamic_cast<tracker_error_alert*>(a.get())) {
else if (tracker_error_alert* p = dynamic_cast<tracker_error_alert*>(a)) {
// Level: fatal
QTorrentHandle h(p->handle);
if (h.is_valid()) {
@ -2418,7 +2426,7 @@ void QBtSession::readAlerts() {
}
}
}
else if (tracker_reply_alert* p = dynamic_cast<tracker_reply_alert*>(a.get())) {
else if (tracker_reply_alert* p = dynamic_cast<tracker_reply_alert*>(a)) {
const QTorrentHandle h(p->handle);
if (h.is_valid()) {
qDebug("Received a tracker reply from %s (Num_peers=%d)", p->url.c_str(), p->num_peers);
@ -2432,7 +2440,7 @@ void QBtSession::readAlerts() {
trackersInfos[h.hash()] = trackers_data;
}
}
else if (tracker_warning_alert* p = dynamic_cast<tracker_warning_alert*>(a.get())) {
else if (tracker_warning_alert* p = dynamic_cast<tracker_warning_alert*>(a)) {
const QTorrentHandle h(p->handle);
if (h.is_valid()) {
// Connection was successful now but there is a warning message
@ -2445,16 +2453,16 @@ void QBtSession::readAlerts() {
qDebug("Received a tracker warning from %s: %s", p->url.c_str(), p->msg.c_str());
}
}
else if (portmap_error_alert* p = dynamic_cast<portmap_error_alert*>(a.get())) {
else if (portmap_error_alert* p = dynamic_cast<portmap_error_alert*>(a)) {
addConsoleMessage(tr("UPnP/NAT-PMP: Port mapping failure, message: %1").arg(misc::toQStringU(p->message())), "red");
//emit UPnPError(QString(p->msg().c_str()));
}
else if (portmap_alert* p = dynamic_cast<portmap_alert*>(a.get())) {
else if (portmap_alert* p = dynamic_cast<portmap_alert*>(a)) {
qDebug("UPnP Success, msg: %s", p->message().c_str());
addConsoleMessage(tr("UPnP/NAT-PMP: Port mapping successful, message: %1").arg(misc::toQStringU(p->message())), "blue");
//emit UPnPSuccess(QString(p->msg().c_str()));
}
else if (peer_blocked_alert* p = dynamic_cast<peer_blocked_alert*>(a.get())) {
else if (peer_blocked_alert* p = dynamic_cast<peer_blocked_alert*>(a)) {
boost::system::error_code ec;
string ip = p->ip.to_string(ec);
if (!ec) {
@ -2462,7 +2470,7 @@ void QBtSession::readAlerts() {
//emit peerBlocked(QString::fromLatin1(ip.c_str()));
}
}
else if (peer_ban_alert* p = dynamic_cast<peer_ban_alert*>(a.get())) {
else if (peer_ban_alert* p = dynamic_cast<peer_ban_alert*>(a)) {
boost::system::error_code ec;
string ip = p->ip.address().to_string(ec);
if (!ec) {
@ -2470,7 +2478,7 @@ void QBtSession::readAlerts() {
//emit peerBlocked(QString::fromLatin1(ip.c_str()));
}
}
else if (fastresume_rejected_alert* p = dynamic_cast<fastresume_rejected_alert*>(a.get())) {
else if (fastresume_rejected_alert* p = dynamic_cast<fastresume_rejected_alert*>(a)) {
QTorrentHandle h(p->handle);
if (h.is_valid()) {
qDebug("/!\\ Fast resume failed for %s, reason: %s", qPrintable(h.name()), p->message().c_str());
@ -2486,11 +2494,11 @@ void QBtSession::readAlerts() {
}
}
}
else if (url_seed_alert* p = dynamic_cast<url_seed_alert*>(a.get())) {
else if (url_seed_alert* p = dynamic_cast<url_seed_alert*>(a)) {
addConsoleMessage(tr("Url seed lookup failed for url: %1, message: %2").arg(misc::toQString(p->url)).arg(misc::toQStringU(p->message())), QString::fromUtf8("red"));
//emit urlSeedProblem(QString::fromUtf8(p->url.c_str()), QString::fromUtf8(p->msg().c_str()));
}
else if (listen_succeeded_alert *p = dynamic_cast<listen_succeeded_alert*>(a.get())) {
else if (listen_succeeded_alert *p = dynamic_cast<listen_succeeded_alert*>(a)) {
boost::system::error_code ec;
QString proto = "TCP";
#if LIBTORRENT_VERSION_NUM >= 10000
@ -2512,7 +2520,7 @@ void QBtSession::readAlerts() {
it->force_reannounce();
}
}
else if (listen_failed_alert *p = dynamic_cast<listen_failed_alert*>(a.get())) {
else if (listen_failed_alert *p = dynamic_cast<listen_failed_alert*>(a)) {
boost::system::error_code ec;
QString proto = "TCP";
#if LIBTORRENT_VERSION_NUM >= 10000
@ -2530,7 +2538,7 @@ void QBtSession::readAlerts() {
qDebug() << "Failed listening on " << proto << p->endpoint.address().to_string(ec).c_str() << "/" << p->endpoint.port();
addConsoleMessage(tr("qBittorrent failed listening on interface %1 port: %2/%3. Reason: %4", "e.g: qBittorrent failed listening on interface 192.168.0.1 port: TCP/6881. Reason: already in use").arg(p->endpoint.address().to_string(ec).c_str()).arg(proto).arg(QString::number(p->endpoint.port())).arg(misc::toQStringU(p->error.message())), "red");
}
else if (torrent_checked_alert* p = dynamic_cast<torrent_checked_alert*>(a.get())) {
else if (torrent_checked_alert* p = dynamic_cast<torrent_checked_alert*>(a)) {
QTorrentHandle h(p->handle);
if (h.is_valid()) {
const QString hash = h.hash();
@ -2556,16 +2564,13 @@ void QBtSession::readAlerts() {
}
}
}
else if (external_ip_alert *p = dynamic_cast<external_ip_alert*>(a.get())) {
else if (external_ip_alert *p = dynamic_cast<external_ip_alert*>(a)) {
boost::system::error_code ec;
addConsoleMessage(tr("External IP: %1", "e.g. External IP: 192.168.0.1").arg(p->external_address.to_string(ec).c_str()), "blue");
}
} catch (const std::exception& e) {
qWarning() << "Caught exception in readAlerts(): " << e.what();
}
a = s->pop_alert();
}
}
void QBtSession::recheckTorrent(const QString &hash) {

View file

@ -51,6 +51,7 @@
#include "qtracker.h"
#include "qtorrenthandle.h"
#include "trackerinfos.h"
#include "alertdispatcher.h"
#define MAX_SAMPLES 20
@ -190,6 +191,7 @@ private:
void updateRatioTimer();
void recoverPersistentData(const QString &hash, const std::vector<char> &buf);
void backupPersistentData(const QString &hash, boost::shared_ptr<libtorrent::entry> data);
void handleAlert(libtorrent::alert* a);
private slots:
void addTorrentsFromScanFolder(QStringList&);
@ -234,7 +236,6 @@ signals:
private:
// Bittorrent
libtorrent::session *s;
QPointer<QTimer> timerAlerts;
QPointer<BandwidthScheduler> bd_scheduler;
QMap<QUrl, QPair<QString, QString> > savepathLabel_fromurl; // Use QMap for compatibility with Qt < 4.7: qHash(QUrl)
QHash<QString, QHash<QString, TrackerInfos> > trackersInfos;
@ -287,6 +288,7 @@ private:
#endif
// DynDNS
DNSUpdater *m_dynDNSUpdater;
QAlertDispatcher* m_alertDispatcher;
};
#endif

View file

@ -5,11 +5,13 @@ HEADERS += $$PWD/qbtsession.h \
$$PWD/bandwidthscheduler.h \
$$PWD/trackerinfos.h \
$$PWD/torrentspeedmonitor.h \
$$PWD/filterparserthread.h
$$PWD/filterparserthread.h \
$$PWD/alertdispatcher.h
SOURCES += $$PWD/qbtsession.cpp \
$$PWD/qtorrenthandle.cpp \
$$PWD/torrentspeedmonitor.cpp
$$PWD/torrentspeedmonitor.cpp \
$$PWD/alertdispatcher.cpp
!contains(DEFINES, DISABLE_GUI) {
HEADERS += $$PWD/torrentmodel.h \