Quickstart
Setup
With GPU support:
conda create -n ucx -c conda-forge -c rapidsai \
cudatoolkit=<CUDA version> ucx-proc=*=gpu ucx ucx-py python=3.9
Without GPU support:
conda create -n ucx -c conda-forge -c rapidsai \
ucx-proc=*=cpu ucx ucx-py python=3.9
For a more detailed guide on installation options please refer to the Install page.
Send/Recv NumPy Arrays
Process 1 - Server
import asyncio
import time
import ucp
import numpy as np
# Settings shared by the server functions below.
port = 13337        # port handed to ucp.create_listener
n_bytes = 1 << 30   # number of one-byte elements in the exchanged array (2**30)
# "eth0" is the ethernet device name. `host` is not referenced by the rest of
# the server code shown here; the listener is created from `port` alone.
host = ucp.get_address(ifname="eth0")
async def send(ep):
    """Serve one connection: receive an array, add one to it, echo it back.

    This coroutine is the callback given to ``ucp.create_listener``.

    Parameters
    ----------
    ep
        The connected endpoint; its ``recv``, ``send`` and ``close``
        coroutines are awaited here.

    Notes
    -----
    The incoming data is asserted to contain no nonzero element.  After the
    reply has been sent, the endpoint and the module-level listener ``lf``
    are both closed.
    """
    # Pre-allocated destination for the incoming message.
    buffer = np.empty(n_bytes, dtype="u1")
    await ep.recv(buffer)

    expected_nonzero = np.array(0, dtype=np.int64)
    assert np.count_nonzero(buffer) == expected_nonzero
    print("Received NumPy array")

    # Add one in place so the very same buffer is sent back.
    buffer += 1
    print("Sending incremented NumPy array")
    await ep.send(buffer)

    # Tear down the connection first, then the listener itself.
    await ep.close()
    lf.close()
async def main():
    """Create the listener and idle until it reports being closed.

    The listener is stored in the module-level name ``lf`` so that the
    ``send`` callback can close it.
    """
    global lf
    lf = ucp.create_listener(send, port)

    # Poll every 0.1 s; sleeping yields control to the event loop in between.
    while True:
        if lf.closed():
            break
        await asyncio.sleep(0.1)
# Run the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Process 2 - Client
import asyncio
import ucp
import numpy as np
# Settings used by the client's main() below.
n_bytes = 1 << 30   # number of one-byte elements in the message (2**30)
port = 13337        # port passed to ucp.create_endpoint
async def main():
    """Send a zero-filled array to the server and verify the echoed reply.

    Connects to ``port`` on the address of the ``eth0`` device, sends
    ``n_bytes`` zero bytes, receives a reply of the same shape and dtype,
    closes the endpoint, and finally checks that the reply equals the
    original message plus one.
    """
    server_addr = ucp.get_address(ifname="eth0")  # ethernet device name
    endpoint = await ucp.create_endpoint(server_addr, port)

    # Outgoing data: all zeros.
    payload = np.zeros(n_bytes, dtype="u1")
    print("Send Original NumPy array")
    await endpoint.send(payload)

    # The reply is received into a buffer shaped like the payload.
    print("Receive Incremented NumPy arrays")
    reply = np.empty_like(payload)
    await endpoint.recv(reply)

    await endpoint.close()

    expected = payload + 1
    np.testing.assert_array_equal(expected, reply)
# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Send/Recv CuPy Arrays
Note
If you are passing CuPy arrays between GPUs and want to use NVLink, ensure that UCX_TLS is set correctly and includes cuda_ipc. See the Configuration page for more details.
Process 1 - Server
import asyncio
import time
import ucp
import cupy as cp
# Settings shared by the server functions below.
port = 13337        # port handed to ucp.create_listener
n_bytes = 1 << 30   # number of one-byte elements in the exchanged array (2**30)
# "eth0" is the ethernet device name. `host` is not referenced by the rest of
# the server code shown here; the listener is created from `port` alone.
host = ucp.get_address(ifname="eth0")
async def send(ep):
    """Serve one connection: receive a CuPy array, add one, echo it back.

    This coroutine is the callback given to ``ucp.create_listener``.

    Parameters
    ----------
    ep
        The connected endpoint; its ``recv``, ``send`` and ``close``
        coroutines are awaited here.

    Notes
    -----
    The incoming data is asserted to contain no nonzero element.  After the
    reply has been sent, the endpoint and the module-level listener ``lf``
    are both closed.
    """
    # Pre-allocated CuPy destination for the incoming message.
    buffer = cp.empty(n_bytes, dtype="u1")
    await ep.recv(buffer)

    expected_nonzero = cp.array(0, dtype=cp.int64)
    assert cp.count_nonzero(buffer) == expected_nonzero
    print("Received CuPy array")

    # Add one in place so the very same buffer is sent back.
    buffer += 1
    print("Sending incremented CuPy array")
    await ep.send(buffer)

    # Tear down the connection first, then the listener itself.
    await ep.close()
    lf.close()
async def main():
    """Create the listener and idle until it reports being closed.

    The listener is stored in the module-level name ``lf`` so that the
    ``send`` callback can close it.
    """
    global lf
    lf = ucp.create_listener(send, port)

    # Poll every 0.1 s; sleeping yields control to the event loop in between.
    while True:
        if lf.closed():
            break
        await asyncio.sleep(0.1)
# Run the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Process 2 - Client
import asyncio
import ucp
import cupy as cp
import numpy as np
# Settings used by the client's main() below.
n_bytes = 1 << 30   # number of one-byte elements in the message (2**30)
port = 13337        # port passed to ucp.create_endpoint
async def main():
    """Send a zero-filled CuPy array to the server and verify the reply.

    Connects to ``port`` on the address of the ``eth0`` device, sends
    ``n_bytes`` zero bytes, receives a reply of the same shape and dtype,
    closes the endpoint, and finally checks that the reply equals the
    original message plus one.
    """
    server_addr = ucp.get_address(ifname="eth0")  # ethernet device name
    endpoint = await ucp.create_endpoint(server_addr, port)

    # Outgoing data: all zeros.
    payload = cp.zeros(n_bytes, dtype="u1")
    print("Send Original CuPy array")
    await endpoint.send(payload)

    # The reply is received into a buffer shaped like the payload.
    print("Receive Incremented CuPy arrays")
    reply = cp.empty_like(payload)
    await endpoint.recv(reply)

    await endpoint.close()

    expected = payload + 1
    cp.testing.assert_array_equal(expected, reply)
# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())