API

ucp

UCX-Py: Python bindings for UCX <www.openucx.org>

ucp.create_listener(callback_func[, port, ...])

Create and start a listener to accept incoming connections

ucp.create_endpoint(ip_address, port[, ...])

Create a new endpoint to a server

ucp.get_address([ifname])

Get the address associated with a network interface.

ucp.get_config()

Returns all UCX configuration options as a dict.

ucp.get_ucp_worker()

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

ucp.get_ucx_version()

Return the version of the underlying UCX installation

ucp.init([options, env_takes_precedence, ...])

Initiate UCX.

ucp.progress()

Try to progress the communication layer

ucp.reset()

Resets the UCX library by shutting down all of UCX.

Endpoint

Endpoint(endpoint, ctx[, tags])

An endpoint represents a connection to a peer

Endpoint.abort()

Close the communication immediately and abruptly.

Endpoint.close()

Close the endpoint cleanly.

Endpoint.closed()

Is this endpoint closed?

Endpoint.close_after_n_recv(n[, ...])

Close the endpoint after n received messages.

Endpoint.cuda_support()

Return whether UCX is configured with CUDA support or not

Endpoint.get_ucp_endpoint()

Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.

Endpoint.get_ucp_worker()

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

Endpoint.recv(buffer[, tag, force_tag])

Receive from connected peer into buffer.

Endpoint.send(buffer[, tag, force_tag])

Send buffer to connected peer.

Endpoint.ucx_info()

Return low-level UCX info about this endpoint as a string

Endpoint.uid

The unique ID of the underlying UCX endpoint

Listener

Listener(backend)

A handle to the listening service started by create_listener()

Listener.close()

Closing the listener

Listener.closed()

Is the listener closed?

Listener.port

The listening network port

ucp.create_listener(callback_func, port=None, endpoint_error_handling=True)[source]

Create and start a listener to accept incoming connections

callback_func is the function or coroutine that takes one argument – the Endpoint connected to the client.

Note that listening stops when the returned Listener goes out of scope, so remember to keep a reference to the object.

Parameters

callback_func: function or coroutine

A callback function that gets invoked when an incoming connection is accepted

port: int, optional

An unused port number for listening, or 0 to let UCX assign an unused port.

endpoint_error_handling: boolean, optional

If True (default), enable endpoint error handling, which raises exceptions when an error occurs. This may incur a performance penalty but prevents the process from terminating unexpectedly, which may happen when error handling is disabled. If False, endpoint error handling is disabled.

Returns

Listener

The new listener. When this object is deleted, the listening stops
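A minimal sketch of a server built on create_listener, assuming a working UCX-Py installation and assuming that Endpoint.send and Endpoint.recv are awaited as coroutines. The names echo_once and serve and the 16-byte message size are illustrative and not part of the API.

```python
import asyncio

MSG_SIZE = 16  # hypothetical fixed message size agreed with the client


async def echo_once(ep):
    """Callback: receive one message from the client and send it back."""
    buf = bytearray(MSG_SIZE)  # writable buffer to receive into
    await ep.recv(buf)
    await ep.send(buf)
    await ep.close()


async def serve(port=0):
    """Listen until the listener is closed; port=0 lets UCX pick a port."""
    import ucp  # imported lazily so this module loads without UCX-Py

    # Keep a reference: listening stops when the Listener is deleted.
    listener = ucp.create_listener(echo_once, port=port)
    print("listening on port", listener.port)
    while not listener.closed():
        await asyncio.sleep(0.1)


# Usage (requires UCX-Py): asyncio.run(serve())
```

Holding the listener in a local variable of a long-running coroutine is one simple way to keep the reference alive for the lifetime of the server.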

async ucp.create_endpoint(ip_address, port, endpoint_error_handling=True)[source]

Create a new endpoint to a server

Parameters

ip_address: str

IP address of the server the endpoint should connect to

port: int

Port of the server the endpoint should connect to

endpoint_error_handling: boolean, optional

If True (default), enable endpoint error handling, which raises exceptions when an error occurs. This may incur a performance penalty but prevents the process from terminating unexpectedly, which may happen when error handling is disabled. If False, endpoint error handling is disabled.

Returns

Endpoint

The new endpoint
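A minimal client sketch using create_endpoint, assuming a working UCX-Py installation, a peer that replies with a message of the same length, and that Endpoint.send and Endpoint.recv are awaited as coroutines. The function name request is hypothetical.

```python
async def request(ip_address, port, payload):
    """Connect, send `payload`, and return a reply of the same length."""
    import ucp  # imported lazily; requires a working UCX-Py installation

    ep = await ucp.create_endpoint(ip_address, port)
    try:
        await ep.send(bytearray(payload))
        reply = bytearray(len(payload))  # assumes the peer echoes the same size
        await ep.recv(reply)
        return bytes(reply)
    finally:
        # Close cleanly whether or not the exchange succeeded.
        await ep.close()


# Usage (requires UCX-Py and a running server):
# asyncio.run(request("127.0.0.1", 13337, b"0123456789abcdef"))
```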

ucp.get_address(ifname=None)[source]

Get the address associated with a network interface.

Parameters

ifname: str

The network interface name to find the address for. If None, the value of the environment variable UCXPY_IFNAME is used; if UCXPY_IFNAME is not set, it defaults to “ib0”. An OSError is raised for invalid interfaces.

Returns

address: str

The inet addr associated with an interface.

Raises

RuntimeError

If a network address could not be determined.

Examples

>>> get_address()
'10.33.225.160'
>>> get_address(ifname='lo')
'127.0.0.1'

ucp.get_config()[source]

Returns all UCX configuration options as a dict.

If UCX is uninitialized, the options returned are those that would be used if UCX were initialized now. Note that this function doesn’t initialize UCX.

Returns

dict

The current UCX configuration options

ucp.get_ucp_worker()[source]

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

ucp.get_ucx_version()[source]

Return the version of the underlying UCX installation

Note that this function doesn’t initialize UCX.

Returns

tuple

The version as a tuple, e.g. (1, 7, 0)
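Because the version is returned as a tuple, it can be compared directly against a minimum requirement. The helper below is a sketch; ucx_at_least is a hypothetical name, and the optional version argument exists only so the comparison can be exercised without UCX-Py installed.

```python
def ucx_at_least(required, version=None):
    """Return True if the UCX version is at least `required`.

    `version` defaults to ucp.get_ucx_version(); pass a tuple to skip the call.
    """
    if version is None:
        import ucp  # imported lazily; requires UCX-Py

        version = ucp.get_ucx_version()
    # Tuples compare element by element, so (1, 10, 0) > (1, 9, 0).
    return tuple(version) >= tuple(required)
```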

ucp.init(options={}, env_takes_precedence=False, blocking_progress_mode=None)[source]

Initiate UCX.

Usually this is done automatically at the first API call, but this function makes it possible to set UCX options programmatically. Alternatively, UCX options can be specified through environment variables.

Parameters

options: dict, optional

UCX options sent to the underlying UCX library

env_takes_precedence: bool, optional

Whether environment variables take precedence over the options specified here.

blocking_progress_mode: bool, optional

If None, blocking UCX progress mode is used unless the environment variable UCXPY_NON_BLOCKING_MODE is defined. Otherwise, if True blocking mode is used and if False non-blocking mode is used.
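A sketch of explicit initialization followed by a look at the resulting configuration. It assumes a working UCX-Py installation; the wrapper name init_ucx is hypothetical, and the option key "TLS" with value "tcp" in the usage comment is an assumption about how option names are spelled, so check the keys returned by get_config() before relying on it.

```python
def init_ucx(options, env_takes_precedence=True):
    """Initialize UCX with explicit options and return the resulting config."""
    import ucp  # imported lazily; requires UCX-Py

    # Pass a copy so the caller's dict is never modified.
    ucp.init(options=dict(options), env_takes_precedence=env_takes_precedence)
    return ucp.get_config()


# Usage (requires UCX-Py); the "TLS" key and "tcp" value are assumptions:
# config = init_ucx({"TLS": "tcp"})
```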

ucp.progress()[source]

Try to progress the communication layer

Warning: it is illegal to call this from a callback function, such as the callback function given to create_listener.

ucp.reset()[source]

Resets the UCX library by shutting down all of UCX.

The library is initialized again at the next API call.

Endpoint

class ucp.Endpoint(endpoint, ctx, tags=None)[source]

An endpoint represents a connection to a peer

Please use create_listener() and create_endpoint() to create an Endpoint.

abort()[source]

Close the communication immediately and abruptly. Useful in destructors or generators’ finally blocks.

Note that this function doesn’t signal the connected peer to close. To do that, use Endpoint.close().
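The generator use case can be sketched as follows, assuming Endpoint.recv is awaited as a coroutine and that messages have a fixed, known size. The name iter_messages is hypothetical.

```python
async def iter_messages(ep, nbytes):
    """Yield fixed-size messages from `ep` until the consumer stops."""
    try:
        while True:
            buf = bytearray(nbytes)
            await ep.recv(buf)
            yield bytes(buf)
    finally:
        # abort() is synchronous, so no await is needed during cleanup.
        # It does not signal the peer; use close() for that.
        ep.abort()
```

When the consumer stops iterating and the generator is closed, the finally block runs and the endpoint is torn down without awaiting anything.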

am_recv()[source]

Receive from connected peer.

am_send(buffer)[source]

Send buffer to connected peer.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to send. Raises ValueError if buffer is smaller than nbytes.

async close()[source]

Close the endpoint cleanly. This will attempt to flush outgoing buffers before actually closing the underlying UCX endpoint.

close_after_n_recv(n, count_from_ep_creation=False)[source]

Close the endpoint after n received messages.

Parameters

n: int

Number of messages to receive before closing the endpoint.

count_from_ep_creation: bool, optional

Whether to count n from this function call (default) or from the creation of the endpoint.
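A sketch of a receiver that expects a known number of messages, assuming Endpoint.recv is awaited as a coroutine and that every message has the same size. The name recv_n_then_close is hypothetical.

```python
async def recv_n_then_close(ep, n, nbytes):
    """Receive exactly `n` messages of `nbytes` bytes each."""
    # Count from this call (the default), not from endpoint creation.
    ep.close_after_n_recv(n)
    messages = []
    for _ in range(n):
        buf = bytearray(nbytes)
        await ep.recv(buf)
        messages.append(bytes(buf))
    # After the n-th receive the endpoint is expected to be closed.
    return messages
```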

closed()[source]

Is this endpoint closed?

cuda_support()[source]

Return whether UCX is configured with CUDA support or not

get_ucp_endpoint()[source]

Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.

get_ucp_worker()[source]

Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.

recv(buffer, tag=None, force_tag=False)[source]

Receive from connected peer into buffer.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to receive into. Raises ValueError if buffer is smaller than nbytes or read-only.

tag: hashable, optional

Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.

async recv_obj(tag=None, allocator=<class 'bytearray'>)[source]

Receive from connected peer that calls send_obj().

As opposed to recv(), this function returns the received object. Data is received into a buffer allocated by allocator.

The transfer includes an extra message containing the size of obj, which increases the overhead slightly.

Parameters

tag: hashable, optional

Set a tag that must match the received message. Note that UCX-Py currently doesn’t support an “any tag”, so tag=None only matches a send that also sets tag=None.

allocator: callable, optional

Function to allocate the received object. The function should take the number of bytes to allocate as input and return a new buffer of that size as output.
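A sketch of a custom allocator paired with pickle-based helpers around send_obj() and recv_obj(). CountingAllocator, send_pickled, and recv_pickled are hypothetical names; the allocator follows the contract described above by taking a byte count and returning a new buffer of that size.

```python
import pickle


class CountingAllocator:
    """Allocate bytearrays and remember how many bytes were requested."""

    def __init__(self):
        self.total_bytes = 0

    def __call__(self, nbytes):
        self.total_bytes += nbytes
        return bytearray(nbytes)


async def send_pickled(ep, obj):
    """Serialize `obj` with pickle and send it with send_obj()."""
    await ep.send_obj(pickle.dumps(obj))


async def recv_pickled(ep, allocator=bytearray):
    """Receive a frame sent by send_pickled() and deserialize it."""
    frame = await ep.recv_obj(allocator=allocator)
    return pickle.loads(frame)
```

An allocator like this could be swapped for one that returns device or pinned memory, provided the returned object exposes one of the supported buffer interfaces.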

Example

>>> pickle.loads(await ep.recv_obj())

send(buffer, tag=None, force_tag=False)[source]

Send buffer to connected peer.

Parameters

buffer: exposing the buffer protocol or array/cuda interface

The buffer to send. Raises ValueError if buffer is smaller than nbytes.

tag: hashable, optional

Set a tag that the receiver must match. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.

force_tag: bool

If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
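A sketch of matching tags on both sides of a transfer, assuming Endpoint.send and Endpoint.recv are awaited as coroutines. The helper names are hypothetical; the tag can be any hashable value and, with the default force_tag=False, it is combined with the internal Endpoint tag as described above.

```python
async def send_tagged(ep, payload, tag):
    """Send `payload` so that only a receive with the same tag matches it."""
    await ep.send(bytearray(payload), tag=tag)


async def recv_tagged(ep, nbytes, tag):
    """Receive `nbytes` bytes from a send that used the same tag."""
    buf = bytearray(nbytes)  # must be writable; bytes would be read-only
    await ep.recv(buf, tag=tag)
    return bytes(buf)
```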

async send_obj(obj, tag=None)[source]

Send obj to connected peer that calls recv_obj().

The transfer includes an extra message containing the size of obj, which increases the overhead slightly.

Parameters

obj: exposing the buffer protocol or array/cuda interface

The object to send.

tag: hashable, optional

Set a tag that the receiver must match.

Example

>>> await ep.send_obj(pickle.dumps([1,2,3]))

set_close_callback(callback_func)[source]

Register a user callback function to be called on Endpoint’s closing.

Allows the user to register a callback function to be called when the Endpoint’s error callback is called, or during its finalizer if the error callback is never called.

Once the callback is called, it’s not possible to send any more messages. However, receiving messages may still be possible, as UCP may still have incoming messages in transit.

Parameters

callback_func: callable

The callback function to be called when the Endpoint’s error callback is called, otherwise called on its finalizer.

Example

>>> ep.set_close_callback(lambda: print("Executing close callback"))

ucx_info()[source]

Return low-level UCX info about this endpoint as a string

property uid

The unique ID of the underlying UCX endpoint

Listener

class ucp.Listener(backend)[source]

A handle to the listening service started by create_listener()

The listening continues as long as this object exists or until .close() is called. Please use create_listener() to create a Listener.

close()[source]

Close the listener

closed()[source]

Is the listener closed?

property ip

The listening network IP address

property port

The listening network port