API
ucp
|
UCX-Py: Python bindings for UCX <www.openucx.org> |
|
Create and start a listener to accept incoming connections |
|
Create a new endpoint to a server |
|
Get the address associated with a network interface. |
Returns all UCX configuration options as a dict. |
|
Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer. |
|
Return the version of the underlying UCX installation |
|
|
Initiate UCX. |
Try to progress the communication layer |
|
Resets the UCX library by shutting down all of UCX. |
Endpoint
|
An endpoint represents a connection to a peer |
Close the communication immediately and abruptly. |
|
Close the endpoint cleanly. |
|
Is this endpoint closed? |
|
|
Close the endpoint after n received messages. |
Return whether UCX is configured with CUDA support or not |
|
Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer. |
|
Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer. |
|
|
Receive from connected peer into buffer. |
|
Send buffer to connected peer. |
Return low-level UCX info about this endpoint as a string |
|
The unique ID of the underlying UCX endpoint |
Listener
|
A handle to the listening service started by create_listener() |
Closing the listener |
|
Is the listener closed? |
|
The listening network port |
- ucp.create_listener(callback_func, port=None, endpoint_error_handling=True)[source]
Create and start a listener to accept incoming connections
callback_func is the function or coroutine that takes one argument – the Endpoint connected to the client.
Notice, the listening is closed when the returned Listener goes out of scope thus remember to keep a reference to the object.
Parameters
- callback_func: function or coroutine
A callback function that gets invoked when an incoming connection is accepted
- port: int, optional
An unused port number for listening, or 0 to let UCX assign an unused port.
- endpoint_error_handling: boolean, optional
If True (default) enable endpoint error handling raising exceptions when an error occurs, may incur in performance penalties but prevents a process from terminating unexpectedly that may happen when disabled. If False endpoint endpoint error handling is disabled.
Returns
- Listener
The new listener. When this object is deleted, the listening stops
- async ucp.create_endpoint(ip_address, port, endpoint_error_handling=True)[source]
Create a new endpoint to a server
Parameters
- ip_address: str
IP address of the server the endpoint should connect to
- port: int
IP address of the server the endpoint should connect to
- endpoint_error_handling: boolean, optional
If True (default) enable endpoint error handling raising exceptions when an error occurs, may incur in performance penalties but prevents a process from terminating unexpectedly that may happen when disabled. If False endpoint endpoint error handling is disabled.
Returns
- Endpoint
The new endpoint
- ucp.get_address(ifname=None)[source]
Get the address associated with a network interface.
Parameters
- ifnamestr
The network interface name to find the address for. If None, it uses the value of environment variable UCXPY_IFNAME and if UCXPY_IFNAME is not set it defaults to “ib0” An OSError is raised for invalid interfaces.
Returns
- addressstr
The inet addr associated with an interface.
Raises
- RuntimeError
If a network address could not be determined.
Examples
>>> get_address() '10.33.225.160'
>>> get_address(ifname='lo') '127.0.0.1'
- ucp.get_config()[source]
Returns all UCX configuration options as a dict.
If UCX is uninitialized, the options returned are the options used if UCX were to be initialized now. Notice, this function doesn’t initialize UCX.
Returns
- dict
The current UCX configuration options
- ucp.get_ucp_worker()[source]
Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.
- ucp.get_ucx_version()[source]
Return the version of the underlying UCX installation
Notice, this function doesn’t initialize UCX.
Returns
- tuple
The version as a tuple e.g. (1, 7, 0)
- ucp.init(options={}, env_takes_precedence=False, blocking_progress_mode=None)[source]
Initiate UCX.
Usually this is done automatically at the first API call but this function makes it possible to set UCX options programmable. Alternatively, UCX options can be specified through environment variables.
Parameters
- options: dict, optional
UCX options send to the underlying UCX library
- env_takes_precedence: bool, optional
Whether environment variables takes precedence over the options specified here.
- blocking_progress_mode: bool, optional
If None, blocking UCX progress mode is used unless the environment variable UCXPY_NON_BLOCKING_MODE is defined. Otherwise, if True blocking mode is used and if False non-blocking mode is used.
- ucp.progress()[source]
Try to progress the communication layer
Warning, it is illegal to call this from a call-back function such as the call-back function given to create_listener.
- ucp.reset()[source]
Resets the UCX library by shutting down all of UCX.
The library is initiated at next API call.
Endpoint
- class ucp.Endpoint(endpoint, ctx, tags=None)[source]
An endpoint represents a connection to a peer
Please use create_listener() and create_endpoint() to create an Endpoint.
- abort()[source]
Close the communication immediately and abruptly. Useful in destructors or generators’
finally
blocks.Notice, this functions doesn’t signal the connected peer to close. To do that, use Endpoint.close()
- am_send(buffer)[source]
Send buffer to connected peer.
Parameters
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to send. Raise ValueError if buffer is smaller than nbytes.
- async close()[source]
Close the endpoint cleanly. This will attempt to flush outgoing buffers before actually closing the underlying UCX endpoint.
- close_after_n_recv(n, count_from_ep_creation=False)[source]
Close the endpoint after n received messages.
Parameters
- n: int
Number of messages to received before closing the endpoint.
- count_from_ep_creation: bool, optional
Whether to count n from this function call (default) or from the creation of the endpoint.
- get_ucp_endpoint()[source]
Returns the underlying UCP endpoint handle (ucp_ep_h) as a Python integer.
- get_ucp_worker()[source]
Returns the underlying UCP worker handle (ucp_worker_h) as a Python integer.
- recv(buffer, tag=None, force_tag=False)[source]
Receive from connected peer into buffer.
Parameters
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to receive into. Raise ValueError if buffer is smaller than nbytes or read-only.
- tag: hashable, optional
Set a tag that must match the received message. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- async recv_obj(tag=None, allocator=<class 'bytearray'>)[source]
Receive from connected peer that calls send_obj().
As opposed to recv(), this function returns the received object. Data is received into a buffer allocated by allocator.
The transfer includes an extra message containing the size of obj, which increses the overhead slightly.
Parameters
- tag: hashable, optional
Set a tag that must match the received message. Notice, currently UCX-Py doesn’t support a “any tag” thus tag=None only matches a send that also sets tag=None.
- allocator: callabale, optional
Function to allocate the received object. The function should take the number of bytes to allocate as input and return a new buffer of that size as output.
Example
>>> await pickle.loads(ep.recv_obj())
- send(buffer, tag=None, force_tag=False)[source]
Send buffer to connected peer.
Parameters
- buffer: exposing the buffer protocol or array/cuda interface
The buffer to send. Raise ValueError if buffer is smaller than nbytes.
tag: hashable, optional tag: hashable, optional
Set a tag that the receiver must match. Currently the tag is hashed together with the internal Endpoint tag that is agreed with the remote end at connection time. To enforce using the user tag, make sure to specify force_tag=True.
- force_tag: bool
If true, force using tag as is, otherwise the value specified with tag (if any) will be hashed with the internal Endpoint tag.
- async send_obj(obj, tag=None)[source]
Send obj to connected peer that calls recv_obj().
The transfer includes an extra message containing the size of obj, which increases the overhead slightly.
Parameters
- obj: exposing the buffer protocol or array/cuda interface
The object to send.
- tag: hashable, optional
Set a tag that the receiver must match.
Example
>>> await ep.send_obj(pickle.dumps([1,2,3]))
- set_close_callback(callback_func)[source]
Register a user callback function to be called on Endpoint’s closing.
Allows the user to register a callback function to be called when the Endpoint’s error callback is called, or during its finalizer if the error callback is never called.
Once the callback is called, it’s not possible to send any more messages. However, receiving messages may still be possible, as UCP may still have incoming messages in transit.
Parameters
- callback_func: callable
The callback function to be called when the Endpoint’s error callback is called, otherwise called on its finalizer.
Example >>> ep.set_close_callback(lambda: print(“Executing close callback”))
- property uid
The unique ID of the underlying UCX endpoint
Listener
- class ucp.Listener(backend)[source]
A handle to the listening service started by create_listener()
The listening continues as long as this object exist or .close() is called. Please use create_listener() to create an Listener.
- property ip
The listening network IP address
- property port
The listening network port