diff --git a/src/base/FileSystem.cpp b/src/base/FileSystem.cpp index 903affd..a0fbb4d 100644 --- a/src/base/FileSystem.cpp +++ b/src/base/FileSystem.cpp @@ -5,7 +5,7 @@ namespace codefs { string FileSystem::serializeFileDataCompressed(const string& path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); MessageWriter writer; if (allFileData.find(path) == allFileData.end()) { writer.writePrimitive(0); @@ -25,7 +25,7 @@ string FileSystem::serializeFileDataCompressed(const string& path) { void FileSystem::deserializeFileDataCompressed(const string& path, const string& s) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); MessageReader reader; reader.load(decompressString(s)); int numFiles = reader.readPrimitive(); diff --git a/src/base/FileSystem.hpp b/src/base/FileSystem.hpp index 356432c..a42ecc0 100644 --- a/src/base/FileSystem.hpp +++ b/src/base/FileSystem.hpp @@ -11,7 +11,7 @@ class FileSystem { } virtual optional getNode(const string &path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); auto it = allFileData.find(path); if (it == allFileData.end()) { return nullopt; @@ -20,7 +20,7 @@ class FileSystem { } void setNode(const FileData &fileData) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); allFileData.erase(fileData.path()); if (fileData.deleted()) { // The node is deleted, Don't add @@ -35,7 +35,7 @@ class FileSystem { } void createStub(const string &path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); FileData stub; stub.set_path(path); stub.set_invalid(true); @@ -43,7 +43,7 @@ class FileSystem { } void deleteNode(const string &path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); auto it = allFileData.find(path); if (it != allFileData.end()) { allFileData.erase(it); @@ -53,8 +53,8 @@ class FileSystem { virtual string absoluteToRelative(const string &absolutePath) { if (absolutePath.find(rootPath) != 0) { LOGFATAL << "Tried to convert absolute path to fuse that wasn't inside " - "the absolute FS: " - << absolutePath << " " << rootPath; + "the absolute FS: " + << absolutePath << " " << rootPath; } string relative = absolutePath.substr(rootPath.size()); if (relative.length() == 0) { @@ -107,7 +107,7 @@ class FileSystem { protected: string rootPath; shared_ptr fuseThread; - std::recursive_mutex fileDataMutex; + std::recursive_mutex mutex; }; } // namespace codefs diff --git a/src/client/Client.cpp b/src/client/Client.cpp index 850505b..d573ac5 100644 --- a/src/client/Client.cpp +++ b/src/client/Client.cpp @@ -4,7 +4,7 @@ namespace codefs { Client::Client(const string& _address, shared_ptr _fileSystem) - : address(_address), fileSystem(_fileSystem), fdCounter(1) { + : address(_address), fileSystem(_fileSystem) { MessageReader reader; MessageWriter writer; rpc = @@ -18,7 +18,7 @@ Client::Client(const string& _address, shared_ptr _fileSystem) while (true) { LOG(INFO) << "Waiting for init..."; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); rpc->update(); rpc->heartbeat(); if (rpc->hasIncomingReplyWithId(initId)) { @@ -37,7 +37,7 @@ Client::Client(const string& _address, shared_ptr _fileSystem) int Client::update() { MessageReader reader; MessageWriter writer; - lock_guard lock(rpcMutex); + lock_guard lock(mutex); rpc->update(); while (rpc->hasIncomingRequest()) { @@ -50,7 +50,7 @@ int Client::update() { case SERVER_CLIENT_METADATA_UPDATE: { string path = reader.readPrimitive(); LOG(INFO) << "UPDATING PATH: " << path; - cachedStatVfsProto.reset(); + fileSystem->invalidateVfsCache(); FileData fileData = reader.readProto(); if (fileData.invalid()) { LOGFATAL << "Got filedata with invalid set"; @@ -83,7 +83,7 @@ vector> Client::getNodes(const vector& paths) { vector pathsToDownload = fileSystem->getPathsToDownload(path); VLOG(1) << "Number of paths: " << pathsToDownload.size(); for (auto it : pathsToDownload) { - VLOG(1) << "GETTING SCAN FOR PATH: " << it; + LOG(INFO) << "GETTING SCAN FOR PATH: " << it; metadataToFetch.push_back(it); } } @@ -91,7 +91,7 @@ vector> Client::getNodes(const vector& paths) { if (!metadataToFetch.empty()) { RpcId id; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); MessageWriter writer; writer.start(); writer.writePrimitive(CLIENT_SERVER_FETCH_METADATA); @@ -105,9 +105,9 @@ vector> Client::getNodes(const vector& paths) { rpcIds.push_back(id); string result; while (true) { - usleep(0); + usleep(1); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); if (rpc->hasIncomingReplyWithId(id)) { result = rpc->consumeIncomingReplyWithId(id); break; @@ -115,7 +115,7 @@ vector> Client::getNodes(const vector& paths) { } } { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); MessageReader reader; reader.load(result); while (reader.sizeRemaining()) { @@ -144,7 +144,7 @@ vector> Client::getNodes(const vector& paths) { "from server"; string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); MessageWriter writer; writer.start(); writer.writePrimitive( @@ -155,7 +155,7 @@ vector> Client::getNodes(const vector& paths) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); MessageReader reader; reader.load(result); auto path = reader.readPrimitive(); @@ -209,9 +209,9 @@ int Client::open(const string& path, int flags) { int readWriteMode = (flags & O_ACCMODE); bool readOnly = (readWriteMode == O_RDONLY); LOG(INFO) << "Reading file " << path << " (readonly? " << readOnly << ")"; - int fd = fdCounter++; + int fd = fileSystem->getNewFd(); - if (ownedFileContents.find(path) == ownedFileContents.end()) { + if (!fileSystem->ownsPathContents(path)) { if (!readOnly) { fileSystem->invalidatePathAndParent(path); } @@ -219,13 +219,12 @@ int Client::open(const string& path, int flags) { auto cachedData = fileSystem->getCachedFile(path); if (readOnly && cachedData) { LOG(INFO) << "FETCHING FROM FILE CACHE: " << cachedData->size(); - ownedFileContents.insert( - make_pair(path, OwnedFileInfo(fd, *cachedData, readOnly))); + fileSystem->addOwnedFileContents(path, fd, *cachedData, readOnly); } else { string payload; { - lock_guard lock(rpcMutex); - cachedStatVfsProto.reset(); + lock_guard lock(mutex); + fileSystem->invalidateVfsCache(); writer.start(); writer.writePrimitive(CLIENT_SERVER_REQUEST_FILE); writer.writePrimitive(path); @@ -234,7 +233,7 @@ int Client::open(const string& path, int flags) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int rpcErrno = reader.readPrimitive(); if (rpcErrno) { @@ -244,16 +243,12 @@ int Client::open(const string& path, int flags) { string fileContents = decompressString(reader.readPrimitive()); LOG(INFO) << "READ FILE: " << path << " WITH CONTENTS SIZE " << fileContents.size(); - ownedFileContents.insert( - make_pair(path, OwnedFileInfo(fd, fileContents, readOnly))); + fileSystem->addOwnedFileContents(path, fd, fileContents, readOnly); } } } else { LOG(INFO) << "FILE IS ALREADY IN LOCAL CACHE, SKIPPING READ"; - ownedFileContents.at(path).fds.insert(fd); - if (!readOnly) { - ownedFileContents.at(path).readOnly = false; - } + fileSystem->addHandleToOwnedFile(path, fd, readOnly); } return fd; } @@ -264,9 +259,9 @@ int Client::create(const string& path, int flags, mode_t mode) { int readWriteMode = (flags & O_ACCMODE); bool readOnly = (readWriteMode == O_RDONLY); LOG(INFO) << "Creating file " << path << " (readonly? " << readOnly << ")"; - int fd = fdCounter++; + int fd = fileSystem->getNewFd(); - if (ownedFileContents.find(path) == ownedFileContents.end()) { + if (!fileSystem->ownsPathContents(path)) { if (!readOnly) { fileSystem->invalidatePathAndParent(path); } @@ -277,8 +272,8 @@ int Client::create(const string& path, int flags, mode_t mode) { } else { string payload; { - lock_guard lock(rpcMutex); - cachedStatVfsProto.reset(); + lock_guard lock(mutex); + fileSystem->invalidateVfsCache(); writer.start(); writer.writePrimitive(CLIENT_SERVER_CREATE_FILE); writer.writePrimitive(path); @@ -290,7 +285,7 @@ int Client::create(const string& path, int flags, mode_t mode) { fileSystem->createStub(path); string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int rpcErrno = reader.readPrimitive(); if (rpcErrno) { @@ -299,8 +294,7 @@ int Client::create(const string& path, int flags, mode_t mode) { return -1; } LOG(INFO) << "CREATED FILE: " << path; - ownedFileContents.insert( - make_pair(path, OwnedFileInfo(fd, "", readOnly))); + fileSystem->addOwnedFileContents(path, fd, "", readOnly); } } } else { @@ -313,41 +307,33 @@ int Client::create(const string& path, int flags, mode_t mode) { int Client::close(const string& path, int fd) { MessageReader reader; MessageWriter writer; - auto& ownedFile = ownedFileContents.at(path); - if (ownedFile.fds.find(fd) == ownedFile.fds.end()) { - LOGFATAL << "Tried to close a file handle that is not owned"; - } - if (!ownedFile.readOnly) { - LOG(INFO) << "Invalidating path"; - fileSystem->invalidatePathAndParent(path); - } + + bool readOnly; + string content; + fileSystem->closeOwnedFile(path, fd, &readOnly, &content); + string payload; { - lock_guard lock(rpcMutex); - cachedStatVfsProto.reset(); + lock_guard lock(mutex); + fileSystem->invalidateVfsCache(); writer.start(); writer.writePrimitive(CLIENT_SERVER_RETURN_FILE); writer.writePrimitive(path); - writer.writePrimitive(ownedFile.readOnly); - fileSystem->setCachedFile(path, ownedFile.content); - if (ownedFile.readOnly) { + writer.writePrimitive(readOnly); + fileSystem->setCachedFile(path, content); + if (readOnly) { LOG(INFO) << "RETURNED FILE " << path << " TO SERVER READ-ONLY"; } else { - writer.writePrimitive(compressString(ownedFile.content)); + writer.writePrimitive(compressString(content)); LOG(INFO) << "RETURNED FILE " << path << " TO SERVER WITH " - << ownedFile.content.size() << " BYTES"; + << content.size() << " BYTES"; } payload = writer.finish(); } - ownedFile.fds.erase(ownedFile.fds.find(fd)); - if (ownedFile.fds.empty()) { - ownedFileContents.erase(path); - } - string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -358,48 +344,23 @@ int Client::close(const string& path, int fd) { return 0; } } + int Client::pread(const string& path, char* buf, int64_t size, int64_t offset) { - auto it = ownedFileContents.find(path); - if (it == ownedFileContents.end()) { - LOGFATAL << "TRIED TO READ AN INVALID PATH"; - } - const auto& content = it->second.content; - if (offset >= int64_t(content.size())) { - return 0; - } - auto start = content.c_str() + offset; - int64_t actualSize = min(int64_t(content.size()), offset + size) - offset; - LOG(INFO) << content.size() << " " << size << " " << offset << " " - << actualSize << endl; - memcpy(buf, start, actualSize); - return actualSize; + return fileSystem->readOwnedFile(path, buf, size, offset); } + int Client::pwrite(const string& path, const char* buf, int64_t size, int64_t offset) { - auto it = ownedFileContents.find(path); - if (it == ownedFileContents.end()) { - LOGFATAL << "TRIED TO READ AN INVALID PATH: " << path; - } - if (it->second.readOnly) { - LOGFATAL << "Tried to write to a read-only file: " << path; - } - auto& content = it->second.content; - LOG(INFO) << "WRITING " << size << " TO " << path << " AT " << offset; - if (int64_t(content.size()) < offset + size) { - content.resize(offset + size, '\0'); - } - memcpy(&(content[offset]), buf, size); - return size; + return fileSystem->writeOwnedFile(path, buf, size, offset); } int Client::mkdir(const string& path, mode_t mode) { MessageReader reader; MessageWriter writer; - cachedStatVfsProto.reset(); fileSystem->invalidatePathAndParent(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_MKDIR); writer.writePrimitive(path); @@ -408,7 +369,7 @@ int Client::mkdir(const string& path, mode_t mode) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -420,41 +381,31 @@ int Client::mkdir(const string& path, mode_t mode) { } int Client::unlink(const string& path) { - cachedStatVfsProto.reset(); fileSystem->invalidatePathAndParent(path); return singlePathNoReturn(CLIENT_SERVER_UNLINK, path); } int Client::rmdir(const string& path) { - cachedStatVfsProto.reset(); fileSystem->invalidatePathAndParent(path); return singlePathNoReturn(CLIENT_SERVER_RMDIR, path); } int Client::symlink(const string& from, const string& to) { - cachedStatVfsProto.reset(); fileSystem->invalidatePathAndParent(from); fileSystem->invalidatePathAndParent(to); return twoPathsNoReturn(CLIENT_SERVER_SYMLINK, from, to); } int Client::rename(const string& from, const string& to) { - cachedStatVfsProto.reset(); + fileSystem->invalidateVfsCache(); LOG(INFO) << "RENAMING FROM " << from << " TO " << to; - if (ownedFileContents.find(to) != ownedFileContents.end()) { - LOGFATAL << "I don't handle renaming from one open file to another yet"; - } - if (ownedFileContents.find(from) != ownedFileContents.end()) { - ownedFileContents.insert(make_pair(to, ownedFileContents.at(from))); - ownedFileContents.erase(ownedFileContents.find(from)); - } + fileSystem->renameOwnedFileIfItExists(from, to); fileSystem->invalidatePathAndParentAndChildren(from); fileSystem->invalidatePathAndParentAndChildren(to); return twoPathsNoReturn(CLIENT_SERVER_RENAME, from, to); } int Client::link(const string& from, const string& to) { - cachedStatVfsProto.reset(); fileSystem->invalidatePathAndParent(from); fileSystem->invalidatePathAndParent(to); return twoPathsNoReturn(CLIENT_SERVER_LINK, from, to); @@ -466,7 +417,7 @@ int Client::chmod(const string& path, int mode) { fileSystem->invalidatePath(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_CHMOD); writer.writePrimitive(path); @@ -475,7 +426,7 @@ int Client::chmod(const string& path, int mode) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -491,7 +442,7 @@ int Client::lchown(const string& path, int64_t uid, int64_t gid) { fileSystem->invalidatePath(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_LCHOWN); writer.writePrimitive(path); @@ -501,7 +452,7 @@ int Client::lchown(const string& path, int64_t uid, int64_t gid) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -514,16 +465,15 @@ int Client::lchown(const string& path, int64_t uid, int64_t gid) { int Client::truncate(const string& path, int64_t size) { MessageReader reader; MessageWriter writer; - cachedStatVfsProto.reset(); - if (ownedFileContents.find(path) != ownedFileContents.end()) { - ownedFileContents.at(path).content.resize(size, '\0'); + fileSystem->invalidateVfsCache(); + if (fileSystem->truncateOwnedFileIfExists(path, size)) { return 0; } fileSystem->invalidatePath(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_TRUNCATE); writer.writePrimitive(path); @@ -532,7 +482,7 @@ int Client::truncate(const string& path, int64_t size) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -547,19 +497,21 @@ int Client::statvfs(struct statvfs* stbuf) { MessageReader reader; MessageWriter writer; - if (cachedStatVfsProto) { - statVfsProto = *cachedStatVfsProto; + auto cachedVfs = fileSystem->getVfsCache(); + + if (cachedVfs) { + statVfsProto = *cachedVfs; } else { string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_STATVFS); payload = writer.finish(); } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -568,7 +520,7 @@ int Client::statvfs(struct statvfs* stbuf) { errno = rpcErrno; return res; } - cachedStatVfsProto = statVfsProto; + fileSystem->setVfsCache(statVfsProto); } } stbuf->f_bsize = statVfsProto.bsize(); @@ -591,7 +543,7 @@ int Client::utimensat(const string& path, const struct timespec ts[2]) { fileSystem->invalidatePath(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_UTIMENSAT); writer.writePrimitive(path); @@ -603,7 +555,7 @@ int Client::utimensat(const string& path, const struct timespec ts[2]) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -619,7 +571,7 @@ int Client::lremovexattr(const string& path, const string& name) { fileSystem->invalidatePath(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_LREMOVEXATTR); writer.writePrimitive(path); @@ -628,7 +580,7 @@ int Client::lremovexattr(const string& path, const string& name) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -645,7 +597,7 @@ int Client::lsetxattr(const string& path, const string& name, fileSystem->invalidatePath(path); string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(CLIENT_SERVER_LSETXATTR); writer.writePrimitive(path); @@ -657,7 +609,7 @@ int Client::lsetxattr(const string& path, const string& name, } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -674,7 +626,7 @@ int Client::twoPathsNoReturn(unsigned char header, const string& from, MessageWriter writer; string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(header); writer.writePrimitive(from); @@ -683,7 +635,7 @@ int Client::twoPathsNoReturn(unsigned char header, const string& from, } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -699,7 +651,7 @@ int Client::singlePathNoReturn(unsigned char header, const string& path) { MessageWriter writer; string payload; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); writer.start(); writer.writePrimitive(header); writer.writePrimitive(path); @@ -707,7 +659,7 @@ int Client::singlePathNoReturn(unsigned char header, const string& path) { } string result = fileRpc(payload); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); reader.load(result); int res = reader.readPrimitive(); int rpcErrno = reader.readPrimitive(); @@ -721,13 +673,13 @@ int Client::singlePathNoReturn(unsigned char header, const string& path) { string Client::fileRpc(const string& payload) { RpcId id; { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); id = rpc->request(payload); } while (true) { - usleep(0); + usleep(1); { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); if (rpc->hasIncomingReplyWithId(id)) { return rpc->consumeIncomingReplyWithId(id); } diff --git a/src/client/Client.hpp b/src/client/Client.hpp index 5627551..0098e69 100644 --- a/src/client/Client.hpp +++ b/src/client/Client.hpp @@ -6,23 +6,12 @@ #include "ZmqBiDirectionalRpc.hpp" namespace codefs { -struct OwnedFileInfo { - unordered_set fds; - string content; - bool readOnly; - - OwnedFileInfo(int fd, string _content, bool _readOnly) - : content(_content), readOnly(_readOnly) { - fds.insert(fd); - } -}; - class Client { public: Client(const string& _address, shared_ptr _fileSystem); int update(); inline void heartbeat() { - lock_guard lock(rpcMutex); + lock_guard lock(mutex); rpc->heartbeat(); } @@ -60,21 +49,14 @@ class Client { int64_t size, int flags); optional getSizeOverride(const string& path) { - auto it = ownedFileContents.find(path); - if (it == ownedFileContents.end()) { - return optional(); - } - return int64_t(it->second.content.size()); + return fileSystem->getSizeOverride(path); } protected: string address; shared_ptr rpc; shared_ptr fileSystem; - unordered_map ownedFileContents; - recursive_mutex rpcMutex; - optional cachedStatVfsProto; - int fdCounter; + recursive_mutex mutex; int twoPathsNoReturn(unsigned char header, const string& from, const string& to); int singlePathNoReturn(unsigned char header, const string& path); diff --git a/src/client/ClientFileSystem.hpp b/src/client/ClientFileSystem.hpp index 582cba5..ebc5808 100644 --- a/src/client/ClientFileSystem.hpp +++ b/src/client/ClientFileSystem.hpp @@ -4,14 +4,36 @@ #include "FileSystem.hpp" namespace codefs { +class OwnedFileInfo { + public: + unordered_set fds; + string content; + bool readOnly; + + OwnedFileInfo() : readOnly(false) {} + + OwnedFileInfo(int fd, string _content, bool _readOnly) + : content(_content), readOnly(_readOnly) { + fds.insert(fd); + } + + explicit OwnedFileInfo(const OwnedFileInfo& other) { + fds = other.fds; + content = other.content; + readOnly = other.readOnly; + } +}; + class ClientFileSystem : public FileSystem { public: - explicit ClientFileSystem(const string& _rootPath) : FileSystem(_rootPath) {} + explicit ClientFileSystem(const string& _rootPath) + : FileSystem(_rootPath), fdCounter(1) {} virtual ~ClientFileSystem() {} inline void invalidatePath(const string& path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); + invalidateVfsCache(); LOG(INFO) << "INVALIDATING PATH: " << path; { auto it = fileCache.find(path); @@ -31,13 +53,15 @@ class ClientFileSystem : public FileSystem { } inline void invalidatePathAndParent(const string& path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); + invalidateVfsCache(); invalidatePath(boost::filesystem::path(path).parent_path().string()); invalidatePath(path); } inline void invalidatePathAndParentAndChildren(const string& path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); + invalidateVfsCache(); invalidatePathAndParent(path); for (auto& it : allFileData) { if (it.first.find(path + string("/")) == 0) { @@ -48,6 +72,7 @@ class ClientFileSystem : public FileSystem { } inline vector getPathsToDownload(const string& path) { + std::lock_guard lock(mutex); auto it = allFileData.find(path); if (it != allFileData.end()) { // We already have this path, let's make sure we also have all the @@ -86,7 +111,7 @@ class ClientFileSystem : public FileSystem { } inline optional getCachedFile(const string& path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); auto it = fileCache.find(path); if (it == fileCache.end()) { return optional(); @@ -96,12 +121,140 @@ class ClientFileSystem : public FileSystem { } inline void setCachedFile(const string& path, const string& data) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); fileCache[path] = data; } + inline void invalidateVfsCache() { + std::lock_guard lock(mutex); + cachedStatVfsProto.reset(); + } + + inline int getNewFd() { + std::lock_guard lock(mutex); + fdCounter++; + return fdCounter; + } + + inline bool ownsPathContents(const string& path) { + std::lock_guard lock(mutex); + return ownedFileContents.find(path) != ownedFileContents.end(); + } + + inline void addOwnedFileContents(const string& path, int fd, + const string& cachedData, bool readOnly) { + std::lock_guard lock(mutex); + ownedFileContents[path] = OwnedFileInfo(fd, cachedData, readOnly); + } + + inline void addHandleToOwnedFile(const string& path, int fd, bool readOnly) { + std::lock_guard lock(mutex); + ownedFileContents.at(path).fds.insert(fd); + if (!readOnly) { + ownedFileContents.at(path).readOnly = false; + } + } + + inline int readOwnedFile(const string& path, char* buf, int64_t size, + int64_t offset) { + std::lock_guard lock(mutex); + auto it = ownedFileContents.find(path); + if (it == ownedFileContents.end()) { + LOGFATAL << "TRIED TO READ AN INVALID PATH"; + } + const auto& content = it->second.content; + if (offset >= int64_t(content.size())) { + return 0; + } + auto start = content.c_str() + offset; + int64_t actualSize = min(int64_t(content.size()), offset + size) - offset; + LOG(INFO) << content.size() << " " << size << " " << offset << " " + << actualSize << endl; + memcpy(buf, start, actualSize); + return actualSize; + } + + inline int writeOwnedFile(const string& path, const char* buf, int64_t size, + int64_t offset) { + std::lock_guard lock(mutex); + auto it = ownedFileContents.find(path); + if (it == ownedFileContents.end()) { + LOGFATAL << "TRIED TO READ AN INVALID PATH: " << path; + } + if (it->second.readOnly) { + LOGFATAL << "Tried to write to a read-only file: " << path; + } + auto& content = it->second.content; + LOG(INFO) << "WRITING " << size << " TO " << path << " AT " << offset; + if (int64_t(content.size()) < offset + size) { + content.resize(offset + size, '\0'); + } + memcpy(&(content[offset]), buf, size); + return size; + } + + inline void closeOwnedFile(const string& path, int fd, bool* readOnly, + string* content) { + std::lock_guard lock(mutex); + auto& ownedFile = ownedFileContents.at(path); + if (ownedFile.fds.find(fd) == ownedFile.fds.end()) { + LOGFATAL << "Tried to close a file handle that is not owned"; + } + if (!ownedFile.readOnly) { + LOG(INFO) << "Invalidating path"; + invalidatePathAndParent(path); + } + *readOnly = ownedFile.readOnly; + *content = ownedFile.content; + ownedFile.fds.erase(ownedFile.fds.find(fd)); + if (ownedFile.fds.empty()) { + ownedFileContents.erase(path); + } + } + + optional getSizeOverride(const string& path) { + std::lock_guard lock(mutex); + auto it = ownedFileContents.find(path); + if (it == ownedFileContents.end()) { + return optional(); + } + return int64_t(it->second.content.size()); + } + + bool truncateOwnedFileIfExists(const string& path, int64_t size) { + std::lock_guard lock(mutex); + if (ownedFileContents.find(path) != ownedFileContents.end()) { + ownedFileContents.at(path).content.resize(size, '\0'); + return true; + } + return false; + } + + optional getVfsCache() { + std::lock_guard lock(mutex); + return cachedStatVfsProto; + } + + void setVfsCache(const StatVfsData& vfs) { + std::lock_guard lock(mutex); + cachedStatVfsProto = vfs; + } + + void renameOwnedFileIfItExists(const string& from, const string& to) { + if (ownedFileContents.find(to) != ownedFileContents.end()) { + LOGFATAL << "I don't handle renaming from one open file to another yet"; + } + if (ownedFileContents.find(from) != ownedFileContents.end()) { + ownedFileContents.insert(make_pair(to, ownedFileContents.at(from))); + ownedFileContents.erase(ownedFileContents.find(from)); + } + } + protected: unordered_map fileCache; + unordered_map ownedFileContents; + optional cachedStatVfsProto; + int fdCounter; }; } // namespace codefs diff --git a/src/client/ClientFuseAdapter.cpp b/src/client/ClientFuseAdapter.cpp index a03ce66..89f3583 100644 --- a/src/client/ClientFuseAdapter.cpp +++ b/src/client/ClientFuseAdapter.cpp @@ -36,7 +36,6 @@ static int codefs_access(const char *path, int mask) { return -1 * ENOENT; } if (mask == 0) { - LOG(INFO) << "MASK IS 0"; return 0; } @@ -250,7 +249,7 @@ static int codefs_getattr(const char *path, struct stat *stbuf) { LOGFATAL << "Tried to getattr with a NULL stat object"; } - LOG(INFO) << "GETTING ATTR FOR PATH: " << path; + VLOG(2) << "GETTING ATTR FOR PATH: " << path; optional fileDataPtr = client->getNode(path); if (!fileDataPtr) { LOG(INFO) << "File doesn't exist"; diff --git a/src/client/Main.cpp b/src/client/Main.cpp index 2b0eb02..3bca08a 100644 --- a/src/client/Main.cpp +++ b/src/client/Main.cpp @@ -22,7 +22,8 @@ static const struct fuse_opt codefs_opts[] = { void runFuse(char *binaryLocation, shared_ptr client, shared_ptr fileSystem) { - vector fuseFlags = {binaryLocation, FLAGS_mountpoint.c_str(), "-s"}; + vector fuseFlags = {binaryLocation, + FLAGS_mountpoint.c_str()}; //, "-s"}; #if __APPLE__ // OSXFUSE has a timeout in the kernel. Because we can block on network // failure, we disable this timeout @@ -114,7 +115,7 @@ int main(int argc, char *argv[]) { client->heartbeat(); lastHeartbeatTime = std::chrono::high_resolution_clock::now(); } - usleep(0); + usleep(1); } }); diff --git a/src/server/Main.cpp b/src/server/Main.cpp index a13fa40..f1a9079 100644 --- a/src/server/Main.cpp +++ b/src/server/Main.cpp @@ -169,7 +169,7 @@ int main(int argc, char *argv[]) { server->heartbeat(); lastHeartbeatTime = std::chrono::high_resolution_clock::now(); } - usleep(0); + usleep(1); } } } // namespace codefs diff --git a/src/server/ServerFileSystem.cpp b/src/server/ServerFileSystem.cpp index 0ecd0c4..13cb937 100644 --- a/src/server/ServerFileSystem.cpp +++ b/src/server/ServerFileSystem.cpp @@ -14,18 +14,18 @@ void ServerFileSystem::init() { } void ServerFileSystem::rescanPath(const string& absolutePath) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); scanNode(absolutePath); } string ServerFileSystem::readFile(const string& path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return fileToStr(relativeToAbsolute(path)); } int ServerFileSystem::writeFile(const string& path, const string& fileContents) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); FILE* fp = ::fopen(relativeToAbsolute(path).c_str(), "wb"); if (fp == NULL) { return -1; @@ -62,7 +62,7 @@ void ServerFileSystem::scanRecursively( return; } - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); VLOG(1) << "SCANNING DIRECTORY " << path_string; scanNode(path_string); @@ -109,7 +109,7 @@ void ServerFileSystem::scanNode(const string& path) { FileData fd; { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); VLOG(1) << "SCANNING NODE : " << path; fd.set_path(absoluteToRelative(path)); @@ -128,7 +128,7 @@ void ServerFileSystem::scanNode(const string& path) { } else { VLOG(1) << "FILE IS GONE: " << path << " " << errno; { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); allFileData.erase(absoluteToRelative(path)); } fd.set_deleted(true); @@ -169,7 +169,7 @@ void ServerFileSystem::scanNode(const string& path) { VLOG(1) << "FILE IS GONE: " << path << " " << errno; // The file is gone { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); allFileData.erase(absoluteToRelative(path)); } fd.set_deleted(true); @@ -275,7 +275,7 @@ void ServerFileSystem::scanNode(const string& path) { VLOG(1) << "SETTING: " << absoluteToRelative(path); { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); allFileData[absoluteToRelative(path)] = fd; } } diff --git a/src/server/ServerFileSystem.hpp b/src/server/ServerFileSystem.hpp index 9eca24e..f4fcf26 100644 --- a/src/server/ServerFileSystem.hpp +++ b/src/server/ServerFileSystem.hpp @@ -23,7 +23,7 @@ class ServerFileSystem : public FileSystem { void rescanPath(const string &absolutePath); inline void rescanPathAndParent(const string &absolutePath) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); rescanPath(absolutePath); if (absoluteToRelative(absolutePath) != string("/")) { LOG(INFO) << "RESCANNING PARENT"; @@ -32,7 +32,7 @@ class ServerFileSystem : public FileSystem { } inline void rescanPathAndParentAndChildren(const string &absolutePath) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); if (absoluteToRelative(absolutePath) != string("/")) { rescanPath(boost::filesystem::path(absolutePath).parent_path().string()); } @@ -40,7 +40,7 @@ class ServerFileSystem : public FileSystem { } inline void rescanPathAndChildren(const string &absolutePath) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); auto node = getNode(absolutePath); if (node) { // scan node and known children envelope for deletion/update @@ -65,61 +65,61 @@ class ServerFileSystem : public FileSystem { int writeFile(const string &path, const string &fileContents); int mkdir(const string &path, mode_t mode) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return ::mkdir(relativeToAbsolute(path).c_str(), mode); } int unlink(const string &path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return ::unlink(relativeToAbsolute(path).c_str()); } int rmdir(const string &path) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return ::rmdir(relativeToAbsolute(path).c_str()); } int symlink(const string &from, const string &to) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return ::symlink(relativeToAbsolute(from).c_str(), relativeToAbsolute(to).c_str()); } int link(const string &from, const string &to) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return ::link(relativeToAbsolute(from).c_str(), relativeToAbsolute(to).c_str()); } int rename(const string &from, const string &to) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); return ::rename(relativeToAbsolute(from).c_str(), relativeToAbsolute(to).c_str()); } int chmod(const string &path, mode_t mode) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); int res = ::chmod(relativeToAbsolute(path).c_str(), mode); rescanPath(relativeToAbsolute(path)); return res; } int lchown(const string &path, int64_t uid, int64_t gid) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); int res = ::lchown(relativeToAbsolute(path).c_str(), uid, gid); rescanPath(relativeToAbsolute(path)); return res; } int truncate(const string &path, int64_t size) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); int res = ::truncate(relativeToAbsolute(path).c_str(), size); rescanPath(relativeToAbsolute(path)); return res; } int utimensat(const string &path, struct timespec ts[2]) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); int res = ::utimensat(0, relativeToAbsolute(path).c_str(), ts, AT_SYMLINK_NOFOLLOW); rescanPath(relativeToAbsolute(path)); @@ -127,7 +127,7 @@ class ServerFileSystem : public FileSystem { } int lremovexattr(const string &path, const string &name) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); int res = ::lremovexattr(relativeToAbsolute(path).c_str(), name.c_str()); rescanPath(relativeToAbsolute(path)); return res; @@ -135,7 +135,7 @@ class ServerFileSystem : public FileSystem { int lsetxattr(const string &path, const string &name, const string &value, int64_t size, int flags) { - std::lock_guard lock(fileDataMutex); + std::lock_guard lock(mutex); int res = ::lsetxattr(relativeToAbsolute(path).c_str(), name.c_str(), value.c_str(), size, flags); rescanPath(relativeToAbsolute(path));