Connections

This page explains how to connect two nodes and how to use the resulting Pipe.

auto_connect

from warpgate.node.auto_connect import auto_connect

pipe, plugin = await auto_connect(node, dest_addr)

auto_connect is the high-level connection API. It:

  1. Resolves dest_addr (raw bytes or a PNP nickname string)

  2. Builds every valid (plugin, address-family, route-type) combination

  3. Launches all combinations concurrently

  4. Returns (pipe, plugin) for the first successful connection

  5. Cancels and cleans up everything else

dest_addr ──► resolve ──► dest_map
                               │
                    build combos (plugin × AF × route_type)
                               │
          ┌────────────────────┼────────────────────┐
          │                    │                    │
   direct_connect        reverse_connect          punch
   (IP4, NIC_BIND)      (IP4, NIC_BIND)    (IP4, NIC_BIND)
          │                    │                    │
          └────────────────────┼────────────────────┘
                               │
                          race results
                               │
                      (pipe, winning_plugin)
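The race step in the diagram can be pictured with plain asyncio. The sketch below is not warpgate's implementation: race_first_success and the attempt labels are hypothetical names, used only to illustrate launching attempts together, keeping the first success, and cancelling the rest.

```python
import asyncio


async def race_first_success(attempts, timeout=60.0):
    """Run all attempt coroutines concurrently; return the first non-None result.

    `attempts` maps a label (standing in for a plugin name) to a coroutine.
    Returns (result, label), or (None, None) if nothing succeeds in time.
    """
    tasks = {asyncio.ensure_future(coro): label for label, coro in attempts.items()}
    pending = set(tasks)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    try:
        while pending:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            done, pending = await asyncio.wait(
                pending, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                # A failed attempt (exception or None) just drops out of the race.
                if task.cancelled() or task.exception() is not None:
                    continue
                if task.result() is not None:
                    return task.result(), tasks[task]
        return None, None
    finally:
        # Cancel the losers and wait for them to finish cleaning up.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
```

The finally block is what keeps losing attempts from lingering: every task still pending when the winner returns is cancelled and awaited before the function exits.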

If all direct strategies fail, auto_connect tries TURN (relay) as a fallback, up to turn_limit times (default 3).
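The fallback order can be sketched as a bounded retry loop. This is an illustration under stated assumptions, not the library's code: try_direct and try_relay are hypothetical zero-argument coroutine functions that return a connection on success or None on failure.

```python
import asyncio


async def connect_with_fallback(try_direct, try_relay, turn_limit=3):
    """Try the direct path once, then the relay path up to `turn_limit` times.

    Both callables are hypothetical stand-ins; each returns a connection
    object on success or None on failure.
    """
    conn = await try_direct()
    if conn is not None:
        return conn, "direct"
    for _ in range(turn_limit):
        conn = await try_relay()
        if conn is not None:
            return conn, "relay"
    # Every attempt failed, mirroring the (None, None) convention.
    return None, None
```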

Parameters

Parameter    Default   Description
node         -         The local Node
dest_addr    -         Peer’s address bytes or PNP nickname
timeout      60.0      Seconds to wait for any plugin to succeed
turn_limit   3         Max TURN attempts if direct strategies all fail

Returns (None, None) if everything fails within the timeout.
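Because failure is signalled by a (None, None) return rather than an exception, callers need an explicit check. A minimal sketch, where require_pipe is a hypothetical helper and not part of warpgate:

```python
def require_pipe(result):
    """Unpack an auto_connect-style (pipe, plugin) tuple, raising on failure.

    Hypothetical helper: turns the (None, None) failure value into an
    exception so it cannot be used as a pipe by accident.
    """
    pipe, plugin = result
    if pipe is None:
        raise ConnectionError("no connection strategy succeeded before the timeout")
    return pipe, plugin
```

It could be used as: pipe, plugin = require_pipe(await auto_connect(node, dest_addr)).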

node.connect (manual)

For fine-grained control, use node.connect directly:

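import asyncio

from aionetiface import IP4, NIC_BIND

# Start a single connection attempt with an explicit plugin, AF, and route type.
plugin = await node.connect(
    af=IP4,
    route_type=NIC_BIND,
    pnp_addr=dest_addr,
    plugin_name="direct_connect",
)
# plugin.result is a Future; bound the wait so an unreachable peer cannot stall us.
pipe = await asyncio.wait_for(plugin.result, timeout=10)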

node.connect returns a plugin object whose .result is an asyncio.Future that resolves to a Pipe when the connection succeeds.
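Since .result is an ordinary asyncio.Future, standard asyncio tools apply to it. The sketch below uses a bare future as a stand-in for plugin.result; wait_for_result is a hypothetical helper that returns None on timeout instead of raising.

```python
import asyncio


async def wait_for_result(future, timeout=10.0):
    """Await a future; return its value, or None if the timeout expires."""
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # asyncio.wait_for has already cancelled the future at this point.
        return None
```

Note that asyncio.wait_for cancels the future it was waiting on when the timeout fires, so the same future cannot be awaited again afterwards.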

Address families

Constant   Meaning
IP4        IPv4 only
IP6        IPv6 only

Pass af=None to let warpgate pick the first address family that both nodes share.
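One way to picture "first AF both nodes share" is a simple ordered intersection. This is an illustration only: pick_shared_af is a hypothetical function, the socket constants are stand-ins for IP4 and IP6, and the preference order (local list first) is an assumption, not documented behaviour.

```python
import socket

# Stand-ins for the IP4 / IP6 constants; the real values may differ.
IP4 = socket.AF_INET
IP6 = socket.AF_INET6


def pick_shared_af(local_afs, remote_afs):
    """Return the first address family in `local_afs` that the remote also has."""
    for af in local_afs:
        if af in remote_afs:
            return af
    # No common address family: no connection is possible on this basis.
    return None
```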

Route types

Constant   Meaning
NIC_BIND   Use the NIC’s private/local IP (LAN or same-machine)
EXT_BIND   Use the external/public IP (WAN path through NAT)
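As a rough rule of thumb, the choice between the two follows from whether the peer looks local. The helper below is a hypothetical heuristic, not something warpgate provides: it returns the constant's name as a string, and a private address does not guarantee the peer is on the same LAN.

```python
import ipaddress


def suggest_route_type(dest_ip):
    """Return "NIC_BIND" for private or loopback destinations, else "EXT_BIND".

    Heuristic sketch only; the result is the constant's name, not its value.
    """
    addr = ipaddress.ip_address(dest_ip)
    if addr.is_private or addr.is_loopback or addr.is_link_local:
        return "NIC_BIND"
    return "EXT_BIND"
```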

Pipe API

Once you have a Pipe, use the following methods to exchange messages:

Subscribing

Before you can receive messages you must subscribe. A subscription tells warpgate which messages you’re interested in:

from aionetiface import SUB_ALL

pipe.subscribe(SUB_ALL)   # match any message from any sender

SUB_ALL is the wildcard. You can also filter by content pattern or sender:

# Only messages matching a regex pattern
pipe.subscribe((b"^HELLO", None))

# Only messages from a specific sender
pipe.subscribe((None, ("192.168.1.5", 12345)))

# Deliver matching messages directly to a callback instead of a queue
pipe.subscribe(SUB_ALL, handler=my_async_func)

subscribe returns an integer offset that you can pass to unsubscribe to remove the subscription.
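The (pattern, sender) filter shape can be modelled in a few lines. This sketch is an assumption about the matching semantics, not the library's code: matches is a hypothetical function, None is treated as a wildcard in either slot, and the pattern is applied with re.search.

```python
import re


def matches(sub, msg, sender):
    """Return True if (msg, sender) satisfies a (pattern, sender) subscription.

    None in either slot means "match anything" in this sketch.
    """
    pattern, wanted_sender = sub
    if pattern is not None and re.search(pattern, msg) is None:
        return False
    if wanted_sender is not None and tuple(wanted_sender) != tuple(sender):
        return False
    return True
```

Under this model, (b"^HELLO", None) accepts b"HELLO world" from any sender but rejects b"say HELLO", because the pattern is anchored to the start of the message.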

Sending

await pipe.send(b"hello world", dest_tup)

For TCP pipes you established with auto_connect, the destination is already known, so you can pass None in place of dest_tup:

await pipe.send(b"hello world", None)

For UDP, always pass the destination tuple (ip, port).

Receiving

data = await pipe.recv(SUB_ALL)                  # returns bytes or None on timeout
data = await pipe.recv(SUB_ALL, timeout=5)        # custom timeout in seconds
tup  = await pipe.recv(SUB_ALL, full=True)        # returns [(ip, port), data]

recv waits until a matching message arrives or the timeout expires, in which case it returns None. The default timeout is 2 seconds.
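The return-None-on-timeout behaviour can be reproduced with an asyncio.Queue, which makes it easy to see why callers must check the result. recv_or_none is a hypothetical stand-in, not the Pipe method itself.

```python
import asyncio


async def recv_or_none(queue, timeout=2.0):
    """Wait for the next queued message; return None if `timeout` elapses."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # No message arrived in time; signal that with None, not an exception.
        return None
```

Code that calls len() or indexes the result directly will fail on a timeout, so test for None first.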

Receiving N bytes

For stream protocols where you need an exact byte count:

data = await pipe.recv_n(1024, SUB_ALL)   # accumulates until >= 1024 bytes
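Accumulating to a byte count usually means buffering chunks and splitting off any surplus. The sketch below shows that idea with an asyncio.Queue of chunks; read_exactly is a hypothetical function, and how recv_n itself treats surplus bytes is not stated here.

```python
import asyncio


async def read_exactly(queue, n):
    """Collect chunks from `queue` until at least `n` bytes have arrived.

    Returns (first n bytes, leftover bytes). No timeout handling in this sketch.
    """
    buf = bytearray()
    while len(buf) < n:
        buf += await queue.get()
    return bytes(buf[:n]), bytes(buf[n:])
```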

Closing

await pipe.close()

Always close pipes when you’re done. Unclosed pipes keep the underlying socket open.
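A try/finally block is the usual way to guarantee the close. In this sketch FakePipe is a hypothetical stand-in exposing only close(), and use_and_close is an illustrative helper rather than a warpgate function.

```python
import asyncio


class FakePipe:
    """Hypothetical stand-in exposing only the close() coroutine used below."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def use_and_close(pipe, work):
    """Run `work(pipe)` and close the pipe even if the work raises."""
    try:
        return await work(pipe)
    finally:
        await pipe.close()
```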

Receiving on the server side

The server-side node receives messages through node.add_msg_cb:

async def on_message(msg, client_tup, pipe):
    ip, port = client_tup
    print("Received {} bytes from {}:{}".format(len(msg), ip, port))
    await pipe.send(b"got it", client_tup)

node.add_msg_cb(on_message)

This fires for every message on every server socket the node manages.

Connection patterns

Ping-pong (request-response)

# Sender side:
pipe.subscribe(SUB_ALL)
await pipe.send(b"ping", dest)
resp = await pipe.recv(SUB_ALL)   # b"pong", or None if no reply arrives in time

# Receiver side:
async def on_msg(msg, client_tup, pipe):
    if msg == b"ping":
        await pipe.send(b"pong", client_tup)

node.add_msg_cb(on_msg)

Streaming

for chunk in data_chunks:
    await pipe.send(chunk, dest)

# Receiver accumulates with recv_n:
full_data = await pipe.recv_n(total_size, SUB_ALL)

Broadcast (one sender, multiple receivers)

There is no built-in broadcast. Create a Pipe to each receiver and send to all:

pipes = []
for addr in peer_addresses:
    p, _ = await auto_connect(node, addr)
    # auto_connect returns (None, None) on failure; skip unreachable peers.
    if p is not None:
        pipes.append(p)

for p in pipes:
    await p.send(b"broadcast message", None)

Inbound connections

When another node connects to yours, its messages arrive through the callback registered with node.add_msg_cb. To get the pipe itself for bidirectional use:

import asyncio

pipe_id = "my-session-id"
future = node.pipe_future(pipe_id)

# The connecting peer sends the pipe_id in its first message.
# Your msg_cb hands the pipe over to the waiting future:
async def on_msg(msg, client_tup, pipe):
    if msg.startswith(b"CONNECT:"):
        # Strip the b"CONNECT:" prefix to recover the session ID.
        received_id = msg[len(b"CONNECT:"):].decode()
        node.pipe_ready(received_id, pipe)

node.add_msg_cb(on_msg)

# Then you await the future to get the pipe:
inbound_pipe = await asyncio.wait_for(future, timeout=10)

The direct_connect plugin uses this pattern internally with CON_ID_MSG.
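The pipe_future / pipe_ready pairing behaves like a registry of futures keyed by session ID. The class below is a hypothetical model of that idea, written to show the hand-off; it is not how the node stores its futures.

```python
import asyncio


class PipeRegistry:
    """Hypothetical registry pairing session IDs with futures."""

    def __init__(self):
        self._futures = {}

    def pipe_future(self, pipe_id):
        # Create the future on first request so either side may go first.
        if pipe_id not in self._futures:
            self._futures[pipe_id] = asyncio.get_running_loop().create_future()
        return self._futures[pipe_id]

    def pipe_ready(self, pipe_id, pipe):
        # Resolve the future once; later calls for the same ID are ignored.
        future = self.pipe_future(pipe_id)
        if not future.done():
            future.set_result(pipe)
```

Creating the future lazily in both methods means it does not matter whether the waiter or the inbound message arrives first.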