-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- add crawling for korean face video and voice data
- #3
- Loading branch information
1 parent
03c1cfc
commit 50aed32
Showing
6 changed files
with
380 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
col_names,col_explanation | ||
name,컨텐츠 올린 채널명 | ||
title,컨텐츠 제목 | ||
url_link,연결 링크 |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import os | ||
import pandas as pd | ||
from moviepy.editor import VideoFileClip | ||
import numpy as np | ||
import face_recognition | ||
import shutil | ||
|
||
# 얼굴이 감지된 영상의 오디오를 추출하는 함수 | ||
def extract_audio_with_face(video_clip, start_time, end_time): | ||
audio = video_clip.audio.subclip(start_time, end_time) | ||
return audio | ||
|
||
# 주어진 영상에서 얼굴이 감지된 구간의 오디오를 추출하는 함수 | ||
def extract_audio_with_faces(video_clip, face_detections): | ||
# 얼굴이 감지된 부분의 오디오를 모아서 저장할 리스트 | ||
audio_clips = [] | ||
|
||
# 각 얼굴 감지된 구간에서 오디오를 추출하여 리스트에 추가 | ||
for start_time, end_time in face_detections: | ||
audio_clip = extract_audio_with_face(video_clip, start_time, end_time) | ||
audio_clips.append(audio_clip) | ||
|
||
# 오디오 클립들을 합쳐서 하나의 오디오 클립으로 반환 | ||
final_audio = np.concatenate([clip.to_soundarray() for clip in audio_clips]) | ||
return final_audio | ||
|
||
# 얼굴 감지 함수 | ||
def detect_faces(video_clip): | ||
frames = [frame for frame in video_clip.iter_frames()] | ||
frame_rate = video_clip.fps | ||
frame_times = np.arange(len(frames)) / frame_rate | ||
|
||
# 얼굴 감지된 구간의 시작 및 끝 시간을 저장할 리스트 | ||
face_detections = [] | ||
|
||
# 각 프레임에 대해 얼굴 감지 수행 | ||
for i, frame in enumerate(frames): | ||
face_locations = face_recognition.face_locations(frame) | ||
if face_locations: | ||
start_time = frame_times[max(0, i - 1)] | ||
end_time = frame_times[min(len(frames) - 1, i + 1)] | ||
face_detections.append((start_time, end_time)) | ||
|
||
return face_detections | ||
|
||
# 새로운 영상을 생성하는 함수 | ||
def create_new_video(video_clip, face_detections, output_path): | ||
# 새로운 비디오 클립 초기화 | ||
new_video_clip = None | ||
|
||
# 얼굴이 감지된 구간에서만 비디오를 추출하여 새로운 비디오 클립에 추가 | ||
for start_time, end_time in face_detections: | ||
subclip = video_clip.subclip(start_time, end_time) | ||
if new_video_clip is None: | ||
new_video_clip = subclip | ||
else: | ||
new_video_clip = new_video_clip.append(subclip) | ||
|
||
# 새로운 비디오 클립을 파일로 저장 | ||
new_video_clip.write_videofile(output_path) | ||
|
||
# CSV 파일에서 데이터 읽어오기 | ||
csv_file_path = "/Users/imseohyeon/Documents/crawling/data/Youtube_search_df.csv" | ||
df = pd.read_csv(csv_file_path) | ||
|
||
# 다운로드된 영상 파일들이 저장된 폴더 경로 | ||
DOWNLOAD_FOLDER = "/Users/imseohyeon/Documents/crawling/download/" | ||
# 새로운 폴더 경로 | ||
NEW_FOLDER = "/Users/imseohyeon/Documents/crawling/processed_videos/" | ||
|
||
# 새로운 폴더 생성 | ||
if not os.path.exists(NEW_FOLDER): | ||
os.makedirs(NEW_FOLDER) | ||
|
||
# 각 영상에 대해 반복하며 얼굴 감지된 구간의 영상과 오디오를 추출하고 새로운 영상 생성 | ||
for idx, row in df.iterrows(): | ||
video_filename = f"{idx}_video.mp4" | ||
video_path = os.path.join(DOWNLOAD_FOLDER, video_filename) | ||
|
||
if os.path.exists(video_path): | ||
try: | ||
# 영상 클립 생성 | ||
video_clip = VideoFileClip(video_path) | ||
|
||
# 얼굴 감지 수행 | ||
face_detections = detect_faces(video_clip) | ||
|
||
if face_detections: | ||
# 얼굴이 감지된 구간의 오디오 추출 | ||
final_audio = extract_audio_with_faces(video_clip, face_detections) | ||
|
||
# 새로운 영상 생성 | ||
output_path = os.path.join(NEW_FOLDER, f"{idx}_new_video.mp4") | ||
create_new_video(video_clip, face_detections, output_path) | ||
|
||
print(f"{video_filename}에 대한 처리 완료") | ||
else: | ||
print(f"{video_filename}에서 얼굴을 감지할 수 없습니다.") | ||
except Exception as e: | ||
print(f"{video_filename} 처리 중 오류 발생: {e}") | ||
else: | ||
print(f"{video_filename} 파일이 존재하지 않습니다.") | ||
|
||
# 다 처리된 영상을 다른 폴더에 옮기기 | ||
processed_files = os.listdir(NEW_FOLDER) | ||
for file in processed_files: | ||
shutil.move(os.path.join(NEW_FOLDER, file), DOWNLOAD_FOLDER) | ||
|
||
print("모든 영상 처리 완료") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import os | ||
import pandas as pd | ||
|
||
# CSV 파일에서 링크 읽어오기 | ||
csv_file_path = "/Users/imseohyeon/Documents/crawling/data/Youtube_search_df.csv" | ||
df = pd.read_csv(csv_file_path) | ||
|
||
# 다운로드한 영상들이 저장된 폴더 경로 | ||
DOWNLOAD_FOLDER = "/Users/imseohyeon/Documents/crawling/download/" | ||
|
||
# 폴더 내의 모든 파일을 확인하여 이름 변경 | ||
for filename in os.listdir(DOWNLOAD_FOLDER): | ||
# 파일의 전체 경로 | ||
file_path = os.path.join(DOWNLOAD_FOLDER, filename) | ||
# 파일이 .mp4인지 확인 | ||
if filename.endswith(".mp4"): | ||
# 파일 이름에서 index 값을 추출 (영상의 title이 index로 저장된 것으로 가정) | ||
idx = filename.split("_")[0] # 예시: "0_video.mp4" -> "0" | ||
# 새로운 파일 이름 생성 | ||
new_filename = f"{idx}_video.mp4" | ||
# 새로운 파일 경로 생성 | ||
new_file_path = os.path.join(DOWNLOAD_FOLDER, new_filename) | ||
# 파일 이름 변경 | ||
os.rename(file_path, new_file_path) | ||
print(f"파일 이름 변경: {filename} -> {new_filename}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
from selenium import webdriver | ||
from selenium.webdriver.common.by import By | ||
from selenium.webdriver.common.keys import Keys | ||
|
||
import requests | ||
from bs4 import BeautifulSoup | ||
import time | ||
import pandas as pd | ||
import os | ||
|
||
# WebDriver 초기화 (PATH에 추가했기 때문에 executable_path를 사용하지 않음) | ||
browser = webdriver.Chrome() | ||
|
||
# 접속 url | ||
url = "https://youtube.com/" | ||
|
||
# 검색 키워드 | ||
keyword = "나혼자 여행" | ||
|
||
# 스크롤을 어디까지 내리는지 기준 | ||
# finish_line = 40000 기준: 162 개 | ||
finish_line = 10000 | ||
|
||
browser.maximize_window() | ||
browser.get(url) | ||
time.sleep(2) | ||
search = browser.find_element(By.NAME, "search_query") | ||
time.sleep(2) | ||
search.send_keys(keyword) | ||
search.send_keys(Keys.ENTER) | ||
|
||
# 검색 후 url 작업창 변경 (파싱) | ||
present_url = browser.current_url | ||
browser.get(present_url) | ||
last_page_height = browser.execute_script("return document.documentElement.scrollHeight") | ||
|
||
# 스크롤 100번 수행 | ||
scroll_count = 0 | ||
while scroll_count < 100: | ||
# 우선 스크롤 내리기 | ||
browser.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);") | ||
time.sleep(2.0) # 작업 중간에 1이상으로 간격을 줘야 데이터 취득가능(스크롤을 내릴 때의 데이터 로딩 시간 때문) | ||
# 현재 위치 담기 | ||
new_page_height = browser.execute_script("return document.documentElement.scrollHeight") | ||
|
||
# 스크롤 횟수 증가 | ||
scroll_count += 1 | ||
|
||
html_source = browser.page_source | ||
soup = BeautifulSoup(html_source, 'html.parser') | ||
|
||
# finish line까지 모든 검색 결과 정보 가져오기 | ||
# 모든 컨텐츠 관련 부분을 떼어내기 | ||
# find_all: 해당 정보의 모든 부분 가져오기 | ||
elem = soup.find_all("ytd-video-renderer", class_="style-scope ytd-item-section-renderer") | ||
|
||
# 필요한 정보 가져오기 | ||
df = [] | ||
for t in elem[:100]: # 처음 100개의 동영상 정보만 가져오도록 수정 | ||
title = t.find("yt-formatted-string", class_="style-scope ytd-video-renderer").get_text() | ||
name = t.find("a", class_="yt-simple-endpoint style-scope yt-formatted-string").get_text() | ||
content_url = t.find("a", class_="yt-simple-endpoint style-scope ytd-video-renderer")["href"] | ||
df.append([name, title , 'https://www.youtube.com/'+content_url]) | ||
|
||
## 자료 저장 | ||
# 데이터 프레임 만들기 | ||
new = pd.DataFrame(columns=['name', 'title' , 'url_link']) | ||
|
||
# 자료 집어넣기 | ||
for i in range(len(df)): | ||
new.loc[i] = df[i] | ||
|
||
# 데이터를 저장할 디렉토리 생성 | ||
df_dir = "./data/" | ||
if not os.path.exists(df_dir): | ||
os.makedirs(df_dir) | ||
|
||
# 저장하기 | ||
new.to_csv(os.path.join(df_dir, "Youtube_search_df.csv"), index=True, encoding='utf8') # 인덱스 포함하여 저장 | ||
|
||
## 컬럼 정보 저장 | ||
# 컬럼 설명 테이블 | ||
col_names = ['name', 'title' ,'url_link'] | ||
col_exp = ['컨텐츠 올린 채널명', '컨텐츠 제목', '연결 링크'] | ||
|
||
new_exp = pd.DataFrame({'col_names':col_names, | ||
'col_explanation':col_exp}) | ||
|
||
# 저장하기 | ||
new_exp.to_csv(os.path.join(df_dir, "Youtube_col_exp.csv"), index=False, encoding='utf8') | ||
|
||
# 브라우저 닫기 | ||
browser.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import os | ||
import pandas as pd | ||
from pytube import YouTube | ||
import time | ||
|
||
# CSV 파일에서 링크 읽어오기 | ||
csv_file_path = "/Users/imseohyeon/Documents/crawling/data/Youtube_search_df.csv" | ||
df = pd.read_csv(csv_file_path) | ||
|
||
# 다운로드할 폴더 경로 정의 | ||
DOWNLOAD_FOLDER = "/Users/imseohyeon/Documents/crawling/download/" | ||
|
||
# 폴더가 없다면 생성 | ||
if not os.path.exists(DOWNLOAD_FOLDER): | ||
os.makedirs(DOWNLOAD_FOLDER) | ||
|
||
# 각 영상에 대해 반복하며 다운로드 | ||
for idx, row in df.iterrows(): | ||
video_url = row['url_link'] | ||
try: | ||
# Pytube를 사용하여 영상 정보 가져오기 | ||
yt = YouTube(video_url) | ||
length_seconds = yt.length | ||
|
||
# 파일 이름 설정 | ||
filename = f"{idx}_video.mp4" | ||
|
||
# 영상 길이가 5분 이상이면 처음 5분까지만 다운로드 | ||
if length_seconds > 5 * 60: | ||
print(f"{yt.title} 영상이 5분을 초과합니다. 처음 5분만 다운로드합니다.") | ||
stream = yt.streams.filter(adaptive=True, file_extension='mp4').first() | ||
if stream: | ||
print(f"다운로드 중: {yt.title}") | ||
stream.download(output_path=DOWNLOAD_FOLDER, filename=filename) | ||
print(f"{yt.title} 다운로드 완료") | ||
else: | ||
print(f"{yt.title}에 대한 최고 품질 스트림이 없습니다.") | ||
else: | ||
# 5분 이하의 영상은 전체를 다운로드 | ||
stream = yt.streams.get_highest_resolution() | ||
if stream: | ||
print(f"다운로드 중: {yt.title}") | ||
stream.download(output_path=DOWNLOAD_FOLDER, filename=filename) | ||
print(f"{yt.title} 다운로드 완료") | ||
else: | ||
print(f"{yt.title}에 대한 최고 품질 스트림이 없습니다.") | ||
except Exception as e: | ||
print(f"{yt.title} 다운로드 실패: {e}") |