# Source file: Siro/scratch/payment_simulation.py
# Last modified: 2026-06-16 22:44:11 +03:00
# Viewer metadata: 290 lines, 11 KiB, Python

import hmac
import hashlib
import json
import base64
import uuid
# Simulation configuration / Secrets
# NOTE: these values are hard-coded because this file is a simulation; real
# secrets should be loaded from the environment or a secret store instead.
# Key used by generate_jwt to HMAC-SHA256-sign the mock JWT.
SECRET_KEY = "siro_super_secret_jwt_key"
# Key for the X-HMAC-Auth header value (HMAC-SHA256 over the user id).
SECRET_KEY_HMAC = "siro_super_secret_hmac_key"
# Pepper appended to the raw device fingerprint before it is SHA-256 hashed.
FP_PEPPER = "siro_fp_pepper_salt"
# Shared key that process_ride_payment expects from its caller (s2s_key).
S2S_SHARED_KEY = "s2s_shared_key_12345"
# Mock Database: in-memory tables shared by every simulation scenario
db: dict = {
    "payments": [],         # payment records appended by process_ride_payment
    "passengerWallet": {},  # passenger user_id -> wallet balance
    "driverWallet": {},     # driver user_id -> points balance
    "invoices": {},         # not written to by the code shown in this file
    "security_logs": [],    # warning strings recorded during authentication
}
def init_user(user_id, balance, is_driver=False):
    """Seed the mock database with a starting balance for one user.

    Drivers are stored in ``db["driverWallet"]``; every other user goes to
    ``db["passengerWallet"]``. An existing entry for ``user_id`` in the
    selected table is overwritten.
    """
    wallet_table = "driverWallet" if is_driver else "passengerWallet"
    db[wallet_table][user_id] = balance
# Helper functions for Client
def generate_jwt(user_id, device_fp, exp=1900000000):
    """Build a mock HS256-signed JWT bound to a device fingerprint.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        device_fp: Raw device fingerprint. Only its peppered SHA-256 hex
            digest is embedded (``fingerPrint`` claim), never the raw value.
        exp: Expiry for the ``exp`` claim as a Unix timestamp. Defaults to
            the fixed far-future value this simulation has always used.

    Returns:
        The compact token ``<header>.<payload>.<signature>``. All three
        segments are base64url-encoded without padding.
    """
    def _b64url(raw):
        # JWT segments use the URL-safe alphabet with "=" padding removed.
        # For data whose standard base64 text has no "+" or "/" the result
        # is identical to plain base64 with the padding stripped.
        return base64.urlsafe_b64encode(raw).decode().rstrip("=")

    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    hashed_fp = hashlib.sha256((device_fp + FP_PEPPER).encode()).hexdigest()
    payload = _b64url(json.dumps({
        "iss": "Tripz-Wallet",
        "user_id": user_id,
        "fingerPrint": hashed_fp,
        "exp": exp,
    }).encode())

    # The signature covers the two encoded segments joined by a dot.
    signing_input = f"{header}.{payload}".encode()
    signature = hmac.new(SECRET_KEY.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"
def calculate_hmac_header(user_id):
    """Return the hex HMAC-SHA256 of ``user_id`` keyed with SECRET_KEY_HMAC."""
    key_bytes = SECRET_KEY_HMAC.encode()
    message = user_id.encode()
    mac = hmac.new(key_bytes, message, hashlib.sha256)
    return mac.hexdigest()
# Helper functions for Server Authentication
def authenticate_request(headers, payload_user_id):
    """Authenticate a client request from its headers.

    Checks, in order:
      1. ``Authorization: Bearer <jwt>``: token structure, HS256 signature,
         payload decoding, ``exp`` (when the claim is present), issuer and
         user id.
      2. ``X-Device-FP``: must hash to the token's ``fingerPrint`` claim
         whenever the token carries one; a missing header is a failure.
      3. ``X-HMAC-Auth``: when supplied, must equal the HMAC-SHA256 of
         ``payload_user_id``.

    Args:
        headers: Mapping of request header names to values.
        payload_user_id: User id the request claims to act for.

    Returns:
        ``(ok, message)`` where ``ok`` is True only if every check passed.
        Signature, fingerprint and HMAC failures are also appended to
        ``db["security_logs"]``.
    """
    import time  # local import: `time` is not in the module's import block

    def _same(expected, supplied):
        # compare_digest() raises TypeError for non-ASCII str input, which
        # attacker-controlled headers may contain, so compare UTF-8 bytes.
        # str() guards against non-string claim or header values.
        return hmac.compare_digest(
            str(expected).encode("utf-8", "surrogatepass"),
            str(supplied).encode("utf-8", "surrogatepass"),
        )

    # 1. JWT verification
    auth_header = headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return False, "Authorization token required"
    token = auth_header.split(" ")[1]
    parts = token.split(".")
    if len(parts) != 3:
        return False, "Invalid token structure"
    header_b64, payload_b64, signature_b64 = parts

    # Verify the signature BEFORE trusting any claim; without this step a
    # caller could forge an arbitrary payload.
    signing_input = f"{header_b64}.{payload_b64}".encode("utf-8", "surrogatepass")
    digest = hmac.new(SECRET_KEY.encode(), signing_input, hashlib.sha256).digest()
    expected_sig = base64.urlsafe_b64encode(digest).decode().rstrip("=")
    if not _same(expected_sig, signature_b64):
        db["security_logs"].append(f"[WARNING] Invalid JWT signature for user {payload_user_id}!")
        return False, "Invalid token signature"

    # Parse payload. urlsafe_b64decode accepts both the URL-safe and the
    # standard alphabet, so tokens encoded either way decode correctly.
    try:
        padded = payload_b64 + "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded).decode())
    except ValueError as e:
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
        # ValueError subclasses.
        return False, f"JWT decode failed: {str(e)}"
    if not isinstance(payload, dict):
        return False, "JWT decode failed: payload is not a JSON object"

    # Check expiry (enforced whenever the claim is present)
    exp = payload.get("exp")
    if exp is not None:
        if isinstance(exp, bool) or not isinstance(exp, (int, float)):
            return False, "Invalid token expiry"
        if exp <= time.time():
            return False, "Token expired"

    # Check Issuer
    if payload.get("iss") != "Tripz-Wallet":
        return False, "Invalid token issuer"

    # Check User ID match
    token_user_id = payload.get("user_id")
    if token_user_id != payload_user_id:
        return False, "User ID mismatch with token"

    # 2. Device Fingerprint verification
    device_fp_header = headers.get("X-Device-FP")
    fp_in_token = payload.get("fingerPrint")
    if fp_in_token:
        # A token bound to a device must be accompanied by that device's
        # fingerprint; omitting the header must not skip the check.
        if not device_fp_header:
            db["security_logs"].append(f"[WARNING] Missing device fingerprint for user {payload_user_id}!")
            return False, "Device fingerprint required"
        expected_fp = hashlib.sha256((device_fp_header + FP_PEPPER).encode()).hexdigest()
        if not _same(expected_fp, fp_in_token):
            db["security_logs"].append(f"[WARNING] Device mismatch for user {payload_user_id}!")
            return False, "Device mismatch (possible Session Hijacking)"

    # 3. HMAC Auth verification
    # NOTE(review): the header is still optional here, as before; confirm
    # whether requests without X-HMAC-Auth should be accepted.
    hmac_header = headers.get("X-HMAC-Auth")
    if hmac_header:
        expected_hmac = hmac.new(SECRET_KEY_HMAC.encode(), payload_user_id.encode(), hashlib.sha256).hexdigest()
        if not _same(expected_hmac, hmac_header):
            db["security_logs"].append(f"[WARNING] HMAC mismatch for user {payload_user_id}!")
            return False, "Invalid HMAC (possible payload/identity tampering)"

    return True, "Authenticated"
# AI verification simulation (Mocking Gemini API)
def mock_gemini_verify_payment(invoice_number, amount, proof_text):
    """Simulate the AI check of a payment proof (stand-in for the Gemini API).

    The proof is accepted when it contains a success phrase (English or
    Arabic) AND mentions the invoice amount as a standalone number. Matching
    the amount as a whole number prevents digits inside a longer number
    (a larger amount or a reference id) from counting as a match.

    Args:
        invoice_number: Invoice identifier; only used in the log output.
        amount: Expected amount. Whole values are matched without a
            trailing ".0" (``50000.0`` matches "50000" and "50000.00").
        proof_text: Text of the payment proof supplied by the user.

    Returns:
        ``{"verified": bool, "reason": str}``.
    """
    import re  # local import: `re` is not in the module's import block

    print(f"[Gemini AI Processing] Verifying proof for Invoice {invoice_number} of amount {amount}...")
    print(f"[Gemini AI Input Proof] '{proof_text}'")

    # Mocking semantic check on proof text
    proof_lower = proof_text.lower()

    try:
        amount_str = str(int(amount)) if amount == int(amount) else str(amount)
    except (TypeError, ValueError, OverflowError):
        # NaN, infinity or a non-numeric amount can never be confirmed.
        amount_str = None

    amount_found = False
    if amount_str is not None:
        # Allow insignificant trailing zeros ("50000.00", "12.50") but no
        # other adjacent digits or decimal/thousands separators.
        tail = r"0*" if "." in amount_str else r"(?:\.0+)?"
        pattern = r"(?<![\d.,])" + re.escape(amount_str) + tail + r"(?![.,]?\d)"
        amount_found = re.search(pattern, proof_lower) is not None

    success_markers = ("successful", "transferred", "تم تحويل", "نجاح")
    transfer_confirmed = any(marker in proof_lower for marker in success_markers)

    success = amount_found and transfer_confirmed
    if success:
        reason = "Proof text verified successfully. Amount matches invoice."
    else:
        reason = f"Verification failed: Proof text does not indicate successful transfer of {amount}."

    print(f"[Gemini AI Output] Verified: {success}, Reason: {reason}")
    return {"verified": success, "reason": reason}
# Atomic transaction simulation for ride payment
def process_ride_payment(ride_id, driver_id, passenger_id, amount, payment_method, wallet_checked, s2s_key):
    """Record a ride payment and settle wallets as one all-or-nothing step.

    Steps: append a payment record, deduct ``amount`` from the passenger
    wallet when ``wallet_checked`` is true, then deduct the 8% platform
    commission from the driver's balance. Wallet entries that do not exist
    yet start at 0.0. If any step raises, ``payments`` and both wallet
    tables are restored from a snapshot taken at the start.

    Args:
        ride_id: Ride identifier stored on the payment record.
        driver_id: Driver whose balance is charged the commission.
        passenger_id: Passenger who pays for the ride.
        amount: Ride cost; must be a non-negative number.
        payment_method: Method name; "Ride" is appended when the wallet is used.
        wallet_checked: Whether the amount is deducted from the passenger wallet.
        s2s_key: Server-to-server key; must equal ``S2S_SHARED_KEY``.

    Returns:
        ``{"status": "success" | "failure", "message": str}``.
    """
    # S2S auth check. Constant-time comparison so the key cannot be probed
    # through response timing; non-string keys are rejected outright.
    key_ok = isinstance(s2s_key, str) and hmac.compare_digest(
        s2s_key.encode("utf-8", "surrogatepass"),
        S2S_SHARED_KEY.encode("utf-8", "surrogatepass"),
    )
    if not key_ok:
        return {"status": "failure", "message": "Unauthorized S2S call"}

    print(f"\n--- Database Transaction Initiated for Ride {ride_id} ---")

    # Save a rollback point (shallow copies are enough: wallet values are
    # numbers and existing payment records are never mutated here).
    rollback_db = {
        "payments": list(db["payments"]),
        "passengerWallet": dict(db["passengerWallet"]),
        "driverWallet": dict(db["driverWallet"]),
    }

    try:
        # A negative (or NaN) amount would credit the passenger and the
        # driver instead of charging them.
        if not amount >= 0:
            raise ValueError("amount must be a non-negative number")

        # 1. Insert payment record
        final_method = payment_method + "Ride" if wallet_checked else payment_method
        payment_record = {
            "id": str(uuid.uuid4().int)[:15],
            "amount": amount,
            "payment_method": final_method,
            "passengerID": passenger_id,
            "rideId": ride_id,
            "driverID": driver_id,
        }
        db["payments"].append(payment_record)
        print(f"[DB] Inserted payment record: {payment_record}")

        # 2. Deduct from passenger wallet
        if wallet_checked:
            if passenger_id not in db["passengerWallet"]:
                db["passengerWallet"][passenger_id] = 0.0
            db["passengerWallet"][passenger_id] -= amount
            # Single quotes inside the f-string keep this valid before Python 3.12.
            print(f"[DB] Deducted {amount} from Passenger {passenger_id}. New balance: {db['passengerWallet'][passenger_id]}")
            # Settle debt if balance was negative before (example scenario)
            # here we just apply deduction.

        # 3. Deduct driver points (8% platform commission)
        commission = amount * 0.08
        if driver_id not in db["driverWallet"]:
            db["driverWallet"][driver_id] = 0.0
        db["driverWallet"][driver_id] -= commission
        print(f"[DB] Subtracted 8% commission ({commission}) from Driver {driver_id} points. New points balance: {db['driverWallet'][driver_id]}")

        print("[DB] Transaction Committed successfully.")
        return {"status": "success", "message": "Transaction committed"}
    except Exception as e:
        # Rollback: restore the three tables touched above from the snapshot.
        db["payments"] = rollback_db["payments"]
        db["passengerWallet"] = rollback_db["passengerWallet"]
        db["driverWallet"] = rollback_db["driverWallet"]
        print(f"[DB ERROR] Transaction Failed. Rolled back changes. Error: {str(e)}")
        return {"status": "failure", "message": f"Transaction failed: {str(e)}"}
# Run Scenario Simulations
def run_egypt_simulation():
    """Scenario 1: an authenticated Egyptian passenger pays for a ride.

    Seeds both wallets, signs the request as the client would, has the
    server authenticate it and, on success, runs the ride payment.
    """
    banner = "=" * 50
    print("\n" + banner)
    print("SCENARIO 1: EGYPT - CREDIT CARD / PAYMOB (Ahmed)")
    print(banner)

    passenger = "egypt_passenger_101"
    driver = "egypt_driver_501"
    fingerprint = "ahmed_iphone_13_fp_hash"

    init_user(passenger, 500.0)  # passenger wallet balance
    init_user(driver, 100.0, is_driver=True)  # driver wallet points

    # 1. Client signs request
    token = generate_jwt(passenger, fingerprint)
    request_headers = {
        "Authorization": f"Bearer {token}",
        "X-Device-FP": fingerprint,
        "X-HMAC-Auth": calculate_hmac_header(passenger),
    }
    print(f"Passenger {passenger} initiating payment for Egypt ride. Ride cost: 120 EGP.")

    # 2. Server authenticates headers
    authenticated, detail = authenticate_request(request_headers, passenger)
    print(f"[Server Auth] Result: {authenticated}, Message: {detail}")
    if not authenticated:
        return

    # S2S transaction
    outcome = process_ride_payment(
        ride_id="ride_eg_999",
        driver_id=driver,
        passenger_id=passenger,
        amount=120.0,
        payment_method="PayMob",
        wallet_checked=True,
        s2s_key=S2S_SHARED_KEY,
    )
    print(f"[Final Result] {outcome}")
def run_syria_simulation():
print("\n" + "="*50)
print("SCENARIO 2: SYRIA - MTN CASH WITH AI VERIFICATION (Bassel)")
print("="*50)
user_id = "syria_passenger_202"
driver_id = "syria_driver_602"
device_fp = "bassel_galaxy_s22_fp_hash"
init_user(user_id, 0.0) # passenger starts with 0.0
init_user(driver_id, 50.0, is_driver=True) # driver starts with 50 points
jwt = generate_jwt(user_id, device_fp)
hmac_header = calculate_hmac_header(user_id)
headers = {
"Authorization": f"Bearer {jwt}",
"X-Device-FP": device_fp,
"X-HMAC-Auth": hmac_header
}
invoice_number = "INV-MTN-8877"
amount = 50000.0 # 50,000 SYP for the ride/wallet load
print(f"Passenger {user_id} loads wallet via MTN Cash for invoice {invoice_number} of {amount} SYP.")
print("User transfers money and uploads transaction SMS proof.")
proof_text = "MTN Cash: You have successfully transferred 50000 SYP to SIRO system. Ref: 9812739182."
# 1. Server authenticates headers
auth_ok, msg = authenticate_request(headers, user_id)
print(f"[Server Auth] Result: {auth_ok}, Message: {msg}")
if auth_ok:
# 2. AI verifies proof text
ai_res = mock_gemini_verify_payment(invoice_number, amount, proof_text)
if ai_res["verified"]:
print(f"[Server] AI verified payment. Settle wallet.")
# Atomic settlement
print(f"\n--- Wallet Settlement Transaction Initiated (Syria) ---")
db["passengerWallet"][user_id] += amount
print(f"[DB] Added {amount} SYP to passenger {user_id} wallet. New balance: {db["passengerWallet"][user_id]} SYP")
print("[DB] Transaction Committed.")
else:
print(f"[Server] Payment rejected: {ai_res['reason']}")
def run_jordan_simulation():
print("\n" + "="*50)
print("SCENARIO 3: JORDAN - CLIQ WITH TAMPERED DEVICE FINGERPRINT (Rania)")
print("="*50)
user_id = "jordan_passenger_303"
device_fp = "rania_oneplus_11_fp_hash"
tampered_fp = "hijacked_device_fp_hash" # Attacker trying to reuse Rania's token from a different device
# 1. Client token generated for Rania on her real device
jwt = generate_jwt(user_id, device_fp)
hmac_header = calculate_hmac_header(user_id)
# Attacker sends request with correct JWT but hijacked/different device fingerprint header
headers = {
"Authorization": f"Bearer {jwt}",
"X-Device-FP": tampered_fp, # Hijacked device header
"X-HMAC-Auth": hmac_header
}
print(f"Attacker attempts to trigger payment under Rania's user_id using hijacked token on different device.")
# 2. Server authenticates headers
auth_ok, msg = authenticate_request(headers, user_id)
print(f"[Server Auth] Result: {auth_ok}, Message: {msg}")
if not auth_ok:
print("[Server Alert] Attack blocked successfully! Session Hijacking thwarted.")
print(f"Security logs: {db['security_logs']}")
if __name__ == "__main__":
    # Run every scenario in order when executed as a script.
    for scenario in (run_egypt_simulation, run_syria_simulation, run_jordan_simulation):
        scenario()