Still fumbling around

This commit is contained in:
Robert
2024-05-04 17:50:55 -07:00
parent e80bd9d20a
commit e3e121a83e
4 changed files with 444 additions and 89 deletions

BIN
.gitignore vendored

Binary file not shown.


@@ -1,8 +1,45 @@
# TL/DW: Too Long, Didn't Watch
YouTube contains an incredible amount of knowledge, much of which is locked inside multi-hour videos. Let's extract and summarize it with AI!
Take a single video URL, a list of URLs, or a list of local videos and URLs, feed it to the script, and each video is transcribed (and downloaded first, if not local) using faster-whisper. Transcriptions can then be shuffled off to an LLM API endpoint of your choice, local or remote. Any site supported by yt-dlp is supported, so this works with sites besides just YouTube.
### tl/dr:
- Use the script to transcribe a local file or remote URL. Any URL youtube-dl supports _should_ work. If you pass an API endpoint as a second argument, and add your API key to the config file, you can have the resulting transcriptions summarized as well.
- The current approach to summarization is 'dumb'/naive, and will likely be replaced or extended to reflect actual summarization practices rather than a 'dump text in and get an answer' approach.
- Save time by using the `config.txt` file; it lets you set these options once and have them applied on every run.
```
usage: diarize.py [-h] [--api_url API_URL] [--api_name API_NAME] [--api_key API_KEY]
[--num_speakers NUM_SPEAKERS] [--whisper_model WHISPER_MODEL]
[--offset OFFSET] [--vad_filter]
[input_path]
positional arguments:
input_path Path or URL of the video
options:
-h, --help show this help message and exit
--api_url API_URL API URL for summarization (optional)
--api_name API_NAME API name for summarization (optional: openai, anthropic, cohere)
--api_key API_KEY API key for summarization (optional)
--num_speakers NUM_SPEAKERS
Number of speakers (default: 2)
--whisper_model WHISPER_MODEL
Whisper model (default: small.en)
--offset OFFSET Offset in seconds (default: 0)
--vad_filter Enable VAD filter
```
### Pieces
- **Workflow**
1. Setup python + packages
2. Setup ffmpeg
3. Run `python diarize.py <video_url>` or `python diarize.py <List_of_videos.txt>`
4. If you want summarization, add your API key (currently required) to the `config.txt` file, then re-run the script, passing in the name of the API you want to use and your key.
- OpenAI:
- Anthropic:
- Cohere:
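The summarization step behind this workflow amounts to one HTTP request per provider. A minimal sketch of the OpenAI-style call, mirroring the request `diarize.py` builds (payload shape follows the script; `api_key` is a placeholder):

```python
import requests

def build_summary_request(text, model="gpt-3.5-turbo"):
    """Build the chat-completions payload the script sends for summarization."""
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a professional summarizer."},
            {"role": "user",
             "content": f"As a professional summarizer, create a concise and comprehensive summary of: {text}"},
        ],
        "max_tokens": 1024,  # Adjust as needed
        "temperature": 0.7,
    }

def summarize_with_openai(api_key, text, model="gpt-3.5-turbo"):
    """POST the payload to OpenAI and return the summary (requires a valid key)."""
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.post("https://api.openai.com/v1/chat/completions",
                             headers=headers, json=build_summary_request(text, model))
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"].strip()
```

The Anthropic and Cohere paths follow the same pattern with their own endpoints and payload shapes.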
### What's in the repo?
- `diarize.py` - download, transcribe and diarize audio
1. First uses [yt-dlp](https://github.com/yt-dlp/yt-dlp) to download audio (and optionally video) from the supplied URL
2. Next, it uses [ffmpeg](https://github.com/FFmpeg/FFmpeg) to convert the resulting `.m4a` file to `.wav`
@@ -41,6 +78,15 @@ YouTube contains an incredible amount of knowledge, much of which is locked insi
8. For feeding the transcriptions to the API of your choice, simply use the corresponding script for your API provider.
* FIXME: add scripts for OpenAI api (generic) and others
### Usage
- Single file (remote URL) transcription
* Single URL: `python diarize.py https://example.com/video.mp4`
- Single file (local) transcription
* Transcribe a local file: `python diarize.py /path/to/your/localfile.mp4`
- Multiple files (local & remote)
* List of files (URLs and local files may be mixed): `python diarize.py ./path/to/your/text_file.txt`
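For the mixed-list case, the script simply reads the text file line by line and treats each non-empty line as a URL or a local path. A rough sketch of that helper (the exact filtering in `diarize.py` may differ):

```python
def read_paths_from_file(file_path):
    """Read a text file of URLs and/or local paths, one per line, skipping blank lines."""
    with open(file_path) as f:
        return [line.strip() for line in f if line.strip()]

def is_remote(path):
    """URLs go through yt-dlp; anything else is treated as a local file."""
    return path.startswith("http")
```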
### Credits
- [original](https://github.com/the-crypt-keeper/tldw)
- [yt-dlp](https://github.com/yt-dlp/yt-dlp)

config.txt Normal file

@@ -0,0 +1,13 @@
[API]
openai_api_key = your_openai_api_key
anthropic_api_key = your_anthropic_api_key
anthropic_model = claude-v1
openai_model = gpt-3.5-turbo
cohere_api_key = your_cohere_api_key
cohere_model = base
[Paths]
output_path = Results
[Processing]
processing_choice = cuda
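`diarize.py` reads this file with `configparser`, falling back to defaults when a key or section is missing. A sketch of the lookup (fallback values taken from the script):

```python
import configparser

def load_config(path="config.txt"):
    """Read API keys, model names, and paths from config.txt with safe fallbacks."""
    config = configparser.ConfigParser()
    config.read(path)  # A missing file leaves the parser empty; fallbacks then apply
    return {
        "openai_api_key": config.get("API", "openai_api_key", fallback=None),
        "openai_model": config.get("API", "openai_model", fallback="gpt-3.5-turbo"),
        "anthropic_model": config.get("API", "anthropic_model", fallback="claude-v1"),
        "cohere_model": config.get("API", "cohere_model", fallback="base"),
        "output_path": config.get("Paths", "output_path", fallback="Results"),
        "processing_choice": config.get("Processing", "processing_choice", fallback="cpu"),
    }
```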


@@ -1,19 +1,27 @@
#!/usr/bin/env python3
import argparse
import configparser
import contextlib
import json
import logging
import os
import platform  # Used for checking OS version
import requests
import shutil  # Used for checking existence of ffmpeg
import subprocess
import sys
import time
import unicodedata
from datetime import datetime
import ffmpeg # Used for issuing commands to underlying ffmpeg executable, pip package ffmpeg is from 2018
import torch
import yt_dlp
#######
# Function Sections
#
# System Checks
# Processing Paths and local file handling
# Video Download/Handling
# Audio Transcription
# Diarization
# Summarizers
# Main
#
#######
####
#
# TL/DW: Too Long Didn't Watch
@@ -22,12 +30,25 @@ import yt_dlp
# Modifications made by https://github.com/rmusser01
# All credit to the original authors, I've just glued shit together.
#
#
# Usage:
#
# Transcribe a single URL:
#   python diarize.py https://example.com/video.mp4
#
# Transcribe a local file:
#   python diarize.py /path/to/your/localfile.mp4
#
# Transcribe a local file and have it summarized:
#   python diarize.py ./input.mp4 --api_name openai --api_key <your_openai_api_key>
#
# Transcribe a list of files (URLs and local paths may be mixed):
#   python diarize.py ./path/to/your/text_file.txt
#
# Transcribe a list of files and have them all summarized:
#   python diarize.py path_to_your_text_file.txt --api_name openai --api_key <your_openai_api_key>
#
###
@@ -49,7 +70,28 @@ source_language_list = [key[0] for key in source_languages.items()]
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Read configuration from file
config = configparser.ConfigParser()
config.read('config.txt')
# Retrieve API keys and output paths from the configuration file
openai_api_key = config.get('API', 'openai_api_key', fallback=None)
anthropic_api_key = config.get('API', 'anthropic_api_key', fallback=None)
cohere_api_key = config.get('API', 'cohere_api_key', fallback=None)
output_path = config.get('Paths', 'output_path', fallback='Results')
# Retrieve model names from the configuration file
anthropic_model = config.get('API', 'anthropic_model', fallback='claude-v1')
openai_model = config.get('API', 'openai_model', fallback='gpt-3.5-turbo')
cohere_model = config.get('API', 'cohere_model', fallback='base')
# Retrieve processing choice from the configuration file
processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
print(r"""_____ _ ________ _ _
@@ -76,6 +118,10 @@ print(r"""_____ _ ________ _ _
\__,_||_| \__,_||_| |_| \__| \_/\_/ \__,_| \__| \___||_| |_|
""")
####################################################################################################################################
# System Checks
#
#
# Perform Platform Check
userOS = ""
@@ -99,7 +145,7 @@ def cuda_check():
nvidia_smi = subprocess.check_output("nvidia-smi", shell=True).decode()
if "NVIDIA-SMI" in nvidia_smi:
print("NVIDIA GPU with CUDA is available.")
processing_choice = "cuda"  # Set processing_choice to cuda if an NVIDIA GPU with CUDA is available
else:
print("NVIDIA GPU with CUDA is not available.\nYou either have an AMD GPU, or you're stuck with CPU only.")
processing_choice = "cpu" # Set processing_choice to cpu if NVIDIA GPU with CUDA is not available
@@ -112,8 +158,8 @@ def cuda_check():
# Ask user if they would like to use either their GPU or their CPU for transcription
def decide_cpugpu():
global processing_choice
processing_input = input("Would you like to use your GPU or CPU for transcription? (1/cuda)GPU/(2/cpu)CPU): ")
if processing_choice == "cuda" and (processing_input.lower() == "cuda" or processing_input == "1"):
print("You've chosen to use the GPU.")
processing_choice = "cuda"
elif processing_input.lower() == "cpu" or processing_input == "2":
@@ -131,9 +177,21 @@ def check_ffmpeg():
else:
print("ffmpeg is not installed.\n You can either install it manually, or through your package manager of choice.\n Windows users, builds are here: https://www.gyan.dev/ffmpeg/builds/")
print("Script will continue, but is likely to break")
#
#
####################################################################################################################################
####################################################################################################################################
# Processing Paths and local file handling
#
#
def read_paths_from_file(file_path):
""" Reads a file containing URLs or local file paths and returns them as a list. """
paths = []
@@ -166,9 +224,19 @@ def process_local_file(file_path):
download_path = create_download_directory(title)
audio_file = convert_to_wav(file_path) # Assumes input files are videos needing audio extraction
return download_path, info_dict, audio_file
#
#
####################################################################################################################################
####################################################################################################################################
# Video Download/Handling
#
# Ask the user for the URL of the video to be downloaded, or for the path to a local .txt file whose lines (URLs and/or local paths) will be processed individually
def get_video_url():
user_choice = input("Enter '1' to provide a video URL or '2' to specify a local text file path\n\t(the text file may contain both URLs and local file paths): ")
@@ -235,100 +303,114 @@ def download_video(video_url, download_path, info_dict):
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
return file_path
#
#
####################################################################################################################################
####################################################################################################################################
# Audio Transcription
#
# Convert video .m4a into .wav using ffmpeg
#   ffmpeg -i "example.mp4" -ar 16000 -ac 1 -c:a pcm_s16le "output.wav"
#   Windows builds: https://www.gyan.dev/ffmpeg/builds/
#
#os.system(r'.\Bin\ffmpeg.exe -ss 00:00:00 -i "{video_file_path}" -ar 16000 -ac 1 -c:a pcm_s16le "{out_path}"')
def convert_to_wav(video_file_path, offset=0):
print("Starting conversion process of .m4a to .WAV\n\t...You may need to hit enter after a minute or so...")
out_path = os.path.splitext(video_file_path)[0] + ".wav"
try:
if os.name == "nt":  # Windows: use the bundled ffmpeg build
command = [
r".\Bin\ffmpeg.exe",  # Assuming the working directory is correctly set where .\Bin exists
"-ss", "00:00:00",  # Start at the beginning of the video
"-i", video_file_path,
"-ar", "16000",  # Audio sample rate
"-ac", "1",  # Number of audio channels
"-c:a", "pcm_s16le",  # Audio codec
out_path
]
result = subprocess.run(command, text=True, capture_output=True)
if result.returncode == 0:
logging.info("FFmpeg executed successfully")
logging.debug("Output: %s", result.stdout)
else:
logging.error("Error in running FFmpeg")
logging.error("Error Output: %s", result.stderr)
elif os.name == "posix":
os.system(f'ffmpeg -ss 00:00:00 -i "{video_file_path}" -ar 16000 -ac 1 -c:a pcm_s16le "{out_path}"')
else:
raise RuntimeError("Unsupported operating system")
logging.info("Conversion to WAV completed: %s", out_path)
except subprocess.CalledProcessError as e:
logging.error("Error executing FFmpeg command: %s", str(e))
raise RuntimeError("Error converting video file to WAV")
except Exception as e:
logging.error("Unexpected error occurred: %s", str(e))
raise RuntimeError("Error converting video file to WAV")
return out_path
# Transcribe .wav into .segments.json
def speech_to_text(audio_file_path, selected_source_lang='en', whisper_model='small.en', vad_filter=False):
logging.info('Loading faster_whisper model: %s', whisper_model)
from faster_whisper import WhisperModel
model = WhisperModel(whisper_model, device=processing_choice)
time_start = time.time()
if audio_file_path is None:
raise ValueError("No audio file provided")
logging.info("Audio file path: %s", audio_file_path)
try:
# Derive the segments output filename from the audio file path
_, file_ending = os.path.splitext(audio_file_path)
out_file = audio_file_path.replace(file_ending, ".segments.json")
if os.path.exists(out_file):
logging.info("Segments file already exists: %s", out_file)
with open(out_file) as f:
segments = json.load(f)
return segments
# Transcribe audio
logging.info('Starting transcription...')
options = dict(language=selected_source_lang, beam_size=5, best_of=5, vad_filter=vad_filter)
transcribe_options = dict(task="transcribe", **options)
# TODO: https://github.com/SYSTRAN/faster-whisper#vad-filter
segments_raw, info = model.transcribe(audio_file_path, **transcribe_options)
# Convert back to original openai format
segments = []
for segment_chunk in segments_raw:
chunk = {
"start": segment_chunk.start,
"end": segment_chunk.end,
"text": segment_chunk.text
}
logging.debug("Segment: %s", chunk)
segments.append(chunk)
logging.info("Transcription completed with faster_whisper")
with open(out_file, 'w') as f:
json.dump(segments, f, indent=2)
except Exception as e:
logging.error("Error transcribing audio: %s", str(e))
raise RuntimeError("Error transcribing audio")
return segments
#
#
####################################################################################################################################
####################################################################################################################################
# Diarization
#
# TODO: https://huggingface.co/pyannote/speaker-diarization-3.1
# embedding_model = "pyannote/embedding", embedding_size=512
# embedding_model = "speechbrain/spkrec-ecapa-voxceleb", embedding_size=192
@@ -441,10 +523,158 @@ def speaker_diarize(video_file_path, segments, embedding_model = "pyannote/embed
except Exception as e:
raise RuntimeError("Error Running inference with local model", e)
#
#
####################################################################################################################################
####################################################################################################################################
# Summarizers
#
#
# Summarize with OpenAI ChatGPT
def summarize_with_openai(api_key, file_path, model):
# Load the segments JSON produced by speech_to_text (a list of segment dicts)
with open(file_path, 'r') as file:
data = json.load(file)
# The segments file is a list; join the segment texts into one transcript
if isinstance(data, list):
text = ' '.join(segment['text'] for segment in data)
else:
text = data.get('transcription', '')
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
# Prepare the data for the OpenAI API
prompt_text = f"As a professional summarizer, create a concise and comprehensive summary of: {text}"
data = {
"model": model,
"messages": [
{
"role": "system",
"content": "You are a professional summarizer."
},
{
"role": "user",
"content": prompt_text
}
],
"max_tokens": 1024, # Adjust tokens as needed
"temperature": 0.7
}
response = requests.post('https://api.openai.com/v1/chat/completions', headers=headers, json=data)
if response.status_code == 200:
summary = response.json()['choices'][0]['message']['content'].strip()
print("Summary processed successfully.")
return summary
else:
print("Failed to process summary:", response.text)
return None
# Summarize with Anthropic Claude
def summarize_with_claude(api_key, file_path, model):
# Load the segments JSON produced by speech_to_text (a list of segment dicts)
with open(file_path, 'r') as file:
data = json.load(file)
# The segments file is a list; join the segment texts into one transcript
if isinstance(data, list):
text = ' '.join(segment['text'] for segment in data)
else:
text = data.get('transcription', '')
headers = {
'x-api-key': api_key,
'anthropic-version': '2023-06-01',
'Content-Type': 'application/json'
}
# Prepare the data for the Claude API
prompt_text = f"\n\nHuman: As a professional summarizer, create a concise and comprehensive summary of: {text}\n\nAssistant:"
data = {
"model": model,
"prompt": prompt_text,
"max_tokens_to_sample": 1024, # Adjust tokens as needed
"stop_sequences": ["\n\nHuman:"],
"temperature": 0.7
}
response = requests.post('https://api.anthropic.com/v1/complete', headers=headers, json=data)
if response.status_code == 200:
summary = response.json()['completion'].strip()
print("Summary processed successfully.")
return summary
else:
print("Failed to process summary:", response.text)
return None
# Summarize with Cohere
def summarize_with_cohere(api_key, file_path, model):
# Load the segments JSON produced by speech_to_text (a list of segment dicts)
with open(file_path, 'r') as file:
data = json.load(file)
# The segments file is a list; join the segment texts into one transcript
if isinstance(data, list):
text = ' '.join(segment['text'] for segment in data)
else:
text = data.get('transcription', '')
headers = {
'accept': 'application/json',
'content-type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
# Prepare the data for the Cohere API
prompt_text = f"As a professional summarizer, create a concise and comprehensive summary of: {text}"
data = {
"chat_history": [
{"role": "USER", "message": prompt_text}
],
"message": "Please provide a summary.",
"model": model,
"connectors": [{"id": "web-search"}]
}
response = requests.post('https://api.cohere.ai/v1/chat', headers=headers, json=data)
if response.status_code == 200:
summary = response.json()['text'].strip()
print("Summary processed successfully.")
return summary
else:
print("Failed to process summary:", response.text)
return None
def save_summary_to_file(summary, file_path):
summary_file_path = file_path.replace('.segments.json', '_summary.txt')
with open(summary_file_path, 'w') as file:
file.write(summary)
logging.info(f"Summary saved to file: {summary_file_path}")
#
#
####################################################################################################################################
####################################################################################################################################
# Main()
#
def main(input_path, api_name=None, api_key=None, num_speakers=2, whisper_model="small.en", offset=0, vad_filter=False):
if os.path.isfile(input_path) and input_path.endswith('.txt'):
paths = read_paths_from_file(input_path)
else:
@@ -452,37 +682,103 @@ def main(input_path: str, num_speakers: int = 2, whisper_model: str = "small.en"
results = []
for path in paths:
try:
if path.startswith('http'):
info_dict = get_youtube(path)
if info_dict:
download_path = create_download_directory(info_dict['title'])
video_path = download_video(path, download_path, info_dict)
audio_file = convert_to_wav(video_path, offset)
else:
logging.error(f"File does not exist: {path}")
continue
else:
if os.path.exists(path):
download_path, info_dict, audio_file = process_local_file(path)
else:
logging.error(f"File does not exist: {path}")
continue
if info_dict:
segments = speech_to_text(audio_file, whisper_model=whisper_model, vad_filter=vad_filter)
transcription_result = {
'video_path': path,
'audio_file': audio_file,
'transcription': segments
}
results.append(transcription_result)
logging.info(f"Transcription complete: {audio_file}")
# Perform summarization based on the specified API
if api_name and api_key:
json_file_path = audio_file.replace('.wav', '.segments.json')
if api_name.lower() == 'openai':
summary = summarize_with_openai(api_key, json_file_path, openai_model)
elif api_name.lower() == 'anthropic':
summary = summarize_with_claude(api_key, json_file_path, anthropic_model)
elif api_name.lower() == 'cohere':
summary = summarize_with_cohere(api_key, json_file_path, cohere_model)
else:
logging.warning(f"Unsupported API: {api_name}")
summary = None
if summary:
transcription_result['summary'] = summary
logging.info(f"Summary generated using {api_name} API")
save_summary_to_file(summary, json_file_path)
else:
logging.warning(f"Failed to generate summary using {api_name} API")
except Exception as e:
logging.error(f"Error processing path: {path}")
logging.error(str(e))
return results
# Main Function - Execution starts here
if __name__ == "__main__":
import fire
parser = argparse.ArgumentParser(description='Transcribe and summarize videos.')
parser.add_argument('input_path', type=str, help='Path or URL of the video', nargs='?')
parser.add_argument('--api_url', type=str, help='API URL for summarization (optional)')
parser.add_argument('--api_name', type=str, help='API name for summarization (optional)')
parser.add_argument('--api_key', type=str, help='API key for summarization (optional)')
parser.add_argument('--num_speakers', type=int, default=2, help='Number of speakers (default: 2)')
parser.add_argument('--whisper_model', type=str, default='small.en', help='Whisper model (default: small.en)')
parser.add_argument('--offset', type=int, default=0, help='Offset in seconds (default: 0)')
parser.add_argument('--vad_filter', action='store_true', help='Enable VAD filter')
parser.add_argument('--anthropic_model', type=str, default='claude-v1', help='Anthropic model (default: claude-v1)')
parser.add_argument('--openai_model', type=str, default='gpt-3.5-turbo', help='OpenAI model (default: gpt-3.5-turbo)')
parser.add_argument('--cohere_model', type=str, default='base', help='Cohere model (default: base)')
parser.add_argument('--log_level', type=str, default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help='Log level (default: INFO)')
args = parser.parse_args()
if args.input_path is None:
parser.print_help()
sys.exit(1)
logging.basicConfig(level=getattr(logging, args.log_level), format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Starting the transcription and summarization process.')
logging.info(f'Input path: {args.input_path}')
logging.info(f'API URL: {args.api_url}')
logging.info(f'Number of speakers: {args.num_speakers}')
logging.info(f'Whisper model: {args.whisper_model}')
logging.info(f'Offset: {args.offset}')
logging.info(f'VAD filter: {args.vad_filter}')
if args.api_name and args.api_key:
logging.info(f'API: {args.api_name}')
logging.info('Summarization will be performed.')
else:
logging.info('No API specified. Summarization will not be performed.')
platform_check()
cuda_check()
decide_cpugpu()
check_ffmpeg()
try:
results = main(args.input_path, api_name=args.api_name, api_key=args.api_key, num_speakers=args.num_speakers, whisper_model=args.whisper_model, offset=args.offset, vad_filter=args.vad_filter)
logging.info('Transcription process completed.')
except Exception as e:
logging.error('An error occurred during the transcription process.')
logging.error(str(e))
sys.exit(1)