Implementando a DSL
Vídeo da aula estará disponível em breve
Pipeline de Implementacao
Nesta aula, implementamos o pipeline completo para a DSL de regras do harness.os: lexer, parser e evaluator. Cada componente usa as técnicas estudadas ao longo do curso.
Texto DSL
|
v
+----------+ tokens +----------+ AST +------------+
| Lexer | ----------> | Parser | --------> | Evaluator |
| (Mod. 2) | | (Mod. 3) | | (Mod. 4-5) |
+----------+ +----------+ +------------+
|
v
List[Rule]
(objetos Python)
Passo 1: O Lexer (DSLLexer)
O lexer transforma o texto da DSL em tokens. Definimos os tipos de token com um Enum e reutilizamos as tecnicas do Modulo 2.
from enum import Enum, auto
from dataclasses import dataclass
from typing import List
import re
class TT(Enum):
    """Token types recognized by the rules-DSL lexer."""
    # Keywords
    RULE = auto()
    WHEN = auto()
    THEN = auto()
    AND = auto()
    OR = auto()
    NOT = auto()
    # Literals
    STRING = auto()
    NUMBER = auto()
    IDENT = auto()
    # Symbols
    LBRACE = auto()   # {
    RBRACE = auto()   # }
    GT = auto()       # >
    LT = auto()       # <
    GTE = auto()      # >=
    LTE = auto()      # <=
    EQ = auto()       # ==
    NEQ = auto()      # !=
    # Control
    EOF = auto()      # end-of-input sentinel appended by tokenize()
@dataclass
class Token:
    """One lexical token together with its source position."""
    type: TT    # token category
    value: str  # raw lexeme (unquoted contents for STRING tokens)
    line: int   # 1-based source line where the token starts
    col: int    # 1-based source column where the token starts

    def __repr__(self):
        return f"Token({self.type.name}, {self.value!r}, L{self.line})"
# Reserved words of the DSL mapped to their token types; any other word
# lexed by read_identifier falls back to TT.IDENT.
KEYWORDS = {
    'rule': TT.RULE, 'when': TT.WHEN, 'then': TT.THEN,
    'and': TT.AND, 'or': TT.OR, 'not': TT.NOT,
}
class DSLLexer:
    """Lexer for the harness.os rules DSL.

    Turns DSL source text into a flat list of Token objects, tracking
    line/column positions so errors can point at the offending spot.
    """

    def __init__(self, source: str):
        self.source = source
        self.pos = 0    # absolute offset into source
        self.line = 1   # 1-based current line
        self.col = 1    # 1-based current column
        self.tokens: List[Token] = []

    def error(self, msg: str):
        """Raise a SyntaxError annotated with the current position."""
        raise SyntaxError(
            f"Linha {self.line}, coluna {self.col}: {msg}"
        )

    def peek(self) -> str:
        """Return the current character without consuming it ('\\0' at EOF)."""
        if self.pos >= len(self.source):
            return '\0'
        return self.source[self.pos]

    def advance(self) -> str:
        """Consume and return the current character, updating line/col."""
        ch = self.source[self.pos]
        self.pos += 1
        if ch == '\n':
            self.line += 1
            self.col = 1
        else:
            self.col += 1
        return ch

    def skip_whitespace(self):
        """Consume spaces, tabs, carriage returns and newlines."""
        while self.pos < len(self.source) and \
                self.source[self.pos] in ' \t\n\r':
            self.advance()

    def skip_comment(self):
        """Consume a '#' comment up to (not including) the newline."""
        while self.pos < len(self.source) and \
                self.source[self.pos] != '\n':
            self.advance()

    def read_string(self) -> Token:
        """Read a double-quoted string literal; a backslash escapes the
        following character (the backslash itself is dropped)."""
        start_line, start_col = self.line, self.col
        self.advance()  # skip the opening quote
        result = []
        while self.peek() != '"':
            if self.peek() == '\0':
                self.error("String nao fechada")
            if self.peek() == '\\':
                self.advance()  # drop the backslash, keep the escaped char
            result.append(self.advance())
        self.advance()  # skip the closing quote
        return Token(TT.STRING, ''.join(result), start_line, start_col)

    def read_number(self) -> Token:
        """Read an integer or decimal number.

        FIX: accept at most one decimal point. The original loop consumed
        every '.' it saw, so input like '1.2.3' lexed as NUMBER('1.2.3'),
        which later crashes float() in the evaluator. Now it stops after
        the second dot, yielding NUMBER('1.2').
        """
        start_line, start_col = self.line, self.col
        result = []
        seen_dot = False
        while self.peek().isdigit() or (self.peek() == '.' and not seen_dot):
            if self.peek() == '.':
                seen_dot = True
            result.append(self.advance())
        return Token(TT.NUMBER, ''.join(result), start_line, start_col)

    def read_identifier(self) -> Token:
        """Read an identifier or keyword ([A-Za-z0-9_-]+)."""
        start_line, start_col = self.line, self.col
        result = []
        while self.peek().isalnum() or self.peek() in '_-':
            result.append(self.advance())
        word = ''.join(result)
        tt = KEYWORDS.get(word, TT.IDENT)  # keywords win over plain IDENT
        return Token(tt, word, start_line, start_col)

    def tokenize(self) -> List[Token]:
        """Main loop: scan the whole source and return the token list,
        ending with a single EOF token."""
        while self.pos < len(self.source):
            self.skip_whitespace()
            if self.pos >= len(self.source):
                break
            ch = self.peek()
            ln, co = self.line, self.col  # token start position
            if ch == '#':
                self.skip_comment()
            elif ch == '"':
                self.tokens.append(self.read_string())
            elif ch.isdigit():
                self.tokens.append(self.read_number())
            elif ch.isalpha() or ch == '_':
                self.tokens.append(self.read_identifier())
            elif ch == '{':
                self.advance()
                self.tokens.append(Token(TT.LBRACE, '{', ln, co))
            elif ch == '}':
                self.advance()
                self.tokens.append(Token(TT.RBRACE, '}', ln, co))
            elif ch == '>':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.GTE, '>=', ln, co))
                else:
                    self.tokens.append(Token(TT.GT, '>', ln, co))
            elif ch == '<':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.LTE, '<=', ln, co))
                else:
                    self.tokens.append(Token(TT.LT, '<', ln, co))
            elif ch == '=':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.EQ, '==', ln, co))
                else:
                    # Lone '=' is not part of the grammar.
                    self.error("Caractere inesperado: '='. Voce quis dizer '=='?")
            elif ch == '!':
                self.advance()
                if self.peek() == '=':
                    self.advance()
                    self.tokens.append(Token(TT.NEQ, '!=', ln, co))
                else:
                    # Lone '!' is not part of the grammar.
                    self.error("Caractere inesperado: '!'. Voce quis dizer '!='?")
            else:
                self.error(f"Caractere inesperado: '{ch}'")
        self.tokens.append(Token(TT.EOF, '', self.line, self.col))
        return self.tokens
# Smoke-test the lexer on a sample rule definition.
source = '''
rule "commit-hygiene" {
description "Nunca adicionar Co-Authored-By"
context all
priority high
when git_commit {
contains "Co-Authored-By: Claude"
}
then block {
message "Remova o trailer"
}
}
'''
lexer = DSLLexer(source)
tokens = lexer.tokenize()
for token in tokens:
    print(token)
Token(RULE, 'rule', L2)
Token(STRING, 'commit-hygiene', L2)
Token(LBRACE, '{', L2)
Token(IDENT, 'description', L3)
Token(STRING, 'Nunca adicionar Co-Authored-By', L3)
Token(IDENT, 'context', L4)
Token(IDENT, 'all', L4)
Token(IDENT, 'priority', L5)
Token(IDENT, 'high', L5)
Token(WHEN, 'when', L7)
Token(IDENT, 'git_commit', L7)
Token(LBRACE, '{', L7)
Token(IDENT, 'contains', L8)
Token(STRING, 'Co-Authored-By: Claude', L8)
Token(RBRACE, '}', L9)
Token(THEN, 'then', L11)
Token(IDENT, 'block', L11)
Token(LBRACE, '{', L11)
Token(IDENT, 'message', L12)
Token(STRING, 'Remova o trailer', L12)
Token(RBRACE, '}', L13)
Token(RBRACE, '}', L14)
Token(EOF, '', L15)
Passo 2: O Parser (DSLParser)
O parser consome tokens e constroi a AST. Usamos recursive descent (Modulo 3) com nos de dados definidos como dataclasses (Modulo 4).
from dataclasses import dataclass, field
from typing import List, Optional, Union
# --- Nos da AST ---
@dataclass
class PropertyNode:
    """A key/value property inside a rule or action body."""
    key: str    # property name, e.g. 'description'
    value: str  # property value exactly as written in the source
@dataclass
class ComparisonNode:
    """A comparison condition: field op value."""
    field: str  # event field being compared
    op: str     # one of '>', '<', '>=', '<=', '==', '!='
    value: str  # right-hand side, still as source text
@dataclass
class MatchNode:
    """A simple match condition: field value."""
    field: str  # match kind, e.g. 'contains', 'type', 'missing'
    value: str  # value to match against
@dataclass
class LogicalNode:
    """A logical combination of two conditions: left op right."""
    op: str        # 'and' or 'or'
    left: object   # any condition node
    right: object  # any condition node
@dataclass
class ConditionNode:
    """A when-clause: the trigger plus its list of conditions."""
    trigger: str  # event type that activates the rule
    conditions: List[Union[ComparisonNode, MatchNode, LogicalNode]]
@dataclass
class ActionNode:
    """A then-clause: the action type plus its properties."""
    action_type: str  # 'block', 'require', 'warn' or 'log'
    properties: List[PropertyNode] = field(default_factory=list)
@dataclass
class RuleNode:
    """Root AST node: one complete rule."""
    name: str  # rule name from the quoted string after 'rule'
    properties: List[PropertyNode] = field(default_factory=list)
    condition: Optional[ConditionNode] = None
    action: Optional[ActionNode] = None
# --- Parser ---
# Action types accepted after 'then'.
VALID_ACTIONS = {'block', 'require', 'warn', 'log'}
# Token types that introduce the comparison form of a condition.
COMPARISON_OPS = {TT.GT, TT.LT, TT.GTE, TT.LTE, TT.EQ, TT.NEQ}
class DSLParser:
    """Recursive-descent parser for the rules DSL.

    Consumes the token list produced by DSLLexer and builds a list of
    RuleNode ASTs; each parse_* method mirrors one grammar production.
    """

    def __init__(self, tokens: List[Token]):
        self.tokens = tokens
        self.pos = 0  # index of the current (not yet consumed) token

    def current(self) -> Token:
        """The token currently under examination."""
        return self.tokens[self.pos]

    def peek(self) -> Token:
        """One-token lookahead; clamps to the last token (EOF) at the end."""
        if self.pos + 1 < len(self.tokens):
            return self.tokens[self.pos + 1]
        return self.tokens[-1]

    def eat(self, expected: TT) -> Token:
        """Consume and return the current token, raising if its type differs."""
        tok = self.current()
        if tok.type != expected:
            raise SyntaxError(
                f"Linha {tok.line}: esperava {expected.name}, "
                f"achei {tok.type.name} ('{tok.value}')"
            )
        self.pos += 1
        return tok

    def parse(self) -> List[RuleNode]:
        """Entry point: program -> list of rules."""
        rules = []
        while self.current().type != TT.EOF:
            rules.append(self.parse_rule())
        return rules

    def parse_rule(self) -> RuleNode:
        """rule STRING { rule_body }"""
        self.eat(TT.RULE)
        name = self.eat(TT.STRING).value
        self.eat(TT.LBRACE)
        props = []
        condition = None
        action = None
        # Body entries may appear in any order; repeated when/then
        # clauses keep only the last occurrence.
        while self.current().type != TT.RBRACE:
            if self.current().type == TT.WHEN:
                condition = self.parse_when()
            elif self.current().type == TT.THEN:
                action = self.parse_then()
            else:
                props.append(self.parse_property())
        self.eat(TT.RBRACE)
        return RuleNode(name, props, condition, action)

    def parse_property(self) -> PropertyNode:
        """IDENT value — value may be a STRING, NUMBER or bare IDENT."""
        key = self.eat(TT.IDENT).value
        if self.current().type == TT.STRING:
            val = self.eat(TT.STRING).value
        elif self.current().type == TT.NUMBER:
            val = self.eat(TT.NUMBER).value
        elif self.current().type == TT.IDENT:
            val = self.eat(TT.IDENT).value
        else:
            raise SyntaxError(
                f"Linha {self.current().line}: esperava valor apos '{key}'"
            )
        return PropertyNode(key, val)

    def parse_when(self) -> ConditionNode:
        """when IDENT { conditions }"""
        self.eat(TT.WHEN)
        trigger = self.eat(TT.IDENT).value
        self.eat(TT.LBRACE)
        conditions = []
        while self.current().type != TT.RBRACE:
            cond = self.parse_condition()
            # Fold trailing and/or chains into a left-associative
            # LogicalNode tree.
            while self.current().type in (TT.AND, TT.OR):
                op = self.eat(self.current().type).value
                right = self.parse_condition()
                cond = LogicalNode(op, cond, right)
            conditions.append(cond)
        self.eat(TT.RBRACE)
        return ConditionNode(trigger, conditions)

    def parse_condition(self):
        """IDENT (comparison | STRING | IDENT)"""
        field = self.eat(TT.IDENT).value
        # Comparison form: field op value
        if self.current().type in COMPARISON_OPS:
            op = self.eat(self.current().type).value
            if self.current().type == TT.NUMBER:
                val = self.eat(TT.NUMBER).value
            else:
                val = self.eat(TT.STRING).value
            return ComparisonNode(field, op, val)
        # Simple match: field "value"
        if self.current().type == TT.STRING:
            val = self.eat(TT.STRING).value
            return MatchNode(field, val)
        # Identifier match: field value
        if self.current().type == TT.IDENT:
            val = self.eat(TT.IDENT).value
            return MatchNode(field, val)
        raise SyntaxError(
            f"Linha {self.current().line}: condicao invalida apos '{field}'"
        )

    def parse_then(self) -> ActionNode:
        """then ACTION_TYPE { properties }"""
        self.eat(TT.THEN)
        action_type = self.eat(TT.IDENT).value
        if action_type not in VALID_ACTIONS:
            raise SyntaxError(
                f"Tipo de acao '{action_type}' invalido. "
                f"Opcoes: {', '.join(VALID_ACTIONS)}"
            )
        self.eat(TT.LBRACE)
        props = []
        while self.current().type != TT.RBRACE:
            props.append(self.parse_property())
        self.eat(TT.RBRACE)
        return ActionNode(action_type, props)
# Smoke-test the parser, reusing `source` from the lexer demo above.
rules = DSLParser(DSLLexer(source).tokenize()).parse()
for rule in rules:
    print(f"Rule: {rule.name}")
    for p in rule.properties:
        print(f" {p.key} = {p.value}")
    if rule.condition:
        print(f" When: {rule.condition.trigger}")
        for c in rule.condition.conditions:
            print(f" {c}")
    if rule.action:
        print(f" Then: {rule.action.action_type}")
        for p in rule.action.properties:
            print(f" {p.key} = {p.value}")
Passo 3: O Evaluator (DSLEvaluator)
O evaluator percorre a AST e executa as regras contra eventos. Ele transforma os nos da AST em objetos Rule executaveis.
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
@dataclass
class CompiledRule:
    """A rule lowered from the AST, ready for evaluation."""
    name: str
    description: str = ""
    context: str = "all"
    domain: str = "build"
    priority: str = "medium"
    trigger: str = ""                 # event type that activates the rule
    check: Optional[Callable] = None  # compiled condition predicate
    action_type: str = "warn"
    action_message: str = ""
@dataclass
class RuleResult:
    """Outcome of evaluating a single rule against a single event."""
    rule_name: str
    triggered: bool
    action_type: str
    message: str
class DSLEvaluator:
    """Evaluates DSL rules against events.

    compile_rules() lowers the parsed AST into CompiledRule objects whose
    conditions are plain callables; evaluate() then runs those callables
    against event dicts.
    """

    # RuleNode property keys that are copied verbatim onto CompiledRule
    # attributes of the same name; unknown keys are silently ignored.
    _PROP_FIELDS = frozenset({'description', 'context', 'domain', 'priority'})

    def compile_rules(self, ast_rules: List[RuleNode]) -> List[CompiledRule]:
        """Compile AST rules into executable CompiledRule objects."""
        compiled = []
        for rule_node in ast_rules:
            cr = CompiledRule(name=rule_node.name)
            # Metadata properties (keys match CompiledRule attribute names).
            for prop in rule_node.properties:
                if prop.key in self._PROP_FIELDS:
                    setattr(cr, prop.key, prop.value)
            # Condition -> trigger name + one combined check callable.
            if rule_node.condition:
                cr.trigger = rule_node.condition.trigger
                cr.check = self._compile_conditions(
                    rule_node.condition.conditions)
            # Action -> action type + optional message.
            if rule_node.action:
                cr.action_type = rule_node.action.action_type
                for prop in rule_node.action.properties:
                    if prop.key == 'message':
                        cr.action_message = prop.value
            compiled.append(cr)
        return compiled

    def _compile_conditions(self, conditions) -> Callable:
        """Compile a list of conditions into a single AND-combined callable."""
        checks = [self._compile_condition(cond) for cond in conditions]

        def combined_check(event: Dict) -> bool:
            return all(check(event) for check in checks)
        return combined_check

    def _compile_condition(self, cond) -> Callable:
        """Compile one condition node into a predicate over event dicts.

        Raises:
            TypeError: for unknown node types. FIX: the original fell
                through and implicitly returned None, which later failed
                with an opaque "'NoneType' object is not callable" far
                from the actual cause.
        """
        if isinstance(cond, MatchNode):
            # Defaults bind the node's data at definition time.
            def match_check(event, f=cond.field, v=cond.value):
                if f == 'contains':
                    return v in str(event.get('content', ''))
                elif f == 'type':
                    return event.get('type') == v
                elif f == 'missing':
                    return v not in event.get('actions', [])
                return event.get(f) == v
            return match_check
        elif isinstance(cond, ComparisonNode):
            def cmp_check(event, f=cond.field, op=cond.op, v=cond.value):
                # Numeric comparison; missing fields default to 0.
                actual = float(event.get(f, 0))
                expected = float(v)
                ops = {'>': actual > expected,
                       '<': actual < expected,
                       '>=': actual >= expected,
                       '<=': actual <= expected,
                       '==': actual == expected,
                       '!=': actual != expected}
                return ops.get(op, False)
            return cmp_check
        elif isinstance(cond, LogicalNode):
            left = self._compile_condition(cond.left)
            right = self._compile_condition(cond.right)
            if cond.op == 'and':
                return lambda e: left(e) and right(e)
            else:  # 'or'
                return lambda e: left(e) or right(e)
        raise TypeError(f"No de condicao desconhecido: {type(cond).__name__}")

    def evaluate(self, rules: List[CompiledRule],
                 event: Dict[str, Any]) -> List[RuleResult]:
        """Evaluate all rules against one event; return triggered results."""
        results = []
        event_type = event.get('type', '')
        for rule in rules:
            # Skip rules whose trigger doesn't match the event type.
            if rule.trigger and rule.trigger != event_type:
                continue
            # A rule without conditions always fires for its trigger.
            triggered = True
            if rule.check:
                triggered = rule.check(event)
            if triggered:
                results.append(RuleResult(
                    rule_name=rule.name,
                    triggered=True,
                    action_type=rule.action_type,
                    message=rule.action_message
                ))
        return results
# Full pipeline: text -> tokens -> AST -> rules -> evaluation
dsl_source = '''
rule "commit-hygiene" {
description "Nunca adicionar Co-Authored-By"
context all
priority high
when git_commit {
contains "Co-Authored-By: Claude"
}
then block {
message "Remova o trailer Co-Authored-By do commit"
}
}
'''
# Compile: lexer -> parser -> evaluator, chained in one expression.
evaluator = DSLEvaluator()
ast_rules = DSLParser(DSLLexer(dsl_source).tokenize()).parse()
compiled = evaluator.compile_rules(ast_rules)
# Exercise the rules with one offending and one clean event.
event_bad = {
    'type': 'git_commit',
    'content': 'Add feature\n\nCo-Authored-By: Claude Sonnet'
}
event_good = {
    'type': 'git_commit',
    'content': 'Add feature\n\nSigned-off-by: Marco'
}
results = evaluator.evaluate(compiled, event_bad)
for r in results:
    print(f"[{r.action_type.upper()}] {r.rule_name}: {r.message}")
# [BLOCK] commit-hygiene: Remova o trailer Co-Authored-By do commit
results = evaluator.evaluate(compiled, event_good)
print(f"Eventos good: {len(results)} regras ativadas")
# Eventos good: 0 regras ativadas
Tratamento de Erros
Erros de cada fase:
Lexer:
"Linha 5, coluna 12: Caractere inesperado: '@'"
"Linha 8, coluna 1: String nao fechada"
Parser:
"Linha 3: esperava STRING, achei LBRACE ('{')"
"Tipo de acao 'blok' invalido. Opcoes: block, require, warn, log"
Evaluator (semantico):
"Regra 'commit-hygiene': trigger 'git_commut' desconhecido.
Triggers validos: git_commit, session_start, harness_change"
"Regra 'budget-check': campo 'utilization' requer tipo numerico"
Princípio: erros em termos do DOMÍNIO, não da implementação!
Resumo
- O pipeline DSL tem tres fases: Lexer (texto -> tokens), Parser (tokens -> AST), Evaluator (AST -> execucao)
- O DSLLexer reconhece keywords (rule, when, then), strings, numeros, identificadores e operadores
- O DSLParser usa recursive descent para construir uma AST com nos tipados (RuleNode, ConditionNode, ActionNode)
- O DSLEvaluator compila a AST em funcoes callable que avaliam eventos contra regras
- Mensagens de erro devem usar a linguagem do dominio, nao detalhes de implementacao
- Na proxima aula, integramos essa DSL ao sistema harness.os real
Exercicio
Estenda a DSL para suportar: (1) condicoes negadas com not (ex: not contains "DEBUG"), (2) um novo action type transform que modifica o evento (ex: remover texto), (3) importacao de regras de outros arquivos com include "path". Implemente as mudancas no lexer, parser e evaluator.
Verifique seu entendimento
Qual e o papel do Evaluator no pipeline da DSL?