diff --git a/diarize.py b/diarize.py index 0d1fab7..656a9dc 100644 --- a/diarize.py +++ b/diarize.py @@ -1,21 +1,36 @@ #!/usr/bin/env python3 import datetime -import unicodedata -import time -import os -import subprocess import json import logging -import torch -import contextlib import platform # used for checking OS version import shutil # used for checking existence of ffmpeg +import time +import unicodedata +import os +import subprocess +import contextlib import ffmpeg # Used for issuing commands to underlying ffmpeg executable, pip package ffmpeg is from 2018 +import torch import yt_dlp -# idk.... +#### +# +# TL/DW: Too Long Didn't Watch +# +# Project originally created by https://github.com/the-crypt-keeper +# Modifications made by https://github.com/rmusser01 +# All credit to the original authors, I've just glued shit together. +# +### + +### # To Dos -# Implement more logging and remove print statements +# Implement more logging (add an actual log file) +# Add conditional args for whether its ran in batch mode(File supplied) or single use (single url) +# Add support for actual summarization +# Add benchmarking for summarization results for various LLM usages. +# Add option for Whisper model selection/download +### os.environ['KMP_DUPLICATE_LIB_OK']='True' @@ -34,9 +49,6 @@ source_language_list = [key[0] for key in source_languages.items()] # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -# Global variable to store the path of the last downloaded video -last_video_path = None - @@ -80,8 +92,6 @@ def platform_check(): - - # Check for NVIDIA GPU and CUDA availability def cuda_check(): global processing_choice @@ -124,6 +134,38 @@ def check_ffmpeg(): +def read_paths_from_file(file_path): + """ Reads a file containing URLs or local file paths and returns them as a list. """ + paths = [] + with open(file_path, 'r') as file: + for line in file: + line = line.strip() + if line and not os.path.exists(os.path.join('Results', normalize_title(line.split('/')[-1].split('.')[0]) + '.json')): + paths.append(line) + return paths + + + +def process_path(path): + """ Decides whether the path is a URL or a local file and processes accordingly. """ + if path.startswith('http'): + return get_youtube(path) # For YouTube URLs, modify to download and extract info + elif os.path.exists(path): + return process_local_file(path) # For local files, define a function to handle them + else: + logging.error(f"Path does not exist: {path}") + return None + + + +# FIXME +def process_local_file(file_path): + logging.info(f"Processing local file: {file_path}") + # Implement processing logic here + return {'title': os.path.basename(file_path)} + + + # Ask the user for the URL of the video to be downloaded. Alternatively, ask the user for the location of a local txt file to be read in and parsed to a list to be processed individually def get_video_url(): user_choice = input("Enter '1' to provide a video URL or '2' to specify a local text file path\n\t(the text file may contain both URLs and local file paths: ") @@ -398,7 +440,7 @@ def speaker_diarize(video_file_path, segments, embedding_model = "pyannote/embed raise RuntimeError("Error Running inference with local model", e) - +""" def main(youtube_url: str, num_speakers: int = 2, whisper_model: str = "small.en", offset: int = 0, vad_filter : bool = False): info_dict = get_youtube(youtube_url) download_path = create_download_directory(info_dict['title']) @@ -410,6 +452,32 @@ def main(youtube_url: str, num_speakers: int = 2, whisper_model: str = "small.en # print("diarize complete:", save_path) print("Transcription complete:", audio_file) #FIXME +""" + + +def main(input_path: str, num_speakers: int = 2, whisper_model: str = "small.en", offset: int = 0, vad_filter: bool = False): + if os.path.isfile(input_path) and input_path.endswith('.txt'): + paths = read_paths_from_file(input_path) + elif input_path.startswith('http'): + paths = [input_path] + else: + logging.error("Invalid input: Please provide a valid URL or a text file path.") + return + + results = [] + for path in paths: + info_dict = process_path(path) + if info_dict: + download_path = create_download_directory(info_dict['title']) + video_path = download_video(path, download_path, info_dict) + audio_file = convert_to_wav(video_path, offset) + segments = speech_to_text(audio_file, whisper_model=whisper_model, vad_filter=vad_filter) + # Uncomment the next line if diarization is needed + # df_results, save_path = speaker_diarize(audio_file, segments, num_speakers=num_speakers) + results.append({'video_path': video_path, 'audio_file': audio_file, 'transcription': segments}) + logging.info("Transcription complete: " + audio_file) + return results + # Main Function - Execution starts here @@ -419,4 +487,4 @@ if __name__ == "__main__": cuda_check() decide_cpugpu() check_ffmpeg() - fire.Fire(main) \ No newline at end of file + fire.Fire(main)