Skip to content

Commit

Permalink
Merge pull request #190 from Automattic/add/image-runner
Browse files Browse the repository at this point in the history
Support for running arbitrary images
  • Loading branch information
scinos authored Nov 12, 2020
2 parents 1a4337b + 435e0dd commit aa068ca
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 136 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"@types/tar-fs": "^1.16.1",
"@types/useragent": "^2.1.1",
"bunyan": "^1.8.12",
"docker-parse-image": "^3.0.1",
"dockerode": "^3.0.0",
"express": "^4.16.3",
"express-session": "^1.15.6",
Expand Down
271 changes: 217 additions & 54 deletions src/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import httpProxy from 'http-proxy';
import Docker from 'dockerode';
import Docker, { ImageInfo } from 'dockerode';
import _ from 'lodash';
import portfinder from 'portfinder';
import git from 'nodegit';
Expand All @@ -13,14 +13,15 @@ import { l } from './logger';
import { pendingHashes } from './builder';
import { exec } from 'child_process';

import { CONTAINER_EXPIRY_TIME, START_TIME, TEN_MINUTES } from './constants';
import { CONTAINER_EXPIRY_TIME } from './constants';
import { timing } from './stats';

type APIState = {
accesses: Map< CommitHash, number >;
accesses: Map< ContainerName, number >;
branchHashes: Map< CommitHash, BranchName >;
containers: Map< string, Docker.ContainerInfo >;
localImages: Map< string, Docker.ImageInfo >;
localImages: Map< ImageName, Docker.ImageInfo >;
pullingImages: Map< ImageName, Promise< DockerodeStream > >;
remoteBranches: Map< BranchName, CommitHash >;
startingContainers: Map< CommitHash, Promise< ContainerInfo > >;
};
Expand All @@ -30,6 +31,7 @@ export const state: APIState = {
branchHashes: new Map(),
containers: new Map(),
localImages: new Map(),
pullingImages: new Map(),
remoteBranches: new Map(),
startingContainers: new Map(),
};
Expand All @@ -43,6 +45,17 @@ export type BranchName = string;
export type PortNumber = number;
export type ImageStatus = 'NoImage' | 'Inactive' | PortNumber;
export type RunEnv = string;
export type DockerRepository = string;
export type ImageName = string;
export type ContainerName = string;
export type DockerodeStream = any;
export type ContainerSearchOptions = {
image?: ImageName;
env?: RunEnv;
status?: string;
id?: string;
name?: string;
};

export const getImageName = ( hash: CommitHash ) => `${ config.build.tagPrefix }:${ hash }`;
export const extractCommitFromImage = ( imageName: string ): CommitHash => {
Expand All @@ -58,30 +71,48 @@ export const extractEnvironmentFromImage = ( image: ContainerInfo ): RunEnv => {
};

/**
* Polls the local Docker daemon to
* fetch an updated list of images
* Polls the local Docker daemon to fetch an updated list of images
*
* It saves them in the Map `stcate.localImages`, indexed by tag name. If an image has more than
* one tag it will appear multiple times in the map.
*/
export async function refreshLocalImages() {
const images = await docker.listImages();
const isTag = ( tag: string ) => tag.startsWith( config.build.tagPrefix );
const hasTag = ( image: Docker.ImageInfo ) => image.RepoTags && image.RepoTags.some( isTag );

state.localImages = new Map(
images
.filter( hasTag )
.map( image => [ image.RepoTags.find( isTag ), image ] as [ string, Docker.ImageInfo ] )
images.reduce(
( acc, image ) => [
...acc,
...( image.RepoTags || [] ).map( tag => [ tag, image ] as [ ImageName, ImageInfo ] ),
],
[]
)
);
}

/**
* Returns the list of local images
* Returns the list of images built by dserve
*/
export function getLocalImages() {
return new Map(
Array.from( state.localImages.entries() ).filter( ( [ imageName ] ) =>
imageName.startsWith( config.build.tagPrefix )
)
);
}

/**
* Returns the list of all images
*/
export function getAllImages() {
return state.localImages;
}

export function getAllContainers() {
return state.containers;
}

export async function hasHashLocally( hash: CommitHash ): Promise< boolean > {
return state.localImages.has( getImageName( hash ) );
return getLocalImages().has( getImageName( hash ) );
}

export async function deleteImage( hash: CommitHash ) {
Expand Down Expand Up @@ -196,7 +227,7 @@ export async function startContainer( commitHash: CommitHash, env: RunEnv ) {
{ image, freePort, commitHash },
`Successfully started container for ${ image } on ${ freePort }`
);
return refreshRunningContainers().then( () => getRunningContainerForHash( commitHash ) );
return refreshContainers().then( () => getRunningContainerForHash( commitHash ) );
},
( { error, freePort } ) => {
l.error(
Expand Down Expand Up @@ -224,8 +255,8 @@ export async function startContainer( commitHash: CommitHash, env: RunEnv ) {
return startPromise;
}

export async function refreshRunningContainers() {
const containers = await docker.listContainers();
export async function refreshContainers() {
const containers = await docker.listContainers( { all: true } );
state.containers = new Map(
containers.map( container => [ container.Id, container ] as [ string, ContainerInfo ] )
);
Expand All @@ -241,13 +272,6 @@ export function getRunningContainerForHash( hash: CommitHash, env?: RunEnv ): Co
);
}

export function getRunningContainersForHash( hash: CommitHash ): ContainerInfo[] {
const image = getImageName( hash );
return Array.from( state.containers.values() ).filter(
ci => ci.Image === image && ci.State === 'running'
);
}

export function isContainerRunning( hash: CommitHash, env?: RunEnv ): boolean {
return !! getRunningContainerForHash( hash, env );
}
Expand Down Expand Up @@ -376,39 +400,29 @@ export function getCommitHashForBranch( branch: BranchName ): CommitHash | undef
return state.remoteBranches.get( branch );
}

export function touchCommit( hash: CommitHash ) {
state.accesses.set( hash, Date.now() );
export function touchCommit( hash: CommitHash, env?: RunEnv ) {
const container = getRunningContainerForHash( hash, env );
if ( ! container ) throw `Running container for commit ${ hash } not found}`;

const name = getContainerName( container );
touchContainer( name );
}

export function getCommitAccessTime( hash: CommitHash ): number | undefined {
if ( ! hash ) {
return undefined;
}
return state.accesses.get( hash );
export function touchContainer( name: ContainerName ) {
state.accesses.set( name, Date.now() );
}

export function getContainerAccessTime( name: ContainerName ): number | undefined {
return state.accesses.get( name );
}

/*
* Get all currently running containers that were created by dserve and have expired.
* Get all currently running containers that have expired.
* Expired means have not been accessed in EXPIRED_DURATION
*/
export function getExpiredContainers(
containers: Array< ContainerInfo >,
getAccessTime: Function
) {
// if the server is newly spun up, wait a bit before killing off running containers
if ( Date.now() - START_TIME < TEN_MINUTES ) {
return [];
}

// otherwise, filter off containers that are still valid
return containers.filter( ( container: ContainerInfo ) => {
const imageName: string = container.Image;

// exclude container if it wasnt created by this app
// if ( ! imageName.startsWith( config.build.tagPrefix ) ) {
// return false;
// }

export function getExpiredContainers() {
// Filter off containers that are still valid
return Array.from( state.containers.values() ).filter( ( container: ContainerInfo ) => {
if ( container.State === 'dead' || container.State === 'created' ) {
// ignore dead and just created containers
return false;
Expand All @@ -420,7 +434,7 @@ export function getExpiredContainers(
}

const createdAgo = Date.now() - container.Created * 1000;
const lastAccessed = getAccessTime( extractCommitFromImage( imageName ) );
const lastAccessed = getContainerAccessTime( getContainerName( container ) );

return (
createdAgo > CONTAINER_EXPIRY_TIME &&
Expand All @@ -431,8 +445,8 @@ export function getExpiredContainers(

// stop any container that hasn't been accessed within ten minutes
export async function cleanupExpiredContainers() {
const containers = Array.from( await docker.listContainers( { all: true } ) );
const expiredContainers = getExpiredContainers( containers, getCommitAccessTime );
await refreshContainers();
const expiredContainers = getExpiredContainers();
for ( let container of expiredContainers ) {
const imageName: string = container.Image;

Expand All @@ -459,7 +473,7 @@ export async function cleanupExpiredContainers() {
l.error( { err, imageName, containerId: container.Id }, 'Failed to remove container' );
}
}
refreshRunningContainers();
refreshContainers();
}

const proxy = httpProxy.createProxyServer( {} ); // See (†)
Expand All @@ -474,10 +488,159 @@ export async function proxyRequestToHash( req: any, res: any ) {
return;
}

touchCommit( commitHash, runEnv );
proxy.web( req, res, { target: `http://localhost:${ port }` }, err => {
if ( err && ( err as any ).code === 'ECONNRESET' ) {
return;
}
l.log( { err, req, res, commitHash }, 'unexpected error occured while proxying' );
} );
}

export function getContainerName( container: ContainerInfo ) {
// The first character is a `/`, skip it
return container.Names[ 0 ].substring( 1 );
}

export function findContainer( { id, image, env, status, name }: ContainerSearchOptions ) {
return Array.from( state.containers.values() ).find( container => {
if ( image && ( container.Image !== image && container.ImageID !== image ) ) return false;
if ( env && container.Labels[ 'calypsoEnvironment' ] !== env ) return false;
if ( status && container.Status !== status ) return false;
if ( id && container.Id !== id ) return false;
// In the Docker internal list, names start with `/`
if ( name && ! container.Names.includes( '/' + name ) ) return false;
return true;
} );
}

export async function proxyRequestToContainer( req: any, res: any, container: ContainerInfo ) {
// In the Docker internal list, names start with `/`
const containerName = getContainerName( container );

if ( ! container.Ports[ 0 ] ) {
l.log( { containerName }, `Could not find port for container` );
throw new Error( `Could not find port for container ${ containerName }` );
}
const port = container.Ports[ 0 ].PublicPort;

let retryCounter = config.proxyRetry;
const proxyToContainer = () =>
proxy.web( req, res, { target: `http://localhost:${ port }` }, errorHandler );
const errorHandler = ( err: any ) => {
if ( err && ( err as any ).code === 'ECONNRESET' ) {
retryCounter--;
if ( retryCounter > 0 ) setTimeout( proxyToContainer, 1000 );
}
l.log( { err, req, res, containerName }, 'unexpected error occured while proxying' );
throw new Error( 'unexpected error occured while proxying' );
};
touchContainer( containerName );
proxyToContainer();
}

/**
* Pulls an image. Calls onProgress() when there is an update, resolves the returned promise
* when the image is pulled
*/
export async function pullImage( imageName: ImageName, onProgress: ( data: any ) => void ) {
// Store the stream in memory, so other requets can "join" and listen for the progress
if ( ! state.pullingImages.has( imageName ) ) {
const stream = docker.pull( imageName, {} ) as Promise< DockerodeStream >;
state.pullingImages.set( imageName, stream );
}

const stream = state.pullingImages.get( imageName );
return new Promise( async ( resolve, reject ) => {
const resolvedStream = await stream;

docker.modem.followProgress(
resolvedStream,
( err: any ) => {
state.pullingImages.delete( imageName );
if ( err ) reject( err );
else resolve();
},
onProgress
);
} );
}

/**
* Asks a container nicely to stop, waits for 10 seconds and then obliterates it
*/
export async function deleteContainer( containerInfo: ContainerInfo ) {
const container = docker.getContainer( containerInfo.Id );
if ( containerInfo.State === 'running' ) {
await container.stop( { t: 10 } );
}
await container.remove( { force: true } );
await refreshContainers();
}

/**
* Creates a container
*
* createContainer is async, but we don't keep a list of container being creates to ensure atomicity for a few reasons:
*
* - Creating container is quite fast (a few ms), so the chances of collisions are quite low
* - Even if we get two requests with the same image+env at the same time, creating two separate containers for the same
* image is ok. Each one will get a different URL, and if one of them is not used it will get eventually cleaned up.
*/
export async function createContainer( imageName: ImageName, env: RunEnv ) {
const exposedPort = `${ config.build.exposedPort }/tcp`;

let freePort: number;
try {
freePort = await portfinder.getPortPromise();
} catch ( err ) {
l.error( { err, imageName }, `Error while attempting to find a free port for ${ imageName }` );
throw err;
}

try {
const container = await docker.createContainer( {
...config.build.containerCreateOptions,
...envContainerConfig( env ),
Image: imageName,
ExposedPorts: { [ exposedPort ]: {} },
HostConfig: {
PortBindings: { [ exposedPort ]: [ { HostPort: freePort.toString() } ] },
},
Labels: {
calypsoEnvironment: env,
},
} );
l.log( { imageName }, `Successfully created container for ${ imageName }` );
await refreshContainers();

// Returns a ContainerInfo for the created container, in order to avoid exposing a real Container object.
return findContainer( {
id: container.id,
} );
} catch ( error ) {
l.error( { imageName, error }, `Failed creating container for ${ imageName }` );
throw error;
}
}

/**
* Starts a container that was dormant (either never started, or stopped)
*/
export async function reviveContainer( containerInfo: ContainerInfo ) {
const containerName = getContainerName( containerInfo );
const container = docker.getContainer( containerInfo.Id );

try {
await container.start();
await refreshContainers();

// This returns the same containerInfo object, but updated
return findContainer( {
id: container.id,
} );
} catch ( error ) {
l.error( { containerName, error }, `Failed starting container ${ containerName }` );
throw error;
}
}
Loading

0 comments on commit aa068ca

Please sign in to comment.