Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 1m48s
Run linting on the backend code / Build (pull_request) Failing after 29s
Run testing on the backend code / Build (pull_request) Failing after 50s
Build and release debug APK / Build APK (pull_request) Failing after 3m50s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 27s
171 lines
6.1 KiB
Python
171 lines
6.1 KiB
Python
import os
|
|
import logging
|
|
import yaml
|
|
from fastapi import HTTPException, status
|
|
from supabase import create_client, Client, ClientOptions
|
|
|
|
from ..constants import PARAMETERS_DIR
|
|
|
|
# Silence the supabase logger: raise the threshold of the "httpx", "hpack" and
# "httpcore" loggers to CRITICAL so that only critical records from them are
# emitted.
for _noisy_logger_name in ("httpx", "hpack", "httpcore"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.CRITICAL)
|
|
|
|
|
|
class Supabase:
    """Wrapper around a Supabase client for reading and updating user credits.

    All methods operate on the ``credits`` table, matching rows on the ``id``
    column and reading/writing the ``credit_amount`` column.
    """

    logger = logging.getLogger(__name__)

    def __init__(self):
        """
        Load the Supabase credentials from ``secrets.yaml`` and build the client.

        The file is read from ``PARAMETERS_DIR`` and must contain the keys
        ``SUPABASE_URL``, ``SUPABASE_ADMIN_KEY`` and ``SUPABASE_TEST_USER_ID``.

        Raises:
            OSError: If the secrets file cannot be opened.
            KeyError: If one of the expected keys is missing from the file.
        """
        secrets_path = os.path.join(PARAMETERS_DIR, 'secrets.yaml')
        with open(secrets_path, encoding='utf-8') as f:
            secrets = yaml.safe_load(f)

        self.SUPABASE_URL = secrets['SUPABASE_URL']
        self.SUPABASE_ADMIN_KEY = secrets['SUPABASE_ADMIN_KEY']
        self.SUPABASE_TEST_USER_ID = secrets['SUPABASE_TEST_USER_ID']

        self.supabase = create_client(
            self.SUPABASE_URL,
            self.SUPABASE_ADMIN_KEY,
            options=ClientOptions(schema='public')
        )
        self.logger.debug('Supabase client initialized.')

    def _read_credit_amount(self, user_id: str, failure_prefix: str, activity: str):
        """
        Fetch the ``credit_amount`` value of a single user.

        Query failures are logged and translated according to the ``code``
        attribute of the raised exception.

        Args:
            user_id (str): The ID of the user whose row is read.
            failure_prefix (str): Message prefix used for code ``22P02``.
            activity (str): Verb inserted in the "unexpected error" message.

        Returns:
            The ``credit_amount`` value stored for the user.

        Raises:
            SyntaxError: If the query fails with code ``22P02``.
            ValueError: If the query fails with code ``PGRST116``.
            Exception: For any other failure, including exceptions that carry
                no ``code`` attribute at all.
        """
        try:
            # Query the public.credits table to get the user's credits
            response = (
                self.supabase.table("credits")
                .select('*')
                .eq('id', user_id)
                .single()
                .execute()
            )
        except Exception as e:
            # Not every exception has a `.code`; reading it unconditionally
            # would raise AttributeError here and hide the real failure.
            error_code = getattr(e, 'code', None)

            if error_code == '22P02':
                # presumably PostgreSQL "invalid text representation"
                # (e.g. a malformed id) -- TODO confirm
                message = f"{failure_prefix} : {str(e)}"
                self.logger.error(message)
                raise SyntaxError(message) from e
            elif error_code == 'PGRST116':
                # presumably PostgREST's ".single() did not match exactly one
                # row" error -- TODO confirm
                message = f"User not found : {str(e)}"
                self.logger.error(message)
                raise ValueError(message) from e
            else:
                message = f"An unexpected error occured while {activity} user balance : {str(e)}"
                self.logger.error(message)
                raise Exception(message) from e

        return response.data['credit_amount']

    def _shift_credit_balance(self, user_id: str, delta: int, verb: str) -> bool:
        """
        Add ``delta`` to the user's stored credits and write the result back.

        NOTE(review): the balance is read and then written in two separate
        requests, so concurrent calls for the same user can overwrite each
        other. An atomic server-side update would avoid this.

        Args:
            user_id (str): The ID of the user whose row is updated.
            delta (int): Signed change applied to the current balance.
            verb (str): ``"decrement"`` or ``"increment"``; used to build the
                log and error messages.

        Returns:
            bool: True when the update response contains data.

        Raises:
            SyntaxError, ValueError, Exception: As raised while reading the
                current balance.
            Exception: If the update response contains no data.
        """
        current_credits = self._read_credit_amount(
            user_id,
            failure_prefix=f"Failed {verb}ing credits",
            activity=f"{verb}ing",
        )
        updated_credits = current_credits + delta

        # Update the user's credits in the table
        update_response = (
            self.supabase.table('credits')
            .update({'credit_amount': updated_credits})
            .eq('id', user_id)
            .execute()
        )

        # An empty payload means the update did not return the modified row.
        if not update_response.data:
            raise Exception(f"Error {verb}ing credit balance.")

        self.logger.debug(f'Credit balance successfully {verb}ed.')
        return True

    def check_balance(self, user_id: str) -> bool:
        """
        Checks if the user has enough 'credit' for generating a new trip.

        Args:
            user_id (str): The ID of the current user.

        Returns:
            bool: True if the balance is positive, False otherwise.

        Raises:
            SyntaxError: If the query fails with code ``22P02``.
            ValueError: If the query fails with code ``PGRST116``.
            Exception: For any other failure while querying the balance.
        """
        credit_amount = self._read_credit_amount(
            user_id,
            failure_prefix="Failed querying credits",
            activity="checking",
        )
        self.logger.debug(f'Credits of user {user_id}: {credit_amount}')

        if credit_amount > 0:
            self.logger.info(f'Credit balance is positive for user {user_id}. Proceeding with trip generation.')
            return True

        self.logger.warning(f'Insufficient balance for user {user_id}. Trip generation cannot proceed.')
        return False

    def decrement_credit_balance(self, user_id: str, amount: int=1) -> bool:
        """
        Decrements the user's credit balance by ``amount``.

        No lower bound is enforced: the stored balance may become negative.

        Args:
            user_id (str): The ID of the current user.
            amount (int): Number of credits to remove. Defaults to 1.

        Returns:
            bool: True if the update succeeded.

        Raises:
            SyntaxError: If reading the balance fails with code ``22P02``.
            ValueError: If reading the balance fails with code ``PGRST116``.
            Exception: For any other read failure, or if the update response
                contains no data.
        """
        return self._shift_credit_balance(user_id, -amount, "decrement")

    def increment_credit_balance(self, user_id: str, amount: int=1) -> bool:
        """
        Increments the user's credit balance by ``amount``.

        Args:
            user_id (str): The ID of the current user.
            amount (int): Number of credits to add. Defaults to 1.

        Returns:
            bool: True if the update succeeded.

        Raises:
            SyntaxError: If reading the balance fails with code ``22P02``.
            ValueError: If reading the balance fails with code ``PGRST116``.
            Exception: For any other read failure, or if the update response
                contains no data.
        """
        return self._shift_credit_balance(user_id, amount, "increment")
|