Implementando Comunicacao
Video da aula estara disponivel em breve
Implementacao com asyncio
Para um mesh com multiplos nodes comunicando simultaneamente, precisamos de I/O assincrono. Python asyncio permite gerenciar multiplas conexoes sem threads.
Python
import asyncio
import json
class MeshNode:
"""Um node do harness.os mesh."""
def __init__(self, node_id, host, port):
self.node_id = node_id
self.host = host
self.port = port
self.peers = {} # node_id -> (reader, writer)
self.knowledge = {} # knowledge store local
async def start_server(self):
"""Inicia o servidor TCP para receber conexoes de peers."""
server = await asyncio.start_server(
self._handle_connection, self.host, self.port
)
print(f"[{self.node_id}] Listening on {self.host}:{self.port}")
async with server:
await server.serve_forever()
async def _handle_connection(self, reader, writer):
"""Processa conexao de um peer."""
addr = writer.get_extra_info("peername")
print(f"[{self.node_id}] Connection from {addr}")
try:
while True:
# Ler tamanho da mensagem (4 bytes, big-endian)
size_data = await reader.readexactly(4)
msg_size = int.from_bytes(size_data, "big")
# Ler a mensagem
msg_data = await reader.readexactly(msg_size)
message = json.loads(msg_data.decode())
# Processar
response = await self._process_message(message)
# Enviar resposta
resp_data = json.dumps(response).encode()
writer.write(len(resp_data).to_bytes(4, "big"))
writer.write(resp_data)
await writer.drain()
except asyncio.IncompleteReadError:
print(f"[{self.node_id}] Peer disconnected")
finally:
writer.close()
async def _process_message(self, message):
"""Roteia mensagem para handler correto."""
msg_type = message.get("type")
if msg_type == "health_check":
return {"status": "healthy", "node": self.node_id}
elif msg_type == "knowledge_sync":
domain = message["domain"]
self.knowledge[domain] = message["data"]
return {"status": "synced", "domain": domain}
elif msg_type == "knowledge_query":
domain = message["domain"]
return {"data": self.knowledge.get(domain, [])}
return {"error": "unknown message type"}
Cliente HTTP com httpx
Para a versao HTTP do protocolo (mais pratica para producao), usamos httpx com suporte a async:
Python
import httpx
import asyncio
class MeshClient:
"""Cliente HTTP para comunicacao entre mesh nodes."""
def __init__(self, node_id, auth_token):
self.node_id = node_id
self.client = httpx.AsyncClient(
headers={
"Authorization": f"Bearer {auth_token}",
"X-Node-ID": node_id
},
timeout=30.0
)
async def health_check(self, peer_url):
"""Verifica saude de um peer."""
try:
r = await self.client.get(f"{peer_url}/health")
return r.status_code == 200
except httpx.ConnectError:
return False
async def sync_knowledge(self, peer_url, domain, data):
"""Envia conhecimento para um peer."""
r = await self.client.post(
f"{peer_url}/knowledge/sync",
json={"domain": domain, "data": data}
)
return r.json()
async def query_knowledge(self, peer_url, domain):
"""Consulta conhecimento de um peer."""
r = await self.client.get(
f"{peer_url}/knowledge/{domain}"
)
return r.json()
# Exemplo: dois nodes trocando conhecimento
async def demo():
client = MeshClient("build-ai", "token123")
# Sincronizar learning com marco.ai
result = await client.sync_knowledge(
"https://marco.fly.dev",
domain="build",
data=[{"title": "Token optimization", "content": "..."}]
)
print(f"Sync result: {result}")
# Consultar knowledge de marco.ai
knowledge = await client.query_knowledge(
"https://marco.fly.dev", "governance"
)
print(f"Knowledge: {knowledge}")
SSE para eventos em tempo real
Python
import asyncio
from aiohttp import web
async def sse_handler(request):
"""Endpoint SSE para streaming de eventos mesh."""
response = web.StreamResponse()
response.content_type = "text/event-stream"
response.headers["Cache-Control"] = "no-cache"
await response.prepare(request)
# Simular eventos de sessao
events = [
{"type": "session_start", "agent": "build-ai"},
{"type": "tool_call", "tool": "get_knowledge"},
{"type": "decision_logged", "title": "Use TCP for mesh"},
{"type": "session_end", "summary": "Done"},
]
for event in events:
data = json.dumps(event)
await response.write(
f"event: {event['type']}\ndata: {data}\n\n".encode()
)
await asyncio.sleep(1)
return response
app = web.Application()
app.router.add_get("/events", sse_handler)
Comunicacao completa entre dois nodes
Diagrama
Fluxo completo: build-ai envia learning para marco.ai
1. build-ai resolve "marco-ai" -> https://marco.fly.dev
(service discovery via registry)
2. build-ai cria JWT com claims: sub=build-ai, role=build
(autenticacao, aula 5.2)
3. build-ai envia HTTP POST com TLS:
POST /knowledge/sync HTTP/2
Authorization: Bearer eyJ...
Content-Type: application/json
{"domain": "build", "data": [...]}
4. Fly.io roteia request para MCP server de marco.ai
(load balancing, anycast IP, aula 5.3)
5. marco.ai valida JWT, verifica permissoes
(seguranca, aula 5.2)
6. marco.ai salva knowledge no Neon DB
(TCP:5432, TLS, PgBouncer, aula 4.3)
7. marco.ai responde:
HTTP/2 200 OK
{"status": "synced", "domain": "build"}
8. build-ai confirma sync e atualiza status local
Cada passo usa conceitos que aprendemos:
DNS (2.2), HTTP (2.1), TLS (5.2), TCP (3.2),
IP (4.1), routing (4.2), load balancing (5.3)
No harness.os
Esta implementacao e um prototipo funcional do que o mesh do harness.os usara para comunicacao entre nodes. A versao de producao usaria o MCP protocol (Streamable HTTP) em vez de HTTP puro, mas os conceitos de rede sao identicos.
Resumo
asynciopermite I/O nao-bloqueante para multiplas conexoes simultaneas- Sockets TCP: protocolo binario com header de tamanho + payload JSON
httpx: cliente HTTP async para comunicacao entre nodes via REST- SSE: streaming de eventos em tempo real do servidor para clientes
- O fluxo completo combina todos os conceitos do curso
Exercicio pratico
Implemente um prototipo de dois harness nodes trocando knowledge.
- Node A (build-ai) roda na porta 8001, Node B (marco-ai) na porta 8002
- Node A envia um learning para Node B via HTTP POST
- Node B salva o learning e confirma
- Node A consulta Node B para verificar que o knowledge foi persistido
- Adicione autenticacao JWT ao fluxo
Verifique seu entendimento
Por que usamos asyncio em vez de threads para gerenciar multiplas conexoes no mesh?