College Online
0%

Implementando Comunicacao

Modulo 6 · Aula 2 ~30 min de leitura Nivel: Avancado

Video da aula estara disponivel em breve

Implementacao com asyncio

Para um mesh com multiplos nodes comunicando simultaneamente, precisamos de I/O assincrono. Python asyncio permite gerenciar multiplas conexoes sem threads.

Python
import asyncio
import json

class MeshNode:
    """Um node do harness.os mesh."""

    def __init__(self, node_id, host, port):
        self.node_id = node_id
        self.host = host
        self.port = port
        self.peers = {}  # node_id -> (reader, writer)
        self.knowledge = {}  # knowledge store local

    async def start_server(self):
        """Inicia o servidor TCP para receber conexoes de peers."""
        server = await asyncio.start_server(
            self._handle_connection, self.host, self.port
        )
        print(f"[{self.node_id}] Listening on {self.host}:{self.port}")
        async with server:
            await server.serve_forever()

    async def _handle_connection(self, reader, writer):
        """Processa conexao de um peer."""
        addr = writer.get_extra_info("peername")
        print(f"[{self.node_id}] Connection from {addr}")

        try:
            while True:
                # Ler tamanho da mensagem (4 bytes, big-endian)
                size_data = await reader.readexactly(4)
                msg_size = int.from_bytes(size_data, "big")

                # Ler a mensagem
                msg_data = await reader.readexactly(msg_size)
                message = json.loads(msg_data.decode())

                # Processar
                response = await self._process_message(message)

                # Enviar resposta
                resp_data = json.dumps(response).encode()
                writer.write(len(resp_data).to_bytes(4, "big"))
                writer.write(resp_data)
                await writer.drain()
        except asyncio.IncompleteReadError:
            print(f"[{self.node_id}] Peer disconnected")
        finally:
            writer.close()

    async def _process_message(self, message):
        """Roteia mensagem para handler correto."""
        msg_type = message.get("type")

        if msg_type == "health_check":
            return {"status": "healthy", "node": self.node_id}

        elif msg_type == "knowledge_sync":
            domain = message["domain"]
            self.knowledge[domain] = message["data"]
            return {"status": "synced", "domain": domain}

        elif msg_type == "knowledge_query":
            domain = message["domain"]
            return {"data": self.knowledge.get(domain, [])}

        return {"error": "unknown message type"}

Cliente HTTP com httpx

Para a versao HTTP do protocolo (mais pratica para producao), usamos httpx com suporte a async:

Python
import httpx
import asyncio

class MeshClient:
    """Cliente HTTP para comunicacao entre mesh nodes."""

    def __init__(self, node_id, auth_token):
        self.node_id = node_id
        self.client = httpx.AsyncClient(
            headers={
                "Authorization": f"Bearer {auth_token}",
                "X-Node-ID": node_id
            },
            timeout=30.0
        )

    async def health_check(self, peer_url):
        """Verifica saude de um peer."""
        try:
            r = await self.client.get(f"{peer_url}/health")
            return r.status_code == 200
        except httpx.ConnectError:
            return False

    async def sync_knowledge(self, peer_url, domain, data):
        """Envia conhecimento para um peer."""
        r = await self.client.post(
            f"{peer_url}/knowledge/sync",
            json={"domain": domain, "data": data}
        )
        return r.json()

    async def query_knowledge(self, peer_url, domain):
        """Consulta conhecimento de um peer."""
        r = await self.client.get(
            f"{peer_url}/knowledge/{domain}"
        )
        return r.json()

# Exemplo: dois nodes trocando conhecimento
async def demo():
    client = MeshClient("build-ai", "token123")

    # Sincronizar learning com marco.ai
    result = await client.sync_knowledge(
        "https://marco.fly.dev",
        domain="build",
        data=[{"title": "Token optimization", "content": "..."}]
    )
    print(f"Sync result: {result}")

    # Consultar knowledge de marco.ai
    knowledge = await client.query_knowledge(
        "https://marco.fly.dev", "governance"
    )
    print(f"Knowledge: {knowledge}")

SSE para eventos em tempo real

Python
import asyncio
from aiohttp import web

async def sse_handler(request):
    """Endpoint SSE para streaming de eventos mesh."""
    response = web.StreamResponse()
    response.content_type = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    await response.prepare(request)

    # Simular eventos de sessao
    events = [
        {"type": "session_start", "agent": "build-ai"},
        {"type": "tool_call", "tool": "get_knowledge"},
        {"type": "decision_logged", "title": "Use TCP for mesh"},
        {"type": "session_end", "summary": "Done"},
    ]

    for event in events:
        data = json.dumps(event)
        await response.write(
            f"event: {event['type']}\ndata: {data}\n\n".encode()
        )
        await asyncio.sleep(1)

    return response

app = web.Application()
app.router.add_get("/events", sse_handler)

Comunicacao completa entre dois nodes

Diagrama
Fluxo completo: build-ai envia learning para marco.ai

1. build-ai resolve "marco-ai" -> https://marco.fly.dev
   (service discovery via registry)

2. build-ai cria JWT com claims: sub=build-ai, role=build
   (autenticacao, aula 5.2)

3. build-ai envia HTTP POST com TLS:
   POST /knowledge/sync HTTP/2
   Authorization: Bearer eyJ...
   Content-Type: application/json
   {"domain": "build", "data": [...]}

4. Fly.io roteia request para MCP server de marco.ai
   (load balancing, anycast IP, aula 5.3)

5. marco.ai valida JWT, verifica permissoes
   (seguranca, aula 5.2)

6. marco.ai salva knowledge no Neon DB
   (TCP:5432, TLS, PgBouncer, aula 4.3)

7. marco.ai responde:
   HTTP/2 200 OK
   {"status": "synced", "domain": "build"}

8. build-ai confirma sync e atualiza status local

Cada passo usa conceitos que aprendemos:
  DNS (2.2), HTTP (2.1), TLS (5.2), TCP (3.2),
  IP (4.1), routing (4.2), load balancing (5.3)

No harness.os

Esta implementacao e um prototipo funcional do que o mesh do harness.os usara para comunicacao entre nodes. A versao de producao usaria o MCP protocol (Streamable HTTP) em vez de HTTP puro, mas os conceitos de rede sao identicos.

Resumo

Exercicio pratico

Implemente um prototipo de dois harness nodes trocando knowledge.

  1. Node A (build-ai) roda na porta 8001, Node B (marco-ai) na porta 8002
  2. Node A envia um learning para Node B via HTTP POST
  3. Node B salva o learning e confirma
  4. Node A consulta Node B para verificar que o knowledge foi persistido
  5. Adicione autenticacao JWT ao fluxo

Verifique seu entendimento

Por que usamos asyncio em vez de threads para gerenciar multiplas conexoes no mesh?

  • Threads sao mais lentas que asyncio em todos os casos
  • Conexoes de rede sao I/O-bound; asyncio e mais eficiente para esperar I/O sem criar milhares de threads
  • Python nao suporta threads
  • asyncio funciona sem TCP, o que simplifica o codigo