Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move tables to storage #13

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,166 changes: 1 addition & 1,165 deletions cdk.out/development-EncodeService.template.json

Large diffs are not rendered by default.

431 changes: 301 additions & 130 deletions cdk.out/development-TranscribeService.template.json

Large diffs are not rendered by default.

1,314 changes: 228 additions & 1,086 deletions cdk.out/manifest.json

Large diffs are not rendered by default.

2,002 changes: 372 additions & 1,630 deletions cdk.out/tree.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions infrastructure/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"dynamo": {
"stack": "Dynamo",
"entry": "./src/core/dynamo"
},
"storage": {
"stack": "Storage",
"entry": "./src/core/storage"
}
},
"services": [
Expand Down
7 changes: 7 additions & 0 deletions infrastructure/infrastructure.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { Dynamo } = require("./central/dynamodb");

const { EncodeService } = require("./services/encode");
const { TranscribeService } = require("./services/transcribe");
const { Storage } = require('./central/storage');

console.log(process.env.AWS_ACCESS_KEY_ID)

Expand All @@ -31,6 +32,11 @@ new Dynamo(app, `${stage}-Dynamo`, {
env,
stage,
});
new Storage(app, `${stage}-Storage`, {
serviceName: "storage",
env,
stage
});

// Services
new EncodeService(app, `${stage}-EncodeService`, {
Expand All @@ -47,4 +53,5 @@ new TranscribeService(app, `${stage}-TranscribeService`, {
dependencies: {},
});


app.synth();
1 change: 0 additions & 1 deletion infrastructure/services/encode/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const s3n = require('@aws-cdk/aws-s3-notifications');
const { Duration }= require('@aws-cdk/core');
const sqs = require('@aws-cdk/aws-sqs');
const { SqsEventSource, S3EventSource } = require('@aws-cdk/aws-lambda-event-sources');
const s3 = require('@aws-cdk/aws-s3');

const createLambdaFunction = require('../../utils/create-lambda-function');
class EncodeService extends cdk.Stack {
Expand Down
18 changes: 13 additions & 5 deletions infrastructure/services/transcribe/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const cdk = require('@aws-cdk/core');
const s3 = require('@aws-cdk/aws-s3');
const s3n = require('@aws-cdk/aws-s3-notifications');
const sqs = require('@aws-cdk/aws-sqs');
const dynamodb = require('@aws-cdk/aws-dynamodb');
const { S3EventSource } = require('@aws-cdk/aws-lambda-event-sources');


Expand All @@ -16,10 +17,10 @@ class TranscribeService extends cdk.Stack {
super(app, id);

// A user uploads a video
const videoInputBucket = fromBucketName(this, 'VideoInputBucket', `video-input-bucket-${stage}`);
const videoInputBucket = s3.Bucket.fromBucketName(this, 'VideoInputBucket', `development-storage-videoinputbucket940f4f43-1du1ixen5jp8u`);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SSM parameters

const extractedAudioBucket = s3.Bucket.fromBucketName(this, 'AudioExtractedBucket', `development-storage-audioextractedbuckete38bcdcf-10n4xngbp78mz`);

videoInputBucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.LambdaDestination(extractAudioLambda))
videoInputBucket.grantReadWrite(extractAudioLambda);
const videoTable = dynamodb.Table.fromTableName(this, 'DynamoTableVideos', 'development-videos' )

const ffmpegLayer = new lambda.LayerVersion(this, 'ffmpeg-layer', {
compatibleRuntimes: [
Expand All @@ -45,12 +46,19 @@ class TranscribeService extends cdk.Stack {
SUPABASE_API_URL: process.env.SUPABASE_API_URL,
SUPABASE_API_KEY: process.env.SUPABASE_API_KEY,
VIDEO_INPUT_BUCKET: videoInputBucket.bucketArn,
EXTRACTED_VIDEO_AUDIO_BUCKET: extractAudioBucket.bucketArn
EXTRACTED_VIDEO_AUDIO_BUCKET: extractedAudioBucket.bucketArn
},
layers: [ffmpegLayer]
});

const extractedAudioBucket = fromBucketName(this, 'AudioExtractedBucket', `audio-extracted-bucket-${stage}`);
videoInputBucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.LambdaDestination(extractAudioLambda))
videoInputBucket.grantReadWrite(extractAudioLambda);

extractedAudioBucket.grantReadWrite(extractAudioLambda);

videoTable.grantReadWriteData(extractAudioLambda);



// For the extracted audio, dispatch a job to Assembly A.I for transcribing

Expand Down
2 changes: 1 addition & 1 deletion infrastructure/utils/create-lambda-function.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const createLambdaFunction = ({
functionName,
code: Code.fromAsset(codeAssetPath),
handler,
runtime: Runtime.NODEJS_10_X,
runtime: Runtime.NODEJS_12_X,
memorySize,
timeout: timeout || cdk.Duration.seconds(60),
layers,
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
"@aws-cdk/aws-lambda-event-sources": "^1.110.1",
"@aws-cdk/aws-rds": "^1.51.0",
"@aws-cdk/aws-s3": "^1.51.0",
"@aws-cdk/aws-s3-notifications": "^1.51.0",
"@aws-cdk/aws-s3-assets": "^1.97.0",
"@aws-cdk/aws-s3-notifications": "^1.51.0",
"@aws-cdk/aws-secretsmanager": "^1.51.0",
"@aws-cdk/aws-sns": "^1.51.0",
"@aws-cdk/aws-sns-subscriptions": "^1.51.0",
Expand All @@ -83,6 +83,7 @@
"babel-plugin-module-resolver": "^3.2.0",
"babel-plugin-source-map-support": "^2.0.1",
"babel-plugin-styled-components": "^1.12.0",
"dynamodb-toolbox": "^0.3.4",
"eslint": "^7.22.0",
"eslint-config-airbnb": "^18.0.1",
"eslint-config-airbnb-base": "^14.0.0",
Expand Down
199 changes: 34 additions & 165 deletions src/services/transcribe/extract-audio/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import uuidv4 from "uuid/v4";
import { v4 as uuidv4 } from "uuid";
import fs from "fs";
import path from "path";
import os from "os";
Expand All @@ -12,9 +12,13 @@ import { createS3Client } from "../../../utils/aws/s3";

import customConfig from './config';
import { generateUUID } from "../../../utils/uuid";
import childProcess from 'child_process';
import cleanOutTmp from "../../../utils/clean-out-tmp";
import { createDynamoDbClient } from "../../../utils/aws/dynamodb";

/**
 * Builds the AWS client bag used by this handler.
 * @param config - currently unused; kept so existing call sites keep working.
 * @returns object with an S3 wrapper (`s3`) and a DynamoDB wrapper (`db`).
 */
const createClients = (config: any) => {
  const s3 = createS3Client();
  const db = createDynamoDbClient();
  return { s3, db };
};

process.env.PATH =
Expand All @@ -26,31 +30,30 @@ const getExtension = (filename: string) => {
return ext[ext.length - 1];
};

/**
 * Builds the initial DynamoDB record for a freshly uploaded video.
 * Both state fields start as 'pending'; keys produced by later pipeline
 * stages (extracted audio, transcription) start as null.
 * NOTE(review): `userId` is a random placeholder generated per video —
 * presumably to be replaced once real auth is wired in; confirm.
 * @param videoBucketKey - S3 key of the uploaded video object.
 */
const generateDefaultVideoItem = ({ videoBucketKey }) => {
  const freshId = () => '' + generateUUID();
  return {
    id: freshId(),
    userId: freshId(),
    state: 'pending',
    videoBucketKey,
    extractedAudioKey: null,
    transcriptionState: 'pending',
    transcriptionKey: null
  };
};

async function extractAudio(event, { logger }) {
const config = getConfig(customConfig);

const clients = createClients(config);



try {

await fs.readdir('/tmp/',async (err, files) => {
if (err) throw err;
logger.info(`do you like files ${files}`);
for (const file of files) {
await fs.unlink(path.join('/tmp/', file), err => {
if (err) throw err;
});
}
});

logger.info('inside the extract audio lambda')
logger.info(JSON.stringify(event));

const eventRecord = event.Records && event.Records[0];

const id = generateUUID();
const inputBucket = eventRecord.s3.bucket.name;
const key = eventRecord.s3.object.key;

Expand All @@ -59,183 +62,49 @@ async function extractAudio(event, { logger }) {
const blob = await clients.s3.get({ bucket: inputBucket, key});
const body = blob?.Body;

const videoKeyName = `/tmp/${key}`;
const audioKeyName = `/tmp/test.mp3`;
const item = generateDefaultVideoItem({videoBucketKey: key});

// @ts-ignore
// defensive check later
await clients.db.put(item);

const videoKeyName = `/tmp/${key}`;
const audioKeyName = `/tmp/temp.mp3`;

logger.info('Writing video file to temp...');


fs.writeFileSync(videoKeyName, body);

const args = [
"-i" ,
videoKeyName,
"-q:a",
"0",
"-map a",
"-vn",
audioKeyName
];

// replace the mp4 extension with mp3
logger.info(`Extracting audio from video :: Video key ${inputBucket}/${key}`);

fs.readdirSync('/opt/').forEach(file => {
logger.info(`opt: ${file}`);
});


const stout = childProcess.execFileSync("/opt/ffmpeg", args, {});
// ID (generate an ID using uuid package)
// state (starts with pending)
// videoBucketKey (location of uploaded video on s3)
// extractedAudioKey (location of extracted audio key, will be null initially)
// transcriptionState (starts with pending)
// transcriptionKey (location of transcription SRT file from Assembly.AI, will be null initially)

logger.info(`Extracting audio from video :: Video key ${inputBucket}/${key}`);


const blob = await clients.s3.get({ bucket: inputBucket, key});
logger.info(`da blob ${blob}`);
logger.info('pas the blobl')

const body = blob?.Body?.toString();
logger.info(`da body ${JSON.stringify(body)}`);
logger.info('pas the body')

const videoKeyName = `/tmp/${key}`;
const audioKeyName = `/tmp/test.mp3`;
logger.info('past key anems');
// @ts-ignore
// defensive check later


logger.info('Writing video file to temp...');

if(body){
fs.writeFileSync(videoKeyName, body);
}

const args = [
"-i" ,
videoKeyName,
"-q:a",
"0",
"-map a",
audioKeyName
];

//replace with mp4 with mp3
logger.info(`xffmphe lahyer time`);


// await fs.renameSync('/opt/ffmpeg', '/tmp/ffmpeg');
// await fs.chmodSync('/tmp/ffmpeg', '777');

fs.readdir('/opt/ffmpeg', (err, files) => {
files.forEach(file => {
logger.info(file);
});
});

const stout = childProcess.execFileSync("/opt/ffmpeg/", args, {});

const audioBucket = 'development-transcribese-extractaudiobucket197901-4zaqtw7geuta';
childProcess.execFileSync("/opt/ffmpeg", args, {});
//
const audioBucket = 'development-storage-audioextractedbuckete38bcdcf-10n4xngbp78mz';

const audioName = fs.readFileSync(audioKeyName);
logger.info(`audoName:${audioName} Reading audio file...`);

clients.s3.put({ file: audioName, bucket: audioBucket, key: audioKeyName });

const {data, error} = await clients.supabase.from('Video').insert([{
id,
state: 'pending',
videoBucketKey: key,
extractedAudioKey: null,
transcriptionState: 'pending',
transcriptionKey: null
}]);
if (error) {
throw new Error(`Failed to retrive video upload :: ${error.message}`)
}

logger.info(`Retrieved video ${JSON.stringify(data)}`);


// compress video
// split video into audio
// create audioKey
// store audio in s3
// update video record with audio key

// const id = context.awsRequestId,
// const resultKey = key.replace(/\.[^.]+$/, EXTENSION),
// const workdir = os.tmpdir(),
// const inputFile = path.join(workdir, id + path.extname(key)),
// const outputFile = path.join(workdir, id + EXTENSION);

// return s3Util.downloadFileFromS3(inputBucket, key, inputFile)
// .then(() => childProcessPromise.spawn(
// '/opt/bin/ffmpeg',
// ['-loglevel', 'error', '-y', '-i', inputFile, '-vf', `thumbnail,scale=${THUMB_WIDTH}:-1`, '-frames:v', '1', outputFile],
// {
// env: process.env,
// cwd: workdir
// }
// ))
// .then(() => s3Util.uploadFileToS3(OUTPUT_BUCKET, resultKey, outputFile, MIME_TYPE));

// development-encodeservic-videoinputbucket940f4f43-iy9if872u4ib

const audioBucket = 'development-encodeservice-extractaudiobucket197901-1wjvufyahi68c';
logger.info(`audioName:${audioKeyName} Reading audio file...`);

const audioName = fs.readFileSync(audioKeyName);
logger.info(`audoName:${audioName} Reading audio file...`);

clients.s3.put({ file: audioName, bucket: audioBucket, key: audioKeyName });
const audioBucketKeyName = `${generateUUID()}-temp.mp3`

// //.... rest of the logic to clear up locally written files from running the executable
await clients.s3.put({ file: audioName, bucket: audioBucket, key: audioBucketKeyName });
logger.info(`audioName:${audioKeyName} Writing audio file to s3 ...`);

item.extractedAudioKey = audioBucketKeyName as any;

// // compress video
// // split video into audio
// // create audioKey
// // store audio in s3
// // update video record with audio key


await clients.db.update(item);

} catch (error) {
logger.error(error);
fs.readdir('/tmp/', (err, files) => {
if (err) throw err;

for (const file of files) {
fs.unlink(path.join('/tmp/', file), err => {
if (err) throw err;
logger.info(`Deleting ${file}`);
});
}
});

logger.info('cleaning out tmp')
await fs.readdir('/tmp/',async (err, files) => {
if (err) throw err;
logger.info(files);


for (const file of files) {
await fs.unlink(path.join('/tmp/', file), err => {
if (err) throw err;
});
}
});

throw error;
throw error;
}
cleanOutTmp(logger);
}

const options = {
Expand Down
Loading