"""Compile YARA rules, described as JSON, into a compact binary database.

The input is a JSON document with a top-level ``"rules"`` list.  Each rule
provides an ``"identifier"``, a ``"condition"`` string and a list of
``"strings"`` (each with ``"id"``, ``"type"``, ``"text"`` and
``"modifiers"``).  :class:`YaraDatabase` collects such rules and serialises
them with :mod:`struct` (native byte order, standard sizes).
"""
# Provenance: extracted from the patch "Initial commit"
# (5ea9a8dd752c7fab4e7373c37ee79e6eaac52ffb) by Leonard Kugis,
# Mon, 22 Jul 2024 03:58:58 +0200, which created yara.py.

import json
import logging
import os
import re
import struct
import sys

logger = logging.getLogger(__name__)


def _u8(value):
    """Return *value* as a single byte suitable for the ``c`` struct format.

    The byte order is passed explicitly because ``int.to_bytes`` only gained
    a default for it in Python 3.11.

    Raises:
        OverflowError: if *value* is outside ``0..255``.
    """
    return int(value).to_bytes(1, "little")


class OperatorTree(object):
    """Binary node (``and`` / ``or``) of a parsed rule condition."""

    def __init__(self):
        self.left = None      # left operand: OperatorTree, OperatorOf or str
        self.right = None     # right operand: OperatorTree, OperatorOf or str
        self.parent = None    # enclosing OperatorTree, None for the root
        self.operator = None  # operator code (see YaraDatabase constants)
        self.data = None      # condition text this node was parsed from


class OperatorOf(object):
    """Leaf for an ``<n> of <string set>`` expression."""

    def __init__(self, parent, n, pattern):
        self.parent = parent    # enclosing OperatorTree, None for the root
        self.n = n              # decimal count as text, or "all" / "any"
        self.pattern = pattern  # string set text, e.g. "them" or "($a*, $b)"


class YaraDatabase(object):
    """Collects rules from JSON files and writes them as one binary file.

    File layout produced by :meth:`write_file`:

    * header: magic ``YAC``, two zero bytes, entry count (uint32);
    * per entry: identifier length (1 byte), identifier, string count
      (1 byte), the string records, then the compiled condition;
    * per string: id length (1 byte), id, type (1 byte), text length
      (uint16), text, modifier bits (1 byte), wildcard count (uint32),
      range count (uint32), followed by the wildcard records
      (block index uint32 + nibble byte) and range records (block index
      uint32 + length uint32).

    Conditions are emitted in postfix order: both operands, then the
    operator byte.
    """

    __FORMAT_HEADER = "=3sccI"
    __FORMAT_ENTRY = "=c{size_id}sc"
    __FORMAT_STRING = "=c{size_id}scH{size_text}scII"
    __FORMAT_WILDCARD = "=Ic"
    __FORMAT_RANGE = "=II"
    __FORMAT_OPERATOR = "=c"
    __FORMAT_OPERATOR_OF = "=cc"
    __FORMAT_OPERATOR_OF_ELEMENT = "=c"
    __FORMAT_OPERATOR_SINGLE = "=c"

    __STRING_TYPE_STRING = 0
    __STRING_TYPE_HEX = 1
    __STRING_TYPE_REGEX = 2

    __PATTERN_RANGE_VARIABLE = re.compile(r"^\[(\d+)-(\d+)\]$")
    __PATTERN_RANGE_FIXED = re.compile(r"^\[(\d+)\]$")
    __PATTERN_WILDCARD_HIGH = re.compile(r"^\?[0-9A-Fa-f]$")
    __PATTERN_WILDCARD_LOW = re.compile(r"^[0-9A-Fa-f]\?$")
    __PATTERN_WILDCARD_BOTH = re.compile(r"^\?\?$")
    # The first alternative of the last group accepts a parenthesised string
    # set that contains whitespace ("($a, $b)"); the second one is the
    # original whitespace-free form and is kept as a fallback.
    __PATTERN_OF = re.compile(
        r"((\d+)|(all)|(any))\s+of\s+(\([^()]*\)|[\w\_\(\)\$\*\,]+)")
    __PATTERN_AND = re.compile(r"(.*)\s+and\s+(.*)")
    __PATTERN_OR = re.compile(r"(.*)\s+or\s+(.*)")

    __CONDITION_OPERATOR_OR = 0
    __CONDITION_OPERATOR_AND = 1
    __CONDITION_OPERATOR_OF = 2
    __CONDITION_OPERATOR_SINGLE = 3
    __CONDITION_OPERATOR_TRUE = 4
    __CONDITION_OPERATOR_FALSE = 5

    # Modifier name -> bit position inside the modifier byte.
    __MODIFIER_BITS = (
        ("nocase", 6),
        ("ascii", 5),
        ("wide", 4),
        ("fullword", 3),
        ("private", 2),
        ("i", 1),
        ("s", 0),
    )

    def __init__(self):
        self.__entries = []

    @staticmethod
    def parse_file(file):
        """Return the list of rules stored under ``"rules"`` in a JSON file.

        Args:
            file: readable file object containing the JSON document.

        Raises:
            KeyError: if the document has no ``"rules"`` key.
        """
        container = json.load(file)
        return list(container["rules"])

    @staticmethod
    def build_tree(condition, parent):
        """Parse a condition string into an operator tree.

        ``or`` is split first and ``and`` second, so ``and`` binds tighter.
        Each split happens at the last occurrence of the keyword.
        Parentheses are not interpreted as grouping.

        Args:
            condition: condition text of a rule.
            parent: node that owns the result, ``None`` for the root.

        Returns:
            An :class:`OperatorTree` for ``and`` / ``or``, an
            :class:`OperatorOf` for ``<n> of <set>``, otherwise the
            stripped condition text itself (a single identifier).
        """
        # The greedy split leaves surrounding whitespace on the operands;
        # without stripping, "$a " would never equal the string id "$a".
        condition = condition.strip()
        logger.debug("Parsing condition = %s", condition)
        binary_operators = (
            (YaraDatabase.__PATTERN_OR, YaraDatabase.__CONDITION_OPERATOR_OR),
            (YaraDatabase.__PATTERN_AND,
             YaraDatabase.__CONDITION_OPERATOR_AND),
        )
        for pattern, operator in binary_operators:
            match = pattern.search(condition)
            if match:
                node = OperatorTree()
                node.data = condition
                node.parent = parent
                node.operator = operator
                node.left = YaraDatabase.build_tree(match.group(1), node)
                node.right = YaraDatabase.build_tree(match.group(2), node)
                return node
        match = re.findall(YaraDatabase.__PATTERN_OF, condition)
        if match:
            logger.debug("Leaf: OperatorOf, match = %s, n = %s, pattern = %s",
                         match, match[0][0], match[0][4])
            return OperatorOf(parent, match[0][0], match[0][4])
        logger.debug("Leaf: remainder = %s", condition)
        return condition

    @staticmethod
    def _string_set_regex(pattern):
        """Build a regex that fully matches the ids named by a string set.

        ``them`` selects every id.  Otherwise the set is a comma separated
        list, optionally parenthesised, in which ``*`` is a wildcard and
        every other character is taken literally.
        """
        spec = pattern.strip()
        if spec == "them":
            return re.compile(r".*", re.DOTALL)
        if spec.count("(") != spec.count(")"):
            logger.warning("Unmatched paranthesis in pattern %s", pattern)
        alternatives = []
        for element in spec.replace("(", "").replace(")", "").split(","):
            element = "".join(element.split())
            if element:
                alternatives.append(
                    re.escape(element).replace(r"\*", ".*"))
        logger.debug("Patched pattern = %s", "|".join(alternatives))
        return re.compile("|".join(alternatives))

    @staticmethod
    def compile_tree(node, strings):
        """Serialise a condition tree to bytes in postfix order.

        Args:
            node: result of :meth:`build_tree`.
            strings: list of the rule's string ids; an id's position in this
                list is the index written to the output.

        Returns:
            ``bytearray`` with the encoded condition.  An identifier that is
            not in *strings* is encoded as the constant TRUE operator.

        Raises:
            OverflowError: if an index or count does not fit in one byte.
        """
        if isinstance(node, OperatorTree):
            data_left = YaraDatabase.compile_tree(node.left, strings)
            data_right = YaraDatabase.compile_tree(node.right, strings)
            logger.debug("Compiling OperatorTree, left = %s, right = %s",
                         data_left, data_right)
            data = bytearray(data_left)
            data += data_right
            data += struct.pack(YaraDatabase.__FORMAT_OPERATOR,
                                _u8(node.operator))
            return data

        if isinstance(node, OperatorOf):
            logger.debug("Compiling OperatorOf, n = %s, pattern = %s",
                         node.n, node.pattern)
            matcher = YaraDatabase._string_set_regex(node.pattern)
            # fullmatch: "$a" must not select "$ab".
            of_elements = [index for index, name in enumerate(strings)
                           if matcher.fullmatch(name)]
            # "all" is encoded as 0 and "any" as 1.
            count = {"all": 0, "any": 1}.get(node.n, node.n)
            data = bytearray(struct.pack(
                YaraDatabase.__FORMAT_OPERATOR,
                _u8(YaraDatabase.__CONDITION_OPERATOR_OF)))
            data += struct.pack(YaraDatabase.__FORMAT_OPERATOR_OF,
                                _u8(count), _u8(len(of_elements)))
            for index in of_elements:
                data += struct.pack(
                    YaraDatabase.__FORMAT_OPERATOR_OF_ELEMENT, _u8(index))
            return data

        logger.debug("Compiling single identifier %s", node)
        for index, name in enumerate(strings):
            if name == node:
                data = bytearray(struct.pack(
                    YaraDatabase.__FORMAT_OPERATOR,
                    _u8(YaraDatabase.__CONDITION_OPERATOR_SINGLE)))
                data += struct.pack(YaraDatabase.__FORMAT_OPERATOR_SINGLE,
                                    _u8(index))
                return data
        logger.warning(
            "Single identifier %s not found, defaulting to true", node)
        return bytearray(struct.pack(
            YaraDatabase.__FORMAT_OPERATOR,
            _u8(YaraDatabase.__CONDITION_OPERATOR_TRUE)))

    @staticmethod
    def _compile_hex(source):
        """Translate hex string text into bytes, wildcards and ranges.

        Blocks are separated by whitespace.  ``[n]`` and ``[a-b]`` record
        jump lengths (``a`` to ``b`` inclusive) for the current block index
        and emit no byte.  ``?X``, ``X?`` and ``??`` emit a byte with the
        unknown nibble(s) set to zero and record ``(block index, nibble)``
        with nibble 1 for the high and 0 for the low half.

        Returns:
            Tuple ``(text, wildcards, ranges)``.

        Raises:
            ValueError: if a block is not valid hexadecimal.
        """
        text = bytearray()
        wildcards = []
        ranges = []
        bn = 0  # index of the current block
        # split() without argument: repeated whitespace must not create
        # empty blocks that advance the block index.
        for block in source.split():
            logger.debug("Compiling block = %s", block)
            variable = YaraDatabase.__PATTERN_RANGE_VARIABLE.match(block)
            fixed = YaraDatabase.__PATTERN_RANGE_FIXED.match(block)
            if variable:
                low = int(variable.group(1))
                high = int(variable.group(2))
                # Upper bound is inclusive, so that [4-4] equals [4].
                for length in range(low, high + 1):
                    logger.debug("Appending range = %s", (bn, length))
                    ranges.append((bn, length))
            elif fixed:
                logger.debug("Appending range = %s",
                             (bn, int(fixed.group(1))))
                ranges.append((bn, int(fixed.group(1))))
            else:
                if YaraDatabase.__PATTERN_WILDCARD_HIGH.match(block):
                    wildcards.append((bn, 1))
                elif YaraDatabase.__PATTERN_WILDCARD_LOW.match(block):
                    wildcards.append((bn, 0))
                elif YaraDatabase.__PATTERN_WILDCARD_BOTH.match(block):
                    wildcards.append((bn, 0))
                    wildcards.append((bn, 1))
                text += bytearray.fromhex(block.replace('?', '0'))
            bn += 1
        return text, wildcards, ranges

    @staticmethod
    def _compile_string(s):
        """Serialise one string definition including wildcards and ranges.

        Modifier flags missing from ``s["modifiers"]`` count as off.

        Raises:
            struct.error: if the text is longer than 65535 bytes.
            OverflowError: if the encoded id is longer than 255 bytes.
        """
        logger.debug("Compiling string %s", s["id"])
        text = bytearray()
        wildcards = []
        ranges = []
        if s["type"] == YaraDatabase.__STRING_TYPE_STRING:
            logger.debug("String type string, text = %s", s["text"])
            text += s["text"].encode("utf-8")
        elif s["type"] == YaraDatabase.__STRING_TYPE_HEX:
            text, wildcards, ranges = YaraDatabase._compile_hex(s["text"])
        elif s["type"] == YaraDatabase.__STRING_TYPE_REGEX:
            text += s["text"].encode("utf-8")

        flags = s.get("modifiers") or {}
        modifiers = 0
        for name, bit in YaraDatabase.__MODIFIER_BITS:
            if flags.get(name):
                modifiers |= 1 << bit

        # Sizes are taken from the encoded id: for non-ASCII text the byte
        # length exceeds the character count and struct would truncate.
        string_id = s["id"].encode("utf-8")
        data = bytearray(struct.pack(
            YaraDatabase.__FORMAT_STRING.format(size_id=len(string_id),
                                                size_text=len(text)),
            _u8(len(string_id)), string_id, _u8(s["type"]),
            len(text), bytes(text), _u8(modifiers),
            len(wildcards), len(ranges)))
        for position, nibble in wildcards:
            data += struct.pack(YaraDatabase.__FORMAT_WILDCARD,
                                position, _u8(nibble))
        for position, length in ranges:
            data += struct.pack(YaraDatabase.__FORMAT_RANGE,
                                position, length)
        return data

    def add_file(self, filename):
        """Load the rules of the JSON file *filename* into the database."""
        with open(filename, 'r', encoding="utf-8") as f:
            self.__entries.extend(YaraDatabase.parse_file(f))

    def write_file(self, filename):
        """Compile all loaded rules and write them to *filename*.

        The file is closed even when compiling a rule raises; in that case
        it contains the records written so far.

        Raises:
            KeyError: if a rule or string lacks a required key.
            ValueError: if a hex string contains an invalid block.
            OverflowError, struct.error: if a value exceeds its field size.
        """
        with open(filename, 'wb') as f:
            header = struct.pack(self.__FORMAT_HEADER, "YAC".encode("utf-8"),
                                 b'\x00', b'\x00', len(self.__entries))
            logger.debug("Header data = %s", header)
            f.write(header)
            for entry in self.__entries:
                logger.debug("Compiling entry %s", entry["identifier"])
                identifier = entry["identifier"].encode("utf-8")
                entry_data = bytearray(struct.pack(
                    self.__FORMAT_ENTRY.format(size_id=len(identifier)),
                    _u8(len(identifier)), identifier,
                    _u8(len(entry["strings"]))))
                logger.debug("Entry data = %s", entry_data)

                string_data = bytearray()
                for s in entry["strings"]:
                    string_data += YaraDatabase._compile_string(s)

                logger.debug(
                    "Building conditional operator tree for entry = %s, "
                    "condition = %s", entry["identifier"], entry["condition"])
                node = YaraDatabase.build_tree(entry["condition"], None)
                logger.debug(
                    "Compiling conditional operator tree for entry = %s, "
                    "condition = %s", entry["identifier"], entry["condition"])
                condition_data = YaraDatabase.compile_tree(
                    node, [s["id"] for s in entry["strings"]])
                logger.debug("Compilation done for entry %s",
                             entry["identifier"])

                f.write(entry_data)
                f.write(string_data)
                f.write(condition_data)
            logger.debug("Compilation done for file %s", filename)