Mai-Tai supports multiple authentication methods:
- Email/Password - Traditional registration and login
- OAuth (GitHub/Google) - Social login via NextAuth.js
- API Keys - For agent/MCP server authentication
- IAP (Identity-Aware Proxy) - Google Cloud IAP for production
┌──────────┐ ┌──────────────┐ ┌──────────────────┐
│ User │ │ Frontend │ │ Backend │
└────┬─────┘ └──────┬───────┘ └────────┬─────────┘
│ │ │
│ 1. Enter email │ │
│ + password │ │
│─────────────────►│ │
│ │ │
│ │ 2. POST /auth/login │
│ │ (form data) │
│ │─────────────────────►│
│ │ │
│ │ │ 3. Verify password
│ │ │ with bcrypt
│ │ │
│ │ 4. Return JWT tokens │
│ │◄─────────────────────│
│ │ │
│ 5. Store tokens │ │
│ in localStorage │
│◄─────────────────│ │
│ │ │
│ 6. Redirect to │ │
│ /workspaces │ │
┌──────────┐ ┌──────────────┐ ┌────────────────┐ ┌──────────────────┐
│ User │ │ Frontend │ │ OAuth Provider │ │ Backend │
│ │ │ (NextAuth) │ │(GitHub/Google) │ │ (FastAPI) │
└────┬─────┘ └──────┬───────┘ └───────┬────────┘ └────────┬─────────┘
│ │ │ │
│ 1. Click │ │ │
│ "Sign in with │ │ │
│ GitHub/Google" │ │ │
│─────────────────►│ │ │
│ │ │ │
│ │ 2. Redirect to │ │
│ │ OAuth Provider │ │
│ │────────────────────►│ │
│ │ │ │
│ 3. User authorizes app │ │
│◄───────────────────────────────────────│ │
│ │ │ │
│ │ 4. Callback with │ │
│ │ authorization code │ │
│ │◄────────────────────│ │
│ │ │ │
│ │ 5. Exchange code │ │
│ │ for access token │ │
│ │────────────────────►│ │
│ │ │ │
│ │ 6. Get user profile │ │
│ │◄────────────────────│ │
│ │ │ │
│ │ 7. POST /auth/oauth │ │
│ │ {provider, oauth_id,│ │
│ │ email, name} │ │
│ │────────────────────────────────────────────►│
│ │ │ │
│ │ │ 8. Find/create │
│ │ │ user + workspace│
│ │ │ + API key │
│ │ │ │
│ │ 9. Return JWT tokens│ │
│ │◄────────────────────────────────────────────│
│ │ │ │
│ 10. Store tokens │ │ │
│ Redirect to │ │ │
│ /workspaces │ │ │
│◄─────────────────│ │ │
┌──────────┐ ┌──────────────────┐
│ Agent │ │ Backend │
│ (MCP) │ │ (FastAPI) │
└────┬─────┘ └────────┬─────────┘
│ │
│ 1. Request with │
│ X-API-Key: │
│ mt_xxx... │
│ X-Workspace-ID: │
│ uuid │
│───────────────────►│
│ │
│ │ 2. Hash API key
│ │ with SHA-256
│ │
│ │ 3. Look up hash
│ │ in api_keys table
│ │
│ │ 4. Verify workspace
│ │ ownership
│ │
│ 5. Return data │
│◄───────────────────│
import bcrypt
def get_password_hash(password: str) -> str:
"""Hash a password with bcrypt."""
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash."""
return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8"))from jose import jwt
from datetime import datetime, timedelta, timezone
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=30))
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, settings.secret_key, algorithm="HS256")
def create_refresh_token(data: dict) -> str:
"""Create a JWT refresh token (longer-lived)."""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=7)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, settings.secret_key, algorithm="HS256")
def decode_token(token: str) -> dict | None:
"""Decode and verify a JWT token."""
try:
return jwt.decode(token, settings.secret_key, algorithms=["HS256"])
except JWTError:
return Noneasync def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
"""Get the current authenticated user from JWT token."""
# Decode the token
payload = decode_token(token)
if payload is None or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="Invalid token")
# Get user from database
user_id = payload.get("sub")
result = await db.execute(select(User).where(User.id == UUID(user_id)))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return userasync def get_api_key_auth(
x_api_key: str | None = Header(None),
x_workspace_id: str | None = Header(None),
db: AsyncSession = Depends(get_db),
) -> ApiKeyAuth:
"""Authenticate using X-API-Key header."""
if not x_api_key:
raise HTTPException(status_code=401, detail="X-API-Key required")
# Hash the provided key
key_hash = hashlib.sha256(x_api_key.encode()).hexdigest()
# Look up in database
result = await db.execute(select(ApiKey).where(ApiKey.key_hash == key_hash))
api_key = result.scalar_one_or_none()
if not api_key:
raise HTTPException(status_code=401, detail="Invalid API key")
# Verify workspace ownership...
return ApiKeyAuth(api_key=api_key, workspace=workspace)interface AuthContextType {
user: User | null;
token: string | null;
isLoading: boolean;
login: (token: string, refreshToken: string) => void;
logout: () => void;
refreshUser: () => Promise<void>;
}
export function AuthProvider({ children }: { children: ReactNode }) {
const [user, setUser] = useState<User | null>(null);
const [token, setToken] = useState<string | null>(null);
const login = useCallback((accessToken: string, refreshToken: string) => {
localStorage.setItem('access_token', accessToken);
localStorage.setItem('refresh_token', refreshToken);
setToken(accessToken);
getMe(accessToken).then(setUser);
}, []);
const logout = useCallback(() => {
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
setToken(null);
setUser(null);
}, []);
// On mount, check for existing token
useEffect(() => {
const storedToken = localStorage.getItem('access_token');
if (storedToken) {
setToken(storedToken);
getMe(storedToken).then(setUser);
}
}, []);
return (
<AuthContext.Provider value={{ user, token, isLoading, login, logout, refreshUser }}>
{children}
</AuthContext.Provider>
);
}async function api<T>(endpoint: string, options: ApiOptions = {}): Promise<T> {
const { method = 'GET', body, token } = options;
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
if (token) {
headers['Authorization'] = `Bearer ${token}`;
}
const res = await fetch(`${API_URL}${endpoint}`, {
method,
headers,
body: body ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
throw new ApiError(res.status, await res.text());
}
return res.json();
}| Token Type | Purpose | Expiration | Storage |
|---|---|---|---|
| Access Token | API authentication | 30 minutes | localStorage |
| Refresh Token | Get new access token | 7 days | localStorage |
| API Key | Agent/MCP auth | Never (until revoked) | Agent config |
mt_<base64url-encoded-32-bytes>
Example: mt_8tSz2OTk4Wo4izZhl3tTewo7dyEKFUbsU3qecIpaDYY
- Prefix
mt_identifies it as a Mai-Tai key - Only the SHA-256 hash is stored in the database
- Raw key is shown once at creation time
class User(Base):
__tablename__ = "users"
id: UUID
email: str # Unique
name: str
password_hash: str | None # Nullable for OAuth users
avatar_url: str | None
oauth_provider: str | None # "github", "google", or None
oauth_id: str | None # Provider's user ID
is_admin: bool
created_at: datetime
updated_at: datetimeSECRET_KEY=<random-32-bytes> # JWT signing key
ALGORITHM=HS256 # JWT algorithm
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
USE_IAP=false # Enable Google Cloud IAP authNEXT_PUBLIC_API_URL=http://localhost:8000
NEXT_PUBLIC_USE_IAP=false
# OAuth (NextAuth.js)
NEXTAUTH_URL=http://localhost:3000
NEXTAUTH_SECRET=<random-32-bytes>
GITHUB_CLIENT_ID=xxx
GITHUB_CLIENT_SECRET=xxx
GOOGLE_CLIENT_ID=xxx
GOOGLE_CLIENT_SECRET=xxx