rixaplugin.internal package

Submodules

rixaplugin.internal.api module

https://www.youtube.com/watch?v=siwpn14IE7E

class rixaplugin.internal.api.BaseAPI(request_id, identity, scope=None)

Bases: object

Base class for all API objects. It also serves as the fallback for calls where no API was passed.

API objects only exist asynchronously; sync calls are wrapped. The API implementation is provided by the server on which the plugin system runs. Its primary function is to access web server functionality. For development and consistency reasons, a serverless API exists as well. This class is the base and fallback for all API calls. It serves API calls with minimal compatibility, i.e. it just prints the result.

The API implementation assumes a Django server, although, as mentioned, this is not a requirement.
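
To illustrate the print-only fallback behaviour described above, here is a minimal sketch. `PrintingAPI` and `display_sync` are hypothetical names and not part of rixaplugin; only the constructor and `display` signatures are taken from the `BaseAPI` listing, and the idea of returning the payload kind is an assumption made for the example.

```python
import asyncio


class PrintingAPI:
    """Hypothetical stand-in for a serverless fallback API (not rixaplugin code)."""

    def __init__(self, request_id, identity, scope=None):
        self.request_id = request_id
        self.identity = identity
        self.scope = scope

    async def display(self, html=None, json=None, plotly=None, text=None):
        # With no web server attached, just print the first payload that was given.
        candidates = (("html", html), ("json", json), ("plotly", plotly), ("text", text))
        for kind, payload in candidates:
            if payload is not None:
                print(f"[{self.request_id}] {kind}: {payload}")
                return kind  # assumption: the kind is returned only to make the sketch testable
        return None


def display_sync(api, **payload):
    # A sync caller is wrapped by driving the coroutine to completion.
    return asyncio.run(api.display(**payload))
```

Calling `display_sync(PrintingAPI("req-1", "local"), text="hello")` prints the text prefixed with the request id, which is roughly what "minimal compatibility" means here.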

async call(function_name, args_func, kwargs_func, **kwargs)

Low level remote call.

Used for more fine-grained control over the call. If you don’t know what this is, you probably don’t need it.

async call_client_js(function_name, *args, oneway=True)

Call a JS function in the client’s browser.

Parameters:
  • function_name – JS function name

  • args – Arguments to pass to the function

  • oneway – If True, the call will be one way, i.e. no return value will be expected

Returns:

return value of the JS function

async datalog_to_tmp(message, write_mode='a')

Write into a file in the tmp directory.

Filename is the request_id of the API object. The folder can be specified via settings.TMP_DATA_LOG_FOLDER. This is not meant for normal logging, but for large text dumps (e.g. a chat history) that are tied to a specific request.

Parameters:
  • message – String to write

  • write_mode – Mode to open the file in. Default is append.
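
The per-request dump file can be sketched as follows. This is a synchronous illustration, not the library's implementation: the `folder` argument stands in for settings.TMP_DATA_LOG_FOLDER, and falling back to the system temp directory is an assumption.

```python
import os
import tempfile


def datalog_to_tmp_sketch(request_id, message, write_mode="a", folder=None):
    # `folder` stands in for settings.TMP_DATA_LOG_FOLDER (assumed default: system tmp dir).
    folder = folder or tempfile.gettempdir()
    os.makedirs(folder, exist_ok=True)
    # The file is named after the request id, so all dumps of one request share a file.
    path = os.path.join(folder, str(request_id))
    with open(path, write_mode, encoding="utf-8") as fh:
        fh.write(message)
    return path
```

With the default append mode, repeated calls for the same request id accumulate in one file; passing `write_mode="w"` starts the file over.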

async display(html=None, json=None, plotly=None, text=None)
async display_in_chat(tracker_entry=None, text=None, html=None, plotly_obj=None, role=None, citations=None, index=None, flags=None)
async execute_code_on_remote(code)
static get_request_id(function_name, args, kwargs)
async retrieve_usr_obj(key)
async save_usr_obj(key, value, sync_db=False)
async show_message(message, message_type='info')

Display a message to the user.

Will show a box that needs to be acknowledged before the website can be used again. There is always just one message box, so if a new message is shown, the old one will be replaced.

Parameters:
  • message – Message to display

  • message_type – Type of the message. Default is 'info'.

async sync_session_storage_db()

Sync the session storage with the database. Use sparingly!

property worker_ctx
class rixaplugin.internal.api.JupyterAPI(request_id, identity)

Bases: BaseAPI

API class for Jupyter Notebooks.

Does the same as the base API, but uses Jupyter’s display methods when possible. Strongly recommended for development and testing.

async display(html=None, json=None, plotly=None, text=None)
async show_message(message, message_type='info')

Display a message to the user.

Will show a box that needs to be acknowledged before the website can be used again. There is always just one message box, so if a new message is shown, the old one will be replaced.

Parameters:
  • message – Message to display

  • message_type – Type of the message. Default is 'info'.

class rixaplugin.internal.api.RemoteAPI(request_id, identity, network_adapter, scope=None)

Bases: BaseAPI

API class for remote calls.

Relays all calls to the client for execution. The only exception is logging, which is both logged locally and sent to the server.

rixaplugin.internal.api.construct_api_module()
rixaplugin.internal.api.get_api()
rixaplugin.internal.api.relay_module(*args, **kwargs)

rixaplugin.internal.cli module

async rixaplugin.internal.cli.run_client(debug)
async rixaplugin.internal.cli.run_server(debug)
rixaplugin.internal.cli.setup_plugin_system(path, address=None, port=None, debug=None)

rixaplugin.internal.executor module

class rixaplugin.internal.executor.CountingProcessPoolExecutor(max_workers=None, *args, **kwargs)

Bases: ProcessPoolExecutor

debug_print()
get_active_task_count()
get_api(request_id)
get_free_worker_count()
get_max_task_count()
get_queued_task_count()
async remove_api(request_id)
submit(fn, *args, **kwargs)

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Returns:

A Future representing the given call.

class rixaplugin.internal.executor.CountingThreadPoolExecutor(max_workers=None, *args, **kwargs)

Bases: ThreadPoolExecutor

debug_print()
get_active_task_count()
get_free_worker_count()
get_max_task_count()
get_queued_task_count()
submit(fn, *args, **kwargs)

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Returns:

A Future representing the given call.
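
Both executors extend the standard-library pools with task counters. One way such counting could work is sketched below; `CountingExecutorSketch` and `get_pending_task_count` are hypothetical names, and the actual bookkeeping in rixaplugin may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class CountingExecutorSketch(ThreadPoolExecutor):
    """Hypothetical sketch: a thread pool that tracks submitted-but-unfinished tasks."""

    def __init__(self, max_workers=None, *args, **kwargs):
        super().__init__(max_workers, *args, **kwargs)
        self._lock = threading.Lock()
        self._pending = 0

    def submit(self, fn, *args, **kwargs):
        with self._lock:
            self._pending += 1
        try:
            future = super().submit(fn, *args, **kwargs)
        except Exception:
            # Submission failed (e.g. after shutdown): undo the increment.
            with self._lock:
                self._pending -= 1
            raise
        # The callback fires when the task finishes, fails, or is cancelled.
        future.add_done_callback(self._task_done)
        return future

    def _task_done(self, _future):
        with self._lock:
            self._pending -= 1

    def get_pending_task_count(self):
        with self._lock:
            return self._pending
```

The counter covers both running and queued tasks; separating the two, as the documented `get_active_task_count()` and `get_queued_task_count()` suggest, would additionally need the worker count.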

async rixaplugin.internal.executor.execute(function_name, plugin_name=None, args=None, kwargs=None, api_obj=None, return_future=False, return_time_estimate=False, timeout=30)

Execute a function in the plugin system.

It first checks that the plugin system is active, then looks up the function entry by function name and plugin name, validates the call, and executes the function.

Args:

  • function_name (str) – The name of the function to be executed.

  • plugin_name (str, optional) – The name of the plugin where the function is located. Defaults to None.

  • args (tuple, optional) – The positional arguments to pass to the function. Defaults to None (treated as ()).

  • kwargs (dict, optional) – The keyword arguments to pass to the function. Defaults to None (treated as {}).

  • api_obj (BaseAPI, optional) – The API object to use for the function call. If None, a new BaseAPI object is created. Defaults to None.

  • return_future (bool, optional) – If True, the function will return a future of the function call. Defaults to False.

  • return_time_estimate (bool, optional) – If True, the function will return a time estimate of the function call. Defaults to False.

  • timeout (int, optional) – The maximum time to wait for the function call to complete. Defaults to 30.

Raises:

Exception: If the plugin system is not initialized.

Returns:

Future or any: If return_future is True, it returns a Future object. Otherwise, it returns the result of the function call.
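
The steps described for `execute` (check that the system is active, look up the entry, validate the call, run it with a timeout) can be sketched in a self-contained way. Everything here is an assumption made for illustration: `SYSTEM_ACTIVE`, `REGISTRY`, and `execute_sketch` are hypothetical, validation is done with `inspect.signature(...).bind`, and the callable runs in the event loop's default thread pool rather than in the plugin system's executors.

```python
import asyncio
import inspect

# Hypothetical stand-ins for the plugin system state (not rixaplugin internals).
SYSTEM_ACTIVE = True
REGISTRY = {("math", "add"): lambda a, b=0: a + b}


async def execute_sketch(function_name, plugin_name=None, args=None, kwargs=None,
                         return_future=False, timeout=30):
    if not SYSTEM_ACTIVE:
        raise Exception("Plugin system not initialized")
    args = () if args is None else tuple(args)
    kwargs = {} if kwargs is None else dict(kwargs)

    # Look up the entry by function name, optionally narrowed by plugin name.
    matches = [fn for (plugin, name), fn in REGISTRY.items()
               if name == function_name and plugin_name in (None, plugin)]
    if not matches:
        raise KeyError(f"No function named {function_name!r}")
    fn = matches[0]

    # Validate the call before running it; bind() raises TypeError on a mismatch.
    inspect.signature(fn).bind(*args, **kwargs)

    # Run the callable in a worker thread so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, lambda: fn(*args, **kwargs))
    if return_future:
        return future
    return await asyncio.wait_for(future, timeout=timeout)
```

With `return_future=True` the caller receives the future and decides when to await it; otherwise the result is returned once the call completes or the timeout expires.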

async rixaplugin.internal.executor.execute_async(entry, args, kwargs, api_obj, return_future)
async rixaplugin.internal.executor.execute_code(code, api_obj=None, return_future=True, timeout=30)

Execute (a string) as code in the plugin system.

This is not meant for normal programming, as functionality is severely limited. It can be used to provide a safe interface to the plugin functions e.g. in a web interface.

async rixaplugin.internal.executor.execute_networked(func_name, plugin_name, plugin_id, args, kwargs, oneway, request_id, identity, network_adapter, scope)
async rixaplugin.internal.executor.execute_sync(entry, args, kwargs, api_obj, return_future)

Runs a local sync function in the plugin system.

For sync-sync calls, do not use this function, but rather call the function directly.

rixaplugin.internal.executor.fake_function(func_name, plugin_name, *args, **kwargs)
rixaplugin.internal.executor.init_plugin_system(mode=PluginModeFlags.None, num_workers=None, debug=False, max_jupyter_messages=10)

rixaplugin.internal.memory module

class rixaplugin.internal.memory.PluginMemory

Bases: object

Singleton class to store plugin information.

DO NOT INSTANTIATE THIS CLASS DIRECTLY! Ensures access to variables is only possible from main thread and proper cleanup of resources.

add_client_connection(scheme)
add_function(signature_dict, id=None, fn_type=FunctionPointerType.LOCAL)
add_plugin(plugin_dict, identity, remote_origin, origin_is_client=False, tags=None)
apply_tags_plugin(plugin_name, tags)
clean()
delete_connection(con)
delete_plugin(plugin_id)
find_function_by_name(func_name)
find_plugin_by_name(plugin_name)
force_shutdown()
get_functions(scope)
get_functions_as_str(scope, short=False, include_docstr=True)
get_plugins(scope)
get_sendable_plugins(remote_id=-1, skip=None)
pretty_print_filtered_plugins(scope, include_functions=True, include_docstr=True, include_plugin_meta=True)
pretty_print_plugin(plugin_name, include_docstr=True)
pretty_print_plugins(include_functions=True, include_docstr=True)
static purge()
property zmq_context
rixaplugin.internal.memory.get_function_entry(function_name, plugin_id)
rixaplugin.internal.memory.get_function_entry_by_name(name, plugin_name=None)
rixaplugin.internal.memory.get_plugin_id(plugin_name)

rixaplugin.internal.networking module

class rixaplugin.internal.networking.CredentialsProvider

Bases: object

callback(domain, key)
class rixaplugin.internal.networking.NetworkAdapter(port, use_curve=True, manually_created=True, address=None)

Bases: object

async call_remote_function(plugin_entry, api_obj, args=None, kwargs=None, one_way=False, return_time_estimate=False)
async handle_remote_message(header_flags, msg, identity)
async listen()
async send(identity, data, already_serialized=False)
async send_api_call(identity, request_id, api_func_name, args, kwargs)
async send_exception(identity, request_id, exception)
async send_return(identity, request_id, ret)
async trigger_api_deletion(request_id, time=2)

Trigger the deletion of an api object after a certain time.

The API object cannot be deleted immediately, because API calls may still arrive after the return due to the asynchronous nature of the system.
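
The delayed cleanup can be pictured with a small asyncio sketch. The `api_objects` dictionary and the `demo` coroutine are hypothetical; how rixaplugin actually stores and removes API objects is not shown in this reference.

```python
import asyncio

api_objects = {}  # hypothetical store: request_id -> API object


async def trigger_api_deletion_sketch(request_id, time=2):
    # Wait first, so API calls that arrive shortly after the return can still be served.
    await asyncio.sleep(time)
    api_objects.pop(request_id, None)


async def demo():
    api_objects["req-1"] = object()
    task = asyncio.create_task(trigger_api_deletion_sketch("req-1", time=0.05))
    still_there = "req-1" in api_objects  # a late API call would still find the object
    await task
    return still_there, "req-1" in api_objects
```

Running `asyncio.run(demo())` shows the object surviving until the delay has elapsed and being gone afterwards.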

class rixaplugin.internal.networking.PluginClient(server_address, port, protocoll='tcp://', use_auth=True, manually_created=True)

Bases: NetworkAdapter

reconnect()
class rixaplugin.internal.networking.PluginServer(port, address=None, use_curve=True, manually_created=True)

Bases: NetworkAdapter

callback(domain, key)
async rixaplugin.internal.networking.create_and_start_plugin_client(server_address, port=2809, raise_on_connection_failure=True, return_future=False, use_auth=False)
async rixaplugin.internal.networking.create_and_start_plugin_server(port, address=None, use_auth=False, return_future=True)
rixaplugin.internal.networking.create_keys(name=None, metadata=None, server_keys=False)

rixaplugin.internal.rixalogger module

class rixaplugin.internal.rixalogger.JupyterLoggingHandler(max_messages=10)

Bases: Handler

emit(record)

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

class rixaplugin.internal.rixalogger.RIXAFilter

Bases: Filter

filter(record)

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class rixaplugin.internal.rixalogger.RIXAFormatter(colormode, fmt_string, time_fmt)

Bases: Formatter

CRITICAL = [255, 0, 0]
DEBUG = [51, 204, 204]
ERROR = [255, 0, 102]
FORMATS_CONSOLE = {10: '\x1b[38;2;51;204;204m', 20: '\x1b[38;2;0;255;0m', 30: '\x1b[38;2;204;204;0m', 40: '\x1b[38;2;255;0;102m', 50: '\x1b[38;2;255;0;0m'}
FORMATS_HTML = {10: "<p style='color:rgb(51, 204, 204)'>", 20: "<p style='color:rgb(0, 255, 0)'>", 30: "<p style='color:rgb(204, 204, 0)'>", 40: "<p style='color:rgb(255, 0, 102)'>", 50: "<p style='color:rgb(255, 0, 0)'>"}
INFO = [0, 255, 0]
WARNING = [204, 204, 0]
format(record)

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

class rixaplugin.internal.rixalogger.RIXALogger(*args, **kwargs)

Bases: Logger

log_exception()
class rixaplugin.internal.rixalogger.TerminalFormat

Bases: object

Bold = '\x1b[1m'
Dim = '\x1b[2m'
END = '\x1b[0m'
FAIL = '\x1b[91m'
HEADER = '\x1b[95m'
Hidden = '\x1b[8m'
Italic = '\x1b[3m'
NC = '\x1b[0m'
OKBLUE = '\x1b[94m'
OKGREEN = '\x1b[92m'
Reset = '\x1b[0m'
Reset_Bold = '\x1b[21m'
Reset_Dim = '\x1b[22m'
Reset_Hidden = '\x1b[28m'
Reset_Italic = '\x1b[23m'
Reset_Reverse = '\x1b[27m'
Reset_Underlined = '\x1b[24'
Reverse = '\x1b[7m'
STANDARDTEXT = '\x1b[38;2;255;255;255m'
Underlined = '\x1b[4m'
WARNING = '\x1b[93m'
static rgb(r, g, b, foreground=True, html=False, invert=False)
rixaplugin.internal.rixalogger.format_exception(exception, context_lines=2, without_color=False, limit=5, html=False)
rixaplugin.internal.rixalogger.rgb_to_html(x, invert=False)
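
The FORMATS_CONSOLE values listed for RIXAFormatter are 24-bit ("true colour") ANSI escape sequences of the form `ESC[38;2;R;G;Bm`. The sketch below shows how such a sequence is assembled; `rgb_escape` and `colorize` are illustrative helpers and not the signatures of `TerminalFormat.rgb`.

```python
RESET = "\x1b[0m"  # same value as TerminalFormat.END in the listing above


def rgb_escape(r, g, b, foreground=True):
    # 38 selects the foreground colour, 48 the background; "2" announces an r;g;b triple.
    layer = 38 if foreground else 48
    return f"\x1b[{layer};2;{r};{g};{b}m"


def colorize(text, color):
    # `color` is an [r, g, b] list such as RIXAFormatter.DEBUG = [51, 204, 204].
    return f"{rgb_escape(*color)}{text}{RESET}"
```

For example, `rgb_escape(51, 204, 204)` yields the same string that FORMATS_CONSOLE maps to log level 10 (DEBUG).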

rixaplugin.internal.utils module

rixaplugin.internal.utils.discover_plugins() → list[dict]

Discover all available plugins by reading the registry file.

Returns:

A list of dictionaries containing plugin info (name, endpoint, port, description)

async rixaplugin.internal.utils.event_wait(evt, timeout)
rixaplugin.internal.utils.identifier_from_signature(fname, args=[], kwargs={})

Create a unique identifier from function signature.

If debug mode is enabled, this returns a human-readable string that makes it possible to identify the function. This means the ID is not necessarily unique!

Returns:

Unique identifier
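
The trade-off between readable and unique identifiers can be illustrated as follows. This is only a sketch under assumptions: `make_identifier` is a hypothetical name, the explicit `debug` flag replaces whatever debug setting the library consults, and using a random UUID outside debug mode is a guess at one way to obtain uniqueness.

```python
import uuid


def make_identifier(fname, args=(), kwargs=None, debug=False):
    kwargs = kwargs or {}
    if debug:
        # Readable, but two calls with equal arguments produce the same string.
        parts = [repr(a) for a in args]
        parts += [f"{key}={value!r}" for key, value in sorted(kwargs.items())]
        return f"{fname}({', '.join(parts)})"
    # Assumption: outside debug mode, uniqueness matters more than readability.
    return uuid.uuid4().hex
```

In debug mode the identifier reads like the call itself, which helps when tracing requests but means repeated calls with the same arguments share an id.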

rixaplugin.internal.utils.is_valid_call(func_metadata, args, kwargs)

Validates if the provided args and kwargs are valid for the function described by func_metadata.

Parameters:
  • func_metadata – Dictionary describing the function’s parameters.

  • args – Tuple of positional arguments to be passed to the function.

  • kwargs – Dictionary of keyword arguments to be passed to the function.

Returns:

True if the call is valid, False otherwise.
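
The layout of `func_metadata` is not documented here, so the following sketch swaps in a different technique to show the same kind of check: it validates arguments against a real callable using the standard library's `inspect.signature(...).bind`. The names `call_matches_signature` and `greet` are illustrative only.

```python
import inspect


def call_matches_signature(func, args, kwargs):
    # Stand-in technique: the documented function takes a metadata dict, not the callable.
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError:
        # Raised for missing, surplus, or duplicated arguments.
        return False
    return True


def greet(name, punctuation="!"):
    return f"Hello {name}{punctuation}"
```

Here `call_matches_signature(greet, ("Ada",), {})` is accepted, while a call without `name`, or one that passes `name` both positionally and by keyword, is rejected.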

rixaplugin.internal.utils.make_discoverable(id: str, endpoint: str, port: int, plugins: list) → None

Make oneself available by writing to the registry file. If the plugin is already registered, it will be updated with new values.

Parameters:
  • id – The unique name of the plugin

  • endpoint – An IP-like string (e.g. “localhost”, “example.com”)

  • port – A TCP port number

  • description – A short human-readable description of the plugin

rixaplugin.internal.utils.remove_plugin(id: str) → None

Remove one’s own entry from the registry file.

Parameters:
  • id – The unique name of the plugin to be removed
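
The register, discover, and remove cycle around the registry file can be sketched as below. The registry's real location and format are not documented in this reference, so the JSON layout, the default `REGISTRY_PATH`, the extra `path` parameter, and the `_sketch` function names are all assumptions made for illustration.

```python
import json
import os
import tempfile

# Assumed location and JSON layout; the real registry path and format are not documented here.
REGISTRY_PATH = os.path.join(tempfile.gettempdir(), "plugin_registry_sketch.json")


def _load(path):
    if not os.path.exists(path):
        return {}
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def _save(path, registry):
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(registry, fh, indent=2)


def make_discoverable_sketch(id, endpoint, port, plugins, path=REGISTRY_PATH):
    registry = _load(path)
    # Writing under the same id overwrites the old entry, i.e. updates it.
    registry[id] = {"endpoint": endpoint, "port": port, "plugins": list(plugins)}
    _save(path, registry)


def discover_plugins_sketch(path=REGISTRY_PATH):
    return [{"id": key, **entry} for key, entry in _load(path).items()]


def remove_plugin_sketch(id, path=REGISTRY_PATH):
    registry = _load(path)
    registry.pop(id, None)
    _save(path, registry)
```

The sketch does no file locking, so concurrent writers could overwrite each other; a shared registry used by several processes would need some form of coordination.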

async rixaplugin.internal.utils.supervise_future(future, id=None)

Supervise a future, log any exceptions.

Used for plugin calls that are not awaited.

Parameters:
  • future – Future to supervise

  • id – Origin id (if available)
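
Supervising a fire-and-forget call can be sketched as follows, so that its exception is logged instead of being lost. `supervise_sketch` is a hypothetical name, and returning the result (or None on failure) is an assumption; the documented function may behave differently.

```python
import asyncio
import logging

log = logging.getLogger("supervise_sketch")


async def supervise_sketch(future, id=None):
    try:
        return await future
    except Exception:
        # log.exception records the traceback of the failed call.
        log.exception("Unawaited plugin call failed (origin id: %s)", id)
        return None


async def failing():
    raise ValueError("boom")


async def succeeding():
    return 7
```

A typical use would be `asyncio.create_task(supervise_sketch(some_call(), id="req-1"))`, where `some_call` is any coroutine whose result the caller does not wait for.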

async rixaplugin.internal.utils.supervisor(future, id=None)

Module contents