Skip to content

Commit

Permalink
test: always connect peers more realistically (#856)
Browse files Browse the repository at this point in the history
Our tests currently connect `MapeoManager`s in two ways:

1. By starting peer discovery servers and connecting. This is similar to
   what the real app does.
2. By manually creating streams and connecting them in tests. This is
   less realistic.

This commit removes the second way because:

- it is less realistic
- it lets us remove some test-only code in the `src/` directory
- it will make an upcoming change easier
- it used to be significantly faster, but that's no longer true

I also tried to fix a possible (test-only) race condition, which *could*
have been a reason for the less realistic option:

1. Start connecting peers by calling `connectPeers()`. This begins the
   process of starting peer discovery servers.
2. Disconnect them by calling the callback returned by `connectPeers()`.
3. The peer discovery servers start, and begin connecting. *This is bad*
   because we already wanted to disconnect!
  • Loading branch information
EvanHahn authored Oct 29, 2024
1 parent e916bc6 commit 0bd8a7a
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 78 deletions.
19 changes: 1 addition & 18 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { LocalDiscovery } from './discovery/local-discovery.js'
import { Roles } from './roles.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'
import { Logger } from './logger.js'
import {
kSyncState,
kRequestFullStop,
kRescindFullStopRequest,
} from './sync/sync-api.js'
/** @import { ProjectSettingsValue as ProjectValue } from '@comapeo/schema' */
/** @import NoiseSecretStream from '@hyperswarm/secret-stream' */
/** @import { SetNonNullable } from 'type-fest' */
/** @import { CoreStorage, Namespace } from './types.js' */
/** @import { DeviceInfoParam } from './schema/client.js' */
Expand Down Expand Up @@ -81,7 +81,6 @@ export const DEFAULT_ONLINE_STYLE_URL =
'https://demotiles.maplibre.org/style.json'

export const kRPC = Symbol('rpc')
export const kManagerReplicate = Symbol('replicate manager')

/**
* @typedef {Omit<import('./local-peers.js').PeerInfo, 'protomux'>} PublicPeerInfo
Expand Down Expand Up @@ -233,22 +232,6 @@ export class MapeoManager extends TypedEmitter {
return this.#deviceId
}

/**
* Create a Mapeo replication stream. This replication connects the Mapeo RPC
* channel and allows invites. All active projects will sync automatically to
* this replication stream. Only use for local (trusted) connections, because
* the RPC channel key is public. To sync a specific project without
* connecting RPC, use project[kProjectReplication].
*
* @param {boolean} isInitiator
*/
[kManagerReplicate](isInitiator) {
const noiseStream = new NoiseSecretStream(isInitiator, undefined, {
keyPair: this.#keyManager.getIdentityKeypair(),
})
return this.#replicate(noiseStream)
}

/**
* @param {'blobs' | 'icons' | 'maps'} mediaType
* @returns {Promise<string>}
Expand Down
3 changes: 1 addition & 2 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,7 @@ export class MapeoProject extends TypedEmitter {
/**
* Replicate a project to a @hyperswarm/secret-stream. Invites will not
* function because the RPC channel is not connected for project replication,
* and only this project will replicate (to replicate multiple projects you
* need to replicate the manager instance via manager[kManagerReplicate])
* and only this project will replicate.
*
* @param {(
* boolean |
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/device-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ test('device info written to projects', async (t) => {

test('device info sent to peers', async (t) => {
const managers = await createManagers(3, t)
const disconnectPeers = connectPeers(managers, { discovery: true })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForPeers(managers, { waitForDeviceInfo: true })

Expand Down
2 changes: 1 addition & 1 deletion test-e2e/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test('Local peers discovery each other and share device info', async (t) => {
const mobileManagers = await createManagers(5, t, 'mobile')
const desktopManagers = await createManagers(5, t, 'desktop')
const managers = [...mobileManagers, ...desktopManagers]
const disconnectPeers = connectPeers(managers, { discovery: true })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForPeers(managers, { waitForDeviceInfo: true })
const deviceInfos = [...mobileManagers, ...desktopManagers].map((m) =>
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/sync-fuzz.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ test('sync fuzz tests', { concurrency: true, timeout: 2 ** 30 }, async (t) => {
const managers = await createManagers(managerCount, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
Expand Down
38 changes: 19 additions & 19 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => {
const COUNT = 10
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
await disconnect()
Expand All @@ -50,7 +50,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => {
return acc
}, new Set())

const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForSync(projects, 'initial')

Expand Down Expand Up @@ -125,7 +125,7 @@ test('syncing blobs', async (t) => {
fastifyController.start(),
])

let disconnectPeers = connectPeers(managers, { discovery: false })
let disconnectPeers = connectPeers(managers)
t.after(() => disconnectPeers())
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [invitee], projectId })
Expand All @@ -147,7 +147,7 @@ test('syncing blobs', async (t) => {
blobMetadata({ mimeType: 'image/jpeg' })
)

disconnectPeers = connectPeers(managers, { discovery: false })
disconnectPeers = connectPeers(managers)
await waitForSync(projects, 'initial')

invitorProject.$sync.start()
Expand All @@ -174,7 +174,7 @@ test('start and stop sync', async function (t) {
const COUNT = 2
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })

Expand Down Expand Up @@ -240,7 +240,7 @@ test('sync only happens if both sides are enabled', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
Expand Down Expand Up @@ -310,7 +310,7 @@ test('auto-stop', async (t) => {
])
t.after(() => fastifyController.stop())

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'mapeo' })
Expand Down Expand Up @@ -472,7 +472,7 @@ test('disabling auto-stop timeout', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'mapeo' })
Expand Down Expand Up @@ -531,7 +531,7 @@ test('gracefully shutting down sync for all projects when backgrounded', async f
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectGroupsAfterFirstStep = await Promise.all(
Expand Down Expand Up @@ -625,7 +625,7 @@ test('shares cores', async function (t) {
const COUNT = 5
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
Expand Down Expand Up @@ -670,7 +670,7 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>
const COUNT = 3
const managers = await createManagers(COUNT, t)
const [invitor, invitee, blocked] = managers
const disconnect1 = connectPeers(managers, { discovery: false })
const disconnect1 = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({
invitor,
Expand Down Expand Up @@ -752,7 +752,7 @@ test('Sync state emitted when starting and stopping sync', async function (t) {
/** @type {State[]} */ let states = statesBeforeStart
project.$sync.on('sync-state', (state) => states.push(state))

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)
await invite({ invitor, invitees, projectId })

Expand Down Expand Up @@ -809,7 +809,7 @@ test('updates sync state when peers are added', async (t) => {
'data sync state is correct at start'
)

const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await invite({ invitor, invitees, projectId })

Expand All @@ -835,7 +835,7 @@ test('Correct sync state prior to data sync', async function (t) {
const [invitor, ...invitees] = managers
const projectId = await invitor.createProject({ name: 'Mapeo' })

const disconnect1 = connectPeers(managers, { discovery: false })
const disconnect1 = connectPeers(managers)

await invite({ invitor, invitees, projectId })

Expand All @@ -849,7 +849,7 @@ test('Correct sync state prior to data sync', async function (t) {
// Disconnect and reconnect, because currently pre-have messages about data
// sync state are only shared on first connection
await disconnect1()
const disconnect2 = connectPeers(managers, { discovery: false })
const disconnect2 = connectPeers(managers)
await waitForPeers(managers)

const expected = managers.map((manager, index) => {
Expand Down Expand Up @@ -1122,7 +1122,7 @@ test(

const managers = await createManagers(3, t)
const [invitor, inviteeA, inviteeB] = managers
const disconnectA = connectPeers([invitor, inviteeA], { discovery: false })
const disconnectA = connectPeers([invitor, inviteeA])
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [inviteeA], projectId })

Expand All @@ -1138,7 +1138,7 @@ test(

await disconnectA()

const disconnectB = connectPeers([invitor, inviteeB], { discovery: false })
const disconnectB = connectPeers([invitor, inviteeB])
await invite({ invitor, invitees: [inviteeB], projectId })
await pTimeout(invitorProject.$sync.waitForSync('initial'), {
milliseconds: 1000,
Expand All @@ -1162,8 +1162,8 @@ test(

const managers = await createManagers(3, t)
const [invitor, inviteeA, inviteeB] = managers
const disconnectA = connectPeers([invitor, inviteeA], { discovery: false })
const disconnectB = connectPeers([invitor, inviteeB], { discovery: false })
const disconnectA = connectPeers([invitor, inviteeA])
const disconnectB = connectPeers([invitor, inviteeB])
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [inviteeA, inviteeB], projectId })

Expand Down
55 changes: 19 additions & 36 deletions test-e2e/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import * as v8 from 'node:v8'
import { pEvent } from 'p-event'

import { MapeoManager, roles } from '../src/index.js'
import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js'
import { kRPC } from '../src/mapeo-manager.js'
import { generate } from '@mapeo/mock-data'
import { valueOf } from '../src/utils.js'
import { randomBytes, randomInt } from 'node:crypto'
Expand All @@ -29,43 +29,26 @@ const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url)
* @param {readonly MapeoManager[]} managers
* @returns {() => Promise<void>}
*/
export function connectPeers(managers, { discovery = true } = {}) {
if (discovery) {
for (const manager of managers) {
manager.startLocalPeerDiscoveryServer().then(({ name, port }) => {
for (const otherManager of managers) {
if (otherManager === manager) continue
otherManager.connectLocalPeer({ address: '127.0.0.1', name, port })
}
})
}
return async () => {
await Promise.all(
managers.map((manager) =>
manager.stopLocalPeerDiscoveryServer({ force: true })
)
)
}
} else {
/** @type {import('../src/types.js').ReplicationStream[]} */
const replicationStreams = []
for (let i = 0; i < managers.length; i++) {
for (let j = i + 1; j < managers.length; j++) {
const r1 = managers[i][kManagerReplicate](true)
const r2 = managers[j][kManagerReplicate](false)
replicationStreams.push(r1, r2)
r1.pipe(r2).pipe(r1)
export function connectPeers(managers) {
let requestedDisconnect = false

for (const manager of managers) {
manager.startLocalPeerDiscoveryServer().then(({ name, port }) => {
if (requestedDisconnect) return
for (const otherManager of managers) {
if (otherManager === manager) continue
otherManager.connectLocalPeer({ address: '127.0.0.1', name, port })
}
}
return async () => {
await Promise.all(
replicationStreams.map((stream) => {
const onClosePromise = pEvent(stream, 'close')
stream.destroy()
return onClosePromise
})
})
}

return async () => {
requestedDisconnect = true
await Promise.all(
managers.map((manager) =>
manager.stopLocalPeerDiscoveryServer({ force: true })
)
}
)
}
}

Expand Down

0 comments on commit 0bd8a7a

Please sign in to comment.