I'm using the ReadDirectoryChangesW together with the IOCompletionQueue to receive notifications if new files are added to a specific folder.
The Problem i'm currently facing is that if a huge batch copy operation happens (around 800 Files) my Program stopped working.
I debugged the program and recognized that it always fail when calling GetQueuedCompletionStatus. The Function call returns with a TRUE but the passed in variable lpNumberOfBytesTransfered is set to 0.
Here is the Code that i used (i'm also open for any suggestions and help because i'm not that expert on the Windows API and ReadDirectoryChangesW)
//FileIOAdapterImpl
HandlePtr FileIoAdapterImpl::CreateIoCompletionPortWrapper() {
HandlePtr io_handle{ CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0) };
return io_handle;
}
bool FileIoAdapterImpl::AssociateWithFileHandle(HANDLE directory, HANDLE existing_completion_port) {
return CreateIoCompletionPort(directory, existing_completion_port, (ULONG_PTR)directory, 1);
}
bool FileIoAdapterImpl::GetQueuedCompletionStatusWrapper(HANDLE completion_token,
LPDWORD bytes_transferred, PULONG_PTR completion_key, LPOVERLAPPED* overlap) {
return GetQueuedCompletionStatus(completion_token, bytes_transferred, completion_key, overlap, 16);
}
HandlePtr FileIoAdapterImpl::CreateFileWrapper(const std::string& path) {
HandlePtr dir_handle{
CreateFileA(path.c_str(), FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL)
};
return dir_handle;
}
bool FileIoAdapterImpl::ReadDirectoryChangesWrapper(HANDLE directory, std::vector<std::byte>& buffer, LPDWORD bytes_returned, OVERLAPPED* overlapped) {
return ReadDirectoryChangesW(directory, buffer.data(), buffer.size(), FALSE, FILE_NOTIFY_CHANGE_FILE_NAME, bytes_returned, overlapped, nullptr);
}
This class is just a wrapper around the the WindowsAPI Functions that i use (for testing/mocking purpose). The HandlePtr is just a unique_ptr with a custom deleter to call the CloseHandle Function.
//ModernDirectoryWatcher.h
class ModernDirectoryWatcher {
public:
using Callback = std::function<void(const std::string& filename)>;
ModernDirectoryWatcher(const std::filesystem::path& input_directory, std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper = std::make_shared<FileIoAdapterImpl>());
explicit operator bool() const {
return is_valid_;
}
bool watch();
void stop();
private:
bool event_recv();
bool event_send();
void handle_events();
bool has_event() const {
return event_buf_len_ready_ != 0;
}
bool is_processable_file(const std::filesystem::path& path) const;
Poco::Logger& logger_{ Poco::Logger::get("ModernDirectoryWatcher") };
std::filesystem::path input_directory_;
std::vector<std::string> file_types_{};
Callback callback_;
FileIOAdapterPtr win_io_api_wrapper_;
HandlePtr path_handle_;
HandlePtr event_completion_token_;
unsigned long event_buf_len_ready_{ 0 };
bool is_valid_{ false };
OVERLAPPED event_overlap_{};
std::vector<std::byte> event_buf_{ 64 * 1024 };
};
//ModernDirectoryWatcher.cpp
ModernDirectoryWatcher::ModernDirectoryWatcher(const std::filesystem::path& input_directory,
std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper)
: input_directory_(input_directory),
file_types_(std::move(file_types)),
callback_(std::move(callback)),
win_io_api_wrapper_(std::move(io_wrapper)) {
path_handle_ = win_io_api_wrapper_->CreateFileWrapper(input_directory_.string());
if (path_handle_.get() != INVALID_HANDLE_VALUE) {
poco_information(logger_, "Create Completition Token");
event_completion_token_ = win_io_api_wrapper_->CreateIoCompletionPortWrapper();
}
if (event_completion_token_.get() != nullptr) {
poco_information(logger_, "Associate with the FileHandle");
is_valid_ = win_io_api_wrapper_->AssociateWithFileHandle(path_handle_.get(), event_completion_token_.get());
}
}
bool ModernDirectoryWatcher::watch() {
poco_information(logger_, "Start Watching...");
if (is_valid_) {
poco_information(logger_, "Receive Events");
event_recv();
while (is_valid_ && has_event()) {
poco_information(logger_, "There are still events to process...");
event_send();
}
while (is_valid_) {
ULONG_PTR completion_key{ 0 };
LPOVERLAPPED overlap{ 0 };
bool complete = win_io_api_wrapper_->GetQueuedCompletionStatusWrapper(event_completion_token_.get(), &event_buf_len_ready_, &completion_key, &overlap);
if (complete && event_buf_len_ready_ == 0) {
poco_error(logger_, "Error"); // HERE THE ERROR IS PRINTED
}
if (complete && overlap) {
poco_information(logger_, "Handle the events");
handle_events();
} else if (int err_code = GetLastError() != 258 && !complete) {
poco_error(logger_, "Error");
}
}
return true;
} else {
return false;
}
}
void ModernDirectoryWatcher::handle_events() {
while (is_valid_ && has_event()) {
poco_information(logger_, "Send Event");
event_send();
poco_information(logger_, "Receive Events");
event_recv();
}
}
void ModernDirectoryWatcher::stop() {
poco_notice(logger_, "Stop the Watcher");
is_valid_ = false;
}
bool ModernDirectoryWatcher::event_recv() {
event_buf_len_ready_ = 0;
DWORD bytes_returned = 0;
memset(&event_overlap_, 0, sizeof(OVERLAPPED));
poco_information(logger_, "Call the ReadDirectoryChanges");
auto read_ok = win_io_api_wrapper_->ReadDirectoryChangesWrapper(path_handle_.get(), event_buf_, &bytes_returned, &event_overlap_);
if (!event_buf_.empty() && read_ok) {
event_buf_len_ready_ = bytes_returned > 0 ? bytes_returned : 0;
poco_information_f1(logger_, "Event Buffer Len: %?d", event_buf_len_ready_);
return true;
}
if (GetLastError() == ERROR_IO_PENDING) {
poco_error(logger_, "Error Pending IO Received stopping...");
event_buf_len_ready_ = 0;
is_valid_ = false;
} else {
poco_error_f1(logger_, "Error Code: %?d", GetLastError());
}
return false;
}
bool ModernDirectoryWatcher::event_send() {
auto buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(event_buf_.data());
if (is_valid_) {
while (buf + sizeof(FILE_NOTIFY_INFORMATION) <= buf + event_buf_len_ready_) {
poco_information(logger_, "Get valid Buffer...");
auto filename = input_directory_ / std::wstring{ buf->FileName, buf->FileNameLength / 2 };
if ((buf->Action == FILE_ACTION_ADDED || buf->Action == FILE_ACTION_RENAMED_NEW_NAME) && is_processable_file(filename)) {
callback_(filename.string());
}
if (buf->NextEntryOffset == 0) {
break;
}
buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(reinterpret_cast<std::byte*>(buf) + buf->NextEntryOffset);
}
return true;
} else {
return false;
}
}
bool ModernDirectoryWatcher::is_processable_file(const std::filesystem::path& path) const {
std::string extension = path.extension().string().erase(0, 1);
extension = Poco::toLower(extension);
return std::find(file_types_.begin(), file_types_.end(), extension) != file_types_.end();
}
I start that Watcher as a thread from my application like this:
ModernDirectoryWatcher directory_watcher{dir, ftypes, [this](const std::string& filename) {...})}
std::thread watcher_thread = std::thread{&ModernDirectoryWatcher::watch, &directory_watcher}
If i copy a greater amount of files (~800) Files to the watched Directory the code prints out the Error and then if i add again files (just one for example) the file is not recognized anymore. I know that this happens because i check in the has_event Function if the event_buf_len_ready_ is greater then 0 what should normally be the case if the GetQueuedCompletionStatus is returning TRUE. But if i remove that check i start getting garbage out of the Buffer.