Skip to content

Module streamdeck_ui.api

Defines the Python API for interacting with the StreamDeck Configuration UI

None

View Source
"""Defines the Python API for interacting with the StreamDeck Configuration UI"""

import json

import os

import shlex

import threading

import time

from functools import partial

from subprocess import Popen  # nosec - Need to allow users to specify arbitrary commands

from typing import Dict, Tuple, Union, cast

from warnings import warn

from PIL import Image, ImageDraw, ImageFont

from pynput.keyboard import Controller, Key

from StreamDeck import DeviceManager

from StreamDeck.Devices import StreamDeck

from StreamDeck.ImageHelpers import PILHelper

from streamdeck_ui.config import CONFIG_FILE_VERSION, DEFAULT_FONT, FONTS_PATH, STATE_FILE

image_cache: Dict[str, memoryview] = {}

decks: Dict[str, StreamDeck.StreamDeck] = {}

state: Dict[str, Dict[str, Union[int, Dict[int, Dict[int, Dict[str, str]]]]]] = {}

streamdecks_lock = threading.Lock()

def _replace_special_keys(key):

    """Replaces special keywords the user can use with their character equivalent."""

    if key.lower() == "plus":

        return "+"

    if key.lower() == "comma":

        return ","

    if key.lower() == "delay":

        return "delay"

    return key

def _key_change_callback(deck_id: str, _deck: StreamDeck.StreamDeck, key: int, state: bool) -> None:

    """ Callback whenever a key is pressed. This is method runs the various actions defined

        for the key being pressed, sequentially. """

    if state:

        keyboard = Controller()

        page = get_page(deck_id)

        command = get_button_command(deck_id, page, key)

        if command:

            try:

                Popen(shlex.split(command))

            except Exception as error:

                print(f"The command '{command}' failed: {error}")

        keys = get_button_keys(deck_id, page, key)

        if keys:

            keys = keys.strip().replace(" ", "")

            for section in keys.split(","):

                # Since + and , are used to delimit our section and keys to press,

                # they need to be substituded with keywords.

                section_keys = [_replace_special_keys(key_name) for key_name in section.split("+")]

                # Translate string to enum, or just the string itself if not found

                section_keys = [

                    getattr(Key, key_name.lower(), key_name) for key_name in section_keys

                ]

                for key_name in section_keys:

                    try:

                        if key_name == "delay":

                            time.sleep(0.5)

                        else:

                            keyboard.press(key_name)

                    except Exception:

                        print(f"Could not press key '{key_name}'")

                for key_name in section_keys:

                    try:

                        if key_name != "delay":

                            keyboard.release(key_name)

                    except Exception:

                        print(f"Could not release key '{key_name}'")

        write = get_button_write(deck_id, page, key)

        if write:

            try:

                keyboard.type(write)

            except Exception as error:

                print(f"Could not complete the write command: {error}")

        brightness_change = get_button_change_brightness(deck_id, page, key)

        if brightness_change:

            try:

                change_brightness(deck_id, brightness_change)

            except Exception as error:

                print(f"Could not change brightness: {error}")

        switch_page = get_button_switch_page(deck_id, page, key)

        if switch_page:

            set_page(deck_id, switch_page - 1)

def _save_state():

    export_config(STATE_FILE)

def _open_config(config_file: str):

    global state

    with open(config_file) as state_file:

        config = json.loads(state_file.read())

        file_version = config.get("streamdeck_ui_version", 0)

        if file_version != CONFIG_FILE_VERSION:

            raise ValueError(

                "Incompatible version of config file found: "

                f"{file_version} does not match required version "

                f"{CONFIG_FILE_VERSION}."

            )

        state = {}

        for deck_id, deck in config["state"].items():

            deck["buttons"] = {

                int(page_id): {int(button_id): button for button_id, button in buttons.items()}

                for page_id, buttons in deck.get("buttons", {}).items()

            }

            state[deck_id] = deck

def import_config(config_file: str) -> None:

    _open_config(config_file)

    render()

    _save_state()

def export_config(output_file: str) -> None:

    try:

        with open(output_file + ".tmp", "w") as state_file:

            state_file.write(

                json.dumps(

                    {"streamdeck_ui_version": CONFIG_FILE_VERSION, "state": state},

                    indent=4,

                    separators=(",", ": "),

                )

            )

    except Exception as error:

        print(f"The configuration file '{output_file}' was not updated. Error: {error}")

        raise

    else:

        os.replace(output_file + ".tmp", output_file)

def open_decks() -> Dict[str, Dict[str, Union[str, Tuple[int, int]]]]:

    """Opens and then returns all known stream deck devices"""

    for deck in DeviceManager.DeviceManager().enumerate():

        deck.open()

        deck.reset()

        deck_id = deck.get_serial_number()

        decks[deck_id] = deck

        deck.set_key_callback(partial(_key_change_callback, deck_id))

    return {

        deck_id: {"type": deck.deck_type(), "layout": deck.key_layout()}

        for deck_id, deck in decks.items()

    }

def close_decks() -> None:

    """Closes open decks for input/ouput."""

    for _deck_serial, deck in decks.items():

        if deck.connected():

            deck.reset()

            deck.close()

def ensure_decks_connected() -> None:

    """Reconnects to any decks that lost connection. If they did, re-renders them."""

    for deck_serial, deck in decks.copy().items():

        if not deck.connected():

            for new_deck in DeviceManager.DeviceManager().enumerate():

                try:

                    new_deck.open()

                    new_deck_serial = new_deck.get_serial_number()

                except Exception as error:

                    warn(f"A {error} error occurred when trying to reconnect to {deck_serial}")

                    new_deck_serial = None

                if new_deck_serial == deck_serial:

                    deck.close()

                    new_deck.reset()

                    new_deck.set_key_callback(partial(_key_change_callback, new_deck_serial))

                    decks[new_deck_serial] = new_deck

                    render()

def get_deck(deck_id: str) -> Dict[str, Dict[str, Union[str, Tuple[int, int]]]]:

    return {"type": decks[deck_id].deck_type(), "layout": decks[deck_id].key_layout()}

def _button_state(deck_id: str, page: int, button: int) -> dict:

    buttons = state.setdefault(deck_id, {}).setdefault("buttons", {})

    buttons_state = buttons.setdefault(page, {})  # type: ignore

    return buttons_state.setdefault(button, {})  # type: ignore

def swap_buttons(deck_id: str, page: int, source_button: int, target_button: int) -> None:

    """Swaps the properties of the source and target buttons"""

    temp = cast(dict, state[deck_id]["buttons"])[page][source_button]

    cast(dict, state[deck_id]["buttons"])[page][source_button] = cast(

        dict, state[deck_id]["buttons"]

    )[page][target_button]

    cast(dict, state[deck_id]["buttons"])[page][target_button] = temp

    # Clear the cache so images will be recreated on render

    image_cache.pop(f"{deck_id}.{page}.{source_button}", None)

    image_cache.pop(f"{deck_id}.{page}.{target_button}", None)

    _save_state()

    render()

def set_button_text(deck_id: str, page: int, button: int, text: str) -> None:

    """Set the text associated with a button"""

    if get_button_text(deck_id, page, button) != text:

        _button_state(deck_id, page, button)["text"] = text

        image_cache.pop(f"{deck_id}.{page}.{button}", None)

        render()

        _save_state()

def get_button_text(deck_id: str, page: int, button: int) -> str:

    """Returns the text set for the specified button"""

    return _button_state(deck_id, page, button).get("text", "")

def set_button_icon(deck_id: str, page: int, button: int, icon: str) -> None:

    """Sets the icon associated with a button"""

    if get_button_icon(deck_id, page, button) != icon:

        _button_state(deck_id, page, button)["icon"] = icon

        image_cache.pop(f"{deck_id}.{page}.{button}", None)

        render()

        _save_state()

def get_button_icon(deck_id: str, page: int, button: int) -> str:

    """Returns the icon set for a particular button"""

    return _button_state(deck_id, page, button).get("icon", "")

def set_button_change_brightness(deck_id: str, page: int, button: int, amount: int) -> None:

    """Sets the brightness changing associated with a button"""

    if get_button_change_brightness(deck_id, page, button) != amount:

        _button_state(deck_id, page, button)["brightness_change"] = amount

        render()

        _save_state()

def get_button_change_brightness(deck_id: str, page: int, button: int) -> int:

    """Returns the brightness change set for a particular button"""

    return _button_state(deck_id, page, button).get("brightness_change", 0)

def set_button_command(deck_id: str, page: int, button: int, command: str) -> None:

    """Sets the command associated with the button"""

    if get_button_command(deck_id, page, button) != command:

        _button_state(deck_id, page, button)["command"] = command

        _save_state()

def get_button_command(deck_id: str, page: int, button: int) -> str:

    """Returns the command set for the specified button"""

    return _button_state(deck_id, page, button).get("command", "")

def set_button_switch_page(deck_id: str, page: int, button: int, switch_page: int) -> None:

    """Sets the page switch associated with the button"""

    if get_button_switch_page(deck_id, page, button) != switch_page:

        _button_state(deck_id, page, button)["switch_page"] = switch_page

        _save_state()

def get_button_switch_page(deck_id: str, page: int, button: int) -> int:

    """Returns the page switch set for the specified button. 0 implies no page switch."""

    return _button_state(deck_id, page, button).get("switch_page", 0)

def set_button_keys(deck_id: str, page: int, button: int, keys: str) -> None:

    """Sets the keys associated with the button"""

    if get_button_keys(deck_id, page, button) != keys:

        _button_state(deck_id, page, button)["keys"] = keys

        _save_state()

def get_button_keys(deck_id: str, page: int, button: int) -> str:

    """Returns the keys set for the specified button"""

    return _button_state(deck_id, page, button).get("keys", "")

def set_button_write(deck_id: str, page: int, button: int, write: str) -> None:

    """Sets the text meant to be written when button is pressed"""

    if get_button_write(deck_id, page, button) != write:

        _button_state(deck_id, page, button)["write"] = write

        _save_state()

def get_button_write(deck_id: str, page: int, button: int) -> str:

    """Returns the text to be produced when the specified button is pressed"""

    return _button_state(deck_id, page, button).get("write", "")

def set_brightness(deck_id: str, brightness: int) -> None:

    """Sets the brightness for every button on the deck"""

    if get_brightness(deck_id) != brightness:

        decks[deck_id].set_brightness(brightness)

        state.setdefault(deck_id, {})["brightness"] = brightness

        _save_state()

def get_brightness(deck_id: str) -> int:

    """Gets the brightness that is set for the specified stream deck"""

    return state.get(deck_id, {}).get("brightness", 100)  # type: ignore

def change_brightness(deck_id: str, amount: int = 1) -> None:

    """Change the brightness of the deck by the specified amount"""

    set_brightness(deck_id, max(min(get_brightness(deck_id) + amount, 100), 0))

def get_page(deck_id: str) -> int:

    """Gets the current page shown on the stream deck"""

    return state.get(deck_id, {}).get("page", 0)  # type: ignore

def set_page(deck_id: str, page: int) -> None:

    """Sets the current page shown on the stream deck"""

    if get_page(deck_id) != page:

        state.setdefault(deck_id, {})["page"] = page

        render()

        _save_state()

def render() -> None:

    """renders all decks"""

    for deck_id, deck_state in state.items():

        deck = decks.get(deck_id, None)

        if not deck:

            warn(f"{deck_id} has settings specified but is not seen. Likely unplugged!")

            continue

        page = get_page(deck_id)

        for button_id, button_settings in (

            deck_state.get("buttons", {}).get(page, {}).items()  # type: ignore

        ):

            key = f"{deck_id}.{page}.{button_id}"

            if key in image_cache:

                image = image_cache[key]

            else:

                image = _render_key_image(deck, **button_settings)

                image_cache[key] = image

            with streamdecks_lock:

                deck.set_key_image(button_id, image)

def _render_key_image(deck, icon: str = "", text: str = "", font: str = DEFAULT_FONT, **kwargs):

    """Renders an individual key image"""

    image = PILHelper.create_image(deck)

    draw = ImageDraw.Draw(image)

    if icon:

        try:

            rgba_icon = Image.open(icon).convert("RGBA")

        except (OSError, IOError) as icon_error:

            print(f"Unable to load icon {icon} with error {icon_error}")

            rgba_icon = Image.new("RGBA", (300, 300))

    else:

        rgba_icon = Image.new("RGBA", (300, 300))

    icon_width, icon_height = image.width, image.height

    if text:

        icon_height -= 20

    rgba_icon.thumbnail((icon_width, icon_height), Image.LANCZOS)

    icon_pos = ((image.width - rgba_icon.width) // 2, 0)

    image.paste(rgba_icon, icon_pos, rgba_icon)

    if text:

        true_font = ImageFont.truetype(os.path.join(FONTS_PATH, font), 14)

        label_w, label_h = draw.textsize(text, font=true_font)

        if icon:

            label_pos = ((image.width - label_w) // 2, image.height - 20)

        else:

            label_pos = ((image.width - label_w) // 2, (image.height // 2) - 7)

        draw.text(label_pos, text=text, font=true_font, fill="white")

    return PILHelper.to_native_format(deck, image)

if os.path.isfile(STATE_FILE):

    _open_config(STATE_FILE)

Variables

CONFIG_FILE_VERSION
DEFAULT_FONT
FONTS_PATH
STATE_FILE
decks
image_cache
state
streamdecks_lock

Functions

change_brightness

def change_brightness(
    deck_id: str,
    amount: int = 1
) -> None

Change the brightness of the deck by the specified amount

View Source
def change_brightness(deck_id: str, amount: int = 1) -> None:

    """Change the brightness of the deck by the specified amount"""

    set_brightness(deck_id, max(min(get_brightness(deck_id) + amount, 100), 0))

close_decks

def close_decks(

) -> None

Closes open decks for input/ouput.

View Source
def close_decks() -> None:

    """Closes open decks for input/ouput."""

    for _deck_serial, deck in decks.items():

        if deck.connected():

            deck.reset()

            deck.close()

ensure_decks_connected

def ensure_decks_connected(

) -> None

Reconnects to any decks that lost connection. If they did, re-renders them.

View Source
def ensure_decks_connected() -> None:

    """Reconnects to any decks that lost connection. If they did, re-renders them."""

    for deck_serial, deck in decks.copy().items():

        if not deck.connected():

            for new_deck in DeviceManager.DeviceManager().enumerate():

                try:

                    new_deck.open()

                    new_deck_serial = new_deck.get_serial_number()

                except Exception as error:

                    warn(f"A {error} error occurred when trying to reconnect to {deck_serial}")

                    new_deck_serial = None

                if new_deck_serial == deck_serial:

                    deck.close()

                    new_deck.reset()

                    new_deck.set_key_callback(partial(_key_change_callback, new_deck_serial))

                    decks[new_deck_serial] = new_deck

                    render()

export_config

def export_config(
    output_file: str
) -> None
View Source
def export_config(output_file: str) -> None:

    try:

        with open(output_file + ".tmp", "w") as state_file:

            state_file.write(

                json.dumps(

                    {"streamdeck_ui_version": CONFIG_FILE_VERSION, "state": state},

                    indent=4,

                    separators=(",", ": "),

                )

            )

    except Exception as error:

        print(f"The configuration file '{output_file}' was not updated. Error: {error}")

        raise

    else:

        os.replace(output_file + ".tmp", output_file)

get_brightness

def get_brightness(
    deck_id: str
) -> int

Gets the brightness that is set for the specified stream deck

View Source
def get_brightness(deck_id: str) -> int:

    """Gets the brightness that is set for the specified stream deck"""

    return state.get(deck_id, {}).get("brightness", 100)  # type: ignore

get_button_change_brightness

def get_button_change_brightness(
    deck_id: str,
    page: int,
    button: int
) -> int

Returns the brightness change set for a particular button

View Source
def get_button_change_brightness(deck_id: str, page: int, button: int) -> int:

    """Returns the brightness change set for a particular button"""

    return _button_state(deck_id, page, button).get("brightness_change", 0)

get_button_command

def get_button_command(
    deck_id: str,
    page: int,
    button: int
) -> str

Returns the command set for the specified button

View Source
def get_button_command(deck_id: str, page: int, button: int) -> str:

    """Returns the command set for the specified button"""

    return _button_state(deck_id, page, button).get("command", "")

get_button_icon

def get_button_icon(
    deck_id: str,
    page: int,
    button: int
) -> str

Returns the icon set for a particular button

View Source
def get_button_icon(deck_id: str, page: int, button: int) -> str:

    """Returns the icon set for a particular button"""

    return _button_state(deck_id, page, button).get("icon", "")

get_button_keys

def get_button_keys(
    deck_id: str,
    page: int,
    button: int
) -> str

Returns the keys set for the specified button

View Source
def get_button_keys(deck_id: str, page: int, button: int) -> str:

    """Returns the keys set for the specified button"""

    return _button_state(deck_id, page, button).get("keys", "")

get_button_switch_page

def get_button_switch_page(
    deck_id: str,
    page: int,
    button: int
) -> int

Returns the page switch set for the specified button. 0 implies no page switch.

View Source
def get_button_switch_page(deck_id: str, page: int, button: int) -> int:

    """Returns the page switch set for the specified button. 0 implies no page switch."""

    return _button_state(deck_id, page, button).get("switch_page", 0)

get_button_text

def get_button_text(
    deck_id: str,
    page: int,
    button: int
) -> str

Returns the text set for the specified button

View Source
def get_button_text(deck_id: str, page: int, button: int) -> str:

    """Returns the text set for the specified button"""

    return _button_state(deck_id, page, button).get("text", "")

get_button_write

def get_button_write(
    deck_id: str,
    page: int,
    button: int
) -> str

Returns the text to be produced when the specified button is pressed

View Source
def get_button_write(deck_id: str, page: int, button: int) -> str:

    """Returns the text to be produced when the specified button is pressed"""

    return _button_state(deck_id, page, button).get("write", "")

get_deck

def get_deck(
    deck_id: str
) -> typing.Dict[str, typing.Dict[str, typing.Union[str, typing.Tuple[int, int]]]]
View Source
def get_deck(deck_id: str) -> Dict[str, Dict[str, Union[str, Tuple[int, int]]]]:

    return {"type": decks[deck_id].deck_type(), "layout": decks[deck_id].key_layout()}

get_page

def get_page(
    deck_id: str
) -> int

Gets the current page shown on the stream deck

View Source
def get_page(deck_id: str) -> int:

    """Gets the current page shown on the stream deck"""

    return state.get(deck_id, {}).get("page", 0)  # type: ignore

import_config

def import_config(
    config_file: str
) -> None
View Source
def import_config(config_file: str) -> None:

    _open_config(config_file)

    render()

    _save_state()

open_decks

def open_decks(

) -> typing.Dict[str, typing.Dict[str, typing.Union[str, typing.Tuple[int, int]]]]

Opens and then returns all known stream deck devices

View Source
def open_decks() -> Dict[str, Dict[str, Union[str, Tuple[int, int]]]]:

    """Opens and then returns all known stream deck devices"""

    for deck in DeviceManager.DeviceManager().enumerate():

        deck.open()

        deck.reset()

        deck_id = deck.get_serial_number()

        decks[deck_id] = deck

        deck.set_key_callback(partial(_key_change_callback, deck_id))

    return {

        deck_id: {"type": deck.deck_type(), "layout": deck.key_layout()}

        for deck_id, deck in decks.items()

    }

render

def render(

) -> None

renders all decks

View Source
def render() -> None:

    """renders all decks"""

    for deck_id, deck_state in state.items():

        deck = decks.get(deck_id, None)

        if not deck:

            warn(f"{deck_id} has settings specified but is not seen. Likely unplugged!")

            continue

        page = get_page(deck_id)

        for button_id, button_settings in (

            deck_state.get("buttons", {}).get(page, {}).items()  # type: ignore

        ):

            key = f"{deck_id}.{page}.{button_id}"

            if key in image_cache:

                image = image_cache[key]

            else:

                image = _render_key_image(deck, **button_settings)

                image_cache[key] = image

            with streamdecks_lock:

                deck.set_key_image(button_id, image)

set_brightness

def set_brightness(
    deck_id: str,
    brightness: int
) -> None

Sets the brightness for every button on the deck

View Source
def set_brightness(deck_id: str, brightness: int) -> None:

    """Sets the brightness for every button on the deck"""

    if get_brightness(deck_id) != brightness:

        decks[deck_id].set_brightness(brightness)

        state.setdefault(deck_id, {})["brightness"] = brightness

        _save_state()

set_button_change_brightness

def set_button_change_brightness(
    deck_id: str,
    page: int,
    button: int,
    amount: int
) -> None

Sets the brightness changing associated with a button

View Source
def set_button_change_brightness(deck_id: str, page: int, button: int, amount: int) -> None:

    """Sets the brightness changing associated with a button"""

    if get_button_change_brightness(deck_id, page, button) != amount:

        _button_state(deck_id, page, button)["brightness_change"] = amount

        render()

        _save_state()

set_button_command

def set_button_command(
    deck_id: str,
    page: int,
    button: int,
    command: str
) -> None

Sets the command associated with the button

View Source
def set_button_command(deck_id: str, page: int, button: int, command: str) -> None:

    """Sets the command associated with the button"""

    if get_button_command(deck_id, page, button) != command:

        _button_state(deck_id, page, button)["command"] = command

        _save_state()

set_button_icon

def set_button_icon(
    deck_id: str,
    page: int,
    button: int,
    icon: str
) -> None

Sets the icon associated with a button

View Source
def set_button_icon(deck_id: str, page: int, button: int, icon: str) -> None:

    """Sets the icon associated with a button"""

    if get_button_icon(deck_id, page, button) != icon:

        _button_state(deck_id, page, button)["icon"] = icon

        image_cache.pop(f"{deck_id}.{page}.{button}", None)

        render()

        _save_state()

set_button_keys

def set_button_keys(
    deck_id: str,
    page: int,
    button: int,
    keys: str
) -> None

Sets the keys associated with the button

View Source
def set_button_keys(deck_id: str, page: int, button: int, keys: str) -> None:

    """Sets the keys associated with the button"""

    if get_button_keys(deck_id, page, button) != keys:

        _button_state(deck_id, page, button)["keys"] = keys

        _save_state()

set_button_switch_page

def set_button_switch_page(
    deck_id: str,
    page: int,
    button: int,
    switch_page: int
) -> None

Sets the page switch associated with the button

View Source
def set_button_switch_page(deck_id: str, page: int, button: int, switch_page: int) -> None:

    """Sets the page switch associated with the button"""

    if get_button_switch_page(deck_id, page, button) != switch_page:

        _button_state(deck_id, page, button)["switch_page"] = switch_page

        _save_state()

set_button_text

def set_button_text(
    deck_id: str,
    page: int,
    button: int,
    text: str
) -> None

Set the text associated with a button

View Source
def set_button_text(deck_id: str, page: int, button: int, text: str) -> None:

    """Set the text associated with a button"""

    if get_button_text(deck_id, page, button) != text:

        _button_state(deck_id, page, button)["text"] = text

        image_cache.pop(f"{deck_id}.{page}.{button}", None)

        render()

        _save_state()

set_button_write

def set_button_write(
    deck_id: str,
    page: int,
    button: int,
    write: str
) -> None

Sets the text meant to be written when button is pressed

View Source
def set_button_write(deck_id: str, page: int, button: int, write: str) -> None:

    """Sets the text meant to be written when button is pressed"""

    if get_button_write(deck_id, page, button) != write:

        _button_state(deck_id, page, button)["write"] = write

        _save_state()

set_page

def set_page(
    deck_id: str,
    page: int
) -> None

Sets the current page shown on the stream deck

View Source
def set_page(deck_id: str, page: int) -> None:

    """Sets the current page shown on the stream deck"""

    if get_page(deck_id) != page:

        state.setdefault(deck_id, {})["page"] = page

        render()

        _save_state()

swap_buttons

def swap_buttons(
    deck_id: str,
    page: int,
    source_button: int,
    target_button: int
) -> None

Swaps the properties of the source and target buttons

View Source
def swap_buttons(deck_id: str, page: int, source_button: int, target_button: int) -> None:

    """Swaps the properties of the source and target buttons"""

    temp = cast(dict, state[deck_id]["buttons"])[page][source_button]

    cast(dict, state[deck_id]["buttons"])[page][source_button] = cast(

        dict, state[deck_id]["buttons"]

    )[page][target_button]

    cast(dict, state[deck_id]["buttons"])[page][target_button] = temp

    # Clear the cache so images will be recreated on render

    image_cache.pop(f"{deck_id}.{page}.{source_button}", None)

    image_cache.pop(f"{deck_id}.{page}.{target_button}", None)

    _save_state()

    render()