Building an Asynchronous API Testing Framework with Pytest, Allure, and Auto-Generation

Asynchronous Data Collection

Efficiency is critical in API testing. As the number of endpoints grows, sequential execution becomes a bottleneck. By leveraging Python's coroutines via aiohttp, we can achieve concurrent request execution. Furthermore, separating the data collection phase from the test execution phase allows us to bridge the gap between asynchronous HTTP calls and synchronous test runners like pytest.

The architecture is split into two distinct phases: first, asynchronously reading YAML specifications, firing HTTP requests, and gathering results; second, dynamically generating valid pytest test cases based on the collected data.
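The two-phase split can be sketched with the standard library alone. In this minimal illustration, `fake_call`, `collect_all`, and `check_results` are hypothetical names, and `asyncio.sleep` stands in for a real HTTP request. The point is only that every coroutine finishes inside `asyncio.run` before any synchronous check starts.

```python
import asyncio

async def fake_call(endpoint):
    # Stand-in for an HTTP request; a real collector would use aiohttp here.
    await asyncio.sleep(0.01)
    return {'endpoint': endpoint, 'status': 'success'}

async def collect_all(endpoints):
    # Phase 1: run every call concurrently; gather keeps results in input order.
    return await asyncio.gather(*(fake_call(e) for e in endpoints))

def check_results(results):
    # Phase 2: plain synchronous checks, as a pytest test body would perform.
    for item in results:
        assert item['status'] == 'success'
    return len(results)

collected = asyncio.run(collect_all(['/api/v1/a', '/api/v1/b']))
checked = check_results(collected)
```

Because the second phase only sees plain Python data, the test runner never needs to know that the requests were made asynchronously.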

Loading YAML Test Specifications

Using aiofiles ensures that file I/O does not block the event loop. The YAML structure maps directly to the parameters required by aiohttp.ClientSession.request, eliminating the need for redundant data transformation.

args:
  - post
  - /api/v1/resource
kwargs:
  -
    caseName: Create Resource
    data:
      identifier: ${gen_random_string(8)}
validator:
  -
    json:
      status: success

The asynchronous loader parses the file and evaluates custom template syntax, such as ${function()} for dynamic value generation.

import aiofiles
import yaml
import re
import os

# config_defaults is assumed to be a project-level dict of dicts,
# e.g. {'user': {'name': 'tester'}}; it is not defined in this article.

async def load_test_spec(directory='', filename=''):
    # Fall back to the bare filename when no directory is given.
    filepath = os.path.join(directory, filename) if directory else filename
    async with aiofiles.open(filepath, 'r', encoding='utf-8', errors='ignore') as stream:
        content = await stream.read()

    spec_data = yaml.safe_load(content)

    func_pattern = re.compile(r'^\${([A-Za-z_]+\w*\(.*\))}$')
    fallback_func_pattern = re.compile(r'^\${(.*)}$')
    default_val_pattern = re.compile(r'^\$\((.*)\)$')

    def resolve_syntax(node):
        # Walk the parsed YAML in place; every branch returns the resolved node.
        if isinstance(node, list):
            for idx, item in enumerate(node):
                node[idx] = resolve_syntax(item)
        elif isinstance(node, dict):
            for key, val in node.items():
                node[key] = resolve_syntax(val)
        elif isinstance(node, str):
            match = func_pattern.match(node) or fallback_func_pattern.match(node)
            if match:
                # eval runs arbitrary code: only load spec files you trust.
                return eval(match.group(1))
            match = default_val_pattern.match(node)
            if match:
                category, key = match.group(1).split(':')
                return config_defaults.get(category, {}).get(key)
        return node

    return resolve_syntax(spec_data)
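Since the loader passes template contents to `eval`, a spec file can execute arbitrary code. One possible alternative, which is not the approach used in this article, is to look helper names up in an allow-list and parse arguments as literals. The sketch below assumes a hypothetical `ALLOWED_HELPERS` registry and a hypothetical `gen_random_string` implementation matching the name in the YAML example.

```python
import ast
import random
import re
import string

def gen_random_string(length):
    # Hypothetical helper matching the name used in the YAML example.
    return ''.join(random.choices(string.ascii_lowercase, k=length))

# Only functions registered here may be called from a spec file.
ALLOWED_HELPERS = {'gen_random_string': gen_random_string}

CALL_PATTERN = re.compile(r'^\$\{([A-Za-z_]\w*)\((.*)\)\}$')

def resolve_call(text):
    """Return the helper's result for '${name(args)}', else the text unchanged."""
    match = CALL_PATTERN.match(text)
    if not match:
        return text
    name, raw_args = match.groups()
    if name not in ALLOWED_HELPERS:
        raise ValueError(f'helper not allowed: {name}')
    # Parse the arguments as a tuple of Python literals; no code is executed.
    args = ast.literal_eval(f'({raw_args},)') if raw_args.strip() else ()
    return ALLOWED_HELPERS[name](*args)

value = resolve_call('${gen_random_string(8)}')
```

The trade-off is that arguments must be literals (numbers, strings, lists, and so on), so nested helper calls would need extra handling.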

Executing HTTP Requests

The request handler wraps aiohttp to send API calls asynchronously while attaching necessary authorization headers.

from aiohttp import ClientSession

async def dispatch_request(session, base_url, *args, **kwargs):
    http_method, endpoint = args
    # Record whichever body or query payload the case supplied, for reporting.
    payload = kwargs.get('data') or kwargs.get('params') or kwargs.get('json') or {}
    kwargs.setdefault('headers', {}).update({'Authorization': f'Bearer {config.auth_token}'})
    target_url = f"{base_url}{endpoint}"

    async with session.request(http_method, target_url, **kwargs) as resp:
        # process_response is a project helper that is not shown in this article.
        parsed_response = await process_response(resp)
        return {
            'response': parsed_response,
            'url': target_url,
            'payload': payload
        }
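The handler relies on a `process_response` helper that the article does not show. As a rough sketch under assumptions, it could decode the body according to the content type and return a plain dict. The returned keys (`status_code`, `json`, `text`) are hypothetical, and `FakeResponse` is a stand-in exposing the same `status`, `content_type`, `json()`, and `text()` members as an aiohttp response so the example runs without a network.

```python
import asyncio

async def process_response(resp):
    # Assumed contract: return the status code plus a decoded body.
    if resp.content_type == 'application/json':
        return {'status_code': resp.status, 'json': await resp.json()}
    return {'status_code': resp.status, 'text': await resp.text()}

class FakeResponse:
    # Minimal stand-in for an aiohttp response, for demonstration only.
    status = 200
    content_type = 'application/json'

    async def json(self):
        return {'status': 'success'}

    async def text(self):
        return '{"status": "success"}'

parsed = asyncio.run(process_response(FakeResponse()))
```

Reading the body inside the `async with` block matters, because the connection is released once the block exits.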

Concurrency Control and Data Aggregation

To avoid overwhelming the server, a semaphore caps how many cases run at once. Results are grouped by the directory that contains each spec file, ready for downstream processing.

from aiohttp import ClientSession, CookieJar
import asyncio
import os

async def execute_concurrently(test_files, max_concurrency):
    # Create the semaphore inside the running loop it will be used with.
    semaphore = asyncio.Semaphore(max_concurrency)
    jar = CookieJar(unsafe=True)
    auth_headers = {'Authorization': f'Bearer {config.auth_token}'}

    async def bounded_case(session, file_path):
        # Acquire per case, so up to max_concurrency cases are in flight at once.
        async with semaphore:
            return await process_single_case(session, file_path)

    async with ClientSession(cookie_jar=jar, headers=auth_headers) as session:
        await authenticate_session(session)
        results = await asyncio.gather(*(bounded_case(session, path) for path in test_files))

    aggregated_data = {}
    for case_data in results:
        group_key = case_data.pop('group_dir')
        aggregated_data.setdefault(group_key, []).append(case_data)
    return aggregated_data

async def process_single_case(session, filepath):
    # Assumes a relative path whose second segment is the project name.
    project = filepath.split(os.sep)[1]
    base_url = config.service_urls.get(project)
    spec = await load_test_spec(filename=filepath)
    # yaml.safe_load returns plain dicts, so use key access rather than attributes.
    args, cases, validator = spec['args'], spec['kwargs'], spec['validator']
    result = {'group_dir': os.path.dirname(filepath), 'endpoint': args[1].replace('/', '_'), 'responses': []}

    if not isinstance(cases, list):
        # Normalize a single case to the list form used by multi-case specs.
        cases, validator = [cases], [validator]

    for idx, params in enumerate(cases):
        step_name = params.pop('caseName')
        resp_data = await dispatch_request(session, base_url, *args, **params)
        resp_data['case_name'] = step_name
        result['responses'].append({'response': resp_data, 'validator': validator[idx]})

    return result

def run_event_loop(test_files):
    # asyncio.run creates a fresh loop, runs the coroutine, and closes the loop.
    return asyncio.run(execute_concurrently(test_files, config.max_concurrency))
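A semaphore only bounds concurrency when each task acquires it individually while the tasks themselves are scheduled together. The effect can be checked in isolation with the standard library. In this sketch, `run_bounded` and `job` are hypothetical names and `asyncio.sleep` again stands in for a request; the counter records the highest number of jobs running at the same moment.

```python
import asyncio

async def run_bounded(job_count, limit):
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job(index):
        nonlocal in_flight, peak
        # Each job waits here until one of the `limit` slots is free.
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for an HTTP request
            in_flight -= 1
            return index

    # Schedule all jobs at once; the semaphore decides how many proceed.
    results = await asyncio.gather(*(job(i) for i in range(job_count)))
    return results, peak

results, peak = asyncio.run(run_bounded(job_count=10, limit=3))
```

With ten jobs and a limit of three, `peak` never exceeds three, while `results` still comes back in submission order.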

Dynamic Pytest Case Generation

Pytest requires test files and functions following specific naming conventions to discover and execute tests. By utilizing a bootstrap test file and a session-scoped fixture, we can intercept the test startup, dynamically write Python test files based on the collected data, invoke a nested pytest run, and clean up the generated files afterward.

# bootstrap_test.py
import pytest

@pytest.mark.usefixtures('generate_dynamic_suite')
class TestBootstrapper(object):
    def test_spark(self):
        pytest.skip('Bootstrap entry point, execution skipped.')

The session-scoped fixture discovers YAML files, triggers the async event loop to fetch all API responses, writes temporary test modules containing parameterized test methods, and executes them.

# conftest.py
import pytest
import os
import re

@pytest.fixture(scope='session')
def generate_dynamic_suite(request):
    target_dir = request.config.getoption("--rootdir")
    env_setting = request.config.getoption("--te")
    spec_files = []

    if os.path.isdir(target_dir):
        for root, _, files in os.walk(target_dir):
            if re.match(r'\w+', root):
                spec_files.extend([os.path.join(root, f) for f in files if f.endswith('yml')])

    aggregated_results = run_event_loop(spec_files)

    module_template = """import allure
from conftest import DynamicCaseMeta
@allure.feature('{} API Validation ({} Project)')
class Test{}API(object, metaclass=DynamicCaseMeta):
    collected_data = {}
"""
    generated_files = []

    for root, _, _ in os.walk(target_dir):
        if not ('.' in root or '__' in root):
            suite_name = os.path.basename(root)
            proj_name = os.path.basename(os.path.dirname(root))
            module_path = os.path.join(root, f'test_{suite_name}.py')
            with open(module_path, 'w', encoding='utf-8') as writer:
                # Default to an empty list so directories without specs yield no tests.
                writer.write(module_template.format(suite_name, proj_name, suite_name.title(), aggregated_results.get(root, [])))
            generated_files.append(module_path)

    try:
        pytest.main(['-v', target_dir, '--alluredir', 'allure-results', '--te', env_setting, '--capture', 'no', '--disable-warnings'])
    finally:
        # Remove the generated modules even if the nested run raises.
        for temp_file in generated_files:
            os.remove(temp_file)

    return generated_files
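The fixture reads a custom `--te` option, which pytest rejects on the command line unless it has been registered. The article does not show that registration; a minimal sketch for conftest.py follows, assuming `--te` names the target environment. The default value and help text are assumptions.

```python
def pytest_addoption(parser):
    # Register the custom --te flag that the fixture reads via getoption("--te").
    parser.addoption(
        '--te',
        action='store',
        default='test',  # assumed default environment name
        help='Target environment for the API tests.',
    )
```

With this hook in place, a run such as `pytest --te staging` makes `request.config.getoption("--te")` return `'staging'`.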

Metaclass for Test Method Injection

A metaclass dynamically attaches test methods to the generated classes. Each method is decorated with pytest.mark.parametrize and Allure reporting steps.

import types
import allure
import pytest

METHOD_BODY = """
def {}(self, resp_data, validation_rule):
    with allure.step(resp_data.pop('case_name')):
        assert_response(resp_data, validation_rule)
"""

def compile_method(expression, scope=None):
    # Compile the source, pull out the function's code object, and bind it to a
    # private globals dict instead of mutating the shared builtins module.
    compiled_code = compile(expression, '<generated>', 'exec')
    method_code = [c for c in compiled_code.co_consts if isinstance(c, types.CodeType)][0]
    return types.FunctionType(method_code, dict(scope or {}))

class DynamicCaseMeta(type):
    def __new__(mcs, cls_name, bases, cls_dict):
        # Treat a missing or None payload as "no cases" instead of failing.
        test_data = cls_dict.pop('collected_data', None) or []
        for item in test_data:
            endpoint_id = item.pop('endpoint')
            method_name = f'test_{endpoint_id}'
            params = [tuple(v.values()) for v in item.get('responses', [])]
            raw_method = compile_method(METHOD_BODY.format(method_name), scope={'assert_response': assert_response, 'allure': allure})
            decorated_method = allure.story(endpoint_id.replace('_', '/'))(raw_method)
            cls_dict[method_name] = pytest.mark.parametrize('resp_data, validation_rule', params)(decorated_method)
        return super().__new__(mcs, cls_name, bases, cls_dict)
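The injection mechanism can be seen without pytest or Allure. The sketch below is a simplified variation, not the article's implementation: it uses closures instead of compiled source, omits parametrization and reporting, and assumes a hypothetical `assert_response` that checks each expected key under each validator section against the parsed response. `InjectingMeta`, `make_test`, and `TestDemoAPI` are illustrative names.

```python
def assert_response(resp_data, validation_rule):
    # Hypothetical validator: every expected key/value in each section must
    # appear in the matching section of the parsed response.
    parsed = resp_data['response']
    for section, expected in validation_rule.items():
        actual = parsed.get(section, {})
        for key, value in expected.items():
            assert actual.get(key) == value, f'{section}.{key} mismatch'

def make_test(cases):
    # A factory gives each generated method its own `cases` binding.
    def test(self):
        for case in cases:
            assert_response(case['response'], case['validator'])
    return test

class InjectingMeta(type):
    def __new__(mcs, name, bases, namespace):
        # Turn every collected item into a test_<endpoint> method.
        for item in namespace.pop('collected_data', None) or []:
            namespace[f"test_{item['endpoint']}"] = make_test(item['responses'])
        return super().__new__(mcs, name, bases, namespace)

class TestDemoAPI(metaclass=InjectingMeta):
    collected_data = [{
        'endpoint': '_api_v1_resource',
        'responses': [{
            'response': {'response': {'json': {'status': 'success'}}},
            'validator': {'json': {'status': 'success'}},
        }],
    }]

# The class now carries a generated method named after the endpoint.
TestDemoAPI().test__api_v1_resource()
```

Because the methods exist by the time the class object is created, a test runner that inspects the class for `test_`-prefixed attributes finds them like hand-written tests.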

Automated YAML Scaffolding via Swagger

Manually creating YAML files for numerous endpoints is tedious. By fetching the Swagger JSON specification, we can auto-generate the test scaffolding. The API documentation is typically available at an endpoint like /v2/api-docs.

import os
from requests import Session

YAML_SCAFFOLD = """args:
  - {http_method}
  - {route}
kwargs:
  -
    caseName: {description}
    {payload_type}:
        {payload_body}
validator:
  -
    json:
      success: True
"""

def scaffold_from_swagger(api_docs_url, proj_name):
    spec = Session().get(api_docs_url).json()
    routes = spec.get('paths')
    workspace = os.getcwd()
    proj_dir = os.path.join(workspace, proj_name)

    if not os.path.exists(proj_dir):
        os.makedirs(proj_dir)

    for path, methods in routes.items():
        segments = [seg for seg in path.split('/') if seg]
        folder = segments[0]
        file_name = ''.join([s.title() for s in segments[1:]]) if len(segments) > 1 else folder
        file_name += '.yml'

        route_dir = os.path.join(proj_dir, folder)
        if not os.path.exists(route_dir):
            os.makedirs(route_dir)

        for http_method, details in methods.items():
            payload_type = 'params' if http_method == 'get' else 'data'
            parameters = details.get('parameters', [])
            payload_body = ''
            try:
                for param in parameters:
                    payload_body += f"{param.get('name')}: \n        "
            except TypeError:
                payload_body = '{}'

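            # Note: every method of a path writes to the same file, so a path
            # with several methods keeps only the scaffold written last.
            file_path = os.path.join(route_dir, file_name)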
            with open(file_path, 'w', encoding='utf-8') as writer:
                writer.write(YAML_SCAFFOLD.format(
                    http_method=http_method,
                    route=path,
                    description=details.get('description', 'Auto-generated case'),
                    payload_type=payload_type,
                    payload_body=payload_body
                ))
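The naming rules used by the scaffolder can be tried on an in-memory document before pointing it at a live service. This sketch mirrors the folder, file name, and payload type derivation from the function above but writes nothing to disk; `plan_scaffold` and `sample_spec` are hypothetical, and the sample is far smaller than a real Swagger document.

```python
def plan_scaffold(spec):
    """List (folder, file_name, http_method, payload_type, param_names) per operation."""
    plan = []
    for path, methods in spec.get('paths', {}).items():
        segments = [seg for seg in path.split('/') if seg]
        folder = segments[0]
        # Remaining segments are title-cased and joined; a single-segment
        # path reuses the folder name as the file stem.
        stem = ''.join(s.title() for s in segments[1:]) if len(segments) > 1 else folder
        for http_method, details in methods.items():
            payload_type = 'params' if http_method == 'get' else 'data'
            names = [p.get('name') for p in details.get('parameters') or []]
            plan.append((folder, f'{stem}.yml', http_method, payload_type, names))
    return plan

sample_spec = {
    'paths': {
        '/user/login': {'post': {'parameters': [{'name': 'username'}, {'name': 'password'}]}},
        '/health': {'get': {}},
    }
}
plan = plan_scaffold(sample_spec)
```

Inspecting `plan` first makes it easy to spot paths that would collide on the same file name before any YAML is generated.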
