ReadDirectoryChangesW stops working on large amount of Files

Jan Kevin Dick 0 Reputation points
2023-11-17T13:51:14.6566667+00:00

I'm using the ReadDirectoryChangesW together with the IOCompletionQueue to receive notifications if new files are added to a specific folder.

The Problem i'm currently facing is that if a huge batch copy operation happens (around 800 Files) my Program stopped working.

I debugged the program and recognized that it always fail when calling GetQueuedCompletionStatus. The Function call returns with a TRUE but the passed in variable lpNumberOfBytesTransfered is set to 0.

Here is the Code that i used (i'm also open for any suggestions and help because i'm not that expert on the Windows API and ReadDirectoryChangesW)

//FileIOAdapterImpl
HandlePtr FileIoAdapterImpl::CreateIoCompletionPortWrapper() {
  HandlePtr io_handle{ CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0) };
  return io_handle;
}


bool FileIoAdapterImpl::AssociateWithFileHandle(HANDLE directory, HANDLE existing_completion_port) {
  return CreateIoCompletionPort(directory, existing_completion_port, (ULONG_PTR)directory, 1);
}

bool FileIoAdapterImpl::GetQueuedCompletionStatusWrapper(HANDLE completion_token,
  LPDWORD bytes_transferred, PULONG_PTR completion_key, LPOVERLAPPED* overlap) {
  return GetQueuedCompletionStatus(completion_token, bytes_transferred, completion_key, overlap, 16);

}

HandlePtr FileIoAdapterImpl::CreateFileWrapper(const std::string& path) {
  HandlePtr dir_handle{
    CreateFileA(path.c_str(), FILE_LIST_DIRECTORY,
                FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL,
                OPEN_EXISTING,
                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL)
  };

  return dir_handle;
}
bool FileIoAdapterImpl::ReadDirectoryChangesWrapper(HANDLE directory, std::vector<std::byte>& buffer, LPDWORD bytes_returned, OVERLAPPED* overlapped) {
  return ReadDirectoryChangesW(directory, buffer.data(), buffer.size(), FALSE, FILE_NOTIFY_CHANGE_FILE_NAME, bytes_returned, overlapped, nullptr);
}

This class is just a wrapper around the the WindowsAPI Functions that i use (for testing/mocking purpose). The HandlePtr is just a unique_ptr with a custom deleter to call the CloseHandle Function.

//ModernDirectoryWatcher.h
class ModernDirectoryWatcher {
public:
  using Callback = std::function<void(const std::string& filename)>;

  ModernDirectoryWatcher(const std::filesystem::path& input_directory, std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper = std::make_shared<FileIoAdapterImpl>());

  explicit operator bool() const {
    return is_valid_;
  }

  bool watch();

  void stop();

private:

  bool event_recv();
  bool event_send();
  void handle_events();
  bool has_event() const {
    return event_buf_len_ready_ != 0;
  }
  bool is_processable_file(const std::filesystem::path& path) const;

  Poco::Logger& logger_{ Poco::Logger::get("ModernDirectoryWatcher") };
  std::filesystem::path input_directory_;
  std::vector<std::string> file_types_{};
  Callback callback_;
  FileIOAdapterPtr win_io_api_wrapper_;
  HandlePtr path_handle_;
  HandlePtr event_completion_token_;
  unsigned long event_buf_len_ready_{ 0 };
  bool is_valid_{ false };
  OVERLAPPED event_overlap_{};
  std::vector<std::byte> event_buf_{ 64 * 1024 };
};
//ModernDirectoryWatcher.cpp
ModernDirectoryWatcher::ModernDirectoryWatcher(const std::filesystem::path& input_directory,
  std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper)
  : input_directory_(input_directory),
  file_types_(std::move(file_types)),
  callback_(std::move(callback)),
  win_io_api_wrapper_(std::move(io_wrapper)) {

  path_handle_ = win_io_api_wrapper_->CreateFileWrapper(input_directory_.string());

  if (path_handle_.get() != INVALID_HANDLE_VALUE) {
    poco_information(logger_, "Create Completition Token");
    event_completion_token_ = win_io_api_wrapper_->CreateIoCompletionPortWrapper();
  }

  if (event_completion_token_.get() != nullptr) {
    poco_information(logger_, "Associate with the FileHandle");
    is_valid_ = win_io_api_wrapper_->AssociateWithFileHandle(path_handle_.get(), event_completion_token_.get());
  }
}

bool ModernDirectoryWatcher::watch() {
  poco_information(logger_, "Start Watching...");
  if (is_valid_) {
    poco_information(logger_, "Receive Events");
    event_recv();

    while (is_valid_ && has_event()) {
      poco_information(logger_, "There are still events to process...");
      event_send();
    }

    while (is_valid_) {
      ULONG_PTR completion_key{ 0 };
      LPOVERLAPPED overlap{ 0 };
      bool complete = win_io_api_wrapper_->GetQueuedCompletionStatusWrapper(event_completion_token_.get(), &event_buf_len_ready_, &completion_key, &overlap);
      if (complete && event_buf_len_ready_ == 0) {
        poco_error(logger_, "Error"); // HERE THE ERROR IS PRINTED 
      }
      if (complete && overlap) {
        poco_information(logger_, "Handle the events");
        handle_events();
      } else if (int err_code = GetLastError() != 258 && !complete) {
        poco_error(logger_, "Error");
      }
    }
    return true;
  } else {
    return false;
  }
}

void ModernDirectoryWatcher::handle_events() {
  while (is_valid_ && has_event()) {
    poco_information(logger_, "Send Event");
    event_send();
    poco_information(logger_, "Receive Events");
    event_recv();
  }
}

void ModernDirectoryWatcher::stop() {
  poco_notice(logger_, "Stop the Watcher");
  is_valid_ = false;
}

bool ModernDirectoryWatcher::event_recv() {
  event_buf_len_ready_ = 0;
  DWORD bytes_returned = 0;
  memset(&event_overlap_, 0, sizeof(OVERLAPPED));
  poco_information(logger_, "Call the ReadDirectoryChanges");
  auto read_ok = win_io_api_wrapper_->ReadDirectoryChangesWrapper(path_handle_.get(), event_buf_, &bytes_returned, &event_overlap_);

 
  if (!event_buf_.empty() && read_ok) {
    event_buf_len_ready_ = bytes_returned > 0 ? bytes_returned : 0;
    poco_information_f1(logger_, "Event Buffer Len: %?d", event_buf_len_ready_);
    return true;
  }

  if (GetLastError() == ERROR_IO_PENDING) {
    poco_error(logger_, "Error Pending IO Received stopping...");
    event_buf_len_ready_ = 0;
    is_valid_ = false;
  } else {
    poco_error_f1(logger_, "Error Code: %?d", GetLastError());
  }
  return false;
}

bool ModernDirectoryWatcher::event_send() {
  auto buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(event_buf_.data());
  if (is_valid_) {
    while (buf + sizeof(FILE_NOTIFY_INFORMATION) <= buf + event_buf_len_ready_) {
      poco_information(logger_, "Get valid Buffer...");
      auto filename = input_directory_ / std::wstring{ buf->FileName, buf->FileNameLength / 2 };
      if ((buf->Action == FILE_ACTION_ADDED || buf->Action == FILE_ACTION_RENAMED_NEW_NAME) && is_processable_file(filename)) {
        callback_(filename.string());
      }

      if (buf->NextEntryOffset == 0) {
        break;
      }

      buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(reinterpret_cast<std::byte*>(buf) + buf->NextEntryOffset);

    }
    return true;
  } else {
    return false;
  }
}

bool ModernDirectoryWatcher::is_processable_file(const std::filesystem::path& path) const {
  std::string extension = path.extension().string().erase(0, 1);
  extension = Poco::toLower(extension);
  return std::find(file_types_.begin(), file_types_.end(), extension) != file_types_.end();
}

I start that Watcher as a thread from my application like this:

ModernDirectoryWatcher directory_watcher{dir, ftypes, [this](const std::string& filename) {...})}
std::thread watcher_thread = std::thread{&ModernDirectoryWatcher::watch, &directory_watcher}

If i copy a greater amount of files (~800) Files to the watched Directory the code prints out the Error and then if i add again files (just one for example) the file is not recognized anymore. I know that this happens because i check in the has_event Function if the event_buf_len_ready_ is greater then 0 what should normally be the case if the GetQueuedCompletionStatus is returning TRUE. But if i remove that check i start getting garbage out of the Buffer.

Windows API - Win32
Windows API - Win32
A core set of Windows application programming interfaces (APIs) for desktop and server applications. Previously known as Win32 API.
2,289 questions
C++
C++
A high-level, general-purpose programming language, created as an extension of the C programming language, that has object-oriented, generic, and functional features in addition to facilities for low-level memory manipulation.
3,327 questions
{count} votes

1 answer

Sort by: Most helpful
  1. RLWA32 37,131 Reputation points
    2023-11-20T12:19:33.18+00:00

    You can download the sample the I used to test from https://1drv.ms/u/s!AmnqrCFBv4nDhB_xJ8Ry0GqL4vRJ?e=7XV1CO

    It is a basic Windows API dialog-based application created using VS2019.

    Make sure to unblock the zip file before extracting the contents.

    1 person found this answer helpful.
    0 comments No comments