Send/Recv Internals

Generally, UCX creates connections between endpoints in the following steps:

  1. Create a Listener with a defined IP address and port

  2. The Listener defines a callback function to process communication from endpoints

  3. Connect an Endpoint to the Listener

  4. The Endpoint communicates with the Listener

  5. When finished, close the Endpoint and the Listener
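The five steps map closely onto Python's standard asyncio streams API. The sketch below uses asyncio.start_server and asyncio.open_connection, not UCX-Py, purely to illustrate the listener/callback/endpoint lifecycle. The loopback host 127.0.0.1, the OS-assigned port (port 0), and the function names are assumptions made for a local, self-contained run.

```python
import asyncio


async def echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Step 2: the listener's callback, run once per incoming connection
    data = await reader.read(1024)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def run_lifecycle() -> bytes:
    # Step 1: create a listener on an IP address and port (0 = let the OS pick)
    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # Step 3: connect an endpoint to the listener
    reader, writer = await asyncio.open_connection("127.0.0.1", port)

    # Step 4: the endpoint communicates with the listener
    writer.write(b"hello, world")
    await writer.drain()
    reply = await reader.read(1024)

    # Step 5: close the endpoint, then the listener
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply
```

Calling asyncio.run(run_lifecycle()) should return the echoed bytes. UCX-Py follows the same shape, but its endpoints exchange whole objects over UCX rather than bytes over a TCP stream.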

Below, we go into more detail as we build an echo server with UCX-Py and compare it with Python Sockets.

Server

First, we create the server. In UCX-Py, we create a server with create_listener and then run a loop that keeps the listener alive. The listener invokes a callback function whenever an incoming connection is accepted; this callback takes an Endpoint as its argument and uses it to send and receive.

For Python sockets, the server is constructed similarly: bind associates the socket with a host and port, listen marks it as ready to accept connections, and accept is the blocking call that waits for an incoming connection. In both UCX-Py and Sockets, once a connection has been made, the server receives data and echoes the same data back to the client.

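UCX

import asyncio
import ucp

async def echo_server(ep):
    # Called with a new Endpoint for each accepted connection
    obj = await ep.recv_obj()
    await ep.send_obj(obj)

async def main(port):
    lf = ucp.create_listener(echo_server, port)
    # Keep the listener alive until something closes it
    while not lf.closed():
        await asyncio.sleep(0.1)

asyncio.run(main(port))

Python Sockets

import socket

s = socket.socket(...)
s.bind((HOST, PORT))
s.listen(1)

while True:
    # Block until a client connects
    conn, addr = s.accept()
    data = conn.recv(1024)
    if data:
        conn.sendall(data)
    conn.close()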

Note

In this example we create servers that listen forever. In production applications, developers should also call the appropriate closing functions.
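The `while not lf.closed()` loop in the UCX-Py server does no work of its own; it only keeps the coroutine alive until something closes the listener. The stdlib-only sketch below mimics that pattern, using asyncio's Server.is_serving() as a stand-in for UCX-Py's lf.closed(). The rule that the listener closes itself after max_connections echoes is a hypothetical policy, added to show one way a server can shut down cleanly instead of listening forever.

```python
import asyncio


async def serve_until_closed(max_connections: int = 2):
    handled = 0
    server = None

    async def echo(reader, writer):
        nonlocal handled
        data = await reader.read(1024)
        writer.write(data)
        await writer.drain()
        writer.close()
        await writer.wait_closed()
        handled += 1
        if handled >= max_connections:
            # Closing the listener is what ends the keep-alive loop below
            server.close()

    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    async def client(msg: bytes) -> bytes:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(msg)
        await writer.drain()
        reply = await reader.read(1024)
        writer.close()
        await writer.wait_closed()
        return reply

    # gather() schedules both clients as tasks right away
    clients = asyncio.gather(client(b"a"), client(b"b"))

    # Same idea as `while not lf.closed()`: poll until the listener is closed
    while server.is_serving():
        await asyncio.sleep(0.1)

    replies = await clients
    await server.wait_closed()
    return handled, replies
```

The polling interval of 0.1 seconds is copied from the UCX-Py example; a shorter interval reacts faster to closing at the cost of more wake-ups.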

Client

For Sockets, on the client side we connect to the established host/port combination and send data to the socket. The client side is a bit more interesting in UCX-Py: create_endpoint also uses a host/port combination to establish a connection, and after an Endpoint is created, "hello, world" is passed back and forth between the client and server.

UCX

client = await ucp.create_endpoint(addr, port)

msg = bytearray(b"hello, world")
await client.send_obj(msg)
echo_msg = await client.recv_obj()
await client.close()

Python Sockets

s = socket.socket(...)
s.connect((HOST, PORT))
s.sendall(b'hello, world')
echo_msg = s.recv(1024)

s.close()
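To try the Sockets column end to end in a single process, the server can run in a background thread while the client connects from the main thread. This is a sketch under stated assumptions: AF_INET/SOCK_STREAM is assumed for the elided socket.socket(...) arguments, the host is loopback, port 0 lets the OS choose a free port, and echo_once and run_echo are hypothetical names.

```python
import socket
import threading

HOST = "127.0.0.1"


def echo_once(server_sock: socket.socket) -> None:
    # Accept a single connection, echo whatever arrives, then close it
    conn, _addr = server_sock.accept()
    with conn:
        data = conn.recv(1024)
        if data:
            conn.sendall(data)


def run_echo() -> bytes:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, 0))  # port 0: let the OS choose a free port
        srv.listen(1)
        port = srv.getsockname()[1]
        server_thread = threading.Thread(target=echo_once, args=(srv,))
        server_thread.start()

        # Client side, mirroring the snippet above
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((HOST, port))
            s.sendall(b"hello, world")
            echo_msg = s.recv(1024)

        server_thread.join()
    return echo_msg
```

Note that a TCP recv may in general return fewer bytes than were sent; for a 12-byte message over loopback this does not matter in practice, but real protocols frame their messages.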

So what happens inside create_endpoint? Unlike Sockets, UCX employs a tag-matching strategy: endpoints are created with unique IDs, and send/receive operations also carry unique IDs, called tags. With standard TCP connections, when an incoming request is made, a socket is created with a unique 4-tuple: client address, client port, server address, and server port. This uniqueness lets the operating system deliver each incoming packet to the right socket, so threads and processes alike are free to communicate with one another. UCX instead relies on tags for uniqueness: when an incoming request is made, the receiver matches the Endpoint ID and a unique tag. For more details on tag matching, please see this page.
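The core idea of tag matching can be shown with a toy model: incoming messages are filed under their tag, and a receive for a given tag only ever completes with a message carrying that tag, in arrival order. ToyTagMatcher is a hypothetical, single-threaded illustration and not UCX's implementation; real UCX also keeps a queue of posted receives and supports tag masks, both omitted here.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class ToyTagMatcher:
    """Hypothetical model of tag matching (not how UCX implements it)."""

    def __init__(self) -> None:
        # Messages that arrived before anyone asked for their tag
        self._unexpected: Dict[int, Deque[bytes]] = defaultdict(deque)

    def deliver(self, tag: int, payload: bytes) -> None:
        # An incoming message from the network is filed under its tag
        self._unexpected[tag].append(payload)

    def recv(self, tag: int) -> Optional[bytes]:
        # Only a message with the same tag can satisfy this receive;
        # messages under other tags are left untouched
        queue = self._unexpected.get(tag)
        if not queue:
            return None
        return queue.popleft()


matcher = ToyTagMatcher()
matcher.deliver(0xA, b"first for A")
matcher.deliver(0xB, b"only for B")
matcher.deliver(0xA, b"second for A")
```

Receiving on tag 0xB first still yields b"only for B", even though a message for 0xA arrived earlier; the tag, not arrival order across tags, decides which message a receive gets.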

create_endpoint creates an Endpoint in three steps:

  1. Generate unique IDs to use as tags

  2. Exchange endpoint info such as tags

  3. Use the info to create an endpoint

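The resulting Endpoint stores separate tags for sending and receiving, for both regular messages and control messages. Each side receives on the tags it generated itself and sends on the tags it received from its peer: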

ep = Endpoint(
    endpoint=ucx_ep,
    ctx=self,
    msg_tag_send=peer_info["msg_tag"],
    msg_tag_recv=msg_tag,
    ctrl_tag_send=peer_info["ctrl_tag"],
    ctrl_tag_recv=ctrl_tag,
    guarantee_msg_order=guarantee_msg_order,
)
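The three steps, and the way the constructor above crosses the tags over, can be sketched without UCX at all. In this hypothetical model each side draws random 64-bit receive tags (secrets.randbits is our choice, not necessarily how UCX-Py generates them), the "exchange" is just passing dictionaries, and ToyEndpoint keeps only the four tag fields of the real Endpoint.

```python
import secrets
from dataclasses import dataclass


@dataclass
class ToyEndpoint:
    # Hypothetical stand-in holding only the tag fields shown above
    msg_tag_send: int
    msg_tag_recv: int
    ctrl_tag_send: int
    ctrl_tag_recv: int


def new_tags() -> dict:
    # Step 1: each side generates its own random 64-bit receive tags
    return {"msg_tag": secrets.randbits(64), "ctrl_tag": secrets.randbits(64)}


def make_endpoint(my_tags: dict, peer_info: dict) -> ToyEndpoint:
    # Step 3: send on the peer's tags, receive on our own
    return ToyEndpoint(
        msg_tag_send=peer_info["msg_tag"],
        msg_tag_recv=my_tags["msg_tag"],
        ctrl_tag_send=peer_info["ctrl_tag"],
        ctrl_tag_recv=my_tags["ctrl_tag"],
    )


client_tags, server_tags = new_tags(), new_tags()
# Step 2: the exchange; here plain dicts, in UCX-Py a UCX stream message
client_ep = make_endpoint(client_tags, peer_info=server_tags)
server_ep = make_endpoint(server_tags, peer_info=client_tags)
```

After the exchange, the client's msg_tag_send equals the server's msg_tag_recv (and likewise for the control tags), so every message one side sends lands on a tag the other side is listening on.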

Most users will not care about these details, but developers and interested network enthusiasts may. The client's DEBUG output (enabled with UCXPY_LOG_LEVEL=DEBUG) helps clarify what UCX-Py and UCX are doing under the hood:

# client = await ucp.create_endpoint(addr, port)
[1594319245.032609] [dgx12:5904] UCXPY  DEBUG create_endpoint() client: 0x7f5e6e7bd0d8, msg-tag-send: 0x88e288ec81799a75, msg-tag-recv: 0xf29f8e9b7ce33f66, ctrl-tag-send: 0xb1cd5cb9b1120434, ctrl-tag-recv: 0xe79506f1d24b4997

# await client.send_obj(msg)
[1594319251.364999] [dgx12:5904] UCXPY  DEBUG [Send #000] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 8, type: <class 'bytes'>
[1594319251.365213] [dgx12:5904] UCXPY  DEBUG [Send #001] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 12, type: <class 'bytearray'>

# echo_msg = await client.recv_obj()
[1594319260.452441] [dgx12:5904] UCXPY  DEBUG [Recv #000] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 8, type: <class 'bytearray'>
[1594319260.452677] [dgx12:5904] UCXPY  DEBUG [Recv #001] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 12, type: <class 'bytearray'>

# await client.close()
[1594319287.522824] [dgx12:5904] UCXPY  DEBUG [Send shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xb1cd5cb9b1120434, close_after_n_recv: 2
[1594319287.523172] [dgx12:5904] UCXPY  DEBUG Endpoint.abort(): 0x7f5e6e7bd0d8
[1594319287.523331] [dgx12:5904] UCXPY  DEBUG Future cancelling: [Recv shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xe79506f1d24b4997

We can see from the above that when the Endpoint is created, four tags are generated: msg-tag-send, msg-tag-recv, ctrl-tag-send, and ctrl-tag-recv. This information is transmitted to the server over a UCX stream by a convenience function that exchanges peer info.
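One way to read the log is to map each tag value back to its name from the create_endpoint() line and then label every operation. label_operations is a hypothetical helper written for this page, not part of UCX-Py; it works on the message part of the DEBUG lines above, with the timestamp and host prefix dropped.

```python
import re

# Message parts of the DEBUG lines above (timestamp and host prefix dropped)
LOG_LINES = [
    "create_endpoint() client: 0x7f5e6e7bd0d8, msg-tag-send: 0x88e288ec81799a75, "
    "msg-tag-recv: 0xf29f8e9b7ce33f66, ctrl-tag-send: 0xb1cd5cb9b1120434, "
    "ctrl-tag-recv: 0xe79506f1d24b4997",
    "[Send #000] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 8, type: <class 'bytes'>",
    "[Send #001] ep: 0x7f5e6e7bd0d8, tag: 0x88e288ec81799a75, nbytes: 12, type: <class 'bytearray'>",
    "[Recv #000] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 8, type: <class 'bytearray'>",
    "[Recv #001] ep: 0x7f5e6e7bd0d8, tag: 0xf29f8e9b7ce33f66, nbytes: 12, type: <class 'bytearray'>",
    "[Send shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xb1cd5cb9b1120434, close_after_n_recv: 2",
    "Future cancelling: [Recv shutdown] ep: 0x7f5e6e7bd0d8, tag: 0xe79506f1d24b4997",
]


def label_operations(lines):
    # Tag value -> tag name, taken from the create_endpoint() line
    pairs = re.findall(r"([a-z]+-tag-[a-z]+): (0x[0-9a-f]+)", lines[0])
    names = {value: name for name, value in pairs}
    labelled = []
    for line in lines[1:]:
        op = re.search(r"\[([^\]]+)\]", line).group(1)   # e.g. "Send #000"
        tag = re.search(r"tag: (0x[0-9a-f]+)", line).group(1)
        labelled.append((op, names[tag]))
    return labelled
```

The result confirms that both sends use msg-tag-send, both receives use msg-tag-recv, and the shutdown traffic uses the two control tags.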

Next, the client sends data on the msg-tag-send tag. Two messages are sent: the size of the data (an 8-byte header) and the data itself (12 bytes for "hello, world"). The server receives the data and immediately echoes it back, so the client then receives two messages on msg-tag-recv: the size of the data and the data itself. Lastly, the client closes down. When the client closes, it sends a control message on ctrl-tag-send to the server's Endpoint instructing it to also close, then aborts its own Endpoint, which cancels its pending shutdown receive on ctrl-tag-recv.
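The two-message pattern (size first, then data) can be sketched with asyncio queues standing in for the tagged channels. This is a hedged model, not UCX-Py's code: the 8-byte unsigned integer header format ("Q") is an assumption consistent with the nbytes: 8 entries in the log, and send_obj/recv_obj here are simplified, hypothetical functions that only share their names with the UCX-Py methods.

```python
import asyncio
import struct

# Assumption: the size header is a native 8-byte unsigned integer
HEADER = struct.Struct("Q")


async def send_obj(channel: asyncio.Queue, obj: bytes) -> None:
    # Message 1: the size of the payload; message 2: the payload itself
    await channel.put(HEADER.pack(len(obj)))
    await channel.put(bytes(obj))


async def recv_obj(channel: asyncio.Queue) -> bytearray:
    (nbytes,) = HEADER.unpack(await channel.get())
    # Knowing the size first lets the receiver allocate the buffer up front
    buf = bytearray(nbytes)
    payload = await channel.get()
    if len(payload) != nbytes:
        raise ValueError("payload size does not match header")
    buf[:] = payload
    return buf


async def demo() -> bytearray:
    # One queue per direction, playing the role of msg-tag-send/msg-tag-recv
    to_server, to_client = asyncio.Queue(), asyncio.Queue()

    async def echo_server() -> None:
        obj = await recv_obj(to_server)
        await send_obj(to_client, obj)

    server = asyncio.create_task(echo_server())
    await send_obj(to_server, bytearray(b"hello, world"))
    echo_msg = await recv_obj(to_client)
    await server
    return echo_msg
```

Sending the size separately is what allows the receiver to post a correctly sized buffer before the data arrives, which matters for transports that write directly into the receive buffer.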