### tests/test_utils.py
import pytest

from jwt.utils import force_bytes, from_base64url_uint, to_base64url_uint


@pytest.mark.parametrize(
    "inputval,expected",
    [
        (0, b"AA"),
        (1, b"AQ"),
        (255, b"_w"),
        (65537, b"AQAB"),
        (123456789, b"B1vNFQ"),
        pytest.param(-1, "", marks=pytest.mark.xfail(raises=ValueError)),
    ],
)
def test_to_base64url_uint(inputval, expected):
    actual = to_base64url_uint(inputval)
    assert actual == expected


@pytest.mark.parametrize(
    "inputval,expected",
    [
        (b"AA", 0),
        (b"AQ", 1),
        (b"_w", 255),
        (b"AQAB", 65537),
        (b"B1vNFQ", 123456789),
    ],
)
def test_from_base64url_uint(inputval, expected):
    actual = from_base64url_uint(inputval)
    assert actual == expected


def test_force_bytes_raises_error_on_invalid_object():
    with pytest.raises(TypeError):
        force_bytes({})  # type: ignore[arg-type]
### tests/test_algorithms.py
import base64
import json
from typing import Any, cast

import pytest

from jwt.algorithms import HMACAlgorithm, NoneAlgorithm, has_crypto
from jwt.exceptions import InvalidKeyError
from jwt.utils import base64url_decode

from .keys import load_ec_pub_key_p_521, load_hmac_key, load_rsa_pub_key
from .utils import crypto_required, key_path

if has_crypto:
    from cryptography.hazmat.primitives.asymmetric.ec import (
        EllipticCurvePrivateKey,
        EllipticCurvePublicKey,
    )
    from cryptography.hazmat.primitives.asymmetric.ed448 import (
        Ed448PrivateKey,
        Ed448PublicKey,
    )
    from cryptography.hazmat.primitives.asymmetric.ed25519 import (
        Ed25519PrivateKey,
        Ed25519PublicKey,
    )
    from cryptography.hazmat.primitives.asymmetric.rsa import (
        RSAPrivateKey,
        RSAPublicKey,
    )

    from jwt.algorithms import ECAlgorithm, OKPAlgorithm, RSAAlgorithm, RSAPSSAlgorithm


class TestAlgorithms:
    def test_none_algorithm_should_throw_exception_if_key_is_not_none(self):
        algo = NoneAlgorithm()

        with pytest.raises(InvalidKeyError):
            algo.prepare_key("123")

    def test_none_algorithm_should_throw_exception_on_to_jwk(self):
        algo = NoneAlgorithm()

        with pytest.raises(NotImplementedError):
            algo.to_jwk("dummy")  # Using a dummy argument as is it not relevant

    def test_none_algorithm_should_throw_exception_on_from_jwk(self):
        algo = NoneAlgorithm()

        with pytest.raises(NotImplementedError):
            algo.from_jwk({})  # Using a dummy argument as is it not relevant

    def test_hmac_should_reject_nonstring_key(self):
        algo = HMACAlgorithm(HMACAlgorithm.SHA256)

        with pytest.raises(TypeError) as context:
            algo.prepare_key(object())  # type: ignore[arg-type]

        exception = context.value
        assert str(exception) == "Expected a string value"

    def test_hmac_should_accept_unicode_key(self):
        algo = HMACAlgorithm(HMACAlgorithm.SHA256)

        algo.prepare_key("awesome")

    @pytest.mark.parametrize(
        "key",
        [
            "testkey2_rsa.pub.pem",
            "testkey2_rsa.pub.pem",
            "testkey_pkcs1.pub.pem",
            "testkey_rsa.cer",
            "testkey_rsa.pub",
        ],
    )
    def test_hmac_should_throw_exception(self, key):
        algo = HMACAlgorithm(HMACAlgorithm.SHA256)

        with pytest.raises(InvalidKeyError):
            with open(key_path(key)) as keyfile:
                algo.prepare_key(keyfile.read())

    def test_hmac_jwk_should_parse_and_verify(self):
        algo = HMACAlgorithm(HMACAlgorithm.SHA256)

        with open(key_path("jwk_hmac.json")) as keyfile:
            key = algo.from_jwk(keyfile.read())

        signature = algo.sign(b"Hello World!", key)
        assert algo.verify(b"Hello World!", key, signature)

    @pytest.mark.parametrize("as_dict", (False, True))
    def test_hmac_to_jwk_returns_correct_values(self, as_dict):
        algo = HMACAlgorithm(HMACAlgorithm.SHA256)
        key: Any = algo.to_jwk("secret", as_dict=as_dict)

        if not as_dict:
            key = json.loads(key)

        assert key == {"kty": "oct", "k": "c2VjcmV0"}

    def test_hmac_from_jwk_should_raise_exception_if_not_hmac_key(self):
        algo = HMACAlgorithm(HMACAlgorithm.SHA256)

        with open(key_path("jwk_rsa_pub.json")) as keyfile:
            with pytest.raises(InvalidKeyError):
                algo.from_jwk(keyfile.read())

    @crypto_required
    def test_rsa_should_parse_pem_public_key(self):
        algo = RSAAlgorithm(RSAAlgorithm.SHA256)

        with open(key_path("testkey2_rsa.pub.pem")) as pem_key:
            algo.prepare_key(pem_key.read())

    @crypto_required
    def test_rsa_should_accept_pem_private_key_bytes(self):
        algo = RSAAlgorithm(RSAAlgorithm.SHA256)

        with open(key_path("testkey_rsa.priv"), "rb") as pem_key:
            algo.prepare_key(pem_key.read())

    @crypto_required
    def test_rsa_should_accept_unicode_key(self):
        algo = RSAAlgorithm(RSAAlgorithm.SHA256)

        with open(key_path("testkey_rsa.priv")) as rsa_key:
            algo.prepare_key(rsa_key.read())

    @crypto_required
    def test_rsa_should_reject_non_string_key(self):
        algo = RSAAlgorithm(RSAAlgorithm.SHA256)

        with pytest.raises(TypeError):
            algo.prepare_key(None)  # type: ignore[arg-type]

    @crypto_required
    def test_rsa_verify_should_return_false_if_signature_invalid(self):
        algo = RSAAlgorithm(RSAAlgorithm.SHA256)

        message = b"Hello World!"

        sig = base64.b64decode(
            b"yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
            b"10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
            b"2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
            b"sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
            b"fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
            b"APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
        )

        sig += b"123"  # Signature is now invalid

        with open(key_path("testkey_rsa.pub")) as keyfile:
            pub_key = cast(RSAPublicKey, algo.prepare_key(keyfile.read()))

        result = algo.verify(message, pub_key, sig)
        assert not result

    @crypto_required
    def test_ec_jwk_public_and_private_keys_should_parse_and_verify(self):
        tests = {
            "P-256": ECAlgorithm.SHA256,
            "P-384": ECAlgorithm.SHA384,
            "P-521": ECAlgorithm.SHA512,
            "secp256k1": ECAlgorithm.SHA256,
        }
        for curve, hash in tests.items():
            algo = ECAlgorithm(hash)

            with open(key_path(f"jwk_ec_pub_{curve}.json")) as keyfile:
                pub_key = cast(EllipticCurvePublicKey, algo.from_jwk(keyfile.read()))

            with open(key_path(f"jwk_ec_key_{curve}.json")) as keyfile:
                priv_key = cast(EllipticCurvePrivateKey, algo.from_jwk(keyfile.read()))

            signature = algo.sign(b"Hello World!", priv_key)
            assert algo.verify(b"Hello World!", pub_key, signature)

    @crypto_required
    def test_ec_jwk_fails_on_invalid_json(self):
        algo = ECAlgorithm(ECAlgorithm.SHA512)

        valid_points = {
            "P-256": {
                "x": "PTTjIY84aLtaZCxLTrG_d8I0G6YKCV7lg8M4xkKfwQ4",
                "y": "ank6KA34vv24HZLXlChVs85NEGlpg2sbqNmR_BcgyJU",
            },
            "P-384": {
                "x": "IDC-5s6FERlbC4Nc_4JhKW8sd51AhixtMdNUtPxhRFP323QY6cwWeIA3leyZhz-J",
                "y": "eovmN9ocANS8IJxDAGSuC1FehTq5ZFLJU7XSPg36zHpv4H2byKGEcCBiwT4sFJsy",
            },
            "P-521": {
                "x": "AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt",
                "y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1",
            },
            "secp256k1": {
                "x": "MLnVyPDPQpNm0KaaO4iEh0i8JItHXJE0NcIe8GK1SYs",
                "y": "7r8d-xF7QAgT5kSRdly6M8xeg4Jz83Gs_CQPQRH65QI",
            },
        }

        # Invalid JSON
        with pytest.raises(InvalidKeyError):
            algo.from_jwk("<this isn't json>")

        # Bad key type
        with pytest.raises(InvalidKeyError):
            algo.from_jwk('{"kty": "RSA"}')

        # Missing data
        with pytest.raises(InvalidKeyError):
            algo.from_jwk('{"kty": "EC"}')
        with pytest.raises(InvalidKeyError):
            algo.from_jwk('{"kty": "EC", "x": "1"}')
        with pytest.raises(InvalidKeyError):
            algo.from_jwk('{"kty": "EC", "y": "1"}')

        # Missing curve
        with pytest.raises(InvalidKeyError):
            algo.from_jwk('{"kty": "EC", "x": "dGVzdA==", "y": "dGVzdA=="}')

        # EC coordinates not equally long
        with pytest.raises(InvalidKeyError):
            algo.from_jwk('{"kty": "EC", "x": "dGVzdHRlc3Q=", "y": "dGVzdA=="}')

        # EC coordinates length invalid
        for curve in ("P-256", "P-384", "P-521", "secp256k1"):
            with pytest.raises(InvalidKeyError):
                algo.from_jwk(
                    f'{{"kty": "EC", "crv": "{curve}", "x": "dGVzdA==", "y": "dGVzdA=="}}'
                )

        # EC private key length invalid
        for curve, point in valid_points.items():
            with pytest.raises(InvalidKeyError):
                algo.from_jwk(
                    f'{{"kty": "EC", "crv": "{curve}", "x": "{point["x"]}", "y": "{point["y"]}", "d": "dGVzdA=="}}'
                )

    @crypto_required
    def test_ec_private_key_to_jwk_works_with_from_jwk(self):
        algo = ECAlgorithm(ECAlgorithm.SHA256)

        with open(key_path("testkey_ec.priv")) as ec_key:
            orig_key = cast(EllipticCurvePrivateKey, algo.prepare_key(ec_key.read()))

        parsed_key = cast(EllipticCurvePrivateKey, algo.from_jwk(algo.to_jwk(orig_key)))
        assert parsed_key.private_numbers() == orig_key.private_numbers()
        assert (
            parsed_key.private_numbers().public_numbers
            == orig_key.private_numbers().public_numbers
        )

    @crypto_required
    def test_ec_public_key_to_jwk_works_with_from_jwk(self):
        algo = ECAlgorithm(ECAlgorithm.SHA256)

### tests/test_api_jws.py
import json
from decimal import Decimal

import pytest

from jwt.algorithms import NoneAlgorithm, has_crypto
from jwt.api_jws import PyJWS
from jwt.exceptions import (
    DecodeError,
    InvalidAlgorithmError,
    InvalidSignatureError,
    InvalidTokenError,
)
from jwt.utils import base64url_decode
from jwt.warnings import RemovedInPyjwt3Warning

from .utils import crypto_required, key_path, no_crypto_required

try:
    from cryptography.hazmat.primitives.serialization import (
        load_pem_private_key,
        load_pem_public_key,
        load_ssh_public_key,
    )
except ModuleNotFoundError:
    pass


@pytest.fixture
def jws():
    return PyJWS()


@pytest.fixture
def payload():
    """Creates a sample jws claimset for use as a payload during tests"""
    return b"hello world"


class TestJWS:
    def test_register_algo_does_not_allow_duplicate_registration(self, jws):
        jws.register_algorithm("AAA", NoneAlgorithm())

        with pytest.raises(ValueError):
            jws.register_algorithm("AAA", NoneAlgorithm())

    def test_register_algo_rejects_non_algorithm_obj(self, jws):
        with pytest.raises(TypeError):
            jws.register_algorithm("AAA123", {})

    def test_unregister_algo_removes_algorithm(self, jws):
        supported = jws.get_algorithms()
        assert "none" in supported
        assert "HS256" in supported

        jws.unregister_algorithm("HS256")

        supported = jws.get_algorithms()
        assert "HS256" not in supported

    def test_unregister_algo_throws_error_if_not_registered(self, jws):
        with pytest.raises(KeyError):
            jws.unregister_algorithm("AAA")

    def test_algo_parameter_removes_alg_from_algorithms_list(self, jws):
        assert "none" in jws.get_algorithms()
        assert "HS256" in jws.get_algorithms()

        jws = PyJWS(algorithms=["HS256"])
        assert "none" not in jws.get_algorithms()
        assert "HS256" in jws.get_algorithms()

    def test_override_options(self):
        jws = PyJWS(options={"verify_signature": False})

        assert not jws.options["verify_signature"]

    def test_non_object_options_dont_persist(self, jws, payload):
        token = jws.encode(payload, "secret")

        jws.decode(token, "secret", options={"verify_signature": False})

        assert jws.options["verify_signature"]

    def test_options_must_be_dict(self):
        pytest.raises(TypeError, PyJWS, options=object())
        pytest.raises((TypeError, ValueError), PyJWS, options=("something"))

    def test_encode_decode(self, jws, payload):
        secret = "secret"
        jws_message = jws.encode(payload, secret, algorithm="HS256")
        decoded_payload = jws.decode(jws_message, secret, algorithms=["HS256"])

        assert decoded_payload == payload

    def test_decode_fails_when_alg_is_not_on_method_algorithms_param(
        self, jws, payload
    ):
        secret = "secret"
        jws_token = jws.encode(payload, secret, algorithm="HS256")
        jws.decode(jws_token, secret, algorithms=["HS256"])

        with pytest.raises(InvalidAlgorithmError):
            jws.decode(jws_token, secret, algorithms=["HS384"])

    def test_decode_works_with_unicode_token(self, jws):
        secret = "secret"
        unicode_jws = (
            "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
            ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
            ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
        )

        jws.decode(unicode_jws, secret, algorithms=["HS256"])

    def test_decode_missing_segments_throws_exception(self, jws):
        secret = "secret"
        example_jws = "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJoZWxsbyI6ICJ3b3JsZCJ9"  # Missing segment

        with pytest.raises(DecodeError) as context:
            jws.decode(example_jws, secret, algorithms=["HS256"])

        exception = context.value
        assert str(exception) == "Not enough segments"

    def test_decode_invalid_token_type_is_none(self, jws):
        example_jws = None
        example_secret = "secret"

        with pytest.raises(DecodeError) as context:
            jws.decode(example_jws, example_secret, algorithms=["HS256"])

        exception = context.value
        assert "Invalid token type" in str(exception)

    def test_decode_invalid_token_type_is_int(self, jws):
        example_jws = 123
        example_secret = "secret"

        with pytest.raises(DecodeError) as context:
            jws.decode(example_jws, example_secret, algorithms=["HS256"])

        exception = context.value
        assert "Invalid token type" in str(exception)

    def test_decode_with_no<response clipped><NOTE>Due to the max output limit, only part of the full response has been shown to you.</NOTE> with mock.patch("urllib.request.urlopen") as urlopen_mock:
        urlopen_mock.side_effect = URLError("Fail to process the request.")
        yield urlopen_mock


@contextlib.contextmanager
def mocked_first_call_wrong_kid_second_call_correct_kid(
    response_data_one, response_data_two
):
    with mock.patch("urllib.request.urlopen") as urlopen_mock:
        response = mock.Mock()
        response.__enter__ = mock.Mock(return_value=response)
        response.__exit__ = mock.Mock()
        response.read.side_effect = [
            json.dumps(response_data_one),
            json.dumps(response_data_two),
        ]
        urlopen_mock.return_value = response
        yield urlopen_mock


@contextlib.contextmanager
def mocked_timeout():
    with mock.patch("urllib.request.urlopen") as urlopen_mock:
        urlopen_mock.side_effect = TimeoutError("timed out")
        yield urlopen_mock


@crypto_required
class TestPyJWKClient:
    def test_fetch_data_forwards_headers_to_correct_url(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID) as mock_request:
            custom_headers = {"User-agent": "my-custom-agent"}
            jwks_client = PyJWKClient(url, headers=custom_headers)
            jwk_set = jwks_client.get_jwk_set()
            request_params = mock_request.call_args[0][0]
            assert request_params.full_url == url
            assert request_params.headers == custom_headers

        assert len(jwk_set.keys) == 1

    def test_get_jwk_set(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client = PyJWKClient(url)
            jwk_set = jwks_client.get_jwk_set()

        assert len(jwk_set.keys) == 1

    def test_get_signing_keys(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client = PyJWKClient(url)
            signing_keys = jwks_client.get_signing_keys()

        assert len(signing_keys) == 1
        assert isinstance(signing_keys[0], PyJWK)

    def test_get_signing_keys_if_no_use_provided(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        mocked_key = RESPONSE_DATA_WITH_MATCHING_KID["keys"][0].copy()
        del mocked_key["use"]
        response = {"keys": [mocked_key]}

        with mocked_success_response(response):
            jwks_client = PyJWKClient(url)
            signing_keys = jwks_client.get_signing_keys()

        assert len(signing_keys) == 1
        assert isinstance(signing_keys[0], PyJWK)

    def test_get_signing_keys_raises_if_none_found(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        mocked_key = RESPONSE_DATA_WITH_MATCHING_KID["keys"][0].copy()
        mocked_key["use"] = "enc"
        response = {"keys": [mocked_key]}
        with mocked_success_response(response):
            jwks_client = PyJWKClient(url)

            with pytest.raises(PyJWKClientError) as exc:
                jwks_client.get_signing_keys()

        assert "The JWKS endpoint did not contain any signing keys" in str(exc.value)

    def test_get_signing_key(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"
        kid = "NEE1QURBOTM4MzI5RkFDNTYxOTU1MDg2ODgwQ0UzMTk1QjYyRkRFQw"

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client = PyJWKClient(url)
            signing_key = jwks_client.get_signing_key(kid)

        assert isinstance(signing_key, PyJWK)
        assert signing_key.key_type == "RSA"
        assert signing_key.key_id == kid
        assert signing_key.public_key_use == "sig"

    def test_get_signing_key_caches_result(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"
        kid = "NEE1QURBOTM4MzI5RkFDNTYxOTU1MDg2ODgwQ0UzMTk1QjYyRkRFQw"

        jwks_client = PyJWKClient(url, cache_keys=True)

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client.get_signing_key(kid)

        # mocked_response does not allow urllib.request.urlopen to be called twice
        # so a second mock is needed
        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID) as repeated_call:
            jwks_client.get_signing_key(kid)

        assert repeated_call.call_count == 0

    def test_get_signing_key_does_not_cache_opt_out(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"
        kid = "NEE1QURBOTM4MzI5RkFDNTYxOTU1MDg2ODgwQ0UzMTk1QjYyRkRFQw"

        jwks_client = PyJWKClient(url, cache_jwk_set=False)

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client.get_signing_key(kid)

        # mocked_response does not allow urllib.request.urlopen to be called twice
        # so a second mock is needed
        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID) as repeated_call:
            jwks_client.get_signing_key(kid)

        assert repeated_call.call_count == 1

    def test_get_signing_key_from_jwt(self):
        token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6Ik5FRTFRVVJCT1RNNE16STVSa0ZETlRZeE9UVTFNRGcyT0Rnd1EwVXpNVGsxUWpZeVJrUkZRdyJ9.eyJpc3MiOiJodHRwczovL2Rldi04N2V2eDlydS5hdXRoMC5jb20vIiwic3ViIjoiYVc0Q2NhNzl4UmVMV1V6MGFFMkg2a0QwTzNjWEJWdENAY2xpZW50cyIsImF1ZCI6Imh0dHBzOi8vZXhwZW5zZXMtYXBpIiwiaWF0IjoxNTcyMDA2OTU0LCJleHAiOjE1NzIwMDY5NjQsImF6cCI6ImFXNENjYTc5eFJlTFdVejBhRTJINmtEME8zY1hCVnRDIiwiZ3R5IjoiY2xpZW50LWNyZWRlbnRpYWxzIn0.PUxE7xn52aTCohGiWoSdMBZGiYAHwE5FYie0Y1qUT68IHSTXwXVd6hn02HTah6epvHHVKA2FqcFZ4GGv5VTHEvYpeggiiZMgbxFrmTEY0csL6VNkX1eaJGcuehwQCRBKRLL3zKmA5IKGy5GeUnIbpPHLHDxr-GXvgFzsdsyWlVQvPX2xjeaQ217r2PtxDeqjlf66UYl6oY6AqNS8DH3iryCvIfCcybRZkc_hdy-6ZMoKT6Piijvk_aXdm7-QQqKJFHLuEqrVSOuBqqiNfVrG27QzAPuPOxvfXTVLXL2jek5meH6n-VWgrBdoMFH93QEszEDowDAEhQPHVs0xj7SIzA"
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client = PyJWKClient(url)
            signing_key = jwks_client.get_signing_key_from_jwt(token)

        data = jwt.decode(
            token,
            signing_key.key,
            algorithms=["RS256"],
            audience="https://expenses-api",
            options={"verify_exp": False},
        )

        assert data == {
            "iss": "https://dev-87evx9ru.auth0.com/",
            "sub": "aW4Cca79xReLWUz0aE2H6kD0O3cXBVtC@clients",
            "aud": "https://expenses-api",
            "iat": 1572006954,
            "exp": 1572006964,
            "azp": "aW4Cca79xReLWUz0aE2H6kD0O3cXBVtC",
            "gty": "client-credentials",
        }

    def test_get_jwk_set_caches_result(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        jwks_client = PyJWKClient(url)
        assert jwks_client.jwk_set_cache is not None

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client.get_jwk_set()

        # mocked_response does not allow urllib.request.urlopen to be called twice
        # so a second mock is needed
        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID) as repeated_call:
            jwks_client.get_jwk_set()

        assert repeated_call.call_count == 0

    def test_get_jwt_set_cache_expired_result(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        jwks_client = PyJWKClient(url, lifespan=1)
        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
            jwks_client.get_jwk_set()

        time.sleep(2)

        # mocked_response does not allow urllib.request.urlopen to be called twice
        # so a second mock is needed
        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID) as repeated_call:
            jwks_client.get_jwk_set()

        assert repeated_call.call_count == 1

    def test_get_jwt_set_cache_disabled(self):
        url = "https://dev-87evx9ru.auth0.com/.well-known/jwks.json"

        jwks_client = PyJWKClient(url, cache_jwk_set=False)
        assert jwks_client.jwk_set_cache is None

        with mocked_success_response(RESPONSE_DATA_WITH_MATCHING_KID):
### tests/test_advisory.py
import pytest

import jwt
from jwt.algorithms import get_default_algorithms
from jwt.exceptions import InvalidKeyError

from .utils import crypto_required

priv_key_bytes = b"""-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEIIbBhdo2ah7X32i50GOzrCr4acZTe6BezUdRIixjTAdL
-----END PRIVATE KEY-----"""

pub_key_bytes = (
    b"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIPL1I9oiq+B8crkmuV4YViiUnhdLjCp3hvy1bNGuGfNL"
)

ssh_priv_key_bytes = b"""-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIOWc7RbaNswMtNtc+n6WZDlUblMr2FBPo79fcGXsJlGQoAoGCCqGSM49
AwEHoUQDQgAElcy2RSSSgn2RA/xCGko79N+7FwoLZr3Z0ij/ENjow2XpUDwwKEKk
Ak3TDXC9U8nipMlGcY7sDpXp2XyhHEM+Rw==
-----END EC PRIVATE KEY-----"""

ssh_key_bytes = b"""ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJXMtkUkkoJ9kQP8QhpKO/TfuxcKC2a92dIo/xDY6MNl6VA8MChCpAJN0w1wvVPJ4qTJRnGO7A6V6dl8oRxDPkc="""


class TestAdvisory:
    @crypto_required
    def test_ghsa_ffqj_6fqr_9h24(self):
        # Generate ed25519 private key
        # private_key = ed25519.Ed25519PrivateKey.generate()

        # Get private key bytes as they would be stored in a file
        # priv_key_bytes = private_key.private_bytes(
        #     encoding=serialization.Encoding.PEM,
        #     format=serialization.PrivateFormat.PKCS8,
        #     encryption_algorithm=serialization.NoEncryption(),
        # )

        # Get public key bytes as they would be stored in a file
        # pub_key_bytes = private_key.public_key().public_bytes(
        #     encoding=serialization.Encoding.OpenSSH,
        #     format=serialization.PublicFormat.OpenSSH,
        # )

        # Making a good jwt token that should work by signing it
        # with the private key
        # encoded_good = jwt.encode({"test": 1234}, priv_key_bytes, algorithm="EdDSA")
        encoded_good = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ0ZXN0IjoxMjM0fQ.M5y1EEavZkHSlj9i8yi9nXKKyPBSAUhDRTOYZi3zZY11tZItDaR3qwAye8pc74_lZY3Ogt9KPNFbVOSGnUBHDg"

        # Using HMAC with the public key to trick the receiver to think that the
        # public key is a HMAC secret
        encoded_bad = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0ZXN0IjoxMjM0fQ.6ulDpqSlbHmQ8bZXhZRLFko9SwcHrghCwh8d-exJEE4"

        algorithm_names = list(get_default_algorithms())

        # Both of the jwt tokens are validated as valid
        jwt.decode(
            encoded_good,
            pub_key_bytes,
            algorithms=algorithm_names,
        )

        with pytest.raises(InvalidKeyError):
            jwt.decode(
                encoded_bad,
                pub_key_bytes,
                algorithms=algorithm_names,
            )

        # Of course the receiver should specify ed25519 algorithm to be used if
        # they specify ed25519 public key. However, if other algorithms are used,
        # the POC does not work
        # HMAC specifies illegal strings for the HMAC secret in jwt/algorithms.py
        #
        #        invalid_str ings = [
        #            b"-----BEGIN PUBLIC KEY-----",
        #            b"-----BEGIN CERTIFICATE-----",
        #            b"-----BEGIN RSA PUBLIC KEY-----",
        #            b"ssh-rsa",
        #        ]
        #
        # However, OKPAlgorithm (ed25519) accepts the following in  jwt/algorithms.py:
        #
        #                if "-----BEGIN PUBLIC" in str_key:
        #                    return load_pem_public_key(key)
        #                if "-----BEGIN PRIVATE" in str_key:
        #                    return load_pem_private_key(key, password=None)
        #                if str_key[0:4] == "ssh-":
        #                    return load_ssh_public_key(key)
        #
        # These should most likely made to match each other to prevent this behavior

        # POC for the ecdsa-sha2-nistp256 format.
        # openssl ecparam -genkey -name prime256v1 -noout -out ec256-key-priv.pem
        # openssl ec -in ec256-key-priv.pem -pubout > ec256-key-pub.pem
        # ssh-keygen -y -f ec256-key-priv.pem > ec256-key-ssh.pub

        # Making a good jwt token that should work by signing it with the private key
        # encoded_good = jwt.encode({"test": 1234}, ssh_priv_key_bytes, algorithm="ES256")
        encoded_good = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZXN0IjoxMjM0fQ.NX42mS8cNqYoL3FOW9ZcKw8Nfq2mb6GqJVADeMA1-kyHAclilYo_edhdM_5eav9tBRQTlL0XMeu_WFE_mz3OXg"

        # Using HMAC with the ssh public key to trick the receiver to think that the public key is a HMAC secret
        # encoded_bad = jwt.encode({"test": 1234}, ssh_key_bytes, algorithm="HS256")
        encoded_bad = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZXN0IjoxMjM0fQ.5eYfbrbeGYmWfypQ6rMWXNZ8bdHcqKng5GPr9MJZITU"

        algorithm_names = list(get_default_algorithms())
        # Both of the jwt tokens are validated as valid
        jwt.decode(
            encoded_good,
            ssh_key_bytes,
            algorithms=algorithm_names,
        )

        with pytest.raises(InvalidKeyError):
            jwt.decode(
                encoded_bad,
                ssh_key_bytes,
                algorithms=algorithm_names,
            )
### tests/test_compressed_jwt.py
import json
import zlib

from jwt import PyJWT


class CompressedPyJWT(PyJWT):
    def _decode_payload(self, decoded):
        return json.loads(
            # wbits=-15 has zlib not worry about headers of crc's
            zlib.decompress(decoded["payload"], wbits=-15).decode("utf-8")
        )


def test_decodes_complete_valid_jwt_with_compressed_payload():
    # Test case from https://github.com/jpadilla/pyjwt/pull/753/files
    example_payload = {"hello": "world"}
    example_secret = "secret"
    # payload made with the pako (https://nodeca.github.io/pako/) library in Javascript:
    # Buffer.from(pako.deflateRaw('{"hello": "world"}')).toString('base64')
    example_jwt = (
        b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
        b".q1bKSM3JyVeyUlAqzy/KSVGqBQA="
        b".08wHYeuh1rJXmcBcMrz6NxmbxAnCQp2rGTKfRNIkxiw="
    )
    decoded = CompressedPyJWT().decode_complete(
        example_jwt, example_secret, algorithms=["HS256"]
    )

    assert decoded == {
        "header": {"alg": "HS256", "typ": "JWT"},
        "payload": example_payload,
        "signature": (
            b"\xd3\xcc\x07a\xeb\xa1\xd6\xb2W\x99\xc0\\2\xbc\xfa7"
            b"\x19\x9b\xc4\t\xc2B\x9d\xab\x192\x9fD\xd2$\xc6,"
        ),
    }
### tests/test_exceptions.py
from jwt.exceptions import MissingRequiredClaimError


def test_missing_required_claim_error_has_proper_str():
    exc = MissingRequiredClaimError("abc")

    assert str(exc) == 'Token is missing the "abc" claim'
[The command completed with exit code 0.]
[Current working directory: /workspace/pyjwt]
[Python interpreter: /usr/bin/python]
[Command finished with exit code 0]