video-ai/app.py
2025-03-05 19:31:45 +01:00

164 lines
5.8 KiB
Python

from openai import OpenAI
import os
import math
from moviepy import VideoFileClip
import tempfile
from openai_client import OpenAIClient
# Size threshold in bytes (25 MiB): audio files larger than this are split
# into chunks before being transcribed.
# NOTE(review): presumably mirrors the transcription API's upload limit - confirm.
MAX_FILE_SIZE = 25 * 1024 * 1024

# Number of bytes per chunk when a file is split: half of the threshold.
CHUNK_SIZE = MAX_FILE_SIZE // 2

# Directory receiving the extracted audio, the chunks, the transcript and
# the summary.
OUTPUT_DIR = "E:\\sources\\PDC\\video_ai\\videos"
def extract_audio(video_path):
    """Extract the audio track of a video into an MP3 file.

    The audio is written to ``temp_audio.mp3`` inside ``OUTPUT_DIR``; a file
    already present under that name is overwritten.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the MP3 file that was written.

    Raises:
        ValueError: If the video exposes no audio track.
    """
    print("Extracting audio from video file...")
    # Make sure the destination directory exists before writing into it.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    temp_audio_path = os.path.join(OUTPUT_DIR, "temp_audio.mp3")
    video = VideoFileClip(video_path)
    try:
        if video.audio is None:
            # Fail with an explicit message instead of an AttributeError
            # raised on None.
            raise ValueError(f"No audio track found in {video_path}")
        video.audio.write_audiofile(temp_audio_path)
    finally:
        # Close the clip even when the extraction fails.
        video.close()
    return temp_audio_path
def split_audio_file(file_path):
    """Cut a file into consecutive byte slices of at most CHUNK_SIZE bytes.

    The slices are written as ``chunk_<index>.mp3`` in the ``chunks``
    sub-directory of ``OUTPUT_DIR`` (created when missing). The split is
    purely byte-based: slice boundaries are not aligned on audio frames.

    Args:
        file_path: Path of the file to split.

    Returns:
        List of the slice paths, in the order of their content in the file.
    """
    target_dir = os.path.join(OUTPUT_DIR, 'chunks')
    os.makedirs(target_dir, exist_ok=True)

    # Number of slices needed to cover the whole file.
    piece_count = math.ceil(os.path.getsize(file_path) / CHUNK_SIZE)
    piece_paths = [
        os.path.join(target_dir, f"chunk_{index}.mp3")
        for index in range(piece_count)
    ]

    with open(file_path, 'rb') as source:
        for piece_path in piece_paths:
            with open(piece_path, 'wb') as sink:
                # Sequential reads: each slice continues where the previous
                # one stopped.
                sink.write(source.read(CHUNK_SIZE))
    return piece_paths
def process_audio_file(client, file_path, response_format):
    """Send one audio file to the transcription endpoint.

    Args:
        client: Object exposing ``audio.transcriptions.create``.
        file_path: Path of the audio file; it is opened in binary mode.
        response_format: Output format forwarded to the endpoint.

    Returns:
        Whatever ``client.audio.transcriptions.create`` returns.
    """
    with open(file_path, "rb") as audio_stream:
        request_options = {
            "model": "whisper-1",
            "file": audio_stream,
            "response_format": response_format,
        }
        # The request is issued while the file is still open.
        return client.audio.transcriptions.create(**request_options)
def summarize_transcription(client, transcript):
    """Request a summary of a transcript from the ``gpt-4o`` model.

    Args:
        client: Object exposing ``completions.create``.
        transcript: Text to summarize; it is embedded in the prompt.

    Returns:
        Text of the first returned choice, stripped of surrounding
        whitespace.
    """
    # NOTE(review): "gpt-4o" is sent through ``completions.create`` with a
    # plain ``prompt`` - confirm that the client passed in supports this.
    request = dict(
        model="gpt-4o",
        prompt=f"Please summarize the following transcription: {transcript}",
        max_tokens=12000,
        temperature=0.5,
    )
    completion = client.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.text.strip()
def convert_video_to_audio_with_chunking(file_path):
    """Prepare the audio file(s) to transcribe for a media file.

    A video input first has its audio extracted with ``extract_audio``; any
    other input is used as the audio file directly. When the audio file is
    larger than ``MAX_FILE_SIZE`` it is split with ``split_audio_file``.

    Args:
        file_path: Path of the video or audio file to process.

    Returns:
        A ``(chunks, audio_path, is_video)`` tuple: the list of paths to
        transcribe (the audio file itself when no split was needed), the
        path of the audio file, and whether the input was handled as a video.

    Raises:
        Exception: Any error raised during extraction, sizing or splitting
            is printed and re-raised unchanged.
    """
    # Container formats treated as video; ".mp4" was the only one recognised
    # originally, the others are handled the same way.
    video_extensions = ('.mp4', '.mov', '.mkv', '.avi')
    is_video = file_path.lower().endswith(video_extensions)
    audio_path = file_path
    try:
        if is_video:
            # Inside the try block so that extraction errors are reported
            # like every other conversion error.
            audio_path = extract_audio(file_path)
        file_size = os.path.getsize(audio_path)
        if file_size > MAX_FILE_SIZE:
            print(f"File size ({file_size} bytes) exceeds the maximum limit. Splitting into chunks...")
            return split_audio_file(audio_path), audio_path, is_video
        return [audio_path], audio_path, is_video
    except Exception as e:
        print(f"Error in video/audio conversion: {e}")
        # The caller never receives audio_path when this function fails, so
        # the extracted audio is removed here rather than left behind.
        if is_video and audio_path != file_path and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as cleanup_error:
                print(f"Error during cleanup: {cleanup_error}")
        raise
def transcribe_audio(client, audio_chunks, response_format="text"):
    """Transcribe every chunk and save the merged transcript.

    Each chunk is transcribed with ``process_audio_file`` and printed. The
    chunk transcripts are joined with a space for the ``"text"`` format and
    with a blank line for any other format, then written to
    ``transcript.txt`` in ``OUTPUT_DIR``.

    Args:
        client: Client forwarded to ``process_audio_file``.
        audio_chunks: Paths of the audio files, in playback order.
        response_format: Transcription output format (default ``"text"``).

    Returns:
        The merged transcript, stripped of surrounding whitespace.

    Raises:
        Exception: Any error is printed and re-raised unchanged.
    """
    try:
        # NOTE(review): for non-text formats the chunk outputs are simply
        # concatenated; timestamps are presumably relative to each chunk -
        # confirm before relying on the merged timing.
        separator = " " if response_format == "text" else "\n\n"
        chunk_transcripts = []
        total_chunks = len(audio_chunks)
        for i, chunk_path in enumerate(audio_chunks, 1):
            print(f"Processing chunk {i} of {total_chunks}...")
            chunk_transcript = process_audio_file(client, chunk_path, response_format)
            print(f"Chunk {i} transcript:")
            print(chunk_transcript)
            chunk_transcripts.append(chunk_transcript)
        # A single join replaces the repeated string concatenation; the
        # result after strip() is the same.
        transcript = separator.join(chunk_transcripts).strip()
        # Save the transcript
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        transcript_filename = os.path.join(OUTPUT_DIR, "transcript.txt")
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent every character of the transcript.
        with open(transcript_filename, "w", encoding="utf-8") as f:
            f.write(transcript)
        print(f"Transcript saved to {transcript_filename}")
        return transcript
    except Exception as e:
        print(f"Error in transcription: {e}")
        raise
def generate_summary(openai_client, transcript):
    """Generate, print and save a summary of the transcript.

    The summary comes from ``summarize_transcription`` and is written to
    ``summary.txt`` in ``OUTPUT_DIR``.

    Args:
        openai_client: Client forwarded to ``summarize_transcription``.
        transcript: Text to summarize.

    Returns:
        The summary text.

    Raises:
        Exception: Any error is printed and re-raised unchanged.
    """
    try:
        summary = summarize_transcription(openai_client, transcript)
        print("Summary:")
        print(summary)
        # Save the summary
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        summary_filename = os.path.join(OUTPUT_DIR, "summary.txt")
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent every character of the summary.
        with open(summary_filename, "w", encoding="utf-8") as f:
            f.write(summary)
        print(f"Summary saved to {summary_filename}")
        return summary
    except Exception as e:
        print(f"Error in summary generation: {e}")
        raise
def cleanup_files(audio_path, is_video, chunks):
    """Delete the temporary files produced while processing a media file.

    Chunk files are always removed. ``audio_path`` is removed only when
    ``is_video`` is true, i.e. when it is a file extracted from a video; an
    audio file supplied as input is never deleted, even when it is listed in
    ``chunks`` (which is the case when no split was needed).

    Errors are printed and never propagated; a failure on one file does not
    prevent the removal of the others.

    Args:
        audio_path: Path of the audio file that was transcribed.
        is_video: True when ``audio_path`` was extracted from a video.
        chunks: Paths that were sent for transcription.
    """
    def _normalized(path):
        # Comparable form of a path: absolute, platform case rules applied.
        return os.path.normcase(os.path.abspath(path))

    try:
        audio_key = _normalized(audio_path)
        # The audio file itself is excluded from the chunk list: whether it
        # may be deleted depends on is_video only.
        to_remove = [path for path in chunks if _normalized(path) != audio_key]
        # Clean up temporary audio file if it was extracted from video
        if is_video:
            to_remove.append(audio_path)
        for path in to_remove:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError as e:
                print(f"Error during cleanup: {e}")
    except Exception as e:
        # Best effort: cleanup problems must not mask the outcome of the run.
        print(f"Error during cleanup: {e}")
def main():
    """Run the whole pipeline: conversion, transcription, summary, cleanup.

    The OpenAI API key is read from the ``OPENAI_API_KEY`` environment
    variable. Any error is printed; none is propagated to the caller.
    """
    try:
        # Configuration
        file_path = 'E:\\sources\\PDC\\video_ai\\videos\\PdC - Modulo ESG-20250116_105531.mp4'
        response_format = "text"
        # Secrets must not live in source code. The key that used to be
        # hard-coded here should be considered leaked and be revoked.
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY environment variable is not set")
        # Initialize OpenAI clients
        client = OpenAI(api_key=api_key)
        openai_client = OpenAIClient(api_key=api_key)
        # Step 1: Convert video to audio and handle chunking
        audio_chunks, audio_path, is_video = convert_video_to_audio_with_chunking(file_path)
        try:
            # Step 2: Transcribe the audio
            transcript = transcribe_audio(client, audio_chunks, response_format)
            # Step 3: Generate summary
            generate_summary(openai_client, transcript)
        finally:
            # Clean up temporary files, whether or not the steps succeeded
            cleanup_files(audio_path, is_video, audio_chunks)
    except Exception as e:
        print("Error:", e)
# Run the pipeline only when the file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()