Source code for openwebui_token_tracking.credit_groups

import sqlalchemy as db
from sqlalchemy.orm import Session

from openwebui_token_tracking.db import init_db
from openwebui_token_tracking.db.credit_group import CreditGroup, CreditGroupUser
from openwebui_token_tracking.db.user import User
from openwebui_token_tracking.user import serialize_user

import os


[docs] def create_credit_group( credit_group_name: str, credit_allowance: int, description: str, database_url: str = None, ): """Creates a credit group in the database. :param credit_group_name: Name of the credit group to be created. :type credit_group_name: str :param credit_allowance: Maximum credit allowance granted to members of this group. :type credit_allowance: int :param description: Description e of the credit group to be created. :type description: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :raises KeyError: Raised if a credit group of this name already exists. """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: # Make sure credit group of that name does not already exist credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: session.add( CreditGroup( name=credit_group_name, max_credit=credit_allowance, description=description, ) ) session.commit() else: raise KeyError( f"A credit group of that name already exists: '{credit_group.name}'" )
[docs] def get_credit_group(credit_group_name: str, database_url: str = None) -> dict: """Retrieves a credit group from the database by its name and returns it as a dictionary. :param credit_group_name: Name of the credit group to retrieve :type credit_group_name: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :return: Dictionary containing the credit group properties (id, name, max_credit, description) :rtype: dict :raises KeyError: Raised if the credit group of that name could not be found """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: raise KeyError(f"Could not find credit group: {credit_group_name}") return { "id": str(credit_group.id), # Convert UUID to string "name": credit_group.name, "max_credit": credit_group.max_credit, "description": credit_group.description, }
[docs] def list_credit_groups(database_url: str = None) -> list[dict]: """Lists all credit groups in the database in a readable format. :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :return: List of dictionaries containing formatted credit group information :rtype: list[dict] """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: credit_groups = session.query(CreditGroup).all() formatted_groups = [] for credit_group in credit_groups: # Count number of users in the group user_count = ( session.query(CreditGroupUser) .filter_by(credit_group_id=credit_group.id) .count() ) formatted_groups.append( { "id": str(credit_group.id), # Convert UUID to string "name": credit_group.name, "description": credit_group.description, "max_credit": credit_group.max_credit, "user_count": user_count, } ) return formatted_groups
[docs] def update_credit_group( credit_group_name: str, new_credit_allowance: int = None, new_description: str = None, new_name: str = None, database_url: str = None, ): """Updates an existing credit group in the database. :param credit_group_name: Name of the credit group to update :type credit_group_name: str :param new_credit_allowance: New maximum credit allowance for the group :type new_credit_allowance: int, optional :param new_description: New description for the group :type new_description: str, optional :param new_name: New name for the group :type new_name: str, optional :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :raises KeyError: Raised if the credit group of that name could not be found :raises ValueError: Raised if no updates are specified """ if all( param is None for param in [new_credit_allowance, new_description, new_name] ): raise ValueError("At least one update parameter must be specified") if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: raise KeyError(f"Could not find credit group: {credit_group_name}") # Update fields if new values are provided if new_credit_allowance is not None: credit_group.max_credit = new_credit_allowance if new_description is not None: credit_group.description = new_description if new_name is not None: # Check if new name already exists existing = session.query(CreditGroup).filter_by(name=new_name).first() if existing and existing.id != credit_group.id: raise KeyError(f"A credit group with name '{new_name}' already exists") credit_group.name = new_name session.commit()
[docs] def upsert_credit_group( credit_group_name: str, credit_allowance: int, description: str, database_url: str = None, ) -> bool: """Creates a new credit group or updates an existing one if it already exists. :param credit_group_name: Name of the credit group :type credit_group_name: str :param credit_allowance: Maximum credit allowance for the group :type credit_allowance: int :param description: Description of the credit group :type description: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :return: True if a new group was created, False if an existing group was updated :rtype: bool """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if credit_group: # Update existing group credit_group.max_credit = credit_allowance credit_group.description = description session.commit() return False else: # Create new group session.add( CreditGroup( name=credit_group_name, max_credit=credit_allowance, description=description, ) ) session.commit() return True
[docs] def delete_credit_group( credit_group_name: str, database_url: str = None, force: bool = False ): """Deletes a credit group from the database. :param credit_group_name: Name of the credit group to delete :type credit_group_name: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :param force: If True, deletes group even if it has users. If False, raises error if group has users :type force: bool, optional :raises KeyError: Raised if the credit group of that name could not be found :raises ValueError: Raised if the group has users and force=False """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: # Find the credit group credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: raise KeyError(f"Could not find credit group: {credit_group_name}") # Check if the group has any users user_count = ( session.query(CreditGroupUser) .filter_by(credit_group_id=credit_group.id) .count() ) if user_count > 0 and not force: raise ValueError( f"Credit group '{credit_group_name}' has {user_count} users. " "Use force=True to delete anyway." ) # Delete all user associations if force=True if force: session.query(CreditGroupUser).filter_by( credit_group_id=credit_group.id ).delete() # Delete the credit group session.delete(credit_group) session.commit()
[docs] def add_user(user_id: str, credit_group_name: str, database_url: str = None): """Add the specified user to the credit group :param credit_group_name: Name of the credit group to add the user to :type credit_group_name: str :param user_id: ID of the user :type user_id: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :raises KeyError: Raised if the credit group of that name could not be found """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: raise KeyError(f"Could not find credit group: {credit_group=}") # Add user to credit group user = session.query(User).filter_by(id=user_id).first() session.merge(CreditGroupUser(credit_group_id=credit_group.id, user_id=user.id)) session.commit()
[docs] def remove_user(user_id: str, credit_group_name: str, database_url: str = None): """Removes a user from the specified credit group. :param user_id: ID of the user to remove :type user_id: str :param credit_group_name: Name of the credit group to remove the user from :type credit_group_name: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :raises KeyError: Raised if the credit group of that name could not be found :raises ValueError: Raised if the user is not in the specified credit group """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: # Find the credit group credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: raise KeyError(f"Could not find credit group: {credit_group_name}") # Find the user user = session.query(User).filter_by(id=user_id).first() if not user: raise KeyError(f"Could not find user with ID: {user_id}") # Find and delete the association association = ( session.query(CreditGroupUser) .filter_by(credit_group_id=credit_group.id, user_id=user.id) .first() ) if not association: raise ValueError( f"User {user_id} is not a member of credit group '{credit_group_name}'" ) session.delete(association) session.commit()
[docs] def list_users(credit_group_name: str, database_url: str = None) -> list[dict]: """Lists all users in a specific credit group. :param credit_group_name: Name of the credit group to list users from :type credit_group_name: str :param database_url: URL of the database. If None, uses env variable ``DATABASE_URL`` :type database_url: str, optional :return: List of dictionaries containing user information :rtype: list[dict] :raises KeyError: Raised if the credit group of that name could not be found """ if database_url is None: database_url = os.environ["DATABASE_URL"] engine = init_db(database_url) with Session(engine) as session: # Find the credit group credit_group = ( session.query(CreditGroup).filter_by(name=credit_group_name).first() ) if not credit_group: raise KeyError(f"Could not find credit group: {credit_group_name}") # Query users in the group users = ( session.query(User) .join(CreditGroupUser) .filter(CreditGroupUser.credit_group_id == credit_group.id) .all() ) # Format user information user_list = [] for user in users: user_list.append( serialize_user(user), ) return user_list