Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: index with discoveryId not discoveryKey #859

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added comapeo-schema-PR267+bc403d5.tgz
Binary file not shown.
Binary file added multi-core-indexer-PR25+2537f7d.tgz
Binary file not shown.
14 changes: 8 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
"yazl": "^2.5.1"
},
"dependencies": {
"@comapeo/schema": "1.0.0",
"@comapeo/schema": "file:comapeo-schema-PR267+bc403d5.tgz",
"@digidem/types": "^2.3.0",
"@electron/asar": "^3.2.8",
"@fastify/error": "^3.4.1",
Expand Down Expand Up @@ -181,7 +181,7 @@
"magic-bytes.js": "^1.10.0",
"map-obj": "^5.0.2",
"mime": "^4.0.3",
"multi-core-indexer": "^1.0.0-alpha.10",
"multi-core-indexer": "file:multi-core-indexer-PR25+2537f7d.tgz",
"p-defer": "^4.0.0",
"p-event": "^6.0.1",
"p-timeout": "^6.1.2",
Expand Down
7 changes: 3 additions & 4 deletions src/core-manager/core-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ export class CoreIndex {
* @param {Object} options
* @param {import('hypercore')<"binary", Buffer>} options.core Hypercore instance
* @param {Buffer} options.key Buffer containing public key of this core
* @param {string} options.discoveryId discoveryId of this core
* @param {Namespace} options.namespace
* @param {boolean} [options.writer] Is this a writer core?
*/
add({ core, key, namespace, writer = false }) {
const discoveryKey = crypto.discoveryKey(key)
const discoveryId = discoveryKey.toString('hex')
const record = { core, key, namespace }
add({ core, key, discoveryId, namespace, writer = false }) {
const record = { core, key, discoveryId, namespace }
if (writer) {
this.#writersByNamespace.set(namespace, record)
}
Expand Down
10 changes: 6 additions & 4 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Corestore from 'corestore'
import { debounce } from 'throttle-debounce'
import assert from 'node:assert/strict'
import { sql, eq } from 'drizzle-orm'
import hCrypto from 'hypercore-crypto'

import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { Logger } from '../logger.js'
Expand All @@ -20,7 +21,7 @@ const WRITER_CORE_PREHAVES_DEBOUNCE_DELAY = 1000
export const kCoreManagerReplicate = Symbol('replicate core manager')

/** @typedef {Hypercore<'binary', Buffer>} Core */
/** @typedef {{ core: Core, key: Buffer, namespace: Namespace }} CoreRecord */
/** @typedef {{ core: Core, key: Buffer, discoveryId: string, namespace: Namespace }} CoreRecord */
/**
* @typedef {Object} Events
* @property {(coreRecord: CoreRecord) => void} add-core
Expand Down Expand Up @@ -273,6 +274,7 @@ export class CoreManager extends TypedEmitter {
if (existingCore) return existingCore

const { publicKey: key, secretKey } = keyPair
const discoveryId = hCrypto.discoveryKey(key).toString('hex')
const writer = !!secretKey
const core = this.#corestore.get({
keyPair,
Expand All @@ -285,7 +287,7 @@ export class CoreManager extends TypedEmitter {
core.setMaxListeners(0)
// @ts-ignore - ensure key is defined before hypercore is ready
core.key = key
this.#coreIndex.add({ core, key, namespace, writer })
this.#coreIndex.add({ core, key, discoveryId, namespace, writer })

// **Hack** As soon as a peer is added, eagerly send a "want" for the entire
// core. This ensures that the peer sends back its entire bitfield.
Expand Down Expand Up @@ -341,9 +343,9 @@ export class CoreManager extends TypedEmitter {
namespace,
key
)
this.emit('add-core', { core, key, namespace })
this.emit('add-core', { core, key, discoveryId, namespace })

return { core, key, namespace }
return { core, key, discoveryId, namespace }
}

/**
Expand Down
7 changes: 4 additions & 3 deletions src/core-ownership.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,13 @@ export class CoreOwnership extends TypedEmitter {
* the doc with the lowest index (e.g. the first)
*
* @param {CoreOwnershipWithSignatures} doc
* @param {import('@comapeo/schema').VersionIdObject} version
* @param {import('@comapeo/schema').VersionDiscoveryIdObject} version
* @returns {import('@comapeo/schema').CoreOwnership}
*/
export function mapAndValidateCoreOwnership(doc, { coreDiscoveryKey }) {
export function mapAndValidateCoreOwnership(doc, { coreDiscoveryId }) {
if (
!coreDiscoveryKey.equals(discoveryKey(Buffer.from(doc.authCoreId, 'hex')))
coreDiscoveryId !==
discoveryKey(Buffer.from(doc.authCoreId, 'hex')).toString('hex')
) {
throw new Error('Invalid coreOwnership record: mismatched authCoreId')
}
Expand Down
33 changes: 14 additions & 19 deletions src/datastore/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { TypedEmitter } from 'tiny-typed-emitter'
import { encode, decode, getVersionId, parseVersionId } from '@comapeo/schema'
import MultiCoreIndexer from 'multi-core-indexer'
import pDefer from 'p-defer'
import { discoveryKey } from 'hypercore-crypto'
import { NAMESPACE_SCHEMAS } from '../constants.js'
import { createMap } from '../utils.js'
/** @import { MapeoDoc } from '@comapeo/schema' */
Expand Down Expand Up @@ -36,7 +35,7 @@ export class DataStore extends TypedEmitter {
#coreManager
#namespace
#batch
#writerCore
#writerCoreRecord
#coreIndexer
/** @type {Map<string, import('p-defer').DeferredPromise<void>>} */
#pendingIndex = new Map()
Expand All @@ -61,7 +60,7 @@ export class DataStore extends TypedEmitter {
NAMESPACE_SCHEMAS[namespace],
() => new Set()
)
this.#writerCore = coreManager.getWriterCore(namespace).core
this.#writerCoreRecord = coreManager.getWriterCore(namespace)
const cores = coreManager.getCores(namespace).map((cr) => cr.core)
this.#coreIndexer = new MultiCoreIndexer(cores, {
storage,
Expand All @@ -88,7 +87,7 @@ export class DataStore extends TypedEmitter {
}

get writerCore() {
return this.#writerCore
return this.#writerCoreRecord.core
}

getIndexState() {
Expand All @@ -105,9 +104,11 @@ export class DataStore extends TypedEmitter {
// Writes to the writerCore need to wait until the entry is indexed before
// returning, so we check if any incoming entry has a pending promise
for (const entry of entries) {
if (!entry.key.equals(this.#writerCore.key)) continue
if (entry.discoveryId !== this.#writerCoreRecord.discoveryId) {
continue
}
const versionId = getVersionId({
coreDiscoveryKey: discoveryKey(entry.key),
coreDiscoveryId: entry.discoveryId,
index: entry.index,
})
const pending = this.#pendingIndex.get(versionId)
Expand Down Expand Up @@ -153,23 +154,20 @@ export class DataStore extends TypedEmitter {
// same tick, so we can't know their index before append resolves.
const deferredAppend = pDefer()
this.#pendingAppends.add(deferredAppend.promise)
const { length } = await this.#writerCore.append(block)
const { length } = await this.writerCore.append(block)
deferredAppend.resolve()
this.#pendingAppends.delete(deferredAppend.promise)

const index = length - 1
const coreDiscoveryKey = this.#writerCore.discoveryKey
if (!coreDiscoveryKey) {
throw new Error('Writer core is not ready')
}
const versionId = getVersionId({ coreDiscoveryKey, index })
const coreDiscoveryId = this.#writerCoreRecord.discoveryId
const versionId = getVersionId({ coreDiscoveryId, index })
/** @type {import('p-defer').DeferredPromise<void>} */
const deferred = pDefer()
this.#pendingIndex.set(versionId, deferred)
await deferred.promise

return /** @type {Extract<MapeoDoc, TDoc>} */ (
decode(block, { coreDiscoveryKey, index })
decode(block, { coreDiscoveryId, index })
)
}

Expand All @@ -189,13 +187,10 @@ export class DataStore extends TypedEmitter {

/** @param {Buffer} buf */
async writeRaw(buf) {
const { length } = await this.#writerCore.append(buf)
const { length } = await this.writerCore.append(buf)
const index = length - 1
const coreDiscoveryKey = this.#writerCore.discoveryKey
if (!coreDiscoveryKey) {
throw new Error('Writer core is not ready')
}
const versionId = getVersionId({ coreDiscoveryKey, index })
const coreDiscoveryId = this.#writerCoreRecord.discoveryId
const versionId = getVersionId({ coreDiscoveryId, index })
return versionId
}

Expand Down
11 changes: 5 additions & 6 deletions src/index-writer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ import { decode } from '@comapeo/schema'
import SqliteIndexer from '@mapeo/sqlite-indexer'
import { getTableConfig } from 'drizzle-orm/sqlite-core'
import { getBacklinkTableName } from '../schema/utils.js'
import { discoveryKey } from 'hypercore-crypto'
import { Logger } from '../logger.js'
/** @import { MapeoDoc, VersionIdObject } from '@comapeo/schema' */
/** @import { MapeoDoc, VersionDiscoveryIdObject } from '@comapeo/schema' */
/** @import { MapeoDocTables } from '../datatype/index.js' */

/**
Expand Down Expand Up @@ -32,7 +31,7 @@ export class IndexWriter {
* @param {object} opts
* @param {import('better-sqlite3').Database} opts.sqlite
* @param {TTables[]} opts.tables
* @param {(doc: MapeoDocInternal, version: VersionIdObject) => MapeoDoc} [opts.mapDoc] optionally transform a document prior to indexing. Can also validate, if an error is thrown then the document will not be indexed
* @param {(doc: MapeoDocInternal, version: VersionDiscoveryIdObject) => MapeoDoc} [opts.mapDoc] optionally transform a document prior to indexing. Can also validate, if an error is thrown then the document will not be indexed
* @param {typeof import('@mapeo/sqlite-indexer').defaultGetWinner} [opts.getWinner] custom function to determine the "winner" of two forked documents. Defaults to choosing the document with the most recent `updatedAt`
* @param {Logger} [opts.logger]
*/
Expand Down Expand Up @@ -71,13 +70,13 @@ export class IndexWriter {
const queued = {}
/** @type {IndexedDocIds} */
const indexed = {}
for (const { block, key, index } of entries) {
for (const { block, discoveryId, index } of entries) {
/** @type {MapeoDoc} */ let doc
try {
const version = { coreDiscoveryKey: discoveryKey(key), index }
const version = { coreDiscoveryId: discoveryId, index }
doc = this.#mapDoc(decode(block, version), version)
} catch (e) {
this.#l.log('Could not decode entry %d of %h', index, key)
this.#l.log('Could not decode entry %d of %S', index, discoveryId)
// Unknown or invalid entry - silently ignore
continue
}
Expand Down
11 changes: 7 additions & 4 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ export class MapeoProject extends TypedEmitter {
projectSettingsEntries.push(entry)
} else if (schemaName === 'translation') {
const doc = decode(entry.block, {
coreDiscoveryKey: entry.key,
coreDiscoveryId: entry.discoveryId,
index: entry.index,
})

Expand Down Expand Up @@ -966,11 +966,14 @@ function getCoreKeypairs({ projectKey, projectSecretKey, keyManager }) {
* e.g. version.coreKey should equal docId
*
* @param {import('@comapeo/schema').DeviceInfo} doc
* @param {import('@comapeo/schema').VersionIdObject} version
* @param {import('@comapeo/schema').VersionDiscoveryIdObject} version
* @returns {import('@comapeo/schema').DeviceInfo}
*/
function mapAndValidateDeviceInfo(doc, { coreDiscoveryKey }) {
if (!coreDiscoveryKey.equals(discoveryKey(Buffer.from(doc.docId, 'hex')))) {
function mapAndValidateDeviceInfo(doc, { coreDiscoveryId }) {
if (
coreDiscoveryId !==
discoveryKey(Buffer.from(doc.docId, 'hex')).toString('hex')
) {
throw new Error(
'Invalid deviceInfo record, cannot write deviceInfo for another device'
)
Expand Down
36 changes: 20 additions & 16 deletions test-e2e/config-import.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,26 @@ import assert from 'node:assert/strict'
import { createManager } from './utils.js'
import { defaultConfigPath } from '../tests/helpers/default-config.js'

test(' config import - load default config when passed a path to `createProject`', async (t) => {
const manager = createManager('device0', t)
const project = await manager.getProject(
await manager.createProject({ configPath: defaultConfigPath })
)
const presets = await project.preset.getMany()
const fields = await project.field.getMany()
const translations = await project.$translation.dataType.getMany()
assert.equal(presets.length, 28, 'correct number of loaded presets')
assert.equal(fields.length, 11, 'correct number of loaded fields')
assert.equal(
translations.length,
870,
'correct number of loaded translations'
)
})
test(
' config import - load default config when passed a path to `createProject`',
{ only: true },
async (t) => {
const manager = createManager('device0', t)
const project = await manager.getProject(
await manager.createProject({ configPath: defaultConfigPath })
)
const presets = await project.preset.getMany()
const fields = await project.field.getMany()
const translations = await project.$translation.dataType.getMany()
assert.equal(presets.length, 28, 'correct number of loaded presets')
assert.equal(fields.length, 11, 'correct number of loaded fields')
assert.equal(
translations.length,
870,
'correct number of loaded translations'
)
}
)

test('config import - load and re-load config manually', async (t) => {
const manager = createManager('device0', t)
Expand Down
Loading
Loading