Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): multi region blob support #3653

Merged
merged 11 commits into from
Dec 10, 2024
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,9 @@ jobs:
POSTGRES_USER: speckle
command: -c 'max_connections=1000' -c 'port=5433' -c 'wal_level=logical'
- image: 'minio/minio'
command: server /data --console-address ":9001"
command: server /data --console-address ":9001" --address "0.0.0.0:9000"
- image: 'minio/minio'
command: server /data --console-address ":9002" --address "0.0.0.0:9001"
iainsproat marked this conversation as resolved.
Show resolved Hide resolved
environment:
# Same as test-server:
NODE_ENV: test
Expand Down
16 changes: 16 additions & 0 deletions .circleci/multiregion.test-ci.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@
"main": {
"postgres": {
"connectionUri": "postgresql://speckle:speckle@127.0.0.1:5432/speckle2_test"
},
"blobStorage": {
"accessKey": "minioadmin",
"secretKey": "minioadmin",
"bucket": "speckle-server",
"createBucketIfNotExists": true,
"endpoint": "http://127.0.0.1:9000",
"s3Region": ""
}
},
"regions": {
"region1": {
"postgres": {
"connectionUri": "postgresql://speckle:speckle@127.0.0.1:5433/speckle2_test"
},
"blobStorage": {
"accessKey": "minioadmin",
"secretKey": "minioadmin",
"bucket": "speckle-server",
"createBucketIfNotExists": true,
"endpoint": "http://127.0.0.1:9001",
iainsproat marked this conversation as resolved.
Show resolved Hide resolved
"s3Region": ""
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ EMAIL_PORT="1025"

The web portal is available at `localhost:1080` and it's listening for mail on port `1025`.

### Minio (S3 storage)

- Default credentials: `minioadmin:minioadmin`
- Main storage web UI: [http://localhost:9001/](http://localhost:9001/)
- Region1 storage web UI: [http://localhost:9021/](http://localhost:9021/)

You can use the web UI to verify that blobs were uploaded.

# Contributing

Please make sure you read the [contribution guidelines](https://github.com/specklesystems/speckle-server/blob/main/CONTRIBUTING.md) for an overview of the best practices we try to follow.
Expand Down
1 change: 1 addition & 0 deletions packages/server/db/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ export const migrateDbToLatest = async (params: { db: Knex; region: string }) =>
await db.migrate.latest()
} catch (err: unknown) {
logger.error({ err, region }, 'Error migrating db to latest for region "{region}".')
throw err
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { Knex } from 'knex'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'

const tables = {
streamActivity: <T extends object = StreamActivityRecord>(db: Knex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ import {
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'

const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags()

Expand Down
2 changes: 1 addition & 1 deletion packages/server/modules/automate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import {
storeTokenScopesFactory,
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
ProjectAutomationsUpdatedMessageType,
ProjectTriggeredAutomationsStatusUpdatedMessageType
Expand Down
2 changes: 1 addition & 1 deletion packages/server/modules/automate/rest/logStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ExecutionEngineFailedResponseError } from '@/modules/automate/errors/ex
import { getAutomationRunWithTokenFactory } from '@/modules/automate/repositories/automations'
import { corsMiddleware } from '@/modules/core/configs/cors'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
validateRequiredStreamFactory,
validateResourceAccess,
Expand Down
59 changes: 59 additions & 0 deletions packages/server/modules/blobstorage/clients/objectStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import {
getS3AccessKey,
getS3BucketName,
getS3Endpoint,
getS3Region,
getS3SecretKey
} from '@/modules/shared/helpers/envHelper'
import { S3Client, S3ClientConfig } from '@aws-sdk/client-s3'
import { Optional } from '@speckle/shared'

/**
 * Pairs an S3 client with a bucket name.
 */
export type ObjectStorage = {
// S3 client instance
client: S3Client
// Bucket name carried alongside the client
bucket: string
}

/**
 * Settings needed to build an ObjectStorage. The first three fields reuse the
 * corresponding S3ClientConfig property types.
 */
export type GetObjectStorageParams = {
// Same type as S3ClientConfig's `credentials`
credentials: S3ClientConfig['credentials']
// Same type as S3ClientConfig's `endpoint`
endpoint: S3ClientConfig['endpoint']
// Same type as S3ClientConfig's `region`
region: S3ClientConfig['region']
// Bucket name to pair with the client
bucket: string
}

/**
 * Build an object storage handle from explicit connection settings.
 *
 * A new S3 client is constructed on every call (path-style addressing is
 * always switched on) and returned together with the given bucket name.
 *
 * @param params - credentials, endpoint, region and bucket to use
 * @returns the freshly created client and the bucket name
 */
export const getObjectStorage = (params: GetObjectStorageParams): ObjectStorage => {
  const s3Options: S3ClientConfig = {
    credentials: params.credentials,
    endpoint: params.endpoint,
    region: params.region,
    forcePathStyle: true
  }

  return {
    client: new S3Client(s3Options),
    bucket: params.bucket
  }
}

// Module-level cache; stays undefined until getMainObjectStorage() is first called
let mainObjectStorage: Optional<ObjectStorage> = undefined

/**
 * Return the main object storage.
 *
 * The storage is created on the first call from the values returned by the
 * envHelper S3 getters, cached in this module, and the same instance is
 * handed back on every later call.
 */
export const getMainObjectStorage = (): ObjectStorage => {
  if (!mainObjectStorage) {
    // Read the settings in a fixed order: access key, secret key, endpoint, region, bucket
    const accessKeyId = getS3AccessKey()
    const secretAccessKey = getS3SecretKey()
    const endpoint = getS3Endpoint()
    const region = getS3Region()
    const bucket = getS3BucketName()

    mainObjectStorage = getObjectStorage({
      credentials: { accessKeyId, secretAccessKey },
      endpoint,
      region,
      bucket
    })
  }

  return mainObjectStorage
}
12 changes: 4 additions & 8 deletions packages/server/modules/blobstorage/domain/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
} from '@/modules/blobstorage/domain/types'
import { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
import type { Readable } from 'stream'
import { StoreFileStream } from '@/modules/blobstorage/domain/storageOperations'

export type GetBlobs = (params: {
streamId?: MaybeNullOrUndefined<string>
Expand Down Expand Up @@ -33,21 +34,16 @@ export type GetBlobMetadataCollection = (params: {
}) => Promise<{ blobs: BlobStorageItem[]; cursor: Nullable<string> }>

export type UploadFileStream = (
params1: {
streamData: {
streamId: string
userId: string | undefined
},
params2: {
blobData: {
blobId: string
fileName: string
fileType: string | undefined
fileStream: Readable | Buffer
}
) => Promise<{ blobId: string; fileName: string; fileHash: string }>

type FileStream = string | Blob | Readable | Uint8Array | Buffer

export type StoreFileStream = (args: {
objectKey: string
fileStream: FileStream
}) => Promise<{ fileHash: string }>
export { StoreFileStream }
23 changes: 23 additions & 0 deletions packages/server/modules/blobstorage/domain/storageOperations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type stream from 'stream'
import type { Readable } from 'stream'

/**
 * Async operation that takes an object key and resolves to a readable stream
 * (presumably the stored object's contents - confirm against the implementation).
 */
export type GetObjectStream = (params: {
objectKey: string
}) => Promise<stream.Readable>

/**
 * Async operation that takes an object key and resolves to that object's
 * attributes; currently only `fileSize` is exposed.
 * NOTE(review): the unit of `fileSize` is not visible here (presumably bytes) - confirm.
 */
export type GetObjectAttributes = (params: { objectKey: string }) => Promise<{
fileSize: number
}>

// Payload shapes accepted by StoreFileStream
type FileStream = string | Blob | Readable | Uint8Array | Buffer

/**
 * Async operation that takes an object key plus a file payload and resolves
 * to a `fileHash` string.
 * NOTE(review): how `fileHash` is derived is not visible here - check the implementation.
 */
export type StoreFileStream = (args: {
objectKey: string
fileStream: FileStream
}) => Promise<{ fileHash: string }>

/**
 * Async operation that takes an object key and resolves with no value.
 */
export type DeleteObject = (params: { objectKey: string }) => Promise<void>

/**
 * Async storage access check that resolves with no value.
 * `createBucketIfNotExists` presumably controls whether a missing bucket is
 * created rather than treated as an error - confirm against the implementation.
 */
export type EnsureStorageAccess = (params: {
createBucketIfNotExists: boolean
}) => Promise<void>
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
StreamBlobsArgs
} from '@/modules/core/graph/generated/graphql'
import { StreamGraphQLReturn } from '@/modules/core/helpers/graphTypes'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
BadRequestError,
NotFoundError,
Expand Down
57 changes: 44 additions & 13 deletions packages/server/modules/blobstorage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ import {
streamWritePermissionsPipelineFactory,
streamReadPermissionsPipelineFactory
} from '@/modules/shared/authz'
import {
ensureStorageAccess,
storeFileStream,
getObjectStream,
deleteObject,
getObjectAttributes
} from '@/modules/blobstorage/objectStorage'
import crs from 'crypto-random-string'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { isArray } from 'lodash'
Expand Down Expand Up @@ -42,20 +35,36 @@ import {
fullyDeleteBlobFactory
} from '@/modules/blobstorage/services/management'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { adminOverrideEnabled } from '@/modules/shared/helpers/envHelper'
import {
adminOverrideEnabled,
createS3Bucket
} from '@/modules/shared/helpers/envHelper'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { Request, Response } from 'express'
import { ensureError } from '@speckle/shared'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
deleteObjectFactory,
ensureStorageAccessFactory,
getObjectAttributesFactory,
getObjectStreamFactory,
storeFileStreamFactory
} from '@/modules/blobstorage/repositories/blobs'
import { getMainObjectStorage } from '@/modules/blobstorage/clients/objectStorage'
import { getProjectObjectStorage } from '@/modules/multiregion/utils/blobStorageSelector'

const ensureConditions = async () => {
if (process.env.DISABLE_FILE_UPLOADS) {
moduleLogger.info('📦 Blob storage is DISABLED')
return
} else {
moduleLogger.info('📦 Init BlobStorage module')
await ensureStorageAccess()
const storage = getMainObjectStorage()
const ensureStorageAccess = ensureStorageAccessFactory({ storage })
await ensureStorageAccess({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't seem to be ensuring that we can access all the region storage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That happens elsewhere (initializeRegisteredRegionClients())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is this a separate procedure from the one in the module's index.ts? why do we need to even invoke this if this gets validated on module init?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multiregion/index.ts -> initializeRegisteredRegionClients() in multiregion/utils/blobStorageSelector -> initializeRegion -> ensureStorageAccessFactory. (This tests each additional regions, but ignores the primary region from multi-region config file.)

blobstorage/index.ts -> ensureStorageAccessFactory (This tests the 'main' storage, which is configured via the original environment variables and not from the multi-region config file)

We can ignore my comment at the top of this thread. And flipping from the original environment variables to the primary block of the multi-region config file can be done separately from this PR to align with the postgres database.

createBucketIfNotExists: createS3Bucket()
})
}

if (!process.env.S3_BUCKET) {
Expand Down Expand Up @@ -125,8 +134,12 @@ export const init: SpeckleModule['init'] = async (app) => {
limits: { fileSize: getFileSizeLimit() }
})

const projectDb = await getProjectDbClient({ projectId: streamId })
const [projectDb, projectStorage] = await Promise.all([
getProjectDbClient({ projectId: streamId }),
getProjectObjectStorage({ projectId: streamId })
])

const storeFileStream = storeFileStreamFactory({ storage: projectStorage })
const updateBlob = updateBlobFactory({ db: projectDb })
const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })

Expand All @@ -146,6 +159,11 @@ export const init: SpeckleModule['init'] = async (app) => {
updateBlob
})

const getObjectAttributes = getObjectAttributesFactory({
storage: projectStorage
})
const deleteObject = deleteObjectFactory({ storage: projectStorage })

busboy.on('file', (formKey, file, info) => {
const { filename: fileName } = info
const fileType = fileName?.split('.')?.pop()?.toLowerCase()
Expand Down Expand Up @@ -275,9 +293,15 @@ export const init: SpeckleModule['init'] = async (app) => {
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const streamId = req.params.streamId
const [projectDb, projectStorage] = await Promise.all([
getProjectDbClient({ projectId: streamId }),
getProjectObjectStorage({ projectId: streamId })
])

const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const getFileStream = getFileStreamFactory({ getBlobMetadata })
const getObjectStream = getObjectStreamFactory({ storage: projectStorage })

const { fileName } = await getBlobMetadata({
streamId: req.params.streamId,
Expand All @@ -304,12 +328,19 @@ export const init: SpeckleModule['init'] = async (app) => {
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const streamId = req.params.streamId
const [projectDb, projectStorage] = await Promise.all([
getProjectDbClient({ projectId: streamId }),
getProjectObjectStorage({ projectId: streamId })
])

const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const deleteBlob = fullyDeleteBlobFactory({
getBlobMetadata,
deleteBlob: deleteBlobFactory({ db: projectDb })
})
const deleteObject = deleteObjectFactory({ storage: projectStorage })

await deleteBlob({
streamId: req.params.streamId,
blobId: req.params.blobId,
Expand Down
Loading
Loading