Connections
This page explains how to connect two nodes and how to use the resulting Pipe.
auto_connect
from warpgate.node.auto_connect import auto_connect
pipe, plugin = await auto_connect(node, dest_addr)
auto_connect is the high-level connection API. It:
1. Resolves dest_addr (raw bytes or a PNP nickname string)
2. Builds every valid (plugin, address-family, route-type) combination
3. Launches all combinations concurrently
4. Returns (pipe, plugin) for the first successful connection
5. Cancels and cleans up everything else
dest_addr ──► resolve ──► dest_map
                             │
          build combos (plugin × AF × route_type)
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
 direct_connect       reverse_connect           punch
 (IP4, NIC_BIND)      (IP4, NIC_BIND)      (IP4, NIC_BIND)
        │                    │                    │
        └────────────────────┼────────────────────┘
                             │
                       race results
                             │
                  (pipe, winning_plugin)
If all direct strategies fail, auto_connect tries TURN (relay) as a fallback,
up to turn_limit times (default 3).
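The race described above can be sketched with plain asyncio. This is a minimal illustration of the "first success wins, cancel the rest" pattern, not warpgate's actual implementation; race_first_success and the attempt coroutines are hypothetical names, and the TURN fallback is left out.

```python
import asyncio


async def race_first_success(attempts, timeout):
    """Return the first non-None result from `attempts`, or None.

    Hypothetical helper: each item in `attempts` is a coroutine that returns
    a result on success and returns None or raises on failure.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    pending = {asyncio.ensure_future(a) for a in attempts}
    try:
        while pending:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return None  # overall deadline expired
            done, pending = await asyncio.wait(
                pending, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
            )
            winner = None
            for task in done:
                if task.cancelled() or task.exception() is not None:
                    continue  # this attempt failed; keep waiting on the rest
                if winner is None:
                    winner = task.result()
            if winner is not None:
                return winner
        return None  # every attempt failed
    finally:
        # Cancel the losers and wait until their cleanup has run.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
```

Waiting on the cancelled tasks in the finally block matters: it gives each losing attempt a chance to release its sockets before the caller continues.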
Parameters
| Parameter | Default | Description |
|---|---|---|
| node | (required) | The local Node |
| dest_addr | (required) | Peer’s address bytes or PNP nickname |
| timeout | | Seconds to wait for any plugin to succeed |
| turn_limit | 3 | Max TURN attempts if direct strategies all fail |
auto_connect returns (None, None) if no strategy succeeds before the timeout expires.
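Because failure is reported as a return value rather than an exception, callers need an explicit check. A minimal sketch of one way to do that follows; connect_or_raise is a hypothetical wrapper, and auto_connect is passed in as an argument only so the sketch runs without warpgate installed.

```python
async def connect_or_raise(auto_connect, node, dest_addr):
    """Call auto_connect and turn the (None, None) failure value into an error.

    `auto_connect` is whatever callable you imported from warpgate; it is a
    parameter here so the sketch has no import-time dependency.
    """
    pipe, plugin = await auto_connect(node, dest_addr)
    if pipe is None:
        raise ConnectionError("no strategy reached {!r}".format(dest_addr))
    return pipe, plugin
```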
node.connect (manual)
For fine-grained control, use node.connect directly:
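import asyncio
from aionetiface import IP4, NIC_BIND

# Start a single strategy; node.connect hands back the plugin object.
plugin = await node.connect(
    af=IP4,
    route_type=NIC_BIND,
    pnp_addr=dest_addr,
    plugin_name="direct_connect",
)
# plugin.result resolves to a Pipe once the connection succeeds.
pipe = await asyncio.wait_for(plugin.result, timeout=10)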
node.connect returns a plugin object whose .result is an asyncio.Future
that resolves to a Pipe when the connection succeeds.
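With manual control you can also try strategies one at a time instead of racing them. The sketch below assumes node.connect accepts the keyword arguments shown above and that the plugin names from the diagram (direct_connect, reverse_connect, punch) are valid plugin_name values; connect_in_order is a hypothetical helper, not part of warpgate.

```python
import asyncio


async def connect_in_order(node, dest_addr, af, route_type, plugin_names, timeout=10):
    """Try each plugin name in turn; return (pipe, name) for the first success.

    Assumes plugin.result is an asyncio.Future that resolves to a Pipe.
    Returns (None, None) if every plugin fails or times out.
    """
    for name in plugin_names:
        plugin = await node.connect(
            af=af,
            route_type=route_type,
            pnp_addr=dest_addr,
            plugin_name=name,
        )
        try:
            pipe = await asyncio.wait_for(plugin.result, timeout=timeout)
        except (asyncio.TimeoutError, OSError):
            continue  # this strategy did not finish in time; try the next one
        if pipe is not None:
            return pipe, name
    return None, None
```

Sequential attempts are slower than the race in auto_connect, but they make it easy to see which strategy works on a given network.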
Address families
| Constant | Meaning |
|---|---|
| IP4 | IPv4 only |
| IP6 | IPv6 only |
Pass None to let warpgate pick the first AF both nodes share.
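"First AF both nodes share" can be pictured as an ordered intersection. The function below is only an illustration of that idea; pick_shared_af is a hypothetical name, the string values stand in for the real constants, and warpgate's actual selection logic may differ.

```python
def pick_shared_af(local_afs, remote_afs):
    """Return the first family in local_afs that the remote also supports.

    The order of local_afs expresses preference. Returns None when the two
    nodes have no family in common.
    """
    remote = set(remote_afs)
    for af in local_afs:
        if af in remote:
            return af
    return None
```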
Route types
| Constant | Meaning |
|---|---|
| NIC_BIND | Use the NIC’s private/local IP (LAN or same-machine) |
| EXT_BIND | Use the external/public IP (WAN path through NAT) |
Pipe API
Once you have a Pipe, you communicate with it like this:
Subscribing
Before you can receive messages you must subscribe. A subscription tells warpgate which messages you’re interested in:
from aionetiface import SUB_ALL
pipe.subscribe(SUB_ALL) # match any message from any sender
SUB_ALL is the wildcard. You can also filter by content pattern or sender:
# Only messages matching a regex pattern
pipe.subscribe((b"^HELLO", None))
# Only messages from a specific sender
pipe.subscribe((None, ("192.168.1.5", 12345)))
# Deliver matching messages directly to a callback instead of a queue
pipe.subscribe(SUB_ALL, handler=my_async_func)
subscribe returns an integer offset that you can pass to unsubscribe.
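To make the filter semantics concrete, here is a small model of how a (pattern, sender) subscription could be checked against an incoming message. It is a sketch under stated assumptions, not the library's code: sub_matches is a hypothetical name, (None, None) stands in for the wildcard, and patterns are assumed to be applied with re.search on the raw bytes.

```python
import re


def sub_matches(sub, data, sender):
    """Return True if the (pattern, sender_filter) pair accepts the message.

    None in either slot acts as a wildcard for that slot.
    """
    pattern, sender_filter = sub
    if pattern is not None and re.search(pattern, data) is None:
        return False
    if sender_filter is not None and tuple(sender_filter) != tuple(sender):
        return False
    return True
```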
Sending
await pipe.send(b"hello world", dest_tup)
For TCP pipes established with auto_connect, the peer is already fixed by the connection, so you can pass None as dest_tup:
await pipe.send(b"hello world", None)
For UDP, always pass the destination tuple (ip, port).
Receiving
data = await pipe.recv(SUB_ALL) # returns bytes or None on timeout
data = await pipe.recv(SUB_ALL, timeout=5) # custom timeout in seconds
tup = await pipe.recv(SUB_ALL, full=True) # returns [(ip, port), data]
recv waits until a matching message arrives or the timeout expires, in which case it returns None. The default timeout is 2 seconds.
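Since a timeout is reported as None rather than an exception, a caller that wants to wait longer has to loop. A minimal sketch, assuming only the recv(sub, timeout=...) call shown above; recv_with_retries is a hypothetical helper.

```python
async def recv_with_retries(pipe, sub, attempts=3, timeout=2):
    """Call pipe.recv up to `attempts` times and return the first message.

    Returns None if every attempt times out.
    """
    for _ in range(attempts):
        data = await pipe.recv(sub, timeout=timeout)
        if data is not None:
            return data
    return None
```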
Receiving N bytes
For stream protocols where you need an exact byte count:
data = await pipe.recv_n(1024, SUB_ALL) # accumulates until >= 1024 bytes
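The accumulation behaviour in the comment can be modelled with a plain loop over recv. This is an illustration of the idea rather than recv_n's implementation; read_at_least is a hypothetical name. Note that the result can be longer than n when the last chunk overshoots, so slice it if you need exactly n bytes.

```python
async def read_at_least(pipe, n, sub, timeout=2):
    """Collect chunks from pipe.recv until at least n bytes have arrived.

    Returns the accumulated bytes, or None if a recv call times out first.
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = await pipe.recv(sub, timeout=timeout)
        if chunk is None:
            return None  # timed out before n bytes were available
        buf.extend(chunk)
    return bytes(buf)
```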
Closing
await pipe.close()
Always close pipes when you’re done. Unclosed pipes keep the underlying socket open.
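One way to make sure close always runs is to wrap the pipe in an async context manager. The sketch below relies only on the awaitable pipe.close() shown above; closing_pipe is a hypothetical helper, not something warpgate provides.

```python
import contextlib


@contextlib.asynccontextmanager
async def closing_pipe(pipe):
    """Yield the pipe and always await pipe.close() on exit, even after an error."""
    try:
        yield pipe
    finally:
        await pipe.close()
```

Usage would look like `async with closing_pipe(pipe) as p: await p.send(b"hello", None)`.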
Receiving on the server side
The server-side node receives messages through node.add_msg_cb:
async def on_message(msg, client_tup, pipe):
    ip, port = client_tup
    print("Received {} bytes from {}:{}".format(len(msg), ip, port))
    await pipe.send(b"got it", client_tup)

node.add_msg_cb(on_message)
This fires for every message on every server socket the node manages.
Connection patterns
Ping-pong (request-response)
# Sender side:
pipe.subscribe(SUB_ALL)
await pipe.send(b"ping", dest)
resp = await pipe.recv(SUB_ALL)  # b"pong"

# Receiver side:
async def on_msg(msg, client_tup, pipe):
    if msg == b"ping":
        await pipe.send(b"pong", client_tup)

node.add_msg_cb(on_msg)
Streaming
for chunk in data_chunks:
    await pipe.send(chunk, dest)

# Receiver accumulates with recv_n:
full_data = await pipe.recv_n(total_size, SUB_ALL)
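The receiver needs to know total_size before it can call recv_n. One common convention, shown here as a sketch and not something warpgate prescribes, is to prefix the payload with a fixed-size length header. The frame and unframe helpers and the 4-byte big-endian header are assumptions made for illustration.

```python
import struct

HEADER = struct.Struct("!I")  # 4-byte big-endian unsigned payload length


def frame(payload):
    """Prefix payload with its length so the receiver knows how much to read."""
    return HEADER.pack(len(payload)) + payload


def unframe(buf):
    """Split one frame off the front of buf.

    Returns (payload, rest) when buf holds a complete frame, else (None, buf).
    """
    if len(buf) < HEADER.size:
        return None, buf
    (size,) = HEADER.unpack_from(buf)
    end = HEADER.size + size
    if len(buf) < end:
        return None, buf
    return buf[HEADER.size:end], buf[end:]
```

With this convention the receiver would first read HEADER.size bytes, decode the length, and then read that many bytes of payload.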
Broadcast (one sender, multiple receivers)
There is no built-in broadcast. Create a Pipe to each receiver and send to all:
pipes = []
for addr in peer_addresses:
    p, _ = await auto_connect(node, addr)
    if p is not None:  # auto_connect returns (None, None) on failure
        pipes.append(p)

for p in pipes:
    await p.send(b"broadcast message", None)
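With many receivers, sending sequentially means one slow peer delays the rest. A possible variation is to send concurrently with asyncio.gather and collect the pipes that failed. The broadcast helper below is a hypothetical sketch that assumes only the pipe.send(data, dest_tup) call shown above.

```python
import asyncio


async def broadcast(pipes, msg):
    """Send msg on every pipe concurrently; return the pipes whose send raised.

    None entries (failed connections) are skipped.
    """
    live = [p for p in pipes if p is not None]
    results = await asyncio.gather(
        *(p.send(msg, None) for p in live), return_exceptions=True
    )
    return [p for p, r in zip(live, results) if isinstance(r, Exception)]
```

The returned list lets the caller close or reconnect the failed pipes without aborting the whole broadcast.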
Inbound connections
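When another node connects to yours, its messages arrive through the callback registered with node.add_msg_cb. To
obtain the pipe itself for bidirectional use, register a future for it and resolve that future from the callback: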
import asyncio

pipe_id = "my-session-id"
future = node.pipe_future(pipe_id)

# The connecting peer sends the pipe_id in its first message.
# Your msg_cb calls:
async def on_msg(msg, client_tup, pipe):
    if msg.startswith(b"CONNECT:"):
        received_id = msg[len(b"CONNECT:"):].decode()
        node.pipe_ready(received_id, pipe)

node.add_msg_cb(on_msg)

# Then you await the future to get the pipe:
inbound_pipe = await asyncio.wait_for(future, timeout=10)
The direct_connect plugin uses this pattern internally with CON_ID_MSG.
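The pipe_future and pipe_ready pair behaves like a rendezvous keyed by ID: one side waits on a future, the other resolves it. The class below is a stand-alone model of that idea for readers who want to see the mechanics; PipeRegistry is a hypothetical name and this is not how the node is necessarily implemented.

```python
import asyncio


class PipeRegistry:
    """Hypothetical stand-in for the pipe_future / pipe_ready pair."""

    def __init__(self):
        self._futures = {}

    def pipe_future(self, pipe_id):
        """Return the future for pipe_id, creating it on first use."""
        if pipe_id not in self._futures:
            loop = asyncio.get_running_loop()
            self._futures[pipe_id] = loop.create_future()
        return self._futures[pipe_id]

    def pipe_ready(self, pipe_id, pipe):
        """Resolve the waiting future for pipe_id, if there is one."""
        fut = self._futures.pop(pipe_id, None)
        if fut is not None and not fut.done():
            fut.set_result(pipe)
```

An ID that nobody registered is ignored here; a real implementation might instead queue the pipe or close it.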