# Writing a Plugin
This guide walks through building a custom traversal plugin from scratch.
A plugin is responsible for one thing: producing a `Pipe` between two
nodes. When it succeeds, it resolves `self.result` with the pipe.
When it fails, it resolves `self.result` with `None` (or just lets the
TraversalManager's `finally` do it for you on exception).
## The minimum
```python
from warpgate import Plugin, register
@register(phase="direct")
class DirectTCP(Plugin):
name = "direct_tcp"
transport = "tcp"
async def run(self, reply=None):
if self.af not in self.nic.supported():
return self.result.set_result(None) # skip
route = await self.nic.route(self.af).bind()
pipe = await route.connect(
(self.dest_info["ip"], self.dest_info["port"])
)
self.result.set_result(pipe) # winner
```
Drop this in `src/warpgate/traversal/plugins/
/main.py` (or anywhere
on the Python path that gets imported during node startup) and the
plugin loader picks it up automatically — `@register` adds the class
to `plugin_registry` at import time, and `node.start()` walks that
registry to install everything onto the `TraversalManager`.
## The class-attribute contract
Everything the framework needs lives in lowercase class attributes
on the `Plugin` subclass:
| attr | required | what it does |
| --- | --- | --- |
| `name` | yes | Human-readable name, also the wire-protocol id used in `ConMsg.meta.plugin_name` |
| `transport` | yes | `"tcp"` or `"udp"` |
| `route_types` | no | Tuple of route types this plugin supports. Default is all three: `(NIC_BIND, EXT_BIND, LOOPBACK_BIND)` |
| `conf` | no | Plugin config dict. Currently the framework reads `timeout` (seconds, default 10) and `set_bind` (default False). See `traversal_manager.install_plugin` for the full shape. |
| `proto_messages` | no | Tuple of `(MsgClass, strategy_enum, ttl)` triples. Used by plugins that need a custom signal-channel message type. |
| `phase` | yes (set by `@register`) | One of `"direct"`, `"punch"`, `"spray"`, `"relay"`, or `None`. `auto_connect` walks phases in this order and stops as soon as one succeeds. |
`@register(phase=...)` writes `phase` onto the class for you — you
don't set it manually.
## What `run()` gets
By the time `run(self, reply=None)` is called, the framework has
already populated the plugin instance with everything it needs:
| attribute | type | description |
| --- | --- | --- |
| `self.result` | `asyncio.Future` | Resolve to a `Pipe` on success, or `None` on failure |
| `self.plugin_id` | `str` | 15-char random id, used for inbound-pipe routing |
| `self.af` | `IP4` or `IP6` | Address family this attempt uses |
| `self.nic` | `Interface` | The local NIC selected for this attempt |
| `self.src_info` | `dict` | Local addressing — `ip`, `port`, `nic`, `ext`, `nat`, `if_index`, … |
| `self.dest_info` | `dict` | Same fields for the peer |
| `self.route_type` | constant | `NIC_BIND`, `EXT_BIND`, or `LOOPBACK_BIND` |
| `self.same_machine` | `bool` | True if both peers share a `machine_id` |
| `self.timeout` | `int` | Seconds; `auto_connect`'s outer await uses this + a small margin |
`reply` is the inbound signal message that triggered this `run()`
call, or `None` on the first invocation. Plugins that exchange
multiple signal messages (`tcp_punch`, `udp_punch`, `random_probe`)
get re-entered with `reply` set to whatever the peer sent back.
## Resolving `self.result`
Three contracts:
```python
self.result.set_result(pipe) # success
self.result.set_result(None) # explicit fail-fast
return # implicit fail (manager resolves to None)
```
The TraversalManager wraps every `run()` in a try/except that catches
`asyncio.TimeoutError`, `OSError`, `ConnectionError`, and any
uncaught `Exception`, and resolves `self.result` to `None` on each.
You can let exceptions propagate; you don't need to swallow them
yourself just to satisfy the future contract.
What you **must not** do is exit `run()` after spawning a background
task that will resolve the future later, without leaving the future
unresolved — the manager only fast-fails on exception paths, not on
normal returns, so a background-task plugin (like `tcp_punch`) keeps
working.
## Phases
`auto_connect` walks phases in this order:
```text
direct → punch → spray → relay
```
Within a phase, every plugin is tried concurrently across every valid
`(af, route_type, src_nic, dest_nic)` combination. As soon as one
combo wins, the rest are cancelled and the next phase is skipped.
Plugins registered with `phase=None` are auxiliary (e.g. `get_addr`,
`return_addr`, `fan_out`) — `auto_connect` doesn't dispatch them.
Pick the phase that matches the strategy:
| phase | for | examples |
| --- | --- | --- |
| `direct` | tries that work without coordination | `direct_connect`, `reverse_connect` |
| `punch` | NTP-aligned simultaneous-open | `tcp_punch` |
| `spray` | parallel UDP probe rendezvous | `udp_punch`, `random_probe` |
| `relay` | last-resort relays | `turn` |
## Signal-coordinated example
This is `reverse_connect`, slightly trimmed:
```python
from warpgate import Plugin, register
from warpgate.protocol.proto_msg import ConMsg
@register(phase="direct")
class ReverseConnect(Plugin):
name = "reverse_connect"
transport = "tcp"
async def run(self, reply=None):
# Tell the peer to dial back at us using their direct_connect.
msg = ConMsg()
msg.meta.plugin_name = "direct_connect"
# Reserve the inbound future BEFORE sending — a fast peer could
# connect back before we register, otherwise.
self.register_inbound()
await self.send_signal_msg(msg)
# Block until the peer's TCP connect lands and the inbound
# dispatcher routes it to our reserved future.
pipe = await self.wait_for_inbound()
self.result.set_result(pipe)
```
Key helpers:
- `self.register_inbound()` — claim an inbound-pipe slot under
`self.plugin_id`. The inbound dispatcher routes any incoming
connection that carries this plugin_id back to your future.
- `self.wait_for_inbound()` — await the slot you just registered.
- `self.send_signal_msg(msg)` — push a `ConMsg`-shaped object out
through the MQTT signal channel.
## Custom signal types
Need a multi-round handshake (like the punch plugins)? Define a
message class in your plugin's `proto.py`, then register it via
`proto_messages`:
```python
from warpgate import Plugin, register
from .proto import MyHandshakeMsg
P2P_MY_PROTO = 99
@register(phase="punch")
class MyHandshake(Plugin):
name = "my_handshake"
transport = "udp"
proto_messages = (
(MyHandshakeMsg, P2P_MY_PROTO, 18), # (msg_class, strategy_id, ttl_secs)
)
async def run(self, reply=None):
if reply is None:
# First call — initiator side.
outgoing = MyHandshakeMsg(...)
outgoing.meta.plugin_name = "my_handshake"
await self.send_signal_msg(outgoing)
return # wait for peer reply
# Second call — reply-handler side.
...
self.result.set_result(pipe)
```
The plugin loader patches `WIRE_NAME = "."`
onto the message class so it round-trips through pack/unpack
automatically.
## Factory plugins
When the plugin needs node-level shared state (TURN server pool,
process pool, persistent NAT classifier), use a `setup` classmethod
instead of letting the loader instantiate the class directly:
```python
@register(phase="relay")
class TURNRelay(Plugin):
name = "turn"
transport = "udp"
@classmethod
async def setup(cls, node):
if not node.conf.get("enable_turn", True):
return None # skip — plugin disabled
factory = TURNFactory(node)
node.resources.register(factory) # auto-close on shutdown
return factory # used as the per-attempt builder
async def run(self, reply=None):
...
```
The returned factory's `build_plugin()` is called once per connection
attempt; whatever it returns must be a `Plugin` instance. Returning
`None` from `setup` removes the plugin from the registry entirely.
## Testing
Use `NODE_TEST_CONF` so the test doesn't need MQTT / STUN / PNP:
```python
import unittest
from aionetiface.testing import AsyncTestCase
from warpgate import Node
from warpgate.node.node_defs import NODE_TEST_CONF
from warpgate.node.auto_connect import auto_connect
class TestMyPlugin(AsyncTestCase):
async def test_connects(self):
alice = await Node(conf=NODE_TEST_CONF).start()
bob = await Node(conf=NODE_TEST_CONF).start()
try:
pipe, _ = await auto_connect(alice, bob.address())
self.assertIsNotNone(pipe)
finally:
await alice.close()
await bob.close()
```
To narrow `auto_connect` down to your plugin only, clear the
manager's plugin_loaders before the call:
```python
for n in (alice, bob):
n.traversal.plugin_loaders.clear()
n.traversal.install_plugin("my_plugin", {"class": MyPlugin})
```
## Checklist
- [ ] `@register(phase="direct"|"punch"|"spray"|"relay"|None)` on the class
- [ ] Subclass `Plugin`
- [ ] Set `name` and `transport` class attrs
- [ ] (Optional) `route_types`, `conf`, `proto_messages`, `setup`
- [ ] `async def run(self, reply=None)` that resolves `self.result`
- [ ] On any failure: `self.result.set_result(None)` (or just raise — the manager handles it)
- [ ] Register inbound *before* sending signals, not after
- [ ] Drop the file at `src/warpgate/traversal/plugins//main.py` for auto-discovery, or expose it via the `warpgate.strategies` entry-point group