Video AI first commit

c.rosati 2025-03-05 19:31:45 +01:00
commit 3c4904521f
15 changed files with 1005 additions and 0 deletions

7
.gitignore vendored Normal file

@@ -0,0 +1,7 @@
*.md
*.docx
*.zip
*.mp4
venv
__pycache__
videos/*.*

File diff suppressed because one or more lines are too long (7 files)

164
app.py Normal file

@@ -0,0 +1,164 @@
from openai import OpenAI
import os
import math
from moviepy import VideoFileClip
from openai_client import OpenAIClient
MAX_FILE_SIZE = 26214400 # 25MB in bytes
CHUNK_SIZE = MAX_FILE_SIZE // 2 # Split into ~12MB chunks
OUTPUT_DIR = "E:\\sources\\PDC\\video_ai\\videos"
def extract_audio(video_path):
"""Extract audio from video file and return path to audio file."""
print("Extracting audio from video file...")
temp_audio_path = os.path.join(OUTPUT_DIR, "temp_audio.mp3")
video = VideoFileClip(video_path)
video.audio.write_audiofile(temp_audio_path)
video.close()
return temp_audio_path
def split_audio_file(file_path):
"""Split an audio file into smaller chunks using binary reading."""
chunks_dir = os.path.join(OUTPUT_DIR, 'chunks')
os.makedirs(chunks_dir, exist_ok=True)
chunks = []
file_size = os.path.getsize(file_path)
total_chunks = math.ceil(file_size / CHUNK_SIZE)
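    # e.g. a 40 MB file: ceil(41943040 / 13107200) = 4 chunks of at most ~12.5 MB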
with open(file_path, 'rb') as source_file:
for chunk_number in range(total_chunks):
chunk_filename = os.path.join(chunks_dir, f"chunk_{chunk_number}.mp3")
with open(chunk_filename, 'wb') as chunk_file:
chunk_data = source_file.read(CHUNK_SIZE)
chunk_file.write(chunk_data)
chunks.append(chunk_filename)
return chunks
def process_audio_file(client, file_path, response_format):
"""Process a single audio file and return its transcription."""
with open(file_path, "rb") as audio_file:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format=response_format
)
return transcript
def summarize_transcription(client, transcript):
    """Use GPT-4o (via the OpenAIClient wrapper) to summarize the transcription."""
    # gpt-4o is a chat model, so it must be called through the chat completions API
    response = client.create_completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Please summarize the following transcription: {transcript}"}],
        max_tokens=12000,
        temperature=0.5
    )
    return response.choices[0].message.content.strip()
def convert_video_to_audio_with_chunking(file_path):
"""Convert video to audio and handle chunking if necessary."""
is_video = file_path.lower().endswith('.mp4')
audio_path = extract_audio(file_path) if is_video else file_path
try:
file_size = os.path.getsize(audio_path)
if file_size > MAX_FILE_SIZE:
print(f"File size ({file_size} bytes) exceeds the maximum limit. Splitting into chunks...")
return split_audio_file(audio_path), audio_path, is_video
return [audio_path], audio_path, is_video
except Exception as e:
print(f"Error in video/audio conversion: {e}")
raise
def transcribe_audio(client, audio_chunks, response_format="text"):
"""Handle the transcription of audio chunks."""
try:
full_transcript = ""
for i, chunk_path in enumerate(audio_chunks, 1):
print(f"Processing chunk {i} of {len(audio_chunks)}...")
chunk_transcript = process_audio_file(client, chunk_path, response_format)
print(f"Chunk {i} transcript:")
print(chunk_transcript)
if response_format == "text":
full_transcript += chunk_transcript + " "
else: # vtt format
full_transcript += chunk_transcript + "\n\n"
transcript = full_transcript.strip()
# Save the transcript
transcript_filename = os.path.join(OUTPUT_DIR, "transcript.txt")
        with open(transcript_filename, "w", encoding="utf-8") as f:
f.write(transcript)
print(f"Transcript saved to {transcript_filename}")
return transcript
except Exception as e:
print(f"Error in transcription: {e}")
raise
def generate_summary(openai_client, transcript):
"""Generate and save a summary of the transcript."""
try:
summary = summarize_transcription(openai_client, transcript)
print("Summary:")
print(summary)
# Save the summary
summary_filename = os.path.join(OUTPUT_DIR, "summary.txt")
        with open(summary_filename, "w", encoding="utf-8") as f:
f.write(summary)
print(f"Summary saved to {summary_filename}")
return summary
except Exception as e:
print(f"Error in summary generation: {e}")
raise
def cleanup_files(audio_path, is_video, chunks):
"""Clean up temporary files."""
try:
# Clean up chunks
for chunk_path in chunks:
if os.path.exists(chunk_path):
os.remove(chunk_path)
# Clean up temporary audio file if it was extracted from video
if is_video and os.path.exists(audio_path):
os.remove(audio_path)
except Exception as e:
print(f"Error during cleanup: {e}")
def main():
try:
# Configuration
file_path = 'E:\\sources\\PDC\\video_ai\\videos\\PdC - Modulo ESG-20250116_105531.mp4'
response_format = "text"
        api_key = os.environ.get("OPENAI_API_KEY")  # read from the environment; never hardcode secrets
# Initialize OpenAI clients
client = OpenAI(api_key=api_key)
openai_client = OpenAIClient(api_key=api_key)
# Step 1: Convert video to audio and handle chunking
audio_chunks, audio_path, is_video = convert_video_to_audio_with_chunking(file_path)
try:
# Step 2: Transcribe the audio
transcript = transcribe_audio(client, audio_chunks, response_format)
# Step 3: Generate summary
summary = generate_summary(openai_client, transcript)
finally:
# Clean up temporary files
cleanup_files(audio_path, is_video, audio_chunks)
except Exception as e:
print("Error:", e)
if __name__ == "__main__":
main()

418
assembly.py Normal file

@@ -0,0 +1,418 @@
# Start by making sure the `assemblyai` package is installed.
# If not, you can install it by running the following command:
# pip install -U assemblyai
#
# Note: Some macOS users may need to use `pip3` instead of `pip`.
from openai import OpenAI
import assemblyai as aai
from openai_client import OpenAIClient
import os
import math
from moviepy import VideoFileClip
from assembly_gpt import summarize_transcription
from assembly_gpt import clean_summary
from assembly_gpt import verbale
import json
from pydub import AudioSegment
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import pypandoc
def setup_api_keys():
"""Setup API keys and configurations."""
    aai.settings.api_key = os.environ.get("ASSEMBLYAI_API_KEY")  # read keys from the environment; never hardcode secrets
    api_key = os.environ.get("OPENAI_API_KEY")
client = OpenAIClient(api_key)
clientAudio = OpenAI(api_key=api_key)
return client, clientAudio
def get_config():
"""Get configuration settings."""
return {
#'file_url': 'E:\\sources\\PDC\\video_ai\\videos\\PdC - Confronto PianoSoluzione_ seconda parte 2025-02-20 16-28-38.mp4',
        'max_file_size': 50000000000, # 50 GB
        'chunk_size': 3000000, # ~3 MB per chunk
'output_dir': "E:\\sources\\PDC\\video_ai\\videos"
}
def extract_audio(video_path, output_dir):
"""
Extract audio from video file and return path to audio file.
Args:
video_path (str): Path to the video file
output_dir (str): Directory to save the extracted audio
Returns:
str: Path to the extracted audio file
"""
print("Extracting audio from video file...")
temp_audio_path = os.path.join(output_dir, "temp_audio.mp3")
video = VideoFileClip(video_path)
video.audio.write_audiofile(temp_audio_path)
video.close()
return temp_audio_path
def process_video(file_path, output_dir):
"""
Process video file and convert to audio chunks if necessary.
Args:
file_path (str): Path to the video/audio file
output_dir (str): Directory to save processed files
Returns:
tuple: List of audio chunks, audio path, and whether input was video
"""
is_video = file_path.lower().endswith('.mp4')
audio_path = extract_audio(file_path, output_dir) if is_video else file_path
try:
return [audio_path], audio_path, is_video
except Exception as e:
print(f"Error in video/audio conversion: {e}")
raise
def split_audio_file(file_path):
"""Split an audio file into smaller chunks using pydub for proper audio handling."""
chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
os.makedirs(chunks_dir, exist_ok=True)
# Load the audio file
try:
audio = AudioSegment.from_file(file_path)
except Exception as e:
print(f"Error loading audio file: {e}")
return []
    # Split by duration rather than bytes: at ~128 kbps, 10 minutes of MP3
    # is roughly 9.4 MB, comfortably under Whisper's 25 MB upload limit
    chunk_length_ms = 10 * 60 * 1000  # 10 minutes per chunk
chunks = []
# Split the audio into chunks
for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
chunk_end = chunk_start + chunk_length_ms
chunk = audio[chunk_start:chunk_end]
chunk_filename = os.path.join(chunks_dir, f"chunk_{i}.mp3")
try:
chunk.export(chunk_filename, format="mp3")
chunks.append(chunk_filename)
except Exception as e:
print(f"Error exporting chunk {i}: {e}")
continue
return chunks
def split_audio_by_duration(file_path, duration_seconds):
"""
Split an audio file into chunks of a specified duration.
Args:
file_path (str): Path to the audio file
duration_seconds (int): Duration of each chunk in seconds
Returns:
list: List of paths to the audio chunks
"""
chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
os.makedirs(chunks_dir, exist_ok=True)
try:
audio = AudioSegment.from_file(file_path)
except Exception as e:
print(f"Error loading audio file: {e}")
return []
chunks = []
chunk_length_ms = duration_seconds * 1000 # Convert seconds to milliseconds
for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
chunk_end = chunk_start + chunk_length_ms
chunk = audio[chunk_start:chunk_end]
chunk_filename = os.path.join(chunks_dir, f"chunk_{i}.mp3")
try:
chunk.export(chunk_filename, format="mp3")
chunks.append(chunk_filename)
print(f"Created chunk: {chunk_filename}")
except Exception as e:
print(f"Error exporting chunk: {e}")
return chunks
def transcribe_audio(audio_chunks, clientAudio):
    """
    Transcribe audio chunks using the OpenAI Whisper API.
    Args:
        audio_chunks (list): List of audio file paths to transcribe
        clientAudio: OpenAI client used for the transcription calls
    Returns:
        str: Transcribed text
    """
    transcripts = []
    for chunk in audio_chunks:
        print(f"Transcribing chunk: {chunk}")
        try:
            # Open via context manager so each chunk file is closed after use.
            # timestamp_granularities is only honored with response_format="verbose_json",
            # so it is omitted here.
            with open(chunk, "rb") as audio_file:
                transcript = clientAudio.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-1",
                    response_format="text",
                    language="it",
                    prompt="L'audio è il pezzo di una riunione registrata; potrebbe essere la continuazione di un precedente pezzo di audio"
                )
            transcripts.append(transcript)
        except Exception as e:
            print(f"Error in transcription: {e}")
    return ' '.join(transcripts)
def generate_summary(transcription, base_filename, client, clientAudio):
    """
    Generate summary from transcription using GPT.
    Args:
        transcription (str): Transcribed text
        base_filename (str): Base filename for output files (currently unused)
        client: OpenAIClient wrapper used for the chat completion calls
        clientAudio: OpenAI client for audio processing (unused here)
    Returns:
        tuple: Original summary, cleaned summary, and generated meeting minutes (verbale)
    """
    summary = summarize_transcription(client, transcription)
    summary_clean = clean_summary(client, summary)
    verbaleGen = verbale(client, summary_clean)
    return summary, summary_clean, verbaleGen
def cleanup_temp_files(audio_path):
"""
Clean up temporary files created during processing.
Args:
audio_path (str): Path to the temporary audio file
"""
if audio_path and os.path.exists(audio_path):
try:
os.remove(audio_path)
print(f"Cleaned up temporary audio file: {audio_path}")
except Exception as e:
print(f"Error cleaning up temporary file: {e}")
# Clean up the chunks directory
chunks_dir = os.path.join(get_config()['output_dir'], 'chunks')
if os.path.exists(chunks_dir):
try:
for filename in os.listdir(chunks_dir):
file_path = os.path.join(chunks_dir, filename)
if os.path.isfile(file_path):
os.remove(file_path)
print(f"Removed chunk file: {file_path}")
os.rmdir(chunks_dir)
print(f"Removed chunks directory: {chunks_dir}")
except Exception as e:
print(f"Error cleaning up chunks directory: {e}")
def convert_markdown_to_word(markdown_text, output_file):
"""
Convert markdown text to MS Word document format using pypandoc.
Args:
markdown_text (str): The markdown text to convert
output_file (str): Path where to save the Word document
"""
try:
# Convert markdown to docx using pandoc
pypandoc.convert_text(
markdown_text,
'docx',
format='md',
outputfile=output_file,
extra_args=['--wrap=none', '--toc'] # --toc adds table of contents
)
return output_file
except Exception as e:
print(f"Error converting markdown to Word: {e}")
# Fallback to basic conversion if pandoc fails
doc = Document()
doc.add_paragraph(markdown_text)
doc.save(output_file)
return output_file
def verbalizza(client, verbale_file_name):
    # Load every file whose name ends in vito_2.txt and concatenate the contents into one string
files = []
for root, dirs, filenames in os.walk(get_config()['output_dir']):
for filename in filenames:
if filename.endswith("vito_2.txt"):
files.append(os.path.join(root, filename))
    # Sort the files by modification time
files.sort(key=lambda x: os.path.getmtime(x))
    # Read the content of each file into a single string
all_text = ""
for file in files:
with open(file, 'r', encoding='utf-8') as f:
all_text += f.read() + "\n\n"
verbaleGen = verbale(client, all_text)
with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
f.write(verbaleGen)
return verbaleGen
def main(file_url):
"""Main function to orchestrate the video processing pipeline."""
# Setup
client, clientAudio = setup_api_keys()
config = get_config()
base_filename = os.path.splitext(file_url)[0]
filename = os.path.basename(file_url)
filename = os.path.splitext(filename)[0]
folder_url = os.path.dirname(file_url)
print("BASE FILENAME: ", base_filename)
print("FILE URL: ", file_url)
print("OUTPUT DIR: ", config['output_dir'])
summary_file_name = 'AI - SUMMARY - TEMP - ' + filename
summary_clean_file_name = filename + ' - AISUMMARY'
verbale_file_name = filename + ' - AIVERBALE'
full_verbale_file_name = 'AI - FULL VERBALE - ' + filename
docx_file_name = folder_url + '/' + summary_clean_file_name + '.docx'
docx_verbale_file_name = folder_url + '/' + verbale_file_name + '.docx'
docx_full_verbale_file_name = folder_url + '/' + full_verbale_file_name + '.docx'
transcription_file_name = base_filename + '_transcription.txt'
audio_path = None
print("AUDIO PATH: ", audio_path)
print("SUMMARY FILE NAME: ", summary_file_name)
print("SUMMARY CLEAN FILE NAME: ", summary_clean_file_name)
print("VERBALE FILE NAME: ", verbale_file_name)
print("FULL VERBALE FILE NAME: ", full_verbale_file_name)
print("TRANSCRIPTION FILE NAME: ", transcription_file_name)
print("DOCX FILE NAME: ", docx_file_name)
print("DOCX VERBALE FILE NAME: ", docx_verbale_file_name)
print("DOCX FULL VERBALE FILE NAME: ", docx_full_verbale_file_name)
try:
# Process video/audio
audio_chunks, audio_path, is_video = process_video(
file_url,
config['output_dir']
)
# Split audio into chunks if necessary
if len(audio_chunks) == 1 and audio_chunks[0].endswith('.mp3'):
audio_chunks = split_audio_file(audio_path)
# Transcribe audio
transcription = transcribe_audio(audio_chunks, clientAudio)
# Save transcription to file
with open(transcription_file_name, "w", encoding='utf-8') as f:
f.write(transcription)
print(f"Saved transcription to file: {transcription_file_name}")
        transcription = None  # re-read from the saved file so later steps always use the persisted text
try:
with open(transcription_file_name, 'r', encoding='utf-8') as f:
transcription = f.read()
except FileNotFoundError:
print(f"Transcription file '{transcription_file_name}' not found. Skipping load transcription step.")
# Generate summary
        summary, summary_clean, verbaleGen = generate_summary(transcription, base_filename, client, clientAudio)
# Save summaries to files
with open(summary_file_name + ".md", "w", encoding='utf-8') as f:
f.write(summary)
with open(summary_clean_file_name + ".md", "w", encoding='utf-8') as f:
f.write(summary_clean)
with open(verbale_file_name + ".md", "w", encoding='utf-8') as f:
f.write(verbaleGen)
# Load the cleaned summary file
with open(summary_clean_file_name + ".md", 'r', encoding='utf-8') as f:
summary_clean = f.read()
# Convert to Word if needed
convert_markdown_to_word(summary_clean, docx_file_name)
# Convert to Word if needed
convert_markdown_to_word(verbaleGen, docx_verbale_file_name)
# Cleanup
cleanup_temp_files(audio_path)
print("Processing completed successfully!")
print("**************\n\n")
print(summary_clean)
except Exception as e:
print(f"Error during processing: {e}")
raise
def processo_verbale():
client, clientAudio = setup_api_keys()
config = get_config()
    full_verbale_file_name = "full_verbale"  # base name: ".md" / ".docx" are appended below
    docx_full_verbale_file_name = full_verbale_file_name + '.docx'
verbaleGen = verbalizza(client, full_verbale_file_name)
with open(full_verbale_file_name + ".md", "w", encoding='utf-8') as f:
f.write(verbaleGen)
# Convert to Word if needed
convert_markdown_to_word(verbaleGen, docx_full_verbale_file_name)
if __name__ == "__main__":
folder = "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\03 Mar 2025 FFIT Replatforming PDC PO - Analisi Tecnica - Follow up pt2"
# for file in os.listdir(folder):
# if file.endswith(".mp4"):
# main(os.path.join(folder, file))
# folder = "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\03 Mar 2025 FFIT PDC ALLINEAMENTO PERIODICO"
# for file in os.listdir(folder):
# if file.endswith(".mp4"):
# main(os.path.join(folder, file))
# folder = "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\24 Feb 2025 FFIT - Replatforming PDC-PO - Definizione soluzione da adottare - Analisi tecnica"
# for file in os.listdir(folder):
# if file.endswith(".mp4"):
# main(os.path.join(folder, file))
main("E:\\obs\\PdC Review Piano Parte1 - 2025-03-05 09-11-44.mp4")
# folder = "C:\\Users\\rosat\\ARMUNDIA GROUP SRL\\Trasformazione PdC - Documents\\03 - Analisi e Disegno\\meetings\\24 Feb 2025 FFIT - Replatforming PDC-PO - Definizione soluzione da adottare - Analisi tecnica"
# for file in os.listdir(folder):
# if file.endswith(".mp4"):
# main(os.path.join(folder, file))
#processo_verbale()

228
assembly_gpt.py Normal file

@@ -0,0 +1,228 @@
import math
def split_into_chunks(text, num_chunks=5):
"""Split text into specified number of chunks of roughly equal size."""
words = text.split()
chunk_size = math.ceil(len(words) / num_chunks)
chunks = []
if chunk_size < 1:
return chunks
for i in range(0, len(words), chunk_size):
chunk = ' '.join(words[i:i + chunk_size])
chunks.append(chunk)
return chunks
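# Worked example: a 12,000-word transcript with num_chunks=5 gives
# chunk_size = ceil(12000 / 5) = 2400, i.e. five chunks of up to 2400 words each.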
def summarize_chunk(client, chunk, previous_summary=None, previous_chunk=None):
"""Summarize a single chunk considering previous context."""
# Split previous_chunk into n parts
def split_into_n_parts(text, n):
part_length = len(text) // n
parts = [text[i * part_length:(i + 1) * part_length] for i in range(n - 1)]
parts.append(text[(n - 1) * part_length:]) # Add the remaining text to the last part
return parts
parts = []
if previous_chunk:
parts = split_into_n_parts(previous_chunk, 10)
last_part = parts[-1] if parts else ""
context = ""
if previous_summary and last_part:
context = f"""Previous chunk: {last_part}
Elaboration: {previous_summary}
"""
    # system_prompt = f"""You will find next a transcript of a Teams Meeting divided into several chunks (you will receive a chunk at a
    # time and the last part of the previous one so that you can continue any interrupted sentence or thought).
# [PAY ATTENTION: The text may not be perfect as it was transcribed from video]
# {context}
# Actual chunk: {chunk}
# [GOAL]
# Return a text in line with the user request. The text will be called "elaboration" in your context.
# Please update the "Elaboration" considering the additional information contained in the chunk.
# If you find no useful information in the chunk, just copy the "Elaboration" without modifications otherwise:
# add all the new information to the previous "Elaboration" and in case remove information that is no more relevant.
# Be complete and detailed in your response but avoid unnecessary details.
# The final result should be as long and detailed as possible.
# The result should be in business italian language.
# [PAY ATTENTION: due to the bad quality of the audio some naming could be incorrectly interpreted. Try to uniform names based on the whole context]
# \n"""
    system_prompt = f"""You will find next a transcript of a Teams Meeting divided into several chunks (you will receive a chunk at a
    time and the last part of the previous one so that you can continue any interrupted sentence or thought).
    [PAY ATTENTION: The text may not be perfect as it was transcribed from video; try your best to find the correct meaning]
{context}
Actual chunk: {chunk}
[GOAL]
Return a text in line with the user request. The text will be called "elaboration" in your context.
Please update the "Elaboration" considering the additional information contained in the chunk.
    If you find no useful information in the chunk, just copy the "Elaboration" without modifications; otherwise
    add all the new information to the previous "Elaboration", removing information that is no longer relevant.
The result should be in business italian language.
[PAY ATTENTION: due to the bad quality of the audio some naming could be incorrectly interpreted. Try to uniform names based on the whole context]
\n"""
question = "Mi servono gli appunti di questa riunione come fossi uno studente che deve studiarla [dettaglia quante più informazioni possibile]. Segnala in coda un resoconto degli open-point e/o delle decisioni prese."
#question = "Mi serve il verbale di questa riunione (in formato società di consulenza in stile Deloitte). Di particolare importanza sono le decisioni prese e le cose da approfondire nei prossimi incontri."
response = client.create_completion(
model='gpt-4o',
messages=[
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': question}
],
temperature=0.1,
top_p=0.5,
max_tokens=16000,
)
if response.choices:
tmp = response.choices[0].message.content.strip()
print(tmp)
print("--------------------------------------------- \n\n\n")
return tmp
return ""
def clean_summary(client,summary):
system_prompt = f"""Following an automated generated text from a video transcript. Due to audio quality and
lack of knowledge about the context, the text may not be complete or accurate.
[ORIGINAL TEXT]
{summary}
[CONTEXT INFORMATION]
    1 - The context of the conversation is a meeting for the development of the "PDC Project", where PDC stands for "Piattaforma di Consulenza".
    2 - The PdC is a wealth management platform for the Fideuram Bank, used by its 6,000 Private Bankers.
    3 - Related to the PDC there is the PO (Piattaforma Operativa), an application that manages the orders generated by the PDC.
    4 - The platform is built on a framework called DARWIN, which has a front-end library called XDCE and a library called BEAR
    (one based on Angular, the other on Java Spring Boot)
5 - WEADD è un'altra piattaforma di consulenza di Intesa Private che dovrà convergere all'interno della PDC
6 - FFIT o FF (Forward Fit) è un progetto in corso di allineamento di tutte le anagrafiche clienti e portafogli del mondo Intesa (ISPB, Fideuram etc.)
7 - FINV Sistema di invio ordini di Intesa
8 - CJ (customer journey) Moduli software includibili all'interno della PDC
9 - MUP (Motore unico Portafoglio)
10 - AUS (Anagrafica unica Strumenti)
11 - LDD (Layer di Disaccoppiamento). E' sufficiente l'acronimo (non va mai messo per intero)
12 - Convergenza Fondi è un altro progetto di interesse per il mondo Intesa/Fideuram che andrà in parallelo con quello della PDC
13 - Il concetto di "pianificazione" è molto usato in questo contesto (parole simili potrebbero fare riferimento a questo)
14 - PDC is the name of the platform so it's often used in this context (similar words may refer to this)
    15 - Pay attention to the word "Pianificazione": it could be the name of a PDC Module or the ordinary Italian word. When it's the module name, put it in brackets.
[GOAL]
Based on ORIGINAL TEXT and CONTEXT INFORMATION return what asked by the user.
\n"""
question = "Ritorna ORIGINAL TEXT corretto secondo il contesto passato in ingresso (correggi parole o acronimi errati)"
response = client.create_completion(
model='gpt-4o',
messages=[
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': question}
],
temperature=0.5,
max_tokens=16000,
)
if response.choices:
tmp = response.choices[0].message.content.strip()
print(tmp)
print("--------------------------------------------- \n\n\n")
return tmp
return ""
def verbale(client,summary):
system_prompt = f"""L'utente ti passerà il resoconto dettagliato di una riunione.
[CONTEXT INFORMATION]
    1 - The context of the conversation is a meeting for the development of the "PDC Project", where PDC stands for "Piattaforma di Consulenza".
    2 - The PdC is a wealth management platform for the Fideuram Bank, used by its 6,000 Private Bankers.
    3 - Related to the PDC there is the PO (Piattaforma Operativa), an application that manages the orders generated by the PDC.
    4 - The platform is built on a framework called DARWIN, which has a front-end library called XDCE and a library called BEAR
    (one based on Angular, the other on Java Spring Boot)
5 - WEADD è un'altra piattaforma di consulenza di Intesa Private che dovrà convergere all'interno della PDC
6 - FFIT o FF (Forward Fit) è un progetto in corso di allineamento di tutte le anagrafiche clienti e portafogli del mondo Intesa (ISPB, Fideuram etc.)
7 - FINV Sistema di invio ordini di Intesa
8 - CJ (customer journey) Moduli software includibili all'interno della PDC
9 - MUP (Motore unico Portafoglio)
10 - AUS (Anagrafica unica Strumenti)
11 - LDD (Layer di Disaccoppiamento). E' sufficiente l'acronimo (non va mai messo per intero)
12 - Convergenza Fondi è un altro progetto di interesse per il mondo Intesa/Fideuram che andrà in parallelo con quello della PDC
13 - Il concetto di "pianificazione" è molto usato in questo contesto (parole simili potrebbero fare riferimento a questo)
14 - PDC is the name of the platform so it's often used in this context (similar words may refer to this)
    15 - Pay attention to the word "Pianificazione": it could be the name of a PDC Module or the ordinary Italian word. When it's the module name, put it in brackets.
16 - Si può parlare di Stored Procedure (quelle del database)
[OBIETTIVO]
    Basandoti sul resoconto e sul contesto, crea un verbale della riunione.
Mantieni l'ordine di quanto è scritto nel resoconto.
Il contesto è quello di una riunione in una banca per parlare dello sviluppo di un software.
Il contesto è business e software oriented.
Evita l'ordine del giorno.
Se trovi una sezione open-point anche questa va verbalizzata in modo riassuntivo (metti quelli più importanti o raggruppa dove puoi)
\n"""
question = summary
response = client.create_completion(
model='gpt-4o',
messages=[
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': question}
],
temperature=0.5,
max_tokens=16000,
)
if response.choices:
tmp = response.choices[0].message.content.strip()
print(tmp)
print("--------------------------------------------- \n\n\n")
return tmp
return ""
def summarize_transcription(client, transcript):
"""Use GPT-4 to summarize the transcription using recursive chunking."""
# Split transcript into chunks
chunks = split_into_chunks(transcript)
# Initialize variables for recursive summarization
current_summary = None
previous_chunk = None
i = 0
# Process each chunk recursively
for chunk in chunks:
print("**************** " + f"Chunk {i} \n\n")
i += 1
current_summary = summarize_chunk(client, chunk, current_summary, previous_chunk)
previous_chunk = chunk
return current_summary
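# Minimal usage sketch (assumes OPENAI_API_KEY is set in the environment and
# transcript_text holds the full transcript string loaded by the caller):
#   import os
#   from openai_client import OpenAIClient
#   client = OpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
#   final_summary = summarize_transcription(client, transcript_text)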

59
chunk_test.py Normal file

@@ -0,0 +1,59 @@
import os
import math
from moviepy import VideoFileClip
MAX_FILE_SIZE = 26214400 # 25MB in bytes
CHUNK_SIZE = MAX_FILE_SIZE // 2 # Split into ~12MB chunks
def extract_audio(video_path):
"""Extract audio from video file and return path to audio file."""
print("Extracting audio from video file...")
temp_audio_path = os.path.join(os.path.dirname(video_path), "temp_audio.mp3")
video = VideoFileClip(video_path)
video.audio.write_audiofile(temp_audio_path)
video.close()
return temp_audio_path
def split_audio_file(file_path):
"""Split an audio file into smaller chunks using binary reading."""
chunks_dir = os.path.join(os.path.dirname(file_path), 'chunks')
os.makedirs(chunks_dir, exist_ok=True)
chunks = []
file_size = os.path.getsize(file_path)
total_chunks = math.ceil(file_size / CHUNK_SIZE)
with open(file_path, 'rb') as source_file:
for chunk_number in range(total_chunks):
# Create a file for each chunk in the chunks directory
chunk_filename = os.path.join(chunks_dir, f"chunk_{chunk_number}.mp3")
with open(chunk_filename, 'wb') as chunk_file:
chunk_data = source_file.read(CHUNK_SIZE)
chunk_file.write(chunk_data)
chunks.append(chunk_filename)
return chunks
def main():
FILE_URL = 'E:/obs/Trasform PdC - Architettura Fideuram con focus sulla PdC -20241212- parte I.mp4'
# FILE_URL = 'E:/obs/Trasform PdC - Architettura Fideuram con focus sulla PdC- 20241212 - parte II.mp4'
video_path = FILE_URL #input("Enter the file path of the video file: ")
video_path = video_path.strip('"')
# Extract audio from the video
audio_path = extract_audio(video_path)
# # Split the audio file into chunks
# chunks = split_audio_file(audio_path)
# print("Audio has been split into the following chunks:")
# for chunk in chunks:
# print(chunk)
if __name__ == "__main__":
main()

13
openai_client.py Normal file

@@ -0,0 +1,13 @@
from openai import OpenAI
#from config import Config
class OpenAIClient:
def __init__(self, api_key=None):
self.api_key = api_key #or Config.OPENAI_API_KEY
if not self.api_key:
raise ValueError("OpenAI API key not provided.")
        self.client = OpenAI(api_key=self.api_key)
def create_completion(self, **kwargs):
return self.client.chat.completions.create(**kwargs)
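# Minimal usage sketch (assumes OPENAI_API_KEY is set in the environment):
#   import os
#   client = OpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
#   response = client.create_completion(
#       model="gpt-4o",
#       messages=[{"role": "user", "content": "Riassumi questa trascrizione..."}],
#   )
#   print(response.choices[0].message.content)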

7
requirements.txt Normal file

@@ -0,0 +1,7 @@
openai
moviepy
ffmpeg-python
assemblyai
pydub
python-docx
pypandoc
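# note: pypandoc requires a system pandoc installation (or install pypandoc_binary, which bundles it)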

58
transcript_openai.txt Normal file

File diff suppressed because one or more lines are too long