5 Commits

21 changed files with 357 additions and 1525 deletions
+64 -104
View File
@@ -37,96 +37,63 @@ cmm [global options] command [command options]
### Global Options
- `-C`, `--config`: Config file name (defaults to `.config.yaml`).
- `-c`, `--config`: Config file name (defaults to `.config.yaml`).
### Commands
- `ask`: Ask a question.
- `hist`: Print chat history.
- `tag`: Manage tags.
- `config`: Manage configuration.
- `print`: Print files.
### Command Options
#### Question
#### `ask` Command Options
The `question` command is used to ask, create, and process questions.
- `-q`, `--question`: Question to ask (required).
- `-m`, `--max-tokens`: Max tokens to use.
- `-T`, `--temperature`: Temperature to use.
- `-M`, `--model`: Model to use.
- `-n`, `--number`: Number of answers to produce (default is 3).
- `-s`, `--source`: Add content of a file to the query.
- `-S`, `--only-source-code`: Add pure source code to the chat history.
- `-t`, `--tags`: List of tag names.
- `-e`, `--extags`: List of tag names to exclude.
- `-o`, `--output-tags`: List of output tag names (default is the input tags).
- `-a`, `--match-all-tags`: All given tags must match when selecting chat history entries.
```bash
cmm question [-t OTAGS]... [-k ATAGS]... [-x XTAGS]... [-o OUTTAGS]... [-A AI_ID] [-M MODEL] [-n NUM] [-m MAX] [-T TEMP] (-a QUESTION | -c QUESTION | -r [MESSAGE ...] | -p [MESSAGE ...]) [-O] [-s FILE]... [-S FILE]...
```
#### `hist` Command Options
* `-t, --or-tags OTAGS`: List of tags (one must match)
* `-k, --and-tags ATAGS`: List of tags (all must match)
* `-x, --exclude-tags XTAGS`: List of tags to exclude
* `-o, --output-tags OUTTAGS`: List of output tags (default: use input tags)
* `-A, --AI AI_ID`: AI ID to use
* `-M, --model MODEL`: Model to use
* `-n, --num-answers NUM`: Number of answers to request
* `-m, --max-tokens MAX`: Max. number of tokens
* `-T, --temperature TEMP`: Temperature value
* `-a, --ask QUESTION`: Ask a question
* `-c, --create QUESTION`: Create a question
* `-r, --repeat [MESSAGE ...]`: Repeat a question
* `-p, --process [MESSAGE ...]`: Process existing questions
* `-O, --overwrite`: Overwrite existing messages when repeating them
* `-s, --source-text FILE`: Add content of a file to the query
* `-S, --source-code FILE`: Add source code file content to the chat history
- `-d`, `--dump`: Print chat history as Python structure.
- `-w`, `--with-tags`: Print chat history with tags.
- `-W`, `--with-files`: Print chat history with filenames.
- `-S`, `--only-source-code`: Print only source code.
- `-t`, `--tags`: List of tag names.
- `-e`, `--extags`: List of tag names to exclude.
- `-a`, `--match-all-tags`: All given tags must match when selecting chat history entries.
#### Hist
#### `tag` Command Options
The `hist` command is used to print the chat history.
- `-l`, `--list`: List all tags and their frequency.
```bash
cmm hist [-t OTAGS]... [-k ATAGS]... [-x XTAGS]... [-w] [-W] [-S] [-A SUBSTRING] [-Q SUBSTRING]
```
#### `config` Command Options
* `-t, --or-tags OTAGS`: List of tags (one must match)
* `-k, --and-tags ATAGS`: List of tags (all must match)
* `-x, --exclude-tags XTAGS`: List of tags to exclude
* `-w, --with-tags`: Print chat history with tags
* `-W, --with-files`: Print chat history with filenames
* `-S, --source-code-only`: Only print embedded source code
* `-A, --answer SUBSTRING`: Search for answer substring
* `-Q, --question SUBSTRING`: Search for question substring
- `-l`, `--list-models`: List all available models.
- `-m`, `--print-model`: Print the currently configured model.
- `-M`, `--model`: Set model in the config file.
#### Tags
#### `print` Command Options
The `tags` command is used to manage tags.
```bash
cmm tags (-l | -p PREFIX | -c SUBSTRING)
```
* `-l, --list`: List all tags and their frequency
* `-p, --prefix PREFIX`: Filter tags by prefix
* `-c, --contain SUBSTRING`: Filter tags by contained substring
#### Config
The `config` command is used to manage the configuration.
```bash
cmm config (-l | -m | -c FILE)
```
* `-l, --list-models`: List all available models
* `-m, --print-model`: Print the currently configured model
* `-c, --create FILE`: Create config with default settings in the given file
#### Print
The `print` command is used to print message files.
```bash
cmm print (-f FILE | -l) [-q | -a | -S]
```
* `-f, --file FILE`: Print given file
* `-l, --latest`: Print latest message
* `-q, --question`: Only print the question
* `-a, --answer`: Only print the answer
* `-S, --only-source-code`: Only print embedded source code
- `-f`, `--file`: File to print (required).
- `-S`, `--only-source-code`: Print only source code.
### Examples
1. Ask a question:
```bash
cmm question -a "What is the meaning of life?" -t philosophy -x religion
cmm ask -q "What is the meaning of life?" -t philosophy -e religion
```
2. Display the chat history:
@@ -138,19 +105,19 @@ cmm hist
3. Filter chat history by tags:
```bash
cmm hist --or-tags tag1 tag2
cmm hist -t tag1 tag2
```
4. Exclude chat history by tags:
```bash
cmm hist --exclude-tags tag3 tag4
cmm hist -e tag3 tag4
```
5. List all tags and their frequency:
```bash
cmm tags -l
cmm tag -l
```
6. Print the contents of a file:
@@ -161,27 +128,18 @@ cmm print -f example.yaml
## Configuration
The default configuration filename is `.config.yaml` (it is searched in the current working directory).
Use the command `cmm config --create <FILENAME>` to create a default configuration:
The configuration file (`.config.yaml`) should contain the following fields:
```
cache: .
db: ./db/
ais:
myopenai:
name: openai
model: gpt-3.5-turbo-16k
api_key: 0123456789
temperature: 1.0
max_tokens: 4000
top_p: 1.0
frequency_penalty: 0.0
presence_penalty: 0.0
system: You are an assistant
```
Each AI has its own section and the name of that section is called the 'AI ID' (in the example above it is `myopenai`).
The AI ID can be any string, as long as it's unique within the `ais` section. The AI ID is used for all commands that support the `AI` parameter and it's also stored within each message file.
- `openai`:
- `api_key`: Your OpenAI API key.
- `model`: The name of the OpenAI model to use (e.g. "text-davinci-002").
- `temperature`: The temperature value for the model.
- `max_tokens`: The maximum number of tokens for the model.
- `top_p`: The top P value for the model.
- `frequency_penalty`: The frequency penalty value.
- `presence_penalty`: The presence penalty value.
- `system`: The system message used to set the behavior of the AI.
- `db`: The directory where the question-answer pairs are stored in YAML files.
## Autocompletion
@@ -196,33 +154,33 @@ After adding this line, restart your shell or run `source <your-shell-config-fil
## Contributing
### Enable commit hooks
```bash
```
pip install pre-commit
pre-commit install
```
### Execute tests before opening a PR
```bash
```
pytest
```
### Consider using `pyenv` / `pyenv-virtualenv`
Short installation instructions:
* install `pyenv`:
```bash
```
cd ~
git clone https://github.com/pyenv/pyenv .pyenv
cd ~/.pyenv && src/configure && make -C src
```
* make sure that `~/.pyenv/shims` and `~/.pyenv/bin` are the first entries in your `PATH`, e.g., by setting it in `~/.bashrc`
* make sure that `~/.pyenv/shims` and `~/.pyenv/bin` are the first entries in your `PATH`, e. g. by setting it in `~/.bashrc`
* add the following to your `~/.bashrc` (after setting `PATH`): `eval "$(pyenv init -)"`
* create a new terminal or source the changes (e.g., `source ~/.bashrc`)
* create a new terminal or source the changes (e. g. `source ~/.bashrc`)
* install `virtualenv`
```bash
```
git clone https://github.com/pyenv/pyenv-virtualenv.git $(pyenv root)/plugins/pyenv-virtualenv
```
* add the following to your `~/.bashrc` (after the commands above): `eval "$(pyenv virtualenv-init -)`
* create a new terminal or source the changes (e.g., `source ~/.bashrc`)
* go back to the `ChatMasterMind` repo and create a virtual environment with the latest `Python`, e.g., `3.11.4`:
```bash
* create a new terminal or source the changes (e. g. `source ~/.bashrc`)
* go back to the `ChatMasterMind` repo and create a virtual environment with the latest `Python`, e. g. `3.11.4`:
```
cd <CMM_REPO_PATH>
pyenv install 3.11.4
pyenv virtualenv 3.11.4 py311
@@ -233,3 +191,5 @@ pyenv activate py311
## License
This project is licensed under the terms of the WTFPL License.
-6
View File
@@ -59,12 +59,6 @@ class AI(Protocol):
"""
raise NotImplementedError
def print_models(self) -> None:
"""
Print all models supported by this AI.
"""
raise NotImplementedError
def tokens(self, data: Union[Message, Chat]) -> int:
"""
Computes the nr. of AI language tokens for the given message
+10 -16
View File
@@ -4,39 +4,33 @@ Creates different AI instances, based on the given configuration.
import argparse
from typing import cast
from .configuration import Config, AIConfig, OpenAIConfig
from .configuration import Config, OpenAIConfig, default_ai_ID
from .ai import AI, AIError
from .ais.openai import OpenAI
def create_ai(args: argparse.Namespace, config: Config) -> AI: # noqa: 11
def create_ai(args: argparse.Namespace, config: Config) -> AI:
"""
Creates an AI subclass instance from the given arguments
and configuration file. If AI has not been set in the
arguments, it searches for the ID 'default'. If that
is not found, it uses the first AI in the list.
and configuration file.
"""
ai_conf: AIConfig
if hasattr(args, 'AI') and args.AI:
if args.AI:
try:
ai_conf = config.ais[args.AI]
except KeyError:
raise AIError(f"AI ID '{args.AI}' does not exist in this configuration")
elif 'default' in config.ais:
ai_conf = config.ais['default']
elif default_ai_ID in config.ais:
ai_conf = config.ais[default_ai_ID]
else:
try:
ai_conf = next(iter(config.ais.values()))
except StopIteration:
raise AIError("No AI found in this configuration")
raise AIError("No AI name given and no default exists")
if ai_conf.name == 'openai':
ai = OpenAI(cast(OpenAIConfig, ai_conf))
if hasattr(args, 'model') and args.model:
if args.model:
ai.config.model = args.model
if hasattr(args, 'max_tokens') and args.max_tokens:
if args.max_tokens:
ai.config.max_tokens = args.max_tokens
if hasattr(args, 'temperature') and args.temperature:
if args.temperature:
ai.config.temperature = args.temperature
return ai
else:
+3 -8
View File
@@ -45,14 +45,14 @@ class OpenAI(AI):
presence_penalty=self.config.presence_penalty)
question.answer = Answer(response['choices'][0]['message']['content'])
question.tags = otags
question.ai = self.ID
question.ai = self.name
question.model = self.config.model
answers: list[Message] = [question]
for choice in response['choices'][1:]: # type: ignore
answers.append(Message(question=question.question,
answer=Answer(choice['message']['content']),
tags=otags,
ai=self.ID,
ai=self.name,
model=self.config.model))
return AIResponse(answers, Tokens(response['usage']['prompt_tokens'],
response['usage']['completion_tokens'],
@@ -62,12 +62,7 @@ class OpenAI(AI):
"""
Return all models supported by this AI.
"""
ret = []
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
if engine['ready']:
ret.append(engine['id'])
ret.sort()
return ret
raise NotImplementedError
def print_models(self) -> None:
"""
+97 -334
View File
@@ -6,18 +6,13 @@ from pathlib import Path
from pprint import PrettyPrinter
from pydoc import pager
from dataclasses import dataclass
from typing import TypeVar, Type, Optional, ClassVar, Any, Callable, Literal, Union
from .configuration import default_config_file
from typing import TypeVar, Type, Optional, ClassVar, Any, Callable
from .message import Message, MessageFilter, MessageError, message_in
from .tags import Tag
ChatInst = TypeVar('ChatInst', bound='Chat')
ChatDBInst = TypeVar('ChatDBInst', bound='ChatDB')
db_next_file = '.next'
ignored_files = [db_next_file, default_config_file]
msg_location = Literal['mem', 'disk', 'cache', 'db', 'all']
class ChatError(Exception):
pass
@@ -50,15 +45,13 @@ def read_dir(dir_path: Path,
messages: list[Message] = []
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
for file_path in sorted(file_iter):
if (file_path.is_file()
and file_path.name not in ignored_files # noqa: W503
and file_path.suffix in Message.file_suffixes): # noqa: W503
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
try:
message = Message.from_file(file_path, mfilter)
if message:
messages.append(message)
except MessageError as e:
print(f"WARNING: Skipping message in '{file_path}': {str(e)}")
print(f"Error processing message in '{file_path}': {str(e)}")
return messages
@@ -69,10 +62,7 @@ def make_file_path(dir_path: Path,
Create a file_path for the given directory using the
given file_suffix and ID generator function.
"""
file_path = dir_path / f"{next_fid():04d}{file_suffix}"
while file_path.exists():
file_path = dir_path / f"{next_fid():04d}{file_suffix}"
return file_path
return dir_path / f"{next_fid():04d}{file_suffix}"
def write_dir(dir_path: Path,
@@ -107,9 +97,7 @@ def clear_dir(dir_path: Path,
"""
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
for file_path in file_iter:
if (file_path.is_file()
and file_path.name not in ignored_files # noqa: W503
and file_path.suffix in Message.file_suffixes): # noqa: W503
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
file_path.unlink(missing_ok=True)
@@ -121,43 +109,14 @@ class Chat:
messages: list[Message]
def __post_init__(self) -> None:
self.validate()
def validate(self) -> None:
"""
Validate this Chat instance.
"""
def msg_paths(stem: str) -> list[str]:
return [str(fp) for fp in file_paths if fp.stem == stem]
file_paths: set[Path] = {m.file_path for m in self.messages if m.file_path is not None}
file_stems = [m.file_path.stem for m in self.messages if m.file_path is not None]
error = False
for fp in file_paths:
if file_stems.count(fp.stem) > 1:
print(f"ERROR: Found multiple copies of message '{fp.stem}': {msg_paths(fp.stem)}")
error = True
if error:
raise ChatError("Validation failed")
def msg_name_matches(self, file_path: Path, name: str) -> bool:
"""
Return True if the given name matches the given file_path.
Matching is True if:
* 'name' matches the full 'file_path'
* 'name' matches 'file_path.name' (i. e. including the suffix)
* 'name' matches 'file_path.stem' (i. e. without a suffix)
"""
return Path(name) == file_path or name == file_path.name or name == file_path.stem
def msg_filter(self, mfilter: MessageFilter) -> None:
def filter(self, mfilter: MessageFilter) -> None:
"""
Use 'Message.match(mfilter) to remove all messages that
don't fulfill the filter requirements.
"""
self.messages = [m for m in self.messages if m.match(mfilter)]
def msg_sort(self, reverse: bool = False) -> None:
def sort(self, reverse: bool = False) -> None:
"""
Sort the messages according to 'Message.msg_id()'.
"""
@@ -167,71 +126,48 @@ class Chat:
except MessageError:
pass
def msg_unique_id(self) -> None:
"""
Remove duplicates from the internal messages, based on the msg_id (i. e. file_path).
Messages without a file_path are kept.
"""
old_msgs = self.messages.copy()
self.messages = []
for m in old_msgs:
if not message_in(m, self.messages):
self.messages.append(m)
self.msg_sort()
def msg_unique_content(self) -> None:
"""
Remove duplicates from the internal messages, based on the content (i. e. question + answer).
"""
self.messages = list(set(self.messages))
self.msg_sort()
def msg_clear(self) -> None:
def clear(self) -> None:
"""
Delete all messages.
"""
self.messages = []
def msg_add(self, messages: list[Message]) -> None:
def add_messages(self, messages: list[Message]) -> None:
"""
Add new messages and sort them if possible.
"""
self.messages += messages
self.msg_sort()
self.sort()
def msg_latest(self, mfilter: Optional[MessageFilter] = None) -> Optional[Message]:
def latest_message(self) -> Optional[Message]:
"""
Return the last added message (according to the file ID) that matches the given filter.
When containing messages without a valid file_path, it returns the latest message in
the internal list.
Returns the last added message (according to the file ID).
"""
if len(self.messages) > 0:
self.msg_sort()
for m in reversed(self.messages):
if mfilter is None or m.match(mfilter):
return m
return None
self.sort()
return self.messages[-1]
else:
return None
def msg_find(self, msg_names: list[str]) -> list[Message]:
def find_messages(self, msg_names: list[str]) -> list[Message]:
"""
Search and return the messages with the given names. Names can either be filenames
(with or without suffix), full paths or Message.msg_id(). Messages that can't be
found are ignored (i. e. the caller should check the result if they require all
messages).
(incl. suffixes) or full paths. Messages that can't be found are ignored (i. e. the
caller should check the result if he requires all messages).
"""
return [m for m in self.messages
if any((m.file_path and self.msg_name_matches(m.file_path, mn)) for mn in msg_names)]
if any((m.file_path and (m.file_path == Path(mn) or m.file_path.name == mn)) for mn in msg_names)]
def msg_remove(self, msg_names: list[str]) -> None:
def remove_messages(self, msg_names: list[str]) -> None:
"""
Remove the messages with the given names. Names can either be filenames
(with or without suffix), full paths or Message.msg_id().
(incl. the suffix) or full paths.
"""
self.messages = [m for m in self.messages
if not any((m.file_path and self.msg_name_matches(m.file_path, mn)) for mn in msg_names)]
self.msg_sort()
if not any((m.file_path and (m.file_path == Path(mn) or m.file_path.name == mn)) for mn in msg_names)]
self.sort()
def msg_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
Get the tags of all messages, optionally filtered by prefix or substring.
"""
@@ -240,7 +176,7 @@ class Chat:
tags |= m.filter_tags(prefix, contain)
return set(sorted(tags))
def msg_tags_frequency(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> dict[Tag, int]:
def tags_frequency(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> dict[Tag, int]:
"""
Get the frequency of all tags of all messages, optionally filtered by prefix or substring.
"""
@@ -265,6 +201,7 @@ class Chat:
output.append(message.to_str(source_code_only=True))
continue
output.append(message.to_str(with_tags, with_files))
# output.append('\n' + ('-' * terminal_width()) + '\n')
if paged:
print_paged('\n'.join(output))
else:
@@ -293,11 +230,10 @@ class ChatDB(Chat):
def __post_init__(self) -> None:
# contains the latest message ID
self.next_path = self.db_path / db_next_file
self.next_fname = self.db_path / '.next'
# make all paths absolute
self.cache_path = self.cache_path.absolute()
self.db_path = self.db_path.absolute()
self.validate()
@classmethod
def from_dir(cls: Type[ChatDBInst],
@@ -333,7 +269,7 @@ class ChatDB(Chat):
def get_next_fid(self) -> int:
try:
with open(self.next_path, 'r') as f:
with open(self.next_fname, 'r') as f:
next_fid = int(f.read()) + 1
self.set_next_fid(next_fid)
return next_fid
@@ -342,255 +278,69 @@ class ChatDB(Chat):
return 1
def set_next_fid(self, fid: int) -> None:
with open(self.next_path, 'w') as f:
with open(self.next_fname, 'w') as f:
f.write(f'{fid}')
def msg_write(self, messages: Optional[list[Message]] = None) -> None:
def read_db(self) -> None:
"""
Write either the given messages or the internal ones to their CURRENT file_path.
If messages are given, they all must have a valid file_path. When writing the
internal messages, the ones with a valid file_path are written, the others
are ignored.
"""
if messages and any(m.file_path is None for m in messages):
raise ChatError("Can't write files without a valid file_path")
msgs = iter(messages if messages else self.messages)
while (m := next(msgs, None)):
m.to_file()
def msg_update(self, messages: list[Message], write: bool = True) -> None:
"""
Update EXISTING messages. A message is determined as 'existing' if a message with
Reads new messages from the DB directory. New ones are added to the internal list,
existing ones are replaced. A message is determined as 'existing' if a message with
the same base filename (i. e. 'file_path.name') is already in the list.
Only accepts existing messages.
"""
if any(not message_in(m, self.messages) for m in messages):
raise ChatError("Can't update messages that are not in the internal list")
# remove old versions and add new ones
self.messages = [m for m in self.messages if not message_in(m, messages)]
self.messages += messages
self.msg_sort()
# write the UPDATED messages if requested
if write:
self.msg_write(messages)
def msg_gather(self,
loc: msg_location,
require_file_path: bool = False,
mfilter: Optional[MessageFilter] = None) -> list[Message]:
"""
Gather and return messages from the given locations:
* 'mem' : messages currently in memory
* 'disk' : messages on disk (cache + DB directory), but not in memory
* 'cache': messages in the cache directory
* 'db' : messages in the DB directory
* 'all' : all messages ('mem' + 'disk')
If 'require_file_path' is True, return only files with a valid file_path.
"""
loc_messages: list[Message] = []
if loc in ['mem', 'all']:
if require_file_path:
loc_messages += [m for m in self.messages if (m.file_path is not None and (mfilter is None or m.match(mfilter)))]
else:
loc_messages += [m for m in self.messages if (mfilter is None or m.match(mfilter))]
if loc in ['cache', 'disk', 'all']:
loc_messages += read_dir(self.cache_path, mfilter=mfilter)
if loc in ['db', 'disk', 'all']:
loc_messages += read_dir(self.db_path, mfilter=mfilter)
# remove_duplicates and sort the list
unique_messages: list[Message] = []
for m in loc_messages:
if not message_in(m, unique_messages):
unique_messages.append(m)
try:
unique_messages.sort(key=lambda m: m.msg_id())
# messages in 'mem' can have an empty file_path
except MessageError:
pass
return unique_messages
def msg_find(self,
msg_names: list[str],
loc: msg_location = 'mem',
) -> list[Message]:
"""
Search and return the messages with the given names. Names can either be filenames
(with or without suffix), full paths or Message.msg_id(). Messages that can't be
found are ignored (i. e. the caller should check the result if they require all
messages).
Searches one of the following locations:
* 'mem' : messages currently in memory
* 'disk' : messages on disk (cache + DB directory), but not in memory
* 'cache': messages in the cache directory
* 'db' : messages in the DB directory
* 'all' : all messages ('mem' + 'disk')
"""
loc_messages = self.msg_gather(loc, require_file_path=True)
return [m for m in loc_messages
if any((m.file_path and self.msg_name_matches(m.file_path, mn)) for mn in msg_names)]
def msg_remove(self, msg_names: list[str], loc: msg_location = 'mem') -> None:
"""
Remove the messages with the given names. Names can either be filenames
(with or without suffix), full paths or Message.msg_id(). Also deletes the
files of all given messages with a valid file_path.
Delete files from one of the following locations:
* 'mem' : messages currently in memory
* 'disk' : messages on disk (cache + DB directory), but not in memory
* 'cache': messages in the cache directory
* 'db' : messages in the DB directory
* 'all' : all messages ('mem' + 'disk')
"""
if loc != 'mem':
# delete the message files first
rm_messages = self.msg_find(msg_names, loc=loc)
for m in rm_messages:
if (m.file_path):
m.file_path.unlink()
# then remove them from the internal list
super().msg_remove(msg_names)
def msg_latest(self,
mfilter: Optional[MessageFilter] = None,
loc: msg_location = 'mem') -> Optional[Message]:
"""
Return the last added message (according to the file ID) that matches the given filter.
Only consider messages with a valid file_path (except if loc is 'mem').
Searches one of the following locations:
* 'mem' : messages currently in memory
* 'disk' : messages on disk (cache + DB directory), but not in memory
* 'cache': messages in the cache directory
* 'db' : messages in the DB directory
* 'all' : all messages ('mem' + 'disk')
"""
# only consider messages with a valid file_path so they can be sorted
loc_messages = self.msg_gather(loc, require_file_path=True)
loc_messages.sort(key=lambda m: m.msg_id(), reverse=True)
for m in loc_messages:
if mfilter is None or m.match(mfilter):
return m
return None
def msg_in_cache(self, message: Union[Message, str]) -> bool:
"""
Return true if the given Message (or filename or Message.msg_id())
is located in the cache directory. False otherwise.
"""
if isinstance(message, Message):
return (message.file_path is not None
and message.file_path.parent.samefile(self.cache_path) # noqa: W503
and message.file_path.exists()) # noqa: W503
else:
return len(self.msg_find([message], loc='cache')) > 0
def msg_in_db(self, message: Union[Message, str]) -> bool:
"""
Return true if the given Message (or filename or Message.msg_id())
is located in the DB directory. False otherwise.
"""
if isinstance(message, Message):
return (message.file_path is not None
and message.file_path.parent.samefile(self.db_path) # noqa: W503
and message.file_path.exists()) # noqa: W503
else:
return len(self.msg_find([message], loc='db')) > 0
def cache_read(self, glob: Optional[str] = None, mfilter: Optional[MessageFilter] = None) -> None:
"""
Read messages from the cache directory. New ones are added to the internal list,
existing ones are replaced. A message is determined as 'existing' if a message
with the same base filename (i. e. 'file_path.name') is already in the list.
"""
new_messages = read_dir(self.cache_path, glob, mfilter)
# remove all messages from self.messages that are in the new list
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
# copy the messages from the temporary list to self.messages and sort them
self.messages += new_messages
self.msg_sort()
def cache_write(self, messages: Optional[list[Message]] = None) -> None:
"""
Write messages to the cache directory. If a message has no file_path, a new one
will be created. If message.file_path exists, it will be modified to point to
the cache directory.
Does NOT add the messages to the internal list (use 'cache_add()' for that)!
"""
write_dir(self.cache_path,
messages if messages else self.messages,
self.file_suffix,
self.get_next_fid)
def cache_add(self, messages: list[Message], write: bool = True) -> None:
"""
Add NEW messages and set the file_path to the cache directory.
Only accepts messages without a file_path.
"""
if any(m.file_path is not None for m in messages):
raise ChatError("Can't add new messages with existing file_path")
if write:
write_dir(self.cache_path,
messages,
self.file_suffix,
self.get_next_fid)
else:
for m in messages:
m.file_path = make_file_path(self.cache_path, self.default_file_suffix, self.get_next_fid)
self.messages += messages
self.msg_sort()
def cache_clear(self, glob: Optional[str] = None) -> None:
"""
Delete all message files from the cache dir and remove them from the internal list.
"""
clear_dir(self.cache_path, glob)
# only keep messages from DB dir (or those that have not yet been written)
self.messages = [m for m in self.messages if not m.file_path or m.file_path.parent.samefile(self.db_path)]
def cache_move(self, message: Message) -> None:
"""
Moves the given messages to the cache directory.
"""
# remember the old path (if any)
old_path: Optional[Path] = None
if message.file_path:
old_path = message.file_path
# write message to the new destination
self.cache_write([message])
# remove the old one (if any)
if old_path:
self.msg_remove([str(old_path)], loc='db')
# (re)add it to the internal list
self.msg_add([message])
def db_read(self, glob: Optional[str] = None, mfilter: Optional[MessageFilter] = None) -> None:
"""
Read messages from the DB directory. New ones are added to the internal list,
existing ones are replaced. A message is determined as 'existing' if a message
with the same base filename (i. e. 'file_path.name') is already in the list.
"""
new_messages = read_dir(self.db_path, self.glob, self.mfilter)
# remove all messages from self.messages that are in the new list
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
# copy the messages from the temporary list to self.messages and sort them
self.messages += new_messages
self.msg_sort()
self.sort()
def db_write(self, messages: Optional[list[Message]] = None) -> None:
def read_cache(self) -> None:
"""
Reads new messages from the cache directory. New ones are added to the internal list,
existing ones are replaced. A message is determined as 'existing' if a message with
the same base filename (i. e. 'file_path.name') is already in the list.
"""
new_messages = read_dir(self.cache_path, self.glob, self.mfilter)
# remove all messages from self.messages that are in the new list
self.messages = [m for m in self.messages if not message_in(m, new_messages)]
# copy the messages from the temporary list to self.messages and sort them
self.messages += new_messages
self.sort()
def write_db(self, messages: Optional[list[Message]] = None) -> None:
"""
Write messages to the DB directory. If a message has no file_path, a new one
will be created. If message.file_path exists, it will be modified to point
to the DB directory.
Does NOT add the messages to the internal list (use 'db_add()' for that)!
"""
write_dir(self.db_path,
messages if messages else self.messages,
self.file_suffix,
self.get_next_fid)
def db_add(self, messages: list[Message], write: bool = True) -> None:
def write_cache(self, messages: Optional[list[Message]] = None) -> None:
"""
Add NEW messages and set the file_path to the DB directory.
Write messages to the cache directory. If a message has no file_path, a new one
will be created. If message.file_path exists, it will be modified to point to
the cache directory.
"""
write_dir(self.cache_path,
messages if messages else self.messages,
self.file_suffix,
self.get_next_fid)
def clear_cache(self) -> None:
"""
Deletes all Message files from the cache dir and removes those messages from
the internal list.
"""
clear_dir(self.cache_path, self.glob)
# only keep messages from DB dir (or those that have not yet been written)
self.messages = [m for m in self.messages if not m.file_path or m.file_path.parent.samefile(self.db_path)]
def add_to_db(self, messages: list[Message], write: bool = True) -> None:
"""
Add the given new messages and set the file_path to the DB directory.
Only accepts messages without a file_path.
"""
if any(m.file_path is not None for m in messages):
@@ -604,20 +354,33 @@ class ChatDB(Chat):
for m in messages:
m.file_path = make_file_path(self.db_path, self.default_file_suffix, self.get_next_fid)
self.messages += messages
self.msg_sort()
self.sort()
def db_move(self, message: Message) -> None:
def add_to_cache(self, messages: list[Message], write: bool = True) -> None:
"""
Moves the given messages to the db directory.
Add the given new messages and set the file_path to the cache directory.
Only accepts messages without a file_path.
"""
# remember the old path (if any)
old_path: Optional[Path] = None
if message.file_path:
old_path = message.file_path
# write message to the new destination
self.db_write([message])
# remove the old one (if any)
if old_path:
self.msg_remove([str(old_path)], loc='cache')
# (re)add it to the internal list
self.msg_add([message])
if write:
write_dir(self.cache_path,
messages,
self.file_suffix,
self.get_next_fid)
else:
for m in messages:
m.file_path = make_file_path(self.cache_path, self.default_file_suffix, self.get_next_fid)
self.messages += messages
self.sort()
def write_messages(self, messages: Optional[list[Message]] = None) -> None:
"""
Write either the given messages or the internal ones to their current file_path.
If messages are given, they all must have a valid file_path. When writing the
internal messages, the ones with a valid file_path are written, the others
are ignored.
"""
if messages and any(m.file_path is None for m in messages):
raise ChatError("Can't write files without a valid file_path")
msgs = iter(messages if messages else self.messages)
while (m := next(msgs, None)):
m.to_file()
-9
View File
@@ -1,8 +1,6 @@
import argparse
from pathlib import Path
from ..configuration import Config
from ..ai import AI
from ..ai_factory import create_ai
def config_cmd(args: argparse.Namespace) -> None:
@@ -11,10 +9,3 @@ def config_cmd(args: argparse.Namespace) -> None:
"""
if args.create:
Config.create_default(Path(args.create))
elif args.list_models or args.print_model:
config: Config = Config.from_file(args.config)
ai: AI = create_ai(args, config)
if args.list_models:
ai.print_models()
else:
print(ai.config.model)
+1 -1
View File
@@ -15,7 +15,7 @@ def hist_cmd(args: argparse.Namespace, config: Config) -> None:
tags_not=args.exclude_tags,
question_contains=args.question,
answer_contains=args.answer)
chat = ChatDB.from_dir(Path(config.cache),
chat = ChatDB.from_dir(Path('.'),
Path(config.db),
mfilter=mfilter)
chat.print(args.source_code_only,
+16 -34
View File
@@ -3,43 +3,25 @@ import argparse
from pathlib import Path
from ..configuration import Config
from ..message import Message, MessageError
from ..chat import ChatDB
def print_message(message: Message, args: argparse.Namespace) -> None:
"""
Print given message according to give arguments.
"""
if args.question:
print(message.question)
elif args.answer:
print(message.answer)
elif message.answer and args.only_source_code:
for code in message.answer.source_code():
print(code)
else:
print(message.to_str())
def print_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'print' command.
"""
# print given file
if args.file is not None:
fname = Path(args.file)
try:
message = Message.from_file(fname)
if message:
print_message(message, args)
except MessageError:
print(f"File is not a valid message: {args.file}")
sys.exit(1)
# print latest message
elif args.latest:
chat = ChatDB.from_dir(Path(config.cache), Path(config.db))
latest = chat.msg_latest(loc='disk')
if not latest:
print("No message found!")
sys.exit(1)
print_message(latest, args)
fname = Path(args.file)
try:
message = Message.from_file(fname)
if message:
if args.question:
print(message.question)
elif args.answer:
print(message.answer)
elif message.answer and args.only_source_code:
for code in message.answer.source_code():
print(code)
else:
print(message.to_str())
except MessageError:
print(f"File is not a valid message: {args.file}")
sys.exit(1)
+45 -104
View File
@@ -1,118 +1,61 @@
import sys
import argparse
from pathlib import Path
from itertools import zip_longest
from ..configuration import Config
from ..chat import ChatDB
from ..message import Message, MessageFilter, MessageError, Question, source_code
from ..message import Message, MessageFilter, Question, source_code
from ..ai_factory import create_ai
from ..ai import AI, AIResponse
def add_file_as_text(question_parts: list[str], file: str) -> None:
"""
Add the given file as plain text to the question part list.
If the file is a Message, add the answer.
"""
file_path = Path(file)
content: str
try:
message = Message.from_file(file_path)
if message and message.answer:
content = message.answer
except MessageError:
with open(file) as r:
content = r.read().strip()
if len(content) > 0:
question_parts.append(content)
def add_file_as_code(question_parts: list[str], file: str) -> None:
"""
Add all source code from the given file. If no code segments can be extracted,
the whole content is added as source code segment. If the file is a Message,
extract the source code from the answer.
"""
file_path = Path(file)
content: str
try:
message = Message.from_file(file_path)
if message and message.answer:
content = message.answer
except MessageError:
with open(file) as r:
content = r.read().strip()
# extract and add source code
code_parts = source_code(content, include_delims=True)
if len(code_parts) > 0:
question_parts += code_parts
else:
question_parts.append(f"```\n{content}\n```")
def create_message(chat: ChatDB, args: argparse.Namespace) -> Message:
"""
Create a new message from the given arguments and write it
to the cache directory.
Creates (and writes) a new message from the given arguments.
"""
question_parts = []
question_list = args.ask if args.ask is not None else []
text_files = args.source_text if args.source_text is not None else []
code_files = args.source_code if args.source_code is not None else []
for question, text_file, code_file in zip_longest(question_list, text_files, code_files, fillvalue=None):
for question, source, code in zip_longest(question_list, text_files, code_files, fillvalue=None):
if question is not None and len(question.strip()) > 0:
question_parts.append(question)
if text_file is not None and len(text_file) > 0:
add_file_as_text(question_parts, text_file)
if code_file is not None and len(code_file) > 0:
add_file_as_code(question_parts, code_file)
if source is not None and len(source) > 0:
with open(source) as r:
content = r.read().strip()
if len(content) > 0:
question_parts.append(content)
if code is not None and len(code) > 0:
with open(code) as r:
content = r.read().strip()
if len(content) == 0:
continue
# try to extract and add source code
code_parts = source_code(content, include_delims=True)
if len(code_parts) > 0:
question_parts += code_parts
# if there's none, add the whole file
else:
question_parts.append(f"```\n{content}\n```")
full_question = '\n\n'.join(question_parts)
message = Message(question=Question(full_question),
tags=args.output_tags,
tags=args.output_tags, # FIXME
ai=args.AI,
model=args.model)
# only write the new message to the cache,
# don't add it to the internal list
chat.cache_write([message])
chat.add_to_cache([message])
return message
def make_request(ai: AI, chat: ChatDB, message: Message, args: argparse.Namespace) -> None:
"""
Make an AI request with the given AI, chat history, message and arguments.
Write the response(s) to the cache directory, without appending it to the
given chat history. Then print the response(s).
"""
# print history and message question before making the request
ai.print()
chat.print(paged=False)
print(message.to_str())
response: AIResponse = ai.request(message,
chat,
args.num_answers,
args.output_tags)
# only write the response messages to the cache,
# don't add them to the internal list
chat.cache_write(response.messages)
for idx, msg in enumerate(response.messages):
print(f"=== ANSWER {idx+1} ===")
print(msg.answer)
if response.tokens:
print("===============")
print(response.tokens)
def question_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'question' command.
"""
mfilter = MessageFilter(tags_or=args.or_tags if args.or_tags is not None else set(),
tags_and=args.and_tags if args.and_tags is not None else set(),
tags_not=args.exclude_tags if args.exclude_tags is not None else set())
chat = ChatDB.from_dir(cache_path=Path(config.cache),
mfilter = MessageFilter(tags_or=args.or_tags,
tags_and=args.and_tags,
tags_not=args.exclude_tags)
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db),
mfilter=mfilter)
# if it's a new question, create and store it immediately
@@ -123,29 +66,27 @@ def question_cmd(args: argparse.Namespace, config: Config) -> None:
# create the correct AI instance
ai: AI = create_ai(args, config)
# === ASK ===
if args.ask:
make_request(ai, chat, message, args)
# === REPEAT ===
ai.print()
chat.print(paged=False)
response: AIResponse = ai.request(message,
chat,
args.num_answers, # FIXME
args.output_tags) # FIXME
chat.add_to_cache(response.messages)
for idx, msg in enumerate(response.messages):
print(f"=== ANSWER {idx+1} ===")
print(msg.answer)
if response.tokens:
print("===============")
print(response.tokens)
elif args.repeat is not None:
lmessage = chat.msg_latest(loc='cache')
if lmessage is None:
print("No message found to repeat!")
sys.exit(1)
else:
print(f"Repeating message '{lmessage.msg_id()}':")
# overwrite the latest message if requested or empty
if lmessage.answer is None or args.overwrite is True:
lmessage.clear_answer()
make_request(ai, chat, lmessage, args)
# otherwise create a new one
else:
args.ask = [lmessage.question]
message = create_message(chat, args)
make_request(ai, chat, message, args)
# === PROCESS ===
lmessage = chat.latest_message()
assert lmessage
# TODO: repeat either the last question or the
# one(s) given in 'args.repeat' (overwrite
# existing ones if 'args.overwrite' is True)
pass
elif args.process is not None:
# TODO: process either all questions without an
# answer or the one(s) given in 'args.process'
+2 -2
View File
@@ -8,10 +8,10 @@ def tags_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'tags' command.
"""
chat = ChatDB.from_dir(cache_path=Path(config.cache),
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db))
if args.list:
tags_freq = chat.msg_tags_frequency(args.prefix, args.contain)
tags_freq = chat.tags_frequency(args.prefix, args.contain)
for tag, freq in tags_freq.items():
print(f"- {tag}: {freq}")
# TODO: add renaming
+7 -23
View File
@@ -9,25 +9,14 @@ OpenAIConfigInst = TypeVar('OpenAIConfigInst', bound='OpenAIConfig')
supported_ais: list[str] = ['openai']
default_config_file = '.config.yaml'
default_ai_ID: str = 'default'
default_config_path = '.config.yaml'
class ConfigError(Exception):
pass
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
"""
Changes the YAML dump style to multiline syntax for multiline strings.
"""
if len(data.splitlines()) > 1:
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
yaml.add_representer(str, str_presenter)
@dataclass
class AIConfig:
"""
@@ -39,7 +28,6 @@ class AIConfig:
name: ClassVar[str]
# a user-defined ID for an AI configuration entry
ID: str
model: str = 'n/a'
# the name must not be changed
def __setattr__(self, name: str, value: Any) -> None:
@@ -58,15 +46,15 @@ class OpenAIConfig(AIConfig):
# all members have default values, so we can easily create
# a default configuration
ID: str = 'myopenai'
ID: str = 'default'
api_key: str = '0123456789'
system: str = 'You are an assistant'
model: str = 'gpt-3.5-turbo-16k'
temperature: float = 1.0
max_tokens: int = 4000
top_p: float = 1.0
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
system: str = 'You are an assistant'
@classmethod
def from_dict(cls: Type[OpenAIConfigInst], source: dict[str, Any]) -> OpenAIConfigInst:
@@ -74,14 +62,14 @@ class OpenAIConfig(AIConfig):
Create OpenAIConfig from a dict.
"""
res = cls(
system=str(source['system']),
api_key=str(source['api_key']),
model=str(source['model']),
max_tokens=int(source['max_tokens']),
temperature=float(source['temperature']),
top_p=float(source['top_p']),
frequency_penalty=float(source['frequency_penalty']),
presence_penalty=float(source['presence_penalty']),
system=str(source['system'])
presence_penalty=float(source['presence_penalty'])
)
# overwrite default ID if provided
if 'ID' in source:
@@ -116,7 +104,6 @@ class Config:
"""
# all members have default values, so we can easily create
# a default configuration
cache: str = '.'
db: str = './db/'
ais: dict[str, AIConfig] = field(default_factory=create_default_ai_configs)
@@ -133,7 +120,6 @@ class Config:
ai_conf = ai_config_instance(conf['name'], conf)
ais[ID] = ai_conf
return cls(
cache=str(source['cache']) if 'cache' in source else '.',
db=str(source['db']),
ais=ais
)
@@ -162,8 +148,6 @@ class Config:
def as_dict(self) -> dict[str, Any]:
res = asdict(self)
# add the AI name manually (as first element)
# (not done by 'asdict' because it's a class variable)
for ID, conf in res['ais'].items():
res['ais'][ID] = {**{'name': self.ais[ID].name}, **conf}
conf.update({'name': self.ais[ID].name})
return res
+21 -25
View File
@@ -7,7 +7,7 @@ import argcomplete
import argparse
from pathlib import Path
from typing import Any
from .configuration import Config, default_config_file
from .configuration import Config, default_config_path
from .message import Message
from .commands.question import question_cmd
from .commands.tags import tags_cmd
@@ -24,7 +24,7 @@ def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="ChatMastermind is a Python application that automates conversation with AI")
parser.add_argument('-C', '--config', help='Config file name.', default=default_config_file)
parser.add_argument('-C', '--config', help='Config file name.', default=default_config_path)
# subcommand-parser
cmdparser = parser.add_subparsers(dest='command',
@@ -44,13 +44,13 @@ def create_parser() -> argparse.ArgumentParser:
help='List of tags to exclude', metavar='XTAGS')
etag_arg.completer = tags_completer # type: ignore
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
help='List of output tags (default: use input tags)', metavar='OUTAGS')
help='List of output tags (default: use input tags)', metavar='OUTTAGS')
otag_arg.completer = tags_completer # type: ignore
# a parent parser for all commands that support AI configuration
ai_parser = argparse.ArgumentParser(add_help=False)
ai_parser.add_argument('-A', '--AI', help='AI ID to use', metavar='AI_ID')
ai_parser.add_argument('-M', '--model', help='Model to use', metavar='MODEL')
ai_parser.add_argument('-A', '--AI', help='AI ID to use')
ai_parser.add_argument('-M', '--model', help='Model to use')
ai_parser.add_argument('-n', '--num-answers', help='Number of answers to request', type=int, default=1)
ai_parser.add_argument('-m', '--max-tokens', help='Max. nr. of tokens', type=int)
ai_parser.add_argument('-T', '--temperature', help='Temperature value', type=float)
@@ -61,15 +61,14 @@ def create_parser() -> argparse.ArgumentParser:
aliases=['q'])
question_cmd_parser.set_defaults(func=question_cmd)
question_group = question_cmd_parser.add_mutually_exclusive_group(required=True)
question_group.add_argument('-a', '--ask', nargs='+', help='Ask a question', metavar='QUESTION')
question_group.add_argument('-c', '--create', nargs='+', help='Create a question', metavar='QUESTION')
question_group.add_argument('-r', '--repeat', nargs='*', help='Repeat a question', metavar='MESSAGE')
question_group.add_argument('-p', '--process', nargs='*', help='Process existing questions', metavar='MESSAGE')
question_group.add_argument('-a', '--ask', nargs='+', help='Ask a question')
question_group.add_argument('-c', '--create', nargs='+', help='Create a question')
question_group.add_argument('-r', '--repeat', nargs='*', help='Repeat a question')
question_group.add_argument('-p', '--process', nargs='*', help='Process existing questions')
question_cmd_parser.add_argument('-O', '--overwrite', help='Overwrite existing messages when repeating them',
action='store_true')
question_cmd_parser.add_argument('-s', '--source-text', nargs='+', help='Add content of a file to the query', metavar='FILE')
question_cmd_parser.add_argument('-S', '--source-code', nargs='+', help='Add source code file content to the chat history',
metavar='FILE')
question_cmd_parser.add_argument('-s', '--source-text', nargs='+', help='Add content of a file to the query')
question_cmd_parser.add_argument('-S', '--source-code', nargs='+', help='Add source code file content to the chat history')
# 'hist' command parser
hist_cmd_parser = cmdparser.add_parser('hist', parents=[tag_parser],
@@ -80,10 +79,10 @@ def create_parser() -> argparse.ArgumentParser:
action='store_true')
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
action='store_true')
hist_cmd_parser.add_argument('-S', '--source-code-only', help='Only print embedded source code',
hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true')
hist_cmd_parser.add_argument('-A', '--answer', help='Search for answer substring', metavar='SUBSTRING')
hist_cmd_parser.add_argument('-Q', '--question', help='Search for question substring', metavar='SUBSTRING')
hist_cmd_parser.add_argument('-A', '--answer', help='Search for answer substring')
hist_cmd_parser.add_argument('-Q', '--question', help='Search for question substring')
# 'tags' command parser
tags_cmd_parser = cmdparser.add_parser('tags',
@@ -93,34 +92,31 @@ def create_parser() -> argparse.ArgumentParser:
tags_group = tags_cmd_parser.add_mutually_exclusive_group(required=True)
tags_group.add_argument('-l', '--list', help="List all tags and their frequency",
action='store_true')
tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix", metavar='PREFIX')
tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring", metavar='SUBSTRING')
tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
# 'config' command parser
config_cmd_parser = cmdparser.add_parser('config',
help="Manage configuration",
aliases=['c'])
config_cmd_parser.set_defaults(func=config_cmd)
config_cmd_parser.add_argument('-A', '--AI', help='AI ID to use')
config_group = config_cmd_parser.add_mutually_exclusive_group(required=True)
config_group.add_argument('-l', '--list-models', help="List all available models",
action='store_true')
config_group.add_argument('-m', '--print-model', help="Print the currently configured model",
action='store_true')
config_group.add_argument('-c', '--create', help="Create config with default settings in the given file", metavar='FILE')
config_group.add_argument('-c', '--create', help="Create config with default settings in the given file")
# 'print' command parser
print_cmd_parser = cmdparser.add_parser('print',
help="Print message files.",
aliases=['p'])
print_cmd_parser.set_defaults(func=print_cmd)
print_group = print_cmd_parser.add_mutually_exclusive_group(required=True)
print_group.add_argument('-f', '--file', help='Print given message file', metavar='FILE')
print_group.add_argument('-l', '--latest', help='Print latest message', action='store_true')
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
print_cmd_modes = print_cmd_parser.add_mutually_exclusive_group()
print_cmd_modes.add_argument('-q', '--question', help='Only print the question', action='store_true')
print_cmd_modes.add_argument('-a', '--answer', help='Only print the answer', action='store_true')
print_cmd_modes.add_argument('-S', '--only-source-code', help='Only print embedded source code', action='store_true')
print_cmd_modes.add_argument('-q', '--question', help='Print only question', action='store_true')
print_cmd_modes.add_argument('-a', '--answer', help='Print only answer', action='store_true')
print_cmd_modes.add_argument('-S', '--only-source-code', help='Print only source code', action='store_true')
argcomplete.autocomplete(parser)
return parser
+19 -53
View File
@@ -3,8 +3,6 @@ Module implementing message related functions and classes.
"""
import pathlib
import yaml
import tempfile
import shutil
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal, Iterable
from dataclasses import dataclass, asdict, field
from .tags import Tag, TagLine, TagError, match_tags, rename_tags
@@ -222,36 +220,12 @@ class Message():
ai_yaml_key: ClassVar[str] = 'ai'
model_yaml_key: ClassVar[str] = 'model'
def __post_init__(self) -> None:
# convert some types that are often set wrong
if self.tags is not None and not isinstance(self.tags, set):
self.tags = set(self.tags)
if self.file_path is not None and not isinstance(self.file_path, pathlib.Path):
self.file_path = pathlib.Path(self.file_path)
def __hash__(self) -> int:
"""
The hash value is computed based on immutable members.
"""
return hash((self.question, self.answer))
def equals(self, other: MessageInst, tags: bool = True, ai: bool = True,
model: bool = True, file_path: bool = True, verbose: bool = False) -> bool:
"""
Compare this message with another one, including the metadata.
Return True if everything is identical, False otherwise.
"""
equal: bool = ((not tags or (self.tags == other.tags))
and (not ai or (self.ai == other.ai)) # noqa: W503
and (not model or (self.model == other.model)) # noqa: W503
and (not file_path or (self.file_path == other.file_path)) # noqa: W503
and (self == other)) # noqa: W503
if not equal and verbose:
print("Messages not equal:")
print(self)
print(other)
return equal
@classmethod
def from_dict(cls: Type[MessageInst], data: dict[str, Any]) -> MessageInst:
"""
@@ -338,7 +312,7 @@ class Message():
mfilter.tags_not if mfilter else None)
else:
message = cls.__from_file_yaml(file_path)
if message and (mfilter is None or message.match(mfilter)):
if message and (not mfilter or (mfilter and message.match(mfilter))):
return message
else:
return None
@@ -394,7 +368,7 @@ class Message():
try:
question_idx = text.index(Question.txt_header) + 1
except ValueError:
raise MessageError(f"'{file_path}' does not contain a valid message")
raise MessageError(f"Question header '{Question.txt_header}' not found in '{file_path}'")
try:
answer_idx = text.index(Answer.txt_header)
question = Question.from_list(text[question_idx:answer_idx])
@@ -414,12 +388,9 @@ class Message():
* Message.model_yaml_key: str [Optional]
"""
with open(file_path, "r") as fd:
try:
data = yaml.load(fd, Loader=yaml.FullLoader)
data[cls.file_yaml_key] = file_path
return cls.from_dict(data)
except Exception:
raise MessageError(f"'{file_path}' does not contain a valid message")
data = yaml.load(fd, Loader=yaml.FullLoader)
data[cls.file_yaml_key] = file_path
return cls.from_dict(data)
def to_str(self, with_tags: bool = False, with_file: bool = False, source_code_only: bool = False) -> str:
"""
@@ -442,6 +413,9 @@ class Message():
output.append(self.answer)
return '\n'.join(output)
def __str__(self) -> str:
return self.to_str(True, True, False)
def to_file(self, file_path: Optional[pathlib.Path]=None) -> None: # noqa: 11
"""
Write a Message to the given file. Type is determined based on the suffix.
@@ -471,18 +445,16 @@ class Message():
* Answer.txt_header
* Answer
"""
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
temp_file_path = pathlib.Path(temp_fd.name)
with open(file_path, "w") as fd:
if self.tags:
temp_fd.write(f'{TagLine.from_set(self.tags)}\n')
fd.write(f'{TagLine.from_set(self.tags)}\n')
if self.ai:
temp_fd.write(f'{AILine.from_ai(self.ai)}\n')
fd.write(f'{AILine.from_ai(self.ai)}\n')
if self.model:
temp_fd.write(f'{ModelLine.from_model(self.model)}\n')
temp_fd.write(f'{Question.txt_header}\n{self.question}\n')
fd.write(f'{ModelLine.from_model(self.model)}\n')
fd.write(f'{Question.txt_header}\n{self.question}\n')
if self.answer:
temp_fd.write(f'{Answer.txt_header}\n{self.answer}\n')
shutil.move(temp_file_path, file_path)
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
"""
@@ -494,8 +466,7 @@ class Message():
* Message.ai_yaml_key: str [Optional]
* Message.model_yaml_key: str [Optional]
"""
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
temp_file_path = pathlib.Path(temp_fd.name)
with open(file_path, "w") as fd:
data: YamlDict = {Question.yaml_key: str(self.question)}
if self.answer:
data[Answer.yaml_key] = str(self.answer)
@@ -505,8 +476,7 @@ class Message():
data[self.model_yaml_key] = self.model
if self.tags:
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, temp_fd, sort_keys=False)
shutil.move(temp_file_path, file_path)
yaml.dump(data, fd, sort_keys=False)
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
@@ -538,7 +508,7 @@ class Message():
Return True if all attributes match, else False.
"""
mytags = self.tags or set()
if (((mfilter.tags_or is not None or mfilter.tags_and is not None or mfilter.tags_not is not None)
if (((mfilter.tags_or or mfilter.tags_and or mfilter.tags_not)
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
@@ -561,17 +531,13 @@ class Message():
if self.tags:
self.tags = rename_tags(self.tags, tags_rename)
def clear_answer(self) -> None:
self.answer = None
def msg_id(self) -> str:
"""
Returns an ID that is unique throughout all messages in the same (DB) directory.
Currently this is the file name without suffix. The ID is also used for sorting
messages.
Currently this is the file name. The ID is also used for sorting messages.
"""
if self.file_path:
return self.file_path.stem
return self.file_path.name
else:
raise MessageError("Can't create file ID without a file path")
-56
View File
@@ -1,56 +0,0 @@
<?php
$secret_key = '123';
// check for POST request
if ($_SERVER['REQUEST_METHOD'] != 'POST') {
error_log('FAILED - not POST - '. $_SERVER['REQUEST_METHOD']);
exit();
}
// get content type
$content_type = isset($_SERVER['CONTENT_TYPE']) ? strtolower(trim($_SERVER['CONTENT_TYPE'])) : '';
if ($content_type != 'application/json') {
error_log('FAILED - not application/json - '. $content_type);
exit();
}
// get payload
$payload = trim(file_get_contents("php://input"));
if (empty($payload)) {
error_log('FAILED - no payload');
exit();
}
// get header signature
$header_signature = isset($_SERVER['HTTP_X_GITEA_SIGNATURE']) ? $_SERVER['HTTP_X_GITEA_SIGNATURE'] : '';
if (empty($header_signature)) {
error_log('FAILED - header signature missing');
exit();
}
// calculate payload signature
$payload_signature = hash_hmac('sha256', $payload, $secret_key, false);
// check payload signature against header signature
if ($header_signature !== $payload_signature) {
error_log('FAILED - payload signature');
exit();
}
// convert json to array
$decoded = json_decode($payload, true);
// check for json decode errors
if (json_last_error() !== JSON_ERROR_NONE) {
error_log('FAILED - json decode - '. json_last_error());
exit();
}
// success, do something
$output = shell_exec('/home/kaizen/repos/ChatMastermind/hooks/push_hook.sh');
echo "$output";
?>
-8
View File
@@ -1,8 +0,0 @@
#!/usr/bin/bash
. /home/kaizen/.bashrc
set -e
cd /home/kaizen/repos/ChatMastermind
git pull
pre-commit run -a
pytest
+2 -2
View File
@@ -10,7 +10,7 @@ from chatmastermind.ais.openai import OpenAI
class TestCreateAI(unittest.TestCase):
def setUp(self) -> None:
self.args = MagicMock(spec=argparse.Namespace)
self.args.AI = 'myopenai'
self.args.AI = 'default'
self.args.model = None
self.args.max_tokens = None
self.args.temperature = None
@@ -18,7 +18,7 @@ class TestCreateAI(unittest.TestCase):
def test_create_ai_from_args(self) -> None:
# Create an AI with the default configuration
config = Config()
self.args.AI = 'myopenai'
self.args.AI = 'default'
ai = create_ai(self.args, config)
self.assertIsInstance(ai, OpenAI)
-81
View File
@@ -1,81 +0,0 @@
import unittest
from unittest import mock
from chatmastermind.ais.openai import OpenAI
from chatmastermind.message import Message, Question, Answer
from chatmastermind.chat import Chat
from chatmastermind.ai import AIResponse, Tokens
from chatmastermind.configuration import OpenAIConfig
class OpenAITest(unittest.TestCase):
@mock.patch('openai.ChatCompletion.create')
def test_request(self, mock_create: mock.MagicMock) -> None:
# Create a test instance of OpenAI
config = OpenAIConfig()
openai = OpenAI(config)
# Set up the mock response from openai.ChatCompletion.create
mock_response = {
'choices': [
{
'message': {
'content': 'Answer 1'
}
},
{
'message': {
'content': 'Answer 2'
}
}
],
'usage': {
'prompt_tokens': 10,
'completion_tokens': 20,
'total_tokens': 30
}
}
mock_create.return_value = mock_response
# Create test data
question = Message(Question('Question'))
chat = Chat([
Message(Question('Question 1'), answer=Answer('Answer 1')),
Message(Question('Question 2'), answer=Answer('Answer 2')),
# add message without an answer -> expect to be skipped
Message(Question('Question 3'))
])
# Make the request
response = openai.request(question, chat, num_answers=2)
# Assert the AIResponse
self.assertIsInstance(response, AIResponse)
self.assertEqual(len(response.messages), 2)
self.assertEqual(response.messages[0].answer, 'Answer 1')
self.assertEqual(response.messages[1].answer, 'Answer 2')
self.assertIsNotNone(response.tokens)
self.assertIsInstance(response.tokens, Tokens)
assert response.tokens
self.assertEqual(response.tokens.prompt, 10)
self.assertEqual(response.tokens.completion, 20)
self.assertEqual(response.tokens.total, 30)
# Assert the mock call to openai.ChatCompletion.create
mock_create.assert_called_once_with(
model=f'{config.model}',
messages=[
{'role': 'system', 'content': f'{config.system}'},
{'role': 'user', 'content': 'Question 1'},
{'role': 'assistant', 'content': 'Answer 1'},
{'role': 'user', 'content': 'Question 2'},
{'role': 'assistant', 'content': 'Answer 2'},
{'role': 'user', 'content': 'Question'}
],
temperature=config.temperature,
max_tokens=config.max_tokens,
top_p=config.top_p,
n=2,
frequency_penalty=config.frequency_penalty,
presence_penalty=config.presence_penalty
)
+51 -276
View File
@@ -2,7 +2,6 @@ import unittest
import pathlib
import tempfile
import time
import yaml
from io import StringIO
from unittest.mock import patch
from chatmastermind.tags import TagLine
@@ -10,18 +9,7 @@ from chatmastermind.message import Message, Question, Answer, Tag, MessageFilter
from chatmastermind.chat import Chat, ChatDB, ChatError
class TestChatBase(unittest.TestCase):
def assert_messages_equal(self, msg1: list[Message], msg2: list[Message]) -> None:
"""
Compare messages using more than just Question and Answer.
"""
self.assertEqual(len(msg1), len(msg2))
for m1, m2 in zip(msg1, msg2):
# exclude the file_path, compare only Q, A and metadata
self.assertTrue(m1.equals(m2, file_path=False, verbose=True))
class TestChat(TestChatBase):
class TestChat(unittest.TestCase):
def setUp(self) -> None:
self.chat = Chat([])
self.message1 = Message(Question('Question 1'),
@@ -32,103 +20,73 @@ class TestChat(TestChatBase):
Answer('Answer 2'),
{Tag('btag2')},
file_path=pathlib.Path('0002.txt'))
self.maxDiff = None
def test_unique_id(self) -> None:
# test with two identical messages
self.chat.msg_add([self.message1, self.message1])
self.assert_messages_equal(self.chat.messages, [self.message1, self.message1])
self.chat.msg_unique_id()
self.assert_messages_equal(self.chat.messages, [self.message1])
# test with two different messages
self.chat.msg_add([self.message2])
self.chat.msg_unique_id()
self.assert_messages_equal(self.chat.messages, [self.message1, self.message2])
def test_unique_content(self) -> None:
# test with two identical messages
self.chat.msg_add([self.message1, self.message1])
self.assert_messages_equal(self.chat.messages, [self.message1, self.message1])
self.chat.msg_unique_content()
self.assert_messages_equal(self.chat.messages, [self.message1])
# test with two different messages
self.chat.msg_add([self.message2])
self.chat.msg_unique_content()
self.assert_messages_equal(self.chat.messages, [self.message1, self.message2])
def test_filter(self) -> None:
self.chat.msg_add([self.message1, self.message2])
self.chat.msg_filter(MessageFilter(answer_contains='Answer 1'))
self.chat.add_messages([self.message1, self.message2])
self.chat.filter(MessageFilter(answer_contains='Answer 1'))
self.assertEqual(len(self.chat.messages), 1)
self.assertEqual(self.chat.messages[0].question, 'Question 1')
def test_sort(self) -> None:
self.chat.msg_add([self.message2, self.message1])
self.chat.msg_sort()
self.chat.add_messages([self.message2, self.message1])
self.chat.sort()
self.assertEqual(self.chat.messages[0].question, 'Question 1')
self.assertEqual(self.chat.messages[1].question, 'Question 2')
self.chat.msg_sort(reverse=True)
self.chat.sort(reverse=True)
self.assertEqual(self.chat.messages[0].question, 'Question 2')
self.assertEqual(self.chat.messages[1].question, 'Question 1')
def test_clear(self) -> None:
self.chat.msg_add([self.message1])
self.chat.msg_clear()
self.chat.add_messages([self.message1])
self.chat.clear()
self.assertEqual(len(self.chat.messages), 0)
def test_add_messages(self) -> None:
self.chat.msg_add([self.message1, self.message2])
self.chat.add_messages([self.message1, self.message2])
self.assertEqual(len(self.chat.messages), 2)
self.assertEqual(self.chat.messages[0].question, 'Question 1')
self.assertEqual(self.chat.messages[1].question, 'Question 2')
def test_tags(self) -> None:
self.chat.msg_add([self.message1, self.message2])
tags_all = self.chat.msg_tags()
self.chat.add_messages([self.message1, self.message2])
tags_all = self.chat.tags()
self.assertSetEqual(tags_all, {Tag('atag1'), Tag('btag2')})
tags_pref = self.chat.msg_tags(prefix='a')
tags_pref = self.chat.tags(prefix='a')
self.assertSetEqual(tags_pref, {Tag('atag1')})
tags_cont = self.chat.msg_tags(contain='2')
tags_cont = self.chat.tags(contain='2')
self.assertSetEqual(tags_cont, {Tag('btag2')})
def test_tags_frequency(self) -> None:
self.chat.msg_add([self.message1, self.message2])
tags_freq = self.chat.msg_tags_frequency()
self.chat.add_messages([self.message1, self.message2])
tags_freq = self.chat.tags_frequency()
self.assertDictEqual(tags_freq, {'atag1': 1, 'btag2': 2})
def test_find_remove_messages(self) -> None:
self.chat.msg_add([self.message1, self.message2])
msgs = self.chat.msg_find(['0001.txt'])
self.chat.add_messages([self.message1, self.message2])
msgs = self.chat.find_messages(['0001.txt'])
self.assertListEqual(msgs, [self.message1])
msgs = self.chat.msg_find(['0001.txt', '0002.txt'])
msgs = self.chat.find_messages(['0001.txt', '0002.txt'])
self.assertListEqual(msgs, [self.message1, self.message2])
# add new Message with full path
message3 = Message(Question('Question 2'),
Answer('Answer 2'),
{Tag('btag2')},
file_path=pathlib.Path('/foo/bla/0003.txt'))
self.chat.msg_add([message3])
self.chat.add_messages([message3])
# find new Message by full path
msgs = self.chat.msg_find(['/foo/bla/0003.txt'])
msgs = self.chat.find_messages(['/foo/bla/0003.txt'])
self.assertListEqual(msgs, [message3])
# find Message with full path only by filename
msgs = self.chat.msg_find(['0003.txt'])
msgs = self.chat.find_messages(['0003.txt'])
self.assertListEqual(msgs, [message3])
# remove last message
self.chat.msg_remove(['0003.txt'])
self.chat.remove_messages(['0003.txt'])
self.assertListEqual(self.chat.messages, [self.message1, self.message2])
def test_latest_message(self) -> None:
self.assertIsNone(self.chat.msg_latest())
self.chat.msg_add([self.message1])
self.assertEqual(self.chat.msg_latest(), self.message1)
self.chat.msg_add([self.message2])
self.assertEqual(self.chat.msg_latest(), self.message2)
@patch('sys.stdout', new_callable=StringIO)
def test_print(self, mock_stdout: StringIO) -> None:
self.chat.msg_add([self.message1, self.message2])
self.chat.add_messages([self.message1, self.message2])
self.chat.print(paged=False)
expected_output = f"""{Question.txt_header}
Question 1
@@ -143,7 +101,7 @@ Answer 2
@patch('sys.stdout', new_callable=StringIO)
def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None:
self.chat.msg_add([self.message1, self.message2])
self.chat.add_messages([self.message1, self.message2])
self.chat.print(paged=False, with_tags=True, with_files=True)
expected_output = f"""{TagLine.prefix} atag1 btag2
FILE: 0001.txt
@@ -161,7 +119,7 @@ Answer 2
self.assertEqual(mock_stdout.getvalue(), expected_output)
class TestChatDB(TestChatBase):
class TestChatDB(unittest.TestCase):
def setUp(self) -> None:
self.db_path = tempfile.TemporaryDirectory()
self.cache_path = tempfile.TemporaryDirectory()
@@ -191,41 +149,20 @@ class TestChatDB(TestChatBase):
next_fname = pathlib.Path(self.db_path.name) / '.next'
with open(next_fname, 'w') as f:
f.write('4')
# add some "trash" in order to test if it's correctly handled / ignored
self.trash_files = ['.config.yaml', 'foo.yaml', 'bla.txt']
for file in self.trash_files:
with open(pathlib.Path(self.db_path.name) / file, 'w') as f:
f.write('test trash')
# also create a file with actual yaml content
with open(pathlib.Path(self.db_path.name) / 'content.yaml', 'w') as f:
yaml.dump({'key': 'value'}, f)
self.trash_files.append('content.yaml')
self.maxDiff = None
def message_list(self, tmp_dir: tempfile.TemporaryDirectory) -> list[pathlib.Path]:
"""
List all Message files in the given TemporaryDirectory.
"""
# exclude '.next'
return [f for f in pathlib.Path(tmp_dir.name).glob('*.[ty]*') if f.name not in self.trash_files]
return list(pathlib.Path(tmp_dir.name).glob('*.[ty]*'))
def tearDown(self) -> None:
self.db_path.cleanup()
self.cache_path.cleanup()
pass
def test_validate(self) -> None:
duplicate_message = Message(Question('Question 4'),
Answer('Answer 4'),
{Tag('tag4')},
file_path=pathlib.Path('0004.txt'))
duplicate_message.to_file(pathlib.Path(self.db_path.name, '0004.txt'))
with self.assertRaises(ChatError) as cm:
ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
self.assertEqual(str(cm.exception), "Validation failed")
def test_from_dir(self) -> None:
def test_chat_db_from_dir(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
self.assertEqual(len(chat_db.messages), 4)
@@ -241,7 +178,7 @@ class TestChatDB(TestChatBase):
self.assertEqual(chat_db.messages[3].file_path,
pathlib.Path(self.db_path.name, '0004.yaml'))
def test_from_dir_glob(self) -> None:
def test_chat_db_from_dir_glob(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
glob='*.txt')
@@ -253,25 +190,7 @@ class TestChatDB(TestChatBase):
self.assertEqual(chat_db.messages[1].file_path,
pathlib.Path(self.db_path.name, '0003.txt'))
def test_from_dir_filter_tags(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
mfilter=MessageFilter(tags_or={Tag('tag1')}))
self.assertEqual(len(chat_db.messages), 1)
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
self.assertEqual(chat_db.messages[0].file_path,
pathlib.Path(self.db_path.name, '0001.txt'))
def test_from_dir_filter_tags_empty(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
mfilter=MessageFilter(tags_or=set(),
tags_and=set(),
tags_not=set()))
self.assertEqual(len(chat_db.messages), 0)
def test_from_dir_filter_answer(self) -> None:
def test_chat_db_filter(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
mfilter=MessageFilter(answer_contains='Answer 2'))
@@ -282,7 +201,7 @@ class TestChatDB(TestChatBase):
pathlib.Path(self.db_path.name, '0002.yaml'))
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
def test_from_messages(self) -> None:
def test_chat_db_from_messges(self) -> None:
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
messages=[self.message1, self.message2,
@@ -291,35 +210,16 @@ class TestChatDB(TestChatBase):
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
def test_fids(self) -> None:
def test_chat_db_fids(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
self.assertEqual(chat_db.get_next_fid(), 5)
self.assertEqual(chat_db.get_next_fid(), 6)
self.assertEqual(chat_db.get_next_fid(), 7)
with open(chat_db.next_path, 'r') as f:
with open(chat_db.next_fname, 'r') as f:
self.assertEqual(f.read(), '7')
def test_msg_in_db_or_cache(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
self.assertTrue(chat_db.msg_in_db(self.message1))
self.assertTrue(chat_db.msg_in_db(str(self.message1.file_path)))
self.assertTrue(chat_db.msg_in_db(self.message1.msg_id()))
self.assertFalse(chat_db.msg_in_cache(self.message1))
self.assertFalse(chat_db.msg_in_cache(str(self.message1.file_path)))
self.assertFalse(chat_db.msg_in_cache(self.message1.msg_id()))
# add new message to the cache dir
cache_message = Message(question=Question("Question 1"),
answer=Answer("Answer 1"))
chat_db.cache_add([cache_message])
self.assertTrue(chat_db.msg_in_cache(cache_message))
self.assertTrue(chat_db.msg_in_cache(cache_message.msg_id()))
self.assertFalse(chat_db.msg_in_db(cache_message))
self.assertFalse(chat_db.msg_in_db(str(cache_message.file_path)))
def test_db_write(self) -> None:
def test_chat_db_write(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
@@ -330,7 +230,7 @@ class TestChatDB(TestChatBase):
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
# write the messages to the cache directory
chat_db.cache_write()
chat_db.write_cache()
# check if the written files are in the cache directory
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 4)
@@ -350,7 +250,7 @@ class TestChatDB(TestChatBase):
old_timestamps = {file: file.stat().st_mtime for file in db_dir_files}
# overwrite the messages in the db directory
time.sleep(0.05)
chat_db.db_write()
chat_db.write_db()
# check if the written files are in the DB directory
db_dir_files = self.message_list(self.db_path)
self.assertEqual(len(db_dir_files), 4)
@@ -367,7 +267,7 @@ class TestChatDB(TestChatBase):
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
def test_db_read(self) -> None:
def test_chat_db_read(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
@@ -383,7 +283,7 @@ class TestChatDB(TestChatBase):
new_message1.to_file(pathlib.Path(self.db_path.name, '0005.txt'))
new_message2.to_file(pathlib.Path(self.db_path.name, '0006.yaml'))
# read and check them
chat_db.db_read()
chat_db.read_db()
self.assertEqual(len(chat_db.messages), 6)
self.assertEqual(chat_db.messages[4].file_path, pathlib.Path(self.db_path.name, '0005.txt'))
self.assertEqual(chat_db.messages[5].file_path, pathlib.Path(self.db_path.name, '0006.yaml'))
@@ -398,7 +298,7 @@ class TestChatDB(TestChatBase):
new_message3.to_file(pathlib.Path(self.cache_path.name, '0007.txt'))
new_message4.to_file(pathlib.Path(self.cache_path.name, '0008.yaml'))
# read and check them
chat_db.cache_read()
chat_db.read_cache()
self.assertEqual(len(chat_db.messages), 8)
# check that the new message have the cache dir path
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.cache_path.name, '0007.txt'))
@@ -413,7 +313,7 @@ class TestChatDB(TestChatBase):
new_message1.to_file(pathlib.Path(self.db_path.name, '0005.txt'))
new_message2.to_file(pathlib.Path(self.db_path.name, '0006.yaml'))
# read from the DB dir and check if the modified messages have been updated
chat_db.db_read()
chat_db.read_db()
self.assertEqual(len(chat_db.messages), 8)
self.assertEqual(chat_db.messages[4].question, 'New Question 1')
self.assertEqual(chat_db.messages[5].question, 'New Question 2')
@@ -424,13 +324,13 @@ class TestChatDB(TestChatBase):
new_message3.to_file(pathlib.Path(self.db_path.name, '0007.txt'))
new_message4.to_file(pathlib.Path(self.db_path.name, '0008.yaml'))
# read and check them
chat_db.db_read()
chat_db.read_db()
self.assertEqual(len(chat_db.messages), 8)
# check that they now have the DB path
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.db_path.name, '0007.txt'))
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.db_path.name, '0008.yaml'))
def test_cache_clear(self) -> None:
def test_chat_db_clear(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
@@ -441,13 +341,13 @@ class TestChatDB(TestChatBase):
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
# write the messages to the cache directory
chat_db.cache_write()
chat_db.write_cache()
# check if the written files are in the cache directory
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 4)
# now rewrite them to the DB dir and check for modified paths
chat_db.db_write()
chat_db.write_db()
db_dir_files = self.message_list(self.db_path)
self.assertEqual(len(db_dir_files), 4)
self.assertIn(pathlib.Path(self.db_path.name, '0001.txt'), db_dir_files)
@@ -462,10 +362,10 @@ class TestChatDB(TestChatBase):
message_cache = Message(question=Question("What the hell am I doing here?"),
answer=Answer("You're a creep!"),
file_path=pathlib.Path(self.cache_path.name, '0005.txt'))
chat_db.msg_add([message_empty, message_cache])
chat_db.add_messages([message_empty, message_cache])
# clear the cache and check the cache dir
chat_db.cache_clear()
chat_db.clear_cache()
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 0)
# make sure that the DB messages (and the new message) are still there
@@ -475,7 +375,7 @@ class TestChatDB(TestChatBase):
# but not the message with the cache dir path
self.assertFalse(any(m.file_path == message_cache.file_path for m in chat_db.messages))
def test_add(self) -> None:
def test_chat_db_add(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
@@ -486,7 +386,7 @@ class TestChatDB(TestChatBase):
# add new messages to the cache dir
message1 = Message(question=Question("Question 1"),
answer=Answer("Answer 1"))
chat_db.cache_add([message1])
chat_db.add_to_cache([message1])
# check if the file_path has been correctly set
self.assertIsNotNone(message1.file_path)
self.assertEqual(message1.file_path.parent, pathlib.Path(self.cache_path.name)) # type: ignore [union-attr]
@@ -496,17 +396,14 @@ class TestChatDB(TestChatBase):
# add new messages to the DB dir
message2 = Message(question=Question("Question 2"),
answer=Answer("Answer 2"))
chat_db.db_add([message2])
chat_db.add_to_db([message2])
# check if the file_path has been correctly set
self.assertIsNotNone(message2.file_path)
self.assertEqual(message2.file_path.parent, pathlib.Path(self.db_path.name)) # type: ignore [union-attr]
db_dir_files = self.message_list(self.db_path)
self.assertEqual(len(db_dir_files), 5)
with self.assertRaises(ChatError):
chat_db.cache_add([Message(Question("?"), file_path=pathlib.Path("foo"))])
def test_msg_write(self) -> None:
def test_chat_db_write_messages(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
@@ -520,133 +417,11 @@ class TestChatDB(TestChatBase):
message = Message(question=Question("Question 1"),
answer=Answer("Answer 1"))
with self.assertRaises(ChatError):
chat_db.msg_write([message])
chat_db.write_messages([message])
# write a message with a valid file_path
message.file_path = pathlib.Path(self.cache_path.name) / '123456.txt'
chat_db.msg_write([message])
chat_db.write_messages([message])
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 1)
self.assertIn(pathlib.Path(self.cache_path.name, '123456.txt'), cache_dir_files)
def test_msg_update(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
db_dir_files = self.message_list(self.db_path)
self.assertEqual(len(db_dir_files), 4)
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 0)
message = chat_db.messages[0]
message.answer = Answer("New answer")
# update message without writing
chat_db.msg_update([message], write=False)
self.assertEqual(chat_db.messages[0].answer, Answer("New answer"))
# re-read the message and check for old content
chat_db.db_read()
self.assertEqual(chat_db.messages[0].answer, Answer("Answer 1"))
# now check with writing (message should be overwritten)
chat_db.msg_update([message], write=True)
chat_db.db_read()
self.assertEqual(chat_db.messages[0].answer, Answer("New answer"))
# test without file_path -> expect error
message1 = Message(question=Question("Question 1"),
answer=Answer("Answer 1"))
with self.assertRaises(ChatError):
chat_db.msg_update([message1])
def test_msg_find(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
# search for a DB file in memory
self.assertEqual(chat_db.msg_find([str(self.message1.file_path)], loc='mem'), [self.message1])
self.assertEqual(chat_db.msg_find([self.message1.msg_id()], loc='mem'), [self.message1])
self.assertEqual(chat_db.msg_find(['0001.txt'], loc='mem'), [self.message1])
self.assertEqual(chat_db.msg_find(['0001'], loc='mem'), [self.message1])
# and on disk
self.assertEqual(chat_db.msg_find([str(self.message2.file_path)], loc='db'), [self.message2])
self.assertEqual(chat_db.msg_find([self.message2.msg_id()], loc='db'), [self.message2])
self.assertEqual(chat_db.msg_find(['0002.yaml'], loc='db'), [self.message2])
self.assertEqual(chat_db.msg_find(['0002'], loc='db'), [self.message2])
# now search the cache -> expect empty result
self.assertEqual(chat_db.msg_find([str(self.message3.file_path)], loc='cache'), [])
self.assertEqual(chat_db.msg_find([self.message3.msg_id()], loc='cache'), [])
self.assertEqual(chat_db.msg_find(['0003.txt'], loc='cache'), [])
self.assertEqual(chat_db.msg_find(['0003'], loc='cache'), [])
# search for multiple messages
# -> search one twice, expect result to be unique
search_names = ['0001', '0002.yaml', self.message3.msg_id(), str(self.message3.file_path)]
expected_result = [self.message1, self.message2, self.message3]
result = chat_db.msg_find(search_names, loc='all')
self.assert_messages_equal(result, expected_result)
def test_msg_latest(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
self.assertEqual(chat_db.msg_latest(loc='mem'), self.message4)
self.assertEqual(chat_db.msg_latest(loc='db'), self.message4)
self.assertEqual(chat_db.msg_latest(loc='disk'), self.message4)
self.assertEqual(chat_db.msg_latest(loc='all'), self.message4)
# the cache is currently empty:
self.assertIsNone(chat_db.msg_latest(loc='cache'))
# add new messages to the cache dir
new_message = Message(question=Question("New Question"),
answer=Answer("New Answer"))
chat_db.cache_add([new_message])
self.assertEqual(chat_db.msg_latest(loc='cache'), new_message)
self.assertEqual(chat_db.msg_latest(loc='mem'), new_message)
self.assertEqual(chat_db.msg_latest(loc='disk'), new_message)
self.assertEqual(chat_db.msg_latest(loc='all'), new_message)
# the DB does not contain the new message
self.assertEqual(chat_db.msg_latest(loc='db'), self.message4)
def test_msg_gather(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
all_messages = [self.message1, self.message2, self.message3, self.message4]
self.assert_messages_equal(chat_db.msg_gather(loc='all'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='db'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='mem'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='disk'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='cache'), [])
# add a new message, but only to the internal list
new_message = Message(Question("What?"))
all_messages_mem = all_messages + [new_message]
chat_db.msg_add([new_message])
self.assert_messages_equal(chat_db.msg_gather(loc='mem'), all_messages_mem)
self.assert_messages_equal(chat_db.msg_gather(loc='all'), all_messages_mem)
# the nr. of messages on disk did not change -> expect old result
self.assert_messages_equal(chat_db.msg_gather(loc='db'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='disk'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='cache'), [])
# test with MessageFilter
self.assert_messages_equal(chat_db.msg_gather(loc='all', mfilter=MessageFilter(tags_or={Tag('tag1')})),
[self.message1])
self.assert_messages_equal(chat_db.msg_gather(loc='disk', mfilter=MessageFilter(tags_or={Tag('tag2')})),
[self.message2])
self.assert_messages_equal(chat_db.msg_gather(loc='cache', mfilter=MessageFilter(tags_or={Tag('tag3')})),
[])
self.assert_messages_equal(chat_db.msg_gather(loc='mem', mfilter=MessageFilter(question_contains="What")),
[new_message])
def test_msg_move_and_gather(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
all_messages = [self.message1, self.message2, self.message3, self.message4]
self.assert_messages_equal(chat_db.msg_gather(loc='db'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='cache'), [])
# move first message to the cache
chat_db.cache_move(self.message1)
self.assert_messages_equal(chat_db.msg_gather(loc='cache'), [self.message1])
self.assertEqual(self.message1.file_path.parent, pathlib.Path(self.cache_path.name)) # type: ignore [union-attr]
self.assert_messages_equal(chat_db.msg_gather(loc='db'), [self.message2, self.message3, self.message4])
self.assert_messages_equal(chat_db.msg_gather(loc='all'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='disk'), all_messages)
self.assert_messages_equal(chat_db.msg_gather(loc='mem'), all_messages)
# now move first message back to the DB
chat_db.db_move(self.message1)
self.assert_messages_equal(chat_db.msg_gather(loc='cache'), [])
self.assertEqual(self.message1.file_path.parent, pathlib.Path(self.db_path.name)) # type: ignore [union-attr]
self.assert_messages_equal(chat_db.msg_gather(loc='db'), all_messages)
+7 -14
View File
@@ -57,10 +57,9 @@ class TestConfig(unittest.TestCase):
def test_from_dict_should_create_config_from_dict(self) -> None:
source_dict = {
'cache': '.',
'db': './test_db/',
'ais': {
'myopenai': {
'default': {
'name': 'openai',
'system': 'Custom system',
'api_key': '9876543210',
@@ -74,13 +73,12 @@ class TestConfig(unittest.TestCase):
}
}
config = Config.from_dict(source_dict)
self.assertEqual(config.cache, '.')
self.assertEqual(config.db, './test_db/')
self.assertEqual(len(config.ais), 1)
self.assertEqual(config.ais['myopenai'].name, 'openai')
self.assertEqual(cast(OpenAIConfig, config.ais['myopenai']).system, 'Custom system')
self.assertEqual(config.ais['default'].name, 'openai')
self.assertEqual(cast(OpenAIConfig, config.ais['default']).system, 'Custom system')
# check that 'ID' has been added
self.assertEqual(config.ais['myopenai'].ID, 'myopenai')
self.assertEqual(config.ais['default'].ID, 'default')
def test_create_default_should_create_default_config(self) -> None:
Config.create_default(Path(self.test_file.name))
@@ -91,7 +89,6 @@ class TestConfig(unittest.TestCase):
def test_from_file_should_load_config_from_file(self) -> None:
source_dict = {
'cache': './test_cache/',
'db': './test_db/',
'ais': {
'default': {
@@ -111,7 +108,6 @@ class TestConfig(unittest.TestCase):
yaml.dump(source_dict, f)
config = Config.from_file(self.test_file.name)
self.assertIsInstance(config, Config)
self.assertEqual(config.cache, './test_cache/')
self.assertEqual(config.db, './test_db/')
self.assertEqual(len(config.ais), 1)
self.assertIsInstance(config.ais['default'], AIConfig)
@@ -119,11 +115,10 @@ class TestConfig(unittest.TestCase):
def test_to_file_should_save_config_to_file(self) -> None:
config = Config(
cache='./test_cache/',
db='./test_db/',
ais={
'myopenai': OpenAIConfig(
ID='myopenai',
'default': OpenAIConfig(
ID='default',
system='Custom system',
api_key='9876543210',
model='custom_model',
@@ -138,14 +133,12 @@ class TestConfig(unittest.TestCase):
config.to_file(Path(self.test_file.name))
with open(self.test_file.name, 'r') as f:
saved_config = yaml.load(f, Loader=yaml.FullLoader)
self.assertEqual(saved_config['cache'], './test_cache/')
self.assertEqual(saved_config['db'], './test_db/')
self.assertEqual(len(saved_config['ais']), 1)
self.assertEqual(saved_config['ais']['myopenai']['system'], 'Custom system')
self.assertEqual(saved_config['ais']['default']['system'], 'Custom system')
def test_from_file_error_unknown_ai(self) -> None:
source_dict = {
'cache': './test_cache/',
'db': './test_db/',
'ais': {
'default': {
+1 -7
View File
@@ -300,12 +300,6 @@ This is a question.
MessageFilter(tags_or={Tag('tag1')}))
self.assertIsNone(message)
def test_from_file_txt_empty_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_or=set(),
tags_and=set()))
self.assertIsNone(message)
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_not={Tag('tag1')}))
@@ -730,7 +724,7 @@ class MessageIDTestCase(unittest.TestCase):
self.file_path.unlink()
def test_msg_id_txt(self) -> None:
self.assertEqual(self.message.msg_id(), self.file_path.stem)
self.assertEqual(self.message.msg_id(), self.file_path.name)
def test_msg_id_txt_exception(self) -> None:
with self.assertRaises(MessageError):
+11 -362
View File
@@ -3,45 +3,23 @@ import unittest
import argparse
import tempfile
from pathlib import Path
from unittest import mock
from unittest.mock import MagicMock, call, ANY
from typing import Optional
from chatmastermind.configuration import Config
from chatmastermind.commands.question import create_message, question_cmd
from chatmastermind.tags import Tag
from chatmastermind.message import Message, Question, Answer
from chatmastermind.chat import Chat, ChatDB
from chatmastermind.ai import AI, AIResponse, Tokens, AIError
from unittest.mock import MagicMock
from chatmastermind.commands.question import create_message
from chatmastermind.message import Message, Question
from chatmastermind.chat import ChatDB
class TestQuestionCmdBase(unittest.TestCase):
def assert_messages_equal(self, msg1: list[Message], msg2: list[Message]) -> None:
"""
Compare messages using more than just Question and Answer.
"""
self.assertEqual(len(msg1), len(msg2))
for m1, m2 in zip(msg1, msg2):
# exclude the file_path, compare only Q, A and metadata
self.assertTrue(m1.equals(m2, file_path=False, verbose=True))
class TestMessageCreate(TestQuestionCmdBase):
class TestMessageCreate(unittest.TestCase):
"""
Test if messages created by the 'question' command have
the correct format.
"""
def setUp(self) -> None:
# create ChatDB structure
self.db_dir = tempfile.TemporaryDirectory()
self.cache_dir = tempfile.TemporaryDirectory()
self.chat = ChatDB.from_dir(cache_path=Path(self.cache_dir.name),
db_path=Path(self.db_dir.name))
# create some messages
self.message_text = Message(Question("What is this?"),
Answer("It is pure text"))
self.message_code = Message(Question("What is this?"),
Answer("Text\n```\nIt is embedded code\n```\ntext"))
self.chat.db_add([self.message_text, self.message_code])
self.db_path = tempfile.TemporaryDirectory()
self.cache_path = tempfile.TemporaryDirectory()
self.chat = ChatDB.from_dir(cache_path=Path(self.cache_path.name),
db_path=Path(self.db_path.name))
# create arguments mock
self.args = MagicMock(spec=argparse.Namespace)
self.args.source_text = None
@@ -90,7 +68,6 @@ Aaaand again some text."""
os.remove(self.source_file1.name)
os.remove(self.source_file2.name)
os.remove(self.source_file3.name)
os.remove(self.source_file4.name)
def message_list(self, tmp_dir: tempfile.TemporaryDirectory) -> list[Path]:
# exclude '.next'
@@ -98,10 +75,10 @@ Aaaand again some text."""
def test_message_file_created(self) -> None:
self.args.ask = ["What is this?"]
cache_dir_files = self.message_list(self.cache_dir)
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 0)
create_message(self.chat, self.args)
cache_dir_files = self.message_list(self.cache_dir)
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 1)
message = Message.from_file(cache_dir_files[0])
self.assertIsInstance(message, Message)
@@ -183,331 +160,3 @@ This is embedded source code.
This is embedded source code.
```
"""))
def test_single_question_with_text_only_message(self) -> None:
self.args.ask = ["What is this?"]
self.args.source_text = [f"{self.chat.messages[0].file_path}"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
# file contains no source code (only text)
# -> don't expect any in the question
self.assertEqual(len(message.question.source_code()), 0)
self.assertEqual(message.question, Question(f"""What is this?
{self.message_text.answer}"""))
def test_single_question_with_message_and_embedded_code(self) -> None:
self.args.ask = ["What is this?"]
self.args.source_code = [f"{self.chat.messages[1].file_path}"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
# answer contains 1 source code block
# -> expect it in the question
self.assertEqual(len(message.question.source_code()), 1)
self.assertEqual(message.question, Question("""What is this?
```
It is embedded code
```
"""))
class TestQuestionCmd(TestQuestionCmdBase):
def setUp(self) -> None:
# create DB and cache
self.db_dir = tempfile.TemporaryDirectory()
self.cache_dir = tempfile.TemporaryDirectory()
# create configuration
self.config = Config()
self.config.cache = self.cache_dir.name
self.config.db = self.db_dir.name
# create a mock argparse.Namespace
self.args = argparse.Namespace(
ask=['What is the meaning of life?'],
num_answers=1,
output_tags=['science'],
AI='openai',
model='gpt-3.5-turbo',
or_tags=None,
and_tags=None,
exclude_tags=None,
source_text=None,
source_code=None,
create=None,
repeat=None,
process=None,
overwrite=None
)
# create a mock AI instance
self.ai = MagicMock(spec=AI)
self.ai.request.side_effect = self.mock_request
def input_message(self, args: argparse.Namespace) -> Message:
"""
Create the expected input message for a question using the
given arguments.
"""
# NOTE: we only use the first question from the "ask" list
# -> message creation using "question.create_message()" is
# tested above
# the answer is always empty for the input message
return Message(Question(args.ask[0]),
tags=args.output_tags,
ai=args.AI,
model=args.model)
def mock_request(self,
question: Message,
chat: Chat,
num_answers: int = 1,
otags: Optional[set[Tag]] = None) -> AIResponse:
"""
Mock the 'ai.request()' function
"""
question.answer = Answer("Answer 0")
question.tags = set(otags) if otags else None
question.ai = 'FakeAI'
question.model = 'FakeModel'
answers: list[Message] = [question]
for n in range(1, num_answers):
answers.append(Message(question=question.question,
answer=Answer(f"Answer {n}"),
tags=otags,
ai='FakeAI',
model='FakeModel'))
return AIResponse(answers, Tokens(10, 10, 20))
def message_list(self, tmp_dir: tempfile.TemporaryDirectory) -> list[Path]:
# exclude '.next'
return sorted([f for f in Path(tmp_dir.name).glob('*.[ty]*')])
@mock.patch('chatmastermind.commands.question.create_ai')
def test_ask_single_answer(self, mock_create_ai: MagicMock) -> None:
"""
Test single answer with no errors.
"""
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
expected_responses = self.mock_request(expected_question,
Chat([]),
self.args.num_answers,
self.args.output_tags).messages
# execute the command
question_cmd(self.args, self.config)
# check for correct request call
self.ai.request.assert_called_once_with(expected_question,
ANY,
self.args.num_answers,
self.args.output_tags)
# check for the expected message files
chat = ChatDB.from_dir(Path(self.cache_dir.name),
Path(self.db_dir.name))
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, expected_responses)
@mock.patch('chatmastermind.commands.question.ChatDB.from_dir')
@mock.patch('chatmastermind.commands.question.create_ai')
def test_ask_single_answer_mocked(self, mock_create_ai: MagicMock, mock_from_dir: MagicMock) -> None:
"""
Test single answer with no errors (mocked ChatDB version).
"""
chat = MagicMock(spec=ChatDB)
mock_from_dir.return_value = chat
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
expected_responses = self.mock_request(expected_question,
Chat([]),
self.args.num_answers,
self.args.output_tags).messages
# execute the command
question_cmd(self.args, self.config)
# check for correct request call
self.ai.request.assert_called_once_with(expected_question,
chat,
self.args.num_answers,
self.args.output_tags)
# check for the correct ChatDB calls:
# - initial question has been written (prior to the actual request)
# - responses have been written (after the request)
chat.cache_write.assert_has_calls([call([expected_question]),
call(expected_responses)],
any_order=False)
# check that the messages have not been added to the internal message list
chat.cache_add.assert_not_called()
@mock.patch('chatmastermind.commands.question.create_ai')
def test_ask_with_error(self, mock_create_ai: MagicMock) -> None:
"""
Provoke an error during the AI request and verify that the question
has been correctly stored in the cache.
"""
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
self.ai.request.side_effect = AIError
# execute the command
with self.assertRaises(AIError):
question_cmd(self.args, self.config)
# check for correct request call
self.ai.request.assert_called_once_with(expected_question,
ANY,
self.args.num_answers,
self.args.output_tags)
# check for the expected message files
chat = ChatDB.from_dir(Path(self.cache_dir.name),
Path(self.db_dir.name))
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, [expected_question])
@mock.patch('chatmastermind.commands.question.create_ai')
def test_repeat_single_question(self, mock_create_ai: MagicMock) -> None:
"""
Repeat a single question.
"""
# 1. ask a question
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
expected_responses = self.mock_request(expected_question,
Chat([]),
self.args.num_answers,
self.args.output_tags).messages
question_cmd(self.args, self.config)
chat = ChatDB.from_dir(Path(self.cache_dir.name),
Path(self.db_dir.name))
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, expected_responses)
# 2. repeat the last question (without overwriting)
# -> expect two identical messages (except for the file_path)
self.args.ask = None
self.args.repeat = []
self.args.overwrite = False
expected_responses += expected_responses
question_cmd(self.args, self.config)
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 2)
self.assert_messages_equal(cached_msg, expected_responses)
@mock.patch('chatmastermind.commands.question.create_ai')
def test_repeat_single_question_overwrite(self, mock_create_ai: MagicMock) -> None:
"""
Repeat a single question and overwrite the old one.
"""
# 1. ask a question
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
expected_responses = self.mock_request(expected_question,
Chat([]),
self.args.num_answers,
self.args.output_tags).messages
question_cmd(self.args, self.config)
chat = ChatDB.from_dir(Path(self.cache_dir.name),
Path(self.db_dir.name))
cached_msg = chat.msg_gather(loc='cache')
assert cached_msg[0].file_path
cached_msg_file_id = cached_msg[0].file_path.stem
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, expected_responses)
# 2. repeat the last question (WITH overwriting)
# -> expect a single message afterwards
self.args.ask = None
self.args.repeat = []
self.args.overwrite = True
question_cmd(self.args, self.config)
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, expected_responses)
# also check that the file ID has not been changed
assert cached_msg[0].file_path
self.assertEqual(cached_msg_file_id, cached_msg[0].file_path.stem)
@mock.patch('chatmastermind.commands.question.create_ai')
def test_repeat_single_question_after_error(self, mock_create_ai: MagicMock) -> None:
"""
Repeat a single question after an error.
"""
# 1. ask a question and provoke an error
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
self.ai.request.side_effect = AIError
with self.assertRaises(AIError):
question_cmd(self.args, self.config)
chat = ChatDB.from_dir(Path(self.cache_dir.name),
Path(self.db_dir.name))
cached_msg = chat.msg_gather(loc='cache')
assert cached_msg[0].file_path
cached_msg_file_id = cached_msg[0].file_path.stem
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, [expected_question])
# 2. repeat the last question (without overwriting)
# -> expect a single message because if the original has
# no answer, it should be overwritten by default
self.args.ask = None
self.args.repeat = []
self.args.overwrite = False
self.ai.request.side_effect = self.mock_request
expected_responses = self.mock_request(expected_question,
Chat([]),
self.args.num_answers,
self.args.output_tags).messages
question_cmd(self.args, self.config)
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, expected_responses)
# also check that the file ID has not been changed
assert cached_msg[0].file_path
self.assertEqual(cached_msg_file_id, cached_msg[0].file_path.stem)
@mock.patch('chatmastermind.commands.question.create_ai')
def test_repeat_single_question_new_args(self, mock_create_ai: MagicMock) -> None:
"""
Repeat a single question with new arguments.
"""
# 1. ask a question
mock_create_ai.return_value = self.ai
expected_question = self.input_message(self.args)
expected_responses = self.mock_request(expected_question,
Chat([]),
self.args.num_answers,
self.args.output_tags).messages
question_cmd(self.args, self.config)
chat = ChatDB.from_dir(Path(self.cache_dir.name),
Path(self.db_dir.name))
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 1)
self.assert_messages_equal(cached_msg, expected_responses)
# 2. repeat the last question with new arguments (without overwriting)
# -> expect two messages with identical question and answer, but different metadata
self.args.ask = None
self.args.repeat = []
self.args.overwrite = False
self.args.output_tags = ['newtag']
self.args.AI = 'newai'
self.args.model = 'newmodel'
new_expected_question = Message(question=Question(expected_question.question),
tags=set(self.args.output_tags),
ai=self.args.AI,
model=self.args.model)
expected_responses += self.mock_request(new_expected_question,
Chat([]),
self.args.num_answers,
set(self.args.output_tags)).messages
question_cmd(self.args, self.config)
cached_msg = chat.msg_gather(loc='cache')
self.assertEqual(len(self.message_list(self.cache_dir)), 2)
self.assert_messages_equal(cached_msg, expected_responses)