# video-ai/assembly.py
# (file-listing header converted to comments so the module parses:
#  402 lines, 14 KiB, Python)
# Start by making sure the `assemblyai` package is installed.
# If not, you can install it by running the following command:
# pip install -U assemblyai
#
# Note: Some macOS users may need to use `pip3` instead of `pip`.
from openai import OpenAI
import assemblyai as aai
from openai_client import OpenAIClient
import os
import math
from moviepy import VideoFileClip
from assembly_gpt import summarize_transcription
from assembly_gpt import clean_summary
from assembly_gpt import verbale
import json
from pydub import AudioSegment
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import pypandoc
def setup_api_keys():
    """Configure API keys and build the GPT/audio API clients.

    Returns:
        tuple: (OpenAIClient wrapper, raw OpenAI client used for audio).

    Raises:
        RuntimeError: if either API-key environment variable is missing.
    """
    # SECURITY: real API keys were previously hard-coded here and committed
    # to the repository -- they must be rotated. Keys are now read from the
    # environment instead of living in source control.
    assembly_key = os.environ.get("ASSEMBLYAI_API_KEY")
    openai_key = os.environ.get("OPENAI_API_KEY")
    if not assembly_key or not openai_key:
        raise RuntimeError(
            "Set the ASSEMBLYAI_API_KEY and OPENAI_API_KEY environment variables."
        )
    aai.settings.api_key = assembly_key
    client = OpenAIClient(openai_key)
    clientAudio = OpenAI(api_key=openai_key)
    return client, clientAudio
def get_config():
    """Return the static pipeline configuration.

    The inline comments previously contradicted the values ("50mb" next to a
    50 GB number, "~25MB" next to 3 MB); values are unchanged, comments fixed.

    Returns:
        dict: max_file_size, chunk_size and output_dir settings.
    """
    return {
        # 'file_url': 'E:\\sources\\PDC\\video_ai\\videos\\PdC - Confronto PianoSoluzione_ seconda parte 2025-02-20 16-28-38.mp4',
        'max_file_size': 50000000000,  # 50 GB upper bound on input size
        'chunk_size': 3000000,         # ~3 MB per chunk
        'output_dir': "E:\\sources\\PDC\\video_ai\\videos"
    }
def extract_audio(video_path, output_dir):
    """Extract the audio track from a video file as MP3.

    Args:
        video_path (str): Path to the video file.
        output_dir (str): Directory in which to save the extracted audio.

    Returns:
        str: Path to the extracted audio file (<output_dir>/temp_audio.mp3).
    """
    print("Extracting audio from video file...")
    temp_audio_path = os.path.join(output_dir, "temp_audio.mp3")
    video = VideoFileClip(video_path)
    try:
        # try/finally ensures the clip (and its file handle) is released
        # even when the audio write fails; the original leaked it on error.
        video.audio.write_audiofile(temp_audio_path)
    finally:
        video.close()
    return temp_audio_path
def process_video(file_path, output_dir):
    """Prepare a video/audio file for transcription.

    Args:
        file_path (str): Path to the video/audio file.
        output_dir (str): Directory to save processed files.

    Returns:
        tuple: ([audio_path], audio_path, is_video).
    """
    is_video = file_path.lower().endswith('.mp4')
    try:
        # Only .mp4 inputs need audio extraction; anything else is assumed
        # to already be an audio file and is passed through unchanged.
        # The original placed the extraction OUTSIDE the try, which made
        # this handler unreachable -- extraction is the step that can fail.
        audio_path = extract_audio(file_path, output_dir) if is_video else file_path
        return [audio_path], audio_path, is_video
    except Exception as e:
        print(f"Error in video/audio conversion: {e}")
        raise
def split_audio_file(file_path):
    """Split an audio file into ~10-minute MP3 chunks.

    This was a near-duplicate of split_audio_by_duration (the old comment
    even mislabelled 10 minutes as "10MB equivalent"); it now delegates
    with a fixed 600-second (10-minute) chunk length.

    Args:
        file_path (str): Path to the audio file.

    Returns:
        list: Paths of the exported chunk files (empty on load failure).
    """
    return split_audio_by_duration(file_path, 600)
def split_audio_by_duration(file_path, duration_seconds):
    """Cut an audio file into fixed-duration MP3 chunks.

    Args:
        file_path (str): Path of the audio file to split.
        duration_seconds (int): Length of each chunk in seconds.

    Returns:
        list: Paths of the chunk files written under <output_dir>/chunks
            (empty if the source file cannot be loaded).
    """
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    os.makedirs(chunks_dir, exist_ok=True)
    try:
        source = AudioSegment.from_file(file_path)
    except Exception as load_err:
        print(f"Error loading audio file: {load_err}")
        return []
    step_ms = duration_seconds * 1000  # pydub slices in milliseconds
    exported = []
    for index, start_ms in enumerate(range(0, len(source), step_ms)):
        piece = source[start_ms:start_ms + step_ms]
        target = os.path.join(chunks_dir, f"chunk_{index}.mp3")
        try:
            piece.export(target, format="mp3")
        except Exception as export_err:
            print(f"Error exporting chunk: {export_err}")
        else:
            exported.append(target)
            print(f"Created chunk: {target}")
    return exported
def transcribe_audio(audio_chunks, clientAudio):
    """Transcribe audio chunks with the OpenAI Whisper API.

    (The old docstring said AssemblyAI; the code actually calls the OpenAI
    transcription endpoint with model "whisper-1".)

    Args:
        audio_chunks (list): Paths of audio files to transcribe.
        clientAudio: OpenAI client exposing audio.transcriptions.create.

    Returns:
        str: All chunk transcripts joined with single spaces; chunks that
            fail are skipped after printing the error.
    """
    transcripts = []
    for chunk in audio_chunks:
        print(f"Transcribing chunk: {chunk}")
        try:
            # `with` closes the file handle even when the API call raises;
            # the original opened without a context manager and leaked it.
            with open(chunk, "rb") as audio_file:
                transcript = clientAudio.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-1",
                    response_format="text",
                    timestamp_granularities=["segment"],
                    language="it",
                    prompt="L'audio è il pezzo di una riunione registrata; potrebbe essere la continuazione di un precedente pezzo di audio"
                )
            transcripts.append(transcript)
        except Exception as e:
            print(f"Error in transcription: {e}")
    return ' '.join(transcripts)
def generate_summary(transcription, base_filename, client, clientAudio):
    """Run the GPT summarisation chain over a transcription.

    Only ``clientAudio`` is used; ``base_filename`` and ``client`` are
    accepted for call-site compatibility but ignored here.
    NOTE(review): main() passes its two clients in swapped order, so
    ``clientAudio`` actually receives the OpenAIClient wrapper at runtime --
    confirm which client type assembly_gpt expects.

    Args:
        transcription (str): Transcribed text to summarise.
        base_filename (str): Unused (kept for interface compatibility).
        client: Unused (kept for interface compatibility).
        clientAudio: Client handed to the assembly_gpt helpers.

    Returns:
        tuple: (raw summary, cleaned summary, generated minutes).
    """
    raw = summarize_transcription(clientAudio, transcription)
    cleaned = clean_summary(clientAudio, raw)
    minutes = verbale(clientAudio, cleaned)
    return raw, cleaned, minutes
def cleanup_temp_files(audio_path):
    """Delete the temporary audio file and the chunks working directory.

    Args:
        audio_path (str): Path to the temporary audio file (may be None).
    """
    if audio_path and os.path.exists(audio_path):
        try:
            os.remove(audio_path)
            print(f"Cleaned up temporary audio file: {audio_path}")
        except Exception as e:
            print(f"Error cleaning up temporary file: {e}")
    # Remove every chunk file, then the (now empty) chunks directory.
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    if not os.path.exists(chunks_dir):
        return
    try:
        for entry in os.listdir(chunks_dir):
            entry_path = os.path.join(chunks_dir, entry)
            if not os.path.isfile(entry_path):
                continue
            os.remove(entry_path)
            print(f"Removed chunk file: {entry_path}")
        os.rmdir(chunks_dir)
        print(f"Removed chunks directory: {chunks_dir}")
    except Exception as e:
        print(f"Error cleaning up chunks directory: {e}")
def convert_markdown_to_word(markdown_text, output_file):
    """Render markdown text to a Word (.docx) document via pandoc.

    Falls back to dumping the raw markdown into a single-paragraph document
    when pandoc is unavailable or the conversion fails.

    Args:
        markdown_text (str): The markdown text to convert.
        output_file (str): Path where to save the Word document.

    Returns:
        str: The path of the written document (same as ``output_file``).
    """
    try:
        pypandoc.convert_text(
            markdown_text,
            'docx',
            format='md',
            outputfile=output_file,
            extra_args=['--wrap=none', '--toc'],  # --toc adds a table of contents
        )
    except Exception as e:
        print(f"Error converting markdown to Word: {e}")
        # Degraded fallback: markdown structure is lost, content preserved.
        fallback = Document()
        fallback.add_paragraph(markdown_text)
        fallback.save(output_file)
    return output_file
def verbalizza(client, verbale_file_name, suffix="vito_2.txt"):
    """Concatenate matching text files and generate meeting minutes.

    Walks the configured output directory, collects every file whose name
    ends with ``suffix`` (sorted by modification time), feeds the combined
    text to ``verbale`` and writes the result to ``<verbale_file_name>.md``.

    NOTE(review): the original comment claimed "_summary_clean.md" files
    were collected, but the code matched "vito_2.txt"; the filter is now a
    parameter (default preserved) so the mismatch is at least explicit.

    Args:
        client: GPT client passed through to ``verbale``.
        verbale_file_name (str): Output path without the ".md" extension.
        suffix (str): Filename suffix used to select the input files.

    Returns:
        str: The generated minutes text.
    """
    matched = []
    for root, _dirs, filenames in os.walk(get_config()['output_dir']):
        matched.extend(
            os.path.join(root, name) for name in filenames if name.endswith(suffix)
        )
    # Oldest first, so the combined text follows chronological order.
    matched.sort(key=os.path.getmtime)
    all_text = ""
    for path in matched:
        with open(path, 'r', encoding='utf-8') as f:
            all_text += f.read() + "\n\n"
    verbaleGen = verbale(client, all_text)
    with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
        f.write(verbaleGen)
    return verbaleGen
def main(file_url):
    """Main function to orchestrate the video processing pipeline.

    Steps: extract/split the audio, transcribe it, run the GPT summary
    chain, write the markdown and Word outputs, then clean up temp files.

    Args:
        file_url (str): Path to the input video (.mp4) or audio file.
    """
    # Setup
    client, clientAudio = setup_api_keys()
    config = get_config()
    base_filename = os.path.splitext(file_url)[0]
    filename = os.path.basename(file_url)
    filename = os.path.splitext(filename)[0]
    folder_url = os.path.dirname(file_url)
    print("BASE FILENAME: ", base_filename)
    print("FILE URL: ", file_url)
    print("OUTPUT DIR: ", config['output_dir'])
    # NOTE(review): the .md summary/verbale files below are written to the
    # current working directory (bare names), while the .docx files go to
    # folder_url -- confirm this split is intended.
    summary_file_name = 'AI - SUMMARY - TEMP - ' + filename
    summary_clean_file_name = filename + ' - AISUMMARY'
    verbale_file_name = filename + ' - AIVERBALE'
    full_verbale_file_name = 'AI - FULL VERBALE - ' + filename
    docx_file_name = folder_url + '/' + summary_clean_file_name + '.docx'
    docx_verbale_file_name = folder_url + '/' + verbale_file_name + '.docx'
    docx_full_verbale_file_name = folder_url + '/' + full_verbale_file_name + '.docx'
    transcription_file_name = base_filename + '_transcription.txt'
    audio_path = None
    print("AUDIO PATH: ", audio_path)
    print("SUMMARY FILE NAME: ", summary_file_name)
    print("SUMMARY CLEAN FILE NAME: ", summary_clean_file_name)
    print("VERBALE FILE NAME: ", verbale_file_name)
    print("FULL VERBALE FILE NAME: ", full_verbale_file_name)
    print("TRANSCRIPTION FILE NAME: ", transcription_file_name)
    print("DOCX FILE NAME: ", docx_file_name)
    print("DOCX VERBALE FILE NAME: ", docx_verbale_file_name)
    print("DOCX FULL VERBALE FILE NAME: ", docx_full_verbale_file_name)
    try:
        # Process video/audio
        audio_chunks, audio_path, is_video = process_video(
            file_url,
            config['output_dir']
        )
        # Split audio into chunks if necessary
        if len(audio_chunks) == 1 and audio_chunks[0].endswith('.mp3'):
            audio_chunks = split_audio_file(audio_path)
        # Transcribe audio
        transcription = transcribe_audio(audio_chunks, clientAudio)
        # Save transcription to file
        with open(transcription_file_name, "w", encoding='utf-8') as f:
            f.write(transcription)
        print(f"Saved transcription to file: {transcription_file_name}")
        # Round-trip the transcription through disk: the file just written
        # above is re-read here before summarisation.
        transcription = None
        try:
            with open(transcription_file_name, 'r', encoding='utf-8') as f:
                transcription = f.read()
        except FileNotFoundError:
            print(f"Transcription file '{transcription_file_name}' not found. Skipping load transcription step.")
        # Generate summary
        # NOTE(review): arguments 3 and 4 appear swapped relative to
        # generate_summary's (client, clientAudio) parameter order -- confirm.
        summary, summary_clean, verbaleGen = generate_summary(transcription, base_filename, clientAudio, client)
        # Save summaries to files
        with open(summary_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(summary)
        with open(summary_clean_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(summary_clean)
        with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(verbaleGen)
        # Load the cleaned summary file
        with open(summary_clean_file_name + ".md", 'r', encoding='utf-8') as f:
            summary_clean = f.read()
        # Convert to Word if needed
        convert_markdown_to_word(summary_clean, docx_file_name)
        # Convert to Word if needed
        convert_markdown_to_word(verbaleGen, docx_verbale_file_name)
        # Cleanup
        cleanup_temp_files(audio_path)
        print("Processing completed successfully!")
        print("**************\n\n")
        print(summary_clean)
    except Exception as e:
        print(f"Error during processing: {e}")
        raise
def processo_verbale():
    """Generate the consolidated minutes markdown and Word documents.

    Fixes the doubled-extension bug in the original: file names were built
    from "full_verbale.md", producing "full_verbale.md.md" and
    "full_verbale.md.docx". A bare base name is used instead, and the
    redundant second write of the .md file (verbalizza already writes it)
    is removed.
    """
    client, clientAudio = setup_api_keys()
    base_name = "full_verbale"
    docx_full_verbale_file_name = base_name + '.docx'
    # verbalizza writes "<base_name>.md" itself and returns the text.
    verbaleGen = verbalizza(client, base_name)
    # Convert to Word if needed
    convert_markdown_to_word(verbaleGen, docx_full_verbale_file_name)
if __name__ == "__main__":
    # Meeting folders to process; previously-used folders are kept
    # commented out for quick re-activation.
    folders = [
        "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\24 Mar 2025 FFIT PDC ALLIENAMENTO PERIODICO",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 PDC Armundia DB Schemi Cat Prodotti e Consulenza Unica + Mappatura - Configurazione dei Prodotti e Relativi Servizi",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 PDC Armundia Accesso Sorgenti Batch + Giro NEO4J + Microservizi",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 FFIT Replatforming PDC PO Analisi Funzionale"
    ]
    # Run the full pipeline on every .mp4 found in each folder.
    for meeting_folder in folders:
        for entry in os.listdir(meeting_folder):
            if entry.endswith(".mp4"):
                main(os.path.join(meeting_folder, entry))
    # processo_verbale()