-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(space-store): add idb implementation
- Loading branch information
Showing
13 changed files
with
775 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,25 @@ | ||
{ | ||
"name": "@affine/space-store", | ||
"type": "module", | ||
"version": "0.17.0", | ||
"version": "0.15.0", | ||
"private": true, | ||
"sideEffects": false, | ||
"exports": { | ||
".": "./index.ts" | ||
".": "./index.ts", | ||
"./idb": "./src/impls/idb/index.ts", | ||
"./idb/v1": "./src/impls/idb/v1/index.ts" | ||
}, | ||
"dependencies": { | ||
"@toeverything/infra": "workspace:*", | ||
"eventemitter2": "^6.4.9", | ||
"lodash-es": "^4.17.21", | ||
"rxjs": "^7.8.1", | ||
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" | ||
}, | ||
"devDependencies": { | ||
"idb": "^8.0.0" | ||
}, | ||
"peerDependencies": { | ||
"idb": "^8.0.0" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import { share } from '../../connection'; | ||
import { | ||
type BlobRecord, | ||
BlobStorage, | ||
type BlobStorageOptions, | ||
type ListedBlobRecord, | ||
} from '../../storage'; | ||
import { IDBConnection } from './db'; | ||
|
||
export interface IndexedDBBlobStorageOptions extends BlobStorageOptions { | ||
dbName: string; | ||
} | ||
|
||
export class IndexedDBBlobStorage extends BlobStorage<IndexedDBBlobStorageOptions> { | ||
readonly connection = share(new IDBConnection(this.options.dbName)); | ||
|
||
get db() { | ||
return this.connection.inner; | ||
} | ||
|
||
override async get(key: string) { | ||
const trx = this.db.transaction(['blobs', 'blobData'], 'readonly'); | ||
const blob = await trx.objectStore('blobs').get(key); | ||
const data = await trx.objectStore('blobData').get(key); | ||
|
||
if (!blob || blob.deletedAt || !data) { | ||
return null; | ||
} | ||
|
||
return { | ||
...blob, | ||
data: data.data, | ||
}; | ||
} | ||
|
||
override async set(blob: BlobRecord) { | ||
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); | ||
await trx.objectStore('blobs').put({ | ||
key: blob.key, | ||
mime: blob.mime, | ||
size: blob.data.byteLength, | ||
createdAt: new Date(), | ||
deletedAt: null, | ||
}); | ||
await trx.objectStore('blobData').put({ | ||
key: blob.key, | ||
data: blob.data, | ||
}); | ||
} | ||
|
||
override async delete(key: string, permanently: boolean) { | ||
if (permanently) { | ||
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); | ||
await trx.objectStore('blobs').delete(key); | ||
await trx.objectStore('blobData').delete(key); | ||
} else { | ||
const trx = this.db.transaction('blobs', 'readwrite'); | ||
const blob = await trx.store.get(key); | ||
if (blob) { | ||
await trx.store.put({ | ||
...blob, | ||
deletedAt: new Date(), | ||
}); | ||
} | ||
} | ||
} | ||
|
||
override async release() { | ||
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); | ||
|
||
const it = trx.objectStore('blobs').iterate(); | ||
|
||
for await (const item of it) { | ||
if (item.value.deletedAt) { | ||
await item.delete(); | ||
await trx.objectStore('blobData').delete(item.value.key); | ||
} | ||
} | ||
} | ||
|
||
override async list() { | ||
const trx = this.db.transaction('blobs', 'readonly'); | ||
const it = trx.store.iterate(); | ||
|
||
const blobs: ListedBlobRecord[] = []; | ||
for await (const item of it) { | ||
if (!item.value.deletedAt) { | ||
blobs.push(item.value); | ||
} | ||
} | ||
|
||
return blobs; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import { type IDBPDatabase, openDB } from 'idb'; | ||
|
||
import { Connection } from '../../connection'; | ||
import { type DocStorageSchema, migrator } from './schema'; | ||
|
||
export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> { | ||
override get shareId() { | ||
return `idb(${migrator.version}):${this.dbName}`; | ||
} | ||
|
||
constructor(private readonly dbName: string) { | ||
super(); | ||
} | ||
|
||
override async doConnect() { | ||
return openDB<DocStorageSchema>(this.dbName, migrator.version, { | ||
upgrade: migrator.migrate, | ||
blocking: () => { | ||
// if, for example, an tab with newer version is opened, this function will be called. | ||
// we should close current connection to allow the new version to upgrade the db. | ||
this.close( | ||
new Error('Blocking a new version. Closing the connection.') | ||
); | ||
}, | ||
blocked: () => { | ||
// fallback to retry auto retry | ||
this.setStatus('error', new Error('Blocked by other tabs.')); | ||
}, | ||
}); | ||
} | ||
|
||
override async doDisconnect() { | ||
this.close(); | ||
} | ||
|
||
private close(error?: Error) { | ||
this.maybeConnection?.close(); | ||
this.setStatus('closed', error); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
import { share } from '../../connection'; | ||
import { | ||
type DocClocks, | ||
type DocRecord, | ||
DocStorage, | ||
type DocStorageOptions, | ||
type DocUpdate, | ||
} from '../../storage'; | ||
import { IDBConnection } from './db'; | ||
|
||
export interface IndexedDBDocStorageOptions extends DocStorageOptions { | ||
dbName: string; | ||
} | ||
|
||
export class IndexedDBDocStorage extends DocStorage<IndexedDBDocStorageOptions> { | ||
readonly connection = share(new IDBConnection(this.options.dbName)); | ||
|
||
get db() { | ||
return this.connection.inner; | ||
} | ||
|
||
get name() { | ||
return 'idb'; | ||
} | ||
|
||
override async pushDocUpdate(update: DocUpdate) { | ||
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite'); | ||
const timestamp = new Date(); | ||
await trx.objectStore('updates').add({ | ||
...update, | ||
createdAt: timestamp, | ||
}); | ||
|
||
await trx.objectStore('clocks').put({ docId: update.docId, timestamp }); | ||
|
||
return { docId: update.docId, timestamp }; | ||
} | ||
|
||
protected override async getDocSnapshot(docId: string) { | ||
const trx = this.db.transaction('snapshots', 'readonly'); | ||
const record = await trx.store.get(docId); | ||
|
||
if (!record) { | ||
return null; | ||
} | ||
|
||
return { | ||
docId, | ||
bin: record.bin, | ||
timestamp: record.updatedAt, | ||
}; | ||
} | ||
|
||
override async deleteDoc(docId: string) { | ||
const trx = this.db.transaction( | ||
['snapshots', 'updates', 'clocks'], | ||
'readwrite' | ||
); | ||
|
||
const idx = trx.objectStore('updates').index('docId'); | ||
const iter = idx.iterate(IDBKeyRange.only(docId)); | ||
|
||
for await (const { value } of iter) { | ||
await trx.objectStore('updates').delete([value.docId, value.createdAt]); | ||
} | ||
|
||
await trx.objectStore('snapshots').delete(docId); | ||
await trx.objectStore('clocks').delete(docId); | ||
} | ||
|
||
override async getDocTimestamps(after: Date = new Date(0)) { | ||
const trx = this.db.transaction('clocks', 'readonly'); | ||
|
||
const clocks = await trx.store.getAll(); | ||
|
||
return clocks.reduce((ret, cur) => { | ||
if (cur.timestamp > after) { | ||
ret[cur.docId] = cur.timestamp; | ||
} | ||
return ret; | ||
}, {} as DocClocks); | ||
} | ||
|
||
protected override async setDocSnapshot( | ||
snapshot: DocRecord | ||
): Promise<boolean> { | ||
const trx = this.db.transaction('snapshots', 'readwrite'); | ||
const record = await trx.store.get(snapshot.docId); | ||
|
||
if (!record || record.updatedAt < snapshot.timestamp) { | ||
await trx.store.put({ | ||
docId: snapshot.docId, | ||
bin: snapshot.bin, | ||
createdAt: record?.createdAt ?? snapshot.timestamp, | ||
updatedAt: snapshot.timestamp, | ||
}); | ||
} | ||
|
||
trx.commit(); | ||
return true; | ||
} | ||
|
||
protected override async getDocUpdates(docId: string): Promise<DocRecord[]> { | ||
const trx = this.db.transaction('updates', 'readonly'); | ||
const updates = await trx.store.index('docId').getAll(docId); | ||
|
||
return updates.map(update => ({ | ||
docId, | ||
bin: update.bin, | ||
timestamp: update.createdAt, | ||
})); | ||
} | ||
|
||
protected override async markUpdatesMerged( | ||
docId: string, | ||
updates: DocRecord[] | ||
): Promise<number> { | ||
const trx = this.db.transaction('updates', 'readwrite'); | ||
|
||
await Promise.all( | ||
updates.map(update => trx.store.delete([docId, update.timestamp])) | ||
); | ||
|
||
trx.commit(); | ||
return updates.length; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export * from './blob'; | ||
export * from './doc'; | ||
export * from './sync'; |
Oops, something went wrong.