#pragma once #include "tcp_connect.h" #include "pe...

Tạo vào: 14 tháng 5, 2026

Trả lời bằng GPT-5.5 Thinking bởi Chat01

Câu hỏi

#pragma once
#include "tcp_connect.h"
#include "peer.h"
#include "torrent_file.h"
#include "piece_storage.h"
#include <iostream>
#include <string>
#include <utility>
#include <cstring>
#include <arpa/inet.h>
#include "byte_tools.h"
#include "message.h"
/*

  • Структура, хранящая информацию о доступности частей скачиваемого файла у данного пира
    /
    class PeerPiecesAvailability {
    public:
    PeerPiecesAvailability()
    : bitfield_() {}
    /

    • bitfield – массив байтов, в котором i-й бит означает наличие или отсутствие i-й части файла у пира
    • https://wiki.theory.org/BitTorrentSpecification#bitfield:_.3Clen.3D0001.2BX.3E.3Cid.3D5.3E.3Cbitfield.3E
      /
      explicit PeerPiecesAvailability(std::string bitfield)
      : bitfield_(std::move(bitfield)) {}
      /
    • Если ли часть под номером pieceIndex у пира?
      /
      bool IsPieceAvailable(size_t pieceIndex) const {
      size_t ind = pieceIndex / 8;
      if (ind >= bitfield_.size()) {
      return false;
      }
      uint8_t m = 1 << (7 - (pieceIndex % 8));
      return (static_cast<uint8_t>(bitfield_[ind]) & m) != 0;
      }
      /
    • Пометить часть под номером pieceIndex как доступную
      /
      void SetPieceAvailability(size_t pieceIndex) {
      size_t ind = pieceIndex / 8;
      if (ind >= bitfield_.size()) {
      bitfield_.resize(ind + 1, '\0');
      }
      uint8_t m = 1 << (7 - (pieceIndex % 8));
      bitfield_[ind] = static_cast<char>(static_cast<uint8_t>(bitfield_[ind]) | m);
      }
      void SetAllPiecesAvailable(size_t piecesCount) {
      if (piecesCount == 0) {
      bitfield_.clear();
      return;
      }
      bitfield_.assign((piecesCount + 7) / 8, static_cast<char>(0xff));
      }
      /
    • Сколько бит хранится в bitfield'е
      /
      size_t Size() const {
      return bitfield_.size() * 8;
      }
      private:
      std::string bitfield_;
      };
      /
  • Класс, представляющий соединение с одним пиром.

  • С помощью него можно подключиться к пиру и обмениваться с ним сообщениями
    /
    class PeerConnect {
    public:
    PeerConnect(const Peer& peer, const TorrentFile& tf, std::string selfPeerId, PieceStorage& pieceStorage)
    : tf_(tf)
    , socket_(peer.ip, peer.port, std::chrono::milliseconds(500), std::chrono::milliseconds(500))
    , selfPeerId_(std::move(selfPeerId))
    , terminated_(false)
    , choked_(true)
    , pieceStorage_(pieceStorage)
    , pieceInProgress_(nullptr)
    , pendingBlock_(false)
    , failed_(false){
    }
    ~PeerConnect() {
    Terminate();
    socket_.CloseConnection();
    }
    /

    • Основная функция, в которой будет происходить цикл общения с пиром.
    • https://wiki.theory.org/BitTorrentSpecification#Messages
      /
      void Run();
      void Terminate() {
      terminated_ = true;
      }
      bool Failed() const {
      return failed_;
      }
      protected:
      const TorrentFile& tf_;
      TcpConnect socket_;
      const std::string selfPeerId_;
      std::string peerId_;
      PeerPiecesAvailability piecesAvailability_;
      bool terminated_;
      bool choked_, failed_;
      PiecePtr pieceInProgress_;
      PieceStorage& pieceStorage_;
      bool pendingBlock_;
      /
    • Функция производит handshake.
      • Подключиться к пиру по протоколу TCP
      • Отправить пиру сообщение handshake
      • Проверить правильность ответа пира
    • https://wiki.theory.org/BitTorrentSpecification#Handshake
      /
      void PerformHandshake() {
      socket_.EstablishConnection();
      std::string handshake;
      handshake.push_back(19);
      handshake += "BitTorrent protocol";
      handshake.append(8, '\0');
      handshake += tf_.infoHash;
      handshake += selfPeerId_;
      socket_.SendData(handshake);
      std::string response = socket_.ReceiveData(68);
      if (response.size() != 68) {
      throw std::runtime_error("Wrong format");
      }
      if (static_cast<unsigned char>(response[0]) != 19) {
      throw std::runtime_error("Wrong protocol length");
      }
      std::string protocol = response.substr(1, 19);
      if (protocol != "BitTorrent protocol") {
      throw std::runtime_error("Invalid protocol name");
      }
      std::string infoHash = response.substr(28, 20);
      if (infoHash != tf_.infoHash) {
      throw std::runtime_error("Invalid info hash");
      }
      peerId_ = response.substr(48, 20);
      }
      /
      • Провести handshake
      • Получить bitfield с информацией о наличии у пира различных частей файла
      • Сообщить пиру, что мы готовы получать от него данные (отправить interested)
    • Возвращает true, если все три этапа прошли без ошибок
      /
      bool EstablishConnection() {
      try {
      PerformHandshake();
      piecesAvailability_.SetAllPiecesAvailable(tf_.pieceHashes.size());
      ReceiveBitfield();
      SendInterested();
      return true;
      } catch (...) {
      socket_.CloseConnection();
      return false;
      }
      }
      /
    • Функция читает из сокета bitfield с информацией о наличии у пира различных частей файла.
    • Полученную информацию надо сохранить в поле piecesAvailability_.
    • Также надо учесть, что сообщение тип Bitfield является опциональным, то есть пиры необязательно будут слать его.
    • Вместо этого они могут сразу прислать сообщение Unchoke, поэтому надо быть готовым обработать его в этой функции.
    • Обработка сообщения Unchoke заключается в выставлении флага choked_ в значение false
      /
      void ReceiveBitfield() {
      try {
      std::string messageString = socket_.ReceiveData();
      Message message = Message::Parse(messageString);
      if (message.id == MessageId::BitField) {
      piecesAvailability_ = PeerPiecesAvailability(message.payload);
      } else if (message.id == MessageId::Unchoke) {
      choked_ = false;
      } else if (message.id == MessageId::Have) {
      if (message.payload.size() == 4) {
      piecesAvailability_.SetPieceAvailability(BytesToInt(message.payload));
      }
      } else if (message.id == MessageId::Choke) {
      choked_ = true;
      pendingBlock_ = false;
      if (pieceInProgress_ != nullptr) {
      pieceInProgress_->Reset();
      pieceInProgress_ = nullptr;
      }
      } else if (message.id == MessageId::Piece) {
      pendingBlock_ = false;
      if (pieceInProgress_ == nullptr || message.payload.size() < 8) {
      return;
      }
      uint32_t pieceIndex = BytesToInt(message.payload.substr(0, 4)), offset = BytesToInt(message.payload.substr(4, 4));
      std::string data = message.payload.substr(8);
      if (pieceIndex != pieceInProgress_->GetIndex()) {
      return;
      }
      pieceInProgress_->SaveBlock(offset, data);
      if (pieceInProgress_->AllBlocksRetrieved()) {
      if (pieceInProgress_->HashMatches()) {
      pieceStorage_.PieceProcessed(pieceInProgress_);
      pieceInProgress_ = nullptr;
      } else {
      pieceInProgress_->Reset();
      }
      }
      }
      } catch (...) {
      return;
      }
      }
      /
    • Функция посылает пиру сообщение типа interested
      /
      void SendInterested() {
      Message m = Message::Init(MessageId::Interested, "");
      socket_.SendData(m.ToString());
      }
      /
    • Функция отправляет пиру сообщение типа request. Это сообщение обозначает запрос части файла у пира.
    • За одно сообщение запрашивается не часть целиком, а блок данных размером 2^14 байт или меньше.
    • Если в данный момент мы не знаем, какую часть файла надо запросить у пира, то надо получить эту информацию у
    • PieceStorage
      /
      void RequestPiece() {
      if (pieceStorage_.QueueIsEmpty()) {
      Terminate();
      return;
      }
      if (pieceInProgress_ == nullptr) {
      pieceInProgress_ = pieceStorage_.GetNextPieceToDownload();
      }
      if (!piecesAvailability_.IsPieceAvailable(pieceInProgress_->GetIndex())) {
      Terminate();
      return;
      }
      Block
      block = pieceInProgress_->FirstMissingBlock();
      block->status = Block::Status::Pending;
      char m[17];
      uint32_t net_len = htonl(13);
      memcpy(m, &net_len, 4);
      m[4] = 6;
      uint32_t ind = static_cast<uint32_t>(pieceInProgress_->GetIndex());
      uint32_t piece = htonl(ind);
      memcpy(m + 5, &piece, 4);
      uint32_t offset = htonl(block->offset);
      memcpy(m + 9, &offset, 4);
      uint32_t len = htonl(block->length);
      memcpy(m + 13, &len, 4);
      socket_.SendData(std::string(m, 17));
      pendingBlock_ = true;
      }

    /*

    • Основной цикл общения с пиром. Здесь мы ждем следующее сообщение от пира и обрабатываем его.
    • Также, если мы не ждем в данный момент от пира содержимого части файла, то надо отправить соответствующий запрос
      /
      void MainLoop() {
      while (!terminated_ && !pieceStorage_.QueueIsEmpty()) {
      ReceiveBitfield();
      if (!choked_ && !pendingBlock_ && !pieceStorage_.QueueIsEmpty()) {
      RequestPiece();
      }
      }
      }
      };
      #pragma once
      #include "torrent_file.h"
      #include "piece.h"
      #include <queue>
      #include <string>
      #include <unordered_set>
      #include <mutex>
      #include <stdexcept>
      #include <fstream>
      #include <filesystem>
      /
  • Хранилище информации о частях скачиваемого файла.

  • В этом классе отслеживается информация о том, какие части файла осталось скачать
    /
    class PieceStorage {
    public:
    explicit PieceStorage(const TorrentFile& tf) : PieceStorage(tf, std::filesystem::current_path()) {
    }
    PieceStorage(const TorrentFile& tf, const std::filesystem::path& outputDirectory) : totalPieces_(tf.pieceHashes.size()),
    pieceLength_(tf.pieceLength),
    outputFile_(outputDirectory / tf.name, std::ios::binary | std::ios::trunc) {
    for (int i = 0; i < tf.pieceHashes.size(); ++i) {
    size_t length = tf.pieceLength;
    if (i == tf.pieceHashes.size() - 1 && tf.length % tf.pieceLength != 0) {
    length = tf.length % tf.pieceLength;
    }
    auto piece = std::make_shared<Piece>(i, length, tf.pieceHashes[i]);
    remainPieces_.push(piece);
    }
    if (tf.length != 0) {
    char c = '\0';
    outputFile_.seekp(tf.length - 1);
    outputFile_.write(&c, 1);
    }
    }
    /

    • Отдает указатель на следующую часть файла, которую надо скачать
      /
      PiecePtr GetNextPieceToDownload() {
      std::unique_lock lock(mt_);
      if (remainPieces_.empty()) {
      throw std::runtime_error("Queue is empty");
      }
      auto piece = remainPieces_.front();
      remainPieces_.pop();
      ++pieceInProgress_;
      return piece;
      }
      /
    • Эта функция вызывается из PeerConnect, когда скачивание одной части файла завершено.
      */
      void PieceProcessed(const PiecePtr& piece) {
      std::unique_lock lock(mt_);
      --pieceInProgress_;
      SavePieceToDisk(piece);
      pieces_.push_back(piece->GetIndex());
      }

    void PieceFailed(const PiecePtr& piece) {
    std::unique_lock lock(mt_);
    if (pieceInProgress_ > 0) {
    --pieceInProgress_;
    }
    if (piece != nullptr) {
    piece->Reset();
    remainPieces_.push(piece);
    }
    }
    /*

    • Остались ли нескачанные части файла?
      /
      bool QueueIsEmpty() const {
      std::unique_lock lock(mt_);
      return remainPieces_.empty();
      }
      /
    • Сколько частей файла всего
      /
      size_t TotalPiecesCount() const {
      std::unique_lock lock(mt_);
      return totalPieces_;
      }
      /
    • Сколько частей файла было сохранено на диск
      /
      size_t PiecesSavedToDiscCount() const {
      std::unique_lock lock(mt_);
      return pieces_.size();
      }
      /
    • Закрыть поток вывода в файл
      /
      void CloseOutputFile() {
      std::unique_lock lock(mt_);
      if (outputFile_.is_open()) {
      outputFile_.close();
      }
      }
      /
    • Отдает список номеров частей файла, которые были сохранены на диск
      /
      std::vector<size_t> GetPiecesSavedToDiscIndices() const {
      std::unique_lock lock(mt_);
      return pieces_;
      }
      /
    • Сколько частей файла в данный момент скачивается
      /
      size_t PiecesInProgressCount() const {
      std::unique_lock lock(mt_);
      return pieceInProgress_;
      }
      protected:
      std::queue<PiecePtr> remainPieces_;
      std::vector <size_t> pieces_;
      virtual void SavePieceToDisk(PiecePtr piece) {
      outputFile_.seekp(piece->GetIndex() * pieceLength_);
      const auto& data = piece->GetData();
      outputFile_.write(reinterpret_cast<const char
      >(data.data()), data.size());
      }
      long long totalPieces_ = 0, pieceInProgress_ = 0;
      size_t pieceLength_ = 0;
      mutable std::mutex mt_;
      std::ofstream outputFile_;
      };
      #pragma once
      #include "torrent_file.h"
      #include "piece.h"
      #include <queue>
      #include <string>
      #include <unordered_set>
      #include <mutex>
      #include <stdexcept>
      #include <fstream>
      #include <filesystem>
      /*
  • Хранилище информации о частях скачиваемого файла.

  • В этом классе отслеживается информация о том, какие части файла осталось скачать
    /
    class PieceStorage {
    public:
    explicit PieceStorage(const TorrentFile& tf) : PieceStorage(tf, std::filesystem::current_path()) {
    }
    PieceStorage(const TorrentFile& tf, const std::filesystem::path& outputDirectory) : totalPieces_(tf.pieceHashes.size()),
    pieceLength_(tf.pieceLength),
    outputFile_(outputDirectory / tf.name, std::ios::binary | std::ios::trunc) {
    for (int i = 0; i < tf.pieceHashes.size(); ++i) {
    size_t length = tf.pieceLength;
    if (i == tf.pieceHashes.size() - 1 && tf.length % tf.pieceLength != 0) {
    length = tf.length % tf.pieceLength;
    }
    auto piece = std::make_shared<Piece>(i, length, tf.pieceHashes[i]);
    remainPieces_.push(piece);
    }
    if (tf.length != 0) {
    char c = '\0';
    outputFile_.seekp(tf.length - 1);
    outputFile_.write(&c, 1);
    }
    }
    /

    • Отдает указатель на следующую часть файла, которую надо скачать
      /
      PiecePtr GetNextPieceToDownload() {
      std::unique_lock lock(mt_);
      if (remainPieces_.empty()) {
      throw std::runtime_error("Queue is empty");
      }
      auto piece = remainPieces_.front();
      remainPieces_.pop();
      ++pieceInProgress_;
      return piece;
      }
      /
    • Эта функция вызывается из PeerConnect, когда скачивание одной части файла завершено.
      */
      void PieceProcessed(const PiecePtr& piece) {
      std::unique_lock lock(mt_);
      --pieceInProgress_;
      SavePieceToDisk(piece);
      pieces_.push_back(piece->GetIndex());
      }

    void PieceFailed(const PiecePtr& piece) {
    std::unique_lock lock(mt_);
    if (pieceInProgress_ > 0) {
    --pieceInProgress_;
    }
    if (piece != nullptr) {
    piece->Reset();
    remainPieces_.push(piece);
    }
    }
    /*

    • Остались ли нескачанные части файла?
      /
      bool QueueIsEmpty() const {
      std::unique_lock lock(mt_);
      return remainPieces_.empty();
      }
      /
    • Сколько частей файла всего
      /
      size_t TotalPiecesCount() const {
      std::unique_lock lock(mt_);
      return totalPieces_;
      }
      /
    • Сколько частей файла было сохранено на диск
      /
      size_t PiecesSavedToDiscCount() const {
      std::unique_lock lock(mt_);
      return pieces_.size();
      }
      /
    • Закрыть поток вывода в файл
      /
      void CloseOutputFile() {
      std::unique_lock lock(mt_);
      if (outputFile_.is_open()) {
      outputFile_.close();
      }
      }
      /
    • Отдает список номеров частей файла, которые были сохранены на диск
      /
      std::vector<size_t> GetPiecesSavedToDiscIndices() const {
      std::unique_lock lock(mt_);
      return pieces_;
      }
      /
    • Сколько частей файла в данный момент скачивается
      /
      size_t PiecesInProgressCount() const {
      std::unique_lock lock(mt_);
      return pieceInProgress_;
      }
      protected:
      std::queue<PiecePtr> remainPieces_;
      std::vector <size_t> pieces_;
      virtual void SavePieceToDisk(PiecePtr piece) {
      outputFile_.seekp(piece->GetIndex() * pieceLength_);
      const auto& data = piece->GetData();
      outputFile_.write(reinterpret_cast<const char
      >(data.data()), data.size());
      }
      long long totalPieces_ = 0, pieceInProgress_ = 0;
      size_t pieceLength_ = 0;
      mutable std::mutex mt_;
      std::ofstream outputFile_;
      };
      #include "torrent_tracker.h"
      #include "piece_storage.h"
      #include "peer_connect.h"
      #include "byte_tools.h"
      #include <cassert>
      #include <iostream>
      #include <filesystem>
      #include <random>
      #include <thread>
      #include <algorithm>

namespace fs = std::filesystem;

std::mutex cerrMutex, coutMutex;

std::string RandomString(size_t length) {
std::random_device random;
std::string result;
result.reserve(length);
for (size_t i = 0; i < length; ++i) {
result.push_back(random() % ('Z' - 'A') + 'A');
}
return result;
}

const std::string PeerId = "TESTAPPDONTWORRY" + RandomString(4);
constexpr size_t PiecesToDownload = 20;

void CheckDownloadedPiecesIntegrity(const std::filesystem::path& outputFilename, const TorrentFile& tf, PieceStorage& pieces) {
pieces.CloseOutputFile();

text
if (std::filesystem::file_size(outputFilename) != tf.length) { throw std::runtime_error("Output file has wrong size"); } if (pieces.GetPiecesSavedToDiscIndices().size() != pieces.PiecesSavedToDiscCount()) { throw std::runtime_error("Cannot determine real amount of saved pieces"); } if (pieces.PiecesSavedToDiscCount() < PiecesToDownload) { throw std::runtime_error("Downloaded pieces amount is not enough"); } if (pieces.TotalPiecesCount() != tf.pieceHashes.size() || pieces.TotalPiecesCount() < 200) { throw std::runtime_error("Wrong amount of pieces"); } std::vector<size_t> pieceIndices = pieces.GetPiecesSavedToDiscIndices(); std::sort(pieceIndices.begin(), pieceIndices.end()); std::ifstream file(outputFilename, std::ios_base::binary); for (size_t pieceIndex : pieceIndices) { const std::streamoff positionInFile = pieceIndex * tf.pieceLength; file.seekg(positionInFile); if (!file.good()) { throw std::runtime_error("Cannot read from file"); } std::string pieceDataFromFile(tf.pieceLength, '\0'); file.read(pieceDataFromFile.data(), tf.pieceLength); const size_t readBytesCount = file.gcount(); pieceDataFromFile.resize(readBytesCount); const std::string realHash = CalculateSHA1(pieceDataFromFile); if (realHash != tf.pieceHashes[pieceIndex]) { std::cerr << "File piece with index " << pieceIndex << " started at position " << positionInFile << " with length " << pieceDataFromFile.length() << " has wrong hash " << HexEncode(realHash) << ". Expected hash is " << HexEncode(tf.pieceHashes[pieceIndex]) << std::endl; throw std::runtime_error("Wrong piece hash"); } }

}

void DeleteDownloadedFile(const std::filesystem::path& outputFilename) {
std::filesystem::remove(outputFilename);
}

std::filesystem::path PrepareDownloadDirectory(const std::string& randomString) {
std::filesystem::path outputDirectory = "/tmp/downloads";
outputDirectory /= randomString;
std::filesystem::create_directories(outputDirectory);
return outputDirectory;
}

bool RunDownloadMultithread(PieceStorage& pieces, const TorrentFile& torrentFile, const std::string& ourId, const TorrentTracker& tracker) {
using namespace std::chrono_literals;

text
std::vector<std::thread> peerThreads; std::vector<std::shared_ptr<PeerConnect>> peerConnections; for (const Peer& peer : tracker.GetPeers()) { peerConnections.emplace_back(std::make_shared<PeerConnect>(peer, torrentFile, ourId, pieces)); } peerThreads.reserve(peerConnections.size()); for (auto& peerConnectPtr : peerConnections) { peerThreads.emplace_back( [peerConnectPtr] () { bool tryAgain = true; int attempts = 0; do { try { ++attempts; peerConnectPtr->Run(); } catch (const std::runtime_error& e) { std::lock_guard<std::mutex> cerrLock(cerrMutex); std::cerr << "Runtime error: " << e.what() << std::endl; } catch (const std::exception& e) { std::lock_guard<std::mutex> cerrLock(cerrMutex); std::cerr << "Exception: " << e.what() << std::endl; } catch (...) { std::lock_guard<std::mutex> cerrLock(cerrMutex); std::cerr << "Unknown error" << std::endl; } tryAgain = peerConnectPtr->Failed() && attempts < 3; } while (tryAgain); } ); } { std::lock_guard<std::mutex> coutLock(coutMutex); std::cout << "Started " << peerThreads.size() << " threads for peers" << std::endl; } std::this_thread::sleep_for(10s); while (pieces.PiecesSavedToDiscCount() < PiecesToDownload) { if (pieces.PiecesInProgressCount() == 0) { { std::lock_guard<std::mutex> coutLock(coutMutex); std::cout << "Want to download more pieces but all peer connections are not working. Let's request new peers" << std::endl; } for (auto& peerConnectPtr : peerConnections) { peerConnectPtr->Terminate(); } for (std::thread& thread : peerThreads) { thread.join(); } return true; } std::this_thread::sleep_for(1s); } { std::lock_guard<std::mutex> coutLock(coutMutex); std::cout << "Terminating all peer connections" << std::endl; } for (auto& peerConnectPtr : peerConnections) { peerConnectPtr->Terminate(); } for (std::thread& thread : peerThreads) { thread.join(); } return false;

}

void DownloadTorrentFile(const TorrentFile& torrentFile, PieceStorage& pieces, const std::string& ourId) {
std::cout << "Connecting to tracker " << torrentFile.announce << std::endl;
TorrentTracker tracker(torrentFile.announce);
bool requestMorePeers = false;
do {
tracker.UpdatePeers(torrentFile, ourId, 12345);

text
if (tracker.GetPeers().empty()) { std::cerr << "No peers found. Cannot download a file" << std::endl; } std::cout << "Found " << tracker.GetPeers().size() << " peers" << std::endl; for (const Peer& peer : tracker.GetPeers()) { std::cout << "Found peer " << peer.ip << ":" << peer.port << std::endl; } requestMorePeers = RunDownloadMultithread(pieces, torrentFile, ourId, tracker); } while (requestMorePeers);

}

void TestTorrentFile(const fs::path& file) {
TorrentFile torrentFile;
try {
torrentFile = LoadTorrentFile(file);
std::cout << "Loaded torrent file " << file << ". " << torrentFile.comment << std::endl;
} catch (const std::invalid_argument& e) {
std::cerr << e.what() << std::endl;
return;
}

text
const std::filesystem::path outputDirectory = PrepareDownloadDirectory(PeerId); PieceStorage pieces(torrentFile, outputDirectory); DownloadTorrentFile(torrentFile, pieces, PeerId); CheckDownloadedPiecesIntegrity(outputDirectory / torrentFile.name, torrentFile, pieces); DeleteDownloadedFile(outputDirectory / torrentFile.name);

}

int main() {
for (const auto& entry : fs::directory_iterator("resources")) {
TestTorrentFile(entry.path());
}
return 0;
}
#pragma once
#include <string>
#include <vector>
#include <optional>
#include <memory>
#include <stdexcept>
#include "byte_tools.h"
/*

  • Части файла скачиваются не за одно сообщение, а блоками размером 2^14 байт или меньше (последний блок обычно меньше)
    /
    struct Block {
    enum Status {
    Missing = 0,
    Pending,
    Retrieved,
    };
    uint32_t piece;
    uint32_t offset;
    uint32_t length;
    Status status;
    std::string data;
    };
    /
  • Часть скачиваемого файла
    /
    class Piece {
    public:
    /
    • index -- номер части файла, нумерация начинается с 0
    • length -- длина части файла. Все части, кроме последней, имеют длину, равную torrentFile.pieceLength
    • hash -- хеш-сумма части файла, взятая из torrentFile.pieceHashes
      /
      Piece(size_t index, size_t length, std::string hash) : index_(index), length_(length), hash_(hash), numBlocks_(length / (1 << 14) + (length % (1 << 14) != 0)) {
      for (long long i = 0; i < length / (1 << 14); ++i) {
      blocks_.push_back(Block{static_cast<uint32_t>(index), static_cast<uint32_t>(i * (1 << 14)), static_cast<uint32_t>(1 << 14), Block::Status::Missing});
      }
      if (length % (1 << 14) != 0) {
      long long lastBlock = length / (1 << 14);
      long long lenLastBlock = length % (1 << 14);
      blocks_.push_back(Block{static_cast<uint32_t>(index), static_cast<uint32_t>(lastBlock * (1 << 14)), static_cast<uint32_t>(lenLastBlock), Block::Status::Missing});
      }
      }
      /
    • Совпадает ли хеш скачанных данных с ожидаемым
      /
      bool HashMatches() const {
      return hash_ == GetDataHash();
      }
      /
    • Дать указатель на отсутствующий (еще не скачанный и не запрошенный) блок
      /
      Block
      FirstMissingBlock() {
      if (AllBlocksRetrieved()) throw std::runtime_error("All blocks retrieved");
      for (long long i = lastUnsavedBlock_; i < numBlocks_; ++i) {
      if (blocks_[i].status == Block::Status::Missing) {
      lastUnsavedBlock_ = i;
      return &blocks_[i];
      }
      }
      throw std::runtime_error("No missing blocks");
      }
      /*
    • Получить порядковый номер части файла
      /
      size_t GetIndex() const {
      return index_;
      }
      /
    • Сохранить скачанные данные для какого-то блока
      /
      void SaveBlock(size_t blockOffset, std::string data) {
      long long blockIndex = blockOffset / (1 << 14);
      if (blockIndex >= numBlocks_) {
      throw std::runtime_error("Wrong block offset");
      }
      blocks_[blockIndex].data = data;
      if (blocks_[blockIndex].status != Block::Status::Retrieved) {
      ++savedBlocks_;
      }
      blocks_[blockIndex].status = Block::Status::Retrieved;
      if (blockIndex == lastUnsavedBlock_) {
      for (long long i = lastUnsavedBlock_ + 1; i < numBlocks_; ++i) {
      if (blocks_[i].status == Block::Status::Missing) {
      lastUnsavedBlock_ = i;
      return;
      }
      }
      }
      }
      /
    • Скачали ли уже все блоки
      /
      bool AllBlocksRetrieved() const {
      return savedBlocks_ == numBlocks_;
      }
      /
    • Получить скачанные данные для части файла
      /
      std::string GetData() const {
      std::string fullData(length_, '\0');
      for (const auto& block : blocks_) {
      if (block.status == Block::Status::Retrieved) {
      fullData.replace(block.offset, block.length, block.data);
      }
      }
      return fullData;
      }
      /
    • Посчитать хеш по скачанным данным
      /
      std::string GetDataHash() const {
      return CalculateSHA1(GetData());
      }
      /
    • Получить хеш для части из .torrent файла
      /
      const std::string& GetHash() const {
      return hash_;
      }
      /
    • Удалить все скачанные данные и отметить все блоки как Missing
      */
      void Reset() {
      for (long long i = 0; i < numBlocks_; ++i) {
      blocks_[i].data.clear();
      blocks_[i].status = Block::Status::Missing;
      }
      savedBlocks_ = 0;
      lastUnsavedBlock_ = 0;
      }
      private:
      const size_t index_, length_;
      const std::string hash_;
      std::vector<Block> blocks_;
      long long numBlocks_ = 0, savedBlocks_ = 0, lastUnsavedBlock_ = 0;
      };
      using PiecePtr = std::shared_ptr<Piece>;
      Running with gitlab-runner 15.3.0 (bbcb5aba)
      on mephi-dap-programming-2 mGyyj4m2

Preparing the "docker" executor
00:03
Using Docker executor with image cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest ...
Authenticating with credentials from /root/.docker/config.json
Pulling docker image cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest ...
Using docker image sha256:c852b22b050713fe28561528760e76a1850ccd81f42128edb25f737778e0b3cc for cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest with digest cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base@sha256:df2190ae81ca8f3f4fe5f08e24955cb6711fe17ddae5c875a50ce64ef9755362 ...

Preparing environment
00:01
Running on runner-mgyyj4m2-project-31792-concurrent-0 via mephi-dap...

Getting source from Git repository
00:01
Fetching changes with git depth set to 20...
Initialized empty Git repository in /builds/mephi-dap-programming-2/students-2026/polinakozhanova/.git/
Created fresh repository.
Checking out b584302e as main...
Skipping Git submodules setup

Executing "step_script" stage of the job script
09:55
Using docker image sha256:c852b22b050713fe28561528760e76a1850ccd81f42128edb25f737778e0b3cc for cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest with digest cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base@sha256:df2190ae81ca8f3f4fe5f08e24955cb6711fe17ddae5c875a50ce64ef9755362 ...
cd/opt/mephi/cd /opt/mephi/ python3.9 -m checker grade
Unable to find env <GITLAB_SERVICE_TOKEN>
Unable to find env <GITLAB_API_TOKEN>
current_time 2026-05-14 09:16:37.879448
-> commit_time 2026-05-14 09:16:25+03:00
-> pipeline_created_time 2026-05-14 06:16:30
-> job_start_time 2026-05-14 06:16:31
= using send_time 2026-05-14 06:16:30
Loading changes...
Looking log_between_no_upstream between 46e2b5b21a7d0acb0ea13de68552038a869ab42f and b584302e24ff1b50d1f31f201a96373c9b4c8441 which not in https://gitlab.manytask.org/mephi-dap-programming-2/public-2026...
fatal: No such remote: 'upstream'
fatal: could not read Username for 'https://gitlab.manytask.org': No such device or address

Detected changes in the following files:
project-part-6/download_file/peer_connect.h
Testing only changed tasks...
Changed tasks: ['project-part-6/download_file']
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++ Testing tasks: <project-part-6/download_file> +++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Running tests...
ERROR: Job failed: execution took longer than 10m0s secondsRunning with gitlab-runner 15.3.0 (bbcb5aba)
on mephi-dap-programming-2 mGyyj4m2

Preparing the "docker" executor
00:03
Using Docker executor with image cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest ...
Authenticating with credentials from /root/.docker/config.json
Pulling docker image cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest ...
Using docker image sha256:c852b22b050713fe28561528760e76a1850ccd81f42128edb25f737778e0b3cc for cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest with digest cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base@sha256:df2190ae81ca8f3f4fe5f08e24955cb6711fe17ddae5c875a50ce64ef9755362 ...

Preparing environment
00:01
Running on runner-mgyyj4m2-project-31792-concurrent-0 via mephi-dap...

Getting source from Git repository
00:01
Fetching changes with git depth set to 20...
Initialized empty Git repository in /builds/mephi-dap-programming-2/students-2026/polinakozhanova/.git/
Created fresh repository.
Checking out b584302e as main...
Skipping Git submodules setup

Executing "step_script" stage of the job script
09:55
Using docker image sha256:c852b22b050713fe28561528760e76a1850ccd81f42128edb25f737778e0b3cc for cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base:latest with digest cr.yandex/crpmmm1ugaibpaqr61te/programming-2/2026/base@sha256:df2190ae81ca8f3f4fe5f08e24955cb6711fe17ddae5c875a50ce64ef9755362 ...
cd/opt/mephi/cd /opt/mephi/ python3.9 -m checker grade
Unable to find env <GITLAB_SERVICE_TOKEN>
Unable to find env <GITLAB_API_TOKEN>
current_time 2026-05-14 09:16:37.879448
-> commit_time 2026-05-14 09:16:25+03:00
-> pipeline_created_time 2026-05-14 06:16:30
-> job_start_time 2026-05-14 06:16:31
= using send_time 2026-05-14 06:16:30
Loading changes...
Looking log_between_no_upstream between 46e2b5b21a7d0acb0ea13de68552038a869ab42f and b584302e24ff1b50d1f31f201a96373c9b4c8441 which not in https://gitlab.manytask.org/mephi-dap-programming-2/public-2026...
fatal: No such remote: 'upstream'
fatal: could not read Username for 'https://gitlab.manytask.org': No such device or address

Detected changes in the following files:
project-part-6/download_file/peer_connect.h
Testing only changed tasks...
Changed tasks: ['project-part-6/download_file']
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++ Testing tasks: <project-part-6/download_file> +++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Running tests...
ERROR: Job failed: execution took longer than 10m0s seconds

Скачивание файла
В данном задании нужно дополнить реализацию классов PeerConnect и PieceStorage из предыдущего задания. Теперь требуется добавить сохранение частей файла на диск, а также сделать так, чтобы PieceStorage можно было использовать из нескольких потоков.

Для выполнения данного задания вам понадобится уметь парсить .torrent файлы, запрашивать список пиров, подключаться к ним и скачивать части файла как в предыдущем домашнем задании. Рекомендуется воспользоваться своим кодом из предыдущего задания.

Файлы, разрешенные для редактирования студентами в рамках данного задания:

bencode.h / .cpp
byte_tools.h / .cpp
message.h / .cpp
peer.h
peer_connect.h / .cpp
piece.h / .cpp
piece_storage.h / .cpp
tcp_connect.h / .cpp
torrent_file.h / .cpp
torrent_tracker.h / .cpp
Изменения в прочих файлах не будут учитываться при сборке проекта в проверяющей системе.

Запрещается использовать чужие реализации парсера bencode и .torrent файлов.

Для сборки проекта потребуется система сборки CMake, а также установленные в системе библиотеки OpenSSL и libcurl.

Как установить зависимости на Ubuntu 20.04:
sudoaptgetinstallcmakelibcurl4openssldevlibssldevКакустановитьзависимостинаMacOSчерезпакетныйменеджерbrew(https://brew.sh/):sudo apt-get install cmake libcurl4-openssl-dev libssl-dev Как установить зависимости на Mac OS через пакетный менеджер brew (https://brew.sh/): brew install openssl cmake
Дополнительная информация:

Что такое торрент https://ru.wikipedia.org/wiki/BitTorrent
.torrent файл https://ru.wikipedia.org/wiki/.torrent
Подробное описание протокола http://www.bittorrent.org/beps/bep_0003.html, https://wiki.theory.org/BitTorrentSpecification
Формат компактного ответа трекера http://www.bittorrent.org/beps/bep_0023.html
не понимаю почему таймаут. может, по заданию надо было куда-то потоки добавить? исправь. стиль кода не меняй! это ВАжно! ТЕ ЖЕ названия переменных, функций, библиотеки, без проспуков и пустых строк.
дай точечные исправления (что на что поменять) с комментариями что не верно у меня
дай готовый верный код В МОЕМ СТИЛЕ  

Suy nghĩ

Fixing MainLoop condition and handling in-progress pieces

It looks like the issue is with how the MainLoop checks the QueueIsEmpty() condition. The queue might be empty, but pieces in progress still exist. If QueueIsEmpty is false with pending blocks, the loop exits and terminates the connection. The loop should account for in-progress pieces, ensuring they aren't lost before completion.

Refining loop conditions for termination handling

I need to ensure that the loop continues until termination with the condition pieceInProgress_ != nullptr || !queueIsEmpty(). However, if the queue is empty but there's a pending block, the process should keep receiving until the task is complete. I think adding termination conditions and handling exceptions in the Run function is crucial.

Thought for 53s

Trả lời

Нет, потоки в PeerConnect добавлять не надо: тест уже создаёт поток на каждого пира. Таймаут из-за логики возврата кусков в PieceStorage.

Что у тебя неверно:

  1. Ты забираешь piece из очереди, но при неудаче не возвращаешь его обратно.
    Например тут:
cpp
if (!piecesAvailability_.IsPieceAvailable(pieceInProgress_->GetIndex())) { Terminate(); return; }

Кусок уже вынут из remainPieces_, pieceInProgress_ увеличен, но обратно ты его не кладёшь. Проверяющий код потом видит PiecesInProgressCount() > 0 и ждёт бесконечно.

  1. При Choke ты делаешь Reset(), но не вызываешь pieceStorage_.PieceFailed(...).
    Счётчик pieceInProgress_ в PieceStorage не уменьшается.

  2. MainLoop() завершается, когда очередь пустая, даже если текущий кусок ещё скачивается.

cpp
while (!terminated_ && !pieceStorage_.QueueIsEmpty())

Это неправильно. Очередь может быть пустой, потому что кусок уже взяли в работу, но его ещё надо дочитать.

  1. При завершении соединения текущий кусок тоже надо вернуть в PieceStorage, если он не был успешно сохранён.

Ниже готовый peer_connect.h в твоём стиле. PieceStorage у тебя в целом нормальный, его можно оставить.

cpp
#pragma once #include "tcp_connect.h" #include "peer.h" #include "torrent_file.h" #include "piece_storage.h" #include <iostream> #include <string> #include <utility> #include <cstring> #include <arpa/inet.h> #include "byte_tools.h" #include "message.h" /* * Структура, хранящая информацию о доступности частей скачиваемого файла у данного пира */ class PeerPiecesAvailability { public: PeerPiecesAvailability() : bitfield_() {} /* * bitfield – массив байтов, в котором i-й бит означает наличие или отсутствие i-й части файла у пира * https://wiki.theory.org/BitTorrentSpecification#bitfield:_.3Clen.3D0001.2BX.3E.3Cid.3D5.3E.3Cbitfield.3E */ explicit PeerPiecesAvailability(std::string bitfield) : bitfield_(std::move(bitfield)) {} /* * Если ли часть под номером pieceIndex у пира? */ bool IsPieceAvailable(size_t pieceIndex) const { size_t ind = pieceIndex / 8; if (ind >= bitfield_.size()) { return false; } uint8_t m = 1 << (7 - (pieceIndex % 8)); return (static_cast<uint8_t>(bitfield_[ind]) & m) != 0; } /* * Пометить часть под номером pieceIndex как доступную */ void SetPieceAvailability(size_t pieceIndex) { size_t ind = pieceIndex / 8; if (ind >= bitfield_.size()) { bitfield_.resize(ind + 1, '\0'); } uint8_t m = 1 << (7 - (pieceIndex % 8)); bitfield_[ind] = static_cast<char>(static_cast<uint8_t>(bitfield_[ind]) | m); } void SetAllPiecesAvailable(size_t piecesCount) { if (piecesCount == 0) { bitfield_.clear(); return; } bitfield_.assign((piecesCount + 7) / 8, static_cast<char>(0xff)); } /* * Сколько бит хранится в bitfield'е */ size_t Size() const { return bitfield_.size() * 8; } private: std::string bitfield_; }; /* * Класс, представляющий соединение с одним пиром. * С помощью него можно подключиться к пиру и обмениваться с ним сообщениями */ class PeerConnect { public: PeerConnect(const Peer& peer, const TorrentFile& tf, std::string selfPeerId, PieceStorage& pieceStorage) : tf_(tf) , socket_(peer.ip, peer.port, std::chrono::milliseconds(500), std::chrono::milliseconds(500)) , selfPeerId_(std::move(selfPeerId)) , terminated_(false) , choked_(true) , failed_(false) , pieceStorage_(pieceStorage) , pieceInProgress_(nullptr) , pendingBlock_(false) { } ~PeerConnect() { Terminate(); ResetPieceInProgress(); socket_.CloseConnection(); } /* * Основная функция, в которой будет происходить цикл общения с пиром. * https://wiki.theory.org/BitTorrentSpecification#Messages */ void Run() { failed_ = false; terminated_ = false; choked_ = true; pendingBlock_ = false; try { if (!EstablishConnection()) { failed_ = true; terminated_ = true; return; } MainLoop(); } catch (...) { failed_ = true; ResetPieceInProgress(); socket_.CloseConnection(); throw; } ResetPieceInProgress(); socket_.CloseConnection(); } void Terminate() { terminated_ = true; } bool Failed() const { return failed_; } protected: const TorrentFile& tf_; TcpConnect socket_; const std::string selfPeerId_; std::string peerId_; PeerPiecesAvailability piecesAvailability_; bool terminated_; bool choked_, failed_; PiecePtr pieceInProgress_; PieceStorage& pieceStorage_; bool pendingBlock_; void ResetPieceInProgress() { pendingBlock_ = false; if (pieceInProgress_ != nullptr) { pieceStorage_.PieceFailed(pieceInProgress_); pieceInProgress_ = nullptr; } } /* * Функция производит handshake. * - Подключиться к пиру по протоколу TCP * - Отправить пиру сообщение handshake * - Проверить правильность ответа пира * https://wiki.theory.org/BitTorrentSpecification#Handshake */ void PerformHandshake() { socket_.EstablishConnection(); std::string handshake; handshake.push_back(19); handshake += "BitTorrent protocol"; handshake.append(8, '\0'); handshake += tf_.infoHash; handshake += selfPeerId_; socket_.SendData(handshake); std::string response = socket_.ReceiveData(68); if (response.size() != 68) { throw std::runtime_error("Wrong format"); } if (static_cast<unsigned char>(response[0]) != 19) { throw std::runtime_error("Wrong protocol length"); } std::string protocol = response.substr(1, 19); if (protocol != "BitTorrent protocol") { throw std::runtime_error("Invalid protocol name"); } std::string infoHash = response.substr(28, 20); if (infoHash != tf_.infoHash) { throw std::runtime_error("Invalid info hash"); } peerId_ = response.substr(48, 20); } /* * - Провести handshake * - Получить bitfield с информацией о наличии у пира различных частей файла * - Сообщить пиру, что мы готовы получать от него данные (отправить interested) * Возвращает true, если все три этапа прошли без ошибок */ bool EstablishConnection() { try { PerformHandshake(); piecesAvailability_.SetAllPiecesAvailable(tf_.pieceHashes.size()); ReceiveBitfield(); SendInterested(); return true; } catch (...) { socket_.CloseConnection(); return false; } } /* * Функция читает из сокета bitfield с информацией о наличии у пира различных частей файла. * Полученную информацию надо сохранить в поле piecesAvailability_. * Также надо учесть, что сообщение тип Bitfield является опциональным, то есть пиры необязательно будут слать его. * Вместо этого они могут сразу прислать сообщение Unchoke, поэтому надо быть готовым обработать его в этой функции. * Обработка сообщения Unchoke заключается в выставлении флага choked_ в значение false */ void ReceiveBitfield() { try { std::string messageString = socket_.ReceiveData(); Message message = Message::Parse(messageString); if (message.id == MessageId::BitField) { piecesAvailability_ = PeerPiecesAvailability(message.payload); } else if (message.id == MessageId::Unchoke) { choked_ = false; } else if (message.id == MessageId::Choke) { choked_ = true; ResetPieceInProgress(); } else if (message.id == MessageId::Have) { if (message.payload.size() == 4) { piecesAvailability_.SetPieceAvailability(BytesToInt(message.payload)); } } else if (message.id == MessageId::Piece) { pendingBlock_ = false; if (pieceInProgress_ == nullptr || message.payload.size() < 8) { return; } uint32_t pieceIndex = BytesToInt(message.payload.substr(0, 4)); uint32_t offset = BytesToInt(message.payload.substr(4, 4)); std::string data = message.payload.substr(8); if (pieceIndex != pieceInProgress_->GetIndex()) { return; } pieceInProgress_->SaveBlock(offset, data); if (pieceInProgress_->AllBlocksRetrieved()) { if (pieceInProgress_->HashMatches()) { pieceStorage_.PieceProcessed(pieceInProgress_); pieceInProgress_ = nullptr; } else { ResetPieceInProgress(); } } } } catch (...) { return; } } /* * Функция посылает пиру сообщение типа interested */ void SendInterested() { Message m = Message::Init(MessageId::Interested, ""); socket_.SendData(m.ToString()); } /* * Функция отправляет пиру сообщение типа request. Это сообщение обозначает запрос части файла у пира. * За одно сообщение запрашивается не часть целиком, а блок данных размером 2^14 байт или меньше. * Если в данный момент мы не знаем, какую часть файла надо запросить у пира, то надо получить эту информацию у * PieceStorage */ void RequestPiece() { if (pieceInProgress_ == nullptr) { size_t attempts = 0; while (!pieceStorage_.QueueIsEmpty() && attempts < pieceStorage_.TotalPiecesCount()) { try { pieceInProgress_ = pieceStorage_.GetNextPieceToDownload(); } catch (...) { Terminate(); return; } if (piecesAvailability_.IsPieceAvailable(pieceInProgress_->GetIndex())) { break; } ResetPieceInProgress(); ++attempts; } } if (pieceInProgress_ == nullptr) { Terminate(); return; } Block* block = pieceInProgress_->FirstMissingBlock(); block->status = Block::Status::Pending; char m[17]; uint32_t net_len = htonl(13); memcpy(m, &net_len, 4); m[4] = 6; uint32_t ind = static_cast<uint32_t>(pieceInProgress_->GetIndex()); uint32_t piece = htonl(ind); memcpy(m + 5, &piece, 4); uint32_t offset = htonl(block->offset); memcpy(m + 9, &offset, 4); uint32_t len = htonl(block->length); memcpy(m + 13, &len, 4); socket_.SendData(std::string(m, 17)); pendingBlock_ = true; } /* * Основной цикл общения с пиром. Здесь мы ждем следующее сообщение от пира и обрабатываем его. * Также, если мы не ждем в данный момент от пира содержимого части файла, то надо отправить соответствующий запрос */ void MainLoop() { while (!terminated_) { if (pieceInProgress_ == nullptr && pieceStorage_.QueueIsEmpty()) { Terminate(); break; } ReceiveBitfield(); if (!choked_ && !pendingBlock_) { RequestPiece(); } } } };

Если у тебя Run() уже реализован в peer_connect.cpp, тогда не дублируй его в .h. В таком случае оставь в .h только объявление:

cpp
void Run();

а тело из моего кода перенеси в peer_connect.cpp:

cpp
void PeerConnect::Run() { failed_ = false; terminated_ = false; choked_ = true; pendingBlock_ = false; try { if (!EstablishConnection()) { failed_ = true; terminated_ = true; return; } MainLoop(); } catch (...) { failed_ = true; ResetPieceInProgress(); socket_.CloseConnection(); throw; } ResetPieceInProgress(); socket_.CloseConnection(); }

Главное исправление — любой взятый, но не сохранённый piece должен проходить через PieceFailed. Иначе PiecesInProgressCount() зависает больше нуля, а проверяющая система ждёт до таймаута.

Chia sẻ Q&A này