Added more logging + ffmpeg download functionality

Added logging and fixed README '-h' output
This commit is contained in:
Robert
2024-05-06 20:32:41 -07:00
parent c38a329e3b
commit a6d87a805b
2 changed files with 143 additions and 51 deletions

View File

@@ -12,10 +12,12 @@ Original: `YouTube contains an incredible amount of knowledge, much of which is
Save time and use the `config.txt` file, it allows you to set these settings and have them used when ran.
```
usage: diarize.py [-h] [--api_url API_URL] [--num_speakers NUM_SPEAKERS] [--whisper_model WHISPER_MODEL]
[--offset OFFSET] [--vad_filter]
usage: diarize.py [-h] [--api_name API_NAME] [--api_key API_KEY] [--num_speakers NUM_SPEAKERS] [--whisper_model WHISPER_MODEL] [--offset OFFSET]
[--vad_filter] [--log_level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
[input_path]
Transcribe and summarize videos.
positional arguments:
input_path Path or URL of the video
@@ -27,6 +29,7 @@ options:
Number of speakers (default: 2)
--whisper_model WHISPER_MODEL
Whisper model (default: small.en)
Available one: "`small`", "`medium`", "`small.en`","`medium.en`"
--offset OFFSET Offset in seconds (default: 0)
--vad_filter Enable VAD filter
--log_level {DEBUG,INFO,WARNING,ERROR,CRITICAL}

View File

@@ -20,6 +20,8 @@ import yt_dlp
#
#######
# To Do
# Offline diarization - https://github.com/pyannote/pyannote-audio/blob/develop/tutorials/community/offline_usage_speaker_diarization.ipynb
####
@@ -170,9 +172,11 @@ def decide_cpugpu():
processing_input = input("Would you like to use your GPU or CPU for transcription? (1/cuda)GPU/(2/cpu)CPU): ")
if processing_choice == "cuda" and (processing_input.lower() == "cuda" or processing_input == "1"):
print("You've chosen to use the GPU.")
logging.debug("GPU is being used for processing")
processing_choice = "cuda"
elif processing_input.lower() == "cpu" or processing_input == "2":
print("You've chosen to use the CPU.")
logging.debug("CPU is being used for processing")
processing_choice = "cpu"
else:
print("Invalid choice. Please select either GPU or CPU.")
@@ -182,10 +186,68 @@ def decide_cpugpu():
# check for existence of ffmpeg
def check_ffmpeg():
if shutil.which("ffmpeg"):
logging.debug("ffmpeg found installed on the local system, or at least in the local PATH")
pass
else:
print("ffmpeg is not installed.\n You can either install it manually, or through your package manager of choice.\n Windows users, builds are here: https://www.gyan.dev/ffmpeg/builds/")
print("Script will continue, but is likely to break")
logging.debug("ffmpeg not installed on the local system/in local PATH")
print("ffmpeg is not installed.\n\n You can either install it manually, or through your package manager of choice.\n Windows users, builds are here: https://www.gyan.dev/ffmpeg/builds/")
if userOS == "Windows":
download_ffmpeg()
elif userOS == "Linux":
print("You should install ffmpeg using your platform's appropriate package manager, 'apt install ffmpeg','dnf install ffmpeg' or 'pacman', etc.")
else:
logging.debug("running an unsupported OS")
print("You're running an unspported/Un-tested OS")
exit_script = input("Let's exit the script, unless you're feeling lucky? (y/n)")
if exit_script == "y" or "yes" or "1":
exit()
# Download ffmpeg
def download_ffmpeg():
user_choice = input("Do you want to download ffmpeg? (yes/no): ")
if user_choice.lower() == 'yes' or 'y' or '1':
# Download the zip file
url = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-essentials.zip"
response = requests.get(url)
if response.status_code == 200:
logging.debug("Saving ffmpeg zip file")
zip_path = "ffmpeg-release-essentials.zip"
with open(zip_path, 'wb') as file:
file.write(response.content)
logging.debug("Extracting the 'ffmpeg.exe' file from the zip")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
ffmpeg_path = "ffmpeg-7.0-essentials_build/bin/ffmpeg.exe"
logging.debug("checking if the './Bin' folder exists, creating if not")
bin_folder = "Bin"
if not os.path.exists(bin_folder):
logging.debug("Creating a folder for './Bin', it didn't previously exist")
os.makedirs(bin_folder)
logging.debug("Extracting 'ffmpeg.exe' to the './Bin' folder")
zip_ref.extract(ffmpeg_path, path=bin_folder)
logging.debug("Moving 'ffmpeg.exe' to the './Bin' folder")
src_path = os.path.join(bin_folder, ffmpeg_path)
dst_path = os.path.join(bin_folder, "ffmpeg.exe")
shutil.move(src_path, dst_path)
# Remove the downloaded zip file
os.remove(zip_path)
logging.debug("ffmpeg.exe has been downloaded and extracted to the './Bin' folder.")
print("ffmpeg.exe has been downloaded and extracted to the './Bin' folder.")
else:
logging.error("Failed to download the zip file.")
print("Failed to download the zip file.")
else:
logging.debug("User chose to not download ffmpeg")
print("ffmpeg will not be downloaded.")
#
#
####################################################################################################################################
@@ -208,6 +270,7 @@ def read_paths_from_file(file_path):
for line in file:
line = line.strip()
if line and not os.path.exists(os.path.join('Results', normalize_title(line.split('/')[-1].split('.')[0]) + '.json')):
logging.debug("line successfully imported from file and added to list to be transcribed")
paths.append(line)
return paths
@@ -216,8 +279,10 @@ def read_paths_from_file(file_path):
def process_path(path):
""" Decides whether the path is a URL or a local file and processes accordingly. """
if path.startswith('http'):
logging.debug("file is a URL")
return get_youtube(path) # For YouTube URLs, modify to download and extract info
elif os.path.exists(path):
logging.debug("File is a path")
return process_local_file(path) # For local files, define a function to handle them
else:
logging.error(f"Path does not exist: {path}")
@@ -230,8 +295,11 @@ def process_local_file(file_path):
logging.info(f"Processing local file: {file_path}")
title = normalize_title(os.path.splitext(os.path.basename(file_path))[0])
info_dict = {'title': title}
logging.debug(f"Creating {title} directory...")
download_path = create_download_directory(title)
logging.debug(f"Converting '{title}' to an audio file (wav).")
audio_file = convert_to_wav(file_path) # Assumes input files are videos needing audio extraction
logging.debug(f"'{title}' succesfully converted to an audio file (wav).")
return download_path, info_dict, audio_file
#
#
@@ -246,37 +314,17 @@ def process_local_file(file_path):
# Video Download/Handling
#
# Ask the user for the URL of the video to be downloaded. Alternatively, ask the user for the location of a local txt file to be read in and parsed to a list to be processed individually
def get_video_url():
user_choice = input("Enter '1' to provide a video URL or '2' to specify a local text file path\n\t(the text file may contain both URLs and local file paths: ")
if user_choice == '1':
video_url = input("Enter the URL of the video to be downloaded: ")
return video_url
elif user_choice == '2':
file_path = input("Enter the path of the local text file to be read and processed: ")
return file_path
else:
print("Invalid choice. Please enter either '1' or '2'.")
return None
# Perform processing of list to create array of URLs/Files to be downloaded & converted.
# Parse list for lines starting with 'http' -> Sort into urls_array[]
# Parse list for file paths (?) -> Sort into urls_local[]
# Download + convert items in urls_array[] list
# Convert (if necessary) items in urls_array[] list
def create_download_directory(title):
base_dir = "Results"
# Remove characters that are illegal in Windows filenames and normalize
safe_title = normalize_title(title)
logging.debug(f"{title} successfully normalized")
session_path = os.path.join(base_dir, safe_title)
if not os.path.exists(session_path):
os.makedirs(session_path, exist_ok=True)
print(f"Created directory: {session_path}")
logging.debug(f"Created directory for downloaded video: {session_path}")
else:
print(f"Directory already exists: {session_path}")
logging.debug(f"Directory already exists for downloaded video: {session_path}")
return session_path
@@ -297,12 +345,15 @@ def get_youtube(video_url):
'extract_flat': True
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
logging.debug("About to extract youtube info")
info_dict = ydl.extract_info(video_url, download=False)
logging.debug("Youtube info successfully extracted")
return info_dict
def download_video(video_url, download_path, info_dict):
logging.debug("About to normalize downloaded video title")
title = normalize_title(info_dict['title'])
file_path = os.path.join(download_path, f"{title}.m4a")
ydl_opts = {
@@ -310,7 +361,9 @@ def download_video(video_url, download_path, info_dict):
'outtmpl': file_path,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
logging.debug("About to download video with youtube-dl")
ydl.download([video_url])
logging.debug("Video successfully downloaded with youtube-dl")
return file_path
#
#
@@ -336,6 +389,7 @@ def convert_to_wav(video_file_path, offset=0):
try:
if os.name == "nt":
logging.debug("Whisper being ran on windows")
command = [
r".\Bin\ffmpeg.exe", # Assuming the working directory is correctly set where .\Bin exists
"-ss", "00:00:00", # Start at the beginning of the video
@@ -368,7 +422,6 @@ def convert_to_wav(video_file_path, offset=0):
# Transcribe .wav into .segments.json
def speech_to_text(audio_file_path, selected_source_lang='en', whisper_model='small.en', vad_filter=False):
logging.info('Loading faster_whisper model: %s', whisper_model)
from faster_whisper import WhisperModel
@@ -429,38 +482,38 @@ def speaker_diarize(video_file_path, segments, embedding_model = "pyannote/embed
2. Applying agglomerative clustering on the embeddings to identify the speaker for each segment.
"""
try:
# Load embedding model
from pyannote.audio import Audio
from pyannote.core import Segment
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
embedding_model = PretrainedSpeakerEmbedding( embedding_model, device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))
import numpy as np
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
import tqdm
import wave
embedding_model = PretrainedSpeakerEmbedding( embedding_model, device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))
_,file_ending = os.path.splitext(f'{video_file_path}')
audio_file = video_file_path.replace(file_ending, ".wav")
out_file = video_file_path.replace(file_ending, ".diarize.json")
# Get duration
import wave
logging.debug("getting duration of audio file")
with contextlib.closing(wave.open(audio_file,'r')) as f:
frames = f.getnframes()
rate = f.getframerate()
duration = frames / float(rate)
logging.debug("duration of audio file obtained")
print(f"duration of audio file: {duration}")
# Create embedding
def segment_embedding(segment):
logging.debug("Creating embedding")
audio = Audio()
start = segment["start"]
end = segment["end"]
# enforce a minimum segment length
# Enforcing a minimum segment length
if end-start < 0.3:
padding = 0.3-(end-start)
start -= padding/2
@@ -548,16 +601,17 @@ def speaker_diarize(video_file_path, segments, embedding_model = "pyannote/embed
# Summarize with OpenAI ChatGPT
def extract_text_from_segments(segments):
logging.debug(f"openai: extracting text from {segment}")
text = ' '.join([segment['text'] for segment in segments])
return text
def summarize_with_openai(api_key, file_path, model):
try:
# Load your JSON data
logging.debug("openai: Loading json data for summarization")
with open(file_path, 'r') as file:
segments = json.load(file)
# Extract text from the segments
logging.debug("openai: Extracting text from the segments")
text = extract_text_from_segments(segments)
headers = {
@@ -565,7 +619,7 @@ def summarize_with_openai(api_key, file_path, model):
'Content-Type': 'application/json'
}
# Prepare the data for the OpenAI API
logging.debug("openai: Preparing data + prompt for submittal")
prompt_text = f"{text} \n\n\n\nPlease provide a detailed, bulleted list of the points made throughout the transcribed video and any supporting arguments made for said points"
data = {
"model": model,
@@ -582,28 +636,31 @@ def summarize_with_openai(api_key, file_path, model):
"max_tokens": 4096, # Adjust tokens as needed
"temperature": 0.7
}
logging.debug("openai: Posting request")
response = requests.post('https://api.openai.com/v1/chat/completions', headers=headers, json=data)
if response.status_code == 200:
summary = response.json()['choices'][0]['message']['content'].strip()
print("Summary processed successfully.")
logging.debug("openai: Summarization successful")
print("Summarization successful.")
return summary
else:
logging.debug("openai: Summarization failed")
print("Failed to process summary:", response.text)
return None
except Exception as e:
logging.debug("openai: Generalized error, see above")
print("Error occurred while processing summary with OpenAI:", str(e))
return None
def summarize_with_claude(api_key, file_path, model):
try:
# Load your JSON data
logging.debug("anthropic: Loading JSON data")
with open(file_path, 'r') as file:
segments = json.load(file)
# Extract text from the segments
logging.debug("anthropic: Extracting text from the segments")
text = extract_text_from_segments(segments)
headers = {
@@ -612,7 +669,7 @@ def summarize_with_claude(api_key, file_path, model):
'Content-Type': 'application/json'
}
# Prepare the data for the Claude API
logging.debug("anthropic: Prepping data + prompt for submittal")
user_message = {
"role": "user",
"content": f"{text} \n\n\n\nPlease provide a detailed, bulleted list of the points made throughout the transcribed video and any supporting arguments made for said points"
@@ -633,26 +690,33 @@ def summarize_with_claude(api_key, file_path, model):
"system": "You are a professional summarizer."
}
logging.debug("anthropic: Posting request to API")
response = requests.post('https://api.anthropic.com/v1/messages', headers=headers, json=data)
# Check if the status code indicates success
if response.status_code == 200:
logging.debug("anthropic: Post submittal successful")
response_data = response.json()
try:
summary = response_data['content'][0]['text'].strip()
logging.debug("anthropic: Summarization succesful")
print("Summary processed successfully.")
return summary
except (IndexError, KeyError) as e:
logging.debug("anthropic: Unexpected data in response")
print("Unexpected response format from Claude API:", response.text)
return None
elif response.status_code == 500: # Handle internal server error specifically
logging.debug("anthropic: Internal server error")
print("Internal server error from API. Retrying may be necessary.")
return None
else:
logging.debug(f"anthropic: Failed to summarize, status code {response.status_code}: {response.text}")
print(f"Failed to process summary, status code {response.status_code}: {response.text}")
return None
except Exception as e:
logging.debug("anthropic: Generalized error, see above")
print("Error occurred while processing summary with Claude:", str(e))
return None
@@ -660,11 +724,11 @@ def summarize_with_claude(api_key, file_path, model):
# Summarize with Cohere
def summarize_with_cohere(api_key, file_path, model):
# Load your JSON data
logging.debug("cohere: Loading JSON data")
with open(file_path, 'r') as file:
segments = json.load(file)
# Extract text from the segments
logging.debug("cohere: Extracting text from segments")
text = extract_text_from_segments(segments)
headers = {
@@ -673,7 +737,7 @@ def summarize_with_cohere(api_key, file_path, model):
'Authorization': f'Bearer {api_key}'
}
# Prepare the data for the Cohere API
logging.debug("cohere: Preparing data + prompt for submittal")
#prompt_text = f"As a professional summarizer, create a concise and comprehensive summary of: {text}"
prompt_text = f"{text} \n\n\n\nAs a professional summarizer, create a concise and comprehensive summary of the provided text, be it an article, post, conversation, or passage, while adhering to these guidelines: Craft a summary that is detailed, thorough, in-depth, and complex, while maintaining clarity and conciseness. Incorporate main ideas and essential information, eliminating extraneous language and focusing on critical aspects. Rely strictly on the provided text, without including external information. Format the summary in paragraph form for easy understanding. Conclude your notes with [End of Notes, Message #X] to indicate completion, where 'X' represents the total number of messages that I have sent. In other words, include a message counter where you start with #1 and add 1 to the message counter every time I send a message. By following this optimized prompt, you will generate an effective summary that encapsulates the essence of the given text in a clear, concise, and reader-friendly manner. Utilize markdown to cleanly format your output. Example: Bold key subject matter and potential areas that may need expanded information"
data = {
@@ -684,14 +748,17 @@ def summarize_with_cohere(api_key, file_path, model):
"model": model,
"connectors": [{"id": "web-search"}]
}
logging.debug("cohere: Submitting request to API endpoint")
response = requests.post('https://api.cohere.ai/v1/chat', headers=headers, json=data)
if response.status_code == 200:
logging.debug("cohere: Request was successful!")
summary = response.json()['response'].strip()
print("Summary processed successfully.")
return summary
else:
logging.debug("cohere: Unsuccessful request :(")
print("Failed to process summary:", response.text)
return None
@@ -699,14 +766,14 @@ def summarize_with_cohere(api_key, file_path, model):
def summarize_with_llama(api_url, file_path):
try:
# Load your JSON data
logging.debug("llama: Loading JSON data")
with open(file_path, 'r') as file:
segments = json.load(file)
# Extract text from the segments
logging.debug("llama: Extracting text from segments")
text = extract_text_from_segments(segments)
# Prepare the data for the llama.cpp API
logging.debug("llama: Preparing data + prompt for submittal")
data = {
"prompt": f"{text} \n\n\n\nPlease provide a detailed, bulleted list of the points made throughout the transcribed video and any supporting arguments made for said points",
"max_tokens": 4096,
@@ -721,22 +788,27 @@ def summarize_with_llama(api_url, file_path):
"n_predict": 4096
}
logging.debug("llama: POSTing data to API endpoint")
response = requests.post(api_url, json=data)
if response.status_code == 200:
logging.debug("llama: POST Successful")
summary = response.json()['content'].strip()
print("Summary processed successfully.")
return summary
else:
logging.debug("llama: Unsuccessful POST")
print("Failed to process summary:", response.text)
return None
except Exception as e:
logging.debug("llama: Generalized error, see above")
print("Error occurred while processing summary with llama.cpp:", str(e))
return None
def save_summary_to_file(summary, file_path):
summary_file_path = file_path.replace('.segments.json', '_summary.txt')
logging.debug("Opening summary file for writing, *segments.json with *_summary.txt")
with open(summary_file_path, 'w') as file:
file.write(summary)
logging.info(f"Summary saved to file: {summary_file_path}")
@@ -756,6 +828,7 @@ def save_summary_to_file(summary, file_path):
def main(input_path, api_name=None, api_key=None, num_speakers=2, whisper_model="small.en", offset=0, vad_filter=False):
if os.path.isfile(input_path) and input_path.endswith('.txt'):
logging.debug("MAIN: User passed in a text file, processing text file...")
paths = read_paths_from_file(input_path)
else:
paths = [input_path]
@@ -764,19 +837,28 @@ def main(input_path, api_name=None, api_key=None, num_speakers=2, whisper_model=
for path in paths:
try:
if path.startswith('http'):
logging.debug("MAIN: URL Detected")
info_dict = get_youtube(path)
if info_dict:
logging.debug("MAIN: Creating path for video file...")
download_path = create_download_directory(info_dict['title'])
logging.debug("MAIN: Path created successfully")
logging.debug("MAIN: Downloading video from yt_dlp...")
video_path = download_video(path, download_path, info_dict)
logging.debug("MAIN: Video downloaded successfully")
logging.debug("MAIN: Converting video file to WAV...")
audio_file = convert_to_wav(video_path, offset)
logging.debug("MAIN: Audio file converted succesfully")
else:
if os.path.exists(path):
logging.debug("MAIN: Local file path detected")
download_path, info_dict, audio_file = process_local_file(path)
else:
logging.error(f"File does not exist: {path}")
continue
if info_dict:
logging.debug("MAIN: Creating transcription file from WAV")
segments = speech_to_text(audio_file, whisper_model=whisper_model, vad_filter=vad_filter)
transcription_result = {
'video_path': path,
@@ -788,6 +870,7 @@ def main(input_path, api_name=None, api_key=None, num_speakers=2, whisper_model=
# Perform summarization based on the specified API
if api_name:
logging.debug(f"MAIN: Summarization being performed by {api_name}")
json_file_path = audio_file.replace('.wav', '.segments.json')
if api_name.lower() == 'openai':
api_key = openai_api_key
@@ -839,10 +922,13 @@ if __name__ == "__main__":
logging.info('Starting the transcription and summarization process.')
logging.info(f'Input path: {args.input_path}')
logging.info(f'API Name: {args.api_name}')
logging.debug(f'API Key: {api_key}') # ehhhhh
logging.info(f'Number of speakers: {args.num_speakers}')
logging.info(f'Whisper model: {args.whisper_model}')
logging.info(f'Offset: {args.offset}')
logging.info(f'VAD filter: {args.vad_filter}')
logging.info(f'Log Level: {args.log_level}') #lol
if args.api_name and args.api_key:
logging.info(f'API: {args.api_name}')
@@ -850,8 +936,11 @@ if __name__ == "__main__":
else:
logging.info('No API specified. Summarization will not be performed.')
logging.debug("Platform check being performed...")
platform_check()
logging.debug("CUDA check being performed...")
cuda_check()
logging.debug("ffmpeg check being performed...")
check_ffmpeg()
try: