strong base for payment handling
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Failing after 50s
Build and deploy the backend to staging / Deploy to staging (pull_request) Has been skipped
Run testing on the backend code / Build (pull_request) Failing after 2m32s
Run linting on the backend code / Build (pull_request) Successful in 2m39s

This commit is contained in:
2025-10-08 17:30:07 +02:00
parent f86174bc11
commit ab03cee3e3
3 changed files with 395 additions and 150 deletions

View File

@@ -1,149 +1,336 @@
from typing import Literal from typing import Literal
from datetime import datetime, timedelta
import logging import logging
import json import json
from pydantic import BaseModel from pydantic import BaseModel, Field, field_validator
from fastapi import HTTPException
import requests import requests
from ..configuration.environment import Environment from ..configuration.environment import Environment
# Model for payment request body # Define the base URL, might move that to toml file
class OrderDetails(): BASE_URL = 'https://api-m.sandbox.paypal.com'
# Intialize the logger
logger = logging.getLogger(__name__)
class BasketItem(BaseModel):
"""
Represents a single item in the user's basket.
Attributes:
id (str): The unique identifier for the item.
quantity (int): The number of units of the item.
"""
id: str
quantity: int
class Item(BaseModel):
"""
Represents an item available in the shop.
Attributes:
id (str): The unique identifier for the item.
name (str): The name of the item.
description (str): The description of the item.
unit_price (str): The unit price of the item.
"""
id: str
name: str
description: str
unit_price: str
def item_from_sql(item_id: str):
"""
Fetches an item from the database by its ID.
Args:
item_id (str): The unique identifier for the item.
Returns:
Item: The item object retrieved from the database.
"""
# TODO: Replace with actual SQL fetch logic
return Item(
id = '12345678',
name = 'test_item',
description = 'lorem ipsum',
unit_price = 420
)
class OrderRequest(BaseModel):
"""
Represents an order request from the frontend.
Attributes:
user_id (str): The ID of the user placing the order.
basket (list[BasketItem]): List of basket items.
currency (str): The currency code for the order.
created_at (datetime): Timestamp when the order was created.
updated_at (datetime): Timestamp when the order was last updated.
items (list[Item]): List of item details loaded from the database.
total_price (float): Total price of the order.
"""
user_id: str user_id: str
number_of_credits: Literal[10, 50, 100] basket: list[BasketItem]
unit_price: float currency: Literal['CHF', 'EUR', 'USD']
amount: int created_at: datetime = Field(default_factory=datetime.now)
currency: Literal['USD', 'EUR', 'CHF'] updated_at: datetime = Field(default_factory=datetime.now)
items: list[Item] = Field(default_factory=list)
total_price: float = 0
@field_validator('basket')
def validate_basket(cls, v):
"""Validates the basket items.
Args:
v (list): List of basket items.
Raises:
ValueError: If basket does not contain valid BasketItem objects.
Returns:
list: The validated basket.
"""
if not v or not all(isinstance(i, BasketItem) for i in v):
raise ValueError('Basket must contain BasketItem objects')
return
def load_items_and_price(self):
"""
Loads item details from database and calculates the total price.
"""
self.items = []
self.total_price = 0
for basket_item in self.basket:
item = item_from_sql(basket_item.id)
self.items.append(item)
self.total_price += item.unit_price * basket_item.quantity
def to_paypal_items(self):
"""
Converts items to the PayPal API item format.
Returns:
list: List of items formatted for PayPal API.
"""
item_list = []
for basket_item, item in zip(self.basket, self.items):
item_list.append({
'id': item.id,
'name': item.name,
'description': item.description,
'quantity': str(basket_item.quantity),
'unit_amount': {
'currency_code': self.currency,
'value': str(item.unit_price)
}
})
return item_list
# Payment handler class for managing PayPal payments # Payment handler class for managing PayPal payments
class PaypalHandler: class PaypalHandler:
"""
Handles PayPal payment operations.
# PayPal secrets Attributes:
username = Environment.paypal_id_sandbox sandbox (bool): Whether to use the sandbox environment.
password = Environment.paypal_key_sandbox id (str): PayPal client ID.
key (str): PayPal client secret.
base_url (str): Base URL for PayPal API.
_token_cache (dict): Cache for the PayPal OAuth access token.
"""
_token_cache = {
"access_token": None,
"expires_at": 0
}
order_request = None
def __init__( def __init__(
self, self,
order_details: OrderDetails,
sandbox_mode: bool = False sandbox_mode: bool = False
): ):
# Initialize the logger """
self.logger = logging.getLogger(__name__) Initializes the handler.
# Payment request parameters Args:
self.order_details = order_details sandbox_mode (bool): Whether to use sandbox credentials.
"""
self.logger = logging.getLogger(__name__)
self.sandbox = sandbox_mode self.sandbox = sandbox_mode
# Only support purchase of credit 'bundles': 10, 50 or 100 credits worth of trip generation # PayPal keys
# def fetch_price(self) -> float: if sandbox_mode :
# ''' self.id = Environment.paypal_id_sandbox
# Fetches the price of credits in the specified currency. self.key = Environment.paypal_key_sandbox
# ''' self.base_url = BASE_URL
# result = self.supabase.table('prices').select('credit_amount').eq('currency', self.details.currency).single().execute()
# if result.data:
# return result.data.get('price')
# else:
# self.logger.error(f'Unsupported currency: {self.details.currency}')
# return None
def validate(self):
if self.sandbox :
validation_url = 'https://api-m.sandbox.paypal.com/v1/oauth2/token'
else : else :
validation_url = 'https://api-m.paypal.com/v1/oauth2/token' self.id = Environment.paypal_id_prod
self.key = Environment.paypal_key_prod
self.base_url = 'https://api-m.paypal.com'
# payload for the validation request
def _get_access_token(self) -> str | None:
"""
Gets (and caches) a PayPal access token.
Returns:
str | None: The access token if successful, None otherwise.
"""
now = datetime.now()
# Check if token is still valid
if (
self._token_cache["access_token"] is not None
and self._token_cache["expires_at"] > now
):
self.logger.info('Returning (cached) access token.')
return self._token_cache["access_token"]
# Request new token
validation_data = {'grant_type': 'client_credentials'} validation_data = {'grant_type': 'client_credentials'}
try: try:
# pass the request # pass the request
validation_response = requests.post( validation_response = requests.post(
url=validation_url, url = f'{self.base_url}/v1/oauth2/token',
data=validation_data, data = validation_data,
auth=(self.username, self.password) auth =(self.id, self.key)
) )
except Exception as exc: except Exception as exc:
self.logger.error(f'Error while requesting access token: {exc}') self.logger.error(f'Error while requesting access token: {exc}')
return None return None
if validation_response.status_code == 201 : data = validation_response.json()
access_token = json.loads(validation_response.text)['access_token'] access_token = data.get("access_token")
self.logger.info('Validation step successful. Returning access token.') expires_in = int(data.get("expires_in", 3600)) # seconds, default 1 hour
return access_token
# Cache the token and its expiry
self.logger.error(f'Error {validation_response.status_code} while requesting access token: {validation_response.text}') self._token_cache["access_token"] = access_token
return None self._token_cache["expires_at"] = now + timedelta(seconds=expires_in - 60) # buffer 1 min
self.logger.info('Returning (new) access token.')
return access_token
def order( def order(self, order_request: OrderRequest):
self, """
access_token: int Creates a new PayPal order.
):
if self.sandbox : Args:
order_url = 'https://api-m.sandbox.paypal.com/v2/checkout/orders' order_request (OrderRequest): The order request.
else :
order_url = 'https://api-m.paypal.com/v2/checkout/orders'
# payload for the order equest Returns:
dict | None: PayPal order response JSON, or None if failed.
"""
order_data = { order_data = {
'intent': 'CAPTURE', 'intent': 'CAPTURE',
'purchase_units': [ 'purchase_units': [
{ {
'items': [ 'items': order_request.to_paypal_items(),
{
'name': f'{self.order_details.number_of_credits} Credits Pack',
'description': f'Credits for {self.order_details.number_of_credits} trip generations on AnyWay.',
'quantity': self.order_details.amount,
'unit_amount': {
'currency_code': self.order_details.currency,
'value': self.order_details.unit_price
}
}
],
'amount': { 'amount': {
'currency_code': self.order_details.currency, 'currency_code': order_request.currency,
'value': self.order_details.amount*self.order_details.unit_price, 'value': str(order_request.total_price),
# 'breakdown': { 'breakdown': {
# 'item_total': { 'item_total': {
# 'currency_code': 'CHF', 'currency_code': order_request.currency,
# 'value': '5.00' 'value': str(order_request.total_price)
# } }
# } }
## what is that for ?
} }
} }
], ],
# TODO: add these to anydev website somehow # TODO: add these to anydev website
'application_context': { 'application_context': {
'return_url': 'https://anydev.info', 'return_url': 'https://anydev.info',
'cancel_url': 'https://anydev.info' 'cancel_url': 'https://anydev.info'
} }
} }
# TODO continue here # Get the access_token:
access_token = self._get_access_token()
try:
order_response = requests.post(
url = f'{self.base_url}/v2/checkout/orders',
headers = {'Authorization': f'Bearer {access_token}'},
json = order_data,
)
# Raise HTTP Exception if request was unsuccessful.
except Exception as exc:
self.logger.error(f'Error creating PayPal order: {exc}')
return None
order_response.raise_for_status()
# TODO Now that we have the order ID, we can inscribe the details in sql database using the order id given by paypal
# DB for storing the transactions:
# order_id (key): json.loads(order_response.text)["id"]
# user_id : order_request.user_id
# created_at : order_request.created_at
# status : order_request.status
# basket (json) : OrderDetails.jsonify()
# total_price : order_request.total_price
# currency : order_request.currency
# updated_at : order_request.created_at
return order_response.json()
pass # Standalone function to capture a payment
def capture(self, order_id: str):
"""
Captures payment for a PayPal order.
def capture(self): Args:
order_id (str): The PayPal order ID.
Returns:
dict | None: PayPal capture response JSON, or None if failed.
pass """
# Get the access_token:
access_token = self._get_access_token()
try:
capture_response = requests.post(
url = f'{BASE_URL}/v2/checkout/orders/{order_id}/capture',
headers = {'Authorization': f'Bearer {access_token}'},
json = {},
)
except Exception as exc:
logger.error(f'Error while requesting access token: {exc}')
return None
capture_response.raise_for_status()
# todo check status code + try except
print(capture_response.text)
# order_id = json.loads(response.text)["id"]
# TODO: update status to PAID in sql database
# where order_id (key) = order_id
# status : 'PAID'
# updated_at : datetime.now()
# Not sure yet if/how to implement that
def cancel(self): def cancel(self):
pass pass

View File

@@ -1,79 +1,136 @@
import logging from typing import Literal
import paypalrestsdk
from fastapi import HTTPException, APIRouter
from ..supabase.supabase import SupabaseClient from fastapi import FastAPI, HTTPException
from .payment_handler import PaymentRequest, PaymentHandler from ..payments import PaypalHandler, OrderRequest
# Set up logging and supabase app = FastAPI()
logger = logging.getLogger(__name__)
supabase = SupabaseClient()
# Configure PayPal SDK # Initialize PayPal handler
paypalrestsdk.configure({ paypal_handler = PaypalHandler(sandbox_mode=True)
"mode": "sandbox", # Use 'live' for production
"client_id": "YOUR_PAYPAL_CLIENT_ID",
"client_secret": "YOUR_PAYPAL_SECRET"
})
@app.post("/orders/new")
# Define the API router def create_order(
router = APIRouter() user_id: str,
basket: list,
@router.post("/purchase/credits") currency: Literal['CHF', 'EUR', 'USD']
def purchase_credits(payment_request: PaymentRequest): ):
""" """
Handles token purchases. Calculates the number of tokens based on the amount paid, Creates a new PayPal order.
updates the user's balance, and processes PayPal payment.
Args:
user_id (str): The ID of the user placing the order.
basket (list): The basket items.
currency (str): The currency code.
Returns:
dict: The PayPal order details.
""" """
payment_handler = PaymentHandler(payment_request)
# Create PayPal payment and get the approval URL # Create order :
approval_url = payment_handler.create_paypal_payment() order = OrderRequest(
user_id = user_id,
basket=basket,
currency=currency
)
return { # Process the order and return the details
"message": "Purchase initiated successfully", return paypal_handler.order(order_request = order)
"payment_id": payment_handler.payment_id,
"credits": payment_request.credit_amount,
"approval_url": approval_url,
}
@router.get("/payment/success")
def payment_success(paymentId: str, PayerID: str): @app.post("/orders/{order_id}/capture")
def capture_order(order_id: str):
""" """
Handles successful PayPal payment. Captures payment for an existing PayPal order.
Args:
order_id (str): The PayPal order ID.
Returns:
dict: The PayPal capture response.
""" """
payment = paypalrestsdk.Payment.find(paymentId) result = paypal_handler.capture(order_id)
return result
if payment.execute({"payer_id": PayerID}):
logger.info("Payment executed successfully")
# Retrieve transaction details from the database
result = supabase.table("pending_payments").select("*").eq("payment_id", paymentId).single().execute()
if not result.data:
raise HTTPException(status_code=404, detail="Transaction not found")
# Extract the necessary information
user_id = result.data["user_id"]
credit_amount = result.data["credit_amount"]
# Update the user's balance
supabase.increment_credit_balance(user_id, amount=credit_amount)
# Optionally, delete the pending payment entry since the transaction is completed
supabase.table("pending_payments").delete().eq("payment_id", paymentId).execute()
return {"message": "Payment completed successfully"}
else:
logger.error(f"Payment execution failed: {payment.error}")
raise HTTPException(status_code=500, detail="Payment execution failed")
@router.get("/payment/cancel")
def payment_cancel():
""" # import logging
Handles PayPal payment cancellation. # import paypalrestsdk
""" # from fastapi import HTTPException, APIRouter
return {"message": "Payment was cancelled"}
# from ..supabase.supabase import SupabaseClient
# from .payment_handler import PaymentRequest, PaymentHandler
# # Set up logging and supabase
# logger = logging.getLogger(__name__)
# supabase = SupabaseClient()
# # Configure PayPal SDK
# paypalrestsdk.configure({
# "mode": "sandbox", # Use 'live' for production
# "client_id": "YOUR_PAYPAL_CLIENT_ID",
# "client_secret": "YOUR_PAYPAL_SECRET"
# })
# # Define the API router
# router = APIRouter()
# @router.post("/purchase/credits")
# def purchase_credits(payment_request: PaymentRequest):
# """
# Handles token purchases. Calculates the number of tokens based on the amount paid,
# updates the user's balance, and processes PayPal payment.
# """
# payment_handler = PaymentHandler(payment_request)
# # Create PayPal payment and get the approval URL
# approval_url = payment_handler.create_paypal_payment()
# return {
# "message": "Purchase initiated successfully",
# "payment_id": payment_handler.payment_id,
# "credits": payment_request.credit_amount,
# "approval_url": approval_url,
# }
# @router.get("/payment/success")
# def payment_success(paymentId: str, PayerID: str):
# """
# Handles successful PayPal payment.
# """
# payment = paypalrestsdk.Payment.find(paymentId)
# if payment.execute({"payer_id": PayerID}):
# logger.info("Payment executed successfully")
# # Retrieve transaction details from the database
# result = supabase.table("pending_payments").select("*").eq("payment_id", paymentId).single().execute()
# if not result.data:
# raise HTTPException(status_code=404, detail="Transaction not found")
# # Extract the necessary information
# user_id = result.data["user_id"]
# credit_amount = result.data["credit_amount"]
# # Update the user's balance
# supabase.increment_credit_balance(user_id, amount=credit_amount)
# # Optionally, delete the pending payment entry since the transaction is completed
# supabase.table("pending_payments").delete().eq("payment_id", paymentId).execute()
# return {"message": "Payment completed successfully"}
# else:
# logger.error(f"Payment execution failed: {payment.error}")
# raise HTTPException(status_code=500, detail="Payment execution failed")
# @router.get("/payment/cancel")
# def payment_cancel():
# """
# Handles PayPal payment cancellation.
# """
# return {"message": "Payment was cancelled"}

View File

@@ -74,6 +74,7 @@ order_data = {
order_response = requests.post( order_response = requests.post(
url=order_url, url=order_url,
headers={"Authorization": f"Bearer {access_token}"}, ## need access token here?
json=order_data, json=order_data,
auth=(username, password) auth=(username, password)
) )