Compare commits
12 Commits
pipes
...
d6e6e0f3ce
| Author | SHA1 | Date | |
|---|---|---|---|
| d6e6e0f3ce | |||
| 8396a3f403 | |||
| 24df4be80b | |||
| 935f1ee18a | |||
| c705902668 | |||
| ac0cdfe734 | |||
| 08a45d31b2 | |||
| 96980bc4a8 | |||
| 26f72ed002 | |||
| a5fa79a4e5 | |||
| 6736d1ce4f | |||
| 43fdb59dbf |
@@ -0,0 +1,64 @@
|
||||
from dataclasses import dataclass
|
||||
from abc import abstractmethod
|
||||
from typing import Protocol, Optional, Union
|
||||
from .configuration import AIConfig
|
||||
from .message import Message
|
||||
from .chat import Chat
|
||||
|
||||
|
||||
class AIError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tokens:
|
||||
prompt: int = 0
|
||||
completion: int = 0
|
||||
total: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class AIResponse:
|
||||
"""
|
||||
The response to an AI request. Consists of one or more messages
|
||||
(each containing the question and a single answer) and the nr.
|
||||
of used tokens.
|
||||
"""
|
||||
messages: list[Message]
|
||||
tokens: Optional[Tokens] = None
|
||||
|
||||
|
||||
class AI(Protocol):
|
||||
"""
|
||||
The base class for AI clients.
|
||||
"""
|
||||
|
||||
name: str
|
||||
config: AIConfig
|
||||
|
||||
@abstractmethod
|
||||
def request(self,
|
||||
question: Message,
|
||||
context: Chat,
|
||||
num_answers: int = 1) -> AIResponse:
|
||||
"""
|
||||
Make an AI request, asking the given question with the given
|
||||
context (i. e. chat history). The nr. of requested answers
|
||||
corresponds to the nr. of messages in the 'AIResponse'.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def models(self) -> list[str]:
|
||||
"""
|
||||
Return all models supported by this AI.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def tokens(self, data: Union[Message, Chat]) -> int:
|
||||
"""
|
||||
Computes the nr. of AI language tokens for the given message
|
||||
or chat. Note that the computation may not be 100% accurate
|
||||
and is not implemented for all AIs.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
@@ -0,0 +1,48 @@
|
||||
"""
|
||||
Implements the OpenAI client classes and functions.
|
||||
"""
|
||||
import openai
|
||||
from ..message import Message
|
||||
from ..chat import Chat
|
||||
from ..ai import AI, AIResponse
|
||||
|
||||
|
||||
class OpenAI(AI):
|
||||
"""
|
||||
The OpenAI AI client.
|
||||
"""
|
||||
|
||||
def request(self,
|
||||
question: Message,
|
||||
context: Chat,
|
||||
num_answers: int = 1) -> AIResponse:
|
||||
"""
|
||||
Make an AI request, asking the given question with the given
|
||||
context (i. e. chat history). The nr. of requested answers
|
||||
corresponds to the nr. of messages in the 'AIResponse'.
|
||||
"""
|
||||
# TODO:
|
||||
# * transform given message and chat context into OpenAI format
|
||||
# * make request
|
||||
# * create a new Message for each answer and return them
|
||||
# (writing Messages is done by the calles)
|
||||
raise NotImplementedError
|
||||
|
||||
def models(self) -> list[str]:
|
||||
"""
|
||||
Return all models supported by this AI.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def print_models(self) -> None:
|
||||
"""
|
||||
Print all models supported by the current AI.
|
||||
"""
|
||||
not_ready = []
|
||||
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
|
||||
if engine['ready']:
|
||||
print(engine['id'])
|
||||
else:
|
||||
not_ready.append(engine['id'])
|
||||
if len(not_ready) > 0:
|
||||
print('\nNot ready: ' + ', '.join(not_ready))
|
||||
+37
-11
@@ -45,7 +45,7 @@ def read_dir(dir_path: pathlib.Path,
|
||||
messages: list[Message] = []
|
||||
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
|
||||
for file_path in sorted(file_iter):
|
||||
if file_path.is_file():
|
||||
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
|
||||
try:
|
||||
message = Message.from_file(file_path, mfilter)
|
||||
if message:
|
||||
@@ -82,6 +82,17 @@ def write_dir(dir_path: pathlib.Path,
|
||||
message.to_file(file_path)
|
||||
|
||||
|
||||
def clear_dir(dir_path: pathlib.Path,
|
||||
glob: Optional[str] = None) -> None:
|
||||
"""
|
||||
Deletes all Message files in the given directory.
|
||||
"""
|
||||
file_iter = dir_path.glob(glob) if glob else dir_path.iterdir()
|
||||
for file_path in file_iter:
|
||||
if file_path.is_file() and file_path.suffix in Message.file_suffixes:
|
||||
file_path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Chat:
|
||||
"""
|
||||
@@ -127,7 +138,16 @@ class Chat:
|
||||
tags: set[Tag] = set()
|
||||
for m in self.messages:
|
||||
tags |= m.filter_tags(prefix, contain)
|
||||
return tags
|
||||
return set(sorted(tags))
|
||||
|
||||
def tags_frequency(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> dict[Tag, int]:
|
||||
"""
|
||||
Get the frequency of all tags of all messages, optionally filtered by prefix or substring.
|
||||
"""
|
||||
tags: list[Tag] = []
|
||||
for m in self.messages:
|
||||
tags += [tag for tag in m.filter_tags(prefix, contain)]
|
||||
return {tag: tags.count(tag) for tag in sorted(tags)}
|
||||
|
||||
def tokens(self) -> int:
|
||||
"""
|
||||
@@ -136,27 +156,24 @@ class Chat:
|
||||
"""
|
||||
return sum(m.tokens() for m in self.messages)
|
||||
|
||||
def print(self, dump: bool = False, source_code_only: bool = False,
|
||||
with_tags: bool = False, with_file: bool = False,
|
||||
def print(self, source_code_only: bool = False,
|
||||
with_tags: bool = False, with_files: bool = False,
|
||||
paged: bool = True) -> None:
|
||||
if dump:
|
||||
pp(self)
|
||||
return
|
||||
output: list[str] = []
|
||||
for message in self.messages:
|
||||
if source_code_only:
|
||||
output.extend(source_code(message.question, include_delims=True))
|
||||
continue
|
||||
output.append('-' * terminal_width())
|
||||
if with_tags:
|
||||
output.append(message.tags_str())
|
||||
if with_files:
|
||||
output.append('FILE: ' + str(message.file_path))
|
||||
output.append(Question.txt_header)
|
||||
output.append(message.question)
|
||||
if message.answer:
|
||||
output.append(Answer.txt_header)
|
||||
output.append(message.answer)
|
||||
if with_tags:
|
||||
output.append(message.tags_str())
|
||||
if with_file:
|
||||
output.append('FILE: ' + str(message.file_path))
|
||||
if paged:
|
||||
print_paged('\n'.join(output))
|
||||
else:
|
||||
@@ -283,3 +300,12 @@ class ChatDB(Chat):
|
||||
msgs if msgs else self.messages,
|
||||
self.file_suffix,
|
||||
self.get_next_fid)
|
||||
|
||||
def clear_cache(self) -> None:
|
||||
"""
|
||||
Deletes all Message files from the cache dir and removes those messages from
|
||||
the internal list.
|
||||
"""
|
||||
clear_dir(self.cache_path, self.glob)
|
||||
# only keep messages from DB dir (or those that have not yet been written)
|
||||
self.messages = [m for m in self.messages if not m.file_path or m.file_path.parent.samefile(self.db_path)]
|
||||
|
||||
@@ -7,7 +7,15 @@ OpenAIConfigInst = TypeVar('OpenAIConfigInst', bound='OpenAIConfig')
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenAIConfig():
|
||||
class AIConfig:
|
||||
"""
|
||||
The base class of all AI configurations.
|
||||
"""
|
||||
name: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenAIConfig(AIConfig):
|
||||
"""
|
||||
The OpenAI section of the configuration file.
|
||||
"""
|
||||
@@ -25,6 +33,7 @@ class OpenAIConfig():
|
||||
Create OpenAIConfig from a dict.
|
||||
"""
|
||||
return cls(
|
||||
name='OpenAI',
|
||||
api_key=str(source['api_key']),
|
||||
model=str(source['model']),
|
||||
max_tokens=int(source['max_tokens']),
|
||||
@@ -36,7 +45,7 @@ class OpenAIConfig():
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config():
|
||||
class Config:
|
||||
"""
|
||||
The configuration file structure.
|
||||
"""
|
||||
|
||||
+57
-45
@@ -6,11 +6,13 @@ import yaml
|
||||
import sys
|
||||
import argcomplete
|
||||
import argparse
|
||||
import pathlib
|
||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, print_tags_frequency, ChatType
|
||||
from .storage import save_answers, create_chat_hist, get_tags, get_tags_unique, read_file, dump_data
|
||||
from pathlib import Path
|
||||
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
|
||||
from .storage import save_answers, create_chat_hist, read_file, dump_data
|
||||
from .api_client import ai, openai_api_key, print_models
|
||||
from .configuration import Config
|
||||
from .chat import ChatDB
|
||||
from .message import Message, MessageFilter
|
||||
from itertools import zip_longest
|
||||
from typing import Any
|
||||
|
||||
@@ -18,9 +20,8 @@ default_config = '.config.yaml'
|
||||
|
||||
|
||||
def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
|
||||
with open(parsed_args.config, 'r') as f:
|
||||
config = yaml.load(f, Loader=yaml.FullLoader)
|
||||
return get_tags_unique(config, prefix)
|
||||
config = Config.from_file(parsed_args.config)
|
||||
return list(Message.tags_from_dir(Path(config.db), prefix=prefix))
|
||||
|
||||
|
||||
def create_question_with_hist(args: argparse.Namespace,
|
||||
@@ -31,11 +32,11 @@ def create_question_with_hist(args: argparse.Namespace,
|
||||
by the specified tags.
|
||||
"""
|
||||
tags = args.tags or []
|
||||
extags = args.extags or []
|
||||
etags = args.etags or []
|
||||
otags = args.output_tags or []
|
||||
|
||||
if not args.only_source_code:
|
||||
print_tag_args(tags, extags, otags)
|
||||
if not args.source_code_only:
|
||||
print_tag_args(tags, etags, otags)
|
||||
|
||||
question_parts = []
|
||||
question_list = args.question if args.question is not None else []
|
||||
@@ -52,17 +53,23 @@ def create_question_with_hist(args: argparse.Namespace,
|
||||
question_parts.append(f"```\n{r.read().strip()}\n```")
|
||||
|
||||
full_question = '\n\n'.join(question_parts)
|
||||
chat = create_chat_hist(full_question, tags, extags, config,
|
||||
args.match_all_tags, False, False)
|
||||
chat = create_chat_hist(full_question, tags, etags, config,
|
||||
match_all_tags=True if args.atags else False, # FIXME
|
||||
with_tags=False,
|
||||
with_file=False)
|
||||
return chat, full_question, tags
|
||||
|
||||
|
||||
def tag_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
def tags_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'tag' command.
|
||||
Handler for the 'tags' command.
|
||||
"""
|
||||
chat = ChatDB.from_dir(cache_path=Path('.'),
|
||||
db_path=Path(config.db))
|
||||
if args.list:
|
||||
print_tags_frequency(get_tags(config, None))
|
||||
tags_freq = chat.tags_frequency(args.prefix, args.contain)
|
||||
for tag, freq in tags_freq.items():
|
||||
print(f"- {tag}: {freq}")
|
||||
|
||||
|
||||
def config_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
@@ -89,7 +96,7 @@ def ask_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
if args.model:
|
||||
config.openai.model = args.model
|
||||
chat, question, tags = create_question_with_hist(args, config)
|
||||
print_chat_hist(chat, False, args.only_source_code)
|
||||
print_chat_hist(chat, False, args.source_code_only)
|
||||
otags = args.output_tags or []
|
||||
answers, usage = ai(chat, config, args.number)
|
||||
save_answers(question, answers, tags, otags, config)
|
||||
@@ -101,21 +108,25 @@ def hist_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'hist' command.
|
||||
"""
|
||||
tags = args.tags or []
|
||||
extags = args.extags or []
|
||||
|
||||
chat = create_chat_hist(None, tags, extags, config,
|
||||
args.match_all_tags,
|
||||
args.with_tags,
|
||||
args.with_files)
|
||||
print_chat_hist(chat, args.dump, args.only_source_code)
|
||||
mfilter = MessageFilter(tags_or=args.tags,
|
||||
tags_and=args.atags,
|
||||
tags_not=args.etags,
|
||||
question_contains=args.question,
|
||||
answer_contains=args.answer)
|
||||
chat = ChatDB.from_dir(Path('.'),
|
||||
Path(config.db),
|
||||
mfilter=mfilter)
|
||||
chat.print(args.source_code_only,
|
||||
args.with_tags,
|
||||
args.with_files)
|
||||
|
||||
|
||||
def print_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
"""
|
||||
Handler for the 'print' command.
|
||||
"""
|
||||
fname = pathlib.Path(args.file)
|
||||
fname = Path(args.file)
|
||||
if fname.suffix == '.yaml':
|
||||
with open(args.file, 'r') as f:
|
||||
data = yaml.load(f, Loader=yaml.FullLoader)
|
||||
@@ -124,7 +135,7 @@ def print_cmd(args: argparse.Namespace, config: Config) -> None:
|
||||
else:
|
||||
print(f"Unknown file type: {args.file}")
|
||||
sys.exit(1)
|
||||
if args.only_source_code:
|
||||
if args.source_code_only:
|
||||
display_source_code(data['answer'])
|
||||
else:
|
||||
print(dump_data(data).strip())
|
||||
@@ -144,18 +155,17 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
# a parent parser for all commands that support tag selection
|
||||
tag_parser = argparse.ArgumentParser(add_help=False)
|
||||
tag_arg = tag_parser.add_argument('-t', '--tags', nargs='+',
|
||||
help='List of tag names', metavar='TAGS')
|
||||
help='List of tag names (one must match)', metavar='TAGS')
|
||||
tag_arg.completer = tags_completer # type: ignore
|
||||
extag_arg = tag_parser.add_argument('-e', '--extags', nargs='+',
|
||||
help='List of tag names to exclude', metavar='EXTAGS')
|
||||
extag_arg.completer = tags_completer # type: ignore
|
||||
atag_arg = tag_parser.add_argument('-a', '--atags', nargs='+',
|
||||
help='List of tag names (all must match)', metavar='TAGS')
|
||||
atag_arg.completer = tags_completer # type: ignore
|
||||
etag_arg = tag_parser.add_argument('-e', '--etags', nargs='+',
|
||||
help='List of tag names to exclude', metavar='ETAGS')
|
||||
etag_arg.completer = tags_completer # type: ignore
|
||||
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
|
||||
help='List of output tag names, default is input', metavar='OTAGS')
|
||||
otag_arg.completer = tags_completer # type: ignore
|
||||
tag_parser.add_argument('-a', '--match-all-tags',
|
||||
help="All given tags must match when selecting chat history entries",
|
||||
action='store_true')
|
||||
# enable autocompletion for tags
|
||||
|
||||
# 'ask' command parser
|
||||
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
|
||||
@@ -170,7 +180,7 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int,
|
||||
default=1)
|
||||
ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
|
||||
ask_cmd_parser.add_argument('-S', '--only-source-code', help='Add pure source code to the chat history',
|
||||
ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
|
||||
action='store_true')
|
||||
|
||||
# 'hist' command parser
|
||||
@@ -178,23 +188,25 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
help="Print chat history.",
|
||||
aliases=['h'])
|
||||
hist_cmd_parser.set_defaults(func=hist_cmd)
|
||||
hist_cmd_parser.add_argument('-d', '--dump', help="Print chat history as Python structure",
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-w', '--with-tags', help="Print chat history with tags.",
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-W', '--with-files', help="Print chat history with filenames.",
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
|
||||
hist_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
|
||||
action='store_true')
|
||||
hist_cmd_parser.add_argument('-A', '--answer', help='Search for answer substring')
|
||||
hist_cmd_parser.add_argument('-Q', '--question', help='Search for question substring')
|
||||
|
||||
# 'tag' command parser
|
||||
tag_cmd_parser = cmdparser.add_parser('tag',
|
||||
help="Manage tags.",
|
||||
aliases=['t'])
|
||||
tag_cmd_parser.set_defaults(func=tag_cmd)
|
||||
tag_group = tag_cmd_parser.add_mutually_exclusive_group(required=True)
|
||||
tag_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
||||
action='store_true')
|
||||
# 'tags' command parser
|
||||
tags_cmd_parser = cmdparser.add_parser('tags',
|
||||
help="Manage tags.",
|
||||
aliases=['t'])
|
||||
tags_cmd_parser.set_defaults(func=tags_cmd)
|
||||
tags_group = tags_cmd_parser.add_mutually_exclusive_group(required=True)
|
||||
tags_group.add_argument('-l', '--list', help="List all tags and their frequency",
|
||||
action='store_true')
|
||||
tags_cmd_parser.add_argument('-p', '--prefix', help="Filter tags by prefix")
|
||||
tags_cmd_parser.add_argument('-c', '--contain', help="Filter tags by contained substring")
|
||||
|
||||
# 'config' command parser
|
||||
config_cmd_parser = cmdparser.add_parser('config',
|
||||
@@ -214,7 +226,7 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
aliases=['p'])
|
||||
print_cmd_parser.set_defaults(func=print_cmd)
|
||||
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
|
||||
print_cmd_parser.add_argument('-S', '--only-source-code', help='Print only source code',
|
||||
print_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
|
||||
action='store_true')
|
||||
|
||||
argcomplete.autocomplete(parser)
|
||||
|
||||
+49
-42
@@ -96,7 +96,7 @@ class AILine(str):
|
||||
|
||||
def __new__(cls: Type[AILineInst], string: str) -> AILineInst:
|
||||
if not string.startswith(cls.prefix):
|
||||
raise TagError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
|
||||
raise MessageError(f"AILine '{string}' is missing prefix '{cls.prefix}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@@ -116,7 +116,7 @@ class ModelLine(str):
|
||||
|
||||
def __new__(cls: Type[ModelLineInst], string: str) -> ModelLineInst:
|
||||
if not string.startswith(cls.prefix):
|
||||
raise TagError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
|
||||
raise MessageError(f"ModelLine '{string}' is missing prefix '{cls.prefix}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@@ -128,6 +128,40 @@ class ModelLine(str):
|
||||
return cls(' '.join([cls.prefix, model]))
|
||||
|
||||
|
||||
class Answer(str):
|
||||
"""
|
||||
A single answer with a defined header.
|
||||
"""
|
||||
tokens: int = 0 # tokens used by this answer
|
||||
txt_header: ClassVar[str] = '==== ANSWER ===='
|
||||
yaml_key: ClassVar[str] = 'answer'
|
||||
|
||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||
"""
|
||||
Make sure the answer string does not contain the header as a whole line.
|
||||
"""
|
||||
if cls.txt_header in string.split('\n'):
|
||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if cls.txt_header in strings:
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
|
||||
def source_code(self, include_delims: bool = False) -> list[str]:
|
||||
"""
|
||||
Extract and return all source code sections.
|
||||
"""
|
||||
return source_code(self, include_delims)
|
||||
|
||||
|
||||
class Question(str):
|
||||
"""
|
||||
A single question with a defined header.
|
||||
@@ -138,10 +172,14 @@ class Question(str):
|
||||
|
||||
def __new__(cls: Type[QuestionInst], string: str) -> QuestionInst:
|
||||
"""
|
||||
Make sure the question string does not contain the header.
|
||||
Make sure the question string does not contain the header as a whole line
|
||||
(also not that from 'Answer', so it's always clear where the answer starts).
|
||||
"""
|
||||
if cls.txt_header in string:
|
||||
string_lines = string.split('\n')
|
||||
if cls.txt_header in string_lines:
|
||||
raise MessageError(f"Question '{string}' contains the header '{cls.txt_header}'")
|
||||
if Answer.txt_header in string_lines:
|
||||
raise MessageError(f"Question '{string}' contains the header '{Answer.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@@ -150,41 +188,7 @@ class Question(str):
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if any(cls.txt_header in string for string in strings):
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
|
||||
def source_code(self, include_delims: bool = False) -> list[str]:
|
||||
"""
|
||||
Extract and return all source code sections.
|
||||
"""
|
||||
return source_code(self, include_delims)
|
||||
|
||||
|
||||
class Answer(str):
|
||||
"""
|
||||
A single answer with a defined header.
|
||||
"""
|
||||
tokens: int = 0 # tokens used by this answer
|
||||
txt_header: ClassVar[str] = '=== ANSWER ==='
|
||||
yaml_key: ClassVar[str] = 'answer'
|
||||
|
||||
def __new__(cls: Type[AnswerInst], string: str) -> AnswerInst:
|
||||
"""
|
||||
Make sure the answer string does not contain the header.
|
||||
"""
|
||||
if cls.txt_header in string:
|
||||
raise MessageError(f"Answer '{string}' contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, string)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_list(cls: Type[AnswerInst], strings: list[str]) -> AnswerInst:
|
||||
"""
|
||||
Build Question from a list of strings. Make sure strings do not contain the header.
|
||||
"""
|
||||
if any(cls.txt_header in string for string in strings):
|
||||
if cls.txt_header in strings:
|
||||
raise MessageError(f"Question contains the header '{cls.txt_header}'")
|
||||
instance = super().__new__(cls, '\n'.join(strings).strip())
|
||||
return instance
|
||||
@@ -351,17 +355,20 @@ class Message():
|
||||
try:
|
||||
pos = fd.tell()
|
||||
ai = AILine(fd.readline()).ai()
|
||||
except TagError:
|
||||
except MessageError:
|
||||
fd.seek(pos)
|
||||
# ModelLine (Optional)
|
||||
try:
|
||||
pos = fd.tell()
|
||||
model = ModelLine(fd.readline()).model()
|
||||
except TagError:
|
||||
except MessageError:
|
||||
fd.seek(pos)
|
||||
# Question and Answer
|
||||
text = fd.read().strip().split('\n')
|
||||
question_idx = text.index(Question.txt_header) + 1
|
||||
try:
|
||||
question_idx = text.index(Question.txt_header) + 1
|
||||
except ValueError:
|
||||
raise MessageError(f"Question header '{Question.txt_header}' not found in '{file_path}'")
|
||||
try:
|
||||
answer_idx = text.index(Answer.txt_header)
|
||||
question = Question.from_list(text[question_idx:answer_idx])
|
||||
|
||||
@@ -78,8 +78,3 @@ def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = Fals
|
||||
print(message['content'])
|
||||
else:
|
||||
print(f"{message['role'].upper()}: {message['content']}")
|
||||
|
||||
|
||||
def print_tags_frequency(tags: list[str]) -> None:
|
||||
for tag in sorted(set(tags)):
|
||||
print(f"- {tag}: {tags.count(tag)}")
|
||||
|
||||
+56
-6
@@ -14,7 +14,7 @@ class TestChat(CmmTestCase):
|
||||
self.chat = Chat([])
|
||||
self.message1 = Message(Question('Question 1'),
|
||||
Answer('Answer 1'),
|
||||
{Tag('atag1')},
|
||||
{Tag('atag1'), Tag('btag2')},
|
||||
file_path=pathlib.Path('0001.txt'))
|
||||
self.message2 = Message(Question('Question 2'),
|
||||
Answer('Answer 2'),
|
||||
@@ -57,6 +57,11 @@ class TestChat(CmmTestCase):
|
||||
tags_cont = self.chat.tags(contain='2')
|
||||
self.assertSetEqual(tags_cont, {Tag('btag2')})
|
||||
|
||||
def test_tags_frequency(self) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
tags_freq = self.chat.tags_frequency()
|
||||
self.assertDictEqual(tags_freq, {'atag1': 1, 'btag2': 2})
|
||||
|
||||
@patch('sys.stdout', new_callable=StringIO)
|
||||
def test_print(self, mock_stdout: StringIO) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
@@ -77,21 +82,21 @@ Answer 2
|
||||
@patch('sys.stdout', new_callable=StringIO)
|
||||
def test_print_with_tags_and_file(self, mock_stdout: StringIO) -> None:
|
||||
self.chat.add_msgs([self.message1, self.message2])
|
||||
self.chat.print(paged=False, with_tags=True, with_file=True)
|
||||
self.chat.print(paged=False, with_tags=True, with_files=True)
|
||||
expected_output = f"""{'-'*terminal_width()}
|
||||
{TagLine.prefix} atag1 btag2
|
||||
FILE: 0001.txt
|
||||
{Question.txt_header}
|
||||
Question 1
|
||||
{Answer.txt_header}
|
||||
Answer 1
|
||||
{TagLine.prefix} atag1
|
||||
FILE: 0001.txt
|
||||
{'-'*terminal_width()}
|
||||
{TagLine.prefix} btag2
|
||||
FILE: 0002.txt
|
||||
{Question.txt_header}
|
||||
Question 2
|
||||
{Answer.txt_header}
|
||||
Answer 2
|
||||
{TagLine.prefix} btag2
|
||||
FILE: 0002.txt
|
||||
"""
|
||||
self.assertEqual(mock_stdout.getvalue(), expected_output)
|
||||
|
||||
@@ -295,3 +300,48 @@ class TestChatDB(CmmTestCase):
|
||||
# check that they now have the DB path
|
||||
self.assertEqual(chat_db.messages[6].file_path, pathlib.Path(self.db_path.name, '0007.txt'))
|
||||
self.assertEqual(chat_db.messages[7].file_path, pathlib.Path(self.db_path.name, '0008.yaml'))
|
||||
|
||||
def test_chat_db_clear(self) -> None:
|
||||
# create a new ChatDB instance
|
||||
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
|
||||
pathlib.Path(self.db_path.name))
|
||||
# check that Message.file_path is correct
|
||||
self.assertEqual(chat_db.messages[0].file_path, pathlib.Path(self.db_path.name, '0001.txt'))
|
||||
self.assertEqual(chat_db.messages[1].file_path, pathlib.Path(self.db_path.name, '0002.yaml'))
|
||||
self.assertEqual(chat_db.messages[2].file_path, pathlib.Path(self.db_path.name, '0003.txt'))
|
||||
self.assertEqual(chat_db.messages[3].file_path, pathlib.Path(self.db_path.name, '0004.yaml'))
|
||||
|
||||
# write the messages to the cache directory
|
||||
chat_db.write_cache()
|
||||
# check if the written files are in the cache directory
|
||||
cache_dir_files = list(pathlib.Path(self.cache_path.name).glob('*'))
|
||||
self.assertEqual(len(cache_dir_files), 4)
|
||||
|
||||
# now rewrite them to the DB dir and check for modified paths
|
||||
chat_db.write_db()
|
||||
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
|
||||
self.assertEqual(len(db_dir_files), 4)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0001.txt'), db_dir_files)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0002.yaml'), db_dir_files)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0003.txt'), db_dir_files)
|
||||
self.assertIn(pathlib.Path(self.db_path.name, '0004.yaml'), db_dir_files)
|
||||
|
||||
# add a new message with empty file_path
|
||||
message_empty = Message(question=Question("What the hell am I doing here?"),
|
||||
answer=Answer("You don't belong here!"))
|
||||
# and one for the cache dir
|
||||
message_cache = Message(question=Question("What the hell am I doing here?"),
|
||||
answer=Answer("You're a creep!"),
|
||||
file_path=pathlib.Path(self.cache_path.name, '0005.txt'))
|
||||
chat_db.add_msgs([message_empty, message_cache])
|
||||
|
||||
# clear the cache and check the cache dir
|
||||
chat_db.clear_cache()
|
||||
cache_dir_files = list(pathlib.Path(self.cache_path.name).glob('*'))
|
||||
self.assertEqual(len(cache_dir_files), 0)
|
||||
# make sure that the DB messages (and the new message) are still there
|
||||
self.assertEqual(len(chat_db.messages), 5)
|
||||
db_dir_files = list(pathlib.Path(self.db_path.name).glob('*'))
|
||||
self.assertEqual(len(db_dir_files), 4)
|
||||
# but not the message with the cache dir path
|
||||
self.assertFalse(any(m.file_path == message_cache.file_path for m in chat_db.messages))
|
||||
|
||||
+10
-7
@@ -115,11 +115,12 @@ class TestHandleQuestion(CmmTestCase):
|
||||
self.question = "test question"
|
||||
self.args = argparse.Namespace(
|
||||
tags=['tag1'],
|
||||
extags=['extag1'],
|
||||
atags=None,
|
||||
etags=['etag1'],
|
||||
output_tags=None,
|
||||
question=[self.question],
|
||||
source=None,
|
||||
only_source_code=False,
|
||||
source_code_only=False,
|
||||
number=3,
|
||||
max_tokens=None,
|
||||
temperature=None,
|
||||
@@ -143,16 +144,18 @@ class TestHandleQuestion(CmmTestCase):
|
||||
with patch("chatmastermind.storage.open", open_mock):
|
||||
ask_cmd(self.args, self.config)
|
||||
mock_print_tag_args.assert_called_once_with(self.args.tags,
|
||||
self.args.extags,
|
||||
self.args.etags,
|
||||
[])
|
||||
mock_create_chat_hist.assert_called_once_with(self.question,
|
||||
self.args.tags,
|
||||
self.args.extags,
|
||||
self.args.etags,
|
||||
self.config,
|
||||
False, False, False)
|
||||
match_all_tags=False,
|
||||
with_tags=False,
|
||||
with_file=False)
|
||||
mock_print_chat_hist.assert_called_once_with('test_chat',
|
||||
False,
|
||||
self.args.only_source_code)
|
||||
self.args.source_code_only)
|
||||
mock_ai.assert_called_with("test_chat",
|
||||
self.config,
|
||||
self.args.number)
|
||||
@@ -227,7 +230,7 @@ class TestCreateParser(CmmTestCase):
|
||||
mock_add_subparsers.assert_called_once_with(dest='command', title='commands', description='supported commands', required=True)
|
||||
mock_cmdparser.add_parser.assert_any_call('ask', parents=ANY, help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('hist', parents=ANY, help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('tag', help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('tags', help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
|
||||
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
|
||||
self.assertTrue('.config.yaml' in parser.get_default('config'))
|
||||
|
||||
+23
-6
@@ -61,22 +61,39 @@ class SourceCodeTestCase(CmmTestCase):
|
||||
|
||||
|
||||
class QuestionTestCase(CmmTestCase):
|
||||
def test_question_with_prefix(self) -> None:
|
||||
def test_question_with_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Question("=== QUESTION === What is your name?")
|
||||
Question(f"{Question.txt_header}\nWhat is your name?")
|
||||
|
||||
def test_question_without_prefix(self) -> None:
|
||||
def test_question_with_answer_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Question(f"{Answer.txt_header}\nBob")
|
||||
|
||||
def test_question_with_legal_header(self) -> None:
|
||||
"""
|
||||
If the header is just a part of a line, it's fine.
|
||||
"""
|
||||
question = Question(f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||
self.assertIsInstance(question, Question)
|
||||
self.assertEqual(question, f"This is a line contaning '{Question.txt_header}'\nWhat does that mean?")
|
||||
|
||||
def test_question_without_header(self) -> None:
|
||||
question = Question("What is your favorite color?")
|
||||
self.assertIsInstance(question, Question)
|
||||
self.assertEqual(question, "What is your favorite color?")
|
||||
|
||||
|
||||
class AnswerTestCase(CmmTestCase):
|
||||
def test_answer_with_prefix(self) -> None:
|
||||
def test_answer_with_header(self) -> None:
|
||||
with self.assertRaises(MessageError):
|
||||
Answer("=== ANSWER === Yes")
|
||||
Answer(f"{Answer.txt_header}\nno")
|
||||
|
||||
def test_answer_without_prefix(self) -> None:
|
||||
def test_answer_with_legal_header(self) -> None:
|
||||
answer = Answer(f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||
self.assertIsInstance(answer, Answer)
|
||||
self.assertEqual(answer, f"This is a line contaning '{Answer.txt_header}'\nIt is what it is.")
|
||||
|
||||
def test_answer_without_header(self) -> None:
|
||||
answer = Answer("No")
|
||||
self.assertIsInstance(answer, Answer)
|
||||
self.assertEqual(answer, "No")
|
||||
|
||||
Reference in New Issue
Block a user