# Start by making sure the `assemblyai` package is installed.
# If not, you can install it by running the following command:
#
#   pip install -U assemblyai
#
# Note: Some macOS users may need to use `pip3` instead of `pip`.

from openai import OpenAI
import assemblyai as aai
from openai_client import OpenAIClient
import os
import math
from moviepy import VideoFileClip
from assembly_gpt import summarize_transcription
from assembly_gpt import clean_summary
from assembly_gpt import verbale
import json
from pydub import AudioSegment
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import pypandoc
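# Note on external tools (not installed by pip): moviepy and pydub rely on an
# ffmpeg binary for MP3 extraction/export, and pypandoc needs a pandoc binary
# for the Markdown-to-DOCX conversion used further below. Both should be on PATH.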
def setup_api_keys():
    """Set up API keys and client instances.

    The keys are read from the ASSEMBLYAI_API_KEY and OPENAI_API_KEY
    environment variables instead of being hard-coded in the source.
    """
    aai.settings.api_key = os.environ["ASSEMBLYAI_API_KEY"]
    api_key = os.environ["OPENAI_API_KEY"]

    client = OpenAIClient(api_key)
    clientAudio = OpenAI(api_key=api_key)

    return client, clientAudio
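# Minimal sketch of how the keys would be provided before running this script.
# The two environment variable names are an assumption of this refactoring,
# not part of the original code:
#
#   set ASSEMBLYAI_API_KEY=...    (Windows)
#   set OPENAI_API_KEY=...
#   export OPENAI_API_KEY=...     (macOS/Linux)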
def get_config():
    """Get configuration settings."""
    return {
        #'file_url': 'E:\\sources\\PDC\\video_ai\\videos\\PdC - Confronto PianoSoluzione_ seconda parte 2025-02-20 16-28-38.mp4',
        'max_file_size': 50000000000,  # ~50 GB
        'chunk_size': 3000000,         # ~3 MB per chunk
        'output_dir': "E:\\sources\\PDC\\video_ai\\videos"
    }
def extract_audio(video_path, output_dir):
    """
    Extract audio from video file and return path to audio file.

    Args:
        video_path (str): Path to the video file
        output_dir (str): Directory to save the extracted audio

    Returns:
        str: Path to the extracted audio file
    """
    print("Extracting audio from video file...")
    temp_audio_path = os.path.join(output_dir, "temp_audio.mp3")
    video = VideoFileClip(video_path)
    video.audio.write_audiofile(temp_audio_path)
    video.close()
    return temp_audio_path
def process_video(file_path, output_dir):
    """
    Process a video/audio file: extract the audio track from .mp4 input.

    Chunking happens later (see split_audio_file); here the audio path is
    returned as a single-element list.

    Args:
        file_path (str): Path to the video/audio file
        output_dir (str): Directory to save processed files

    Returns:
        tuple: List of audio chunks, audio path, and whether the input was a video
    """
    is_video = file_path.lower().endswith('.mp4')
    audio_path = extract_audio(file_path, output_dir) if is_video else file_path

    return [audio_path], audio_path, is_video
def split_audio_file(file_path):
    """Split an audio file into smaller chunks using pydub for proper audio handling."""
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    os.makedirs(chunks_dir, exist_ok=True)

    # Load the audio file
    try:
        audio = AudioSegment.from_file(file_path)
    except Exception as e:
        print(f"Error loading audio file: {e}")
        return []

    # Use fixed-length chunks of 10 minutes each
    chunk_length_ms = 10 * 60 * 1000  # 10 minutes per chunk
    chunks = []

    # Split the audio into chunks
    for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
        chunk_end = chunk_start + chunk_length_ms
        chunk = audio[chunk_start:chunk_end]

        chunk_filename = os.path.join(chunks_dir, f"chunk_{i}.mp3")
        try:
            chunk.export(chunk_filename, format="mp3")
            chunks.append(chunk_filename)
        except Exception as e:
            print(f"Error exporting chunk {i}: {e}")
            continue

    return chunks
def split_audio_by_duration(file_path, duration_seconds):
    """
    Split an audio file into chunks of a specified duration.

    Args:
        file_path (str): Path to the audio file
        duration_seconds (int): Duration of each chunk in seconds

    Returns:
        list: List of paths to the audio chunks
    """
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    os.makedirs(chunks_dir, exist_ok=True)

    try:
        audio = AudioSegment.from_file(file_path)
    except Exception as e:
        print(f"Error loading audio file: {e}")
        return []

    chunks = []
    chunk_length_ms = duration_seconds * 1000  # Convert seconds to milliseconds

    for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
        chunk_end = chunk_start + chunk_length_ms
        chunk = audio[chunk_start:chunk_end]

        chunk_filename = os.path.join(chunks_dir, f"chunk_{i}.mp3")
        try:
            chunk.export(chunk_filename, format="mp3")
            chunks.append(chunk_filename)
            print(f"Created chunk: {chunk_filename}")
        except Exception as e:
            print(f"Error exporting chunk: {e}")

    return chunks
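# Why chunking is needed: the OpenAI transcription endpoint used below rejects
# uploads larger than roughly 25 MB, so long recordings have to be cut up first.
# A minimal usage sketch (the path below is hypothetical):
#
#   chunks = split_audio_by_duration(r"E:\sources\PDC\video_ai\videos\temp_audio.mp3", 600)
#   text = transcribe_audio(chunks, clientAudio)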
def transcribe_audio(audio_chunks, clientAudio):
    """
    Transcribe audio chunks using the OpenAI Whisper API.

    Args:
        audio_chunks (list): List of audio file paths to transcribe
        clientAudio: OpenAI client instance used for the transcription calls

    Returns:
        str: Transcribed text
    """
    transcripts = []
    for chunk in audio_chunks:
        print(f"Transcribing chunk: {chunk}")
        try:
            with open(chunk, "rb") as audio_file:
                transcript = clientAudio.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-1",
                    response_format="text",
                    # Note: timestamp_granularities is documented for
                    # response_format="verbose_json"; with "text" it may be
                    # ignored or rejected by the API.
                    timestamp_granularities=["segment"],
                    language="it",
                    # Prompt (Italian, matching language="it"): "The audio is part of a
                    # recorded meeting; it may be the continuation of a previous segment."
                    prompt="L'audio è il pezzo di una riunione registrata; potrebbe essere la continuazione di un precedente pezzo di audio"
                )
            transcripts.append(transcript)
        except Exception as e:
            print(f"Error in transcription: {e}")

    return ' '.join(transcripts)
def generate_summary(transcription, base_filename, client, clientAudio):
    """
    Generate a summary, a cleaned summary and a verbale (meeting minutes) from the transcription using GPT.

    Args:
        transcription (str): Transcribed text
        base_filename (str): Base filename for output files (currently unused)
        client: OpenAIClient instance (currently unused)
        clientAudio: OpenAI client instance used for the GPT calls

    Returns:
        tuple: Original summary, cleaned summary and generated verbale
    """
    summary = summarize_transcription(clientAudio, transcription)
    summary_clean = clean_summary(clientAudio, summary)

    verbaleGen = verbale(clientAudio, summary_clean)

    return summary, summary_clean, verbaleGen
def cleanup_temp_files(audio_path):
    """
    Clean up temporary files created during processing.

    Args:
        audio_path (str): Path to the temporary audio file
    """
    if audio_path and os.path.exists(audio_path):
        try:
            os.remove(audio_path)
            print(f"Cleaned up temporary audio file: {audio_path}")
        except Exception as e:
            print(f"Error cleaning up temporary file: {e}")

    # Clean up the chunks directory
    chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
    if os.path.exists(chunks_dir):
        try:
            for filename in os.listdir(chunks_dir):
                file_path = os.path.join(chunks_dir, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
                    print(f"Removed chunk file: {file_path}")
            os.rmdir(chunks_dir)
            print(f"Removed chunks directory: {chunks_dir}")
        except Exception as e:
            print(f"Error cleaning up chunks directory: {e}")
def convert_markdown_to_word(markdown_text, output_file):
    """
    Convert markdown text to MS Word document format using pypandoc.

    Requires a pandoc binary to be installed; if the conversion fails,
    a plain-text fallback document is produced with python-docx.

    Args:
        markdown_text (str): The markdown text to convert
        output_file (str): Path where to save the Word document
    """
    try:
        # Convert markdown to docx using pandoc
        pypandoc.convert_text(
            markdown_text,
            'docx',
            format='md',
            outputfile=output_file,
            extra_args=['--wrap=none', '--toc']  # --toc adds a table of contents
        )
        return output_file
    except Exception as e:
        print(f"Error converting markdown to Word: {e}")
        # Fallback to basic conversion if pandoc fails
        doc = Document()
        doc.add_paragraph(markdown_text)
        doc.save(output_file)
        return output_file
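# A quick usage sketch for the converter (the output path is hypothetical):
#
#   convert_markdown_to_word("# Meeting notes\n\nKey points...", r"E:\sources\PDC\video_ai\videos\notes.docx")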
def verbalizza(client, verbale_file_name):
    """Build a verbale (meeting minutes) from previously generated text files and save it as Markdown."""
    # Load all files ending in "vito_2.txt" and collect their paths
    files = []
    for root, dirs, filenames in os.walk(get_config()['output_dir']):
        for filename in filenames:
            if filename.endswith("vito_2.txt"):
                files.append(os.path.join(root, filename))

    # Sort the files by modification time
    files.sort(key=lambda x: os.path.getmtime(x))

    # Load the contents of the files into a single string
    all_text = ""
    for file in files:
        with open(file, 'r', encoding='utf-8') as f:
            all_text += f.read() + "\n\n"

    verbaleGen = verbale(client, all_text)

    with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
        f.write(verbaleGen)

    return verbaleGen
def main(file_url):
    """Main function to orchestrate the video processing pipeline."""
    # Setup
    client, clientAudio = setup_api_keys()
    config = get_config()

    base_filename = os.path.splitext(file_url)[0]

    filename = os.path.basename(file_url)
    filename = os.path.splitext(filename)[0]

    folder_url = os.path.dirname(file_url)

    print("BASE FILENAME: ", base_filename)
    print("FILE URL: ", file_url)
    print("OUTPUT DIR: ", config['output_dir'])

    summary_file_name = 'AI - SUMMARY - TEMP - ' + filename
    summary_clean_file_name = filename + ' - AISUMMARY'
    verbale_file_name = filename + ' - AIVERBALE'
    full_verbale_file_name = 'AI - FULL VERBALE - ' + filename
    docx_file_name = folder_url + '/' + summary_clean_file_name + '.docx'
    docx_verbale_file_name = folder_url + '/' + verbale_file_name + '.docx'
    docx_full_verbale_file_name = folder_url + '/' + full_verbale_file_name + '.docx'
    transcription_file_name = base_filename + '_transcription.txt'
    audio_path = None

    print("AUDIO PATH: ", audio_path)
    print("SUMMARY FILE NAME: ", summary_file_name)
    print("SUMMARY CLEAN FILE NAME: ", summary_clean_file_name)
    print("VERBALE FILE NAME: ", verbale_file_name)
    print("FULL VERBALE FILE NAME: ", full_verbale_file_name)
    print("TRANSCRIPTION FILE NAME: ", transcription_file_name)
    print("DOCX FILE NAME: ", docx_file_name)
    print("DOCX VERBALE FILE NAME: ", docx_verbale_file_name)
    print("DOCX FULL VERBALE FILE NAME: ", docx_full_verbale_file_name)
    try:
        # Process video/audio
        audio_chunks, audio_path, is_video = process_video(
            file_url,
            config['output_dir']
        )

        # Split audio into chunks if necessary
        if len(audio_chunks) == 1 and audio_chunks[0].endswith('.mp3'):
            audio_chunks = split_audio_file(audio_path)

        # Transcribe audio
        transcription = transcribe_audio(audio_chunks, clientAudio)

        # Save transcription to file
        with open(transcription_file_name, "w", encoding='utf-8') as f:
            f.write(transcription)
            print(f"Saved transcription to file: {transcription_file_name}")

        # Reload the transcription from disk
        transcription = None
        try:
            with open(transcription_file_name, 'r', encoding='utf-8') as f:
                transcription = f.read()
        except FileNotFoundError:
            print(f"Transcription file '{transcription_file_name}' not found. Skipping load transcription step.")

        # Generate summary, cleaned summary and verbale
        summary, summary_clean, verbaleGen = generate_summary(transcription, base_filename, client, clientAudio)

        # Save summaries to files
        with open(summary_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(summary)

        with open(summary_clean_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(summary_clean)

        with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
            f.write(verbaleGen)

        # Load the cleaned summary file
        with open(summary_clean_file_name + ".md", 'r', encoding='utf-8') as f:
            summary_clean = f.read()

        # Convert the cleaned summary to Word
        convert_markdown_to_word(summary_clean, docx_file_name)

        # Convert the verbale to Word
        convert_markdown_to_word(verbaleGen, docx_verbale_file_name)

        # Cleanup
        cleanup_temp_files(audio_path)

        print("Processing completed successfully!")
        print("**************\n\n")
        print(summary_clean)

    except Exception as e:
        print(f"Error during processing: {e}")
        raise
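# A minimal invocation sketch for a single recording (the path is hypothetical):
#
#   main(r"E:\sources\PDC\video_ai\videos\meeting.mp4")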
def processo_verbale():
    """Build the full verbale from previously generated files and convert it to Word."""
    client, clientAudio = setup_api_keys()
    config = get_config()

    full_verbale_file_name = "full_verbale"
    docx_full_verbale_file_name = full_verbale_file_name + '.docx'

    verbaleGen = verbalizza(client, full_verbale_file_name)

    with open(full_verbale_file_name + ".md", "w", encoding='utf-8') as f:
        f.write(verbaleGen)

    # Convert to Word
    convert_markdown_to_word(verbaleGen, docx_full_verbale_file_name)
if __name__ == "__main__":
    folders = [
        "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\24 Mar 2025 FFIT PDC ALLIENAMENTO PERIODICO",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 PDC Armundia DB Schemi Cat Prodotti e Consulenza Unica + Mappatura - Configurazione dei Prodotti e Relativi Servizi",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 PDC Armundia Accesso Sorgenti Batch + Giro NEO4J + Microservizi",
        # "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\20 Mar 2025 FFIT Replatforming PDC PO Analisi Funzionale"
    ]

    for folder in folders:
        for file in os.listdir(folder):
            if file.endswith(".mp4"):
                main(os.path.join(folder, file))

    # processo_verbale()