|
|
|
@@ -9,7 +9,7 @@ from typing import TypeVar, Type, Optional, ClassVar, Any
|
|
|
|
|
from .message import Message, MessageFilter, MessageError
|
|
|
|
|
|
|
|
|
|
ChatInst = TypeVar('ChatInst', bound='Chat')
|
|
|
|
|
ChatDirInst = TypeVar('ChatDirInst', bound='ChatDir')
|
|
|
|
|
ChatDBInst = TypeVar('ChatDBInst', bound='ChatDB')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ChatError(Exception):
|
|
|
|
@@ -39,6 +39,29 @@ class Chat:
|
|
|
|
|
"""
|
|
|
|
|
self.messages = [m for m in self.messages if m.match(mfilter)]
|
|
|
|
|
|
|
|
|
|
def sort(self, reverse: bool = False) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Sort the messages according to 'Message.msg_id()'.
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# the message may not have an ID if it doesn't have a file_path
|
|
|
|
|
self.messages.sort(key=lambda m: m.msg_id(), reverse=reverse)
|
|
|
|
|
except MessageError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def clear(self) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Delete all messages.
|
|
|
|
|
"""
|
|
|
|
|
self.messages = []
|
|
|
|
|
|
|
|
|
|
def add_msgs(self, msgs: list[Message]) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Add new messages and sort them if possible.
|
|
|
|
|
"""
|
|
|
|
|
self.messages += msgs
|
|
|
|
|
self.sort()
|
|
|
|
|
|
|
|
|
|
def print(self, dump: bool = False) -> None:
|
|
|
|
|
if dump:
|
|
|
|
|
pp(self)
|
|
|
|
@@ -58,7 +81,7 @@ class Chat:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class ChatDir(Chat):
|
|
|
|
|
class ChatDB(Chat):
|
|
|
|
|
"""
|
|
|
|
|
A 'Chat' class that is bound to a given directory structure. Supports reading
|
|
|
|
|
and writing messages from / to that structure. Such a structure consists of
|
|
|
|
@@ -74,17 +97,19 @@ class ChatDir(Chat):
|
|
|
|
|
# a MessageFilter that all messages must match (if given)
|
|
|
|
|
mfilter: Optional[MessageFilter] = None
|
|
|
|
|
file_suffix: str = default_file_suffix
|
|
|
|
|
# the glob pattern for all messages
|
|
|
|
|
glob: Optional[str] = None
|
|
|
|
|
# set containing all file names of the current messages
|
|
|
|
|
message_files: set[str] = field(default_factory=set)
|
|
|
|
|
message_files: set[str] = field(default_factory=set, repr=False)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def from_dir(cls: Type[ChatDirInst],
|
|
|
|
|
def from_dir(cls: Type[ChatDBInst],
|
|
|
|
|
cache_path: pathlib.Path,
|
|
|
|
|
db_path: pathlib.Path,
|
|
|
|
|
glob: Optional[str] = None,
|
|
|
|
|
mfilter: Optional[MessageFilter] = None) -> ChatDirInst:
|
|
|
|
|
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
|
|
|
|
|
"""
|
|
|
|
|
Create a 'ChatDir' instance from the given directory structure.
|
|
|
|
|
Create a 'ChatDB' instance from the given directory structure.
|
|
|
|
|
Reads all messages from 'db_path' into the local message list.
|
|
|
|
|
Parameters:
|
|
|
|
|
* 'cache_path': path to the directory for temporary messages
|
|
|
|
@@ -106,19 +131,20 @@ class ChatDir(Chat):
|
|
|
|
|
message_files.add(file_path.name)
|
|
|
|
|
except MessageError as e:
|
|
|
|
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
|
|
|
|
return cls(messages, cache_path, db_path, mfilter, cls.default_file_suffix, message_files)
|
|
|
|
|
return cls(messages, cache_path, db_path, mfilter,
|
|
|
|
|
cls.default_file_suffix, glob, message_files)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def from_messages(cls: Type[ChatDirInst],
|
|
|
|
|
def from_messages(cls: Type[ChatDBInst],
|
|
|
|
|
cache_path: pathlib.Path,
|
|
|
|
|
db_path: pathlib.Path,
|
|
|
|
|
messages: list[Message],
|
|
|
|
|
mfilter: Optional[MessageFilter]) -> ChatDirInst:
|
|
|
|
|
mfilter: Optional[MessageFilter]) -> ChatDBInst:
|
|
|
|
|
"""
|
|
|
|
|
Create a ChatDir instance from the given message list.
|
|
|
|
|
Note that the next call to 'dump()' will write all files
|
|
|
|
|
in order to synchronize the messages. 'update()' is not
|
|
|
|
|
supported until after the first 'dump()'.
|
|
|
|
|
Create a ChatDB instance from the given message list. Note that the next
|
|
|
|
|
call to 'dump()' will write all files in order to synchronize the messages.
|
|
|
|
|
Similarly, 'update()' will read all messages, so you may end up with a lot
|
|
|
|
|
of duplicates when using 'update()' first.
|
|
|
|
|
"""
|
|
|
|
|
return cls(messages, cache_path, db_path, mfilter)
|
|
|
|
|
|
|
|
|
@@ -137,9 +163,10 @@ class ChatDir(Chat):
|
|
|
|
|
|
|
|
|
|
def dump(self, to_db: bool = False, force_all: bool = False) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Writes all messages to the 'cache_path' or 'db_path'. If a message has no file_path,
|
|
|
|
|
it will create a new one. By default, only messages that have not been written
|
|
|
|
|
(or read) before will be dumped. Use 'force_all' to force writing all message files.
|
|
|
|
|
Write all messages to 'cache_path' (or 'db_path' if 'to_db' is True). If a message
|
|
|
|
|
has no file_path, a new one will be created. By default, only messages that have
|
|
|
|
|
not been written (or read) before will be dumped. Use 'force_all' to force writing
|
|
|
|
|
all message files.
|
|
|
|
|
"""
|
|
|
|
|
for message in self.messages:
|
|
|
|
|
# skip messages that we have already written (or read)
|
|
|
|
@@ -152,3 +179,26 @@ class ChatDir(Chat):
|
|
|
|
|
file_path = self.db_path / fname if to_db else self.cache_path / fname
|
|
|
|
|
self.set_next_fid(fid)
|
|
|
|
|
message.to_file(file_path)
|
|
|
|
|
|
|
|
|
|
def update(self, from_cache: bool = False, force_all: bool = False) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Read new messages from 'db_path' (or 'cache_path' if 'from_cache' is true).
|
|
|
|
|
By default, only messages that have not been read (or written) before will
|
|
|
|
|
be read. Use 'force_all' to force reading all messages.
|
|
|
|
|
"""
|
|
|
|
|
if from_cache:
|
|
|
|
|
file_iter = self.cache_path.glob(self.glob) if self.glob else self.cache_path.iterdir()
|
|
|
|
|
else:
|
|
|
|
|
file_iter = self.cache_path.glob(self.glob) if self.glob else self.cache_path.iterdir()
|
|
|
|
|
for file_path in sorted(file_iter):
|
|
|
|
|
if file_path.is_file():
|
|
|
|
|
if file_path.name in self.message_files and not force_all:
|
|
|
|
|
continue
|
|
|
|
|
try:
|
|
|
|
|
message = Message.from_file(file_path, self.mfilter)
|
|
|
|
|
if message:
|
|
|
|
|
self.messages.append(message)
|
|
|
|
|
self.message_files.add(file_path.name)
|
|
|
|
|
except MessageError as e:
|
|
|
|
|
print(f"Error processing message in '{file_path}': {str(e)}")
|
|
|
|
|
self.sort()
|
|
|
|
|