Your budget request for the new company personnel index has been declined. Instead, the intern has received a very small bonus in exchange for a homemade solution.
Show them their stinginess could cost them.
The chall maker forgot to remove a debug account... Here is the revenge challenge without this backdoor!
We're given a Python server that looks like this:
import os
import secrets
import sqlite3
import time
from functools import wraps
import bcrypt
import jwt
from dotenv import load_dotenv
from flask import (
Flask,
flash,
jsonify,
make_response,
redirect,
render_template,
request,
)
app = Flask(__name__)
app.static_folder = "static"
load_dotenv()
app.config["SECRET_KEY"] = "".join(
[secrets.choice("abcdef0123456789") for _ in range(32)]
)
FLAG = os.getenv("FLAG")
def init_db():
conn = sqlite3.connect("database.db")
cursor = conn.cursor()
cursor.execute("""DROP TABLE IF EXISTS employees;""")
cursor.execute("""DROP TABLE IF EXISTS revoked_tokens;""")
cursor.execute("""DROP TABLE IF EXISTS users;""")
cursor.execute("""CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
is_admin BOOL NOT NULL,
password_hash TEXT NOT NULL)""")
cursor.execute("""CREATE TABLE IF NOT EXISTS revoked_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token TEXT NOT NULL)""")
cursor.execute("""CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
position TEXT NOT NULL,
phone TEXT NOT NULL,
location TEXT NOT NULL)""")
conn.commit()
conn.close()
def get_db_connection():
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
return conn
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.cookies.get("JWT")
if not token:
flash("Token is missing!", "error")
return redirect("/login")
try:
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
username = data["username"]
conn = get_db_connection()
user = conn.execute(
"SELECT id,is_admin FROM users WHERE username = ?", (username,)
).fetchone()
revoked = conn.execute(
"SELECT id FROM revoked_tokens WHERE token = ?", (token,)
).fetchone()
conn.close()
if not user or revoked:
flash("Invalid or revoked token!", "error")
return redirect("/login")
request.is_admin = user["is_admin"]
request.username = username
except jwt.InvalidTokenError:
flash("Invalid token!", "error")
return redirect("/login")
return f(*args, **kwargs)
return decorated
@app.route("/", methods=["GET"])
def index():
token = request.cookies.get("JWT", None)
if token is None:
return redirect("/login")
else:
return redirect("/employees")
@app.route("/register", methods=["GET", "POST"])
def register():
if request.method == "GET":
return render_template("register.html")
elif request.method == "POST":
data = request.form
username = data.get("username")
password = data.get("password")
if not username or not password:
return jsonify({"message": "Username and password required!"}), 400
password_hash = bcrypt.hashpw(
password.encode("utf-8"), bcrypt.gensalt()
).decode("utf-8")
conn = get_db_connection()
try:
conn.execute(
"INSERT INTO users (username, is_admin, password_hash) VALUES (?, ?, ?)",
(username, False, password_hash),
)
conn.commit()
except sqlite3.IntegrityError:
flash("User already exists.", "error")
return redirect("/register")
finally:
conn.close()
flash("User created successfully.", "success")
return redirect("/login")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "GET":
return render_template("login.html")
elif request.method == "POST":
data = request.form
username = data.get("username")
password = data.get("password")
conn = get_db_connection()
user = conn.execute(
"SELECT * FROM users WHERE username = ?", (username,)
).fetchone()
conn.close()
if user and bcrypt.checkpw(
password.encode("utf-8"), user["password_hash"].encode("utf-8")
):
token = jwt.encode(
{
"username": username,
"is_admin": user["is_admin"],
"issued": time.time(),
},
app.config["SECRET_KEY"],
algorithm="HS256",
)
resp = make_response(redirect("/employees"))
resp.set_cookie("JWT", token)
return resp
flash("Invalid credentials.", "error")
return redirect("/login")
@app.route("/logout", methods=["GET"])
def logout():
token = request.cookies.get("JWT")
if token:
conn = get_db_connection()
conn.execute("INSERT INTO revoked_tokens (token) VALUES (?)", (token,))
conn.commit()
conn.close()
resp = make_response(redirect("/login"))
resp.delete_cookie("JWT")
return resp
@app.route("/employees", methods=["GET"])
@token_required
def employees():
query = request.args.get("query", "")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
)
results = cursor.fetchall()
conn.close()
print(request.username)
return render_template("employees.html", username=request.username, employees=results, query=query)
@app.route("/employee/<int:employee_id>", methods=["GET"])
@token_required
def employee_details(employee_id):
conn = get_db_connection()
employee = conn.execute(
"SELECT * FROM employees WHERE id = ?", (employee_id,)
).fetchone()
conn.close()
print(employee)
if not employee:
flash("Employee not found", "error")
return redirect("/employees")
return render_template("employee_details.html", username=request.username, employee=employee)
@app.route("/admin", methods=["GET"])
@token_required
def admin():
is_admin = getattr(request, "is_admin", None)
if is_admin:
return render_template("admin.html", username=request.username, flag=FLAG)
flash("You don't have the permission to access this area", "error")
return redirect("/employees")
if __name__ == "__main__":
init_db()
app.run(debug=False, host="0.0.0.0", port=5000)We can see right off the bat that we have SQL injection in the /employees route:
@app.route("/employees", methods=["GET"])
@token_required
def employees():
query = request.args.get("query", "")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
)
results = cursor.fetchall()
conn.close()
print(request.username)
return render_template("employees.html", username=request.username, employees=results, query=query)We can then use UNION SELECT to leak any part of their database. But what's this about revoked tokens?
In their token_required decorator, we can see that our JWT is decoded and properties injected into the request object so long as:
- The JWT is valid.
- The
usernamecorresponds to an existing user. - The token isn't in the
revoked_tokenstable.
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.cookies.get("JWT")
if not token:
flash("Token is missing!", "error")
return redirect("/login")
try:
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
username = data["username"]
conn = get_db_connection()
user = conn.execute(
"SELECT id,is_admin FROM users WHERE username = ?", (username,)
).fetchone()
revoked = conn.execute(
"SELECT id FROM revoked_tokens WHERE token = ?", (token,)
).fetchone()
conn.close()
if not user or revoked:
flash("Invalid or revoked token!", "error")
return redirect("/login")
request.is_admin = user["is_admin"]
request.username = username
except jwt.InvalidTokenError:
flash("Invalid token!", "error")
return redirect("/login")
return f(*args, **kwargs)
return decoratedSo the main idea is this: we can leak the revoked_tokens table and (assumedly) find an admin token there. If we can slightly perturb this token such that it is no longer exactly the same token as before (but the token still decodes correctly), we can become admin and win!
@app.route("/admin", methods=["GET"])
@token_required
def admin():
is_admin = getattr(request, "is_admin", None)
if is_admin:
return render_template("admin.html", username=request.username, flag=FLAG)
flash("You don't have the permission to access this area", "error")
return redirect("/employees")We can use our simple SQL injection to leak the revoked_tokens table,
a%'UNION SELECT id,token,token,token FROM revoked_tokens;--and just check tokens until we find one corresponding to an admin (in this case,
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaXNfYWRtaW4iOjEsImlzc3VlZCI6MTc2NDY0NjgzOC4zODE5NzgzfQ.5xEmNRYdgbWg77FWf3kPs28Ulcsqm_JimpCYymoCCCk
So how do we change this slightly to bypass the revoked_tokens check? What if we simply add an = to the end of it? (after all, = is a padding character in base64 each part of the JWT is base64 encoded, right?)
Setting our JWT cookie to
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaXNfYWRtaW4iOjEsImlzc3VlZCI6MTc2NDY0NjgzOC4zODE5NzgzfQ.5xEmNRYdgbWg77FWf3kPs28Ulcsqm_JimpCYymoCCCk=
we get the flag:
(this same solve works on both Revoked and Revoked Revenge, as we didn't rely on any shenanigans with any "debug accounts").
Why does this payload actually work? Indeed, if we tried to add an = to the JWT on jwt.io, we'd get the following error message:
The caveat is this: JWT segments aren't base64 encoded, they are base64url encoded. And seemingly from reading the MDN documentation,
A common variant of this definition allows only characters that are safe to use in filenames and URL values. This version, defined in RFC 4648, section 5, omits the padding and replaces
+and/with-and_.
So if base64url omits the padding = characters, why did Python's jwt decode it anyways? I decided to read the PyJWT source to find out:
def decode(
self,
jwt: str | bytes,
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: Sequence[str] | None = None,
options: Options | None = None,
# deprecated arg, remove in pyjwt3
verify: bool | None = None,
# could be used as passthrough to api_jws, consider removal in pyjwt3
detached_payload: bytes | None = None,
# passthrough arguments to _validate_claims
# consider putting in options
audience: str | Iterable[str] | None = None,
subject: str | None = None,
issuer: str | Container[str] | None = None,
leeway: float | timedelta = 0,
# kwargs
**kwargs: Any,
) -> dict[str, Any]:
"""
...
"""
if kwargs:
warnings.warn(
"passing additional kwargs to decode() is deprecated "
"and will be removed in pyjwt version 3. "
f"Unsupported kwargs: {tuple(kwargs.keys())}",
RemovedInPyjwt3Warning,
stacklevel=2,
)
decoded = self.decode_complete(
jwt,
key,
algorithms,
options,
verify=verify,
detached_payload=detached_payload,
audience=audience,
subject=subject,
issuer=issuer,
leeway=leeway,
)
return decoded["payload"]Seemingly, decode(...) just calls decode_complete(...):
def decode_complete(
self,
jwt: str | bytes,
key: AllowedPublicKeyTypes = "",
algorithms: Sequence[str] | None = None,
options: Options | None = None,
# deprecated arg, remove in pyjwt3
verify: bool | None = None,
# could be used as passthrough to api_jws, consider removal in pyjwt3
detached_payload: bytes | None = None,
# passthrough arguments to _validate_claims
# consider putting in options
audience: str | Iterable[str] | None = None,
issuer: str | Container[str] | None = None,
subject: str | None = None,
leeway: float | timedelta = 0,
# kwargs
**kwargs: Any,
) -> dict[str, Any]:
"""
...
"""
if kwargs:
warnings.warn(
"passing additional kwargs to decode_complete() is deprecated "
"and will be removed in pyjwt version 3. "
f"Unsupported kwargs: {tuple(kwargs.keys())}",
RemovedInPyjwt3Warning,
stacklevel=2,
)
if options is None:
verify_signature = True
else:
verify_signature = options.get("verify_signature", True)
# If the user has set the legacy `verify` argument, and it doesn't match
# what the relevant `options` entry for the argument is, inform the user
# that they're likely making a mistake.
if verify is not None and verify != verify_signature:
warnings.warn(
"The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
"The equivalent is setting `verify_signature` to False in the `options` dictionary. "
"This invocation has a mismatch between the kwarg and the option entry.",
category=DeprecationWarning,
stacklevel=2,
)
sig_options: SigOptions = {"verify_signature": verify_signature}
decoded = api_jws.decode_complete(
jwt,
key=key,
algorithms=algorithms,
options=sig_options,
detached_payload=detached_payload,
)
payload = self._decode_payload(decoded)
merged_options = self._merge_options(options)
self._validate_claims(
payload,
merged_options,
audience=audience,
issuer=issuer,
leeway=leeway,
subject=subject,
)
decoded["payload"] = payload
return decodedand in decode_complete(), the decoding of the JWT string is handled by api_jws.decode_complete(...)
def decode_complete(
self,
jwt: str | bytes,
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: Sequence[str] | None = None,
options: SigOptions | None = None,
detached_payload: bytes | None = None,
**kwargs: dict[str, Any],
) -> dict[str, Any]:
if kwargs:
warnings.warn(
"passing additional kwargs to decode_complete() is deprecated "
"and will be removed in pyjwt version 3. "
f"Unsupported kwargs: {tuple(kwargs.keys())}",
RemovedInPyjwt3Warning,
stacklevel=2,
)
merged_options: SigOptions
if options is None:
merged_options = self.options
else:
merged_options = {**self.options, **options}
verify_signature = merged_options["verify_signature"]
if verify_signature and not algorithms and not isinstance(key, PyJWK):
raise DecodeError(
'It is required that you pass in a value for the "algorithms" argument when calling decode().'
)
payload, signing_input, header, signature = self._load(jwt)
if header.get("b64", True) is False:
if detached_payload is None:
raise DecodeError(
'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
)
payload = detached_payload
signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])
if verify_signature:
self._verify_signature(signing_input, header, signature, key, algorithms)
return {
"payload": payload,
"header": header,
"signature": signature,
}which calls _load(...)
def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict[str, Any], bytes]:
if isinstance(jwt, str):
jwt = jwt.encode("utf-8")
if not isinstance(jwt, bytes):
raise DecodeError(f"Invalid token type. Token must be a {bytes}")
try:
signing_input, crypto_segment = jwt.rsplit(b".", 1)
header_segment, payload_segment = signing_input.split(b".", 1)
except ValueError as err:
raise DecodeError("Not enough segments") from err
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error) as err:
raise DecodeError("Invalid header padding") from err
try:
header: dict[str, Any] = json.loads(header_data)
except ValueError as e:
raise DecodeError(f"Invalid header string: {e}") from e
if not isinstance(header, dict):
raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error) as err:
raise DecodeError("Invalid payload padding") from err
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error) as err:
raise DecodeError("Invalid crypto padding") from err
return (payload, signing_input, header, signature)which finally calls base64url_decode(...):
def base64url_decode(input: Union[bytes, str]) -> bytes:
input_bytes = force_bytes(input)
rem = len(input_bytes) % 4
if rem > 0:
input_bytes += b"=" * (4 - rem)
return base64.urlsafe_b64decode(input_bytes)
def base64url_encode(input: bytes) -> bytes:
return base64.urlsafe_b64encode(input).replace(b"=", b"")which, apart from actually adding padding to our input, just hooks into the Python standard library urlsafe_b64 method. Surprisingly, base64.urlsafe_b64decode() makes no mention of omitting padding characters!
Finally, what does the RFC say? According to the linked RFC 4648 section 5,
so padding characters can be skipped? Sometimes?
And the padding issue seemingly isn't just an MDN misunderstanding either; if you google "base64 URL encoder", many sites (including the somewhat well-known base64encode.org, but also the third result on google, this npm library, and this random Medium article I came across) will omit padding characters when encoding base64url:
But others, including the first result that happened to come up on google, don't.
>>> base64.urlsafe_b64encode(b'aaaaaaa')
b'YWFhYWFhYQ=='Hell, even the C# standard library for ASP.NET seems to drop padding characters when encoding:
public static int Base64UrlEncode(byte[] input, int offset, char[] output, int outputOffset, int count)
{
ArgumentNullThrowHelper.ThrowIfNull(input);
ArgumentNullThrowHelper.ThrowIfNull(output);
ValidateParameters(input.Length, nameof(input), offset, count);
ArgumentOutOfRangeThrowHelper.ThrowIfNegative(outputOffset);
var arraySizeRequired = GetArraySizeRequiredToEncode(count);
if (output.Length - outputOffset < arraySizeRequired)
{
throw new ArgumentException(
string.Format(
CultureInfo.CurrentCulture,
EncoderResources.WebEncoders_InvalidCountOffsetOrLength,
nameof(count),
nameof(outputOffset),
nameof(output)),
nameof(count));
}
#if NETCOREAPP
return Base64UrlEncode(input.AsSpan(offset, count), output.AsSpan(outputOffset));
#else
// Special-case empty input.
if (count == 0)
{
return 0;
}
// Use base64url encoding with no padding characters. See RFC 4648, Sec. 5.
// Start with default Base64 encoding.
var numBase64Chars = Convert.ToBase64CharArray(input, offset, count, output, outputOffset);
// Fix up '+' -> '-' and '/' -> '_'. Drop padding characters.
for (var i = outputOffset; i - outputOffset < numBase64Chars; i++)
{
var ch = output[i];
if (ch == '+')
{
output[i] = '-';
}
else if (ch == '/')
{
output[i] = '_';
}
else if (ch == '=')
{
// We've reached a padding character; truncate the remainder.
return i - outputOffset;
}
}
return numBase64Chars;
#endif
}(in C#'s Base64UrlDecode, a comment says to assume that the input contains no padding characters, though at first glance the method appears to work even if padding exists.)
All this to say, I think it's a bit strange that there's so much differing behavior around this spec.






