# Requires the `assemblyai` package:  pip install -U assemblyai
# (some macOS users may need `pip3` instead of `pip`).
from openai import OpenAI
import assemblyai as aai
from openai_client import OpenAIClient
import os
import math
from moviepy import VideoFileClip
from assembly_gpt import summarize_transcription
from assembly_gpt import clean_summary
from assembly_gpt import verbale
import json
from pydub import AudioSegment
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import pypandoc


def setup_api_keys():
    """Configure API keys and build the OpenAI clients.

    Keys are read from the environment (ASSEMBLYAI_API_KEY and
    OPENAI_API_KEY) instead of being hard-coded in source control —
    the previously committed keys must be considered compromised and
    should be rotated.

    Returns:
        tuple: (OpenAIClient wrapper for GPT calls, raw OpenAI client
        used for audio transcription).

    Raises:
        RuntimeError: If a required key is missing from the environment.
    """
    aai_key = os.environ.get("ASSEMBLYAI_API_KEY")
    openai_key = os.environ.get("OPENAI_API_KEY")
    if not aai_key or not openai_key:
        raise RuntimeError(
            "Missing API keys: set the ASSEMBLYAI_API_KEY and "
            "OPENAI_API_KEY environment variables."
        )
    aai.settings.api_key = aai_key
    client = OpenAIClient(openai_key)
    clientAudio = OpenAI(api_key=openai_key)
    return client, clientAudio


def get_config():
    """Return the pipeline configuration settings.

    Returns:
        dict: 'max_file_size' (bytes), 'chunk_size' (bytes) and
        'output_dir' (working directory for extracted audio/chunks).
    """
    return {
        'max_file_size': 50000000000,  # 50 GB upper bound on input size
        'chunk_size': 3000000,         # ~3 MB per chunk
        'output_dir': "E:\\sources\\PDC\\video_ai\\videos",
    }


def extract_audio(video_path, output_dir):
    """Extract the audio track of a video file to an MP3.

    Args:
        video_path (str): Path to the video file.
        output_dir (str): Directory to save the extracted audio in.

    Returns:
        str: Path to the extracted audio file.
    """
    print("Extracting audio from video file...")
    temp_audio_path = os.path.join(output_dir, "temp_audio.mp3")
    video = VideoFileClip(video_path)
    try:
        video.audio.write_audiofile(temp_audio_path)
    finally:
        # Release the underlying reader even if the export fails.
        video.close()
    return temp_audio_path


def process_video(file_path, output_dir):
    """Normalize a video/audio input into an audio path.

    Args:
        file_path (str): Path to the video/audio file.
        output_dir (str): Directory to save processed files in.

    Returns:
        tuple: ([audio_path], audio_path, is_video). The single-element
        list keeps the downstream chunk-splitting interface.
    """
    is_video = file_path.lower().endswith('.mp4')
    # For a plain audio input the file is used as-is; only .mp4 needs
    # its audio track extracted first.
    audio_path = extract_audio(file_path, output_dir) if is_video else file_path
    return [audio_path], audio_path, is_video


def split_audio_file(file_path):
    """Split an audio file into ~10-minute MP3 chunks.

    Args:
        file_path (str): Path to the audio file.

    Returns:
        list: Paths of the exported chunks (empty if loading failed).
    """
    # 10 minutes per chunk keeps each exported MP3 small enough for the
    # transcription API upload limit.
    return split_audio_by_duration(file_path, 10 * 60)


def split_audio_by_duration(file_path, duration_seconds):
    """Split an audio file into chunks of a fixed duration.

    Args:
        file_path (str): Path to the audio file.
        duration_seconds (int): Duration of each chunk in seconds.

    Returns:
        list: Paths of the exported MP3 chunks (empty on load failure).
    """
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    os.makedirs(chunks_dir, exist_ok=True)

    try:
        audio = AudioSegment.from_file(file_path)
    except Exception as e:
        # Best-effort: an undecodable file simply yields no chunks.
        print(f"Error loading audio file: {e}")
        return []

    chunks = []
    chunk_length_ms = duration_seconds * 1000  # pydub indexes in ms
    for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
        # Slicing past the end is safe in pydub: the last chunk is shorter.
        chunk = audio[chunk_start:chunk_start + chunk_length_ms]
        chunk_filename = os.path.join(chunks_dir, f"chunk_{i}.mp3")
        try:
            chunk.export(chunk_filename, format="mp3")
            chunks.append(chunk_filename)
            print(f"Created chunk: {chunk_filename}")
        except Exception as e:
            # Skip the failed chunk but keep exporting the rest.
            print(f"Error exporting chunk: {e}")
    return chunks


def transcribe_audio(audio_chunks, clientAudio):
    """Transcribe audio chunks with the OpenAI Whisper API.

    Args:
        audio_chunks (list): Paths of the audio files, in playback order.
        clientAudio: OpenAI client used for the transcription calls.

    Returns:
        str: The chunk transcriptions joined with single spaces; chunks
        that fail to transcribe are skipped.
    """
    transcripts = []
    for chunk in audio_chunks:
        print(f"Transcribing chunk: {chunk}")
        try:
            # `with` guarantees the handle closes even if the call raises.
            with open(chunk, "rb") as audio_file:
                transcript = clientAudio.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-1",
                    response_format="text",
                    timestamp_granularities=["segment"],
                    language="it",
                    # Italian hint: "this audio is part of a recorded
                    # meeting and may continue a previous piece of audio".
                    prompt="L'audio è il pezzo di una riunione registrata; potrebbe essere la continuazione di un precedente pezzo di audio"
                )
            transcripts.append(transcript)
        except Exception as e:
            print(f"Error in transcription: {e}")
    return ' '.join(transcripts)


def generate_summary(transcription, base_filename, client, clientAudio):
    """Generate summary, cleaned summary, and minutes from a transcription.

    Args:
        transcription (str): Transcribed text.
        base_filename (str): Kept for interface compatibility (unused).
        client: Client used for all GPT calls.
        clientAudio: Kept for interface compatibility (unused).

    Returns:
        tuple: (summary, cleaned summary, generated minutes).
    """
    # Bug fix: the old body used the `clientAudio` parameter while main()
    # passed the GPT client in that position; parameters and call site are
    # now aligned so the GPT client is used under its own name.
    summary = summarize_transcription(client, transcription)
    summary_clean = clean_summary(client, summary)
    verbaleGen = verbale(client, summary_clean)
    return summary, summary_clean, verbaleGen


def cleanup_temp_files(audio_path):
    """Delete the temporary audio file and the chunks directory.

    Args:
        audio_path (str): Path to the temporary audio file, or None to
        skip the audio-file removal.
    """
    if audio_path and os.path.exists(audio_path):
        try:
            os.remove(audio_path)
            print(f"Cleaned up temporary audio file: {audio_path}")
        except Exception as e:
            print(f"Error cleaning up temporary file: {e}")

    # Remove every exported chunk, then the (now empty) directory.
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    if os.path.exists(chunks_dir):
        try:
            for filename in os.listdir(chunks_dir):
                file_path = os.path.join(chunks_dir, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
                    print(f"Removed chunk file: {file_path}")
            os.rmdir(chunks_dir)
            print(f"Removed chunks directory: {chunks_dir}")
        except Exception as e:
            print(f"Error cleaning up chunks directory: {e}")


def convert_markdown_to_word(markdown_text, output_file):
    """Convert markdown text to an MS Word document using pypandoc.

    Falls back to dumping the raw markdown into a single-paragraph
    .docx if pandoc is unavailable or the conversion fails.

    Args:
        markdown_text (str): The markdown text to convert.
        output_file (str): Path where to save the Word document.

    Returns:
        str: `output_file`, for convenience.
    """
    try:
        pypandoc.convert_text(
            markdown_text,
            'docx',
            format='md',
            outputfile=output_file,
            extra_args=['--wrap=none', '--toc'],  # --toc adds a table of contents
        )
        return output_file
    except Exception as e:
        print(f"Error converting markdown to Word: {e}")
        # Fallback to a basic conversion if pandoc fails.
        doc = Document()
        doc.add_paragraph(markdown_text)
        doc.save(output_file)
        return output_file


def verbalizza(client, verbale_file_name):
    """Build meeting minutes from previously collected text files.

    Walks the output directory for files ending in "vito_2.txt"
    (NOTE(review): this filter looks specific to one past run — confirm
    it is still the intended input set), concatenates their contents in
    modification-time order, and asks GPT to produce the minutes.

    Args:
        client: Client used for the GPT call.
        verbale_file_name (str): Output base name; ".md" is appended.

    Returns:
        str: The generated minutes text.
    """
    matching = []
    for root, _dirs, filenames in os.walk(get_config()['output_dir']):
        for filename in filenames:
            if filename.endswith("vito_2.txt"):
                matching.append(os.path.join(root, filename))

    # Oldest first, so the minutes follow chronological meeting order.
    matching.sort(key=os.path.getmtime)

    all_text = ""
    for path in matching:
        with open(path, 'r', encoding='utf-8') as f:
            all_text += f.read() + "\n\n"

    verbaleGen = verbale(client, all_text)
    with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
        f.write(verbaleGen)
    return verbaleGen


def main(file_url):
    """Run the full pipeline for one video/audio file.

    Extracts audio, splits it into chunks, transcribes, summarizes, and
    exports the results as markdown and Word documents.

    Args:
        file_url (str): Path to the input video (.mp4) or audio file.
    """
    client, clientAudio = setup_api_keys()
    config = get_config()

    base_filename = os.path.splitext(file_url)[0]
    filename = os.path.splitext(os.path.basename(file_url))[0]
    folder_url = os.path.dirname(file_url)

    print("BASE FILENAME: ", base_filename)
    print("FILE URL: ", file_url)
    print("OUTPUT DIR: ", config['output_dir'])

    # Markdown artefacts land in the CWD; .docx files go next to the input.
    summary_file_name = 'AI - SUMMARY - TEMP - ' + filename
    summary_clean_file_name = filename + ' - AISUMMARY'
    verbale_file_name = filename + ' - AIVERBALE'
    full_verbale_file_name = 'AI - FULL VERBALE - ' + filename
    docx_file_name = folder_url + '/' + summary_clean_file_name + '.docx'
    docx_verbale_file_name = folder_url + '/' + verbale_file_name + '.docx'
    docx_full_verbale_file_name = folder_url + '/' + full_verbale_file_name + '.docx'
    transcription_file_name = base_filename + '_transcription.txt'
    audio_path = None

    print("AUDIO PATH: ", audio_path)
    print("SUMMARY FILE NAME: ", summary_file_name)
    print("SUMMARY CLEAN FILE NAME: ", summary_clean_file_name)
    print("VERBALE FILE NAME: ", verbale_file_name)
    print("FULL VERBALE FILE NAME: ", full_verbale_file_name)
    print("TRANSCRIPTION FILE NAME: ", transcription_file_name)
    print("DOCX FILE NAME: ", docx_file_name)
    print("DOCX VERBALE FILE NAME: ", docx_verbale_file_name)
    print("DOCX FULL VERBALE FILE NAME: ", docx_full_verbale_file_name)

    try:
        audio_chunks, audio_path, is_video = process_video(
            file_url, config['output_dir']
        )

        # A single .mp3 still needs cutting into transcription-sized pieces.
        if len(audio_chunks) == 1 and audio_chunks[0].endswith('.mp3'):
            audio_chunks = split_audio_file(audio_path)

        transcription = transcribe_audio(audio_chunks, clientAudio)

        with open(transcription_file_name, "w", encoding='utf-8') as f:
            f.write(transcription)
        print(f"Saved transcription to file: {transcription_file_name}")

        # Re-read the transcription so summarization always works from the
        # persisted checkpoint on disk.
        transcription = None
        try:
            with open(transcription_file_name, 'r', encoding='utf-8') as f:
                transcription = f.read()
        except FileNotFoundError:
            print(f"Transcription file '{transcription_file_name}' not found. Skipping load transcription step.")

        # Bug fix: arguments were previously passed (clientAudio, client)
        # against a (client, clientAudio) signature; the GPT client is now
        # passed explicitly in the `client` position.
        summary, summary_clean, verbaleGen = generate_summary(
            transcription, base_filename, client, clientAudio
        )

        with open(summary_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(summary)
        with open(summary_clean_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(summary_clean)
        with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(verbaleGen)

        # Reload the cleaned summary from disk before exporting it.
        with open(summary_clean_file_name + ".md", 'r', encoding='utf-8') as f:
            summary_clean = f.read()

        convert_markdown_to_word(summary_clean, docx_file_name)
        convert_markdown_to_word(verbaleGen, docx_verbale_file_name)

        # Bug fix: for a non-video input, audio_path IS the source file —
        # only the extracted temp audio may be deleted.
        cleanup_temp_files(audio_path if is_video else None)

        print("Processing completed successfully!")
        print("**************\n\n")
        print(summary_clean)
    except Exception as e:
        print(f"Error during processing: {e}")
        raise


def processo_verbale():
    """Build the cumulative minutes document from collected text files."""
    client, clientAudio = setup_api_keys()

    # Bug fix: the base name previously included ".md", producing
    # "full_verbale.md.md" and "full_verbale.md.docx" outputs.
    full_verbale_file_name = "full_verbale"
    docx_full_verbale_file_name = full_verbale_file_name + '.docx'

    # verbalizza() already writes "<name>.md"; no second write is needed.
    verbaleGen = verbalizza(client, full_verbale_file_name)

    convert_markdown_to_word(verbaleGen, docx_full_verbale_file_name)


if __name__ == "__main__":
    # Meeting folders to process; uncomment entries as needed.
    folders = [
        "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\24 Mar 2025 FFIT PDC ALLIENAMENTO PERIODICO",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 PDC Armundia DB Schemi Cat Prodotti e Consulenza Unica + Mappatura - Configurazione dei Prodotti e Relativi Servizi",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 PDC Armundia Accesso Sorgenti Batch + Giro NEO4J + Microservizi",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 FFIT Replatforming PDC PO Analisi Funzionale",
    ]
    for folder in folders:
        for file in os.listdir(folder):
            if file.endswith(".mp4"):
                main(os.path.join(folder, file))
    # processo_verbale()