Skip to content

Commit

Permalink
feat: adding self-hosted servers (#952)
Browse files Browse the repository at this point in the history
This adds support for adding self-hosted servers. It adds the following
functions:

- `project.$member.addServerPeer()`
- `project.$sync.connectServers()`
- `project.$sync.disconnectServers()`

This change doesn't include end-to-end tests for the server. That's
deliberate! We can't (easily) add those tests without the server being
released, but we can't (easily) release the server without this change.
Once this change is released, we can release the server, and then we
should be able to add end-to-end tests.

See [#886](#886) for more.

Co-Authored-By: Gregor MacLennan <[email protected]>
  • Loading branch information
EvanHahn and gmaclennan authored Oct 31, 2024
1 parent 4a7bf54 commit 4f33bd0
Show file tree
Hide file tree
Showing 16 changed files with 1,081 additions and 367 deletions.
521 changes: 167 additions & 354 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
"@types/sub-encoder": "^2.1.0",
"@types/throttle-debounce": "^5.0.0",
"@types/varint": "^6.0.1",
"@types/ws": "^8.5.12",
"@types/yauzl-promise": "^4.0.0",
"@types/yazl": "^2.4.5",
"bitfield": "^4.2.0",
Expand Down Expand Up @@ -200,6 +201,7 @@
"type-fest": "^4.5.0",
"undici": "^6.13.0",
"varint": "^6.0.0",
"ws": "^8.18.0",
"yauzl-promise": "^4.0.0"
}
}
10 changes: 10 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@ import {
COORDINATOR_ROLE_ID,
MEMBER_ROLE_ID,
} from './roles.js'
import { kProjectReplicate } from './mapeo-project.js'
export { plugin as CoMapeoMapsFastifyPlugin } from './fastify-plugins/maps.js'
export { FastifyController } from './fastify-controller.js'
export { MapeoManager } from './mapeo-manager.js'
/** @import { MapeoProject } from './mapeo-project.js' */

/**
* @param {MapeoProject} project
* @param {Parameters<MapeoProject.prototype[kProjectReplicate]>} args
* @returns {ReturnType<MapeoProject.prototype[kProjectReplicate]>}
*/
export const replicateProject = (project, ...args) =>
project[kProjectReplicate](...args)

export const roles = /** @type {const} */ ({
CREATOR_ROLE_ID,
Expand Down
47 changes: 47 additions & 0 deletions src/lib/error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Create an `Error` with a `code` property.
*
* @example
* const err = new ErrorWithCode('INVALID_DATA', 'data was invalid')
* err.message
* // => 'data was invalid'
* err.code
* // => 'INVALID_DATA'
*/
export class ErrorWithCode extends Error {
/**
* @param {string} code
* @param {string} message
* @param {object} [options]
* @param {unknown} [options.cause]
*/
constructor(code, message, options) {
super(message, options)
/** @readonly */ this.code = code
}
}

/**
* Get the error message from an object if possible.
* Otherwise, stringify the argument.
*
* @param {unknown} maybeError
* @returns {string}
* @example
* try {
* // do something
* } catch (err) {
* console.error(getErrorMessage(err))
* }
*/
export function getErrorMessage(maybeError) {
if (maybeError && typeof maybeError === 'object' && 'message' in maybeError) {
try {
const { message } = maybeError
if (typeof message === 'string') return message
} catch (_err) {
// Ignored
}
}
return 'unknown error'
}
10 changes: 10 additions & 0 deletions src/lib/get-own.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* @template {object} T
* @template {keyof T} K
* @param {T} obj
* @param {K} key
* @returns {undefined | T[K]}
*/
export function getOwn(obj, key) {
return Object.hasOwn(obj, key) ? obj[key] : undefined
}
26 changes: 26 additions & 0 deletions src/lib/is-hostname-ip-address.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { isIPv4, isIPv6 } from 'node:net'

/**
* Is this hostname an IP address?
*
* @param {string} hostname
* @returns {boolean}
* @example
* isHostnameIpAddress('100.64.0.42')
* // => false
*
* isHostnameIpAddress('[2001:0db8:85a3:0000:0000:8a2e:0370:7334]')
* // => true
*
* isHostnameIpAddress('example.com')
* // => false
*/
export function isHostnameIpAddress(hostname) {
if (isIPv4(hostname)) return true

if (hostname.startsWith('[') && hostname.endsWith(']')) {
return isIPv6(hostname.slice(1, -1))
}

return false
}
47 changes: 47 additions & 0 deletions src/lib/ws-core-replicator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { pipeline } from 'node:stream/promises'
import { Transform } from 'node:stream'
import { createWebSocketStream } from 'ws'
/** @import { WebSocket } from 'ws' */
/** @import { ReplicationStream } from '../types.js' */

/**
* @param {WebSocket} ws
* @param {ReplicationStream} replicationStream
* @returns {Promise<void>}
*/
export function wsCoreReplicator(ws, replicationStream) {
// This is purely to satisfy typescript at its worst. `pipeline` expects a
// NodeJS ReadWriteStream, but our replicationStream is a streamx Duplex
// stream. The difference is that streamx does not implement the
// `setEncoding`, `unpipe`, `wrap` or `isPaused` methods. The `pipeline`
// function does not depend on any of these methods (I have read through the
// NodeJS source code at cebf21d (v22.9.0) to confirm this), so we can safely
// cast the stream to a NodeJS ReadWriteStream.
const _replicationStream = /** @type {NodeJS.ReadWriteStream} */ (
/** @type {unknown} */ (replicationStream)
)
return pipeline(
_replicationStream,
wsSafetyTransform(ws),
createWebSocketStream(ws),
_replicationStream
)
}

/**
* Avoid writing data to a closing or closed websocket, which would result in an
* error. Instead we drop the data and wait for the stream close/end events to
* propagate and close the streams cleanly.
*
* @param {WebSocket} ws
*/
function wsSafetyTransform(ws) {
return new Transform({
transform(chunk, encoding, callback) {
if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
return callback()
}
callback(null, chunk)
},
})
}
2 changes: 1 addition & 1 deletion src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ export class MapeoManager extends TypedEmitter {
* downloaded their proof of project membership and the project config.
*
* @param {Pick<import('./generated/rpc.js').ProjectJoinDetails, 'projectKey' | 'encryptionKeys'> & { projectName: string }} projectJoinDetails
* @param {{ waitForSync?: boolean }} [opts] For internal use in tests, set opts.waitForSync = false to not wait for sync during addProject()
* @param {{ waitForSync?: boolean }} [opts] Set opts.waitForSync = false to not wait for sync during addProject()
* @returns {Promise<string>}
*/
addProject = async (
Expand Down
63 changes: 53 additions & 10 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ import {
import { migrate } from './lib/drizzle-helpers.js'
import { omit } from './lib/omit.js'
import { MemberApi } from './member-api.js'
import { SyncApi, kHandleDiscoveryKey } from './sync/sync-api.js'
import {
SyncApi,
kHandleDiscoveryKey,
kWaitForInitialSyncWithPeer,
} from './sync/sync-api.js'
import { Logger } from './logger.js'
import { IconApi } from './icon-api.js'
import { readConfig } from './config-import.js'
Expand Down Expand Up @@ -77,8 +81,9 @@ const EMPTY_PROJECT_SETTINGS = Object.freeze({})
* @extends {TypedEmitter<{ close: () => void }>}
*/
export class MapeoProject extends TypedEmitter {
#projectId
#projectKey
#deviceId
#identityKeypair
#coreManager
#indexWriter
#dataStores
Expand Down Expand Up @@ -135,10 +140,12 @@ export class MapeoProject extends TypedEmitter {

this.#l = Logger.create('project', logger)
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)
this.#projectKey = projectKey
this.#loadingConfig = false
this.#isArchiveDevice = isArchiveDevice

const getReplicationStream = this[kProjectReplicate].bind(this, true)

///////// 1. Setup database

this.#sqlite = new Database(dbPath)
Expand Down Expand Up @@ -317,7 +324,7 @@ export class MapeoProject extends TypedEmitter {
},
}),
}
const identityKeypair = keyManager.getIdentityKeypair()
this.#identityKeypair = keyManager.getIdentityKeypair()
const coreKeypairs = getCoreKeypairs({
projectKey,
projectSecretKey,
Expand All @@ -326,31 +333,33 @@ export class MapeoProject extends TypedEmitter {
this.#coreOwnership = new CoreOwnership({
dataType: this.#dataTypes.coreOwnership,
coreKeypairs,
identityKeypair,
identityKeypair: this.#identityKeypair,
})
this.#roles = new Roles({
dataType: this.#dataTypes.role,
coreOwnership: this.#coreOwnership,
coreManager: this.#coreManager,
projectKey: projectKey,
deviceKey: keyManager.getIdentityKeypair().publicKey,
deviceKey: this.#identityKeypair.publicKey,
})

this.#memberApi = new MemberApi({
deviceId: this.#deviceId,
roles: this.#roles,
coreOwnership: this.#coreOwnership,
encryptionKeys,
getProjectName: this.#getProjectName.bind(this),
projectKey,
rpc: localPeers,
getReplicationStream,
waitForInitialSyncWithPeer: (deviceId, abortSignal) =>
this.$sync[kWaitForInitialSyncWithPeer](deviceId, abortSignal),
dataTypes: {
deviceInfo: this.#dataTypes.deviceInfo,
project: this.#dataTypes.projectSettings,
},
})

const projectPublicId = projectKeyToPublicId(projectKey)

this.#blobStore = new BlobStore({
coreManager: this.#coreManager,
})
Expand All @@ -362,7 +371,7 @@ export class MapeoProject extends TypedEmitter {
if (!base.endsWith('/')) {
base += '/'
}
return base + projectPublicId
return base + this.#projectPublicId
},
})

Expand All @@ -374,7 +383,7 @@ export class MapeoProject extends TypedEmitter {
if (!base.endsWith('/')) {
base += '/'
}
return base + projectPublicId
return base + this.#projectPublicId
},
})

Expand All @@ -384,6 +393,24 @@ export class MapeoProject extends TypedEmitter {
roles: this.#roles,
blobDownloadFilter: null,
logger: this.#l,
getServerWebsocketUrls: async () => {
const members = await this.#memberApi.getMany()
/** @type {string[]} */
const serverWebsocketUrls = []
for (const member of members) {
if (
member.deviceType === 'selfHostedServer' &&
member.selfHostedServerDetails
) {
const { baseUrl } = member.selfHostedServerDetails
const wsUrl = new URL(`/sync/${this.#projectPublicId}`, baseUrl)
wsUrl.protocol = wsUrl.protocol === 'http:' ? 'ws:' : 'wss:'
serverWebsocketUrls.push(wsUrl.href)
}
}
return serverWebsocketUrls
},
getReplicationStream,
})

this.#translationApi = new TranslationApi({
Expand Down Expand Up @@ -458,6 +485,14 @@ export class MapeoProject extends TypedEmitter {
return this.#deviceId
}

get #projectId() {
return projectKeyToId(this.#projectKey)
}

get #projectPublicId() {
return projectKeyToPublicId(this.#projectKey)
}

/**
* Resolves when hypercores have all loaded
*
Expand Down Expand Up @@ -603,6 +638,13 @@ export class MapeoProject extends TypedEmitter {
}
}

/**
* @returns {Promise<undefined | string>}
*/
async #getProjectName() {
return (await this.$getProjectSettings()).name
}

async $getOwnRole() {
return this.#roles.getRole(this.#deviceId)
}
Expand Down Expand Up @@ -640,6 +682,7 @@ export class MapeoProject extends TypedEmitter {
* Hypercore types need updating.
* @type {any}
*/ ({
keyPair: this.#identityKeypair,
/** @param {Buffer} discoveryKey */
ondiscoverykey: async (discoveryKey) => {
const protomux =
Expand Down
Loading

0 comments on commit 4f33bd0

Please sign in to comment.