Quick project to test apidaora
In a Python 3.8 virtualenv:
git clone https://gist.github.com/f7e0e24e00415e2295d020fa88d191a8.git test-apidaora
cd test-apidaora
pip install -r requirements.txt
pytest -vv
| __pycache__ |
| import os | |
| import pytest | |
| from async_asgi_testclient import TestClient | |
| from apidaora import appdaora | |
@pytest.fixture
def app():
    """
    placeholder fixture for the application under test

    It only exists so that a missing override fails loudly: each test module
    is expected to define its own `app` fixture.

    Raises:
        NotImplementedError: always.
    """
    # No throwaway application is built here any more: the previous
    # `appdaora([])` result was discarded right before raising.
    raise NotImplementedError("override the `app` fixture")
@pytest.fixture
def client(app):
    """
    fixture providing a test HTTP client for the `app` fixture: a `TestClient`
    from async_asgi_testclient (presumably exposing an interface close to the
    [requests](https://pypi.org/project/requests/) module - TODO confirm)
    """
    test_client = TestClient(app)
    return test_client
@pytest.fixture
def redis_url():
    """
    fixture returning the redis url read from the `REDIS` environment variable;
    when the variable is not set, the requesting test is skipped
    """
    configured_url = os.getenv('REDIS')
    if configured_url is not None:
        return configured_url
    # Nothing configured: skip the test instead of failing it.
    pytest.skip("set the `REDIS` environment variable to run this test")
| import time | |
| from apidaora import route | |
@route.background('/hello-single-1', lock_args=True)
def hello_task(name: str) -> str:
    """
    background task of module 1, registered on `/hello-single-1`:
    block for one second, then return a greeting for `name`
    """
    # The one-second pause keeps the task observable as not-yet-finished.
    time.sleep(1)
    greeting = f'Hello {name} from module 1!'
    return greeting
| import time | |
| from apidaora import route | |
@route.background('/hello-single-2', lock_args=True)
def hello_task(name: str) -> str:
    """
    background task of module 2, registered on `/hello-single-2`:
    block for one second, then return a greeting for `name`
    """
    # The one-second pause keeps the task observable as not-yet-finished.
    time.sleep(1)
    greeting = f'Hello {name} from module 2!'
    return greeting
| apidaora==0.26.0 | |
| aioredis==1.3.1 | |
| pytest | |
| pytest-asyncio | |
| async-asgi-testclient |
| import pytest | |
| import asyncio | |
| import time | |
| from datetime import datetime | |
| from unittest.mock import ANY | |
| from apidaora import appdaora, route | |
@pytest.fixture
def app():
    """
    application exposing a single background route, `/hello`

    The task function is deliberately left exactly as written: the test
    compares the `signature` returned by the API with a hard-coded value,
    which presumably depends on this definition (TODO confirm against
    apidaora).
    """
    @route.background('/hello')
    def hello_task(name: str) -> str:
        time.sleep(1)
        return f'Hello {name}!'

    application = appdaora(hello_task)
    return application
@pytest.mark.asyncio
async def test_background_task(client):
    """
    POST /hello starts a task; GET /hello?task_id=... reports it `running`,
    then `finished` with its result; a later POST yields a different task id
    """
    expected_signature = 'aedb1ee4c3c7'
    # Fields expected in every task description returned by this route.
    common = {'args_signature': None, 'signature': expected_signature}

    # start a first task
    created = await client.post("/hello?name=foo")
    assert created.status_code == 202
    first = created.json()
    assert first == {
        **common,
        'status': 'running',
        'start_time': ANY,
        'task_id': ANY,
    }
    task_id_1 = first["task_id"]
    started_at = first["start_time"]

    # polled immediately, the task is expected to be still running
    polled = await client.get(f"/hello?task_id={task_id_1}")
    assert polled.status_code == 200
    assert polled.json() == {
        **common,
        'status': 'running',
        'start_time': started_at,
        'task_id': task_id_1,
    }

    # give the task time to complete
    await asyncio.sleep(1.1)

    completed = await client.get(f"/hello?task_id={task_id_1}")
    assert completed.status_code == 200
    finished = completed.json()
    assert finished == {
        **common,
        'status': 'finished',
        'start_time': started_at,
        'task_id': task_id_1,
        'end_time': ANY,
        'result': "Hello foo!",
    }
    begin = datetime.fromisoformat(finished["start_time"])
    end = datetime.fromisoformat(finished["end_time"])
    assert begin < end

    # a second POST creates a distinct task
    again = await client.post("/hello?name=bar")
    assert again.status_code == 202
    second = again.json()
    assert second == {
        **common,
        'status': 'running',
        'start_time': ANY,
        'task_id': ANY,
    }
    assert task_id_1 != second["task_id"]
| import pytest | |
| import asyncio | |
| import time | |
| from datetime import datetime | |
| from unittest.mock import ANY | |
| from apidaora import appdaora, route | |
@pytest.fixture
def app(redis_url):
    """
    application exposing a single background route, `/hello-single`, declared
    with `lock_args=True` and the `redis_url` fixture value as
    `tasks_repository`

    The task function is deliberately left exactly as written: the test
    compares the `signature` returned by the API with a hard-coded value,
    which presumably depends on this definition (TODO confirm against
    apidaora).
    """
    @route.background('/hello-single', lock_args=True, tasks_repository=redis_url)
    def hello_task_single_redis(name: str) -> str:
        time.sleep(1)
        return f'Hello {name}!'

    application = appdaora(hello_task_single_redis)
    return application
@pytest.mark.asyncio
async def test_background_task(client):
    """
    with `lock_args=True`, posting the same arguments while a task is running
    answers 303 pointing at that task; once it has finished, the same POST
    creates a new task
    """
    expected_signature = '67c3063580fb'
    expected_arg_signature = '5383d9fcb808'

    def task_state(status, start_time, task_id, **extra):
        # expected JSON description of a task of this route
        return {
            'args_signature': expected_arg_signature,
            'signature': expected_signature,
            'start_time': start_time,
            'status': status,
            'task_id': task_id,
            **extra,
        }

    # create a task
    created = await client.post("/hello-single?name=foo")
    assert created.status_code == 202
    first = created.json()
    assert first == task_state('running', ANY, ANY)
    task_id = first["task_id"]
    started_at = first["start_time"]

    # check we can query status
    polled = await client.get(f"/hello-single?task_id={task_id}")
    assert polled.status_code == 200
    assert polled.json() == task_state('running', started_at, task_id)

    # we cannot create a similar task while the first one is not finished
    duplicate = await client.post("/hello-single?name=foo", allow_redirects=False)
    assert duplicate.status_code == 303
    assert duplicate.headers["location"] == f"hello-single?task_id={task_id}"

    # coffee time
    await asyncio.sleep(1.1)

    # first task should be finished by now
    completed = await client.get(f"/hello-single?task_id={task_id}")
    assert completed.status_code == 200
    finished = completed.json()
    assert finished == task_state(
        'finished', started_at, task_id, end_time=ANY, result="Hello foo!"
    )
    begin = datetime.fromisoformat(finished["start_time"])
    end = datetime.fromisoformat(finished["end_time"])
    assert begin < end

    # we can go ahead and create another similar task
    retried = await client.post("/hello-single?name=foo")
    assert retried.status_code == 202
    third = retried.json()
    assert third == task_state('running', ANY, ANY)
    assert task_id != third["task_id"]
| import pytest | |
| import asyncio | |
| import time | |
| from datetime import datetime | |
| from unittest.mock import ANY | |
| from apidaora import appdaora, route | |
@pytest.fixture
def app():
    """
    application exposing a single background route, `/hello-single`, declared
    with `lock_args=True`

    The task function is deliberately left exactly as written: the test
    compares the `signature` returned by the API with a hard-coded value,
    which presumably depends on this definition (TODO confirm against
    apidaora).
    """
    @route.background('/hello-single', lock_args=True)
    def hello_task_single(name: str) -> str:
        time.sleep(1)
        return f'Hello {name}!'

    application = appdaora(hello_task_single)
    return application
@pytest.mark.asyncio
async def test_background_task(client):
    """
    with `lock_args=True`, posting the same arguments while a task is running
    answers 303 pointing at that task; once it has finished, the same POST
    creates a new task
    """
    expected_signature = 'cabf40005ce5'
    expected_arg_signature = '5383d9fcb808'
    # Fields expected in every task description returned by this route.
    common = {
        'args_signature': expected_arg_signature,
        'signature': expected_signature,
    }

    # create a task
    r1 = await client.post("/hello-single?name=foo")
    assert r1.status_code == 202
    data1 = r1.json()
    assert data1 == {
        **common,
        'start_time': ANY,
        'status': 'running',
        'task_id': ANY,
    }
    task_id = data1["task_id"]

    # check we can query status
    r2 = await client.get(f"/hello-single?task_id={task_id}")
    assert r2.status_code == 200
    data2 = r2.json()
    assert data2 == {
        **common,
        'start_time': data1["start_time"],
        'status': 'running',
        'task_id': task_id,
    }

    # we cannot create a similar task while the first one is not finished
    r1_second_try = await client.post("/hello-single?name=foo", allow_redirects=False)
    assert r1_second_try.status_code == 303, f"{r1_second_try.status_code}: {r1_second_try.text}"
    assert r1_second_try.headers["location"] == f"hello-single?task_id={task_id}"

    # coffee time
    await asyncio.sleep(1.1)

    # first task should be finished by now
    r3 = await client.get(f"/hello-single?task_id={task_id}")
    assert r3.status_code == 200
    data3 = r3.json()
    assert data3 == {
        **common,
        'start_time': data1["start_time"],
        'status': 'finished',
        'task_id': task_id,
        'end_time': ANY,
        'result': "Hello foo!",
    }
    assert datetime.fromisoformat(data3["start_time"]) < datetime.fromisoformat(data3["end_time"])

    # we can go ahead and create another similar task
    r1_third_try = await client.post("/hello-single?name=foo")
    # (the status check used to be asserted twice in a row; once is enough)
    assert r1_third_try.status_code == 202
    data1_third_try = r1_third_try.json()
    assert data1_third_try == {
        **common,
        'start_time': ANY,
        'status': 'running',
        'task_id': ANY,
    }
    assert task_id != data1_third_try["task_id"]
| import pytest | |
| import asyncio | |
| import time | |
| from datetime import datetime | |
| from unittest.mock import ANY | |
| from apidaora import appdaora, route | |
| import module1 | |
| import module2 | |
@pytest.fixture
def app():
    """
    application built from the two same-named `hello_task` controllers,
    one taken from `module1` and the other from `module2`
    """
    controllers = [module1.hello_task, module2.hello_task]
    return appdaora(controllers)
@pytest.mark.parametrize("route_num", [1, 2])
@pytest.mark.asyncio
async def test_one_route_at_a_time(client, route_num):
    """
    each route, exercised on its own, runs its task and exposes the result

    Args:
        client: test HTTP client (the `client` fixture).
        route_num: suffix of the route under test, `/hello-single-<route_num>`.
    """
    route_path = f"/hello-single-{route_num}"

    r = await client.post(f"{route_path}?name=foo", allow_redirects=False)
    assert r.status_code == 202, f"{r.status_code}: {r.text}"
    task_id = r.json()["task_id"]

    # wait a little longer than the one-second sleep of the task
    await asyncio.sleep(1.1)

    # Poll the route that created the task. The path used to be hard-coded to
    # `/hello-single-1`, so the `route_num=2` case queried the wrong route.
    r = await client.get(f"{route_path}?task_id={task_id}")
    assert r.status_code == 200, f"{r.status_code}: {r.text}"
    data = r.json()
    assert data["status"] == "finished"
    assert data["result"] == f"Hello foo from module {route_num}!"
@pytest.mark.asyncio
async def test_both_routes_together(client):
    """
    start the same-argument task on both routes back to back, then check
    that each route reports its own task as finished with its own result
    """
    route_numbers = (1, 2)

    # start one task per route, in order, remembering each task id
    task_ids = {}
    for num in route_numbers:
        r = await client.post(f"/hello-single-{num}?name=foo", allow_redirects=False)
        assert r.status_code == 202, f"{r.status_code}: {r.headers!r}"
        task_ids[num] = r.json()["task_id"]

    # wait a little longer than the one-second sleep of the tasks
    await asyncio.sleep(1.1)

    # each route must expose the result of the task it created
    for num in route_numbers:
        r = await client.get(f"/hello-single-{num}?task_id={task_ids[num]}")
        assert r.status_code == 200
        data = r.json()
        assert data["status"] == "finished"
        assert data["result"] == f"Hello foo from module {num}!"