# File-viewer residue (not part of the program): 164 lines, 5.8 KiB, Python.
from openai import OpenAI
|
|
import os
|
|
import math
|
|
from moviepy import VideoFileClip
|
|
import tempfile
|
|
from openai_client import OpenAIClient
|
|
|
|
# Size threshold (in bytes) above which an audio file is split: 25 MiB.
MAX_FILE_SIZE = 25 * 1024 * 1024  # 26214400 bytes

# Number of bytes per chunk when splitting: half the threshold (12.5 MiB).
CHUNK_SIZE = MAX_FILE_SIZE // 2

# Directory that receives the extracted audio, the chunks, the transcript
# and the summary.
OUTPUT_DIR = "E:\\sources\\PDC\\video_ai\\videos"
|
|
|
|
def extract_audio(video_path):
    """Extract the audio track of a video into an MP3 file.

    The audio is written to ``temp_audio.mp3`` inside ``OUTPUT_DIR``; the
    directory is created when it does not exist yet.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the MP3 file that was written.

    Raises:
        ValueError: If the video exposes no audio track.
    """
    print("Extracting audio from video file...")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    temp_audio_path = os.path.join(OUTPUT_DIR, "temp_audio.mp3")

    video = VideoFileClip(video_path)
    try:
        # A clip without sound has ``audio`` set to None; fail with a clear
        # message instead of an AttributeError on None.
        if video.audio is None:
            raise ValueError(f"No audio track found in {video_path}")
        video.audio.write_audiofile(temp_audio_path)
    finally:
        # Always release the clip, even when writing the audio fails.
        video.close()

    return temp_audio_path
|
|
|
|
def split_audio_file(file_path, chunk_size=None, output_dir=None):
    """Split a file into consecutive fixed-size pieces using binary reads.

    The pieces are written as ``chunk_<n>.mp3`` (n starting at 0) into a
    ``chunks`` sub-directory of ``output_dir``, which is created if needed.

    NOTE(review): the split is purely byte-based and does not look at MP3
    frame boundaries, so a chunk may start in the middle of a frame —
    confirm the downstream decoder tolerates that.

    Args:
        file_path: Path of the file to split.
        chunk_size: Maximum number of bytes per chunk. Defaults to the
            module-level ``CHUNK_SIZE``.
        output_dir: Directory under which ``chunks`` is created. Defaults to
            the module-level ``OUTPUT_DIR``.

    Returns:
        List of chunk file paths in order; empty when the source is empty.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive number of bytes")
    if output_dir is None:
        output_dir = OUTPUT_DIR

    chunks_dir = os.path.join(output_dir, 'chunks')
    os.makedirs(chunks_dir, exist_ok=True)

    chunks = []
    with open(file_path, 'rb') as source_file:
        # Read until EOF rather than trusting a precomputed chunk count, so
        # no empty chunk is written and no trailing data is dropped.
        blocks = iter(lambda: source_file.read(chunk_size), b"")
        for chunk_number, chunk_data in enumerate(blocks):
            chunk_filename = os.path.join(chunks_dir, f"chunk_{chunk_number}.mp3")
            with open(chunk_filename, 'wb') as chunk_file:
                chunk_file.write(chunk_data)
            chunks.append(chunk_filename)

    return chunks
|
|
|
|
def process_audio_file(client, file_path, response_format):
    """Transcribe one audio file with the ``whisper-1`` model.

    Opens ``file_path`` in binary mode, passes the open file to
    ``client.audio.transcriptions.create`` together with the requested
    ``response_format``, and returns whatever that call returns.
    """
    request_options = {
        "model": "whisper-1",
        "response_format": response_format,
    }
    with open(file_path, "rb") as audio_stream:
        return client.audio.transcriptions.create(file=audio_stream, **request_options)
|
|
|
|
def summarize_transcription(client, transcript):
    """Request a summary of ``transcript`` from the ``gpt-4o`` model.

    The request goes through ``client.completions.create``; the text of the
    first returned choice is returned with surrounding whitespace removed.

    NOTE(review): ``gpt-4o`` is presumably a chat model; if ``client`` is the
    official OpenAI SDK, the completions endpoint may reject it — confirm
    against the client actually passed in.
    """
    request = dict(
        model="gpt-4o",
        prompt=f"Please summarize the following transcription: {transcript}",
        max_tokens=12000,
        temperature=0.5,
    )
    completion = client.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.text.strip()
|
|
|
|
def convert_video_to_audio_with_chunking(file_path):
    """Prepare the audio for transcription, splitting it when too large.

    A path ending in ``.mp4`` (case-insensitive) is treated as a video and
    its audio is extracted first; any other path is used as audio as-is.
    When the audio is larger than ``MAX_FILE_SIZE`` it is split into chunks.

    Args:
        file_path: Path of the input video or audio file.

    Returns:
        A tuple ``(chunks, audio_path, is_video)``: the list of files to
        transcribe, the path of the full audio file, and whether the input
        was treated as a video.

    Raises:
        Exception: Any error from extraction, size lookup or splitting is
            reported on stdout and re-raised unchanged.
    """
    is_video = file_path.lower().endswith('.mp4')
    audio_path = None

    try:
        # Extraction is inside the try block so that its failures are
        # reported by the same handler as the other conversion steps.
        audio_path = extract_audio(file_path) if is_video else file_path

        file_size = os.path.getsize(audio_path)
        if file_size > MAX_FILE_SIZE:
            print(f"File size ({file_size} bytes) exceeds the maximum limit. Splitting into chunks...")
            return split_audio_file(audio_path), audio_path, is_video
        return [audio_path], audio_path, is_video
    except Exception as e:
        print(f"Error in video/audio conversion: {e}")
        # The caller never receives audio_path on failure, so an extracted
        # temporary file would otherwise be left behind. Only a file this
        # function created (video input) is removed.
        if is_video and audio_path is not None:
            try:
                if os.path.exists(audio_path):
                    os.remove(audio_path)
            except OSError as cleanup_error:
                print(f"Error during cleanup: {cleanup_error}")
        raise
|
|
|
|
def transcribe_audio(client, audio_chunks, response_format="text"):
    """Transcribe every chunk, join the results and save them to disk.

    Chunk transcripts are joined with a single space for the ``"text"``
    format and with a blank line for any other format, then stripped. The
    result is written as UTF-8 to ``transcript.txt`` inside ``OUTPUT_DIR``.

    NOTE(review): the joining assumes each chunk transcript is a string;
    formats that return structured objects would fail here — confirm which
    formats callers use.

    Args:
        client: Client passed through to ``process_audio_file``.
        audio_chunks: Paths of the audio files to transcribe, in order.
        response_format: Format requested for each transcription.

    Returns:
        The combined transcript.

    Raises:
        Exception: Any error is reported on stdout and re-raised unchanged.
    """
    try:
        separator = " " if response_format == "text" else "\n\n"
        total = len(audio_chunks)

        pieces = []
        for i, chunk_path in enumerate(audio_chunks, 1):
            print(f"Processing chunk {i} of {total}...")
            chunk_transcript = process_audio_file(client, chunk_path, response_format)
            print(f"Chunk {i} transcript:")
            print(chunk_transcript)
            pieces.append(chunk_transcript)

        transcript = separator.join(pieces).strip()

        # Save the transcript. UTF-8 is explicit because the platform default
        # encoding may be unable to represent every transcribed character.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        transcript_filename = os.path.join(OUTPUT_DIR, "transcript.txt")
        with open(transcript_filename, "w", encoding="utf-8") as f:
            f.write(transcript)
        print(f"Transcript saved to {transcript_filename}")

        return transcript
    except Exception as e:
        print(f"Error in transcription: {e}")
        raise
|
|
|
|
def generate_summary(openai_client, transcript):
    """Summarize the transcript, print the summary and save it to disk.

    The summary comes from ``summarize_transcription`` and is written as
    UTF-8 to ``summary.txt`` inside ``OUTPUT_DIR``.

    Args:
        openai_client: Client passed through to ``summarize_transcription``.
        transcript: Text to summarize.

    Returns:
        The summary text.

    Raises:
        Exception: Any error is reported on stdout and re-raised unchanged.
    """
    try:
        summary = summarize_transcription(openai_client, transcript)
        print("Summary:")
        print(summary)

        # Save the summary. UTF-8 is explicit because the platform default
        # encoding may be unable to represent every character.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        summary_filename = os.path.join(OUTPUT_DIR, "summary.txt")
        with open(summary_filename, "w", encoding="utf-8") as f:
            f.write(summary)
        print(f"Summary saved to {summary_filename}")

        return summary
    except Exception as e:
        print(f"Error in summary generation: {e}")
        raise
|
|
|
|
def cleanup_files(audio_path, is_video, chunks):
    """Remove temporary files on a best-effort basis.

    Every existing path in ``chunks`` is removed, except that when the input
    was not a video a chunk that is the source audio file itself is kept:
    for a small audio input the chunk list is just ``[audio_path]``, and
    deleting it would destroy the user's original file. The full audio file
    is removed only when it was extracted from a video.

    Failures are reported on stdout and never raised; one failed removal
    does not stop the remaining ones.

    Args:
        audio_path: Path of the full audio file.
        is_video: Whether ``audio_path`` was extracted from a video.
        chunks: Paths of the files that were sent for transcription.
    """
    def _canonical(path):
        # Normalize so the same file spelled differently compares equal.
        return os.path.normcase(os.path.abspath(path))

    candidates = list(chunks)
    if is_video:
        # Temporary audio file extracted from the video.
        candidates.append(audio_path)

    for path in candidates:
        try:
            if not is_video and _canonical(path) == _canonical(audio_path):
                # This is the caller's original input, not a temp file.
                continue
            if os.path.exists(path):
                os.remove(path)
        except Exception as e:
            print(f"Error during cleanup: {e}")
|
|
|
|
def main():
    """Run the pipeline: prepare the audio, transcribe it, summarize it.

    The OpenAI API key is read from the ``OPENAI_API_KEY`` environment
    variable. Temporary files are cleaned up whether or not transcription
    and summarization succeed. Any error is reported on stdout.
    """
    try:
        # Configuration
        file_path = 'E:\\sources\\PDC\\video_ai\\videos\\PdC - Modulo ESG-20250116_105531.mp4'
        response_format = "text"

        # Secrets must not live in source code. The key that used to be
        # hard-coded here should be treated as compromised and revoked.
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY environment variable is not set")

        # Initialize OpenAI clients
        client = OpenAI(api_key=api_key)
        openai_client = OpenAIClient(api_key=api_key)

        # Step 1: Convert video to audio and handle chunking
        audio_chunks, audio_path, is_video = convert_video_to_audio_with_chunking(file_path)

        try:
            # Step 2: Transcribe the audio
            transcript = transcribe_audio(client, audio_chunks, response_format)

            # Step 3: Generate summary (printed and saved by the callee)
            generate_summary(openai_client, transcript)
        finally:
            # Clean up temporary files
            cleanup_files(audio_path, is_video, audio_chunks)

    except Exception as e:
        print("Error:", e)


if __name__ == "__main__":
    main()