The Breach Architecture Built Into Most B2B SaaS Applications
There’s a version of a breach that almost never makes headlines. No ransomware note. No brute force. No zero-day exploit. Just a single API request, slightly modified.
An attacker signs up for your SaaS product. Within a couple of hours, they notice something subtle: an endpoint accepts a tenant_id. Out of curiosity, they change it. And suddenly, they’re not looking at their own data anymore.
They’re looking at everyone’s. This is how most multi-tenant SaaS breaches actually happen. Not through sophisticated hacking, but through broken assumptions about tenant isolation.
And the worst part?
Most security testing approaches don’t catch this. Because they’re not designed to understand how multi-tenancy fails.
This guide covers every dimension of multi-tenant SaaS security testing:
the architecture patterns that create risk
the specific attacks against each pattern
the code-level failures that enable cross-tenant access, and
the complete testing methodology for verifying that your tenant boundaries actually hold
Why This Matters More Than You Think
Multi-tenant SaaS is built on a single promise:
Shared infrastructure. Completely isolated data.
Break that, and you don’t just have a bug.
You have:
A cross-customer data breach
A compliance violation (SOC 2, ISO 27001, HIPAA)
A blast radius across your entire customer base
In shared database architectures, one mistake doesn’t expose one tenant. It exposes all tenants simultaneously.
The 3 Rules of Tenant Isolation (Mental Model)

If you remember nothing else from this guide, remember this:
Never trust tenant identity from the request
Always derive tenant from authenticated context
Enforce isolation at the database layer, not just application logic
Every real-world SaaS breach violates at least one of these.
The Breach Architecture Built Into Most B2B SaaS Applications
Here is a scenario that plays out more often than any breach disclosure report captures. A B2B SaaS company has 500 customers. Each customer is a business, with their own employees, their own data, their own proprietary information. Customer A's data should be completely invisible to Customer B. That's the fundamental promise of multi-tenant software: shared infrastructure, isolated data.
An attacker signs up for a free account. They poke around the API for two hours. They find that one endpoint accepts a tenant_id parameter, and that the server uses exactly whatever tenant_id is supplied to filter records, without checking whether the authenticated user belongs to that tenant.
Now they have access to every customer's data. Not one at a time. All 500, simultaneously, with a script that loops through tenant identifiers.
No brute force. No credential stuffing. No zero-day. One misconfigured API parameter.
This is the defining vulnerability class of B2B SaaS, and it's the one that security testing most consistently misses.
👉 This is a classic case of cross-tenant IDOR (Insecure Direct Object Reference), if you're not familiar with how these vulnerabilities work at a deeper level, read the full breakdown here: IDOR Vulnerabilities: The Complete Testing Guide
👉 And if you're wondering how modern systems actually detect these logic-level issues in code, this breakdown helps: How AI Reads Source Code During a Penetration Test
The only testing methodology that reliably catches this is authenticated, gray box penetration testing specifically designed around the multi-tenant threat model.
Multi-Tenancy Architecture Patterns and Their Security Profiles
Before testing, you need to understand the architecture. Multi-tenancy is implemented in fundamentally different ways, and the vulnerabilities, and testing approach, differ significantly by pattern.
Pattern 1: Shared Database, Shared Schema
The most common pattern for SaaS applications. All tenants' data lives in the same tables, separated only by a tenant_id column.
-- Single database, single schema, all tenants in same tables CREATE TABLE users ( id BIGSERIAL PRIMARY KEY, tenant_id UUID NOT NULL REFERENCES tenants(id), email VARCHAR(255) NOT NULL, name VARCHAR(255), role VARCHAR(50), created_at TIMESTAMP DEFAULT NOW(), UNIQUE (tenant_id, email) -- Email unique per tenant, not globally ); CREATE TABLE records ( id BIGSERIAL PRIMARY KEY, tenant_id UUID NOT NULL REFERENCES tenants(id), user_id BIGINT NOT NULL REFERENCES users(id), data JSONB, created_at TIMESTAMP DEFAULT NOW() ); -- Row Level Security (if implemented correctly): ALTER TABLE users ENABLE ROW LEVEL SECURITY; CREATE POLICY tenant_isolation ON users USING (tenant_id = current_setting('app.current_tenant_id')::UUID)
-- Single database, single schema, all tenants in same tables CREATE TABLE users ( id BIGSERIAL PRIMARY KEY, tenant_id UUID NOT NULL REFERENCES tenants(id), email VARCHAR(255) NOT NULL, name VARCHAR(255), role VARCHAR(50), created_at TIMESTAMP DEFAULT NOW(), UNIQUE (tenant_id, email) -- Email unique per tenant, not globally ); CREATE TABLE records ( id BIGSERIAL PRIMARY KEY, tenant_id UUID NOT NULL REFERENCES tenants(id), user_id BIGINT NOT NULL REFERENCES users(id), data JSONB, created_at TIMESTAMP DEFAULT NOW() ); -- Row Level Security (if implemented correctly): ALTER TABLE users ENABLE ROW LEVEL SECURITY; CREATE POLICY tenant_isolation ON users USING (tenant_id = current_setting('app.current_tenant_id')::UUID)
-- Single database, single schema, all tenants in same tables CREATE TABLE users ( id BIGSERIAL PRIMARY KEY, tenant_id UUID NOT NULL REFERENCES tenants(id), email VARCHAR(255) NOT NULL, name VARCHAR(255), role VARCHAR(50), created_at TIMESTAMP DEFAULT NOW(), UNIQUE (tenant_id, email) -- Email unique per tenant, not globally ); CREATE TABLE records ( id BIGSERIAL PRIMARY KEY, tenant_id UUID NOT NULL REFERENCES tenants(id), user_id BIGINT NOT NULL REFERENCES users(id), data JSONB, created_at TIMESTAMP DEFAULT NOW() ); -- Row Level Security (if implemented correctly): ALTER TABLE users ENABLE ROW LEVEL SECURITY; CREATE POLICY tenant_isolation ON users USING (tenant_id = current_setting('app.current_tenant_id')::UUID)
Security profile: High blast radius when isolation fails. One misconfigured query exposes all tenants' data simultaneously. Correct implementation requires consistent tenant_id filtering on every query, a discipline that fails as the application grows.
Primary attack surface: Every API endpoint that queries these tables. If even one endpoint omits the tenant_id filter, that endpoint exposes data across all tenants.
Pattern 2: Shared Database, Separate Schemas
One database, separate PostgreSQL schemas per tenant. Each tenant gets their own users table, their own records table, same structure, different schema.
-- Each tenant gets their own schema CREATE SCHEMA tenant_a; CREATE SCHEMA tenant_b; -- Same table structure in each schema CREATE TABLE tenant_a.users ( id BIGSERIAL PRIMARY KEY, email VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255) ); CREATE TABLE tenant_b.users ( id BIGSERIAL PRIMARY KEY, email VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255) ); -- Application sets search path based on authenticated tenant: -- SET search_path TO tenant_a; -- SELECT * FROM users; → queries tenant_a.users
-- Each tenant gets their own schema CREATE SCHEMA tenant_a; CREATE SCHEMA tenant_b; -- Same table structure in each schema CREATE TABLE tenant_a.users ( id BIGSERIAL PRIMARY KEY, email VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255) ); CREATE TABLE tenant_b.users ( id BIGSERIAL PRIMARY KEY, email VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255) ); -- Application sets search path based on authenticated tenant: -- SET search_path TO tenant_a; -- SELECT * FROM users; → queries tenant_a.users
-- Each tenant gets their own schema CREATE SCHEMA tenant_a; CREATE SCHEMA tenant_b; -- Same table structure in each schema CREATE TABLE tenant_a.users ( id BIGSERIAL PRIMARY KEY, email VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255) ); CREATE TABLE tenant_b.users ( id BIGSERIAL PRIMARY KEY, email VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255) ); -- Application sets search path based on authenticated tenant: -- SET search_path TO tenant_a; -- SELECT * FROM users; → queries tenant_a.users
Security profile: Better isolation than shared schema, a query with no tenant filter only accesses the current tenant's schema. The risk shifts to schema selection: if the application sets search_path based on user input (rather than the authenticated session), an attacker can switch schemas.
Primary attack surface: How the schema is selected. If schema selection uses any user-supplied value, cross-tenant access is possible.
Pattern 3: Separate Databases Per Tenant
The strongest isolation model. Each tenant gets their own database instance. Cross-tenant access requires a completely different database connection.
# Connection routing based on tenant: class TenantDatabaseRouter: def get_connection(self, tenant_id: str) -> Connection: # Each tenant has their own connection string in config connection_string = self.tenant_config[tenant_id]['database_url'] return create_engine(connection_string) # The risk here is in the routing layer: # If tenant_id in the router comes from user input → cross-tenant DB access # If connection strings leak → direct database access # If the routing layer has IDOR → attacker routes to any tenant's DB
# Connection routing based on tenant: class TenantDatabaseRouter: def get_connection(self, tenant_id: str) -> Connection: # Each tenant has their own connection string in config connection_string = self.tenant_config[tenant_id]['database_url'] return create_engine(connection_string) # The risk here is in the routing layer: # If tenant_id in the router comes from user input → cross-tenant DB access # If connection strings leak → direct database access # If the routing layer has IDOR → attacker routes to any tenant's DB
# Connection routing based on tenant: class TenantDatabaseRouter: def get_connection(self, tenant_id: str) -> Connection: # Each tenant has their own connection string in config connection_string = self.tenant_config[tenant_id]['database_url'] return create_engine(connection_string) # The risk here is in the routing layer: # If tenant_id in the router comes from user input → cross-tenant DB access # If connection strings leak → direct database access # If the routing layer has IDOR → attacker routes to any tenant's DB
Security profile: Strongest isolation at the database level. The attack surface shifts entirely to the routing layer and any shared infrastructure between tenant databases.

The Tenant Identifier Problem: What Makes Every Pattern Vulnerable
Regardless of which architecture pattern is used, every multi-tenant application must answer one question at every API request:
Which tenant does this request belong to?
And more importantly:
Who controls that answer?
If the user controls it, you already have a vulnerability.
Where Tenant Identity Should Come From
# SECURE: Derive tenant identity from the authenticated user's session # The user cannot control their own tenant assignment class BaseMultiTenantView(APIView): permission_classes = [IsAuthenticated] def get_tenant_id(self) -> str: """ Always derive tenant_id from the authenticated user's profile. Never from the request parameters. """ return self.request.user.organization_id def get_tenant_queryset(self, model_class): """ Return a queryset scoped to the authenticated tenant. Cannot be influenced by request parameters. """ return model_class.objects.filter( tenant_id=self.get_tenant_id() )
# SECURE: Derive tenant identity from the authenticated user's session # The user cannot control their own tenant assignment class BaseMultiTenantView(APIView): permission_classes = [IsAuthenticated] def get_tenant_id(self) -> str: """ Always derive tenant_id from the authenticated user's profile. Never from the request parameters. """ return self.request.user.organization_id def get_tenant_queryset(self, model_class): """ Return a queryset scoped to the authenticated tenant. Cannot be influenced by request parameters. """ return model_class.objects.filter( tenant_id=self.get_tenant_id() )
# SECURE: Derive tenant identity from the authenticated user's session # The user cannot control their own tenant assignment class BaseMultiTenantView(APIView): permission_classes = [IsAuthenticated] def get_tenant_id(self) -> str: """ Always derive tenant_id from the authenticated user's profile. Never from the request parameters. """ return self.request.user.organization_id def get_tenant_queryset(self, model_class): """ Return a queryset scoped to the authenticated tenant. Cannot be influenced by request parameters. """ return model_class.objects.filter( tenant_id=self.get_tenant_id() )
Where Tenant Identity Actually Comes From in Vulnerable Applications
# VULNERABLE Pattern 1: Tenant ID from query parameter class RecordsView(APIView): def get(self, request): tenant_id = request.query_params.get('tenant_id') # ← User controlled records = Record.objects.filter(tenant_id=tenant_id) return Response(RecordSerializer(records, many=True).data) # VULNERABLE Pattern 2: Tenant ID from request body class DashboardView(APIView): def post(self, request): tenant_id = request.data.get('tenant_id') # ← User controlled data = Dashboard.objects.filter(tenant_id=tenant_id) return Response(data) # VULNERABLE Pattern 3: Tenant ID from custom header class APIView(View): def dispatch(self, request, *args, **kwargs): tenant_id = request.headers.get('X-Tenant-ID') # ← User controlled request.tenant_id = tenant_id return super().dispatch(request, *args, **kwargs) # VULNERABLE Pattern 4: Tenant subdomain not verified server-side class SubdomainTenantView(APIView): def get_tenant_id(self): # Extracts tenant from subdomain: customer-a.app.company.com host = self.request.get_host() subdomain = host.split('.')[0] # If subdomain → tenant lookup is the ONLY isolation mechanism # and if the API also accepts tenant_id parameter → bypass possible return Tenant.objects.get(subdomain=subdomain).id # VULNERABLE Pattern 5: Tenant ID embedded in JWT claims that aren't verified def process_request(request): token = request.headers.get('Authorization').split(' ')[1] # Decoded WITHOUT verification of tenant claim against DB payload = jwt.decode(token, SECRET, algorithms=['HS256']) # The tenant_id from the JWT might be real — but can be forged # if JWT is HS256 with a weak secret (see JWT Security Testing guide) tenant_id = payload.get('tenant_id') # Potentially forged records = Record.objects.filter(tenant_id=tenant_id)
# VULNERABLE Pattern 1: Tenant ID from query parameter class RecordsView(APIView): def get(self, request): tenant_id = request.query_params.get('tenant_id') # ← User controlled records = Record.objects.filter(tenant_id=tenant_id) return Response(RecordSerializer(records, many=True).data) # VULNERABLE Pattern 2: Tenant ID from request body class DashboardView(APIView): def post(self, request): tenant_id = request.data.get('tenant_id') # ← User controlled data = Dashboard.objects.filter(tenant_id=tenant_id) return Response(data) # VULNERABLE Pattern 3: Tenant ID from custom header class APIView(View): def dispatch(self, request, *args, **kwargs): tenant_id = request.headers.get('X-Tenant-ID') # ← User controlled request.tenant_id = tenant_id return super().dispatch(request, *args, **kwargs) # VULNERABLE Pattern 4: Tenant subdomain not verified server-side class SubdomainTenantView(APIView): def get_tenant_id(self): # Extracts tenant from subdomain: customer-a.app.company.com host = self.request.get_host() subdomain = host.split('.')[0] # If subdomain → tenant lookup is the ONLY isolation mechanism # and if the API also accepts tenant_id parameter → bypass possible return Tenant.objects.get(subdomain=subdomain).id # VULNERABLE Pattern 5: Tenant ID embedded in JWT claims that aren't verified def process_request(request): token = request.headers.get('Authorization').split(' ')[1] # Decoded WITHOUT verification of tenant claim against DB payload = jwt.decode(token, SECRET, algorithms=['HS256']) # The tenant_id from the JWT might be real — but can be forged # if JWT is HS256 with a weak secret (see JWT Security Testing guide) tenant_id = payload.get('tenant_id') # Potentially forged records = Record.objects.filter(tenant_id=tenant_id)
# VULNERABLE Pattern 1: Tenant ID from query parameter class RecordsView(APIView): def get(self, request): tenant_id = request.query_params.get('tenant_id') # ← User controlled records = Record.objects.filter(tenant_id=tenant_id) return Response(RecordSerializer(records, many=True).data) # VULNERABLE Pattern 2: Tenant ID from request body class DashboardView(APIView): def post(self, request): tenant_id = request.data.get('tenant_id') # ← User controlled data = Dashboard.objects.filter(tenant_id=tenant_id) return Response(data) # VULNERABLE Pattern 3: Tenant ID from custom header class APIView(View): def dispatch(self, request, *args, **kwargs): tenant_id = request.headers.get('X-Tenant-ID') # ← User controlled request.tenant_id = tenant_id return super().dispatch(request, *args, **kwargs) # VULNERABLE Pattern 4: Tenant subdomain not verified server-side class SubdomainTenantView(APIView): def get_tenant_id(self): # Extracts tenant from subdomain: customer-a.app.company.com host = self.request.get_host() subdomain = host.split('.')[0] # If subdomain → tenant lookup is the ONLY isolation mechanism # and if the API also accepts tenant_id parameter → bypass possible return Tenant.objects.get(subdomain=subdomain).id # VULNERABLE Pattern 5: Tenant ID embedded in JWT claims that aren't verified def process_request(request): token = request.headers.get('Authorization').split(' ')[1] # Decoded WITHOUT verification of tenant claim against DB payload = jwt.decode(token, SECRET, algorithms=['HS256']) # The tenant_id from the JWT might be real — but can be forged # if JWT is HS256 with a weak secret (see JWT Security Testing guide) tenant_id = payload.get('tenant_id') # Potentially forged records = Record.objects.filter(tenant_id=tenant_id)
The audit question for every endpoint: "If I change the tenant identifier in this request, will the application use my value or derive the correct value from my authenticated session?" Any endpoint that answers "use your value" is a cross-tenant IDOR.
The Complete Cross-Tenant Attack Taxonomy
Attack Type 1: Direct Cross-Tenant IDOR via Explicit Parameter
The simplest and most common attack. The API accepts a tenant identifier as a parameter and uses it directly.
# Authenticated as Tenant A user # Own records: GET /api/v1/records?tenant_id=ten_aaaaaaaa-bbbb-cccc-dddd-000000000001 HTTP/1.1 Authorization: Bearer [tenant_A_user_token] → 200 OK — Tenant A's records (847 records) # Cross-tenant IDOR — substitute Tenant B's identifier: GET /api/v1/records?tenant_id=ten_aaaaaaaa-bbbb-cccc-dddd-000000000002 HTTP/1.1 Authorization: Bearer [tenant_A_user_token] ← Still Tenant A's token → 200 OK — Tenant B's records (2,341 records) ← Cross-tenant confirmed # Mass enumeration — all tenants: import requests import uuid AUTH_HEADERS = {"Authorization": "Bearer [tenant_A_token]
# Authenticated as Tenant A user # Own records: GET /api/v1/records?tenant_id=ten_aaaaaaaa-bbbb-cccc-dddd-000000000001 HTTP/1.1 Authorization: Bearer [tenant_A_user_token] → 200 OK — Tenant A's records (847 records) # Cross-tenant IDOR — substitute Tenant B's identifier: GET /api/v1/records?tenant_id=ten_aaaaaaaa-bbbb-cccc-dddd-000000000002 HTTP/1.1 Authorization: Bearer [tenant_A_user_token] ← Still Tenant A's token → 200 OK — Tenant B's records (2,341 records) ← Cross-tenant confirmed # Mass enumeration — all tenants: import requests import uuid AUTH_HEADERS = {"Authorization": "Bearer [tenant_A_token]
# Authenticated as Tenant A user # Own records: GET /api/v1/records?tenant_id=ten_aaaaaaaa-bbbb-cccc-dddd-000000000001 HTTP/1.1 Authorization: Bearer [tenant_A_user_token] → 200 OK — Tenant A's records (847 records) # Cross-tenant IDOR — substitute Tenant B's identifier: GET /api/v1/records?tenant_id=ten_aaaaaaaa-bbbb-cccc-dddd-000000000002 HTTP/1.1 Authorization: Bearer [tenant_A_user_token] ← Still Tenant A's token → 200 OK — Tenant B's records (2,341 records) ← Cross-tenant confirmed # Mass enumeration — all tenants: import requests import uuid AUTH_HEADERS = {"Authorization": "Bearer [tenant_A_token]
Attack Type 2: Cross-Tenant Access via Shared Resource Reference
Resources shared between tenants (documents, templates, integrations) create access paths that bypass tenant boundaries when the sharing model isn't correctly scoped.
# Tenant A shares a document with an external user POST /api/v1/documents/doc_abc123/share HTTP/1.1 Authorization: Bearer [tenant_A_token] Content-Type: application/json {"share_with": "external_user@tenant_b.com"} # The document gets a share token: # Response: {"share_token": "shr_xyz789", "document_id": "doc_abc123"} # Tenant B user uses the share token to discover Tenant A's document ID # Then tests whether they can access the document directly (without token): GET /api/v1/documents/doc_abc123 HTTP/1.1 Authorization: Bearer [tenant_B_token]
# Tenant A shares a document with an external user POST /api/v1/documents/doc_abc123/share HTTP/1.1 Authorization: Bearer [tenant_A_token] Content-Type: application/json {"share_with": "external_user@tenant_b.com"} # The document gets a share token: # Response: {"share_token": "shr_xyz789", "document_id": "doc_abc123"} # Tenant B user uses the share token to discover Tenant A's document ID # Then tests whether they can access the document directly (without token): GET /api/v1/documents/doc_abc123 HTTP/1.1 Authorization: Bearer [tenant_B_token]
# Tenant A shares a document with an external user POST /api/v1/documents/doc_abc123/share HTTP/1.1 Authorization: Bearer [tenant_A_token] Content-Type: application/json {"share_with": "external_user@tenant_b.com"} # The document gets a share token: # Response: {"share_token": "shr_xyz789", "document_id": "doc_abc123"} # Tenant B user uses the share token to discover Tenant A's document ID # Then tests whether they can access the document directly (without token): GET /api/v1/documents/doc_abc123 HTTP/1.1 Authorization: Bearer [tenant_B_token]
Attack Type 3: Tenant Pivot via User Account Overlap
When users can belong to multiple tenants, the context-switching mechanism becomes an attack surface.
# Test: Does switching tenant context actually isolate data? # Step 1: Create account that belongs to multiple tenants # (In practice: user invited to two different organizations) # Step 2: Authenticate and get token for Tenant A context tenant_a_token = login_and_get_token( email="multi_tenant_user@test.com", tenant_context="tenant_a" ) # Step 3: Switch context to Tenant B tenant_b_token = switch_tenant_context( current_token=tenant_a_token, target_tenant="tenant_b" ) # Step 4: Use Tenant B token to access Tenant A resources # If tenant context switch doesn't invalidate the previous token's permissions: response = requests.get( "<https://api.company.com/api/v1/records>", params={"tenant_id": "tenant_a"}, # Tenant A's records headers={"Authorization": f"Bearer {tenant_b_token}"} # Tenant B's token ) # Tests to run: # - Does old tenant context persist after switching? # - Can you access data from previous tenant after switching? # - Does the API accept explicit tenant_id override when in different context?
# Test: Does switching tenant context actually isolate data? # Step 1: Create account that belongs to multiple tenants # (In practice: user invited to two different organizations) # Step 2: Authenticate and get token for Tenant A context tenant_a_token = login_and_get_token( email="multi_tenant_user@test.com", tenant_context="tenant_a" ) # Step 3: Switch context to Tenant B tenant_b_token = switch_tenant_context( current_token=tenant_a_token, target_tenant="tenant_b" ) # Step 4: Use Tenant B token to access Tenant A resources # If tenant context switch doesn't invalidate the previous token's permissions: response = requests.get( "<https://api.company.com/api/v1/records>", params={"tenant_id": "tenant_a"}, # Tenant A's records headers={"Authorization": f"Bearer {tenant_b_token}"} # Tenant B's token ) # Tests to run: # - Does old tenant context persist after switching? # - Can you access data from previous tenant after switching? # - Does the API accept explicit tenant_id override when in different context?
# Test: Does switching tenant context actually isolate data? # Step 1: Create account that belongs to multiple tenants # (In practice: user invited to two different organizations) # Step 2: Authenticate and get token for Tenant A context tenant_a_token = login_and_get_token( email="multi_tenant_user@test.com", tenant_context="tenant_a" ) # Step 3: Switch context to Tenant B tenant_b_token = switch_tenant_context( current_token=tenant_a_token, target_tenant="tenant_b" ) # Step 4: Use Tenant B token to access Tenant A resources # If tenant context switch doesn't invalidate the previous token's permissions: response = requests.get( "<https://api.company.com/api/v1/records>", params={"tenant_id": "tenant_a"}, # Tenant A's records headers={"Authorization": f"Bearer {tenant_b_token}"} # Tenant B's token ) # Tests to run: # - Does old tenant context persist after switching? # - Can you access data from previous tenant after switching? # - Does the API accept explicit tenant_id override when in different context?
Attack Type 4: Cross-Tenant Access via Subdomain Confusion
Applications using subdomain-based tenant identification face a specific attack: API requests that bypass subdomain routing.
# Application architecture: # customer-a.app.company.com → serves Tenant A's data # customer-b.app.company.com → serves Tenant B's data # The subdomain determines which tenant's data the API queries # Attack: Make API requests directly to the API endpoint # bypassing subdomain-based tenant selection # Normal request through subdomain routing: requests.get( "<https://customer-a.app.company.com/api/v1/data>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) # → Tenant A's data (because subdomain routes to Tenant A) # Bypass: Access the underlying API directly with Tenant B token # but specify Tenant A in a parameter: requests.get( "<https://api.company.com/api/v1/data>", # Direct API, no subdomain params={"tenant_id": "tenant_a_uuid"}, # Specify Tenant A headers={"Authorization": f"Bearer {tenant_b_token}"} # Tenant B's token ) # If this returns Tenant A's data → subdomain isolation bypassed # The API accepted the explicit tenant_id parameter # Also test: Host header injection requests.get( "<https://api.company.com/api/v1/data>", headers={ "Authorization": f"Bearer {tenant_b_token}", "X-Forwarded-Host": "customer-a.app.company.com", # Inject Tenant A's host "Host": "customer-a.app.company.com" } )
# Application architecture: # customer-a.app.company.com → serves Tenant A's data # customer-b.app.company.com → serves Tenant B's data # The subdomain determines which tenant's data the API queries # Attack: Make API requests directly to the API endpoint # bypassing subdomain-based tenant selection # Normal request through subdomain routing: requests.get( "<https://customer-a.app.company.com/api/v1/data>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) # → Tenant A's data (because subdomain routes to Tenant A) # Bypass: Access the underlying API directly with Tenant B token # but specify Tenant A in a parameter: requests.get( "<https://api.company.com/api/v1/data>", # Direct API, no subdomain params={"tenant_id": "tenant_a_uuid"}, # Specify Tenant A headers={"Authorization": f"Bearer {tenant_b_token}"} # Tenant B's token ) # If this returns Tenant A's data → subdomain isolation bypassed # The API accepted the explicit tenant_id parameter # Also test: Host header injection requests.get( "<https://api.company.com/api/v1/data>", headers={ "Authorization": f"Bearer {tenant_b_token}", "X-Forwarded-Host": "customer-a.app.company.com", # Inject Tenant A's host "Host": "customer-a.app.company.com" } )
# Application architecture: # customer-a.app.company.com → serves Tenant A's data # customer-b.app.company.com → serves Tenant B's data # The subdomain determines which tenant's data the API queries # Attack: Make API requests directly to the API endpoint # bypassing subdomain-based tenant selection # Normal request through subdomain routing: requests.get( "<https://customer-a.app.company.com/api/v1/data>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) # → Tenant A's data (because subdomain routes to Tenant A) # Bypass: Access the underlying API directly with Tenant B token # but specify Tenant A in a parameter: requests.get( "<https://api.company.com/api/v1/data>", # Direct API, no subdomain params={"tenant_id": "tenant_a_uuid"}, # Specify Tenant A headers={"Authorization": f"Bearer {tenant_b_token}"} # Tenant B's token ) # If this returns Tenant A's data → subdomain isolation bypassed # The API accepted the explicit tenant_id parameter # Also test: Host header injection requests.get( "<https://api.company.com/api/v1/data>", headers={ "Authorization": f"Bearer {tenant_b_token}", "X-Forwarded-Host": "customer-a.app.company.com", # Inject Tenant A's host "Host": "customer-a.app.company.com" } )
Attack Type 5: Indirect Cross-Tenant Access via Relationship Objects
Complex data models have relationships between objects. Cross-tenant access sometimes requires following these relationships rather than directly accessing tenant data.
# Tenant A has an invoice (inv_10042) that references Tenant B's vendor # The vendor record belongs to Tenant B # Direct access to Tenant B's vendor record fails: GET /api/v1/vendors/vnd_99999 HTTP/1.1 Authorization: Bearer [tenant_A_token] → 403 Forbidden — vendor belongs to different tenant # But accessing Tenant A's invoice that REFERENCES Tenant B's vendor: GET /api/v1/invoices/inv_10042/vendor-details HTTP/1.1 Authorization: Bearer [tenant_A_token]
# Tenant A has an invoice (inv_10042) that references Tenant B's vendor # The vendor record belongs to Tenant B # Direct access to Tenant B's vendor record fails: GET /api/v1/vendors/vnd_99999 HTTP/1.1 Authorization: Bearer [tenant_A_token] → 403 Forbidden — vendor belongs to different tenant # But accessing Tenant A's invoice that REFERENCES Tenant B's vendor: GET /api/v1/invoices/inv_10042/vendor-details HTTP/1.1 Authorization: Bearer [tenant_A_token]
# Tenant A has an invoice (inv_10042) that references Tenant B's vendor # The vendor record belongs to Tenant B # Direct access to Tenant B's vendor record fails: GET /api/v1/vendors/vnd_99999 HTTP/1.1 Authorization: Bearer [tenant_A_token] → 403 Forbidden — vendor belongs to different tenant # But accessing Tenant A's invoice that REFERENCES Tenant B's vendor: GET /api/v1/invoices/inv_10042/vendor-details HTTP/1.1 Authorization: Bearer [tenant_A_token]
Database-Level Isolation Testing
Database isolation must be verified at the query level, not just the API level. An API that returns 403 for cross-tenant requests may still be executing the database query and then checking the result, which means the data was accessed even if not returned.
Verifying Row-Level Security in PostgreSQL
-- PostgreSQL Row Level Security (RLS) configuration: -- Step 1: Enable RLS on tenant-scoped tables ALTER TABLE records ENABLE ROW LEVEL SECURITY; ALTER TABLE users ENABLE ROW LEVEL SECURITY; -- Step 2: Create isolation policy CREATE POLICY tenant_isolation_policy ON records USING (tenant_id = current_setting('app.current_tenant_id', TRUE)::UUID); -- Step 3: Force RLS even for table owners ALTER TABLE records FORCE ROW LEVEL SECURITY; -- Testing RLS enforcement: -- Set tenant context for Tenant A SET app.current_tenant_id = 'ten_aaaaaaaa-bbbb-cccc-dddd-000000000001'; -- Should return ONLY Tenant A's records SELECT COUNT(*) FROM records; -- If returns all records across all tenants → RLS not properly configured -- Test: Can RLS be bypassed via function call? CREATE OR REPLACE FUNCTION get_all_records() RETURNS SETOF records AS $$ SELECT * FROM records; -- Executes as SECURITY DEFINER $$ LANGUAGE SQL SECURITY DEFINER; -- If SECURITY DEFINER bypasses RLS: -- Any user calling get_all_records() gets all tenants' data -- Fix: Use SECURITY INVOKER or explicitly SET ROLE in functions
-- PostgreSQL Row Level Security (RLS) configuration: -- Step 1: Enable RLS on tenant-scoped tables ALTER TABLE records ENABLE ROW LEVEL SECURITY; ALTER TABLE users ENABLE ROW LEVEL SECURITY; -- Step 2: Create isolation policy CREATE POLICY tenant_isolation_policy ON records USING (tenant_id = current_setting('app.current_tenant_id', TRUE)::UUID); -- Step 3: Force RLS even for table owners ALTER TABLE records FORCE ROW LEVEL SECURITY; -- Testing RLS enforcement: -- Set tenant context for Tenant A SET app.current_tenant_id = 'ten_aaaaaaaa-bbbb-cccc-dddd-000000000001'; -- Should return ONLY Tenant A's records SELECT COUNT(*) FROM records; -- If returns all records across all tenants → RLS not properly configured -- Test: Can RLS be bypassed via function call? CREATE OR REPLACE FUNCTION get_all_records() RETURNS SETOF records AS $$ SELECT * FROM records; -- Executes as SECURITY DEFINER $$ LANGUAGE SQL SECURITY DEFINER; -- If SECURITY DEFINER bypasses RLS: -- Any user calling get_all_records() gets all tenants' data -- Fix: Use SECURITY INVOKER or explicitly SET ROLE in functions
-- PostgreSQL Row Level Security (RLS) configuration: -- Step 1: Enable RLS on tenant-scoped tables ALTER TABLE records ENABLE ROW LEVEL SECURITY; ALTER TABLE users ENABLE ROW LEVEL SECURITY; -- Step 2: Create isolation policy CREATE POLICY tenant_isolation_policy ON records USING (tenant_id = current_setting('app.current_tenant_id', TRUE)::UUID); -- Step 3: Force RLS even for table owners ALTER TABLE records FORCE ROW LEVEL SECURITY; -- Testing RLS enforcement: -- Set tenant context for Tenant A SET app.current_tenant_id = 'ten_aaaaaaaa-bbbb-cccc-dddd-000000000001'; -- Should return ONLY Tenant A's records SELECT COUNT(*) FROM records; -- If returns all records across all tenants → RLS not properly configured -- Test: Can RLS be bypassed via function call? CREATE OR REPLACE FUNCTION get_all_records() RETURNS SETOF records AS $$ SELECT * FROM records; -- Executes as SECURITY DEFINER $$ LANGUAGE SQL SECURITY DEFINER; -- If SECURITY DEFINER bypasses RLS: -- Any user calling get_all_records() gets all tenants' data -- Fix: Use SECURITY INVOKER or explicitly SET ROLE in functions
# Application-level database isolation testing: def test_database_isolation(db_connection, tenant_a_id, tenant_b_id): """ Verify that setting tenant context actually isolates data. Tests the database isolation mechanism directly. """ # Count Tenant A's records when context is set to Tenant A db_connection.execute( "SET app.current_tenant_id = %s", [tenant_a_id] ) tenant_a_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] # Count records when context is set to Tenant B db_connection.execute( "SET app.current_tenant_id = %s", [tenant_b_id] ) tenant_b_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] # Count WITHOUT any tenant context (should return 0 if RLS is enforced) db_connection.execute("RESET app.current_tenant_id") no_context_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] results = { 'tenant_a_isolated_count': tenant_a_count, 'tenant_b_isolated_count': tenant_b_count, 'no_context_count': no_context_count, } if no_context_count > 0: results['finding'] = f'CRITICAL: {no_context_count} records accessible without tenant context' results['severity'] = 'CRITICAL' if no_context_count == tenant_a_count + tenant_b_count: results['finding'] = 'CRITICAL: RLS not enforced — all records accessible without context' return results
# Application-level database isolation testing: def test_database_isolation(db_connection, tenant_a_id, tenant_b_id): """ Verify that setting tenant context actually isolates data. Tests the database isolation mechanism directly. """ # Count Tenant A's records when context is set to Tenant A db_connection.execute( "SET app.current_tenant_id = %s", [tenant_a_id] ) tenant_a_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] # Count records when context is set to Tenant B db_connection.execute( "SET app.current_tenant_id = %s", [tenant_b_id] ) tenant_b_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] # Count WITHOUT any tenant context (should return 0 if RLS is enforced) db_connection.execute("RESET app.current_tenant_id") no_context_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] results = { 'tenant_a_isolated_count': tenant_a_count, 'tenant_b_isolated_count': tenant_b_count, 'no_context_count': no_context_count, } if no_context_count > 0: results['finding'] = f'CRITICAL: {no_context_count} records accessible without tenant context' results['severity'] = 'CRITICAL' if no_context_count == tenant_a_count + tenant_b_count: results['finding'] = 'CRITICAL: RLS not enforced — all records accessible without context' return results
# Application-level database isolation testing: def test_database_isolation(db_connection, tenant_a_id, tenant_b_id): """ Verify that setting tenant context actually isolates data. Tests the database isolation mechanism directly. """ # Count Tenant A's records when context is set to Tenant A db_connection.execute( "SET app.current_tenant_id = %s", [tenant_a_id] ) tenant_a_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] # Count records when context is set to Tenant B db_connection.execute( "SET app.current_tenant_id = %s", [tenant_b_id] ) tenant_b_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] # Count WITHOUT any tenant context (should return 0 if RLS is enforced) db_connection.execute("RESET app.current_tenant_id") no_context_count = db_connection.execute( "SELECT COUNT(*) FROM records" ).fetchone()[0] results = { 'tenant_a_isolated_count': tenant_a_count, 'tenant_b_isolated_count': tenant_b_count, 'no_context_count': no_context_count, } if no_context_count > 0: results['finding'] = f'CRITICAL: {no_context_count} records accessible without tenant context' results['severity'] = 'CRITICAL' if no_context_count == tenant_a_count + tenant_b_count: results['finding'] = 'CRITICAL: RLS not enforced — all records accessible without context' return results
ORM-Level Isolation Verification
# Testing whether ORM queries are consistently scoped import ast import re def audit_orm_queries_for_tenant_scoping(source_code, tenant_models): """ Find all ORM queries on tenant-scoped models that DON'T include a tenant filter — these are potential cross-tenant access points. """ tree = ast.parse(source_code) unscoped_queries = [] QUERY_METHODS = {'get', 'filter', 'exclude', 'all', 'first', 'last', 'aggregate'} TENANT_FILTER_KWARGS = {'tenant', 'tenant_id', 'organization', 'organization_id', 'account', 'account_id'} for node in ast.walk(tree): if not isinstance(node, ast.Call): continue # Check if this is a Model.objects.query_method() call func = node.func if not (isinstance(func, ast.Attribute) and func.attr in QUERY_METHODS and isinstance(func.value, ast.Attribute) and func.value.attr == 'objects'): continue # Get the model name model = getattr(func.value.value, 'id', None) if model not in tenant_models: continue # Check if any tenant filter is present in keyword arguments filter_kwargs = {kw.arg for kw in node.keywords} has_tenant_filter = bool(filter_kwargs & TENANT_FILTER_KWARGS) # Also check if called via a tenant-scoped manager # e.g., Record.tenant_objects.get() vs Record.objects.get() manager_name = getattr(func.value, 'attr', '') uses_tenant_manager = 'tenant' in manager_name.lower() if not has_tenant_filter and not uses_tenant_manager: unscoped_queries.append({ 'model': model, 'method': func.attr, 'line': node.lineno, 'file': current_file, 'kwargs_present': list(filter_kwargs), 'finding': f'Unscoped {model}.objects.{func.attr}() — missing tenant filter', 'severity': 'HIGH' if func.attr in {'get', 'all'} else 'MEDIUM' }) return unscoped_queries # Example output: # Line 47: Unscoped Record.objects.get() — missing tenant filter [HIGH] # Line 89: Unscoped Invoice.objects.filter() — missing tenant filter [HIGH] # Line 134: Unscoped User.objects.all() — missing tenant filter [MEDIUM]
# Testing whether ORM queries are consistently scoped import ast import re def audit_orm_queries_for_tenant_scoping(source_code, tenant_models): """ Find all ORM queries on tenant-scoped models that DON'T include a tenant filter — these are potential cross-tenant access points. """ tree = ast.parse(source_code) unscoped_queries = [] QUERY_METHODS = {'get', 'filter', 'exclude', 'all', 'first', 'last', 'aggregate'} TENANT_FILTER_KWARGS = {'tenant', 'tenant_id', 'organization', 'organization_id', 'account', 'account_id'} for node in ast.walk(tree): if not isinstance(node, ast.Call): continue # Check if this is a Model.objects.query_method() call func = node.func if not (isinstance(func, ast.Attribute) and func.attr in QUERY_METHODS and isinstance(func.value, ast.Attribute) and func.value.attr == 'objects'): continue # Get the model name model = getattr(func.value.value, 'id', None) if model not in tenant_models: continue # Check if any tenant filter is present in keyword arguments filter_kwargs = {kw.arg for kw in node.keywords} has_tenant_filter = bool(filter_kwargs & TENANT_FILTER_KWARGS) # Also check if called via a tenant-scoped manager # e.g., Record.tenant_objects.get() vs Record.objects.get() manager_name = getattr(func.value, 'attr', '') uses_tenant_manager = 'tenant' in manager_name.lower() if not has_tenant_filter and not uses_tenant_manager: unscoped_queries.append({ 'model': model, 'method': func.attr, 'line': node.lineno, 'file': current_file, 'kwargs_present': list(filter_kwargs), 'finding': f'Unscoped {model}.objects.{func.attr}() — missing tenant filter', 'severity': 'HIGH' if func.attr in {'get', 'all'} else 'MEDIUM' }) return unscoped_queries # Example output: # Line 47: Unscoped Record.objects.get() — missing tenant filter [HIGH] # Line 89: Unscoped Invoice.objects.filter() — missing tenant filter [HIGH] # Line 134: Unscoped User.objects.all() — missing tenant filter [MEDIUM]
# Testing whether ORM queries are consistently scoped import ast import re def audit_orm_queries_for_tenant_scoping(source_code, tenant_models): """ Find all ORM queries on tenant-scoped models that DON'T include a tenant filter — these are potential cross-tenant access points. """ tree = ast.parse(source_code) unscoped_queries = [] QUERY_METHODS = {'get', 'filter', 'exclude', 'all', 'first', 'last', 'aggregate'} TENANT_FILTER_KWARGS = {'tenant', 'tenant_id', 'organization', 'organization_id', 'account', 'account_id'} for node in ast.walk(tree): if not isinstance(node, ast.Call): continue # Check if this is a Model.objects.query_method() call func = node.func if not (isinstance(func, ast.Attribute) and func.attr in QUERY_METHODS and isinstance(func.value, ast.Attribute) and func.value.attr == 'objects'): continue # Get the model name model = getattr(func.value.value, 'id', None) if model not in tenant_models: continue # Check if any tenant filter is present in keyword arguments filter_kwargs = {kw.arg for kw in node.keywords} has_tenant_filter = bool(filter_kwargs & TENANT_FILTER_KWARGS) # Also check if called via a tenant-scoped manager # e.g., Record.tenant_objects.get() vs Record.objects.get() manager_name = getattr(func.value, 'attr', '') uses_tenant_manager = 'tenant' in manager_name.lower() if not has_tenant_filter and not uses_tenant_manager: unscoped_queries.append({ 'model': model, 'method': func.attr, 'line': node.lineno, 'file': current_file, 'kwargs_present': list(filter_kwargs), 'finding': f'Unscoped {model}.objects.{func.attr}() — missing tenant filter', 'severity': 'HIGH' if func.attr in {'get', 'all'} else 'MEDIUM' }) return unscoped_queries # Example output: # Line 47: Unscoped Record.objects.get() — missing tenant filter [HIGH] # Line 89: Unscoped Invoice.objects.filter() — missing tenant filter [HIGH] # Line 134: Unscoped User.objects.all() — missing tenant filter [MEDIUM]
API-Level Isolation Testing: The Complete Methodology
Step 1: Map the Tenant Identifier Types
Before testing cross-tenant access, map every way the application expresses tenant identity:
def map_tenant_identifiers(api_responses, jwt_tokens, url_patterns): """ Map all the ways tenant identity appears in the application. Each is a potential cross-tenant attack surface. """ tenant_identifier_map = { 'url_parameters': [], # /api/v1/records?tenant_id=xxx 'path_segments': [], # /api/v1/tenants/{id}/records 'request_body_fields': [], # {"tenant_id": "xxx"} in POST body 'headers': [], # X-Tenant-ID: xxx 'jwt_claims': [], # JWT payload contains tenant_id 'subdomains': [], # tenant-name.app.company.com 'api_key_prefix': [], # API key contains tenant identifier } # Scan API responses for tenant identifier patterns tenant_id_pattern = re.compile( r'(?:tenant_id|organization_id|account_id|workspace_id|' r'company_id|client_id)["\\s:=]+["\\']?([a-zA-Z0-9\\-_]{8,})["\\']?' ) for response in api_responses: matches = tenant_id_pattern.findall(str(response)) for match in matches: tenant_identifier_map['api_responses'] = tenant_identifier_map.get( 'api_responses', [] ) + [match] # Decode JWT to check for tenant claims for token in jwt_tokens: try: payload = jwt.decode(token, options={"verify_signature": False}) tenant_fields = [ k for k in payload.keys() if 'tenant' in k.lower() or 'org' in k.lower() or 'account' in k.lower() or 'workspace' in k.lower() ] tenant_identifier_map['jwt_claims'].extend(tenant_fields) except: pass return tenant_identifier_map
def map_tenant_identifiers(api_responses, jwt_tokens, url_patterns): """ Map all the ways tenant identity appears in the application. Each is a potential cross-tenant attack surface. """ tenant_identifier_map = { 'url_parameters': [], # /api/v1/records?tenant_id=xxx 'path_segments': [], # /api/v1/tenants/{id}/records 'request_body_fields': [], # {"tenant_id": "xxx"} in POST body 'headers': [], # X-Tenant-ID: xxx 'jwt_claims': [], # JWT payload contains tenant_id 'subdomains': [], # tenant-name.app.company.com 'api_key_prefix': [], # API key contains tenant identifier } # Scan API responses for tenant identifier patterns tenant_id_pattern = re.compile( r'(?:tenant_id|organization_id|account_id|workspace_id|' r'company_id|client_id)["\\s:=]+["\\']?([a-zA-Z0-9\\-_]{8,})["\\']?' ) for response in api_responses: matches = tenant_id_pattern.findall(str(response)) for match in matches: tenant_identifier_map['api_responses'] = tenant_identifier_map.get( 'api_responses', [] ) + [match] # Decode JWT to check for tenant claims for token in jwt_tokens: try: payload = jwt.decode(token, options={"verify_signature": False}) tenant_fields = [ k for k in payload.keys() if 'tenant' in k.lower() or 'org' in k.lower() or 'account' in k.lower() or 'workspace' in k.lower() ] tenant_identifier_map['jwt_claims'].extend(tenant_fields) except: pass return tenant_identifier_map
def map_tenant_identifiers(api_responses, jwt_tokens, url_patterns): """ Map all the ways tenant identity appears in the application. Each is a potential cross-tenant attack surface. """ tenant_identifier_map = { 'url_parameters': [], # /api/v1/records?tenant_id=xxx 'path_segments': [], # /api/v1/tenants/{id}/records 'request_body_fields': [], # {"tenant_id": "xxx"} in POST body 'headers': [], # X-Tenant-ID: xxx 'jwt_claims': [], # JWT payload contains tenant_id 'subdomains': [], # tenant-name.app.company.com 'api_key_prefix': [], # API key contains tenant identifier } # Scan API responses for tenant identifier patterns tenant_id_pattern = re.compile( r'(?:tenant_id|organization_id|account_id|workspace_id|' r'company_id|client_id)["\\s:=]+["\\']?([a-zA-Z0-9\\-_]{8,})["\\']?' ) for response in api_responses: matches = tenant_id_pattern.findall(str(response)) for match in matches: tenant_identifier_map['api_responses'] = tenant_identifier_map.get( 'api_responses', [] ) + [match] # Decode JWT to check for tenant claims for token in jwt_tokens: try: payload = jwt.decode(token, options={"verify_signature": False}) tenant_fields = [ k for k in payload.keys() if 'tenant' in k.lower() or 'org' in k.lower() or 'account' in k.lower() or 'workspace' in k.lower() ] tenant_identifier_map['jwt_claims'].extend(tenant_fields) except: pass return tenant_identifier_map
Step 2: Create the Cross-Tenant Test Matrix
def build_cross_tenant_test_matrix(endpoints, tenant_a_context, tenant_b_context): """ Build a comprehensive test matrix covering all cross-tenant attack vectors. """ test_cases = [] for endpoint in endpoints: # Test 1: Direct parameter substitution if endpoint.get('accepts_tenant_param'): test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'direct_parameter_substitution', 'auth_token': tenant_a_context['token'], 'params': {endpoint['tenant_param']: tenant_b_context['tenant_id']}, 'expected': 403, 'success_indicator': 200 }) # Test 2: Resource ID enumeration (using Tenant B's resource IDs) for resource_id in tenant_b_context['resource_ids']: test_cases.append({ 'endpoint': endpoint['url'].format(id=resource_id), 'method': endpoint['method'], 'attack_type': 'resource_id_enumeration', 'auth_token': tenant_a_context['token'], 'expected': 403, 'success_indicator': 200 }) # Test 3: Header injection test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'header_injection', 'auth_token': tenant_a_context['token'], 'headers': { 'X-Tenant-ID': tenant_b_context['tenant_id'], 'X-Organization-ID': tenant_b_context['tenant_id'], }, 'expected': 403, 'success_indicator': 200 }) # Test 4: Body field injection (for POST/PUT endpoints) if endpoint['method'] in ['POST', 'PUT', 'PATCH']: test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'body_field_injection', 'auth_token': tenant_a_context['token'], 'body': { **endpoint.get('default_body', {}), 'tenant_id': tenant_b_context['tenant_id'], 'organization_id': tenant_b_context['tenant_id'], }, 'expected': 403, 'success_indicator': 200 }) return test_cases # Execute the test matrix: async def execute_cross_tenant_tests(test_cases): findings = [] async with aiohttp.ClientSession() as session: for test in test_cases: response = await session.request( test['method'], f"<https://api.company.com>{test['endpoint']}", headers={ 'Authorization': f"Bearer {test['auth_token']}", **test.get('headers', {}) }, json=test.get('body'), params=test.get('params'), ) if response.status == test['success_indicator']: data = await response.json() # Verify this is actually Tenant B's data (not Tenant A's) if is_cross_tenant_data(data, tenant_b_context): findings.append({ 'attack_type': test['attack_type'], 'endpoint': test['endpoint'], 'method': test['method'], 'cvss': calculate_cvss(data), 'records_exposed': count_records(data), 'data_sensitivity': classify_data(data) }) return findings
def build_cross_tenant_test_matrix(endpoints, tenant_a_context, tenant_b_context): """ Build a comprehensive test matrix covering all cross-tenant attack vectors. """ test_cases = [] for endpoint in endpoints: # Test 1: Direct parameter substitution if endpoint.get('accepts_tenant_param'): test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'direct_parameter_substitution', 'auth_token': tenant_a_context['token'], 'params': {endpoint['tenant_param']: tenant_b_context['tenant_id']}, 'expected': 403, 'success_indicator': 200 }) # Test 2: Resource ID enumeration (using Tenant B's resource IDs) for resource_id in tenant_b_context['resource_ids']: test_cases.append({ 'endpoint': endpoint['url'].format(id=resource_id), 'method': endpoint['method'], 'attack_type': 'resource_id_enumeration', 'auth_token': tenant_a_context['token'], 'expected': 403, 'success_indicator': 200 }) # Test 3: Header injection test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'header_injection', 'auth_token': tenant_a_context['token'], 'headers': { 'X-Tenant-ID': tenant_b_context['tenant_id'], 'X-Organization-ID': tenant_b_context['tenant_id'], }, 'expected': 403, 'success_indicator': 200 }) # Test 4: Body field injection (for POST/PUT endpoints) if endpoint['method'] in ['POST', 'PUT', 'PATCH']: test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'body_field_injection', 'auth_token': tenant_a_context['token'], 'body': { **endpoint.get('default_body', {}), 'tenant_id': tenant_b_context['tenant_id'], 'organization_id': tenant_b_context['tenant_id'], }, 'expected': 403, 'success_indicator': 200 }) return test_cases # Execute the test matrix: async def execute_cross_tenant_tests(test_cases): findings = [] async with aiohttp.ClientSession() as session: for test in test_cases: response = await session.request( test['method'], f"<https://api.company.com>{test['endpoint']}", headers={ 'Authorization': f"Bearer {test['auth_token']}", **test.get('headers', {}) }, json=test.get('body'), params=test.get('params'), ) if response.status == test['success_indicator']: data = await response.json() # Verify this is actually Tenant B's data (not Tenant A's) if is_cross_tenant_data(data, tenant_b_context): findings.append({ 'attack_type': test['attack_type'], 'endpoint': test['endpoint'], 'method': test['method'], 'cvss': calculate_cvss(data), 'records_exposed': count_records(data), 'data_sensitivity': classify_data(data) }) return findings
def build_cross_tenant_test_matrix(endpoints, tenant_a_context, tenant_b_context): """ Build a comprehensive test matrix covering all cross-tenant attack vectors. """ test_cases = [] for endpoint in endpoints: # Test 1: Direct parameter substitution if endpoint.get('accepts_tenant_param'): test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'direct_parameter_substitution', 'auth_token': tenant_a_context['token'], 'params': {endpoint['tenant_param']: tenant_b_context['tenant_id']}, 'expected': 403, 'success_indicator': 200 }) # Test 2: Resource ID enumeration (using Tenant B's resource IDs) for resource_id in tenant_b_context['resource_ids']: test_cases.append({ 'endpoint': endpoint['url'].format(id=resource_id), 'method': endpoint['method'], 'attack_type': 'resource_id_enumeration', 'auth_token': tenant_a_context['token'], 'expected': 403, 'success_indicator': 200 }) # Test 3: Header injection test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'header_injection', 'auth_token': tenant_a_context['token'], 'headers': { 'X-Tenant-ID': tenant_b_context['tenant_id'], 'X-Organization-ID': tenant_b_context['tenant_id'], }, 'expected': 403, 'success_indicator': 200 }) # Test 4: Body field injection (for POST/PUT endpoints) if endpoint['method'] in ['POST', 'PUT', 'PATCH']: test_cases.append({ 'endpoint': endpoint['url'], 'method': endpoint['method'], 'attack_type': 'body_field_injection', 'auth_token': tenant_a_context['token'], 'body': { **endpoint.get('default_body', {}), 'tenant_id': tenant_b_context['tenant_id'], 'organization_id': tenant_b_context['tenant_id'], }, 'expected': 403, 'success_indicator': 200 }) return test_cases # Execute the test matrix: async def execute_cross_tenant_tests(test_cases): findings = [] async with aiohttp.ClientSession() as session: for test in test_cases: response = await session.request( test['method'], f"<https://api.company.com>{test['endpoint']}", headers={ 'Authorization': f"Bearer {test['auth_token']}", **test.get('headers', {}) }, json=test.get('body'), params=test.get('params'), ) if response.status == test['success_indicator']: data = await response.json() # Verify this is actually Tenant B's data (not Tenant A's) if is_cross_tenant_data(data, tenant_b_context): findings.append({ 'attack_type': test['attack_type'], 'endpoint': test['endpoint'], 'method': test['method'], 'cvss': calculate_cvss(data), 'records_exposed': count_records(data), 'data_sensitivity': classify_data(data) }) return findings
Step 3: Test the Tenant Isolation Stack End-to-End
def test_tenant_isolation_completeness(): """ Test tenant isolation at every layer of the stack. Isolation is only as strong as the weakest layer. """ layers = { 'api_layer': test_api_layer_isolation(), 'service_layer': test_service_layer_isolation(), 'orm_layer': test_orm_layer_isolation(), 'database_layer': test_database_layer_isolation(), 'cache_layer': test_cache_layer_isolation(), 'file_storage_layer': test_file_storage_isolation(), 'background_jobs': test_background_job_isolation(), 'webhooks': test_webhook_isolation(), 'search_layer': test_search_index_isolation(), } return layers def test_cache_layer_isolation(): """ Test whether cached data crosses tenant boundaries. Cache keys that don't include tenant_id create cross-tenant data leakage. """ # Step 1: Tenant A requests data (populates cache) tenant_a_response = requests.get( "<https://api.company.com/api/v1/dashboard/stats>", headers={"Authorization": f"Bearer {tenant_a_token}"} ) tenant_a_data = tenant_a_response.json() # Step 2: Immediately after, Tenant B requests same endpoint # If cache key is just the endpoint (no tenant scope): # Tenant B gets Tenant A's cached data tenant_b_response = requests.get( "<https://api.company.com/api/v1/dashboard/stats>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) tenant_b_data = tenant_b_response.json() # Check if Tenant B received Tenant A's data if tenant_b_data == tenant_a_data: return { 'vulnerable': True, 'finding': 'Cache layer cross-tenant data leakage', 'severity': 'HIGH', 'detail': 'Dashboard stats cached without tenant scope — ' 'Tenant B receives Tenant A\\'s data from cache' } return {'vulnerable': False} def test_background_job_isolation(): """ Test whether background jobs (Celery, Sidekiq, etc.) maintain tenant context. Jobs triggered by one tenant should only access that tenant's data. """ # Trigger a job as Tenant A job_response = requests.post( "<https://api.company.com/api/v1/reports/generate>", json={"report_type": "full_export"}, headers={"Authorization": f"Bearer {tenant_a_token}"} ) job_id = job_response.json().get('job_id') # Wait for job completion time.sleep(5) # Retrieve job result as Tenant B (should be forbidden) result_response = requests.get( f"<https://api.company.com/api/v1/jobs/{job_id}/result>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) if result_response.status_code == 200: data = result_response.json() if contains_tenant_a_data(data): return { 'vulnerable': True, 'finding': 'Background job result accessible cross-tenant', 'severity': 'HIGH', 'detail': f'Job {job_id} triggered by Tenant A ' f'accessible to Tenant B' } return {'vulnerable': False} def test_file_storage_isolation(): """ Test whether file storage (S3, GCS, Azure Blob) is isolated per tenant. """ # Tenant A uploads a file upload_response = requests.post( "<https://api.company.com/api/v1/files/upload>", files={"file": open("test_doc.pdf", "rb")}, headers={"Authorization": f"Bearer {tenant_a_token}"} ) file_id = upload_response.json().get('file_id') file_url = upload_response.json().get('url') # Test 1: Direct URL access without authentication direct_response = requests.get(file_url) if direct_response.status_code == 200: return { 'vulnerable': True, 'finding': 'Files publicly accessible without authentication', 'severity': 'CRITICAL' } # Test 2: Cross-tenant file access with Tenant B's token cross_tenant_response = requests.get( f"<https://api.company.com/api/v1/files/{file_id}>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) if cross_tenant_response.status_code == 200: return { 'vulnerable': True, 'finding': 'File accessible to different tenant', 'severity': 'HIGH' } # Test 3: Predictable file path enumeration # If file URLs follow predictable patterns: # s3://company-uploads/tenant_a/file_001.pdf # s3://company-uploads/tenant_b/file_001.pdf # → Can enumerate other tenants' files if bucket policy allows listing return {'vulnerable': False}
def test_tenant_isolation_completeness(): """ Test tenant isolation at every layer of the stack. Isolation is only as strong as the weakest layer. """ layers = { 'api_layer': test_api_layer_isolation(), 'service_layer': test_service_layer_isolation(), 'orm_layer': test_orm_layer_isolation(), 'database_layer': test_database_layer_isolation(), 'cache_layer': test_cache_layer_isolation(), 'file_storage_layer': test_file_storage_isolation(), 'background_jobs': test_background_job_isolation(), 'webhooks': test_webhook_isolation(), 'search_layer': test_search_index_isolation(), } return layers def test_cache_layer_isolation(): """ Test whether cached data crosses tenant boundaries. Cache keys that don't include tenant_id create cross-tenant data leakage. """ # Step 1: Tenant A requests data (populates cache) tenant_a_response = requests.get( "<https://api.company.com/api/v1/dashboard/stats>", headers={"Authorization": f"Bearer {tenant_a_token}"} ) tenant_a_data = tenant_a_response.json() # Step 2: Immediately after, Tenant B requests same endpoint # If cache key is just the endpoint (no tenant scope): # Tenant B gets Tenant A's cached data tenant_b_response = requests.get( "<https://api.company.com/api/v1/dashboard/stats>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) tenant_b_data = tenant_b_response.json() # Check if Tenant B received Tenant A's data if tenant_b_data == tenant_a_data: return { 'vulnerable': True, 'finding': 'Cache layer cross-tenant data leakage', 'severity': 'HIGH', 'detail': 'Dashboard stats cached without tenant scope — ' 'Tenant B receives Tenant A\\'s data from cache' } return {'vulnerable': False} def test_background_job_isolation(): """ Test whether background jobs (Celery, Sidekiq, etc.) maintain tenant context. Jobs triggered by one tenant should only access that tenant's data. """ # Trigger a job as Tenant A job_response = requests.post( "<https://api.company.com/api/v1/reports/generate>", json={"report_type": "full_export"}, headers={"Authorization": f"Bearer {tenant_a_token}"} ) job_id = job_response.json().get('job_id') # Wait for job completion time.sleep(5) # Retrieve job result as Tenant B (should be forbidden) result_response = requests.get( f"<https://api.company.com/api/v1/jobs/{job_id}/result>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) if result_response.status_code == 200: data = result_response.json() if contains_tenant_a_data(data): return { 'vulnerable': True, 'finding': 'Background job result accessible cross-tenant', 'severity': 'HIGH', 'detail': f'Job {job_id} triggered by Tenant A ' f'accessible to Tenant B' } return {'vulnerable': False} def test_file_storage_isolation(): """ Test whether file storage (S3, GCS, Azure Blob) is isolated per tenant. """ # Tenant A uploads a file upload_response = requests.post( "<https://api.company.com/api/v1/files/upload>", files={"file": open("test_doc.pdf", "rb")}, headers={"Authorization": f"Bearer {tenant_a_token}"} ) file_id = upload_response.json().get('file_id') file_url = upload_response.json().get('url') # Test 1: Direct URL access without authentication direct_response = requests.get(file_url) if direct_response.status_code == 200: return { 'vulnerable': True, 'finding': 'Files publicly accessible without authentication', 'severity': 'CRITICAL' } # Test 2: Cross-tenant file access with Tenant B's token cross_tenant_response = requests.get( f"<https://api.company.com/api/v1/files/{file_id}>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) if cross_tenant_response.status_code == 200: return { 'vulnerable': True, 'finding': 'File accessible to different tenant', 'severity': 'HIGH' } # Test 3: Predictable file path enumeration # If file URLs follow predictable patterns: # s3://company-uploads/tenant_a/file_001.pdf # s3://company-uploads/tenant_b/file_001.pdf # → Can enumerate other tenants' files if bucket policy allows listing return {'vulnerable': False}
def test_tenant_isolation_completeness(): """ Test tenant isolation at every layer of the stack. Isolation is only as strong as the weakest layer. """ layers = { 'api_layer': test_api_layer_isolation(), 'service_layer': test_service_layer_isolation(), 'orm_layer': test_orm_layer_isolation(), 'database_layer': test_database_layer_isolation(), 'cache_layer': test_cache_layer_isolation(), 'file_storage_layer': test_file_storage_isolation(), 'background_jobs': test_background_job_isolation(), 'webhooks': test_webhook_isolation(), 'search_layer': test_search_index_isolation(), } return layers def test_cache_layer_isolation(): """ Test whether cached data crosses tenant boundaries. Cache keys that don't include tenant_id create cross-tenant data leakage. """ # Step 1: Tenant A requests data (populates cache) tenant_a_response = requests.get( "<https://api.company.com/api/v1/dashboard/stats>", headers={"Authorization": f"Bearer {tenant_a_token}"} ) tenant_a_data = tenant_a_response.json() # Step 2: Immediately after, Tenant B requests same endpoint # If cache key is just the endpoint (no tenant scope): # Tenant B gets Tenant A's cached data tenant_b_response = requests.get( "<https://api.company.com/api/v1/dashboard/stats>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) tenant_b_data = tenant_b_response.json() # Check if Tenant B received Tenant A's data if tenant_b_data == tenant_a_data: return { 'vulnerable': True, 'finding': 'Cache layer cross-tenant data leakage', 'severity': 'HIGH', 'detail': 'Dashboard stats cached without tenant scope — ' 'Tenant B receives Tenant A\\'s data from cache' } return {'vulnerable': False} def test_background_job_isolation(): """ Test whether background jobs (Celery, Sidekiq, etc.) maintain tenant context. Jobs triggered by one tenant should only access that tenant's data. """ # Trigger a job as Tenant A job_response = requests.post( "<https://api.company.com/api/v1/reports/generate>", json={"report_type": "full_export"}, headers={"Authorization": f"Bearer {tenant_a_token}"} ) job_id = job_response.json().get('job_id') # Wait for job completion time.sleep(5) # Retrieve job result as Tenant B (should be forbidden) result_response = requests.get( f"<https://api.company.com/api/v1/jobs/{job_id}/result>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) if result_response.status_code == 200: data = result_response.json() if contains_tenant_a_data(data): return { 'vulnerable': True, 'finding': 'Background job result accessible cross-tenant', 'severity': 'HIGH', 'detail': f'Job {job_id} triggered by Tenant A ' f'accessible to Tenant B' } return {'vulnerable': False} def test_file_storage_isolation(): """ Test whether file storage (S3, GCS, Azure Blob) is isolated per tenant. """ # Tenant A uploads a file upload_response = requests.post( "<https://api.company.com/api/v1/files/upload>", files={"file": open("test_doc.pdf", "rb")}, headers={"Authorization": f"Bearer {tenant_a_token}"} ) file_id = upload_response.json().get('file_id') file_url = upload_response.json().get('url') # Test 1: Direct URL access without authentication direct_response = requests.get(file_url) if direct_response.status_code == 200: return { 'vulnerable': True, 'finding': 'Files publicly accessible without authentication', 'severity': 'CRITICAL' } # Test 2: Cross-tenant file access with Tenant B's token cross_tenant_response = requests.get( f"<https://api.company.com/api/v1/files/{file_id}>", headers={"Authorization": f"Bearer {tenant_b_token}"} ) if cross_tenant_response.status_code == 200: return { 'vulnerable': True, 'finding': 'File accessible to different tenant', 'severity': 'HIGH' } # Test 3: Predictable file path enumeration # If file URLs follow predictable patterns: # s3://company-uploads/tenant_a/file_001.pdf # s3://company-uploads/tenant_b/file_001.pdf # → Can enumerate other tenants' files if bucket policy allows listing return {'vulnerable': False}
The Multi-Tenant Security Code Patterns: Secure vs Vulnerable
Pattern: Centralized Tenant-Aware Query Manager
The most robust prevention is a query manager that makes it structurally impossible to query tenant data without a tenant scope:
# Django — TenantAwareManager from django.db import models from threading import local _tenant_context = local() class TenantContextMiddleware: """Set tenant context from authenticated user at request start""" def __init__(self, get_response): self.get_response = get_response def __call__(self, request): if hasattr(request, 'user') and request.user.is_authenticated: _tenant_context.tenant_id = request.user.organization_id else: _tenant_context.tenant_id = None response = self.get_response(request) # Clear context after request completes _tenant_context.tenant_id = None return response class TenantAwareQuerySet(models.QuerySet): """QuerySet that automatically scopes to current tenant""" def for_current_tenant(self): tenant_id = getattr(_tenant_context, 'tenant_id', None) if tenant_id is None: raise ValueError( "No tenant context set. Use TenantContextMiddleware " "or set tenant context explicitly." ) return self.filter(tenant_id=tenant_id) def cross_tenant(self, tenant_id): """ Explicit cross-tenant access — requires deliberate call. Should only be used in admin contexts with explicit permission check. """ return self.filter(tenant_id=tenant_id) class TenantAwareManager(models.Manager): def get_queryset(self): return TenantAwareQuerySet(self.model, using=self._db) def for_current_tenant(self): """Convenience method — most common usage""" return self.get_queryset().for_current_tenant() class TenantScopedModel(models.Model): """ Abstract base class for all tenant-scoped models. Include this in every model that should be tenant-isolated. """ tenant_id = models.UUIDField( db_index=True, null=False, blank=False ) # Tenant-aware manager is the DEFAULT manager objects = TenantAwareManager() # Unscoped manager ONLY for admin/migration use # Requires explicit import and deliberate use unscoped = models.Manager() class Meta: abstract = True # Usage in views — safe by default: class RecordsView(APIView): permission_classes = [IsAuthenticated] def get(self, request): # Automatically scoped to authenticated user's tenant # via TenantContextMiddleware records = Record.objects.for_current_tenant() return Response(RecordSerializer(records, many=True).data) def post(self, request): serializer = RecordSerializer(data=request.data) if serializer.is_valid(): # tenant_id automatically set from context — not from request serializer.save(tenant_id=request.user.organization_id) return Response(serializer.data, status=201)
# Django — TenantAwareManager from django.db import models from threading import local _tenant_context = local() class TenantContextMiddleware: """Set tenant context from authenticated user at request start""" def __init__(self, get_response): self.get_response = get_response def __call__(self, request): if hasattr(request, 'user') and request.user.is_authenticated: _tenant_context.tenant_id = request.user.organization_id else: _tenant_context.tenant_id = None response = self.get_response(request) # Clear context after request completes _tenant_context.tenant_id = None return response class TenantAwareQuerySet(models.QuerySet): """QuerySet that automatically scopes to current tenant""" def for_current_tenant(self): tenant_id = getattr(_tenant_context, 'tenant_id', None) if tenant_id is None: raise ValueError( "No tenant context set. Use TenantContextMiddleware " "or set tenant context explicitly." ) return self.filter(tenant_id=tenant_id) def cross_tenant(self, tenant_id): """ Explicit cross-tenant access — requires deliberate call. Should only be used in admin contexts with explicit permission check. """ return self.filter(tenant_id=tenant_id) class TenantAwareManager(models.Manager): def get_queryset(self): return TenantAwareQuerySet(self.model, using=self._db) def for_current_tenant(self): """Convenience method — most common usage""" return self.get_queryset().for_current_tenant() class TenantScopedModel(models.Model): """ Abstract base class for all tenant-scoped models. Include this in every model that should be tenant-isolated. """ tenant_id = models.UUIDField( db_index=True, null=False, blank=False ) # Tenant-aware manager is the DEFAULT manager objects = TenantAwareManager() # Unscoped manager ONLY for admin/migration use # Requires explicit import and deliberate use unscoped = models.Manager() class Meta: abstract = True # Usage in views — safe by default: class RecordsView(APIView): permission_classes = [IsAuthenticated] def get(self, request): # Automatically scoped to authenticated user's tenant # via TenantContextMiddleware records = Record.objects.for_current_tenant() return Response(RecordSerializer(records, many=True).data) def post(self, request): serializer = RecordSerializer(data=request.data) if serializer.is_valid(): # tenant_id automatically set from context — not from request serializer.save(tenant_id=request.user.organization_id) return Response(serializer.data, status=201)
# Django — TenantAwareManager from django.db import models from threading import local _tenant_context = local() class TenantContextMiddleware: """Set tenant context from authenticated user at request start""" def __init__(self, get_response): self.get_response = get_response def __call__(self, request): if hasattr(request, 'user') and request.user.is_authenticated: _tenant_context.tenant_id = request.user.organization_id else: _tenant_context.tenant_id = None response = self.get_response(request) # Clear context after request completes _tenant_context.tenant_id = None return response class TenantAwareQuerySet(models.QuerySet): """QuerySet that automatically scopes to current tenant""" def for_current_tenant(self): tenant_id = getattr(_tenant_context, 'tenant_id', None) if tenant_id is None: raise ValueError( "No tenant context set. Use TenantContextMiddleware " "or set tenant context explicitly." ) return self.filter(tenant_id=tenant_id) def cross_tenant(self, tenant_id): """ Explicit cross-tenant access — requires deliberate call. Should only be used in admin contexts with explicit permission check. """ return self.filter(tenant_id=tenant_id) class TenantAwareManager(models.Manager): def get_queryset(self): return TenantAwareQuerySet(self.model, using=self._db) def for_current_tenant(self): """Convenience method — most common usage""" return self.get_queryset().for_current_tenant() class TenantScopedModel(models.Model): """ Abstract base class for all tenant-scoped models. Include this in every model that should be tenant-isolated. """ tenant_id = models.UUIDField( db_index=True, null=False, blank=False ) # Tenant-aware manager is the DEFAULT manager objects = TenantAwareManager() # Unscoped manager ONLY for admin/migration use # Requires explicit import and deliberate use unscoped = models.Manager() class Meta: abstract = True # Usage in views — safe by default: class RecordsView(APIView): permission_classes = [IsAuthenticated] def get(self, request): # Automatically scoped to authenticated user's tenant # via TenantContextMiddleware records = Record.objects.for_current_tenant() return Response(RecordSerializer(records, many=True).data) def post(self, request): serializer = RecordSerializer(data=request.data) if serializer.is_valid(): # tenant_id automatically set from context — not from request serializer.save(tenant_id=request.user.organization_id) return Response(serializer.data, status=201)
Pattern: Row-Level Security in PostgreSQL
Database-level enforcement that doesn't depend on application code consistency:
-- Create a function that returns the current tenant ID CREATE OR REPLACE FUNCTION current_tenant_id() RETURNS UUID AS $$ BEGIN RETURN current_setting('app.current_tenant_id', TRUE)::UUID; EXCEPTION WHEN OTHERS THEN RETURN NULL; END; $$ LANGUAGE plpgsql STABLE; -- Enable RLS on all tenant-scoped tables ALTER TABLE records ENABLE ROW LEVEL SECURITY; ALTER TABLE users ENABLE ROW LEVEL SECURITY; ALTER TABLE invoices ENABLE ROW LEVEL SECURITY; ALTER TABLE documents ENABLE ROW LEVEL SECURITY; -- Create isolation policy CREATE POLICY tenant_isolation ON records AS RESTRICTIVE -- Applied in addition to other policies FOR ALL USING ( tenant_id = current_tenant_id() OR current_tenant_id() IS NULL -- Admin access when no context ); -- Force RLS even for superusers in application connections -- Create a restricted application role that cannot bypass RLS CREATE ROLE app_user NOSUPERUSER; ALTER TABLE records FORCE ROW LEVEL SECURITY; -- Set tenant context in application: -- Before any database operation for a tenant's request: SET LOCAL app.current_tenant_id = 'ten_aaaaaaaa-bbbb-cccc-dddd-000000000001'; -- All subsequent queries in this transaction are automatically scoped
-- Create a function that returns the current tenant ID CREATE OR REPLACE FUNCTION current_tenant_id() RETURNS UUID AS $$ BEGIN RETURN current_setting('app.current_tenant_id', TRUE)::UUID; EXCEPTION WHEN OTHERS THEN RETURN NULL; END; $$ LANGUAGE plpgsql STABLE; -- Enable RLS on all tenant-scoped tables ALTER TABLE records ENABLE ROW LEVEL SECURITY; ALTER TABLE users ENABLE ROW LEVEL SECURITY; ALTER TABLE invoices ENABLE ROW LEVEL SECURITY; ALTER TABLE documents ENABLE ROW LEVEL SECURITY; -- Create isolation policy CREATE POLICY tenant_isolation ON records AS RESTRICTIVE -- Applied in addition to other policies FOR ALL USING ( tenant_id = current_tenant_id() OR current_tenant_id() IS NULL -- Admin access when no context ); -- Force RLS even for superusers in application connections -- Create a restricted application role that cannot bypass RLS CREATE ROLE app_user NOSUPERUSER; ALTER TABLE records FORCE ROW LEVEL SECURITY; -- Set tenant context in application: -- Before any database operation for a tenant's request: SET LOCAL app.current_tenant_id = 'ten_aaaaaaaa-bbbb-cccc-dddd-000000000001'; -- All subsequent queries in this transaction are automatically scoped
-- Create a function that returns the current tenant ID CREATE OR REPLACE FUNCTION current_tenant_id() RETURNS UUID AS $$ BEGIN RETURN current_setting('app.current_tenant_id', TRUE)::UUID; EXCEPTION WHEN OTHERS THEN RETURN NULL; END; $$ LANGUAGE plpgsql STABLE; -- Enable RLS on all tenant-scoped tables ALTER TABLE records ENABLE ROW LEVEL SECURITY; ALTER TABLE users ENABLE ROW LEVEL SECURITY; ALTER TABLE invoices ENABLE ROW LEVEL SECURITY; ALTER TABLE documents ENABLE ROW LEVEL SECURITY; -- Create isolation policy CREATE POLICY tenant_isolation ON records AS RESTRICTIVE -- Applied in addition to other policies FOR ALL USING ( tenant_id = current_tenant_id() OR current_tenant_id() IS NULL -- Admin access when no context ); -- Force RLS even for superusers in application connections -- Create a restricted application role that cannot bypass RLS CREATE ROLE app_user NOSUPERUSER; ALTER TABLE records FORCE ROW LEVEL SECURITY; -- Set tenant context in application: -- Before any database operation for a tenant's request: SET LOCAL app.current_tenant_id = 'ten_aaaaaaaa-bbbb-cccc-dddd-000000000001'; -- All subsequent queries in this transaction are automatically scoped
# Django database backend that sets tenant context before every query: from django.db.backends.postgresql.base import DatabaseWrapper as PgDatabaseWrapper class TenantAwareDatabaseWrapper(PgDatabaseWrapper): """ PostgreSQL connection that automatically sets tenant context. All queries in the connection are RLS-scoped to current tenant. """ def create_cursor(self, name=None): cursor = super().create_cursor(name) # Set tenant context for RLS enforcement tenant_id = getattr(_tenant_context, 'tenant_id', None) if tenant_id: cursor.execute( "SELECT set_config('app.current_tenant_id', %s, TRUE)", [str(tenant_id)] ) return cursor # settings.py DATABASES = { 'default': { 'ENGINE': 'myapp.backends.tenant_aware_postgresql', # ... rest of config } }
# Django database backend that sets tenant context before every query: from django.db.backends.postgresql.base import DatabaseWrapper as PgDatabaseWrapper class TenantAwareDatabaseWrapper(PgDatabaseWrapper): """ PostgreSQL connection that automatically sets tenant context. All queries in the connection are RLS-scoped to current tenant. """ def create_cursor(self, name=None): cursor = super().create_cursor(name) # Set tenant context for RLS enforcement tenant_id = getattr(_tenant_context, 'tenant_id', None) if tenant_id: cursor.execute( "SELECT set_config('app.current_tenant_id', %s, TRUE)", [str(tenant_id)] ) return cursor # settings.py DATABASES = { 'default': { 'ENGINE': 'myapp.backends.tenant_aware_postgresql', # ... rest of config } }
# Django database backend that sets tenant context before every query: from django.db.backends.postgresql.base import DatabaseWrapper as PgDatabaseWrapper class TenantAwareDatabaseWrapper(PgDatabaseWrapper): """ PostgreSQL connection that automatically sets tenant context. All queries in the connection are RLS-scoped to current tenant. """ def create_cursor(self, name=None): cursor = super().create_cursor(name) # Set tenant context for RLS enforcement tenant_id = getattr(_tenant_context, 'tenant_id', None) if tenant_id: cursor.execute( "SELECT set_config('app.current_tenant_id', %s, TRUE)", [str(tenant_id)] ) return cursor # settings.py DATABASES = { 'default': { 'ENGINE': 'myapp.backends.tenant_aware_postgresql', # ... rest of config } }
The Tenant Isolation Test Report: What to Document
Every cross-tenant finding needs to document the complete chain of evidence:
FINDING: Cross-Tenant Data Exposure via tenant_id Parameter
ID: FIND-2026-MTI-001
Severity: CRITICAL (CVSS 4.0: 9.1)
Type: Insecure Direct Object Reference — Cross-Tenant
Confidence: CONFIRMED (exploited, data verified)
ATTACK NARRATIVE:
An authenticated user belonging to Tenant A can access the complete
record set of any other tenant by supplying a different tenant_id
in the records API query parameter. The API authenticates the user
but does not verify that the requested tenant_id matches their
own organization. Any of the platform's 500 tenants' data is
accessible to any authenticated user.
EXPLOITATION STEPS:
Step 1: Authenticate as any tenant user
POST /api/auth/login → {"token": "[user_token]"}
Step 2: Retrieve own tenant_id from profile
GET /api/v1/users/me
Response includes: "tenant_id": "ten_aaaaaaaa-..."
Step 3: Access any other tenant's records by substituting tenant_id
GET /api/v1/records?tenant_id=ten_bbbbbbbb-...
Authorization: Bearer [own_valid_token]
Response: Complete record set for Tenant B (2,341 records)
PROOF OF CONCEPT (CURL):
# Get your own tenant_id:
curl <https://api.company.com/api/v1/users/me> \\
-H "Authorization: Bearer YOUR_TOKEN" | jq .tenant_id
# Access any other tenant's records:
curl "<https://api.company.com/api/v1/records?tenant_id=TARGET_TENANT_ID>" \\
FINDING: Cross-Tenant Data Exposure via tenant_id Parameter
ID: FIND-2026-MTI-001
Severity: CRITICAL (CVSS 4.0: 9.1)
Type: Insecure Direct Object Reference — Cross-Tenant
Confidence: CONFIRMED (exploited, data verified)
ATTACK NARRATIVE:
An authenticated user belonging to Tenant A can access the complete
record set of any other tenant by supplying a different tenant_id
in the records API query parameter. The API authenticates the user
but does not verify that the requested tenant_id matches their
own organization. Any of the platform's 500 tenants' data is
accessible to any authenticated user.
EXPLOITATION STEPS:
Step 1: Authenticate as any tenant user
POST /api/auth/login → {"token": "[user_token]"}
Step 2: Retrieve own tenant_id from profile
GET /api/v1/users/me
Response includes: "tenant_id": "ten_aaaaaaaa-..."
Step 3: Access any other tenant's records by substituting tenant_id
GET /api/v1/records?tenant_id=ten_bbbbbbbb-...
Authorization: Bearer [own_valid_token]
Response: Complete record set for Tenant B (2,341 records)
PROOF OF CONCEPT (CURL):
# Get your own tenant_id:
curl <https://api.company.com/api/v1/users/me> \\
-H "Authorization: Bearer YOUR_TOKEN" | jq .tenant_id
# Access any other tenant's records:
curl "<https://api.company.com/api/v1/records?tenant_id=TARGET_TENANT_ID>" \\
FINDING: Cross-Tenant Data Exposure via tenant_id Parameter
ID: FIND-2026-MTI-001
Severity: CRITICAL (CVSS 4.0: 9.1)
Type: Insecure Direct Object Reference — Cross-Tenant
Confidence: CONFIRMED (exploited, data verified)
ATTACK NARRATIVE:
An authenticated user belonging to Tenant A can access the complete
record set of any other tenant by supplying a different tenant_id
in the records API query parameter. The API authenticates the user
but does not verify that the requested tenant_id matches their
own organization. Any of the platform's 500 tenants' data is
accessible to any authenticated user.
EXPLOITATION STEPS:
Step 1: Authenticate as any tenant user
POST /api/auth/login → {"token": "[user_token]"}
Step 2: Retrieve own tenant_id from profile
GET /api/v1/users/me
Response includes: "tenant_id": "ten_aaaaaaaa-..."
Step 3: Access any other tenant's records by substituting tenant_id
GET /api/v1/records?tenant_id=ten_bbbbbbbb-...
Authorization: Bearer [own_valid_token]
Response: Complete record set for Tenant B (2,341 records)
PROOF OF CONCEPT (CURL):
# Get your own tenant_id:
curl <https://api.company.com/api/v1/users/me> \\
-H "Authorization: Bearer YOUR_TOKEN" | jq .tenant_id
# Access any other tenant's records:
curl "<https://api.company.com/api/v1/records?tenant_id=TARGET_TENANT_ID>" \\
Compliance Implications of Tenant Isolation Failures
Standard | Relevant Control | Implication of Cross-Tenant Failure |
|---|---|---|
SOC 2 | CC6.1: Logical access controls | Direct control failure, tenant data accessible outside authorized scope |
SOC 2 | CC6.3: Role-based access | Cross-tenant access represents complete RBAC failure at organizational level |
GDPR | Article 25: Privacy by design | Tenant isolation failure is a privacy architecture failure |
GDPR | Article 32: Security of processing | Cross-tenant exposure is a personal data security incident |
GDPR | Article 33: Breach notification | Each affected tenant may trigger separate notification obligation |
ISO 27001 | A.9.4: System access control | Cross-tenant access violates access restriction requirements |
PCI-DSS | Requirement 7: Access restriction | If any tenant stores cardholder data, cross-tenant = PCI scope expansion |
HIPAA | §164.312(a)(1): Access control | PHI exposure across tenant boundaries = HIPAA breach |
The multiplier effect on GDPR is particularly significant: a cross-tenant vulnerability affecting 500 tenants each with EU customer data may require 500 separate breach notifications to data protection authorities, one per data controller (tenant). The operational and reputational cost scales with tenant count, not data volume.
The Multi-Tenant Security Checklist (TL;DR)

Before you assume your system is secure, validate this:
❌ No endpoint accepts tenant_id from request
❌ No ORM query runs without tenant filter
❌ No cache key is shared across tenants
❌ No background job leaks context
❌ No file storage allows cross-tenant access
✅ Tenant derived from authenticated session
✅ Row-level security enforced at DB level
✅ Tenant-aware query layer implemented
✅ Cross-tenant test matrix executed
If even one ❌ exists, isolation is broken.
Why Traditional Security Testing Misses This
Most tools are designed for:
Known vulnerability patterns
Static analysis rules
Dependency issues
But multi-tenant breaches are:
Context-dependent
Logic-driven
Cross-layer issues
Which means:
They don’t show up as “vulnerabilities,” until they become breaches.
Final Thought: If You Can’t Prove Isolation, You Don’t Have It
Most teams assume tenant isolation exists. Very few actually test it the way an attacker would. And that’s the gap. Because in multi-tenant SaaS:
Security is not about preventing access. It’s about preventing the wrong access.
The only testing methodology that reliably catches this is authenticated, gray box penetration testing with explicit cross-tenant test cases across every layer of the isolation stack, combined with white box analysis that reads every ORM query to find the ones missing their tenant filter. That's exactly what CodeAnt AI's Full Assessment delivers.
If no CVSS 9+ critical finding or active data exposure is confirmed, you pay nothing. Testing starts within 24 hours.
FAQs
What is multi-tenant SaaS security?
What is cross-tenant IDOR in SaaS applications?
Why is tenant isolation difficult to implement correctly?
What are the most common vulnerabilities in multi-tenant SaaS systems?
How do you test tenant isolation in SaaS applications?
Table of Contents
Start Your 14-Day Free Trial
AI code reviews, security, and quality trusted by modern engineering teams. No credit card required!
Share blog:











