6 Commits

24 changed files with 734 additions and 967 deletions
+8 -18
View File
@@ -1,7 +1,7 @@
from dataclasses import dataclass
from abc import abstractmethod
from typing import Protocol, Optional, Union
from .configuration import AIConfig
from .tags import Tag
from .message import Message
from .chat import Chat
@@ -33,26 +33,22 @@ class AI(Protocol):
The base class for AI clients.
"""
ID: str
name: str
config: AIConfig
@abstractmethod
def request(self,
question: Message,
chat: Chat,
num_answers: int = 1,
otags: Optional[set[Tag]] = None) -> AIResponse:
context: Chat,
num_answers: int = 1) -> AIResponse:
"""
Make an AI request. Parameters:
* question: the question to ask
* chat: the chat history to be added as context
* num_answers: nr. of requested answers (corresponds
to the nr. of messages in the 'AIResponse')
* otags: the output tags, i. e. the tags that all
returned messages should contain
Make an AI request, asking the given question with the given
context (i. e. chat history). The nr. of requested answers
corresponds to the nr. of messages in the 'AIResponse'.
"""
raise NotImplementedError
@abstractmethod
def models(self) -> list[str]:
"""
Return all models supported by this AI.
@@ -66,9 +62,3 @@ class AI(Protocol):
and is not implemented for all AIs.
"""
raise NotImplementedError
def print(self) -> None:
"""
Print some info about the current AI, like system message.
"""
pass
-43
View File
@@ -1,43 +0,0 @@
"""
Creates different AI instances, based on the given configuration.
"""
import argparse
from typing import cast
from .configuration import Config, AIConfig, OpenAIConfig
from .ai import AI, AIError
from .ais.openai import OpenAI
def create_ai(args: argparse.Namespace, config: Config) -> AI: # noqa: 11
"""
Creates an AI subclass instance from the given arguments
and configuration file. If AI has not been set in the
arguments, it searches for the ID 'default'. If that
is not found, it uses the first AI in the list.
"""
ai_conf: AIConfig
if args.AI:
try:
ai_conf = config.ais[args.AI]
except KeyError:
raise AIError(f"AI ID '{args.AI}' does not exist in this configuration")
elif 'default' in config.ais:
ai_conf = config.ais['default']
else:
try:
ai_conf = next(iter(config.ais.values()))
except StopIteration:
raise AIError("No AI found in this configuration")
if ai_conf.name == 'openai':
ai = OpenAI(cast(OpenAIConfig, ai_conf))
if args.model:
ai.config.model = args.model
if args.max_tokens:
ai.config.max_tokens = args.max_tokens
if args.temperature:
ai.config.temperature = args.temperature
return ai
else:
raise AIError(f"AI '{args.AI}' is not supported")
View File
+9 -25
View File
@@ -2,12 +2,12 @@
Implements the OpenAI client classes and functions.
"""
import openai
from typing import Optional, Union
from typing import Optional
from ..tags import Tag
from ..message import Message, Answer
from ..chat import Chat
from ..ai import AI, AIResponse, Tokens
from ..configuration import OpenAIConfig
from ..config import OpenAIConfig
ChatType = list[dict[str, str]]
@@ -17,11 +17,7 @@ class OpenAI(AI):
The OpenAI AI client.
"""
def __init__(self, config: OpenAIConfig) -> None:
self.ID = config.ID
self.name = config.name
self.config = config
openai.api_key = config.api_key
config: OpenAIConfig
def request(self,
question: Message,
@@ -43,20 +39,16 @@ class OpenAI(AI):
n=num_answers,
frequency_penalty=self.config.frequency_penalty,
presence_penalty=self.config.presence_penalty)
question.answer = Answer(response['choices'][0]['message']['content'])
question.tags = otags
question.ai = self.ID
question.model = self.config.model
answers: list[Message] = [question]
for choice in response['choices'][1:]: # type: ignore
answers: list[Message] = []
for choice in response['choices']: # type: ignore
answers.append(Message(question=question.question,
answer=Answer(choice['message']['content']),
tags=otags,
ai=self.ID,
ai=self.name,
model=self.config.model))
return AIResponse(answers, Tokens(response['usage']['prompt_tokens'],
response['usage']['completion_tokens'],
response['usage']['total_tokens']))
return AIResponse(answers, Tokens(response['usage']['prompt'],
response['usage']['completion'],
response['usage']['total']))
def models(self) -> list[str]:
"""
@@ -96,11 +88,3 @@ class OpenAI(AI):
if question:
append('user', question.question)
return oai_chat
def tokens(self, data: Union[Message, Chat]) -> int:
raise NotImplementedError
def print(self) -> None:
print(f"MODEL: {self.config.model}")
print("=== SYSTEM ===")
print(self.config.system)
+45
View File
@@ -0,0 +1,45 @@
import openai
from .utils import ChatType
from .configuration import Config
def openai_api_key(api_key: str) -> None:
openai.api_key = api_key
def print_models() -> None:
"""
Print all models supported by the current AI.
"""
not_ready = []
for engine in sorted(openai.Engine.list()['data'], key=lambda x: x['id']):
if engine['ready']:
print(engine['id'])
else:
not_ready.append(engine['id'])
if len(not_ready) > 0:
print('\nNot ready: ' + ', '.join(not_ready))
def ai(chat: ChatType,
config: Config,
number: int
) -> tuple[list[str], dict[str, int]]:
"""
Make AI request with the given chat history and configuration.
Return AI response and tokens used.
"""
response = openai.ChatCompletion.create(
model=config.openai.model,
messages=chat,
temperature=config.openai.temperature,
max_tokens=config.openai.max_tokens,
top_p=config.openai.top_p,
n=number,
frequency_penalty=config.openai.frequency_penalty,
presence_penalty=config.openai.presence_penalty)
result = []
for choice in response['choices']: # type: ignore
result.append(choice['message']['content'].strip())
return result, dict(response['usage']) # type: ignore
+13 -60
View File
@@ -2,7 +2,7 @@
Module implementing various chat classes and functions for managing a chat history.
"""
import shutil
from pathlib import Path
import pathlib
from pprint import PrettyPrinter
from pydoc import pager
from dataclasses import dataclass
@@ -30,7 +30,7 @@ def print_paged(text: str) -> None:
pager(text)
def read_dir(dir_path: Path,
def read_dir(dir_path: pathlib.Path,
glob: Optional[str] = None,
mfilter: Optional[MessageFilter] = None) -> list[Message]:
"""
@@ -55,20 +55,17 @@ def read_dir(dir_path: Path,
return messages
def make_file_path(dir_path: Path,
def make_file_path(dir_path: pathlib.Path,
file_suffix: str,
next_fid: Callable[[], int]) -> Path:
next_fid: Callable[[], int]) -> pathlib.Path:
"""
Create a file_path for the given directory using the
given file_suffix and ID generator function.
"""
file_path = dir_path / f"{next_fid():04d}{file_suffix}"
while file_path.exists():
file_path = dir_path / f"{next_fid():04d}{file_suffix}"
return file_path
return dir_path / f"{next_fid():04d}{file_suffix}"
def write_dir(dir_path: Path,
def write_dir(dir_path: pathlib.Path,
messages: list[Message],
file_suffix: str,
next_fid: Callable[[], int]) -> None:
@@ -93,7 +90,7 @@ def write_dir(dir_path: Path,
message.to_file(file_path)
def clear_dir(dir_path: Path,
def clear_dir(dir_path: pathlib.Path,
glob: Optional[str] = None) -> None:
"""
Deletes all Message files in the given directory.
@@ -142,34 +139,6 @@ class Chat:
self.messages += messages
self.sort()
def latest_message(self) -> Optional[Message]:
"""
Returns the last added message (according to the file ID).
"""
if len(self.messages) > 0:
self.sort()
return self.messages[-1]
else:
return None
def find_messages(self, msg_names: list[str]) -> list[Message]:
"""
Search and return the messages with the given names. Names can either be filenames
(incl. suffixes) or full paths. Messages that can't be found are ignored (i. e. the
caller should check the result if he requires all messages).
"""
return [m for m in self.messages
if any((m.file_path and (m.file_path == Path(mn) or m.file_path.name == mn)) for mn in msg_names)]
def remove_messages(self, msg_names: list[str]) -> None:
"""
Remove the messages with the given names. Names can either be filenames
(incl. the suffix) or full paths.
"""
self.messages = [m for m in self.messages
if not any((m.file_path and (m.file_path == Path(mn) or m.file_path.name == mn)) for mn in msg_names)]
self.sort()
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
Get the tags of all messages, optionally filtered by prefix or substring.
@@ -223,8 +192,8 @@ class ChatDB(Chat):
default_file_suffix: ClassVar[str] = '.txt'
cache_path: Path
db_path: Path
cache_path: pathlib.Path
db_path: pathlib.Path
# a MessageFilter that all messages must match (if given)
mfilter: Optional[MessageFilter] = None
file_suffix: str = default_file_suffix
@@ -240,8 +209,8 @@ class ChatDB(Chat):
@classmethod
def from_dir(cls: Type[ChatDBInst],
cache_path: Path,
db_path: Path,
cache_path: pathlib.Path,
db_path: pathlib.Path,
glob: Optional[str] = None,
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
"""
@@ -261,8 +230,8 @@ class ChatDB(Chat):
@classmethod
def from_messages(cls: Type[ChatDBInst],
cache_path: Path,
db_path: Path,
cache_path: pathlib.Path,
db_path: pathlib.Path,
messages: list[Message],
mfilter: Optional[MessageFilter] = None) -> ChatDBInst:
"""
@@ -389,19 +358,3 @@ class ChatDB(Chat):
msgs = iter(messages if messages else self.messages)
while (m := next(msgs, None)):
m.to_file()
def update_messages(self, messages: list[Message], write: bool = True) -> None:
"""
Update existing messages. A message is determined as 'existing' if a message with
the same base filename (i. e. 'file_path.name') is already in the list. Only accepts
existing messages.
"""
if any(not message_in(m, self.messages) for m in messages):
raise ChatError("Can't update messages that are not in the internal list")
# remove old versions and add new ones
self.messages = [m for m in self.messages if not message_in(m, messages)]
self.messages += messages
self.sort()
# write the UPDATED messages if requested
if write:
self.write_messages(messages)
-11
View File
@@ -1,11 +0,0 @@
import argparse
from pathlib import Path
from ..configuration import Config
def config_cmd(args: argparse.Namespace) -> None:
"""
Handler for the 'config' command.
"""
if args.create:
Config.create_default(Path(args.create))
-23
View File
@@ -1,23 +0,0 @@
import argparse
from pathlib import Path
from ..configuration import Config
from ..chat import ChatDB
from ..message import MessageFilter
def hist_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'hist' command.
"""
mfilter = MessageFilter(tags_or=args.or_tags,
tags_and=args.and_tags,
tags_not=args.exclude_tags,
question_contains=args.question,
answer_contains=args.answer)
chat = ChatDB.from_dir(Path('.'),
Path(config.db),
mfilter=mfilter)
chat.print(args.source_code_only,
args.with_tags,
args.with_files)
-27
View File
@@ -1,27 +0,0 @@
import sys
import argparse
from pathlib import Path
from ..configuration import Config
from ..message import Message, MessageError
def print_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'print' command.
"""
fname = Path(args.file)
try:
message = Message.from_file(fname)
if message:
if args.question:
print(message.question)
elif args.answer:
print(message.answer)
elif message.answer and args.only_source_code:
for code in message.answer.source_code():
print(code)
else:
print(message.to_str())
except MessageError:
print(f"File is not a valid message: {args.file}")
sys.exit(1)
-94
View File
@@ -1,94 +0,0 @@
import argparse
from pathlib import Path
from itertools import zip_longest
from ..configuration import Config
from ..chat import ChatDB
from ..message import Message, MessageFilter, Question, source_code
from ..ai_factory import create_ai
from ..ai import AI, AIResponse
def create_message(chat: ChatDB, args: argparse.Namespace) -> Message:
"""
Creates (and writes) a new message from the given arguments.
"""
question_parts = []
question_list = args.ask if args.ask is not None else []
text_files = args.source_text if args.source_text is not None else []
code_files = args.source_code if args.source_code is not None else []
for question, source, code in zip_longest(question_list, text_files, code_files, fillvalue=None):
if question is not None and len(question.strip()) > 0:
question_parts.append(question)
if source is not None and len(source) > 0:
with open(source) as r:
content = r.read().strip()
if len(content) > 0:
question_parts.append(content)
if code is not None and len(code) > 0:
with open(code) as r:
content = r.read().strip()
if len(content) == 0:
continue
# try to extract and add source code
code_parts = source_code(content, include_delims=True)
if len(code_parts) > 0:
question_parts += code_parts
# if there's none, add the whole file
else:
question_parts.append(f"```\n{content}\n```")
full_question = '\n\n'.join(question_parts)
message = Message(question=Question(full_question),
tags=args.output_tags, # FIXME
ai=args.AI,
model=args.model)
chat.add_to_cache([message])
return message
def question_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'question' command.
"""
mfilter = MessageFilter(tags_or=args.or_tags if args.or_tags is not None else set(),
tags_and=args.and_tags if args.and_tags is not None else set(),
tags_not=args.exclude_tags if args.exclude_tags is not None else set())
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db),
mfilter=mfilter)
# if it's a new question, create and store it immediately
if args.ask or args.create:
message = create_message(chat, args)
if args.create:
return
# create the correct AI instance
ai: AI = create_ai(args, config)
if args.ask:
ai.print()
chat.print(paged=False)
response: AIResponse = ai.request(message,
chat,
args.num_answers, # FIXME
args.output_tags) # FIXME
chat.update_messages([response.messages[0]])
chat.add_to_cache(response.messages[1:])
for idx, msg in enumerate(response.messages):
print(f"=== ANSWER {idx+1} ===")
print(msg.answer)
if response.tokens:
print("===============")
print(response.tokens)
elif args.repeat is not None:
lmessage = chat.latest_message()
assert lmessage
# TODO: repeat either the last question or the
# one(s) given in 'args.repeat' (overwrite
# existing ones if 'args.overwrite' is True)
pass
elif args.process is not None:
# TODO: process either all questions without an
# answer or the one(s) given in 'args.process'
pass
-17
View File
@@ -1,17 +0,0 @@
import argparse
from pathlib import Path
from ..configuration import Config
from ..chat import ChatDB
def tags_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'tags' command.
"""
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db))
if args.list:
tags_freq = chat.tags_frequency(args.prefix, args.contain)
for tag, freq in tags_freq.items():
print(f"- {tag}: {freq}")
# TODO: add renaming
+23 -111
View File
@@ -1,51 +1,17 @@
import yaml
from pathlib import Path
from typing import Type, TypeVar, Any, Optional, ClassVar
from dataclasses import dataclass, asdict, field
from typing import Type, TypeVar, Any
from dataclasses import dataclass, asdict
ConfigInst = TypeVar('ConfigInst', bound='Config')
AIConfigInst = TypeVar('AIConfigInst', bound='AIConfig')
OpenAIConfigInst = TypeVar('OpenAIConfigInst', bound='OpenAIConfig')
supported_ais: list[str] = ['openai']
default_config_path = '.config.yaml'
class ConfigError(Exception):
pass
def str_presenter(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
"""
Changes the YAML dump style to multiline syntax for multiline strings.
"""
if len(data.splitlines()) > 1:
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
yaml.add_representer(str, str_presenter)
@dataclass
class AIConfig:
"""
The base class of all AI configurations.
"""
# the name of the AI the config class represents
# -> it's a class variable and thus not part of the
# dataclass constructor
name: ClassVar[str]
# a user-defined ID for an AI configuration entry
ID: str
# the name must not be changed
def __setattr__(self, name: str, value: Any) -> None:
if name == 'name':
raise AttributeError("'{name}' is not allowed to be changed")
else:
super().__setattr__(name, value)
name: str
@dataclass
@@ -53,59 +19,29 @@ class OpenAIConfig(AIConfig):
"""
The OpenAI section of the configuration file.
"""
name: ClassVar[str] = 'openai'
# all members have default values, so we can easily create
# a default configuration
ID: str = 'myopenai'
api_key: str = '0123456789'
model: str = 'gpt-3.5-turbo-16k'
temperature: float = 1.0
max_tokens: int = 4000
top_p: float = 1.0
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
system: str = 'You are an assistant'
api_key: str
model: str
temperature: float
max_tokens: int
top_p: float
frequency_penalty: float
presence_penalty: float
@classmethod
def from_dict(cls: Type[OpenAIConfigInst], source: dict[str, Any]) -> OpenAIConfigInst:
"""
Create OpenAIConfig from a dict.
"""
res = cls(
return cls(
name='OpenAI',
api_key=str(source['api_key']),
model=str(source['model']),
max_tokens=int(source['max_tokens']),
temperature=float(source['temperature']),
top_p=float(source['top_p']),
frequency_penalty=float(source['frequency_penalty']),
presence_penalty=float(source['presence_penalty']),
system=str(source['system'])
presence_penalty=float(source['presence_penalty'])
)
# overwrite default ID if provided
if 'ID' in source:
res.ID = source['ID']
return res
def ai_config_instance(name: str, conf_dict: Optional[dict[str, Any]] = None) -> AIConfig:
"""
Creates an AIConfig instance of the given name.
"""
if name.lower() == 'openai':
if conf_dict is None:
return OpenAIConfig()
else:
return OpenAIConfig.from_dict(conf_dict)
else:
raise ConfigError(f"Unknown AI '{name}'")
def create_default_ai_configs() -> dict[str, AIConfig]:
"""
Create a dict containing default configurations for all supported AIs.
"""
return {ai_config_instance(name).ID: ai_config_instance(name) for name in supported_ais}
@dataclass
@@ -113,54 +49,30 @@ class Config:
"""
The configuration file structure.
"""
# all members have default values, so we can easily create
# a default configuration
db: str = './db/'
ais: dict[str, AIConfig] = field(default_factory=create_default_ai_configs)
system: str
db: str
openai: OpenAIConfig
@classmethod
def from_dict(cls: Type[ConfigInst], source: dict[str, Any]) -> ConfigInst:
"""
Create Config from a dict (with the same format as the config file).
Create Config from a dict.
"""
# create the correct AI type instances
ais: dict[str, AIConfig] = {}
for ID, conf in source['ais'].items():
# add the AI ID to the config (for easy internal access)
conf['ID'] = ID
ai_conf = ai_config_instance(conf['name'], conf)
ais[ID] = ai_conf
return cls(
system=str(source['system']),
db=str(source['db']),
ais=ais
openai=OpenAIConfig.from_dict(source['openai'])
)
@classmethod
def create_default(self, file_path: Path) -> None:
"""
Creates a default Config in the given file.
"""
conf = Config()
conf.to_file(file_path)
@classmethod
def from_file(cls: Type[ConfigInst], path: str) -> ConfigInst:
with open(path, 'r') as f:
source = yaml.load(f, Loader=yaml.FullLoader)
return cls.from_dict(source)
def to_file(self, file_path: Path) -> None:
# remove the AI name from the config (for a cleaner format)
data = self.as_dict()
for conf in data['ais'].values():
del (conf['ID'])
with open(file_path, 'w') as f:
yaml.dump(data, f, sort_keys=False)
def to_file(self, path: str) -> None:
with open(path, 'w') as f:
yaml.dump(asdict(self), f, sort_keys=False)
def as_dict(self) -> dict[str, Any]:
res = asdict(self)
# add the AI name manually (as first element)
# (not done by 'asdict' because it's a class variable)
for ID, conf in res['ais'].items():
res['ais'][ID] = {**{'name': self.ais[ID].name}, **conf}
return res
return asdict(self)
+159 -48
View File
@@ -2,18 +2,21 @@
# -*- coding: utf-8 -*-
# vim: set fileencoding=utf-8 :
import yaml
import sys
import argcomplete
import argparse
from pathlib import Path
from .utils import terminal_width, print_tag_args, print_chat_hist, display_source_code, ChatType
from .storage import save_answers, create_chat_hist, read_file, dump_data
from .api_client import ai, openai_api_key, print_models
from .configuration import Config
from .chat import ChatDB
from .message import Message, MessageFilter
from itertools import zip_longest
from typing import Any
from .configuration import Config, default_config_path
from .message import Message
from .commands.question import question_cmd
from .commands.tags import tags_cmd
from .commands.config import config_cmd
from .commands.hist import hist_cmd
from .commands.print import print_cmd
default_config = '.config.yaml'
def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
@@ -21,10 +24,128 @@ def tags_completer(prefix: str, parsed_args: Any, **kwargs: Any) -> list[str]:
return list(Message.tags_from_dir(Path(config.db), prefix=prefix))
def create_question_with_hist(args: argparse.Namespace,
config: Config,
) -> tuple[ChatType, str, list[str]]:
"""
Creates the "AI request", including the question and chat history as determined
by the specified tags.
"""
tags = args.tags or []
etags = args.etags or []
otags = args.output_tags or []
if not args.source_code_only:
print_tag_args(tags, etags, otags)
question_parts = []
question_list = args.question if args.question is not None else []
source_list = args.source if args.source is not None else []
for question, source in zip_longest(question_list, source_list, fillvalue=None):
if question is not None and source is not None:
with open(source) as r:
question_parts.append(f"{question}\n\n```\n{r.read().strip()}\n```")
elif question is not None:
question_parts.append(question)
elif source is not None:
with open(source) as r:
question_parts.append(f"```\n{r.read().strip()}\n```")
full_question = '\n\n'.join(question_parts)
chat = create_chat_hist(full_question, tags, etags, config,
match_all_tags=True if args.atags else False, # FIXME
with_tags=False,
with_file=False)
return chat, full_question, tags
def tags_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'tags' command.
"""
chat = ChatDB.from_dir(cache_path=Path('.'),
db_path=Path(config.db))
if args.list:
tags_freq = chat.tags_frequency(args.prefix, args.contain)
for tag, freq in tags_freq.items():
print(f"- {tag}: {freq}")
# TODO: add renaming
def config_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'config' command.
"""
if args.list_models:
print_models()
elif args.print_model:
print(config.openai.model)
elif args.model:
config.openai.model = args.model
config.to_file(args.config)
def ask_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'ask' command.
"""
if args.max_tokens:
config.openai.max_tokens = args.max_tokens
if args.temperature:
config.openai.temperature = args.temperature
if args.model:
config.openai.model = args.model
chat, question, tags = create_question_with_hist(args, config)
print_chat_hist(chat, False, args.source_code_only)
otags = args.output_tags or []
answers, usage = ai(chat, config, args.number)
save_answers(question, answers, tags, otags, config)
print("-" * terminal_width())
print(f"Usage: {usage}")
def hist_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'hist' command.
"""
mfilter = MessageFilter(tags_or=args.tags,
tags_and=args.atags,
tags_not=args.etags,
question_contains=args.question,
answer_contains=args.answer)
chat = ChatDB.from_dir(Path('.'),
Path(config.db),
mfilter=mfilter)
chat.print(args.source_code_only,
args.with_tags,
args.with_files)
def print_cmd(args: argparse.Namespace, config: Config) -> None:
"""
Handler for the 'print' command.
"""
fname = Path(args.file)
if fname.suffix == '.yaml':
with open(args.file, 'r') as f:
data = yaml.load(f, Loader=yaml.FullLoader)
elif fname.suffix == '.txt':
data = read_file(fname)
else:
print(f"Unknown file type: {args.file}")
sys.exit(1)
if args.source_code_only:
display_source_code(data['answer'])
else:
print(dump_data(data).strip())
def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="ChatMastermind is a Python application that automates conversation with AI")
parser.add_argument('-C', '--config', help='Config file name.', default=default_config_path)
parser.add_argument('-c', '--config', help='Config file name.', default=default_config)
# subcommand-parser
cmdparser = parser.add_subparsers(dest='command',
@@ -34,41 +155,34 @@ def create_parser() -> argparse.ArgumentParser:
# a parent parser for all commands that support tag selection
tag_parser = argparse.ArgumentParser(add_help=False)
tag_arg = tag_parser.add_argument('-t', '--or-tags', nargs='+',
help='List of tags (one must match)', metavar='OTAGS')
tag_arg = tag_parser.add_argument('-t', '--tags', nargs='+',
help='List of tag names (one must match)', metavar='TAGS')
tag_arg.completer = tags_completer # type: ignore
atag_arg = tag_parser.add_argument('-k', '--and-tags', nargs='+',
help='List of tags (all must match)', metavar='ATAGS')
atag_arg = tag_parser.add_argument('-a', '--atags', nargs='+',
help='List of tag names (all must match)', metavar='TAGS')
atag_arg.completer = tags_completer # type: ignore
etag_arg = tag_parser.add_argument('-x', '--exclude-tags', nargs='+',
help='List of tags to exclude', metavar='XTAGS')
etag_arg = tag_parser.add_argument('-e', '--etags', nargs='+',
help='List of tag names to exclude', metavar='ETAGS')
etag_arg.completer = tags_completer # type: ignore
otag_arg = tag_parser.add_argument('-o', '--output-tags', nargs='+',
help='List of output tags (default: use input tags)', metavar='OUTTAGS')
help='List of output tag names, default is input', metavar='OTAGS')
otag_arg.completer = tags_completer # type: ignore
# a parent parser for all commands that support AI configuration
ai_parser = argparse.ArgumentParser(add_help=False)
ai_parser.add_argument('-A', '--AI', help='AI ID to use')
ai_parser.add_argument('-M', '--model', help='Model to use')
ai_parser.add_argument('-n', '--num-answers', help='Number of answers to request', type=int, default=1)
ai_parser.add_argument('-m', '--max-tokens', help='Max. nr. of tokens', type=int)
ai_parser.add_argument('-T', '--temperature', help='Temperature value', type=float)
# 'question' command parser
question_cmd_parser = cmdparser.add_parser('question', parents=[tag_parser, ai_parser],
help="ask, create and process questions.",
aliases=['q'])
question_cmd_parser.set_defaults(func=question_cmd)
question_group = question_cmd_parser.add_mutually_exclusive_group(required=True)
question_group.add_argument('-a', '--ask', nargs='+', help='Ask a question')
question_group.add_argument('-c', '--create', nargs='+', help='Create a question')
question_group.add_argument('-r', '--repeat', nargs='*', help='Repeat a question')
question_group.add_argument('-p', '--process', nargs='*', help='Process existing questions')
question_cmd_parser.add_argument('-O', '--overwrite', help='Overwrite existing messages when repeating them',
action='store_true')
question_cmd_parser.add_argument('-s', '--source-text', nargs='+', help='Add content of a file to the query')
question_cmd_parser.add_argument('-S', '--source-code', nargs='+', help='Add source code file content to the chat history')
# 'ask' command parser
ask_cmd_parser = cmdparser.add_parser('ask', parents=[tag_parser],
help="Ask a question.",
aliases=['a'])
ask_cmd_parser.set_defaults(func=ask_cmd)
ask_cmd_parser.add_argument('-q', '--question', nargs='+', help='Question to ask',
required=True)
ask_cmd_parser.add_argument('-m', '--max-tokens', help='Max tokens to use', type=int)
ask_cmd_parser.add_argument('-T', '--temperature', help='Temperature to use', type=float)
ask_cmd_parser.add_argument('-M', '--model', help='Model to use')
ask_cmd_parser.add_argument('-n', '--number', help='Number of answers to produce', type=int,
default=1)
ask_cmd_parser.add_argument('-s', '--source', nargs='+', help='Source add content of a file to the query')
ask_cmd_parser.add_argument('-S', '--source-code-only', help='Add pure source code to the chat history',
action='store_true')
# 'hist' command parser
hist_cmd_parser = cmdparser.add_parser('hist', parents=[tag_parser],
@@ -105,18 +219,16 @@ def create_parser() -> argparse.ArgumentParser:
action='store_true')
config_group.add_argument('-m', '--print-model', help="Print the currently configured model",
action='store_true')
config_group.add_argument('-c', '--create', help="Create config with default settings in the given file")
config_group.add_argument('-M', '--model', help="Set model in the config file")
# 'print' command parser
print_cmd_parser = cmdparser.add_parser('print',
help="Print message files.",
help="Print files.",
aliases=['p'])
print_cmd_parser.set_defaults(func=print_cmd)
print_cmd_parser.add_argument('-f', '--file', help='File to print', required=True)
print_cmd_modes = print_cmd_parser.add_mutually_exclusive_group()
print_cmd_modes.add_argument('-q', '--question', help='Print only question', action='store_true')
print_cmd_modes.add_argument('-a', '--answer', help='Print only answer', action='store_true')
print_cmd_modes.add_argument('-S', '--only-source-code', help='Print only source code', action='store_true')
print_cmd_parser.add_argument('-S', '--source-code-only', help='Print only source code',
action='store_true')
argcomplete.autocomplete(parser)
return parser
@@ -126,12 +238,11 @@ def main() -> int:
parser = create_parser()
args = parser.parse_args()
command = parser.parse_args()
config = Config.from_file(args.config)
if command.func == config_cmd:
command.func(command)
else:
config = Config.from_file(args.config)
command.func(command, config)
openai_api_key(config.openai.api_key)
command.func(command, config)
return 0
+13 -19
View File
@@ -3,8 +3,6 @@ Module implementing message related functions and classes.
"""
import pathlib
import yaml
import tempfile
import shutil
from typing import Type, TypeVar, ClassVar, Optional, Any, Union, Final, Literal, Iterable
from dataclasses import dataclass, asdict, field
from .tags import Tag, TagLine, TagError, match_tags, rename_tags
@@ -314,7 +312,7 @@ class Message():
mfilter.tags_not if mfilter else None)
else:
message = cls.__from_file_yaml(file_path)
if message and (mfilter is None or message.match(mfilter)):
if message and (not mfilter or (mfilter and message.match(mfilter))):
return message
else:
return None
@@ -400,10 +398,10 @@ class Message():
"""
output: list[str] = []
if source_code_only:
# use the source code from answer only
output.extend(self.question.source_code(include_delims=True))
if self.answer:
output.extend(self.answer.source_code(include_delims=True))
return '\n'.join(output) if len(output) > 0 else ''
return '\n'.join(output)
if with_tags:
output.append(self.tags_str())
if with_file:
@@ -416,7 +414,7 @@ class Message():
return '\n'.join(output)
def __str__(self) -> str:
return self.to_str(True, True, False)
return self.to_str(False, False, False)
def to_file(self, file_path: Optional[pathlib.Path]=None) -> None: # noqa: 11
"""
@@ -447,18 +445,16 @@ class Message():
* Answer.txt_header
* Answer
"""
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
temp_file_path = pathlib.Path(temp_fd.name)
with open(file_path, "w") as fd:
if self.tags:
temp_fd.write(f'{TagLine.from_set(self.tags)}\n')
fd.write(f'{TagLine.from_set(self.tags)}\n')
if self.ai:
temp_fd.write(f'{AILine.from_ai(self.ai)}\n')
fd.write(f'{AILine.from_ai(self.ai)}\n')
if self.model:
temp_fd.write(f'{ModelLine.from_model(self.model)}\n')
temp_fd.write(f'{Question.txt_header}\n{self.question}\n')
fd.write(f'{ModelLine.from_model(self.model)}\n')
fd.write(f'{Question.txt_header}\n{self.question}\n')
if self.answer:
temp_fd.write(f'{Answer.txt_header}\n{self.answer}\n')
shutil.move(temp_file_path, file_path)
fd.write(f'{Answer.txt_header}\n{self.answer}\n')
def __to_file_yaml(self, file_path: pathlib.Path) -> None:
"""
@@ -470,8 +466,7 @@ class Message():
* Message.ai_yaml_key: str [Optional]
* Message.model_yaml_key: str [Optional]
"""
with tempfile.NamedTemporaryFile(dir=file_path.parent, prefix=file_path.name, mode="w", delete=False) as temp_fd:
temp_file_path = pathlib.Path(temp_fd.name)
with open(file_path, "w") as fd:
data: YamlDict = {Question.yaml_key: str(self.question)}
if self.answer:
data[Answer.yaml_key] = str(self.answer)
@@ -481,8 +476,7 @@ class Message():
data[self.model_yaml_key] = self.model
if self.tags:
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, temp_fd, sort_keys=False)
shutil.move(temp_file_path, file_path)
yaml.dump(data, fd, sort_keys=False)
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
@@ -514,7 +508,7 @@ class Message():
Return True if all attributes match, else False.
"""
mytags = self.tags or set()
if (((mfilter.tags_or is not None or mfilter.tags_and is not None or mfilter.tags_not is not None)
if (((mfilter.tags_or or mfilter.tags_and or mfilter.tags_not)
and not match_tags(mytags, mfilter.tags_or, mfilter.tags_and, mfilter.tags_not)) # noqa: W503
or (mfilter.ai and (not self.ai or mfilter.ai != self.ai)) # noqa: W503
or (mfilter.model and (not self.model or mfilter.model != self.model)) # noqa: W503
+121
View File
@@ -0,0 +1,121 @@
import yaml
import io
import pathlib
from .utils import terminal_width, append_message, message_to_chat, ChatType
from .configuration import Config
from typing import Any, Optional
def read_file(fname: pathlib.Path, tags_only: bool = False) -> dict[str, Any]:
with open(fname, "r") as fd:
tagline = fd.readline().strip().split(':', maxsplit=1)[1].strip()
# also support tags separated by ',' (old format)
separator = ',' if ',' in tagline else ' '
tags = [t.strip() for t in tagline.split(separator)]
if tags_only:
return {"tags": tags}
text = fd.read().strip().split('\n')
question_idx = text.index("=== QUESTION ===") + 1
answer_idx = text.index("==== ANSWER ====")
question = "\n".join(text[question_idx:answer_idx]).strip()
answer = "\n".join(text[answer_idx + 1:]).strip()
return {"question": question, "answer": answer, "tags": tags,
"file": fname.name}
def dump_data(data: dict[str, Any]) -> str:
with io.StringIO() as fd:
fd.write(f'TAGS: {" ".join(data["tags"])}\n')
fd.write(f'=== QUESTION ===\n{data["question"]}\n')
fd.write(f'==== ANSWER ====\n{data["answer"]}\n')
return fd.getvalue()
def write_file(fname: str, data: dict[str, Any]) -> None:
with open(fname, "w") as fd:
fd.write(f'TAGS: {" ".join(data["tags"])}\n')
fd.write(f'=== QUESTION ===\n{data["question"]}\n')
fd.write(f'==== ANSWER ====\n{data["answer"]}\n')
def save_answers(question: str,
answers: list[str],
tags: list[str],
otags: Optional[list[str]],
config: Config
) -> None:
wtags = otags or tags
num, inum = 0, 0
next_fname = pathlib.Path(str(config.db)) / '.next'
try:
with open(next_fname, 'r') as f:
num = int(f.read())
except Exception:
pass
for answer in answers:
num += 1
inum += 1
title = f'-- ANSWER {inum} '
title_end = '-' * (terminal_width() - len(title))
print(f'{title}{title_end}')
print(answer)
write_file(f"{num:04d}.txt", {"question": question, "answer": answer, "tags": wtags})
with open(next_fname, 'w') as f:
f.write(f'{num}')
def create_chat_hist(question: Optional[str],
tags: Optional[list[str]],
extags: Optional[list[str]],
config: Config,
match_all_tags: bool = False,
with_tags: bool = False,
with_file: bool = False
) -> ChatType:
chat: ChatType = []
append_message(chat, 'system', str(config.system).strip())
for file in sorted(pathlib.Path(str(config.db)).iterdir()):
if file.suffix == '.yaml':
with open(file, 'r') as f:
data = yaml.load(f, Loader=yaml.FullLoader)
data['file'] = file.name
elif file.suffix == '.txt':
data = read_file(file)
else:
continue
data_tags = set(data.get('tags', []))
tags_match: bool
if match_all_tags:
tags_match = not tags or set(tags).issubset(data_tags)
else:
tags_match = not tags or bool(data_tags.intersection(tags))
extags_do_not_match = \
not extags or not data_tags.intersection(extags)
if tags_match and extags_do_not_match:
message_to_chat(data, chat, with_tags, with_file)
if question:
append_message(chat, 'user', question)
return chat
def get_tags(config: Config, prefix: Optional[str]) -> list[str]:
result = []
for file in sorted(pathlib.Path(str(config.db)).iterdir()):
if file.suffix == '.yaml':
with open(file, 'r') as f:
data = yaml.load(f, Loader=yaml.FullLoader)
elif file.suffix == '.txt':
data = read_file(file, tags_only=True)
else:
continue
for tag in data.get('tags', []):
if prefix and len(prefix) > 0:
if tag.startswith(prefix):
result.append(tag)
else:
result.append(tag)
return result
def get_tags_unique(config: Config, prefix: Optional[str]) -> list[str]:
return list(set(get_tags(config, prefix)))
+80
View File
@@ -0,0 +1,80 @@
import shutil
from pprint import PrettyPrinter
from typing import Any
ChatType = list[dict[str, str]]
def terminal_width() -> int:
return shutil.get_terminal_size().columns
def pp(*args: Any, **kwargs: Any) -> None:
return PrettyPrinter(width=terminal_width()).pprint(*args, **kwargs)
def print_tag_args(tags: list[str], extags: list[str], otags: list[str]) -> None:
"""
Prints the tags specified in the given args.
"""
printed_messages = []
if tags:
printed_messages.append(f"Tags: {' '.join(tags)}")
if extags:
printed_messages.append(f"Excluding tags: {' '.join(extags)}")
if otags:
printed_messages.append(f"Output tags: {' '.join(otags)}")
if printed_messages:
print("\n".join(printed_messages))
print()
def append_message(chat: ChatType,
role: str,
content: str
) -> None:
chat.append({'role': role, 'content': content.replace("''", "'")})
def message_to_chat(message: dict[str, str],
chat: ChatType,
with_tags: bool = False,
with_file: bool = False
) -> None:
append_message(chat, 'user', message['question'])
append_message(chat, 'assistant', message['answer'])
if with_tags:
tags = " ".join(message['tags'])
append_message(chat, 'tags', tags)
if with_file:
append_message(chat, 'file', message['file'])
def display_source_code(content: str) -> None:
try:
content_start = content.index('```')
content_end = content.rindex('```')
if content_start + 3 < content_end:
print(content[content_start + 3:content_end].strip())
except ValueError:
pass
def print_chat_hist(chat: ChatType, dump: bool = False, source_code: bool = False) -> None:
if dump:
pp(chat)
return
for message in chat:
text_too_long = len(message['content']) > terminal_width() - len(message['role']) - 2
if source_code:
display_source_code(message['content'])
continue
if message['role'] == 'user':
print('-' * terminal_width())
if text_too_long:
print(f"{message['role'].upper()}:")
print(message['content'])
else:
print(f"{message['role'].upper()}: {message['content']}")
+2 -2
View File
@@ -12,7 +12,7 @@ setup(
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/ok2/ChatMastermind",
packages=find_packages() + ["chatmastermind.ais", "chatmastermind.commands"],
packages=find_packages(),
classifiers=[
"Development Status :: 3 - Alpha",
"Environment :: Console",
@@ -32,7 +32,7 @@ setup(
"openai",
"PyYAML",
"argcomplete",
"pytest",
"pytest"
],
python_requires=">=3.9",
test_suite="tests",
-48
View File
@@ -1,48 +0,0 @@
import argparse
import unittest
from unittest.mock import MagicMock
from chatmastermind.ai_factory import create_ai
from chatmastermind.configuration import Config
from chatmastermind.ai import AIError
from chatmastermind.ais.openai import OpenAI
class TestCreateAI(unittest.TestCase):
def setUp(self) -> None:
self.args = MagicMock(spec=argparse.Namespace)
self.args.AI = 'myopenai'
self.args.model = None
self.args.max_tokens = None
self.args.temperature = None
def test_create_ai_from_args(self) -> None:
# Create an AI with the default configuration
config = Config()
self.args.AI = 'myopenai'
ai = create_ai(self.args, config)
self.assertIsInstance(ai, OpenAI)
def test_create_ai_from_default(self) -> None:
self.args.AI = None
# Create an AI with the default configuration
config = Config()
ai = create_ai(self.args, config)
self.assertIsInstance(ai, OpenAI)
def test_create_empty_ai_error(self) -> None:
self.args.AI = None
# Create Config with empty AIs
config = Config()
config.ais = {}
# Call create_ai function and assert that it raises AIError
with self.assertRaises(AIError):
create_ai(self.args, config)
def test_create_unsupported_ai_error(self) -> None:
# Mock argparse.Namespace with ai='invalid_ai'
self.args.AI = 'invalid_ai'
# Create default Config
config = Config()
# Call create_ai function and assert that it raises AIError
with self.assertRaises(AIError):
create_ai(self.args, config)
+5 -73
View File
@@ -1,4 +1,3 @@
import unittest
import pathlib
import tempfile
import time
@@ -7,9 +6,10 @@ from unittest.mock import patch
from chatmastermind.tags import TagLine
from chatmastermind.message import Message, Question, Answer, Tag, MessageFilter
from chatmastermind.chat import Chat, ChatDB, terminal_width, ChatError
from .test_main import CmmTestCase
class TestChat(unittest.TestCase):
class TestChat(CmmTestCase):
def setUp(self) -> None:
self.chat = Chat([])
self.message1 = Message(Question('Question 1'),
@@ -62,28 +62,6 @@ class TestChat(unittest.TestCase):
tags_freq = self.chat.tags_frequency()
self.assertDictEqual(tags_freq, {'atag1': 1, 'btag2': 2})
def test_find_remove_messages(self) -> None:
self.chat.add_messages([self.message1, self.message2])
msgs = self.chat.find_messages(['0001.txt'])
self.assertListEqual(msgs, [self.message1])
msgs = self.chat.find_messages(['0001.txt', '0002.txt'])
self.assertListEqual(msgs, [self.message1, self.message2])
# add new Message with full path
message3 = Message(Question('Question 2'),
Answer('Answer 2'),
{Tag('btag2')},
file_path=pathlib.Path('/foo/bla/0003.txt'))
self.chat.add_messages([message3])
# find new Message by full path
msgs = self.chat.find_messages(['/foo/bla/0003.txt'])
self.assertListEqual(msgs, [message3])
# find Message with full path only by filename
msgs = self.chat.find_messages(['0003.txt'])
self.assertListEqual(msgs, [message3])
# remove last message
self.chat.remove_messages(['0003.txt'])
self.assertListEqual(self.chat.messages, [self.message1, self.message2])
@patch('sys.stdout', new_callable=StringIO)
def test_print(self, mock_stdout: StringIO) -> None:
self.chat.add_messages([self.message1, self.message2])
@@ -131,7 +109,7 @@ Answer 2
self.assertEqual(mock_stdout.getvalue(), expected_output)
class TestChatDB(unittest.TestCase):
class TestChatDB(CmmTestCase):
def setUp(self) -> None:
self.db_path = tempfile.TemporaryDirectory()
self.cache_path = tempfile.TemporaryDirectory()
@@ -202,25 +180,7 @@ class TestChatDB(unittest.TestCase):
self.assertEqual(chat_db.messages[1].file_path,
pathlib.Path(self.db_path.name, '0003.txt'))
def test_chat_db_from_dir_filter_tags(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
mfilter=MessageFilter(tags_or={Tag('tag1')}))
self.assertEqual(len(chat_db.messages), 1)
self.assertEqual(chat_db.cache_path, pathlib.Path(self.cache_path.name))
self.assertEqual(chat_db.db_path, pathlib.Path(self.db_path.name))
self.assertEqual(chat_db.messages[0].file_path,
pathlib.Path(self.db_path.name, '0001.txt'))
def test_chat_db_from_dir_filter_tags_empty(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
mfilter=MessageFilter(tags_or=set(),
tags_and=set(),
tags_not=set()))
self.assertEqual(len(chat_db.messages), 0)
def test_chat_db_from_dir_filter_answer(self) -> None:
def test_chat_db_filter(self) -> None:
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
mfilter=MessageFilter(answer_contains='Answer 2'))
@@ -231,7 +191,7 @@ class TestChatDB(unittest.TestCase):
pathlib.Path(self.db_path.name, '0002.yaml'))
self.assertEqual(chat_db.messages[0].answer, 'Answer 2')
def test_chat_db_from_messages(self) -> None:
def test_chat_db_from_messges(self) -> None:
chat_db = ChatDB.from_messages(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name),
messages=[self.message1, self.message2,
@@ -458,31 +418,3 @@ class TestChatDB(unittest.TestCase):
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 1)
self.assertIn(pathlib.Path(self.cache_path.name, '123456.txt'), cache_dir_files)
def test_chat_db_update_messages(self) -> None:
# create a new ChatDB instance
chat_db = ChatDB.from_dir(pathlib.Path(self.cache_path.name),
pathlib.Path(self.db_path.name))
db_dir_files = self.message_list(self.db_path)
self.assertEqual(len(db_dir_files), 4)
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 0)
message = chat_db.messages[0]
message.answer = Answer("New answer")
# update message without writing
chat_db.update_messages([message], write=False)
self.assertEqual(chat_db.messages[0].answer, Answer("New answer"))
# re-read the message and check for old content
chat_db.read_db()
self.assertEqual(chat_db.messages[0].answer, Answer("Answer 1"))
# now check with writing (message should be overwritten)
chat_db.update_messages([message], write=True)
chat_db.read_db()
self.assertEqual(chat_db.messages[0].answer, Answer("New answer"))
# test without file_path -> expect error
message1 = Message(question=Question("Question 1"),
answer=Answer("Answer 1"))
with self.assertRaises(ChatError):
chat_db.update_messages([message1])
-160
View File
@@ -1,160 +0,0 @@
import os
import unittest
import yaml
from tempfile import NamedTemporaryFile
from pathlib import Path
from typing import cast
from chatmastermind.configuration import AIConfig, OpenAIConfig, ConfigError, ai_config_instance, Config
class TestAIConfigInstance(unittest.TestCase):
def test_ai_config_instance_with_valid_name_should_return_instance_with_default_values(self) -> None:
ai_config = cast(OpenAIConfig, ai_config_instance('openai'))
ai_reference = OpenAIConfig()
self.assertEqual(ai_config.ID, ai_reference.ID)
self.assertEqual(ai_config.name, ai_reference.name)
self.assertEqual(ai_config.api_key, ai_reference.api_key)
self.assertEqual(ai_config.system, ai_reference.system)
self.assertEqual(ai_config.model, ai_reference.model)
self.assertEqual(ai_config.temperature, ai_reference.temperature)
self.assertEqual(ai_config.max_tokens, ai_reference.max_tokens)
self.assertEqual(ai_config.top_p, ai_reference.top_p)
self.assertEqual(ai_config.frequency_penalty, ai_reference.frequency_penalty)
self.assertEqual(ai_config.presence_penalty, ai_reference.presence_penalty)
def test_ai_config_instance_with_valid_name_and_configuration_should_return_instance_with_custom_values(self) -> None:
conf_dict = {
'system': 'Custom system',
'api_key': '9876543210',
'model': 'custom_model',
'max_tokens': 5000,
'temperature': 0.5,
'top_p': 0.8,
'frequency_penalty': 0.7,
'presence_penalty': 0.2
}
ai_config = cast(OpenAIConfig, ai_config_instance('openai', conf_dict))
self.assertEqual(ai_config.system, 'Custom system')
self.assertEqual(ai_config.api_key, '9876543210')
self.assertEqual(ai_config.model, 'custom_model')
self.assertEqual(ai_config.max_tokens, 5000)
self.assertAlmostEqual(ai_config.temperature, 0.5)
self.assertAlmostEqual(ai_config.top_p, 0.8)
self.assertAlmostEqual(ai_config.frequency_penalty, 0.7)
self.assertAlmostEqual(ai_config.presence_penalty, 0.2)
def test_ai_config_instance_with_invalid_name_should_raise_config_error(self) -> None:
with self.assertRaises(ConfigError):
ai_config_instance('invalid_name')
class TestConfig(unittest.TestCase):
def setUp(self) -> None:
self.test_file = NamedTemporaryFile(delete=False)
def tearDown(self) -> None:
os.remove(self.test_file.name)
def test_from_dict_should_create_config_from_dict(self) -> None:
source_dict = {
'db': './test_db/',
'ais': {
'myopenai': {
'name': 'openai',
'system': 'Custom system',
'api_key': '9876543210',
'model': 'custom_model',
'max_tokens': 5000,
'temperature': 0.5,
'top_p': 0.8,
'frequency_penalty': 0.7,
'presence_penalty': 0.2
}
}
}
config = Config.from_dict(source_dict)
self.assertEqual(config.db, './test_db/')
self.assertEqual(len(config.ais), 1)
self.assertEqual(config.ais['myopenai'].name, 'openai')
self.assertEqual(cast(OpenAIConfig, config.ais['myopenai']).system, 'Custom system')
# check that 'ID' has been added
self.assertEqual(config.ais['myopenai'].ID, 'myopenai')
def test_create_default_should_create_default_config(self) -> None:
Config.create_default(Path(self.test_file.name))
with open(self.test_file.name, 'r') as f:
default_config = yaml.load(f, Loader=yaml.FullLoader)
config_reference = Config()
self.assertEqual(default_config['db'], config_reference.db)
def test_from_file_should_load_config_from_file(self) -> None:
source_dict = {
'db': './test_db/',
'ais': {
'default': {
'name': 'openai',
'system': 'Custom system',
'api_key': '9876543210',
'model': 'custom_model',
'max_tokens': 5000,
'temperature': 0.5,
'top_p': 0.8,
'frequency_penalty': 0.7,
'presence_penalty': 0.2
}
}
}
with open(self.test_file.name, 'w') as f:
yaml.dump(source_dict, f)
config = Config.from_file(self.test_file.name)
self.assertIsInstance(config, Config)
self.assertEqual(config.db, './test_db/')
self.assertEqual(len(config.ais), 1)
self.assertIsInstance(config.ais['default'], AIConfig)
self.assertEqual(cast(OpenAIConfig, config.ais['default']).system, 'Custom system')
def test_to_file_should_save_config_to_file(self) -> None:
config = Config(
db='./test_db/',
ais={
'myopenai': OpenAIConfig(
ID='myopenai',
system='Custom system',
api_key='9876543210',
model='custom_model',
max_tokens=5000,
temperature=0.5,
top_p=0.8,
frequency_penalty=0.7,
presence_penalty=0.2
)
}
)
config.to_file(Path(self.test_file.name))
with open(self.test_file.name, 'r') as f:
saved_config = yaml.load(f, Loader=yaml.FullLoader)
self.assertEqual(saved_config['db'], './test_db/')
self.assertEqual(len(saved_config['ais']), 1)
self.assertEqual(saved_config['ais']['myopenai']['system'], 'Custom system')
def test_from_file_error_unknown_ai(self) -> None:
source_dict = {
'db': './test_db/',
'ais': {
'default': {
'name': 'foobla',
'system': 'Custom system',
'api_key': '9876543210',
'model': 'custom_model',
'max_tokens': 5000,
'temperature': 0.5,
'top_p': 0.8,
'frequency_penalty': 0.7,
'presence_penalty': 0.2
}
}
}
with open(self.test_file.name, 'w') as f:
yaml.dump(source_dict, f)
with self.assertRaises(ConfigError):
Config.from_file(self.test_file.name)
+236
View File
@@ -0,0 +1,236 @@
import unittest
import io
import pathlib
import argparse
from chatmastermind.utils import terminal_width
from chatmastermind.main import create_parser, ask_cmd
from chatmastermind.api_client import ai
from chatmastermind.configuration import Config
from chatmastermind.storage import create_chat_hist, save_answers, dump_data
from unittest import mock
from unittest.mock import patch, MagicMock, Mock, ANY
class CmmTestCase(unittest.TestCase):
"""
Base class for all cmm testcases.
"""
def dummy_config(self, db: str) -> Config:
"""
Creates a dummy configuration.
"""
return Config.from_dict(
{'system': 'dummy_system',
'db': db,
'openai': {'api_key': 'dummy_key',
'model': 'dummy_model',
'max_tokens': 4000,
'temperature': 1.0,
'top_p': 1,
'frequency_penalty': 0,
'presence_penalty': 0}}
)
class TestCreateChat(CmmTestCase):
def setUp(self) -> None:
self.config = self.dummy_config(db='test_files')
self.question = "test question"
self.tags = ['test_tag']
@patch('os.listdir')
@patch('pathlib.Path.iterdir')
@patch('builtins.open')
def test_create_chat_with_tags(self, open_mock: MagicMock, iterdir_mock: MagicMock, listdir_mock: MagicMock) -> None:
listdir_mock.return_value = ['testfile.txt']
iterdir_mock.return_value = [pathlib.Path(x) for x in listdir_mock.return_value]
open_mock.return_value.__enter__.return_value = io.StringIO(dump_data(
{'question': 'test_content', 'answer': 'some answer',
'tags': ['test_tag']}))
test_chat = create_chat_hist(self.question, self.tags, None, self.config)
self.assertEqual(len(test_chat), 4)
self.assertEqual(test_chat[0],
{'role': 'system', 'content': self.config.system})
self.assertEqual(test_chat[1],
{'role': 'user', 'content': 'test_content'})
self.assertEqual(test_chat[2],
{'role': 'assistant', 'content': 'some answer'})
self.assertEqual(test_chat[3],
{'role': 'user', 'content': self.question})
@patch('os.listdir')
@patch('pathlib.Path.iterdir')
@patch('builtins.open')
def test_create_chat_with_other_tags(self, open_mock: MagicMock, iterdir_mock: MagicMock, listdir_mock: MagicMock) -> None:
listdir_mock.return_value = ['testfile.txt']
iterdir_mock.return_value = [pathlib.Path(x) for x in listdir_mock.return_value]
open_mock.return_value.__enter__.return_value = io.StringIO(dump_data(
{'question': 'test_content', 'answer': 'some answer',
'tags': ['other_tag']}))
test_chat = create_chat_hist(self.question, self.tags, None, self.config)
self.assertEqual(len(test_chat), 2)
self.assertEqual(test_chat[0],
{'role': 'system', 'content': self.config.system})
self.assertEqual(test_chat[1],
{'role': 'user', 'content': self.question})
@patch('os.listdir')
@patch('pathlib.Path.iterdir')
@patch('builtins.open')
def test_create_chat_without_tags(self, open_mock: MagicMock, iterdir_mock: MagicMock, listdir_mock: MagicMock) -> None:
listdir_mock.return_value = ['testfile.txt', 'testfile2.txt']
iterdir_mock.return_value = [pathlib.Path(x) for x in listdir_mock.return_value]
open_mock.side_effect = (
io.StringIO(dump_data({'question': 'test_content',
'answer': 'some answer',
'tags': ['test_tag']})),
io.StringIO(dump_data({'question': 'test_content2',
'answer': 'some answer2',
'tags': ['test_tag2']})),
)
test_chat = create_chat_hist(self.question, [], None, self.config)
self.assertEqual(len(test_chat), 6)
self.assertEqual(test_chat[0],
{'role': 'system', 'content': self.config.system})
self.assertEqual(test_chat[1],
{'role': 'user', 'content': 'test_content'})
self.assertEqual(test_chat[2],
{'role': 'assistant', 'content': 'some answer'})
self.assertEqual(test_chat[3],
{'role': 'user', 'content': 'test_content2'})
self.assertEqual(test_chat[4],
{'role': 'assistant', 'content': 'some answer2'})
class TestHandleQuestion(CmmTestCase):
def setUp(self) -> None:
self.question = "test question"
self.args = argparse.Namespace(
tags=['tag1'],
atags=None,
etags=['etag1'],
output_tags=None,
question=[self.question],
source=None,
source_code_only=False,
number=3,
max_tokens=None,
temperature=None,
model=None,
match_all_tags=False,
with_tags=False,
with_file=False,
)
self.config = self.dummy_config(db='test_files')
@patch("chatmastermind.main.create_chat_hist", return_value="test_chat")
@patch("chatmastermind.main.print_tag_args")
@patch("chatmastermind.main.print_chat_hist")
@patch("chatmastermind.main.ai", return_value=(["answer1", "answer2", "answer3"], "test_usage"))
@patch("chatmastermind.utils.pp")
@patch("builtins.print")
def test_ask_cmd(self, mock_print: MagicMock, mock_pp: MagicMock, mock_ai: MagicMock,
mock_print_chat_hist: MagicMock, mock_print_tag_args: MagicMock,
mock_create_chat_hist: MagicMock) -> None:
open_mock = MagicMock()
with patch("chatmastermind.storage.open", open_mock):
ask_cmd(self.args, self.config)
mock_print_tag_args.assert_called_once_with(self.args.tags,
self.args.etags,
[])
mock_create_chat_hist.assert_called_once_with(self.question,
self.args.tags,
self.args.etags,
self.config,
match_all_tags=False,
with_tags=False,
with_file=False)
mock_print_chat_hist.assert_called_once_with('test_chat',
False,
self.args.source_code_only)
mock_ai.assert_called_with("test_chat",
self.config,
self.args.number)
expected_calls = []
for num, answer in enumerate(mock_ai.return_value[0], start=1):
title = f'-- ANSWER {num} '
title_end = '-' * (terminal_width() - len(title))
expected_calls.append(((f'{title}{title_end}',),))
expected_calls.append(((answer,),))
expected_calls.append((("-" * terminal_width(),),))
expected_calls.append(((f"Usage: {mock_ai.return_value[1]}",),))
self.assertEqual(mock_print.call_args_list, expected_calls)
open_expected_calls = list([mock.call(f"{num:04d}.txt", "w") for num in range(2, 5)])
open_mock.assert_has_calls(open_expected_calls, any_order=True)
class TestSaveAnswers(CmmTestCase):
@mock.patch('builtins.open')
@mock.patch('chatmastermind.storage.print')
def test_save_answers(self, print_mock: MagicMock, open_mock: MagicMock) -> None:
question = "Test question?"
answers = ["Answer 1", "Answer 2"]
tags = ["tag1", "tag2"]
otags = ["otag1", "otag2"]
config = self.dummy_config(db='test_db')
with mock.patch('chatmastermind.storage.pathlib.Path.exists', return_value=True), \
mock.patch('chatmastermind.storage.yaml.dump'), \
mock.patch('io.StringIO') as stringio_mock:
stringio_instance = stringio_mock.return_value
stringio_instance.getvalue.side_effect = ["question", "answer1", "answer2"]
save_answers(question, answers, tags, otags, config)
open_calls = [
mock.call(pathlib.Path('test_db/.next'), 'r'),
mock.call(pathlib.Path('test_db/.next'), 'w'),
]
open_mock.assert_has_calls(open_calls, any_order=True)
class TestAI(CmmTestCase):
@patch("openai.ChatCompletion.create")
def test_ai(self, mock_create: MagicMock) -> None:
mock_create.return_value = {
'choices': [
{'message': {'content': 'response_text_1'}},
{'message': {'content': 'response_text_2'}}
],
'usage': {'tokens': 10}
}
chat = [{"role": "system", "content": "hello ai"}]
config = self.dummy_config(db='dummy')
config.openai.model = "text-davinci-002"
config.openai.max_tokens = 150
config.openai.temperature = 0.5
result = ai(chat, config, 2)
expected_result = (['response_text_1', 'response_text_2'],
{'tokens': 10})
self.assertEqual(result, expected_result)
class TestCreateParser(CmmTestCase):
def test_create_parser(self) -> None:
with patch('argparse.ArgumentParser.add_subparsers') as mock_add_subparsers:
mock_cmdparser = Mock()
mock_add_subparsers.return_value = mock_cmdparser
parser = create_parser()
self.assertIsInstance(parser, argparse.ArgumentParser)
mock_add_subparsers.assert_called_once_with(dest='command', title='commands', description='supported commands', required=True)
mock_cmdparser.add_parser.assert_any_call('ask', parents=ANY, help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('hist', parents=ANY, help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('tags', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('config', help=ANY, aliases=ANY)
mock_cmdparser.add_parser.assert_any_call('print', help=ANY, aliases=ANY)
self.assertTrue('.config.yaml' in parser.get_default('config'))
+17 -23
View File
@@ -1,12 +1,12 @@
import unittest
import pathlib
import tempfile
from typing import cast
from .test_main import CmmTestCase
from chatmastermind.message import source_code, Message, MessageError, Question, Answer, AILine, ModelLine, MessageFilter, message_in
from chatmastermind.tags import Tag, TagLine
class SourceCodeTestCase(unittest.TestCase):
class SourceCodeTestCase(CmmTestCase):
def test_source_code_with_include_delims(self) -> None:
text = """
Some text before the code block
@@ -60,7 +60,7 @@ class SourceCodeTestCase(unittest.TestCase):
self.assertEqual(result, expected_result)
class QuestionTestCase(unittest.TestCase):
class QuestionTestCase(CmmTestCase):
def test_question_with_header(self) -> None:
with self.assertRaises(MessageError):
Question(f"{Question.txt_header}\nWhat is your name?")
@@ -83,7 +83,7 @@ class QuestionTestCase(unittest.TestCase):
self.assertEqual(question, "What is your favorite color?")
class AnswerTestCase(unittest.TestCase):
class AnswerTestCase(CmmTestCase):
def test_answer_with_header(self) -> None:
with self.assertRaises(MessageError):
Answer(f"{Answer.txt_header}\nno")
@@ -99,7 +99,7 @@ class AnswerTestCase(unittest.TestCase):
self.assertEqual(answer, "No")
class MessageToFileTxtTestCase(unittest.TestCase):
class MessageToFileTxtTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
@@ -160,7 +160,7 @@ This is a question.
self.message_complete.file_path = self.file_path
class MessageToFileYamlTestCase(unittest.TestCase):
class MessageToFileYamlTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path = pathlib.Path(self.file.name)
@@ -226,7 +226,7 @@ class MessageToFileYamlTestCase(unittest.TestCase):
self.assertEqual(content, expected_content)
class MessageFromFileTxtTestCase(unittest.TestCase):
class MessageFromFileTxtTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
@@ -300,12 +300,6 @@ This is a question.
MessageFilter(tags_or={Tag('tag1')}))
self.assertIsNone(message)
def test_from_file_txt_empty_tags_dont_match(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_or=set(),
tags_and=set()))
self.assertIsNone(message)
def test_from_file_txt_no_tags_match_tags_not(self) -> None:
message = Message.from_file(self.file_path_min,
MessageFilter(tags_not={Tag('tag1')}))
@@ -394,7 +388,7 @@ This is a question.
self.assertIsNone(message)
class MessageFromFileYamlTestCase(unittest.TestCase):
class MessageFromFileYamlTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.yaml')
self.file_path = pathlib.Path(self.file.name)
@@ -561,7 +555,7 @@ class MessageFromFileYamlTestCase(unittest.TestCase):
self.assertIsNone(message)
class TagsFromFileTestCase(unittest.TestCase):
class TagsFromFileTestCase(CmmTestCase):
def setUp(self) -> None:
self.file_txt = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path_txt = pathlib.Path(self.file_txt.name)
@@ -669,7 +663,7 @@ This is an answer.
self.assertSetEqual(tags, set())
class TagsFromDirTestCase(unittest.TestCase):
class TagsFromDirTestCase(CmmTestCase):
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_dir_no_tags = tempfile.TemporaryDirectory()
@@ -717,7 +711,7 @@ class TagsFromDirTestCase(unittest.TestCase):
self.assertSetEqual(all_tags, set())
class MessageIDTestCase(unittest.TestCase):
class MessageIDTestCase(CmmTestCase):
def setUp(self) -> None:
self.file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
self.file_path = pathlib.Path(self.file.name)
@@ -737,7 +731,7 @@ class MessageIDTestCase(unittest.TestCase):
self.message_no_file_path.msg_id()
class MessageHashTestCase(unittest.TestCase):
class MessageHashTestCase(CmmTestCase):
def setUp(self) -> None:
self.message1 = Message(Question('This is a question.'),
tags={Tag('tag1')},
@@ -761,7 +755,7 @@ class MessageHashTestCase(unittest.TestCase):
self.assertIn(msg, msgs)
class MessageTagsStrTestCase(unittest.TestCase):
class MessageTagsStrTestCase(CmmTestCase):
def setUp(self) -> None:
self.message = Message(Question('This is a question.'),
tags={Tag('tag1')},
@@ -771,7 +765,7 @@ class MessageTagsStrTestCase(unittest.TestCase):
self.assertEqual(self.message.tags_str(), f'{TagLine.prefix} tag1')
class MessageFilterTagsTestCase(unittest.TestCase):
class MessageFilterTagsTestCase(CmmTestCase):
def setUp(self) -> None:
self.message = Message(Question('This is a question.'),
tags={Tag('atag1'), Tag('btag2')},
@@ -786,7 +780,7 @@ class MessageFilterTagsTestCase(unittest.TestCase):
self.assertSetEqual(tags_cont, {Tag('btag2')})
class MessageInTestCase(unittest.TestCase):
class MessageInTestCase(CmmTestCase):
def setUp(self) -> None:
self.message1 = Message(Question('This is a question.'),
tags={Tag('atag1'), Tag('btag2')},
@@ -800,7 +794,7 @@ class MessageInTestCase(unittest.TestCase):
self.assertFalse(message_in(self.message1, [self.message2]))
class MessageRenameTagsTestCase(unittest.TestCase):
class MessageRenameTagsTestCase(CmmTestCase):
def setUp(self) -> None:
self.message = Message(Question('This is a question.'),
tags={Tag('atag1'), Tag('btag2')},
@@ -812,7 +806,7 @@ class MessageRenameTagsTestCase(unittest.TestCase):
self.assertSetEqual(self.message.tags, {Tag('atag2'), Tag('btag3')}) # type: ignore [arg-type]
class MessageToStrTestCase(unittest.TestCase):
class MessageToStrTestCase(CmmTestCase):
def setUp(self) -> None:
self.message = Message(Question('This is a question.'),
Answer('This is an answer.'),
-162
View File
@@ -1,162 +0,0 @@
import os
import unittest
import argparse
import tempfile
from pathlib import Path
from unittest.mock import MagicMock
from chatmastermind.commands.question import create_message
from chatmastermind.message import Message, Question
from chatmastermind.chat import ChatDB
class TestMessageCreate(unittest.TestCase):
"""
Test if messages created by the 'question' command have
the correct format.
"""
def setUp(self) -> None:
# create ChatDB structure
self.db_path = tempfile.TemporaryDirectory()
self.cache_path = tempfile.TemporaryDirectory()
self.chat = ChatDB.from_dir(cache_path=Path(self.cache_path.name),
db_path=Path(self.db_path.name))
# create arguments mock
self.args = MagicMock(spec=argparse.Namespace)
self.args.source_text = None
self.args.source_code = None
self.args.AI = None
self.args.model = None
self.args.output_tags = None
# File 1 : no source code block, only text
self.source_file1 = tempfile.NamedTemporaryFile(delete=False)
self.source_file1_content = """This is just text.
No source code.
Nope. Go look elsewhere!"""
with open(self.source_file1.name, 'w') as f:
f.write(self.source_file1_content)
# File 2 : one embedded source code block
self.source_file2 = tempfile.NamedTemporaryFile(delete=False)
self.source_file2_content = """This is just text.
```
This is embedded source code.
```
And some text again."""
with open(self.source_file2.name, 'w') as f:
f.write(self.source_file2_content)
# File 3 : all source code
self.source_file3 = tempfile.NamedTemporaryFile(delete=False)
self.source_file3_content = """This is all source code.
Yes, really.
Language is called 'brainfart'."""
with open(self.source_file3.name, 'w') as f:
f.write(self.source_file3_content)
# File 4 : two source code blocks
self.source_file4 = tempfile.NamedTemporaryFile(delete=False)
self.source_file4_content = """This is just text.
```
This is embedded source code.
```
And some text again.
```
This is embedded source code.
```
Aaaand again some text."""
with open(self.source_file4.name, 'w') as f:
f.write(self.source_file4_content)
def tearDown(self) -> None:
os.remove(self.source_file1.name)
os.remove(self.source_file2.name)
os.remove(self.source_file3.name)
def message_list(self, tmp_dir: tempfile.TemporaryDirectory) -> list[Path]:
# exclude '.next'
return list(Path(tmp_dir.name).glob('*.[ty]*'))
def test_message_file_created(self) -> None:
self.args.ask = ["What is this?"]
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 0)
create_message(self.chat, self.args)
cache_dir_files = self.message_list(self.cache_path)
self.assertEqual(len(cache_dir_files), 1)
message = Message.from_file(cache_dir_files[0])
self.assertIsInstance(message, Message)
self.assertEqual(message.question, Question("What is this?")) # type: ignore [union-attr]
def test_single_question(self) -> None:
self.args.ask = ["What is this?"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
self.assertEqual(message.question, Question("What is this?"))
self.assertEqual(len(message.question.source_code()), 0)
def test_multipart_question(self) -> None:
self.args.ask = ["What is this", "'bard' thing?", "Is it good?"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
self.assertEqual(message.question, Question("""What is this
'bard' thing?
Is it good?"""))
def test_single_question_with_text_only_file(self) -> None:
self.args.ask = ["What is this?"]
self.args.source_text = [f"{self.source_file1.name}"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
# file contains no source code (only text)
# -> don't expect any in the question
self.assertEqual(len(message.question.source_code()), 0)
self.assertEqual(message.question, Question(f"""What is this?
{self.source_file1_content}"""))
def test_single_question_with_text_file_and_embedded_code(self) -> None:
self.args.ask = ["What is this?"]
self.args.source_code = [f"{self.source_file2.name}"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
# file contains 1 source code block
# -> expect it in the question
self.assertEqual(len(message.question.source_code()), 1)
self.assertEqual(message.question, Question("""What is this?
```
This is embedded source code.
```
"""))
def test_single_question_with_code_only_file(self) -> None:
self.args.ask = ["What is this?"]
self.args.source_code = [f"{self.source_file3.name}"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
# file is complete source code
self.assertEqual(len(message.question.source_code()), 1)
self.assertEqual(message.question, Question(f"""What is this?
```
{self.source_file3_content}
```"""))
def test_single_question_with_text_file_and_multi_embedded_code(self) -> None:
self.args.ask = ["What is this?"]
self.args.source_code = [f"{self.source_file4.name}"]
message = create_message(self.chat, self.args)
self.assertIsInstance(message, Message)
# file contains 2 source code blocks
# -> expect them in the question
self.assertEqual(len(message.question.source_code()), 2)
self.assertEqual(message.question, Question("""What is this?
```
This is embedded source code.
```
```
This is embedded source code.
```
"""))
+3 -3
View File
@@ -1,8 +1,8 @@
import unittest
from .test_main import CmmTestCase
from chatmastermind.tags import Tag, TagLine, TagError
class TestTag(unittest.TestCase):
class TestTag(CmmTestCase):
def test_valid_tag(self) -> None:
tag = Tag('mytag')
self.assertEqual(tag, 'mytag')
@@ -18,7 +18,7 @@ class TestTag(unittest.TestCase):
self.assertEqual(Tag.alternative_separators, [','])
class TestTagLine(unittest.TestCase):
class TestTagLine(CmmTestCase):
def test_valid_tagline(self) -> None:
tagline = TagLine('TAGS: tag1 tag2')
self.assertEqual(tagline, 'TAGS: tag1 tag2')