Send/Recv Internals
Generally, UCX creates connections between endpoints with the following steps:
1. Create a Listener with a defined IP address and port.
2. The Listener defines a callback function to process communications from endpoints.
3. Connect an Endpoint to the Listener.
4. The Endpoint communicates with the Listener.
5. When finished, close the Endpoint and the Listener.
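As a rough, standard-library-only analogue of these steps (not UCX-Py itself, which needs UCX installed), asyncio's start_server and open_connection can play the roles of the Listener with its callback and of the Endpoint. This is a sketch of the lifecycle under that substitution: the transport is plain TCP, and binding to port 0 so the OS picks a free port is an assumption made so the example runs anywhere.

```python
import asyncio

HOST = "127.0.0.1"


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # The listener's callback: runs once per accepted connection.
    data = await reader.read()  # read until the client signals EOF
    writer.write(data)          # echo the same bytes back
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def lifecycle(msg: bytes) -> bytes:
    # Create a listener on a defined IP address; port 0 lets the OS pick a free port.
    server = await asyncio.start_server(handle, HOST, 0)
    port = server.sockets[0].getsockname()[1]

    # Connect an "endpoint" to the listener and communicate with it.
    reader, writer = await asyncio.open_connection(HOST, port)
    writer.write(msg)
    writer.write_eof()          # tell the server we are done sending
    echo = await reader.read()  # read until the server closes its side

    # When finished, close the endpoint and then the listener.
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return echo


if __name__ == "__main__":
    print(asyncio.run(lifecycle(b"hello, world")))
```

The callback-per-connection shape is the part that carries over to UCX-Py; everything below the Python API (TCP here, UCX transports there) differs.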
Below, we go into more detail as we create an echo server in UCX-Py and compare it with Python sockets.
Server
First, we create the server. In UCX-Py, we create a server with create_listener and add a loop that keeps the listener alive. The listener invokes a callback function when an incoming connection is accepted. This callback should take an Endpoint as its argument, which it uses to send and recv.
For Python sockets, the server is constructed similarly: bind associates the socket with a given host and port, listen enables it to accept connections, and accept is the blocking call that waits for an incoming connection. In both UCX-Py and sockets, once a connection has been made, the server receives data and echoes the same data back to the client.
UCX:
import asyncio
import ucp

async def echo_server(ep):
    obj = await ep.recv_obj()
    await ep.send_obj(obj)

lf = ucp.create_listener(echo_server, port)

# top-level await: run inside a coroutine or an async-aware REPL
while not lf.closed():
    await asyncio.sleep(0.1)

Python Sockets:
import socket

s = socket.socket(...)
s.bind((HOST, PORT))
s.listen(1)

while True:
    conn, addr = s.accept()
    data = conn.recv(1024)
    if not data: break
    conn.sendall(data)
    conn.close()
Note
In this example, we create servers that listen forever. In production applications, developers should also call the appropriate closing functions (for example, closing the listener or socket once it is no longer needed).
Client
For sockets, on the client side we connect to the server's host/port combination and send data to the socket. The client side is a bit more interesting in UCX-Py: create_endpoint also uses a host/port combination to establish a connection, and after an Endpoint is created, "hello, world" is passed back and forth between the client and server.
UCX:
import ucp

client = await ucp.create_endpoint(addr, port)
msg = bytearray(b"hello, world")
await client.send_obj(msg)
echo_msg = await client.recv_obj()
await client.close()

Python Sockets:
import socket

s = socket.socket(...)
s.connect((HOST, PORT))
s.sendall(b'hello, world')
echo_msg = s.recv(1024)
s.close()
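To try the Python sockets side of the comparison without two terminals, the server and client can run in one process, with the server in a background thread. This sketch keeps the echo logic of the listings above, but binds to port 0 so the OS picks a free port (an assumption for convenience; the listings use fixed HOST/PORT values) and serves a single connection so that it terminates. As in the listings, a single recv(1024) is assumed to return the whole short message, which holds in practice on loopback but is not guaranteed by TCP.

```python
import socket
import threading

HOST = "127.0.0.1"


def serve_once(server: socket.socket) -> None:
    # Accept one connection, echo what it sends, then close it.
    conn, _addr = server.accept()
    with conn:
        data = conn.recv(1024)
        if data:
            conn.sendall(data)


def echo_round_trip(msg: bytes) -> bytes:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((HOST, 0))  # port 0: let the OS choose a free port
        server.listen(1)
        port = server.getsockname()[1]
        t = threading.Thread(target=serve_once, args=(server,))
        t.start()

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.connect((HOST, port))
            client.sendall(msg)
            echo = client.recv(1024)
        t.join()
    return echo


if __name__ == "__main__":
    print(echo_round_trip(b"hello, world"))
```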
So what happens with create_endpoint? Unlike sockets, UCX employs a tag-matching strategy: endpoints are created with unique IDs, and send/receive operations also use unique IDs (these are called tags). With standard TCP connections, when an incoming request is made, a socket is created that is identified by a unique 4-tuple: client address, client port, server address, and server port. This uniqueness lets the operating system deliver data to the right socket, so threads and processes alike are free to communicate with one another. UCX instead uses tags for uniqueness: when an incoming request is made, the receiver matches on the Endpoint ID and a unique tag. For more details on tag matching, please see this page.
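The matching rule can be illustrated with a toy, in-process model: every message carries a 64-bit tag, and a receive completes only with a message whose tag matches the one it posted. The TagMatcher class below is hypothetical and is not how UCX implements tag matching (it ignores tag masks, hardware offload and transports entirely); it only shows that messages are queued until a receive on their tag is posted, regardless of arrival order.

```python
import asyncio
from collections import defaultdict, deque


class TagMatcher:
    """Hypothetical toy model of tag matching (not UCX's implementation)."""

    def __init__(self) -> None:
        self._unexpected = defaultdict(deque)  # tag -> messages with no posted recv yet
        self._posted = defaultdict(deque)      # tag -> futures of posted receives

    def send(self, tag: int, msg: bytes) -> None:
        waiters = self._posted[tag]
        if waiters:
            waiters.popleft().set_result(msg)  # complete the oldest matching recv
        else:
            self._unexpected[tag].append(msg)  # hold until a recv on this tag is posted

    async def recv(self, tag: int) -> bytes:
        queued = self._unexpected[tag]
        if queued:
            return queued.popleft()
        fut = asyncio.get_running_loop().create_future()
        self._posted[tag].append(fut)
        return await fut


async def demo() -> list:
    m = TagMatcher()
    m.send(0xB, b"for tag B")                  # arrives before any recv on tag B
    pending = asyncio.create_task(m.recv(0xA))
    await asyncio.sleep(0)                     # let the recv on tag A post itself
    m.send(0xA, b"for tag A")
    return [await pending, await m.recv(0xB)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```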
create_endpoint creates an Endpoint in three steps:
1. Generate unique IDs to use as tags.
2. Exchange endpoint info, such as tags, with the peer.
3. Use that info to create the Endpoint.
Again, an Endpoint sends and receives with unique tags:
ep = Endpoint(
    endpoint=ucx_ep,
    ctx=self,
    # *_send tags come from the peer (peer_info); *_recv tags were generated locally
    msg_tag_send=peer_info["msg_tag"],
    msg_tag_recv=msg_tag,
    ctrl_tag_send=peer_info["ctrl_tag"],
    ctrl_tag_recv=ctrl_tag,
    guarantee_msg_order=guarantee_msg_order,
)
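A hypothetical sketch of the three steps, mirroring the constructor call above: each side draws two random 64-bit values as its own receive tags (message and control), the two sides swap them, and each side sends on the tags the peer advertised. The names make_tags, build, connect_pair and EndpointTags are illustrative only, and secrets.randbits is an assumption; UCX-Py's actual tag generation and its stream-based exchange are not reproduced here.

```python
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class EndpointTags:
    """Hypothetical container for the four tags an Endpoint uses."""
    msg_tag_send: int
    msg_tag_recv: int
    ctrl_tag_send: int
    ctrl_tag_recv: int


def make_tags() -> dict:
    # Generate unique IDs to use as this side's *receive* tags.
    return {"msg_tag": secrets.randbits(64), "ctrl_tag": secrets.randbits(64)}


def build(local: dict, peer_info: dict) -> EndpointTags:
    # Create the endpoint: send on the peer's tags, receive on our own.
    return EndpointTags(
        msg_tag_send=peer_info["msg_tag"],
        msg_tag_recv=local["msg_tag"],
        ctrl_tag_send=peer_info["ctrl_tag"],
        ctrl_tag_recv=local["ctrl_tag"],
    )


def connect_pair() -> tuple:
    client_local, server_local = make_tags(), make_tags()
    # Exchange endpoint info (here a plain in-memory swap, not a UCX stream).
    client = build(client_local, peer_info=server_local)
    server = build(server_local, peer_info=client_local)
    return client, server


if __name__ == "__main__":
    c, s = connect_pair()
    print(f"client sends on {c.msg_tag_send:#x}, server receives on {s.msg_tag_recv:#x}")
```

The invariant worth noticing is the mirroring: whatever the client sends on is exactly what the server receives on, and vice versa, for both the message and control channels.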
Most users will not care about these details, but developers and interested network enthusiasts may. Looking at the DEBUG output of the client (enabled with UCXPY_LOG_LEVEL=DEBUG) can help clarify what UCX-Py and UCX are doing under the hood:
# client = await ucp.create_endpoint(addr, port)
[1594319245.032609] [dgx12:5904] UCXPY DEBUG create_endpoint() client: 0x7f5e6e7bd0d8, msg-tag-send: 0x88e288ec81799a75, msg-tag-recv: 0xf29f8e9b7ce33f66, ctrl-tag-send: 0xb1cd5cb9b1120434, ctrl-tag-recv: 0xe79506f1d24b4997
# await client.send_obj(msg)
[1594319251.364999] [dgx12:5904] UCXPY DEBUG [Send #000] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 8, type: <class 'bytes'>
[1594319251.365213] [dgx12:5904] UCXPY DEBUG [Send #001] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 12, type: <class 'bytearray'>
# echo_msg = await client.recv_obj()
[1594319260.452441] [dgx12:5904] UCXPY DEBUG [Recv #000] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 8, type: <class 'bytearray'>
[1594319260.452677] [dgx12:5904] UCXPY DEBUG [Recv #001] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 12, type: <class 'bytearray'>
# await client.close()
[1594319287.522824] [dgx12:5904] UCXPY DEBUG [Send shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xb1cd5cb9b1120434, close_after_n_recv: 2
[1594319287.523172] [dgx12:5904] UCXPY DEBUG Endpoint.abort(): 0x7f5e6e7bd0d8
[1594319287.523331] [dgx12:5904] UCXPY DEBUG Future cancelling: [Recv shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xe79506f1d24b4997
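Because each DEBUG record prints the endpoint, tag and size, a log like the one above can be checked mechanically. The sketch below uses a hypothetical regular expression written against exactly the lines shown here (the format may differ between UCX-Py versions) and groups the send and receive records by tag; the shutdown records are deliberately not matched.

```python
import re

# Matches records such as "[Send #000] ep: 0x..., tag: 0x..., nbytes: 8"
RECORD = re.compile(
    r"\[(?P<op>Send|Recv) #(?P<n>\d+)\] ep: (?P<ep>0x[0-9a-f]+), "
    r"tag: (?P<tag>0x[0-9a-f]+), nbytes: (?P<nbytes>\d+)"
)


def summarize(log: str) -> dict:
    """Map (operation, tag) -> list of message sizes, in log order."""
    out = {}
    for m in RECORD.finditer(log):
        key = (m["op"], int(m["tag"], 16))
        out.setdefault(key, []).append(int(m["nbytes"]))
    return out


LOG = """
[1594319251.364999] [dgx12:5904] UCXPY DEBUG [Send #000] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 8, type: <class 'bytes'>
[1594319251.365213] [dgx12:5904] UCXPY DEBUG [Send #001] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 12, type: <class 'bytearray'>
[1594319260.452441] [dgx12:5904] UCXPY DEBUG [Recv #000] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 8, type: <class 'bytearray'>
[1594319260.452677] [dgx12:5904] UCXPY DEBUG [Recv #001] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 12, type: <class 'bytearray'>
[1594319287.522824] [dgx12:5904] UCXPY DEBUG [Send shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xb1cd5cb9b1120434, close_after_n_recv: 2
"""

if __name__ == "__main__":
    for (op, tag), sizes in summarize(LOG).items():
        print(f"{op} on {tag:#x}: {sizes}")
```

Run against these lines, all sends land on the client's msg-tag-send and all receives on its msg-tag-recv, each as an 8-byte message followed by a 12-byte one.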
We can see from the above that the Endpoint holds 4 tags: msg-tag-send, msg-tag-recv, ctrl-tag-send, and ctrl-tag-recv. As the constructor call above shows, the client generates its two receive tags locally and takes its two send tags from the peer info supplied by the server. This data is exchanged with the server via stream communication in an exchange-peer-info convenience function.
Next, the client sends data on the msg-tag-send tag. Two messages are sent: the size of the data (8 bytes) and the data itself (12 bytes for "hello, world"). The server receives the data and immediately echoes it back. The client then receives two messages on the msg-tag-recv tag: the size of the data and the data itself. Lastly, the client closes down. When the client closes, it sends a control message on the ctrl-tag-send tag to the server's Endpoint, instructing it to also close.
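The size-then-data pattern is ordinary length-prefix framing: the receiver first reads a fixed 8-byte header, learns how many bytes follow, then reads exactly that many. The sketch below reproduces the pattern over a local socket pair instead of UCX tags. send_framed and recv_framed are hypothetical helpers, not UCX-Py's send_obj/recv_obj, and encoding the size as a little-endian unsigned 64-bit integer is an assumption (the text only says the size message is 8 bytes).

```python
import socket
import struct

HEADER = struct.Struct("<Q")  # assumption: unsigned 64-bit little-endian length (8 bytes)


def recv_exactly(sock: socket.socket, n: int) -> bytes:
    # A single recv() may return fewer bytes than requested, so loop until n arrive.
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed before the full message arrived")
        buf += chunk
    return bytes(buf)


def send_framed(sock: socket.socket, obj: bytes) -> None:
    sock.sendall(HEADER.pack(len(obj)))  # message 1: the size (8 bytes)
    sock.sendall(obj)                    # message 2: the data itself


def recv_framed(sock: socket.socket) -> bytes:
    (nbytes,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return recv_exactly(sock, nbytes)


if __name__ == "__main__":
    client, server = socket.socketpair()
    with client, server:
        send_framed(client, b"hello, world")
        send_framed(server, recv_framed(server))  # the server echoes what it received
        print(recv_framed(client))
```

Sending the size first lets the receiver allocate a correctly sized buffer before the payload arrives, which is why the log shows an 8-byte message preceding each 12-byte one.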