Building an Asynchronous API Testing Framework with Pytest, Allure, and Auto-Generation
Asynchronous Data Collection
Efficiency is critical in API testing. As the number of endpoints grows, sequential execution becomes a bottleneck. By leveraging Python's coroutines via aiohttp, we can achieve concurrent request execution. Furthermore, separating the data collection phase from the test execution phase allows us to bridge the gap between asynchronous HTTP calls and synchronous test runners like pytest.
The architecture is split into two distinct phases: first, asynchronously reading YAML specifications, firing HTTP requests, and gathering results; second, dynamically generating valid pytest test cases based on the collected data.
Loading YAML Test Specifications
Using aiofiles ensures that file I/O does not block the event loop. The YAML structure maps directly to the parameters required by aiohttp.ClientSession.request, eliminating the need for redundant data transformation.
```yaml
args:
  - post
  - /api/v1/resource
kwargs:
  -
    caseName: Create Resource
    data:
      identifier: ${gen_random_string(8)}
validator:
  -
    :
      status: success
```

The asynchronous loader parses the file and evaluates custom template syntax, such as ${function()} for dynamic value generation.
import aiofiles
import yaml
import re
import os
async def load_test_spec(directory='', filename=''):
if directory:
filepath = os.path.join(directory, filename)
async with aiofiles.open(filepath, 'r', encoding='utf-8', errors='ignore') as stream:
content = await stream.read()
spec_data = yaml.safe_load(content)
func_pattern = re.compile(r'^\${([A-Za-z_]+\w*\(.*\))}$')
fallback_func_pattern = re.compile(r'^\${(.*)}$')
default_val_pattern = re.compile(r'^\$\((.*)\)$')
def resolve_syntax(node):
if isinstance(node, (list, tuple)):
for idx, item in enumerate(node):
node[idx] = resolve_syntax(item) or item
elif isinstance(node, dict):
for key, val in node.items():
node[key] = resolve_syntax(val) or val
elif isinstance(node, str):
match = func_pattern.match(node) or fallback_func_pattern.match(node)
if match:
return eval(match.group(1))
match = default_val_pattern.match(node)
if match:
category, key = match.group(1).split(':')
return config_defaults.get(category).get(key)
return node
resolve_syntax(spec_data)
return spec_dataExecuting HTTP Requests
The request handler wraps aiohttp to send API calls asynchronously while attaching necessary authorization headers.
from aiohttp import ClientSession
async def dispatch_request(session, base_url, *args, **kwargs):
http_method, endpoint = args
payload = kwargs.get('data') or kwargs.get('params') or kwargs.get('') or {}
kwargs.setdefault('headers', {}).update({'Authorization': f'Bearer {config.auth_token}'})
target_url = f"{base_url}{endpoint}"
async with session.request(http_method, target_url, **kwargs) as resp:
parsed_response = await process_response(resp)
return {
'response': parsed_response,
'url': target_url,
'payload': payload
}Concurrency Control and Data Aggregation
To prevent overwhelming the server, a semaphore limits concurrent connections. Results are grouped by their directory structure for downstream processing.
from aiohttp import CookieJar
import asyncio
async def execute_concurrently(test_files, loop, semaphore=None):
    """Run every test case concurrently and group results by directory.

    Args:
        test_files: Iterable of YAML spec file paths.
        loop: Event loop the client session is bound to.
        semaphore: Optional ``asyncio.Semaphore`` limiting in-flight cases.

    Returns:
        dict mapping each spec's directory ('group_dir') to its case results.
    """
    async def bounded(session, path):
        # BUG FIX: the original held the semaphore around the whole for-loop
        # and awaited each case sequentially, so nothing ran concurrently and
        # the limit was meaningless. Acquire per case and gather instead.
        # Also tolerate the documented semaphore=None default.
        if semaphore is None:
            return await process_single_case(session, path)
        async with semaphore:
            return await process_single_case(session, path)

    aggregated_data = {}
    jar = CookieJar(unsafe=True)
    async with ClientSession(loop=loop, cookie_jar=jar,
                             headers={'Authorization': f'Bearer {config.auth_token}'}) as session:
        await authenticate_session(session)
        results = await asyncio.gather(*(bounded(session, f) for f in test_files))
    for case_data in results:
        group_key = case_data.pop('group_dir')
        aggregated_data.setdefault(group_key, []).append(case_data)
    return aggregated_data
async def process_single_case(session, filepath):
    """Load one YAML spec, execute its request step(s), collect responses.

    Args:
        session: Shared ``aiohttp.ClientSession``.
        filepath: Path of the YAML spec file.

    Returns:
        dict with 'group_dir', a sanitized 'endpoint' id, and a 'responses'
        list pairing each response with its validator.
    """
    # Assumes paths look like '<root>/<project>/<suite>/<case>.yml'; the
    # second segment selects the project's base URL. TODO confirm layout.
    project = filepath.split(os.sep)[1]
    base_url = config.service_urls.get(project)
    spec = await load_test_spec(filename=filepath)
    # BUG FIX: load_test_spec returns a plain dict from yaml.safe_load, so
    # the spec must be read with subscripts -- `spec.args` raised
    # AttributeError in the original.
    spec_args = spec['args']
    spec_kwargs = spec['kwargs']
    validators = spec['validator']
    result = {
        'group_dir': os.path.dirname(filepath),
        'endpoint': spec_args[1].replace('/', '_'),
    }
    # Normalize the single-step form to a list so both shapes share one path
    # (the original duplicated the request logic in two branches).
    if not isinstance(spec_kwargs, list):
        spec_kwargs = [spec_kwargs]
        validators = [validators]
    for idx, params in enumerate(spec_kwargs):
        step_name = params.pop('caseName')
        resp_data = await dispatch_request(session, base_url, *spec_args, **params)
        resp_data['case_name'] = step_name
        result.setdefault('responses', []).append(
            {'response': resp_data, 'validator': validators[idx]}
        )
    return result
def run_event_loop(test_files):
loop = asyncio.get_event_loop()
concurrency_limit = asyncio.Semaphore(config.max_concurrency)
task = loop.create_task(execute_concurrently(test_files, loop, concurrency_limit))
try:
loop.run_until_complete(task)
finally:
loop.close()
return task.result()Dynamic Pytest Case Generation
Pytest requires test files and functions following specific naming conventions to discover and execute tests. By utilizing a bootstrap test file and a session-scoped fixture, we can intercept the test startup, dynamically write Python test files based on the collected data, invoke a nested pytest run, and clean up the generated files afterward.
# bootstrap_test.py
import pytest
@pytest.mark.usefixtures('generate_dynamic_suite')
class TestBootstrapper(object):
def test_spark(self):
pytest.skip('Bootstrap entry point, execution skipped.')The session-scoped fixture discovers YAML files, triggers the async event loop to fetch all API responses, writes temporary test modules containing parameterized test methods, and executes them.
# conftest.py
import pytest
import os
import re
@pytest.fixture(scope='session')
def generate_dynamic_suite(request):
target_dir = request.config.getoption("--rootdir")
env_setting = request.config.getoption("--te")
spec_files = []
if os.path.isdir(target_dir):
for root, _, files in os.walk(target_dir):
if re.match(r'\w+', root):
spec_files.extend([os.path.join(root, f) for f in files if f.endswith('yml')])
aggregated_results = run_event_loop(spec_files)
module_template = """import allure
from conftest import DynamicCaseMeta
@allure.feature('{} API Validation ({} Project)')
class Test{}API(object, metaclass=DynamicCaseMeta):
collected_data = {}
"""
generated_files = []
for root, _, _ in os.walk(target_dir):
if not ('.' in root or '__' in root):
suite_name = os.path.basename(root)
proj_name = os.path.basename(os.path.dirname(root))
module_path = os.path.join(root, f'test_{suite_name}.py')
with open(module_path, 'w', encoding='utf-8') as writer:
writer.write(module_template.format(suite_name, proj_name, suite_name.title(), aggregated_results.get(root)))
generated_files.append(module_path)
pytest.main(['-v', target_dir, '--alluredir', 'allure-results', '--te', env_setting, '--capture', 'no', '--disable-warnings'])
for temp_file in generated_files:
os.remove(temp_file)
return generated_filesMetaclass for Test Method Injection
A metaclass dynamically attaches test methods to the generated classes. Each method is decorated with pytest.mark.parametrize and Allure reporting steps.
import types
import builtins
import allure
import pytest
# Source template for each injected test method; formatted with the method
# name. The body must be properly indented because the string is compiled as
# standalone module source and its code object is later bound as a method.
METHOD_BODY = """
def {}(self, resp_data, validation_rule):
    with allure.step(resp_data.pop('case_name')):
        assert_response(resp_data, validation_rule)
"""
def compile_method(expression, scope=None):
    """Compile *expression* (source of a single ``def``) into a function.

    Args:
        expression: Python source containing one function definition.
        scope: Optional mapping of extra names the function body references
            (e.g. helpers such as ``assert_response`` or ``allure``).

    Returns:
        A ``types.FunctionType`` whose globals are builtins plus *scope*.
    """
    # BUG FIX: the original updated builtins.__dict__ with *scope*, leaking
    # injected names into every module for the life of the process, and used
    # a shared mutable default argument. A per-call globals dict avoids both.
    namespace = dict(builtins.__dict__)
    if scope:
        namespace.update(scope)
    compiled_code = compile(expression, '', 'exec')
    # The first nested code object is the function body defined above.
    method_code = next(
        c for c in compiled_code.co_consts if isinstance(c, types.CodeType)
    )
    return types.FunctionType(method_code, namespace)
class DynamicCaseMeta(type):
def __new__(mcs, cls_name, bases, cls_dict):
test_data = cls_dict.pop('collected_data', [])
for item in test_data:
endpoint_id = item.pop('endpoint')
method_name = f'test_{endpoint_id}'
params = [tuple(v.values()) for v in item.get('responses', [])]
raw_method = compile_method(METHOD_BODY.format(method_name), scope={'assert_response': assert_response, 'allure': allure})
decorated_method = allure.story(endpoint_id.replace('_', '/'))(raw_method)
cls_dict[method_name] = pytest.mark.parametrize('resp_data, validation_rule', params)(decorated_method)
return super().__new__(mcs, cls_name, bases, cls_dict)Automated YAML Scaffolding via Swagger
Manually creating YAML files for numerous endpoints is tedious. By fetching the Swagger JSON specification, we can auto-generate the test scaffolding. The API documentation is typically available at an endpoint like /v2/api-docs.
import os
from requests import Session
# Skeleton for generated YAML specs; str.format fills the named fields.
# NOTE(review): nesting restored to plausible YAML -- continuation lines in
# `payload_body` must match this indentation; verify generated files parse.
YAML_SCAFFOLD = """args:
  - {http_method}
  - {route}
kwargs:
  -
    caseName: {description}
    {payload_type}:
      {payload_body}
validator:
  -
    :
      success: True
"""
def scaffold_from_swagger(api_docs_url, proj_name):
    """Generate YAML test scaffolding from a Swagger/OpenAPI JSON document.

    Args:
        api_docs_url: URL of the swagger spec (e.g. '.../v2/api-docs').
        proj_name: Directory name created under the CWD for the output.

    Writes one <Route>.yml per path, grouped by the first path segment.
    """
    # BUG FIX: the original called `.()` on the response -- the `.json()`
    # decode call was lost.
    spec = Session().get(api_docs_url).json()
    routes = spec.get('paths')
    proj_dir = os.path.join(os.getcwd(), proj_name)
    # exist_ok avoids the exists()/makedirs race of the original.
    os.makedirs(proj_dir, exist_ok=True)
    for path, methods in routes.items():
        segments = [seg for seg in path.split('/') if seg]
        folder = segments[0]
        if len(segments) > 1:
            file_name = ''.join(s.title() for s in segments[1:])
        else:
            file_name = folder
        file_name += '.yml'
        route_dir = os.path.join(proj_dir, folder)
        os.makedirs(route_dir, exist_ok=True)
        for http_method, details in methods.items():
            # GET payloads go on the query string; everything else in the body.
            payload_type = 'params' if http_method == 'get' else 'data'
            parameters = details.get('parameters', [])
            payload_body = ''
            try:
                for param in parameters:
                    # NOTE(review): the continuation indent here must line up
                    # with YAML_SCAFFOLD's nesting -- confirm generated files
                    # parse as YAML.
                    payload_body += f"{param.get('name')}: \n "
            except TypeError:
                # `parameters` was not iterable as expected; emit an empty map.
                payload_body = '{}'
            file_path = os.path.join(route_dir, file_name)
            with open(file_path, 'w', encoding='utf-8') as writer:
                writer.write(YAML_SCAFFOLD.format(
                    http_method=http_method,
                    route=path,
                    description=details.get('description', 'Auto-generated case'),
                    payload_type=payload_type,
                    payload_body=payload_body,
                ))