---
description: "Guidelines for testing Langflow components and backend code, with emphasis on async patterns and robust, well-documented testing practices."
globs:
---

Guidelines for testing Langflow components and backend code, with emphasis on async patterns and robust, well-documented testing practices.
Test organization:

- Backend tests live in `src/backend/tests/`; component unit tests go in `src/backend/tests/unit/components/` (organized by component subdirectory).
- Integration tests run via `make integration_tests` (requires additional setup).
- Test files mirror the module under test: `my_component.py` → `test_my_component.py`.

## `client` Fixture (FastAPI Test Client)

Defined in `src/backend/tests/conftest.py`, the `client` fixture provides an `httpx.AsyncClient` connected to the full application via `ASGITransport` + `LifespanManager`:

```python
async def test_login_endpoint(client):
    response = await client.post("api/v1/login", data={"username": "foo", "password": "bar"})
    assert response.status_code == 200
```
Use the `@pytest.mark.noclient` marker for tests that should not create the `client` fixture.

## ComponentTestBase Family

Located in `src/backend/tests/base.py`.
| Base Class | Creates `client`? | Typical Use | Notes |
|---|---|---|---|
| `ComponentTestBase` | No | Core logic for component version testing | Requires you to provide the fixtures described below. |
| `ComponentTestBaseWithClient` | Yes | Components that need API access during tests | Inherit when the component interacts with backend services. |
| `ComponentTestBaseWithoutClient` | No | Pure-logic components with no API calls | Lightweight alternative. |
Required fixtures for subclasses:
- `component_class` → the component class under test.
- `default_kwargs` → dict of kwargs to instantiate the component (can be empty).
- `file_names_mapping` → list of `VersionComponentMapping` dicts mapping historical Langflow versions to module/file names.

Example skeleton:
```python
import pytest

from tests.base import ComponentTestBaseWithClient, VersionComponentMapping, DID_NOT_EXIST
from langflow.components.my_namespace import MyComponent


class TestMyComponent(ComponentTestBaseWithClient):
    @pytest.fixture
    def component_class(self):
        return MyComponent

    @pytest.fixture
    def default_kwargs(self):
        return {"foo": "bar"}

    @pytest.fixture
    def file_names_mapping(self):
        return [
            VersionComponentMapping(version="1.1.1", module="my_module", file_name="my_component.py"),
            # Older versions can be mapped or DID_NOT_EXIST
            VersionComponentMapping(version="1.0.19", module="my_module", file_name=DID_NOT_EXIST),
        ]
```
ComponentTestBase automatically provides:
- `test_latest_version` → instantiates the component via `component_class` and asserts `run()` doesn't return `None`.
- `test_all_versions_have_a_file_name_defined` → ensures mapping completeness vs. the `SUPPORTED_VERSIONS` constant (`src/backend/tests/constants.py`).
- `test_component_versions` (parametrised) → builds the component from source for each supported version and asserts successful execution.

When adding a new component test, inherit from the correct base class and provide the three fixtures. This greatly reduces boilerplate and enforces version compatibility.
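For a pure-logic component, a minimal sketch using `ComponentTestBaseWithoutClient` (the component, module, and file names here are hypothetical):

```python
import pytest

from tests.base import ComponentTestBaseWithoutClient, VersionComponentMapping


class TestPureLogicComponent(ComponentTestBaseWithoutClient):
    @pytest.fixture
    def component_class(self):
        # Hypothetical import; substitute the real component.
        from langflow.components.my_namespace import PureLogicComponent

        return PureLogicComponent

    @pytest.fixture
    def default_kwargs(self):
        # Can be empty if the component has usable defaults.
        return {}

    @pytest.fixture
    def file_names_mapping(self):
        # Illustrative: cover every version in SUPPORTED_VERSIONS,
        # using DID_NOT_EXIST for versions predating the component.
        return [
            VersionComponentMapping(version="1.1.1", module="my_module", file_name="pure_logic_component"),
        ]
```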
Mark async tests with `@pytest.mark.asyncio`:

```python
import asyncio

import pytest


@pytest.mark.asyncio
async def test_async_component():
    # Test async component methods
    result = await component.async_method()
    assert result is not None
```
```python
@pytest.mark.asyncio
async def test_background_task_completion():
    # Ensure background tasks complete properly
    task = asyncio.create_task(component.background_operation())
    result = await asyncio.wait_for(task, timeout=5.0)
    assert result.success
```
```python
@pytest.mark.asyncio
async def test_queue_operations():
    # Test queue put/get operations without blocking
    queue = asyncio.Queue()

    # Non-blocking put
    queue.put_nowait(test_data)

    # Verify queue processing
    result = await asyncio.wait_for(queue.get(), timeout=1.0)
    assert result == test_data
```
Use the `no_blockbuster` pytest marker to skip the blockbuster plugin in tests:

- Apply `@pytest.mark.no_blockbuster` for `asyncio.to_thread` scenarios (see the sketch below).
- `test_database.py` may fail in batch runs but pass individually; run it alone with `uv run pytest src/backend/tests/unit/test_database.py`.
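A minimal sketch of the marker in an `asyncio.to_thread` scenario (assuming blockbuster would otherwise flag the blocking call; the helper function is hypothetical):

```python
import asyncio
import time

import pytest


def blocking_io():
    # Deliberately blocking work that belongs in a worker thread.
    time.sleep(0.1)
    return "done"


@pytest.mark.no_blockbuster
@pytest.mark.asyncio
async def test_offloaded_blocking_call():
    # asyncio.to_thread keeps the event loop responsive while blocking_io runs.
    result = await asyncio.to_thread(blocking_io)
    assert result == "done"
```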
Run tests via `make` targets or directly with pytest:

```bash
make unit_tests          # Run all backend unit tests
make integration_tests   # Run integration tests (requires additional setup)
make coverage            # Run tests with coverage (requires additional setup)
make tests_frontend      # Run frontend tests (requires additional setup)
```

```bash
# Run specific test file
uv run pytest src/backend/tests/unit/test_specific_component.py

# Run specific test method
uv run pytest src/backend/tests/unit/test_component.py::test_specific_method

# Run with verbose output
uv run pytest -v src/backend/tests/unit/
```
Test error handling and error message quality:

```python
def test_component_error_handling():
    """Test that component handles errors gracefully."""
    with pytest.raises(ExpectedError):
        component.method_that_should_fail(invalid_input)

    # Test error message quality
    try:
        component.risky_operation()
    except ComponentError as e:
        assert "helpful error message" in str(e)
```
Use fixtures to guarantee resource cleanup:

```python
@pytest.fixture
async def component_with_cleanup():
    """Fixture ensuring proper resource cleanup."""
    component = MyComponent()
    await component.initialize()
    try:
        yield component
    finally:
        await component.cleanup()
```
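A sketch of a test consuming the fixture (`process` is a hypothetical method):

```python
@pytest.mark.asyncio
async def test_component_with_cleanup_processes_input(component_with_cleanup):
    # The fixture yields an initialized component and guarantees cleanup afterwards.
    result = await component_with_cleanup.process("input")
    assert result is not None
```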
```python
@pytest.mark.asyncio
async def test_operation_timeout():
    """Test that operations respect timeout constraints."""
    start_time = time.time()

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(
            component.long_running_operation(),
            timeout=1.0,
        )

    elapsed = time.time() - start_time
    assert elapsed < 1.5  # Allow some tolerance
```
```python
@pytest.mark.asyncio
async def test_component_background_processing():
    """
    Test that component processes items in background without blocking.

    This test verifies:
    1. Items are added to processing queue immediately
    2. Background processing completes successfully
    3. No deadlocks occur during shutdown
    4. All tasks are properly cleaned up
    """
    # Setup component with background processing
    component = BackgroundComponent()
    await component.start()

    try:
        # Add items for processing
        for i in range(10):
            await component.add_item(f"item_{i}")

        # Verify items are queued (non-blocking)
        assert component.queue_size() == 10

        # Wait for processing to complete
        await component.wait_for_completion(timeout=5.0)

        # Verify all items processed
        assert component.processed_count() == 10
    finally:
        # Ensure clean shutdown
        await component.stop()
        assert component.is_stopped()
```
Apply the appropriate pytest markers (`no_blockbuster`, etc.) where tests need special handling.

Test Langflow's `Message` objects and chat functionality:
```python
from langflow.schema.message import Message
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_USER


async def test_message_response(self, component_class, default_kwargs):
    """Test Message object creation and properties."""
    component = component_class(**default_kwargs)
    message = await component.message_response()

    assert isinstance(message, Message)
    assert message.text == default_kwargs["input_value"]
    assert message.sender == default_kwargs["sender"]
    assert message.sender_name == default_kwargs["sender_name"]
    assert message.session_id == default_kwargs["session_id"]
    assert message.files == default_kwargs["files"]

    # Test message properties structure
    assert message.properties.model_dump() == {
        "background_color": default_kwargs["background_color"],
        "text_color": default_kwargs["text_color"],
        "icon": default_kwargs["chat_icon"],
        # ... other expected properties
    }
```
Use predefined JSON flows and utility functions:
```python
from httpx import codes

from tests.unit.build_utils import create_flow, build_flow, get_build_events, consume_and_assert_stream


async def test_flow_execution(client, json_memory_chatbot_no_llm, logged_in_headers):
    """Test flow creation, building, and execution."""
    # Create flow from JSON data
    flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)

    # Start the build process
    build_response = await build_flow(client, flow_id, logged_in_headers)
    job_id = build_response["job_id"]

    # Get and validate event stream
    events_response = await get_build_events(client, job_id, logged_in_headers)
    assert events_response.status_code == codes.OK

    # Process events and validate structure
    await consume_and_assert_stream(events_response, job_id)
```
Test components that need external APIs with proper markers:
```python
import os
from uuid import uuid4


@pytest.mark.api_key_required
@pytest.mark.no_blockbuster
async def test_component_with_external_api(self):
    """Test component with external API integration."""
    api_key = os.getenv("OPENAI_API_KEY")
    component = MyAPIComponent(
        api_key=api_key,
        model_name="gpt-4o",
        input_value="test input",
        session_id=str(uuid4()),
    )
    response = await component.message_response()
    assert response.data.get("text") is not None
```
Use `MockLanguageModel` for testing without external APIs:
```python
from uuid import uuid4

from tests.unit.mock_language_model import MockLanguageModel


@pytest.fixture
def default_kwargs(self):
    return {
        "agent_llm": MockLanguageModel(),
        "input_value": "test message",
        "session_id": str(uuid4()),
        # ... other component-specific kwargs
    }
```
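A sketch of a test that consumes this fixture (the component class and `message_response` method mirror the patterns above; names are illustrative):

```python
@pytest.mark.asyncio
async def test_agent_with_mock_llm(self, component_class, default_kwargs):
    # MockLanguageModel stands in for a real LLM, so no API key is required.
    component = component_class(**default_kwargs)
    response = await component.message_response()
    assert response is not None
```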
Use `anyio` and `aiofiles` for async file operations:
```python
import aiofiles
import anyio


async def test_file_operations(self, component_class, tmp_path):
    """Test component file handling."""
    # Create temporary test file
    test_file = anyio.Path(tmp_path) / "test.txt"
    await test_file.write_text("Test content", encoding="utf-8")

    # Test with file input
    component = component_class(files=[str(test_file)])
    result = await component.run()

    # Verify file was processed
    assert len(result.files) == 1
    assert await test_file.exists()
```
Test Langflow's REST API endpoints:
```python
from httpx import codes


async def test_flows_endpoint(client, logged_in_headers):
    """Test flow creation via API."""
    flow_data = {
        "name": "Test Flow",
        "description": "Test description",
        "data": {"nodes": [], "edges": []},
    }

    response = await client.post("api/v1/flows/", json=flow_data, headers=logged_in_headers)
    assert response.status_code == codes.CREATED

    flow = response.json()
    assert flow["name"] == "Test Flow"
    assert "id" in flow
```
Test component configuration updates:
```python
async def test_build_config_updates(self, component_class, default_kwargs):
    """Test dynamic build configuration updates."""
    component = await self.component_setup(component_class, default_kwargs)
    frontend_node = component.to_frontend_node()
    build_config = frontend_node["data"]["node"]["template"]

    # Test configuration update
    component.set(llm_provider="OpenAI")
    updated_config = await component.update_build_config(build_config, "OpenAI", "llm_provider")

    assert updated_config["llm_provider"]["value"] == "OpenAI"
    assert "gpt-4o" in updated_config["model_name"]["options"]
```
Test real-time event streaming endpoints:
```python
import json


async def consume_and_validate_events(response, expected_job_id):
    """Consume NDJSON event stream and validate structure."""
    count = 0
    first_event_seen = False
    end_event_seen = False

    async for line in response.aiter_lines():
        if not line:
            continue

        parsed = json.loads(line)

        # Validate job_id in events
        if "job_id" in parsed:
            assert parsed["job_id"] == expected_job_id

        # First event should be vertices_sorted
        if not first_event_seen:
            assert parsed["event"] == "vertices_sorted"
            first_event_seen = True
        # Track end event
        elif parsed["event"] == "end":
            end_event_seen = True

        count += 1

    assert first_event_seen and end_event_seen
    return count
```
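A sketch of wiring this helper into a flow-build test, reusing the `build_utils` helpers and fixtures from the flow-execution example above:

```python
async def test_build_event_stream(client, json_memory_chatbot_no_llm, logged_in_headers):
    flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
    build_response = await build_flow(client, flow_id, logged_in_headers)
    job_id = build_response["job_id"]

    events_response = await get_build_events(client, job_id, logged_in_headers)

    # Every event should carry the expected job_id, and the stream should
    # open with vertices_sorted and close with an end event.
    event_count = await consume_and_validate_events(events_response, job_id)
    assert event_count > 0
```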
Test backward compatibility across Langflow versions:
```python
@pytest.fixture
def file_names_mapping(self):
    """Map component files across Langflow versions."""
    return [
        VersionComponentMapping(version="1.1.1", module="inputs", file_name="chat"),
        VersionComponentMapping(version="1.1.0", module="inputs", file_name="chat"),
        VersionComponentMapping(version="1.0.19", module="inputs", file_name="ChatInput"),
    ]
```
Test webhook endpoints with proper payload handling:
```python
async def test_webhook_endpoint(client, added_webhook_test):
    """Test webhook flow execution."""
    endpoint_name = added_webhook_test["endpoint_name"]
    endpoint = f"api/v1/webhook/{endpoint_name}"
    payload = {"path": "/tmp/test_file.txt"}

    response = await client.post(endpoint, json=payload)
    assert response.status_code == 202

    # Verify webhook processed the payload
    # ... additional validation logic
```
Test error handling by mocking internal functions:
```python
async def test_cancellation_error(client, job_id, logged_in_headers, monkeypatch):
    """Test error handling during flow cancellation."""
    import langflow.api.v1.chat

    async def mock_cancel_with_error(*args, **kwargs):
        raise RuntimeError("Cancellation failed")

    monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_with_error)

    response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers)
    assert response.status_code == codes.INTERNAL_SERVER_ERROR
    assert "Cancellation failed" in response.json()["detail"]
```