Files
Luke Harries 0487e16623 feat: handle incoming media (images, audio, videos, documents) and show names in group messages (#34)
* added support for recieving media + improved group printing

* fix formatting
2025-04-06 17:39:40 +01:00

768 lines
24 KiB
Python

import sqlite3
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, List, Tuple
import os.path
import requests
import json
import audio
MESSAGES_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'whatsapp-bridge', 'store', 'messages.db')
WHATSAPP_API_BASE_URL = "http://localhost:8080/api"
@dataclass
class Message:
timestamp: datetime
sender: str
content: str
is_from_me: bool
chat_jid: str
id: str
chat_name: Optional[str] = None
media_type: Optional[str] = None
@dataclass
class Chat:
jid: str
name: Optional[str]
last_message_time: Optional[datetime]
last_message: Optional[str] = None
last_sender: Optional[str] = None
last_is_from_me: Optional[bool] = None
@property
def is_group(self) -> bool:
"""Determine if chat is a group based on JID pattern."""
return self.jid.endswith("@g.us")
@dataclass
class Contact:
phone_number: str
name: Optional[str]
jid: str
@dataclass
class MessageContext:
message: Message
before: List[Message]
after: List[Message]
def get_sender_name(sender_jid: str) -> str:
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# First try matching by exact JID
cursor.execute("""
SELECT name
FROM chats
WHERE jid = ?
LIMIT 1
""", (sender_jid,))
result = cursor.fetchone()
# If no result, try looking for the number within JIDs
if not result:
# Extract the phone number part if it's a JID
if '@' in sender_jid:
phone_part = sender_jid.split('@')[0]
else:
phone_part = sender_jid
cursor.execute("""
SELECT name
FROM chats
WHERE jid LIKE ?
LIMIT 1
""", (f"%{phone_part}%",))
result = cursor.fetchone()
if result and result[0]:
return result[0]
else:
return sender_jid
except sqlite3.Error as e:
print(f"Database error while getting sender name: {e}")
return sender_jid
finally:
if 'conn' in locals():
conn.close()
def format_message(message: Message, show_chat_info: bool = True) -> None:
"""Print a single message with consistent formatting."""
output = ""
if show_chat_info and message.chat_name:
output += f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] Chat: {message.chat_name} "
else:
output += f"[{message.timestamp:%Y-%m-%d %H:%M:%S}] "
content_prefix = ""
if hasattr(message, 'media_type') and message.media_type:
content_prefix = f"[{message.media_type} - Message ID: {message.id} - Chat JID: {message.chat_jid}] "
try:
sender_name = get_sender_name(message.sender) if not message.is_from_me else "Me"
output += f"From: {sender_name}: {content_prefix}{message.content}\n"
except Exception as e:
print(f"Error formatting message: {e}")
return output
def format_messages_list(messages: List[Message], show_chat_info: bool = True) -> None:
output = ""
if not messages:
output += "No messages to display."
return output
for message in messages:
output += format_message(message, show_chat_info)
return output
def list_messages(
after: Optional[str] = None,
before: Optional[str] = None,
sender_phone_number: Optional[str] = None,
chat_jid: Optional[str] = None,
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
include_context: bool = True,
context_before: int = 1,
context_after: int = 1
) -> List[Message]:
"""Get messages matching the specified criteria with optional context."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Build base query
query_parts = ["SELECT messages.timestamp, messages.sender, chats.name, messages.content, messages.is_from_me, chats.jid, messages.id, messages.media_type FROM messages"]
query_parts.append("JOIN chats ON messages.chat_jid = chats.jid")
where_clauses = []
params = []
# Add filters
if after:
try:
after = datetime.fromisoformat(after)
except ValueError:
raise ValueError(f"Invalid date format for 'after': {after}. Please use ISO-8601 format.")
where_clauses.append("messages.timestamp > ?")
params.append(after)
if before:
try:
before = datetime.fromisoformat(before)
except ValueError:
raise ValueError(f"Invalid date format for 'before': {before}. Please use ISO-8601 format.")
where_clauses.append("messages.timestamp < ?")
params.append(before)
if sender_phone_number:
where_clauses.append("messages.sender = ?")
params.append(sender_phone_number)
if chat_jid:
where_clauses.append("messages.chat_jid = ?")
params.append(chat_jid)
if query:
where_clauses.append("LOWER(messages.content) LIKE LOWER(?)")
params.append(f"%{query}%")
if where_clauses:
query_parts.append("WHERE " + " AND ".join(where_clauses))
# Add pagination
offset = page * limit
query_parts.append("ORDER BY messages.timestamp DESC")
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
messages = cursor.fetchall()
result = []
for msg in messages:
message = Message(
timestamp=datetime.fromisoformat(msg[0]),
sender=msg[1],
chat_name=msg[2],
content=msg[3],
is_from_me=msg[4],
chat_jid=msg[5],
id=msg[6],
media_type=msg[7]
)
result.append(message)
if include_context and result:
# Add context for each message
messages_with_context = []
for msg in result:
context = get_message_context(msg.id, context_before, context_after)
messages_with_context.extend(context.before)
messages_with_context.append(context.message)
messages_with_context.extend(context.after)
return format_messages_list(messages_with_context, show_chat_info=True)
# Format and display messages without context
return format_messages_list(result, show_chat_info=True)
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_message_context(
message_id: str,
before: int = 5,
after: int = 5
) -> MessageContext:
"""Get context around a specific message."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Get the target message first
cursor.execute("""
SELECT messages.timestamp, messages.sender, chats.name, messages.content, messages.is_from_me, chats.jid, messages.id, messages.chat_jid, messages.media_type
FROM messages
JOIN chats ON messages.chat_jid = chats.jid
WHERE messages.id = ?
""", (message_id,))
msg_data = cursor.fetchone()
if not msg_data:
raise ValueError(f"Message with ID {message_id} not found")
target_message = Message(
timestamp=datetime.fromisoformat(msg_data[0]),
sender=msg_data[1],
chat_name=msg_data[2],
content=msg_data[3],
is_from_me=msg_data[4],
chat_jid=msg_data[5],
id=msg_data[6],
media_type=msg_data[8]
)
# Get messages before
cursor.execute("""
SELECT messages.timestamp, messages.sender, chats.name, messages.content, messages.is_from_me, chats.jid, messages.id, messages.media_type
FROM messages
JOIN chats ON messages.chat_jid = chats.jid
WHERE messages.chat_jid = ? AND messages.timestamp < ?
ORDER BY messages.timestamp DESC
LIMIT ?
""", (msg_data[7], msg_data[0], before))
before_messages = []
for msg in cursor.fetchall():
before_messages.append(Message(
timestamp=datetime.fromisoformat(msg[0]),
sender=msg[1],
chat_name=msg[2],
content=msg[3],
is_from_me=msg[4],
chat_jid=msg[5],
id=msg[6],
media_type=msg[7]
))
# Get messages after
cursor.execute("""
SELECT messages.timestamp, messages.sender, chats.name, messages.content, messages.is_from_me, chats.jid, messages.id, messages.media_type
FROM messages
JOIN chats ON messages.chat_jid = chats.jid
WHERE messages.chat_jid = ? AND messages.timestamp > ?
ORDER BY messages.timestamp ASC
LIMIT ?
""", (msg_data[7], msg_data[0], after))
after_messages = []
for msg in cursor.fetchall():
after_messages.append(Message(
timestamp=datetime.fromisoformat(msg[0]),
sender=msg[1],
chat_name=msg[2],
content=msg[3],
is_from_me=msg[4],
chat_jid=msg[5],
id=msg[6],
media_type=msg[7]
))
return MessageContext(
message=target_message,
before=before_messages,
after=after_messages
)
except sqlite3.Error as e:
print(f"Database error: {e}")
raise
finally:
if 'conn' in locals():
conn.close()
def list_chats(
query: Optional[str] = None,
limit: int = 20,
page: int = 0,
include_last_message: bool = True,
sort_by: str = "last_active"
) -> List[Chat]:
"""Get chats matching the specified criteria."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Build base query
query_parts = ["""
SELECT
chats.jid,
chats.name,
chats.last_message_time,
messages.content as last_message,
messages.sender as last_sender,
messages.is_from_me as last_is_from_me
FROM chats
"""]
if include_last_message:
query_parts.append("""
LEFT JOIN messages ON chats.jid = messages.chat_jid
AND chats.last_message_time = messages.timestamp
""")
where_clauses = []
params = []
if query:
where_clauses.append("(LOWER(chats.name) LIKE LOWER(?) OR chats.jid LIKE ?)")
params.extend([f"%{query}%", f"%{query}%"])
if where_clauses:
query_parts.append("WHERE " + " AND ".join(where_clauses))
# Add sorting
order_by = "chats.last_message_time DESC" if sort_by == "last_active" else "chats.name"
query_parts.append(f"ORDER BY {order_by}")
# Add pagination
offset = (page ) * limit
query_parts.append("LIMIT ? OFFSET ?")
params.extend([limit, offset])
cursor.execute(" ".join(query_parts), tuple(params))
chats = cursor.fetchall()
result = []
for chat_data in chats:
chat = Chat(
jid=chat_data[0],
name=chat_data[1],
last_message_time=datetime.fromisoformat(chat_data[2]) if chat_data[2] else None,
last_message=chat_data[3],
last_sender=chat_data[4],
last_is_from_me=chat_data[5]
)
result.append(chat)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def search_contacts(query: str) -> List[Contact]:
"""Search contacts by name or phone number."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
# Split query into characters to support partial matching
search_pattern = '%' +query + '%'
cursor.execute("""
SELECT DISTINCT
jid,
name
FROM chats
WHERE
(LOWER(name) LIKE LOWER(?) OR LOWER(jid) LIKE LOWER(?))
AND jid NOT LIKE '%@g.us'
ORDER BY name, jid
LIMIT 50
""", (search_pattern, search_pattern))
contacts = cursor.fetchall()
result = []
for contact_data in contacts:
contact = Contact(
phone_number=contact_data[0].split('@')[0],
name=contact_data[1],
jid=contact_data[0]
)
result.append(contact)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_contact_chats(jid: str, limit: int = 20, page: int = 0) -> List[Chat]:
"""Get all chats involving the contact.
Args:
jid: The contact's JID to search for
limit: Maximum number of chats to return (default 20)
page: Page number for pagination (default 0)
"""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
c.jid,
c.name,
c.last_message_time,
m.content as last_message,
m.sender as last_sender,
m.is_from_me as last_is_from_me
FROM chats c
JOIN messages m ON c.jid = m.chat_jid
WHERE m.sender = ? OR c.jid = ?
ORDER BY c.last_message_time DESC
LIMIT ? OFFSET ?
""", (jid, jid, limit, page * limit))
chats = cursor.fetchall()
result = []
for chat_data in chats:
chat = Chat(
jid=chat_data[0],
name=chat_data[1],
last_message_time=datetime.fromisoformat(chat_data[2]) if chat_data[2] else None,
last_message=chat_data[3],
last_sender=chat_data[4],
last_is_from_me=chat_data[5]
)
result.append(chat)
return result
except sqlite3.Error as e:
print(f"Database error: {e}")
return []
finally:
if 'conn' in locals():
conn.close()
def get_last_interaction(jid: str) -> str:
"""Get most recent message involving the contact."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT
m.timestamp,
m.sender,
c.name,
m.content,
m.is_from_me,
c.jid,
m.id,
m.media_type
FROM messages m
JOIN chats c ON m.chat_jid = c.jid
WHERE m.sender = ? OR c.jid = ?
ORDER BY m.timestamp DESC
LIMIT 1
""", (jid, jid))
msg_data = cursor.fetchone()
if not msg_data:
return None
message = Message(
timestamp=datetime.fromisoformat(msg_data[0]),
sender=msg_data[1],
chat_name=msg_data[2],
content=msg_data[3],
is_from_me=msg_data[4],
chat_jid=msg_data[5],
id=msg_data[6],
media_type=msg_data[7]
)
return format_message(message)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def get_chat(chat_jid: str, include_last_message: bool = True) -> Optional[Chat]:
"""Get chat metadata by JID."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
query = """
SELECT
c.jid,
c.name,
c.last_message_time,
m.content as last_message,
m.sender as last_sender,
m.is_from_me as last_is_from_me
FROM chats c
"""
if include_last_message:
query += """
LEFT JOIN messages m ON c.jid = m.chat_jid
AND c.last_message_time = m.timestamp
"""
query += " WHERE c.jid = ?"
cursor.execute(query, (chat_jid,))
chat_data = cursor.fetchone()
if not chat_data:
return None
return Chat(
jid=chat_data[0],
name=chat_data[1],
last_message_time=datetime.fromisoformat(chat_data[2]) if chat_data[2] else None,
last_message=chat_data[3],
last_sender=chat_data[4],
last_is_from_me=chat_data[5]
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def get_direct_chat_by_contact(sender_phone_number: str) -> Optional[Chat]:
"""Get chat metadata by sender phone number."""
try:
conn = sqlite3.connect(MESSAGES_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT
c.jid,
c.name,
c.last_message_time,
m.content as last_message,
m.sender as last_sender,
m.is_from_me as last_is_from_me
FROM chats c
LEFT JOIN messages m ON c.jid = m.chat_jid
AND c.last_message_time = m.timestamp
WHERE c.jid LIKE ? AND c.jid NOT LIKE '%@g.us'
LIMIT 1
""", (f"%{sender_phone_number}%",))
chat_data = cursor.fetchone()
if not chat_data:
return None
return Chat(
jid=chat_data[0],
name=chat_data[1],
last_message_time=datetime.fromisoformat(chat_data[2]) if chat_data[2] else None,
last_message=chat_data[3],
last_sender=chat_data[4],
last_is_from_me=chat_data[5]
)
except sqlite3.Error as e:
print(f"Database error: {e}")
return None
finally:
if 'conn' in locals():
conn.close()
def send_message(recipient: str, message: str) -> Tuple[bool, str]:
try:
# Validate input
if not recipient:
return False, "Recipient must be provided"
url = f"{WHATSAPP_API_BASE_URL}/send"
payload = {
"recipient": recipient,
"message": message,
}
response = requests.post(url, json=payload)
# Check if the request was successful
if response.status_code == 200:
result = response.json()
return result.get("success", False), result.get("message", "Unknown response")
else:
return False, f"Error: HTTP {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
except json.JSONDecodeError:
return False, f"Error parsing response: {response.text}"
except Exception as e:
return False, f"Unexpected error: {str(e)}"
def send_file(recipient: str, media_path: str) -> Tuple[bool, str]:
try:
# Validate input
if not recipient:
return False, "Recipient must be provided"
if not media_path:
return False, "Media path must be provided"
if not os.path.isfile(media_path):
return False, f"Media file not found: {media_path}"
url = f"{WHATSAPP_API_BASE_URL}/send"
payload = {
"recipient": recipient,
"media_path": media_path
}
response = requests.post(url, json=payload)
# Check if the request was successful
if response.status_code == 200:
result = response.json()
return result.get("success", False), result.get("message", "Unknown response")
else:
return False, f"Error: HTTP {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
except json.JSONDecodeError:
return False, f"Error parsing response: {response.text}"
except Exception as e:
return False, f"Unexpected error: {str(e)}"
def send_audio_message(recipient: str, media_path: str) -> Tuple[bool, str]:
try:
# Validate input
if not recipient:
return False, "Recipient must be provided"
if not media_path:
return False, "Media path must be provided"
if not os.path.isfile(media_path):
return False, f"Media file not found: {media_path}"
if not media_path.endswith(".ogg"):
try:
media_path = audio.convert_to_opus_ogg_temp(media_path)
except Exception as e:
return False, f"Error converting file to opus ogg. You likely need to install ffmpeg: {str(e)}"
url = f"{WHATSAPP_API_BASE_URL}/send"
payload = {
"recipient": recipient,
"media_path": media_path
}
response = requests.post(url, json=payload)
# Check if the request was successful
if response.status_code == 200:
result = response.json()
return result.get("success", False), result.get("message", "Unknown response")
else:
return False, f"Error: HTTP {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
except json.JSONDecodeError:
return False, f"Error parsing response: {response.text}"
except Exception as e:
return False, f"Unexpected error: {str(e)}"
def download_media(message_id: str, chat_jid: str) -> Optional[str]:
"""Download media from a message and return the local file path.
Args:
message_id: The ID of the message containing the media
chat_jid: The JID of the chat containing the message
Returns:
The local file path if download was successful, None otherwise
"""
try:
url = f"{WHATSAPP_API_BASE_URL}/download"
payload = {
"message_id": message_id,
"chat_jid": chat_jid
}
response = requests.post(url, json=payload)
if response.status_code == 200:
result = response.json()
if result.get("success", False):
path = result.get("path")
print(f"Media downloaded successfully: {path}")
return path
else:
print(f"Download failed: {result.get('message', 'Unknown error')}")
return None
else:
print(f"Error: HTTP {response.status_code} - {response.text}")
return None
except requests.RequestException as e:
print(f"Request error: {str(e)}")
return None
except json.JSONDecodeError:
print(f"Error parsing response: {response.text}")
return None
except Exception as e:
print(f"Unexpected error: {str(e)}")
return None