forked from zbackup/zbackup
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchunk_storage.hh
147 lines (117 loc) · 4.3 KB
/
chunk_storage.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) 2012-2014 Konstantin Isakov <[email protected]> and ZBackup contributors, see CONTRIBUTORS
// Part of ZBackup. Licensed under GNU GPLv2 or later + OpenSSL, see LICENSE
#ifndef CHUNK_STORAGE_HH_INCLUDED
#define CHUNK_STORAGE_HH_INCLUDED
#include <stddef.h>
#include <exception>
#include <string>
#include <utility>
#include <vector>
#include "bundle.hh"
#include "chunk_id.hh"
#include "chunk_index.hh"
#include "encryption_key.hh"
#include "ex.hh"
#include "file.hh"
#include "index_file.hh"
#include "mt.hh"
#include "nocopy.hh"
#include "objectcache.hh"
#include "sptr.hh"
#include "tmp_mgr.hh"
#include "zbackup.pb.h"
#include "config.hh"
namespace ChunkStorage {
using std::string;
using std::vector;
using std::pair;
DEF_EX( Ex, "Chunk storage exception", std::exception )
/// Allows adding new chunks to the storage by filling up new bundles with them
/// and writing new index files
class Writer: NoCopy
{
public:
/// All new bundles and index files are created as temp files. Call commit()
/// to move them to their permanent locations. commit() is never called
/// automatically!
Writer( Config const &, EncryptionKey const &,
TmpMgr &, ChunkIndex & index, string const & bundlesDir,
string const & indexDir, size_t maxCompressorsToRun );
/// Adds the given chunk to the store. If such a chunk has already existed
/// in the index, does nothing and returns false
bool add( ChunkId const &, void const * data, size_t size );
/// Adds an existing bundle to the index
void addBundle( BundleInfo const &, Bundle::Id const & bundleId );
/// Commits all newly created bundles. Must be called before destroying the
/// object -- otherwise all work will be removed from the temp dir and lost
void commit();
/// Throw away all current changes.
void reset();
~Writer();
private:
/// Performs the compression in a separate thread. Destroys itself once done
class Compressor: public Thread
{
Writer & writer;
sptr< Bundle::Creator > bundleCreator;
string fileName;
Config const & config;
public:
Compressor( Config const &, Writer &, sptr< Bundle::Creator > const &,
string const & fileName );
protected:
virtual void * threadFunction() throw();
};
friend class Compressor;
/// Returns the id of the currently written bundle. If there's none, generates
/// one. If a bundle hasn't yet started, still generates it - once the bundle
/// is started, it will be used then
Bundle::Id const & getCurrentBundleId();
/// Returns *currentBundle or creates a new one
Bundle::Creator & getCurrentBundle();
/// Writes the current bundle and deallocates it
void finishCurrentBundle();
/// Wait for all compressors to finish
void waitForAllCompressorsToFinish();
Config const & config;
EncryptionKey const & encryptionKey;
TmpMgr & tmpMgr;
ChunkIndex & index;
string bundlesDir, indexDir;
sptr< TemporaryFile > indexTempFile;
sptr< IndexFile::Writer > indexFile;
sptr< Bundle::Creator > currentBundle;
Bundle::Id currentBundleId;
bool hasCurrentBundleId;
size_t maxCompressorsToRun;
Mutex runningCompressorsMutex;
Condition runningCompressorsCondition;
size_t runningCompressors;
/// Maps temp file of the bundle to its id blob
typedef pair< sptr< TemporaryFile >, Bundle::Id > PendingBundleRename;
vector< PendingBundleRename > pendingBundleRenames;
};
/// Allows retrieving existing chunks by extracting them from the bundles with
/// the help of an Index object
class Reader: NoCopy
{
public:
DEF_EX_STR( exNoSuchChunk, "no such chunk found:", Ex )
Reader( Config const &, EncryptionKey const &, ChunkIndex & index,
string const & bundlesDir, size_t maxCacheSizeBytes );
Bundle::Id const * getBundleId( ChunkId const &, size_t & size );
/// Loads the given chunk from the store into the given buffer. May throw file
/// and decompression exceptions. 'data' may be enlarged but won't be shrunk.
/// The size of the actual chunk would be stored in 'size'
void get( ChunkId const &, string & data, size_t & size );
/// Retrieves the reader for the given bundle id. May employ caching
Bundle::Reader & getReaderFor( Bundle::Id const & );
private:
Config const & config;
EncryptionKey const & encryptionKey;
ChunkIndex & index;
string bundlesDir;
ObjectCache cachedReaders;
};
}
#endif