Added tags filtering (prefix and contained string) to TagLine and Message

This commit is contained in:
2023-08-26 12:50:47 +02:00
parent fc1b8006a0
commit 7f91a2b567
4 changed files with 204 additions and 14 deletions
+65 -6
View File
@@ -219,21 +219,57 @@ class Message():
file_path=data.get(cls.file_yaml_key, None))
@classmethod
def tags_from_file(cls: Type[MessageInst], file_path: pathlib.Path) -> set[Tag]:
def tags_from_file(cls: Type[MessageInst],
file_path: pathlib.Path,
prefix: Optional[str] = None,
contain: Optional[str] = None) -> set[Tag]:
"""
Return only the tags from the given Message file.
Return only the tags from the given Message file,
optionally filtered based on prefix or contained string.
"""
tags: set[Tag] = set()
if not file_path.exists():
raise MessageError(f"Message file '{file_path}' does not exist")
if file_path.suffix not in cls.file_suffixes:
raise MessageError(f"File type '{file_path.suffix}' is not supported")
# for TXT, it's enough to read the TagLine
if file_path.suffix == '.txt':
with open(file_path, "r") as fd:
tags = TagLine(fd.readline()).tags()
try:
tags = TagLine(fd.readline()).tags(prefix, contain)
except TagError:
pass # message without tags
else: # '.yaml'
with open(file_path, "r") as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
tags = set(sorted(data[cls.tags_yaml_key]))
try:
message = cls.from_file(file_path)
if message:
msg_tags = message.filter_tags(prefix=prefix, contain=contain)
except MessageError as e:
print(f"Error processing message in '{file_path}': {str(e)}")
if msg_tags:
tags = msg_tags
return tags
@classmethod
def tags_from_dir(cls: Type[MessageInst],
path: pathlib.Path,
glob: Optional[str] = None,
prefix: Optional[str] = None,
contain: Optional[str] = None) -> set[Tag]:
"""
Return only the tags from message files in the given directory.
The files can be filtered using 'glob', the tags by using 'prefix'
and 'contain'.
"""
tags: set[Tag] = set()
file_iter = path.glob(glob) if glob else path.iterdir()
for file_path in sorted(file_iter):
if file_path.is_file():
try:
tags |= cls.tags_from_file(file_path, prefix, contain)
except MessageError as e:
print(f"Error processing message in '{file_path}': {str(e)}")
return tags
@classmethod
@@ -395,6 +431,29 @@ class Message():
data[self.tags_yaml_key] = sorted([str(tag) for tag in self.tags])
yaml.dump(data, fd, sort_keys=False)
def filter_tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
Filter tags based on their prefix (i. e. the tag starts with a given string)
or some contained string.
"""
res_tags = self.tags
if res_tags:
if prefix and len(prefix) > 0:
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
if contain and len(contain) > 0:
res_tags -= {tag for tag in res_tags if contain not in tag}
return res_tags or set()
def tags_str(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> str:
"""
Returns all tags as a string with the TagLine prefix. Optionally filtered
using 'Message.filter_tags()'.
"""
if self.tags:
return str(TagLine.from_set(self.filter_tags(prefix, contain)))
else:
return str(TagLine.from_set(set()))
def match(self, mfilter: MessageFilter) -> bool: # noqa: 13
"""
Matches the current Message to the given filter atttributes.
+9 -3
View File
@@ -118,9 +118,10 @@ class TagLine(str):
"""
return cls(' '.join([cls.prefix] + sorted([t for t in tags])))
def tags(self) -> set[Tag]:
def tags(self, prefix: Optional[str] = None, contain: Optional[str] = None) -> set[Tag]:
"""
Returns all tags contained in this line as a set.
Returns all tags contained in this line as a set, optionally
filtered based on prefix or contained string.
"""
tagstr = self[len(self.prefix):].strip()
separator = Tag.default_separator
@@ -130,7 +131,12 @@ class TagLine(str):
if s in tagstr:
separator = s
break
return set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
res_tags = set(sorted([Tag(t.strip()) for t in tagstr.split(separator)]))
if prefix and len(prefix) > 0:
res_tags -= {tag for tag in res_tags if not tag.startswith(prefix)}
if contain and len(contain) > 0:
res_tags -= {tag for tag in res_tags if contain not in tag}
return res_tags or set()
def merge(self, taglines: set['TagLine']) -> 'TagLine':
"""