I love python packages

This commit is contained in:
WalkThroughTheDoorAndDoTheDinosaur
2024-04-30 20:50:07 -07:00
parent aa9a2ff806
commit 048b346370
5 changed files with 189 additions and 28 deletions

BIN
.gitignore vendored Normal file

Binary file not shown.

BIN
Bin/ffmpeg.exe Normal file

Binary file not shown.

View File

@@ -1,19 +1,40 @@
# Too Long, Didnt Watch
# TL/DW: Too Long, Didnt Watch
YouTube contains an incredible amount of knowledge, much of which is locked inside multi-hour videos. Let's extract and summarize with AI!
- `diarize.py` - download, transrcibe and diarize audio
- [yt-dlp](https://github.com/yt-dlp/yt-dlp) - download audio tracks of youtube videos
- [ffmpeg](https://github.com/FFmpeg/FFmpeg) - decompress audio
- [faster_whisper](https://github.com/SYSTRAN/faster-whisper) - speech to text
- [pyannote](https://github.com/pyannote/pyannote-audio) - diarization
### Pieces
- `diarize.py` - download, transcribe and diarize audio
1. First, it uses [yt-dlp](https://github.com/yt-dlp/yt-dlp) to download the audio (optionally video) from the supplied URL
2. Next, it uses [ffmpeg](https://github.com/FFmpeg/FFmpeg) to convert the resulting `.m4a` file to `.wav`
3. Then it uses [faster_whisper](https://github.com/SYSTRAN/faster-whisper) to transcribe the `.wav` file to `.txt`
4. After that, it uses [pyannote](https://github.com/pyannote/pyannote-audio) to perform speaker diarization
5. Finally, it'll send the resulting txt to an LLM endpoint of your choice for summarization of the text.
* Goal is to support OpenAI/Claude/Cohere/Groq/local OpenAI endpoint (oobabooga/llama.cpp/exllama2) so you can either do a batch query to X endpoint, or just feed them one at a time. Your choice.
- `chunker.py` - break text into parts and prepare each part for LLM summarization
- `roller-*.py` - rolling summarization
- [can-ai-code](https://github.com/the-crypt-keeper/can-ai-code) - interview executors to run LLM inference
- `compare.py` - prepare LLM outputs for webapp
- `compare-app.py` - summary viewer webapp
This project is under active development and is not ready for production use.
### Setup
- **Linux**
1. X
2. Create a virtual env in the project directory: `python -m venv .`
3. Launch/activate your virtual env: `. ./bin/activate`
4. `pip install -r requirements.txt`
5.
- **Windows**
1. X
2. Create a virtual env in the project directory: `python -m venv .`
3. Launch/activate your virtual env: `. .\Scripts\Activate.ps1`
4. `pip install -r requirements.txt`
5.
### Credits
- [original](https://github.com/the-crypt-keeper/tldw)
- [yt-dlp](https://github.com/yt-dlp/yt-dlp)
- [ffmpeg](https://github.com/FFmpeg/FFmpeg)
- [faster_whisper](https://github.com/SYSTRAN/faster-whisper)
- [pyannote](https://github.com/pyannote/pyannote-audio)

View File

@@ -2,9 +2,16 @@
import datetime
import time
import os
import subprocess
import json
import torch
import contextlib
import platform # used for checking OS version
import shutil # used for checking existence of ffmpeg
import ffmpeg # Used for issuing commands to underlying ffmpeg executable, pip package ffmpeg is from 2018
# idk....
os.environ['KMP_DUPLICATE_LIB_OK']='True'
whisper_models = ["small", "medium", "small.en","medium.en"]
source_languages = {
@@ -18,6 +25,102 @@ source_languages = {
}
source_language_list = [key[0] for key in source_languages.items()]
print(r"""_____ _ ________ _ _
|_ _|| | / /| _ \| | | | _
| | | | / / | | | || | | |(_)
| | | | / / | | | || |/\| |
| | | |____ / / | |/ / \ /\ / _
\_/ \_____//_/ |___/ \/ \/ (_)
_ _
| | | |
| |_ ___ ___ | | ___ _ __ __ _
| __| / _ \ / _ \ | | / _ \ | '_ \ / _` |
| |_ | (_) || (_) | | || (_) || | | || (_| | _
\__| \___/ \___/ |_| \___/ |_| |_| \__, |( )
__/ ||/
|___/
_ _ _ _ _ _ _
| |(_) | | ( )| | | | | |
__| | _ __| | _ __ |/ | |_ __ __ __ _ | |_ ___ | |__
/ _` || | / _` || '_ \ | __| \ \ /\ / / / _` || __| / __|| '_ \
| (_| || || (_| || | | | | |_ \ V V / | (_| || |_ | (__ | | | |
\__,_||_| \__,_||_| |_| \__| \_/\_/ \__,_| \__| \___||_| |_|
""")
# Perform Platform Check
# Only Linux and Windows are handled; any other platform stops the script here.
userOS = platform.system()
if userOS in ("Linux", "Windows"):
    print(f"{userOS} OS detected \n Running {userOS} appropriate commands")
else:
    print("Other OS detected \n Maybe try running things manually?")
    exit()

# Check for NVIDIA GPU and CUDA availability by probing for `nvidia-smi`.
# A non-zero exit status (including "command not found" from the shell) is
# treated as "no usable NVIDIA GPU".
try:
    nvidia_smi = subprocess.check_output("nvidia-smi", shell=True).decode(errors="replace")
    cuda_available = "NVIDIA-SMI" in nvidia_smi
except subprocess.CalledProcessError:
    cuda_available = False

if cuda_available:
    print("NVIDIA GPU with CUDA is available.\n You can enable GPU processing if you wish.\n")
else:
    print("NVIDIA GPU with CUDA is not available.\n You either have an AMD GPU, or you're stuck with CPU only.\n")

# Start from the safe default so processing_choice is defined on every path
# (previously it was unset whenever nvidia-smi succeeded, or when the answer
# below was invalid, which raised a NameError).
processing_choice = "cpu"

# Ask user if they would like to use either their GPU or their CPU for transcription
processing_input = input("Would you like to use your GPU or CPU for transcription? ((1) GPU / (2) CPU): ").strip().upper()
if processing_input in ("GPU", "1"):
    if cuda_available:
        print("You've chosen to use the GPU.")
        # processing_choice is handed to WhisperModel(device=...) further down
        # in this file; faster_whisper names the GPU device "cuda", not "gpu".
        # NOTE(review): confirm nothing else compares this value against "gpu".
        processing_choice = "cuda"
    else:
        print("GPU processing was requested, but no NVIDIA GPU with CUDA was found.\n Falling back to the CPU.")
elif processing_input in ("CPU", "2"):
    print("You've chosen to use the CPU.")
else:
    print("Invalid choice. Please select either GPU or CPU.\n Defaulting to the CPU.")

# check for existence of ffmpeg on PATH; this only warns and never aborts
if not shutil.which("ffmpeg"):
    print("ffmpeg is not installed.\n You can either install it manually, or through your package manager of choice.\n Windows users, builds are here: https://www.gyan.dev/ffmpeg/builds/")
    print("Script will continue, but is likely to break")
# Ask the user for the URL of the video to be downloaded. Alternatively, ask the user for the location of a local txt file to be read in and parsed to a list to be processed individually
def get_video_url():
    """Interactively ask for a video URL or a path to a local text file.

    The user first picks a mode: '1' for a single video URL, '2' for the
    path of a text file listing URLs and/or local file paths. Surrounding
    whitespace is stripped from every answer.

    Returns:
        The URL or file path entered by the user, or None when the mode
        selection is neither '1' nor '2'.
    """
    user_choice = input(
        "Enter '1' to provide a video URL or '2' to specify a local text file path\n"
        "\t(the text file may contain both URLs and local file paths): "
    ).strip()

    if user_choice == '1':
        return input("Enter the URL of the video to be downloaded: ").strip()
    if user_choice == '2':
        return input("Enter the path of the local text file to be read and processed: ").strip()

    print("Invalid choice. Please enter either '1' or '2'.")
    return None
# Perform processing of list to create array of URLs/Files to be downloaded & converted.
# Parse list for lines starting with 'http' -> Sort into urls_array[]
# Parse list for file paths (?) -> Sort into urls_local[]
# Download + convert items in urls_array[] list
# Convert (if necessary) items in urls_local[] list
# Download video .m4a and info.json
def get_youtube(video_url):
import yt_dlp
@@ -35,29 +138,54 @@ def get_youtube(video_url):
print("Success download",video_url,"to", abs_video_path)
return abs_video_path
# Convert video .m4a into .wav using ffmpeg
#   ffmpeg -ss <offset> -i "example.m4a" -ar 16000 -ac 1 -c:a pcm_s16le "output.wav"
# Windows builds of ffmpeg: https://www.gyan.dev/ffmpeg/builds/
def convert_to_wav(video_file_path, offset=0):
    """Convert a media file to a 16 kHz, mono, 16-bit PCM .wav file with ffmpeg.

    The output is written next to the input, with the extension replaced by
    ``.wav``. If that file already exists the conversion is skipped, which
    also keeps ffmpeg from stopping on its interactive "overwrite?" prompt.

    Args:
        video_file_path: Path of the downloaded media file (e.g. ``.m4a``).
        offset: Start position passed to ffmpeg's ``-ss`` option; values
            that are falsy or not greater than 0 start at the beginning.

    Returns:
        Path of the .wav file.

    Raises:
        RuntimeError: If ffmpeg cannot be started or exits with an error.
    """
    # splitext only touches the final path component, so a dotted directory
    # name with an extension-less file is not truncated.
    out_path = os.path.splitext(video_file_path)[0] + ".wav"
    if os.path.exists(out_path):
        print("wav file already exists:", out_path)
        return out_path

    # On Windows the bundled binary is used; this assumes the working
    # directory is the one that contains .\Bin. Elsewhere ffmpeg comes from PATH.
    ffmpeg_bin = r".\Bin\ffmpeg.exe" if os.name == "nt" else "ffmpeg"

    command = [ffmpeg_bin]
    if offset and offset > 0:
        command += ["-ss", str(offset)]  # seek before decoding the input
    command += [
        "-i", video_file_path,
        "-ar", "16000",        # Audio sample rate
        "-ac", "1",            # Number of audio channels
        "-c:a", "pcm_s16le",   # Audio codec
        out_path,
    ]

    print("Starting conversion process of .m4a to .WAV")
    error_message = "Error converting video file to WAV. An issue occurred with ffmpeg."
    try:
        # Argument list (no shell) so file names with quotes or spaces are safe;
        # stdin is closed so ffmpeg can never block waiting for keyboard input.
        result = subprocess.run(
            command,
            text=True,
            errors="replace",
            capture_output=True,
            stdin=subprocess.DEVNULL,
        )
    except OSError as e:
        # ffmpeg executable missing or not runnable
        raise RuntimeError(error_message) from e

    if result.returncode != 0:
        print("Error in running FFmpeg")
        print("Error Output:", result.stderr)
        # Remove any partial output so a later run does not mistake it for a
        # finished conversion and skip the work.
        with contextlib.suppress(OSError):
            os.remove(out_path)
        raise RuntimeError(error_message)

    print("FFmpeg executed successfully")
    print("Conversion to WAV completed:", out_path)
    return out_path
# Transcribe .wav into .segments.json
def speech_to_text(video_file_path, selected_source_lang = 'en', whisper_model = 'small.en', vad_filter = False):
def speech_to_text(video_file_path, selected_source_lang='en', whisper_model='small.en', vad_filter=False):
print('loading faster_whisper model:', whisper_model)
from faster_whisper import WhisperModel
model = WhisperModel(whisper_model, device="cuda")
# printf(processing_choice)
# 1 == GPU / 2 == CPU
model = WhisperModel(whisper_model, device=processing_choice)
time_start = time.time()
if(video_file_path == None):
raise ValueError("Error no video input")
@@ -102,6 +230,12 @@ def speech_to_text(video_file_path, selected_source_lang = 'en', whisper_model =
return segments
## Using Whisper.cpp
# Get-Whisper-GGML.ps1
# https://github.com/ggerganov/whisper.cpp/releases/latest
# TODO: https://huggingface.co/pyannote/speaker-diarization-3.1
# embedding_model = "pyannote/embedding", embedding_size=512
# embedding_model = "speechbrain/spkrec-ecapa-voxceleb", embedding_size=192
@@ -215,12 +349,18 @@ def speaker_diarize(video_file_path, segments, embedding_model = "pyannote/embed
except Exception as e:
raise RuntimeError("Error Running inference with local model", e)
# Add function to check amount of arguments passed to script match what's expected
def main(youtube_url: str, num_speakers: int = 2, whisper_model: str = "small.en", offset: int = 0, vad_filter : bool = False):
    """Download a video's audio, convert it to .wav and transcribe it.

    Args:
        youtube_url: URL handed to get_youtube() for download.
        num_speakers: Only used by the diarization step, which is currently
            disabled below; kept so the command-line interface is unchanged.
        whisper_model: Name of the faster_whisper model passed to speech_to_text().
        offset: Start offset forwarded to convert_to_wav().
        vad_filter: Forwarded to speech_to_text().
    """
    # TODO: support a list of videos read from a local text file
    # (user choice '2' in get_video_url) instead of a single URL.
    video_path = get_youtube(youtube_url)

    # Convert exactly once and keep the resulting path for the final report.
    audio_file = convert_to_wav(video_path, offset)

    segments = speech_to_text(video_path, whisper_model=whisper_model, vad_filter=vad_filter)

    # Speaker diarization is disabled for now:
    # df_results, save_path = speaker_diarize(video_path, segments, num_speakers=num_speakers)
    # print("diarize complete:", save_path)
    print("Transcription complete:", audio_file)
if __name__ == "__main__":
import fire

BIN
requirements.txt Normal file

Binary file not shown.