Projetando o Protocolo
Video da aula estara disponivel em breve
Projeto do modulo: Mesh Protocol para harness.os
Neste módulo final, vamos aplicar tudo que aprendemos para projetar e implementar um protocolo de comunicação para o mesh do harness.os. O objetivo é que nodes do mesh (marco.ai, build.ai, cortex.ai) possam trocar conhecimento, sincronizar estado e coordenar agentes. Projetar um protocolo envolve as mesmas decisões que os engenheiros do TCP, HTTP e BGP enfrentaram: formato de mensagem, semântica de entrega, tratamento de erros e extensibilidade.
Design de formato de mensagem
Todo protocolo precisa de um formato de mensagem bem definido. Vamos projetar o HMP (Harness Mesh Protocol):
Formato da mensagem HMP:
+─────────────── HEADER (fixo, 32 bytes) ──────────────────+
| Version (1B) | Type (1B) | Flags (2B) | Length (4B) |
| Source ID (16B - UUID) |
| Dest ID (8B - short hash) |
+─────────────── PAYLOAD (variavel) ────────────────────────+
| JSON payload (ate 64KB) |
+───────────────────────────────────────────────────────────+
Version: 1 (primeira versao do protocolo)
Type:
0x01 = KNOWLEDGE_SYNC (sincronizar conhecimento)
0x02 = HEALTH_CHECK (verificar se node esta vivo)
0x03 = SESSION_EVENT (notificar evento de sessao)
0x04 = ROUTE_UPDATE (atualizar tabela de roteamento)
0x05 = AUTH_REQUEST (autenticacao entre nodes)
0x06 = AUTH_RESPONSE (resposta de autenticacao)
Flags:
bit 0: REQUIRES_ACK (precisa de confirmacao)
bit 1: ENCRYPTED (payload criptografado)
bit 2: COMPRESSED (payload comprimido)
bit 3: PRIORITY (alta prioridade)
import struct
import json
import uuid
# Definicao do protocolo HMP
HMP_VERSION = 1
HEADER_FORMAT = "!BBHi16s8s" # network byte order
HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 32 bytes
# Message types
KNOWLEDGE_SYNC = 0x01
HEALTH_CHECK = 0x02
SESSION_EVENT = 0x03
ROUTE_UPDATE = 0x04
# Flags
FLAG_REQUIRES_ACK = 0x01
FLAG_ENCRYPTED = 0x02
FLAG_COMPRESSED = 0x04
FLAG_PRIORITY = 0x08
def encode_message(msg_type, source_id, dest_id, payload, flags=0):
"""Codifica uma mensagem HMP."""
payload_bytes = json.dumps(payload).encode("utf-8")
src_bytes = uuid.UUID(source_id).bytes
dst_bytes = dest_id.encode("utf-8")[:8].ljust(8, b'\x00')
header = struct.pack(
HEADER_FORMAT,
HMP_VERSION,
msg_type,
flags,
len(payload_bytes),
src_bytes,
dst_bytes
)
return header + payload_bytes
def decode_message(data):
"""Decodifica uma mensagem HMP."""
header = data[:HEADER_SIZE]
version, msg_type, flags, length, src, dst = struct.unpack(
HEADER_FORMAT, header
)
payload = json.loads(data[HEADER_SIZE:HEADER_SIZE + length])
return {
"version": version,
"type": msg_type,
"flags": flags,
"source": str(uuid.UUID(bytes=src)),
"dest": dst.rstrip(b'\x00').decode(),
"payload": payload
}
# Exemplo de uso
msg = encode_message(
msg_type=KNOWLEDGE_SYNC,
source_id="550e8400-e29b-41d4-a716-446655440000",
dest_id="build-ai",
payload={"domain": "build", "knowledge": ["new learning..."]},
flags=FLAG_REQUIRES_ACK
)
print(decode_message(msg))
Máquina de estados do protocolo
Todo protocolo tem uma máquina de estados que define os estados possíveis de uma conexão e as transições entre eles. Assim como TCP tem SYN_SENT, ESTABLISHED, FIN_WAIT, o HMP tem seus próprios estados:
Máquina de estados HMP (Harness Mesh Protocol):
┌────────────┐
│ IDLE │ (node registrado, sem conexão ativa)
└─────┬──────┘
│ send AUTH_REQUEST
v
┌────────────┐ timeout / auth failed
│ AUTH │ ──────────────────────────→ IDLE
│ PENDING │
└─────┬──────┘
│ recv AUTH_RESPONSE (ok)
v
┌────────────┐
│ CONNECTED │ (autenticado, pode trocar mensagens)
└─────┬──────┘
│ │
│ send/recv msgs │ 3 heartbeats perdidos
│ (KNOWLEDGE_SYNC, │
│ SESSION_EVENT, v
│ ROUTE_UPDATE) ┌────────────┐
│ │ SUSPECT │ (possível falha)
│ └─────┬──────┘
│ │ heartbeat recebido → CONNECTED
│ │ timeout → DISCONNECTED
│ v
│ ┌────────────┐
│ │DISCONNECTED│ (node considerado down)
│ └─────┬──────┘
│ │ reconexão → AUTH PENDING
│ v
└───────────────→ IDLE (cleanup)
Timeouts:
AUTH_PENDING → IDLE: 5 segundos
CONNECTED → SUSPECT: 90 segundos (3 × heartbeat interval)
SUSPECT → DISCONNECTED: 30 segundos
Reconnect backoff: exponential (1s, 2s, 4s, 8s, max 60s)
from enum import Enum
import time
class NodeState(Enum):
IDLE = "idle"
AUTH_PENDING = "auth_pending"
CONNECTED = "connected"
SUSPECT = "suspect"
DISCONNECTED = "disconnected"
class HMPConnection:
"""Gerencia o estado de uma conexão HMP."""
def __init__(self, node_id):
self.node_id = node_id
self.state = NodeState.IDLE
self.last_heartbeat = None
self.auth_attempts = 0
def transition(self, event):
"""Transição de estado baseada em evento."""
transitions = {
(NodeState.IDLE, "auth_request"): NodeState.AUTH_PENDING,
(NodeState.AUTH_PENDING, "auth_ok"): NodeState.CONNECTED,
(NodeState.AUTH_PENDING, "auth_fail"): NodeState.IDLE,
(NodeState.CONNECTED, "heartbeat_timeout"): NodeState.SUSPECT,
(NodeState.CONNECTED, "disconnect"): NodeState.DISCONNECTED,
(NodeState.SUSPECT, "heartbeat_ok"): NodeState.CONNECTED,
(NodeState.SUSPECT, "timeout"): NodeState.DISCONNECTED,
(NodeState.DISCONNECTED, "reconnect"): NodeState.AUTH_PENDING,
}
key = (self.state, event)
if key in transitions:
old = self.state
self.state = transitions[key]
print(f"[{self.node_id}] {old.value} --{event}--> {self.state.value}")
else:
print(f"[{self.node_id}] Transição inválida: {self.state.value} + {event}")
# Simulação
conn = HMPConnection("build-ai")
conn.transition("auth_request") # idle → auth_pending
conn.transition("auth_ok") # auth_pending → connected
conn.transition("heartbeat_timeout") # connected → suspect
conn.transition("heartbeat_ok") # suspect → connected
Request/response vs. Pub/sub
Padrao Quando usar no mesh Implementacao
────────────── ────────────────────────────── ──────────────
Request/response Knowledge queries, tool calls HTTP/JSON-RPC
"Preciso de uma resposta"
Pub/sub Session events, health status SSE ou WebSocket
"Informo a todos que..."
Hibrido Mesh protocol combina ambos HMP customizado
Tipo 0x01-0x04: req/resp
Tipo 0x03: pub/sub (broadcast)
Service discovery
# Service registry para mesh nodes
class MeshRegistry:
def __init__(self):
self.nodes = {}
def register(self, node_id, endpoint, capabilities):
"""Registra um node no mesh."""
self.nodes[node_id] = {
"endpoint": endpoint,
"capabilities": capabilities,
"status": "healthy",
"last_heartbeat": time.time()
}
def discover(self, capability):
"""Encontra nodes com uma capability especifica."""
return [
node_id for node_id, info in self.nodes.items()
if capability in info["capabilities"]
and info["status"] == "healthy"
]
def heartbeat(self, node_id):
"""Atualiza heartbeat de um node."""
if node_id in self.nodes:
self.nodes[node_id]["last_heartbeat"] = time.time()
# Uso
registry = MeshRegistry()
registry.register("marco-ai", "https://marco.fly.dev",
["knowledge", "governance", "metacognitive"])
registry.register("build-ai", "https://build.fly.dev",
["knowledge", "causal", "build"])
# Quem sabe sobre "knowledge"?
print(registry.discover("knowledge")) # ['marco-ai', 'build-ai']
Tratamento de erros e retransmissão
Um protocolo robusto precisa definir como lidar com mensagens perdidas, duplicadas ou corrompidas. O HMP usa ACKs e retransmissão com backoff exponencial:
Retransmissão com exponential backoff:
Emissor Receptor
| |
|--- MSG (seq=1, ACK_REQ) ----->|
| [timer: 1s] |
| | (msg perdida!)
| [timeout 1s] |
|--- MSG (seq=1, retry=1) ----->|
| [timer: 2s] |
| | (msg perdida de novo!)
| [timeout 2s] |
|--- MSG (seq=1, retry=2) ----->|
| [timer: 4s] |
|<-- ACK (seq=1) ---------------| (finalmente!)
| |
Backoff: 1s, 2s, 4s, 8s, 16s, max 60s
Max retries: 5 (depois disso → marcar node como SUSPECT)
Detecção de duplicatas:
Receptor mantém cache de (source_id, seq_number) recentes
Se recebe msg com seq já visto → descarta (já processou)
Cache expira após 5 minutos
Heartbeats e health checks
Nodes do mesh precisam saber se seus pares estão vivos. Heartbeats são mensagens periódicas "estou vivo". Se um node não envia heartbeat por T segundos, é considerado down e a máquina de estados transiciona para SUSPECT.
Heartbeat protocol:
Node A Registry Node B
| | |
|---HEALTH_CHECK(A)----->| |
| |---HEALTH_CHECK(B)---->|
| |<--HEALTH_OK-----------|
|<--registry updated-----| |
| | |
[... 30 segundos ...] | |
| | |
|---HEALTH_CHECK(A)----->| |
| |---HEALTH_CHECK(B)---->|
| | (timeout, sem resp) |
| | X (down!)
|<--NODE_DOWN(B)---------|
| |
Se Node B nao responde ao heartbeat por 3 tentativas
consecutivas, e marcado como "unhealthy" no registry.
No harness.os
Este protocolo é o que o mesh do harness.os usará para comunicação inter-node. O design combina conceitos de todas as aulas anteriores: formato de mensagem (camada de aplicação), transporte confiável (TCP), endereçamento (UUIDs como IPs), roteamento (concern-based), e segurança (JWT + TLS). Cada decisão de design reflete um trade-off estudado nas aulas anteriores.
Mapeamento: conceitos do curso → decisões do HMP
Camada de aplicação (Módulo 2):
HTTP request/response → HMP request/response (tipos 0x01-0x04)
DNS service discovery → MeshRegistry (node_id → endpoint)
JSON payload → HMP payload (flexível, extensível)
Camada de transporte (Módulo 3):
TCP confiabilidade → ACK + retransmissão no HMP
Controle de congestionamento → backoff exponencial nas retries
Sequence numbers → seq field para detecção de duplicatas
Camada de rede (Módulo 4):
IP endereçamento → UUID como node identifier
Routing tables → concern-based routing no mesh
TTL → max retries (evita mensagens circulando infinitamente)
Enlace e segurança (Módulo 5):
CRC/checksum → integridade via TLS (abaixo do HMP)
TLS/mTLS → flag ENCRYPTED + JWT auth
VLANs → isolamento por harness type (build/product/ops)
Resumo
- Formato de mensagem: header fixo (versão, tipo, flags, tamanho) + payload JSON variável
- Máquina de estados: IDLE → AUTH_PENDING → CONNECTED → SUSPECT → DISCONNECTED
- Tipos de mensagem: KNOWLEDGE_SYNC, HEALTH_CHECK, SESSION_EVENT, ROUTE_UPDATE, AUTH_*
- Retransmissão com exponential backoff e detecção de duplicatas por (source_id, seq)
- Service discovery: registry que mapeia node_id para endpoint + capabilities
- Heartbeats: mensagens periódicas para detectar nodes fora do ar (30s interval, 3 misses = SUSPECT)
- Combina request/response (queries) e pub/sub (eventos) em um único protocolo
Homework
Exercício 1: Escreva a spec completa do protocolo HMP (Harness Mesh Protocol).
- Defina todos os tipos de mensagem e seus payloads JSON (com exemplos)
- Especifique o comportamento de cada flag (REQUIRES_ACK, ENCRYPTED, COMPRESSED, PRIORITY)
- Documente o fluxo de handshake entre dois nodes (AUTH_REQUEST → AUTH_RESPONSE)
- Defina timeouts e retries para cada tipo de mensagem
Exercício 2: Implemente a máquina de estados HMP completa em Python. Adicione validação de transições inválidas, logging de cada transição, e um mecanismo de reconnect automático com backoff exponencial.
Exercício 3: Compare o formato HMP (header binário + JSON payload) com alternativas: (a) tudo JSON, (b) tudo protobuf, (c) MessagePack. Analise trade-offs de performance, debugging e extensibilidade.
Exercício 4: Implemente a retransmissão com exponential backoff e detecção de duplicatas. Simule uma rede com 30% de perda de pacotes e meça: quantas retransmissões são necessárias em média para entregar 100 mensagens?
Verifique seu entendimento
Por que o header do protocolo tem tamanho fixo enquanto o payload e variavel?
Verifique seu entendimento
Na máquina de estados HMP, o que acontece quando um node CONNECTED não recebe heartbeat por 3 intervalos consecutivos?
Verifique seu entendimento
Como o receptor HMP detecta e descarta mensagens duplicadas causadas por retransmissões?