Merge pull request #4403 from glassez/fastresume

Improve .fastresume saving and torrents starting up. Closes #4315.
This commit is contained in:
sledgehammer999 2016-02-05 12:06:58 -06:00
commit 958b70e4ac
7 changed files with 245 additions and 87 deletions

View file

@ -43,6 +43,7 @@ using namespace BitTorrent;
#include <QTimer>
#include <QProcess>
#include <QCoreApplication>
#include <QThread>
#include <queue>
#include <vector>
@ -78,6 +79,7 @@ using namespace BitTorrent;
#include "private/filterparserthread.h"
#include "private/statistics.h"
#include "private/bandwidthscheduler.h"
#include "private/resumedatasavingmanager.h"
#include "trackerentry.h"
#include "tracker.h"
#include "magneturi.h"
@ -93,7 +95,7 @@ namespace libt = libtorrent;
using namespace BitTorrent;
static bool readFile(const QString &path, QByteArray &buf);
static bool loadTorrentResumeData(const QByteArray &data, AddTorrentData &out, MagnetUri &magnetUri);
static bool loadTorrentResumeData(const QByteArray &data, AddTorrentData &out, int &prio, MagnetUri &magnetUri);
static void torrentQueuePositionUp(const libt::torrent_handle &handle);
static void torrentQueuePositionDown(const libt::torrent_handle &handle);
@ -193,6 +195,11 @@ Session::Session(QObject *parent)
connect(&m_networkManager, SIGNAL(configurationRemoved(const QNetworkConfiguration&)), SLOT(networkConfigurationChange(const QNetworkConfiguration&)));
connect(&m_networkManager, SIGNAL(configurationChanged(const QNetworkConfiguration&)), SLOT(networkConfigurationChange(const QNetworkConfiguration&)));
m_ioThread = new QThread(this);
m_resumeDataSavingManager = new ResumeDataSavingManager(m_resumeFolderPath);
m_resumeDataSavingManager->moveToThread(m_ioThread);
connect(m_ioThread, SIGNAL(finished()), m_resumeDataSavingManager, SLOT(deleteLater()));
m_ioThread->start();
m_resumeDataTimer->start();
// initialize PortForwarder instance
@ -270,6 +277,9 @@ Session::~Session()
qDebug("Deleting the session");
delete m_nativeSession;
m_ioThread->quit();
m_ioThread->wait();
m_resumeFolderLock.close();
m_resumeFolderLock.remove();
}
@ -1701,7 +1711,16 @@ void Session::handleTorrentFinished(TorrentHandle *const torrent)
void Session::handleTorrentResumeDataReady(TorrentHandle *const torrent, const libtorrent::entry &data)
{
--m_numResumeData;
writeResumeDataFile(torrent, data);
// Separated thread is used for the blocking IO which results in slow processing of many torrents.
// Encoding data in parallel while doing IO saves time. Copying libtorrent::entry objects around
// isn't cheap too.
QByteArray out;
libt::bencode(std::back_inserter(out), data);
QMetaObject::invokeMethod(m_resumeDataSavingManager, "saveResumeData",
Q_ARG(QString, torrent->hash()), Q_ARG(QByteArray, out));
}
void Session::handleTorrentResumeDataFailed(TorrentHandle *const torrent)
@ -1868,48 +1887,63 @@ void Session::startUpTorrents()
const QDir resumeDataDir(m_resumeFolderPath);
QStringList fastresumes = resumeDataDir.entryList(
QStringList(QLatin1String("*.fastresume.*")), QDir::Files, QDir::Unsorted);
QStringList(QLatin1String("*.fastresume")), QDir::Files, QDir::Unsorted);
typedef QPair<int, QString> PrioHashPair;
typedef std::vector<PrioHashPair> PrioHashVector;
typedef std::greater<PrioHashPair> PrioHashGreater;
std::priority_queue<PrioHashPair, PrioHashVector, PrioHashGreater> torrentQueue;
// Fastresume file name format:
// <torrent_info_hash>.fastresume.<torrent_queue_position>
// E.g.:
// fc8a15a2faf2734dbb1dc5f7afdc5c9beaeb1f59.fastresume.2
QRegExp rx(QLatin1String("^([A-Fa-f0-9]{40})\\.fastresume\\.(\\d+)$"));
foreach (const QString &fastresume, fastresumes) {
if (rx.indexIn(fastresume) != -1) {
PrioHashPair p = qMakePair(rx.cap(2).toInt(), rx.cap(1));
torrentQueue.push(p);
}
}
QString filePath;
Logger *const logger = Logger::instance();
qDebug("Starting up torrents");
qDebug("Priority queue size: %ld", (long)torrentQueue.size());
// Resume downloads
while (!torrentQueue.empty()) {
const int prio = torrentQueue.top().first;
const QString hash = torrentQueue.top().second;
torrentQueue.pop();
typedef struct
{
QString hash;
MagnetUri magnetUri;
AddTorrentData addTorrentData;
QByteArray data;
} TorrentResumeData;
QString fastresumePath =
resumeDataDir.absoluteFilePath(QString("%1.fastresume.%2").arg(hash).arg(prio));
auto startupTorrent = [this, logger, resumeDataDir](const TorrentResumeData &params)
{
QString filePath = resumeDataDir.filePath(QString("%1.torrent").arg(params.hash));
qDebug() << "Starting up torrent" << params.hash << "...";
if (!addTorrent_impl(params.addTorrentData, params.magnetUri, TorrentInfo::loadFromFile(filePath), params.data))
logger->addMessage(tr("Unable to resume torrent '%1'.", "e.g: Unable to resume torrent 'hash'.")
.arg(params.hash), Log::CRITICAL);
};
qDebug("Starting up torrents");
qDebug("Queue size: %d", fastresumes.size());
// Resume downloads
QMap<int, TorrentResumeData> queuedResumeData;
int nextQueuePosition = 1;
QRegExp rx(QLatin1String("^([A-Fa-f0-9]{40})\\.fastresume$"));
foreach (const QString &fastresumeName, fastresumes) {
if (rx.indexIn(fastresumeName) == -1) continue;
QString hash = rx.cap(1);
QString fastresumePath = resumeDataDir.absoluteFilePath(fastresumeName);
QByteArray data;
AddTorrentData resumeData;
MagnetUri magnetUri;
if (readFile(fastresumePath, data) && loadTorrentResumeData(data, resumeData, magnetUri)) {
filePath = resumeDataDir.filePath(QString("%1.torrent").arg(hash));
qDebug("Starting up torrent %s ...", qPrintable(hash));
if (!addTorrent_impl(resumeData, magnetUri, TorrentInfo::loadFromFile(filePath), data))
logger->addMessage(tr("Unable to resume torrent '%1'.", "e.g: Unable to resume torrent 'hash'.")
.arg(Utils::Fs::toNativePath(hash)), Log::CRITICAL);
int queuePosition;
if (readFile(fastresumePath, data) && loadTorrentResumeData(data, resumeData, queuePosition, magnetUri)) {
if (queuePosition <= nextQueuePosition) {
startupTorrent({ hash, magnetUri, resumeData, data });
if (queuePosition == nextQueuePosition) {
++nextQueuePosition;
while (queuedResumeData.contains(nextQueuePosition)) {
startupTorrent(queuedResumeData.take(nextQueuePosition));
++nextQueuePosition;
}
}
}
else {
queuedResumeData[queuePosition] = { hash, magnetUri, resumeData, data };
}
}
}
// starting up downloading torrents (queue position > 0)
foreach (const TorrentResumeData &torrentResumeData, queuedResumeData)
startupTorrent(torrentResumeData);
}
quint64 Session::getAlltimeDL() const
@ -2338,7 +2372,7 @@ bool readFile(const QString &path, QByteArray &buf)
return true;
}
bool loadTorrentResumeData(const QByteArray &data, AddTorrentData &out, MagnetUri &magnetUri)
bool loadTorrentResumeData(const QByteArray &data, AddTorrentData &out, int &prio, MagnetUri &magnetUri)
{
out = AddTorrentData();
out.resumed = true;
@ -2365,31 +2399,11 @@ bool loadTorrentResumeData(const QByteArray &data, AddTorrentData &out, MagnetUr
out.addPaused = fast.dict_find_int_value("qBt-paused");
out.addForced = fast.dict_find_int_value("qBt-forced");
prio = fast.dict_find_int_value("qBt-queuePosition");
return true;
}
bool Session::writeResumeDataFile(TorrentHandle *const torrent, const libt::entry &data)
{
const QDir resumeDataDir(m_resumeFolderPath);
QStringList filters(QString("%1.fastresume.*").arg(torrent->hash()));
const QStringList files = resumeDataDir.entryList(filters, QDir::Files, QDir::Unsorted);
foreach (const QString &file, files)
Utils::Fs::forceRemove(resumeDataDir.absoluteFilePath(file));
QString filename = QString("%1.fastresume.%2").arg(torrent->hash()).arg(torrent->queuePosition());
QString filepath = resumeDataDir.absoluteFilePath(filename);
qDebug("Saving resume data in %s", qPrintable(filepath));
QFile resumeFile(filepath);
QVector<char> out;
libt::bencode(std::back_inserter(out), data);
if (resumeFile.open(QIODevice::WriteOnly))
return (resumeFile.write(&out[0], out.size()) == out.size());
return false;
}
void torrentQueuePositionUp(const libt::torrent_handle &handle)
{
try {