College Online
0%

Implementando a DSL

Módulo 6 · Aula 2 ~30 min de leitura Nível: Avançado

Vídeo da aula estará disponível em breve

Pipeline de Implementação

Nesta aula, implementamos o pipeline completo para a DSL de regras do harness.os: lexer, parser e evaluator. Cada componente usa as técnicas estudadas ao longo do curso.

Diagrama
Texto DSL
  |
  v
+----------+    tokens    +----------+    AST     +------------+
|  Lexer   | ----------> |  Parser  | --------> | Evaluator  |
| (Mod. 2) |             | (Mod. 3) |           | (Mod. 4-5) |
+----------+             +----------+           +------------+
                                                      |
                                                      v
                                                List[Rule]
                                                (objetos Python)

Passo 1: O Lexer (DSLLexer)

O lexer transforma o texto da DSL em tokens. Definimos os tipos de token com um Enum e reutilizamos as técnicas do Módulo 2.

Python
from enum import Enum, auto
from dataclasses import dataclass
from typing import List
import re

class TT(Enum):
    """Token types for the rules DSL."""
    # Keywords
    RULE    = auto()
    WHEN    = auto()
    THEN    = auto()
    AND     = auto()
    OR      = auto()
    NOT     = auto()
    # Literals
    STRING  = auto()
    NUMBER  = auto()
    IDENT   = auto()
    # Symbols
    LBRACE  = auto()  # {
    RBRACE  = auto()  # }
    GT      = auto()  # >
    LT      = auto()  # <
    GTE     = auto()  # >=
    LTE     = auto()  # <=
    EQ      = auto()  # ==
    NEQ     = auto()  # !=
    # Control
    EOF     = auto()

@dataclass
class Token:
    """One lexical token plus the source position where it starts."""
    type: TT
    value: str
    line: int  # 1-based line of the token's first character
    col: int   # 1-based column of the token's first character

    def __repr__(self):
        # Compact form: Token(TYPE, 'value', L<line>)
        return f"Token({self.type.name}, {self.value!r}, L{self.line})"

# Reserved words mapped to their token types; any other word lexes as IDENT.
KEYWORDS = {
    'rule': TT.RULE, 'when': TT.WHEN, 'then': TT.THEN,
    'and': TT.AND, 'or': TT.OR, 'not': TT.NOT,
}

class DSLLexer:
    """Lexer for the harness.os rules DSL.

    Walks the source character by character and produces a flat list of
    Token objects, tracking line/column for error reporting. The token
    stream always ends with a single EOF token.
    """

    def __init__(self, source: str):
        self.source = source
        self.pos = 0    # index of the next unread character
        self.line = 1   # 1-based line of the character at self.pos
        self.col = 1    # 1-based column of the character at self.pos
        self.tokens: List[Token] = []

    def error(self, msg: str):
        """Raise a SyntaxError annotated with the current line/column."""
        raise SyntaxError(
            f"Linha {self.line}, coluna {self.col}: {msg}"
        )

    def peek(self) -> str:
        """Return the current character without consuming it ('\\0' at EOF)."""
        if self.pos >= len(self.source):
            return '\0'
        return self.source[self.pos]

    def advance(self) -> str:
        """Consume and return the current character, updating line/col.

        Callers must guard against EOF with peek() first.
        """
        ch = self.source[self.pos]
        self.pos += 1
        if ch == '\n':
            self.line += 1
            self.col = 1
        else:
            self.col += 1
        return ch

    def skip_whitespace(self):
        """Advance past spaces, tabs, carriage returns and newlines."""
        while self.pos < len(self.source) and \
              self.source[self.pos] in ' \t\n\r':
            self.advance()

    def skip_comment(self):
        """Advance to the end of the current line ('#' line comments)."""
        while self.pos < len(self.source) and \
              self.source[self.pos] != '\n':
            self.advance()

    def read_string(self) -> Token:
        """Read a double-quoted string literal.

        A backslash escapes the next character: the backslash itself is
        dropped and the following character is taken verbatim (so '\\n'
        yields a literal 'n', not a newline).
        """
        start_line, start_col = self.line, self.col
        self.advance()  # skip the opening quote
        result = []
        while self.peek() != '"':
            if self.peek() == '\0':
                self.error("String nao fechada")
            if self.peek() == '\\':
                self.advance()  # drop the backslash
                # Fix: a backslash as the very last character used to crash
                # advance() with IndexError; report it as an unclosed string.
                if self.peek() == '\0':
                    self.error("String nao fechada")
            result.append(self.advance())
        self.advance()  # skip the closing quote
        return Token(TT.STRING, ''.join(result), start_line, start_col)

    def read_number(self) -> Token:
        """Read a numeric literal (digits and '.').

        NOTE(review): multiple dots ("1.2.3") are accepted here and only
        fail later at float() conversion — validation left to the evaluator.
        """
        start_line, start_col = self.line, self.col
        result = []
        while self.peek().isdigit() or self.peek() == '.':
            result.append(self.advance())
        return Token(TT.NUMBER, ''.join(result), start_line, start_col)

    def read_identifier(self) -> Token:
        """Read an identifier or keyword (letters, digits, '_' and '-')."""
        start_line, start_col = self.line, self.col
        result = []
        while self.peek().isalnum() or self.peek() in '_-':
            result.append(self.advance())
        word = ''.join(result)
        # Keywords win over identifiers.
        tt = KEYWORDS.get(word, TT.IDENT)
        return Token(tt, word, start_line, start_col)

    def tokenize(self) -> List[Token]:
        """Scan the whole source and return the token list (EOF-terminated)."""
        while self.pos < len(self.source):
            self.skip_whitespace()
            if self.pos >= len(self.source):
                break

            ch = self.peek()
            ln, co = self.line, self.col  # position where the token starts

            if ch == '#':
                self.skip_comment()
            elif ch == '"':
                self.tokens.append(self.read_string())
            elif ch.isdigit():
                self.tokens.append(self.read_number())
            elif ch.isalpha() or ch == '_':
                self.tokens.append(self.read_identifier())
            elif ch == '{':
                self.advance()
                self.tokens.append(Token(TT.LBRACE, '{', ln, co))
            elif ch == '}':
                self.advance()
                self.tokens.append(Token(TT.RBRACE, '}', ln, co))
            elif ch == '>':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.GTE, '>=', ln, co))
                else:
                    self.tokens.append(Token(TT.GT, '>', ln, co))
            elif ch == '<':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.LTE, '<=', ln, co))
                else:
                    self.tokens.append(Token(TT.LT, '<', ln, co))
            elif ch == '=':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.EQ, '==', ln, co))
                else:
                    # No placeholders, so no f-prefix needed here.
                    self.error("Caractere inesperado: '='. Voce quis dizer '=='?")
            elif ch == '!':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.NEQ, '!=', ln, co))
                else:
                    self.error("Caractere inesperado: '!'. Voce quis dizer '!='?")
            else:
                self.error(f"Caractere inesperado: '{ch}'")

        self.tokens.append(Token(TT.EOF, '', self.line, self.col))
        return self.tokens


# Exercise the lexer on a sample rule definition.
source = '''
rule "commit-hygiene" {
  description "Nunca adicionar Co-Authored-By"
  context     all
  priority    high

  when git_commit {
    contains "Co-Authored-By: Claude"
  }

  then block {
    message "Remova o trailer"
  }
}
'''

lexer = DSLLexer(source)
tokens = lexer.tokenize()
print(*tokens, sep='\n')
Saida
Token(RULE, 'rule', L2)
Token(STRING, 'commit-hygiene', L2)
Token(LBRACE, '{', L2)
Token(IDENT, 'description', L3)
Token(STRING, 'Nunca adicionar Co-Authored-By', L3)
Token(IDENT, 'context', L4)
Token(IDENT, 'all', L4)
Token(IDENT, 'priority', L5)
Token(IDENT, 'high', L5)
Token(WHEN, 'when', L7)
Token(IDENT, 'git_commit', L7)
Token(LBRACE, '{', L7)
Token(IDENT, 'contains', L8)
Token(STRING, 'Co-Authored-By: Claude', L8)
Token(RBRACE, '}', L9)
Token(THEN, 'then', L11)
Token(IDENT, 'block', L11)
Token(LBRACE, '{', L11)
Token(IDENT, 'message', L12)
Token(STRING, 'Remova o trailer', L12)
Token(RBRACE, '}', L13)
Token(RBRACE, '}', L14)
Token(EOF, '', L15)

Passo 2: O Parser (DSLParser)

O parser consome tokens e constrói a AST. Usamos recursive descent (Módulo 3) com nós de dados definidos como dataclasses (Módulo 4).

Python
from dataclasses import dataclass, field
from typing import List, Optional, Union

# --- Nos da AST ---

@dataclass
class PropertyNode:
    """Property line: key value."""
    key: str
    value: str

@dataclass
class ComparisonNode:
    """Comparison condition: field op value."""
    field: str
    op: str       # '>', '<', '>=', '<=', '==', '!='
    value: str

@dataclass
class MatchNode:
    """Match condition: field value."""
    field: str
    value: str

@dataclass
class LogicalNode:
    """Logical combination: left op right."""
    op: str      # 'and', 'or'
    left: object   # ComparisonNode | MatchNode | LogicalNode
    right: object  # ComparisonNode | MatchNode | LogicalNode

@dataclass
class ConditionNode:
    """`when` clause: trigger name plus its condition list."""
    trigger: str
    conditions: List[Union[ComparisonNode, MatchNode, LogicalNode]]

@dataclass
class ActionNode:
    """`then` clause: action type plus its properties."""
    action_type: str  # 'block', 'require', 'warn', 'log'
    properties: List[PropertyNode] = field(default_factory=list)

@dataclass
class RuleNode:
    """Root node: one complete rule (name, metadata, when/then clauses)."""
    name: str
    properties: List[PropertyNode] = field(default_factory=list)
    condition: Optional[ConditionNode] = None
    action: Optional[ActionNode] = None


# --- Parser ---

# Action types accepted after `then`.
VALID_ACTIONS = {'block', 'require', 'warn', 'log'}
# Token types that start a comparison condition (field <op> value).
COMPARISON_OPS = {TT.GT, TT.LT, TT.GTE, TT.LTE, TT.EQ, TT.NEQ}

class DSLParser:
    """Recursive-descent parser for the rules DSL.

    Consumes the token list produced by DSLLexer and builds a list of
    RuleNode ASTs. Each parse_* method implements one grammar rule.
    """

    def __init__(self, tokens: List[Token]):
        self.tokens = tokens
        self.pos = 0  # index of the current (not yet consumed) token

    def current(self) -> Token:
        """Token under the cursor; stays on EOF once input is exhausted."""
        return self.tokens[self.pos]

    def peek(self) -> Token:
        """One token of lookahead; clamps to the final (EOF) token."""
        if self.pos + 1 < len(self.tokens):
            return self.tokens[self.pos + 1]
        return self.tokens[-1]

    def eat(self, expected: TT) -> Token:
        """Consume and return the current token, or fail if its type differs."""
        tok = self.current()
        if tok.type != expected:
            raise SyntaxError(
                f"Linha {tok.line}: esperava {expected.name}, "
                f"achei {tok.type.name} ('{tok.value}')"
            )
        self.pos += 1
        return tok

    def parse(self) -> List[RuleNode]:
        """Entry point: program -> list of rules."""
        rules = []
        while self.current().type != TT.EOF:
            rules.append(self.parse_rule())
        return rules

    def parse_rule(self) -> RuleNode:
        """rule STRING { rule_body }"""
        self.eat(TT.RULE)
        name = self.eat(TT.STRING).value
        self.eat(TT.LBRACE)

        props = []
        condition = None
        action = None

        # Body entries may appear in any order; the last when/then wins.
        while self.current().type != TT.RBRACE:
            if self.current().type == TT.WHEN:
                condition = self.parse_when()
            elif self.current().type == TT.THEN:
                action = self.parse_then()
            else:
                props.append(self.parse_property())

        self.eat(TT.RBRACE)
        return RuleNode(name, props, condition, action)

    def parse_property(self) -> PropertyNode:
        """IDENT value  (value is a string, number or bare identifier)"""
        key = self.eat(TT.IDENT).value
        if self.current().type == TT.STRING:
            val = self.eat(TT.STRING).value
        elif self.current().type == TT.NUMBER:
            val = self.eat(TT.NUMBER).value
        elif self.current().type == TT.IDENT:
            val = self.eat(TT.IDENT).value
        else:
            raise SyntaxError(
                f"Linha {self.current().line}: esperava valor apos '{key}'"
            )
        return PropertyNode(key, val)

    def parse_when(self) -> ConditionNode:
        """when IDENT { conditions }"""
        self.eat(TT.WHEN)
        trigger = self.eat(TT.IDENT).value
        self.eat(TT.LBRACE)

        conditions = []
        while self.current().type != TT.RBRACE:
            cond = self.parse_condition()
            # Fold trailing and/or chains left-associatively.
            while self.current().type in (TT.AND, TT.OR):
                op = self.eat(self.current().type).value
                right = self.parse_condition()
                cond = LogicalNode(op, cond, right)
            conditions.append(cond)

        self.eat(TT.RBRACE)
        return ConditionNode(trigger, conditions)

    def parse_condition(self):
        """IDENT (STRING | comparison)"""
        field = self.eat(TT.IDENT).value

        # Comparison: field > value
        if self.current().type in COMPARISON_OPS:
            op = self.eat(self.current().type).value
            if self.current().type == TT.NUMBER:
                val = self.eat(TT.NUMBER).value
            else:
                val = self.eat(TT.STRING).value
            return ComparisonNode(field, op, val)

        # Simple match: field "value"
        if self.current().type == TT.STRING:
            val = self.eat(TT.STRING).value
            return MatchNode(field, val)

        # Identifier match: field value
        if self.current().type == TT.IDENT:
            val = self.eat(TT.IDENT).value
            return MatchNode(field, val)

        raise SyntaxError(
            f"Linha {self.current().line}: condicao invalida apos '{field}'"
        )

    def parse_then(self) -> ActionNode:
        """then ACTION_TYPE { properties }"""
        self.eat(TT.THEN)
        action_type = self.eat(TT.IDENT).value

        if action_type not in VALID_ACTIONS:
            # Fix: VALID_ACTIONS is a set, so the joined option list varied
            # between interpreter runs (hash randomization); sort it so the
            # error message is deterministic.
            raise SyntaxError(
                f"Tipo de acao '{action_type}' invalido. "
                f"Opcoes: {', '.join(sorted(VALID_ACTIONS))}"
            )

        self.eat(TT.LBRACE)
        props = []
        while self.current().type != TT.RBRACE:
            props.append(self.parse_property())
        self.eat(TT.RBRACE)

        return ActionNode(action_type, props)


# Run lexer + parser end to end and dump the resulting AST.
rules = DSLParser(DSLLexer(source).tokenize()).parse()  # source defined above

for rule in rules:
    print(f"Rule: {rule.name}")
    for p in rule.properties:
        print(f"  {p.key} = {p.value}")
    cond, act = rule.condition, rule.action
    if cond:
        print(f"  When: {cond.trigger}")
        for c in cond.conditions:
            print(f"    {c}")
    if act:
        print(f"  Then: {act.action_type}")
        for p in act.properties:
            print(f"    {p.key} = {p.value}")

Passo 3: O Evaluator (DSLEvaluator)

O evaluator percorre a AST e executa as regras contra eventos. Ele transforma os nós da AST em objetos Rule executáveis.

Python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable

@dataclass
class CompiledRule:
    """A rule compiled from the AST, ready to run against events."""
    name: str
    description: str = ""
    context: str = "all"
    domain: str = "build"
    priority: str = "medium"
    trigger: str = ""                  # event type that activates this rule
    check: Optional[Callable] = None   # compiled condition predicate (event dict -> bool)
    action_type: str = "warn"          # one of VALID_ACTIONS
    action_message: str = ""

@dataclass
class RuleResult:
    """Outcome of evaluating one rule against one event."""
    rule_name: str
    triggered: bool
    action_type: str
    message: str

class DSLEvaluator:
    """Compiles rule ASTs into executable rules and evaluates them.

    compile_rules() turns RuleNode trees into CompiledRule objects whose
    `check` attribute is a plain callable; evaluate() runs those rules
    against a single event dict.
    """

    def compile_rules(self, ast_rules: List[RuleNode]) -> List[CompiledRule]:
        """Compile the AST into executable CompiledRule objects."""
        compiled = []
        for rule_node in ast_rules:
            cr = CompiledRule(name=rule_node.name)

            # Known metadata properties; unknown keys are silently ignored.
            for prop in rule_node.properties:
                if prop.key == 'description':
                    cr.description = prop.value
                elif prop.key == 'context':
                    cr.context = prop.value
                elif prop.key == 'domain':
                    cr.domain = prop.value
                elif prop.key == 'priority':
                    cr.priority = prop.value

            # when-clause: trigger name plus compiled condition predicate.
            if rule_node.condition:
                cr.trigger = rule_node.condition.trigger
                cr.check = self._compile_conditions(
                    rule_node.condition.conditions)

            # then-clause: action type and its message property.
            if rule_node.action:
                cr.action_type = rule_node.action.action_type
                for prop in rule_node.action.properties:
                    if prop.key == 'message':
                        cr.action_message = prop.value

            compiled.append(cr)
        return compiled

    def _compile_conditions(self, conditions) -> Callable:
        """Compile a condition list into one callable (AND of all entries)."""
        checks = [self._compile_condition(cond) for cond in conditions]

        def combined_check(event: Dict) -> bool:
            # all() over an empty list is True: a bare `when x {}` always fires.
            return all(check(event) for check in checks)
        return combined_check

    def _compile_condition(self, cond) -> Callable:
        """Compile one AST condition node into a predicate over an event dict."""
        if isinstance(cond, MatchNode):
            # Default args bind the node's values at definition time.
            def match_check(event, f=cond.field, v=cond.value):
                if f == 'contains':
                    return v in str(event.get('content', ''))
                elif f == 'type':
                    return event.get('type') == v
                elif f == 'missing':
                    return v not in event.get('actions', [])
                return event.get(f) == v
            return match_check

        elif isinstance(cond, ComparisonNode):
            def cmp_check(event, f=cond.field, op=cond.op, v=cond.value):
                actual = float(event.get(f, 0))
                expected = float(v)
                # Evaluate only the requested comparison (the previous dict
                # form computed all six eagerly); unknown ops are False.
                if op == '>':
                    return actual > expected
                if op == '<':
                    return actual < expected
                if op == '>=':
                    return actual >= expected
                if op == '<=':
                    return actual <= expected
                if op == '==':
                    return actual == expected
                if op == '!=':
                    return actual != expected
                return False
            return cmp_check

        elif isinstance(cond, LogicalNode):
            left = self._compile_condition(cond.left)
            right = self._compile_condition(cond.right)
            if cond.op == 'and':
                return lambda e: left(e) and right(e)
            else:  # or
                return lambda e: left(e) or right(e)

        # Fix: unknown node types previously fell through and returned None,
        # which only blew up later (opaque TypeError) when the rule was
        # evaluated. Fail fast at compile time instead.
        raise TypeError(f"No de condicao desconhecido: {type(cond).__name__}")

    def evaluate(self, rules: List[CompiledRule],
                  event: Dict[str, Any]) -> List[RuleResult]:
        """Evaluate every rule against one event; return triggered results."""
        results = []
        event_type = event.get('type', '')

        for rule in rules:
            # Skip rules whose trigger does not match the event type.
            if rule.trigger and rule.trigger != event_type:
                continue

            # A rule without conditions counts as always triggered.
            triggered = True
            if rule.check:
                triggered = rule.check(event)

            if triggered:
                results.append(RuleResult(
                    rule_name=rule.name,
                    triggered=True,
                    action_type=rule.action_type,
                    message=rule.action_message
                ))

        return results


# Full pipeline: text -> tokens -> AST -> rules -> evaluation
dsl_source = '''
rule "commit-hygiene" {
  description "Nunca adicionar Co-Authored-By"
  context all
  priority high
  when git_commit {
    contains "Co-Authored-By: Claude"
  }
  then block {
    message "Remova o trailer Co-Authored-By do commit"
  }
}
'''

# Compile the DSL text down to executable rules in one chained pass.
evaluator = DSLEvaluator()
compiled = evaluator.compile_rules(
    DSLParser(DSLLexer(dsl_source).tokenize()).parse())

# Probe events: one violating the rule, one clean.
event_bad = {
    'type': 'git_commit',
    'content': 'Add feature\n\nCo-Authored-By: Claude Sonnet'
}
event_good = {
    'type': 'git_commit',
    'content': 'Add feature\n\nSigned-off-by: Marco'
}

results = evaluator.evaluate(compiled, event_bad)
for r in results:
    print(f"[{r.action_type.upper()}] {r.rule_name}: {r.message}")
# [BLOCK] commit-hygiene: Remova o trailer Co-Authored-By do commit

results = evaluator.evaluate(compiled, event_good)
print(f"Eventos good: {len(results)} regras ativadas")
# Eventos good: 0 regras ativadas
# Eventos good: 0 regras ativadas

Tratamento de Erros

Diagrama
Erros de cada fase:

Lexer:
  "Linha 5, coluna 12: Caractere inesperado: '@'"
  "Linha 8, coluna 1: String nao fechada"

Parser:
  "Linha 3: esperava STRING, achei LBRACE ('{')"
  "Tipo de acao 'blok' invalido. Opcoes: block, require, warn, log"

Evaluator (semantico):
  "Regra 'commit-hygiene': trigger 'git_commut' desconhecido.
   Triggers validos: git_commit, session_start, harness_change"
  "Regra 'budget-check': campo 'utilization' requer tipo numerico"

Princípio: erros em termos do DOMÍNIO, não da implementação!

Resumo

Exercicio

Estenda a DSL para suportar: (1) condições negadas com not (ex: not contains "DEBUG"), (2) um novo action type transform que modifica o evento (ex: remover texto), (3) importação de regras de outros arquivos com include "path". Implemente as mudanças no lexer, parser e evaluator.

Verifique seu entendimento

Qual é o papel do Evaluator no pipeline da DSL?

  • Verificar se o texto da DSL tem erros de sintaxe
  • Transformar texto em tokens
  • Compilar a AST em regras executáveis e avaliá-las contra eventos
  • Gerar código assembly a partir da AST