Markdown Converter
File: docs/guide.md
If you are an AI assistant involved in building LLM Apps, read this guide VERY, VERY carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification. {: .warning }
These system designs should be a collaboration between humans and AI assistants:
| Stage | Human | AI | Comment |
|---|---|---|---|
| 1. Project Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context best. |
| 2. Utility Functions | ★★☆ Medium | ★★☆ Medium | The human is familiar with external APIs and integrations, and the AI assists with implementation. |
| 3. Flow Design | ★★☆ Medium | ★★☆ Medium | The human identifies complex and ambiguous parts, and the AI helps with redesign. |
| 4. Data Schema | ★☆☆ Low | ★★★ High | The AI assists in designing the data schema based on the flow. |
| 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the nodes and flows based on the design. |
| 6. Optimization | ★★☆ Medium | ★★☆ Medium | The human reviews the code and evaluates the results, while the AI helps optimize. |
| 7. Reliability | ★☆☆ Low | ★★★ High | The AI helps write test cases and address corner cases. |
Project Requirements: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
If a human can’t solve it, an LLM can’t automate it! Before building an LLM system, thoroughly understand the problem by manually solving example inputs to develop intuition.
{: .best-practice }
Utility Functions: The AI system acts as the decision-maker and relies on external utility functions (e.g., calling LLMs or searching the web) to interact with the outside world.

Start small! Only include the most important ones to begin with! {: .best-practice }
Flow Design (Compute): Create a high-level outline for your application’s flow.
- `exec`: the specific utility function to call (ideally, one function per node).

Data Schema (Data): Plan how data will be stored and updated.

- `prep`: how the node reads data.
- `post`: how the node writes data.

Implementation: Implement nodes and flows based on the design.
Optimization:
Use Intuition: For a quick initial evaluation, human intuition is often a good start.
Redesign Flow (Back to Step 3): Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
If your flow design is already solid, move on to micro-optimizations:
You’ll likely iterate a lot! Expect to repeat Steps 3–6 hundreds of times.
{: .best-practice }
Reliability
Add checks in `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.

```
my_project/
├── main.py
├── flow.py
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   └── search_web.py
├── requirements.txt
└── docs/
    └── design.md
```
- `docs/design.md`: Contains project documentation for each step above. This should be high-level and no-code.
- `utils/`: Contains all utility functions. Dedicate one file to each API call, e.g., `call_llm.py` or `search_web.py`, and give each file a `main()` function to try that API call.
- `flow.py`: Implements the application's flow, starting with node definitions followed by the overall structure.
- `main.py`: Serves as the project's entry point.

A 100-line minimalist LLM framework for Agents, Task Decomposition, RAG, etc.
We model the LLM workflow as a Graph + Shared Store:
If you are an AI assistant reading this document, please read all the non-optional and non-advanced parts. If you are helping a human build an LLM app, please carefully review the Development Playbook. {: .warning }
We do not provide built-in utility functions. Example implementations are provided as reference. {: .warning }
Async Nodes implement
prep_async(), exec_async(), exec_fallback_async(), and/or post_async(). This is useful for:
- fetching or reading data asynchronously in `prep_async()`,
- async LLM calls in `exec_async()`,
- awaiting user feedback or other async steps after `exec_async()` in `post_async()`.

Note: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.
```python
import asyncio

class SummarizeThenVerify(AsyncNode):
    async def prep_async(self, shared):
        # Example: read a file asynchronously
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        # Example: async LLM call
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        # Example: wait for user feedback
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
            return "approve"
        return "deny"

summarize_node = SummarizeThenVerify()
final_node = Finalize()

# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node  # retry

flow = AsyncFlow(start=summarize_node)

async def main():
    shared = {"doc_path": "document.txt"}
    await flow.run_async(shared)
    print("Final Summary:", shared.get("summary"))

asyncio.run(main())
```
Batch makes it easier to handle large inputs in one Node or rerun a Flow multiple times. Example use cases:
A BatchNode extends
Node but changes prep() and exec():
- `prep(shared)`: returns an iterable (e.g., list, generator).
- `exec(item)`: called once per item in that iterable.
- `post(shared, prep_res, exec_res_list)`: after all items are processed, receives a list of results (`exec_res_list`) and returns an Action.

```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; chunk it
        content = shared["data"]
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        return chunks

    def exec(self, chunk):
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res_list):
        combined = "\n".join(exec_res_list)
        shared["summary"] = combined
        return "default"

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
```
A BatchFlow runs a Flow multiple times, each time with different
params. Think of it as a loop that replays the Flow for each parameter set.
```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of param dicts (one per file)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)

# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
```
- `prep(shared)` returns a list of param dicts—e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
- For each dict, the BatchFlow merges it with its own `params` and calls `flow.run(shared)` using the merged result.

You can nest a BatchFlow in another BatchFlow. For instance:

- Outer batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- Inner batch: for each directory, returns a list of per-file param dicts.

At each level, BatchFlow merges its own param dict with the parent's. By the time you reach the innermost node, the final `params` is the merged result of all parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
```python
class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        directory = self.params["directory"]
        # e.g., files = ["file1.txt", "file2.txt", ...]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# MapSummaries gets params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
```
Nodes and Flows communicate in two ways:
1. Shared Store (recommended)
   - A global data structure that all nodes can read and write via `prep()` and `post()`.
2. Params (only for Batch)
   - A local, ephemeral `params` dict passed in by the parent Flow, used as an identifier for tasks. Parameter keys and values shall be immutable.

If you know memory management, think of the Shared Store like a heap (shared by all function calls), and Params like a stack (assigned by the caller).
Use Shared Store for almost all cases. It's flexible and easy to manage. It separates Data Schema from Compute Logic, making the code easier to maintain. Params is more a syntax sugar for Batch. {: .best-practice }
A shared store is typically an in-mem dictionary, like:
shared = {"data": {}, "summary": {}, "config": {...}, ...}
It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.
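As a minimal sketch of that persistence point (assuming the framework's `Node` base class is in scope; the `SaveSummary` node and `summaries` table are illustrative, not part of the framework), the shared store can carry a live connection next to in-memory data so a node can persist results in `post()`:

```python
import sqlite3

# Illustrative only: mix in-memory working data with a persistent handle
conn = sqlite3.connect("app.db")
conn.execute("CREATE TABLE IF NOT EXISTS summaries (doc TEXT, summary TEXT)")

shared = {
    "data": {},   # in-memory working data
    "db": conn,   # persistent connection reused by nodes
}

class SaveSummary(Node):
    def prep(self, shared):
        # Read what to persist from the shared store
        return shared["data"].get("doc"), shared["data"].get("summary")

    def exec(self, prep_res):
        return prep_res  # nothing to compute here

    def post(self, shared, prep_res, exec_res):
        doc, summary = exec_res
        shared["db"].execute("INSERT INTO summaries VALUES (?, ?)", (doc, summary))
        shared["db"].commit()
        return "default"
```

The read/write pattern in the example below works the same way whether the store is purely in-memory or backed by a connection like this.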
```python
class LoadData(Node):
    def post(self, shared, prep_res, exec_res):
        # We write data to shared store
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        # We read data from shared store
        return shared["data"]

    def exec(self, prep_res):
        # Call LLM to summarize
        prompt = f"Summarize: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # We write summary to shared store
        shared["summary"] = exec_res
        return "default"

load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)

shared = {}
flow.run(shared)
```
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.

Params let you store per-Node or per-Flow config that doesn't need to live in the shared store. They are:

- immutable during a Node's run cycle (they don't change mid `prep->exec->post`),
- set via `set_params()`.

Only set the uppermost Flow params because others will be overwritten by the parent Flow.
If you need to set child node params, see Batch. {: .warning }
Typically, Params are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
    def prep(self, shared):
        # Access the node's param
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

    def exec(self, prep_res):
        prompt = f"Summarize: {prep_res}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        return "default"

# 2) Set params
node = SummarizeFile()

# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)

# 4) Create Flow
flow = Flow(start=node)

# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared)  # The node summarizes doc2, not doc1
```
A Flow orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the Actions returned from each Node's
post().
Each Node's
post() returns an Action string. By default, if post() doesn't return anything, we treat that as "default".
You define transitions with the syntax:
Basic default transition:
```python
node_a >> node_b
```
This means if node_a.post() returns "default", go to node_b.
(Equivalent to node_a - "default" >> node_b)
Named action transition:
node_a - "action_name" >> node_b
This means if node_a.post() returns "action_name", go to node_b.
It's possible to create loops, branching, or multi-step flows.
A Flow begins with a start node. You call
Flow(start=some_node) to specify the entry point. When you call flow.run(shared), it executes the start node, looks at its returned Action from post(), follows the transition, and continues until there's no next node.
Here's a minimal flow of two nodes in a chain:
```python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
```
1. The flow runs `node_a`.
2. `node_a.post()` returns `"default"`.
3. The `"default"` Action is linked to `node_b`, so the flow runs `node_b`.
4. `node_b.post()` returns `"default"`, but we didn't define `node_b >> something_else`, so the flow ends there.

Here's a simple expense approval flow that demonstrates branching and looping. The
ReviewExpense node can return three possible Actions:
"approved": expense is approved, move to payment processing"needs_revision": expense needs changes, send back for revision"rejected": expense is denied, finish the processWe can wire them like this:
```python
# Define the flow connections
review - "approved" >> payment        # If approved, process payment
review - "needs_revision" >> revise   # If needs changes, go to revision
review - "rejected" >> finish         # If rejected, finish the process

revise >> review   # After revision, go back for another review
payment >> finish  # After payment, finish the process

flow = Flow(start=review)
```
Let's see how it flows:
- If `review.post()` returns `"approved"`, the expense moves to the `payment` node.
- If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`.
- If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops.

```mermaid
flowchart TD
    review[Review Expense] -->|approved| payment[Process Payment]
    review -->|needs_revision| revise[Revise Report]
    review -->|rejected| finish[Finish Process]
    revise --> review
    payment --> finish
```
- `node.run(shared)`: just runs that node alone (calls `prep->exec->post()`) and returns an Action.
- `flow.run(shared)`: executes from the start node, follows Actions to the next node, and so on until the flow can't continue.

`node.run(shared)` does not proceed to the successor. This is mainly for debugging or testing a single node. Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {: .warning }
A Flow can act like a Node, which enables powerful composition patterns. This means you can:
- use a Flow as a Node within another Flow's transitions;
- combine multiple smaller Flows into a larger Flow for reuse.

Node `params` will be a merging of all parents' `params`.

A Flow is also a Node, so it will run `prep()` and `post()`. However:

- It won't run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.

Here's how to connect a flow to another node:
```python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)

# Connect it to another node
subflow >> node_c

# Create the parent flow
parent_flow = Flow(start=subflow)
```
When
parent_flow.run() executes:
1. It starts `subflow`.
2. `subflow` runs through its nodes (`node_a->node_b`).
3. After `subflow` completes, execution continues to `node_c`.

Here's a practical example that breaks down order processing into nested flows:
```python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)

# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)

# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)

# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow

# Create the master flow
order_pipeline = Flow(start=payment_flow)

# Run the entire pipeline
order_pipeline.run(shared_data)
```
This creates a clean separation of concerns while maintaining a clear execution path:
```mermaid
flowchart LR
    subgraph order_pipeline[Order Pipeline]
        subgraph paymentFlow["Payment Flow"]
            A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
        end
        subgraph inventoryFlow["Inventory Flow"]
            D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
        end
        subgraph shippingFlow["Shipping Flow"]
            G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
        end
        paymentFlow --> inventoryFlow
        inventoryFlow --> shippingFlow
    end
```
A Node is the smallest building block. Each Node has 3 steps
prep->exec->post:
1. `prep(shared)`
   - Read and preprocess data from the shared store.
   - Return `prep_res`, which is used by `exec()` and `post()`.
2. `exec(prep_res)`
   - Execute compute logic only; it should not access `shared`.
   - Return `exec_res`, which is passed to `post()`.
3. `post(shared, prep_res, exec_res)`
   - Postprocess and write data back to `shared`.
   - Decide the next action by returning a string (`action = "default"` if `None`).

Why 3 steps? To enforce the principle of separation of concerns: data storage and data processing are operated separately.
All steps are optional. E.g., you can only implement `prep` and `post` if you just need to process data. {: .note }
You can retry `exec()` if it raises an exception, via two parameters when you define the Node:
- `max_retries` (int): Max times to run `exec()`. The default is 1 (no retry).
- `wait` (int): The time to wait (in seconds) before the next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you encounter rate limits or quota errors from your LLM provider and need to back off.

```python
my_node = SummarizeFile(max_retries=3, wait=10)
```
When an exception occurs in
exec(), the Node automatically retries until:
- it either succeeds, or
- the Node has retried `max_retries - 1` times already and fails on the last attempt.

You can get the current retry count (0-based) from
self.cur_retry.
```python
class RetryNode(Node):
    def exec(self, prep_res):
        print(f"Retry {self.cur_retry} times")
        raise Exception("Failed")
```
To gracefully handle the exception (after all retries) rather than raising it, override:
```python
def exec_fallback(self, prep_res, exc):
    raise exc
```
By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the
exec_res passed to post().
```python
class SummarizeFile(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        if not prep_res:
            return "Empty file content"
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        # Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # "default"
print("Summary stored:", shared["summary"])
```
Parallel Nodes and Flows let you run multiple Async Nodes and Flows concurrently—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning }
Ensure Tasks Are Independent: If each item depends on the output of a previous item, do not parallelize.
Beware of Rate Limits: Parallel calls can quickly trigger rate limits on LLM services. You may need a throttling mechanism (e.g., semaphores or sleep intervals), as sketched below.
Consider Single-Node Batch APIs: Some LLMs offer a batch inference API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. {: .best-practice }
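To illustrate the rate-limit point, here is a minimal throttling sketch (assuming an async wrapper like `call_llm_async`; the concurrency limit of 5 is an arbitrary example) using an `asyncio.Semaphore`:

```python
import asyncio

# Allow at most 5 LLM requests in flight at once (arbitrary example limit)
llm_semaphore = asyncio.Semaphore(5)

async def throttled_call_llm(prompt):
    # Every concurrent task shares the same semaphore, so bursts are capped
    async with llm_semaphore:
        return await call_llm_async(prompt)

async def summarize_all(texts):
    tasks = [throttled_call_llm(f"Summarize: {t}") for t in texts]
    return await asyncio.gather(*tasks)
```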
Like AsyncBatchNode, but runs `exec_async()` in parallel:
```python
class ParallelSummaries(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        # e.g., multiple texts
        return shared["texts"]

    async def exec_async(self, text):
        prompt = f"Summarize: {text}"
        return await call_llm_async(prompt)

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"

node = ParallelSummaries()
flow = AsyncFlow(start=node)
```
Parallel version of BatchFlow. Each iteration of the sub-flow runs concurrently using different parameters:
```python
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
    async def prep_async(self, shared):
        return [{"filename": f} for f in shared["files"]]

sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)
```
Agent is a powerful design pattern in which a node can take dynamic actions based on the context it receives. To express an agent, create a Node (the agent) with branching to other nodes (Actions).

The core of building performant and reliable agents boils down to:
1. Context Management: Provide clear, relevant context so agents can understand the problem. For example, rather than dumping an entire chat history or entire files, use a Workflow that filters out and includes only the most relevant information.
2. Action Space: Define a well-structured, unambiguous, and easy-to-use set of actions. For instance, avoid creating overlapping actions like `read_csvs` and `read_databases`. Instead, unify data sources (e.g., move CSVs into a database) and design a single action. The action can be parameterized (e.g., a string for search) or programmable (e.g., SQL queries). {: .best-practice }
This agent decides whether to search the web or answer directly, loops back after each search to decide whether more searching is needed, and answers once it has gathered enough context:
````python
import yaml

class DecideAction(Node):
    def prep(self, shared):
        context = shared.get("context", "No previous search")
        query = shared["query"]
        return query, context

    def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        resp = call_llm(prompt)
        yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
        result = yaml.safe_load(yaml_str)

        assert isinstance(result, dict)
        assert "action" in result
        assert "reason" in result
        assert result["action"] in ["search", "answer"]
        if result["action"] == "search":
            assert "search_term" in result

        return result

    def post(self, shared, prep_res, exec_res):
        if exec_res["action"] == "search":
            shared["search_term"] = exec_res["search_term"]
        return exec_res["action"]

class SearchWeb(Node):
    def prep(self, shared):
        return shared["search_term"]

    def exec(self, search_term):
        return search_web(search_term)

    def post(self, shared, prep_res, exec_res):
        prev_searches = shared.get("context", [])
        shared["context"] = prev_searches + [
            {"term": shared["search_term"], "result": exec_res}
        ]
        return "decide"

class DirectAnswer(Node):
    def prep(self, shared):
        return shared["query"], shared.get("context", "")

    def exec(self, inputs):
        query, context = inputs
        return call_llm(f"Context: {context}\nAnswer: {query}")

    def post(self, shared, prep_res, exec_res):
        print(f"Answer: {exec_res}")
        shared["answer"] = exec_res

# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()

decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide  # Loop back

flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})
````
MapReduce is a design pattern suitable when you have either large input data or large output data, and there is a logical way to break the task into smaller, ideally independent parts. You first break down the task using BatchNode in the map phase, followed by aggregation in the reduce phase.
```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        return [shared["text"][i:i+10000] for i in range(0, len(shared["text"]), 10000)]

    def exec(self, chunk):
        return call_llm(f"Summarize this chunk: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        shared["summaries"] = exec_res_list

class ReduceSummaries(Node):
    def prep(self, shared):
        return shared["summaries"]

    def exec(self, summaries):
        return call_llm(f"Combine these summaries: {summaries}")

    def post(self, shared, prep_res, exec_res):
        shared["final_summary"] = exec_res

# Connect nodes
map_node = MapSummaries()
reduce_node = ReduceSummaries()
map_node >> reduce_node

# Create flow
summarize_flow = Flow(start=map_node)
summarize_flow.run(shared)
```
Multi-turn conversations require memory management to maintain context while avoiding overwhelming the LLM.
Sending the full chat history may overwhelm LLMs.
```python
class ChatNode(Node):
    def prep(self, shared):
        if "history" not in shared:
            shared["history"] = []
        user_input = input("You: ")
        return shared["history"], user_input

    def exec(self, inputs):
        history, user_input = inputs
        messages = [{"role": "system", "content": "You are a helpful assistant"}]
        for h in history:
            messages.append(h)
        messages.append({"role": "user", "content": user_input})
        response = call_llm(messages)
        return response

    def post(self, shared, prep_res, exec_res):
        shared["history"].append({"role": "user", "content": prep_res[1]})
        shared["history"].append({"role": "assistant", "content": exec_res})
        return "continue"

chat = ChatNode()
chat - "continue" >> chat
flow = Flow(start=chat)
```
We can keep only a window of the most recent messages in the prompt and use vector search to retrieve older but relevant exchanges:
```python
import numpy as np

################################
# Node A: Retrieve user input & relevant messages
################################
class ChatRetrieve(Node):
    def prep(self, s):
        s.setdefault("history", [])
        s.setdefault("memory_index", None)
        user_input = input("You: ")
        return user_input

    def exec(self, user_input):
        emb = get_embedding(user_input)
        relevant = []
        if len(shared["history"]) > 8 and shared["memory_index"]:
            idx, _ = search_index(shared["memory_index"], emb, top_k=2)
            relevant = [shared["history"][i[0]] for i in idx]
        return (user_input, relevant)

    def post(self, s, p, r):
        user_input, relevant = r
        s["user_input"] = user_input
        s["relevant"] = relevant
        return "continue"

################################
# Node B: Call LLM, update history + index
################################
class ChatReply(Node):
    def prep(self, s):
        user_input = s["user_input"]
        recent = s["history"][-8:]
        relevant = s.get("relevant", [])
        return user_input, recent, relevant

    def exec(self, inputs):
        user_input, recent, relevant = inputs
        msgs = [{"role": "system", "content": "You are a helpful assistant."}]
        if relevant:
            msgs.append({"role": "system", "content": f"Relevant: {relevant}"})
        msgs.extend(recent)
        msgs.append({"role": "user", "content": user_input})
        ans = call_llm(msgs)
        return ans

    def post(self, s, pre, ans):
        user_input, _, _ = pre
        s["history"].append({"role": "user", "content": user_input})
        s["history"].append({"role": "assistant", "content": ans})

        # Manage memory index
        if len(s["history"]) == 8:
            embs = []
            for i in range(0, 8, 2):
                text = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
                embs.append(get_embedding(text))
            s["memory_index"] = create_index(embs)
        elif len(s["history"]) > 8:
            text = s["history"][-2]["content"] + " " + s["history"][-1]["content"]
            new_emb = np.array([get_embedding(text)]).astype('float32')
            s["memory_index"].add(new_emb)

        print(f"Assistant: {ans}")
        return "continue"

################################
# Flow wiring
################################
retrieve = ChatRetrieve()
reply = ChatReply()
retrieve - "continue" >> reply
reply - "continue" >> retrieve

flow = Flow(start=retrieve)
shared = {}
flow.run(shared)
```
Multiple Agents can work together by handling subtasks and communicating progress. Communication between agents is typically implemented using message queues in shared storage.

Most of the time, you don't need Multi-Agents. Start with a simple solution first. {: .best-practice }
Here's a simple example showing how to implement agent communication using
asyncio.Queue.
The agent listens for messages, processes them, and continues listening:
```python
import asyncio

class AgentNode(AsyncNode):
    async def prep_async(self, _):
        message_queue = self.params["messages"]
        message = await message_queue.get()
        print(f"Agent received: {message}")
        return message

# Create node and flow
agent = AgentNode()
agent >> agent  # connect to self
flow = AsyncFlow(start=agent)

# Create heartbeat sender
async def send_system_messages(message_queue):
    counter = 0
    messages = [
        "System status: all systems operational",
        "Memory usage: normal",
        "Network connectivity: stable",
        "Processing load: optimal"
    ]
    while True:
        message = f"{messages[counter % len(messages)]} | timestamp_{counter}"
        await message_queue.put(message)
        counter += 1
        await asyncio.sleep(1)

async def main():
    message_queue = asyncio.Queue()
    shared = {}
    flow.set_params({"messages": message_queue})

    # Run both coroutines
    await asyncio.gather(
        flow.run_async(shared),
        send_system_messages(message_queue)
    )

asyncio.run(main())
```
The output:
```
Agent received: System status: all systems operational | timestamp_0
Agent received: Memory usage: normal | timestamp_1
Agent received: Network connectivity: stable | timestamp_2
Agent received: Processing load: optimal | timestamp_3
```
Here's a more complex example where two agents play the word-guessing game Taboo. One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word:
```python
import asyncio

class AsyncHinter(AsyncNode):
    async def prep_async(self, shared):
        guess = await shared["hinter_queue"].get()
        if guess == "GAME_OVER":
            return None
        return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])

    async def exec_async(self, inputs):
        if inputs is None:
            return None
        target, forbidden, past_guesses = inputs
        prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
        if past_guesses:
            prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
        prompt += "\nUse at most 5 words."
        hint = call_llm(prompt)
        print(f"\nHinter: Here's your hint - {hint}")
        return hint

    async def post_async(self, shared, prep_res, exec_res):
        if exec_res is None:
            return "end"
        await shared["guesser_queue"].put(exec_res)
        return "continue"

class AsyncGuesser(AsyncNode):
    async def prep_async(self, shared):
        hint = await shared["guesser_queue"].get()
        return hint, shared.get("past_guesses", [])

    async def exec_async(self, inputs):
        hint, past_guesses = inputs
        prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:"
        guess = call_llm(prompt)
        print(f"Guesser: I guess it's - {guess}")
        return guess

    async def post_async(self, shared, prep_res, exec_res):
        if exec_res.lower() == shared["target_word"].lower():
            print("Game Over - Correct guess!")
            await shared["hinter_queue"].put("GAME_OVER")
            return "end"
        if "past_guesses" not in shared:
            shared["past_guesses"] = []
        shared["past_guesses"].append(exec_res)
        await shared["hinter_queue"].put(exec_res)
        return "continue"

async def main():
    # Set up game
    shared = {
        "target_word": "nostalgia",
        "forbidden_words": ["memory", "past", "remember", "feeling", "longing"],
        "hinter_queue": asyncio.Queue(),
        "guesser_queue": asyncio.Queue()
    }

    print("Game starting!")
    print(f"Target word: {shared['target_word']}")
    print(f"Forbidden words: {shared['forbidden_words']}")

    # Initialize by sending empty guess to hinter
    await shared["hinter_queue"].put("")

    # Create nodes and flows
    hinter = AsyncHinter()
    guesser = AsyncGuesser()

    # Set up flows
    hinter_flow = AsyncFlow(start=hinter)
    guesser_flow = AsyncFlow(start=guesser)

    # Connect nodes to themselves
    hinter - "continue" >> hinter
    guesser - "continue" >> guesser

    # Run both agents concurrently
    await asyncio.gather(
        hinter_flow.run_async(shared),
        guesser_flow.run_async(shared)
    )

asyncio.run(main())
```
The Output:
```
Game starting!
Target word: nostalgia
Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing']

Hinter: Here's your hint - Thinking of childhood summer days
Guesser: I guess it's - popsicle

Hinter: Here's your hint - When childhood cartoons make you emotional
Guesser: I guess it's - nostalgic

Hinter: Here's your hint - When old songs move you
Guesser: I guess it's - memories

Hinter: Here's your hint - That warm emotion about childhood
Guesser: I guess it's - nostalgia
Game Over - Correct guess!
```
For certain LLM tasks like answering questions, providing context is essential. Use vector search to find relevant context for LLM responses.
```python
class PrepareEmbeddings(Node):
    def prep(self, shared):
        return shared["texts"]

    def exec(self, texts):
        # Embed each text chunk
        embs = [get_embedding(t) for t in texts]
        return embs

    def post(self, shared, prep_res, exec_res):
        shared["search_index"] = create_index(exec_res)
        # no action string means "default"

class AnswerQuestion(Node):
    def prep(self, shared):
        question = input("Enter question: ")
        return question

    def exec(self, question):
        q_emb = get_embedding(question)
        idx, _ = search_index(shared["search_index"], q_emb, top_k=1)
        best_id = idx[0][0]
        relevant_text = shared["texts"][best_id]
        prompt = f"Question: {question}\nContext: {relevant_text}\nAnswer:"
        return call_llm(prompt)

    def post(self, shared, p, answer):
        print("Answer:", answer)

############################################
# Wire up the flow
prep = PrepareEmbeddings()
qa = AnswerQuestion()
prep >> qa
flow = Flow(start=prep)

# Example usage
shared = {"texts": ["I love apples", "Cats are great", "The sky is blue"]}
flow.run(shared)
```
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
In practice, Prompting is simple and reliable for modern LLMs.
```yaml
product:
  name: Widget Pro
  price: 199.99
  description: |
    A high-quality widget designed for professionals.
    Recommended for advanced users.
```
```yaml
summary:
  - This product is easy to use.
  - It is cost-effective.
  - Suitable for all skill levels.
```
```yaml
server:
  host: 127.0.0.1
  port: 8080
  ssl: true
```
When prompting the LLM to produce structured output:
- Wrap the structure in code fences (e.g., a `yaml` block).
- Validate that all required fields exist (and let the Node handle retries).

````python
class SummarizeNode(Node):
    def exec(self, prep_res):
        # Suppose `prep_res` is the text to summarize.
        prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points

{prep_res}

Now, output:
```yaml
summary:
  - bullet 1
  - bullet 2
  - bullet 3
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()

        import yaml
        structured_result = yaml.safe_load(yaml_str)

        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)

        return structured_result
````
Besides using `assert` statements, another popular way to validate schemas is Pydantic. {: .note }
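For instance, a Pydantic model (a sketch; the `Summary` model and its field simply mirror the YAML schema used above) can replace the `assert` checks:

```python
from typing import List

import yaml
from pydantic import BaseModel

class Summary(BaseModel):
    summary: List[str]  # requires a "summary" key holding a list of strings

def parse_summary(yaml_str):
    data = yaml.safe_load(yaml_str)
    # Raises pydantic.ValidationError if the structure doesn't match,
    # which lets the Node's retry mechanism kick in
    return Summary(**data)
```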
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
In JSON
{ "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" }
\".\n.In YAML
```yaml
dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
```
- No escaping is needed for quotes, thanks to the block literal (`|`).
- Newlines are preserved as-is, so no `\n` is required.

Many real-world tasks are too complex for one LLM call. The solution is to decompose them into a chain of multiple Nodes.
- You don't want to make each task too coarse, because it may be too complex for one LLM call.
- You don't want to make each task too granular, because then the LLM call doesn't have enough context and results are not consistent across nodes.
You usually need multiple iterations to find the sweet spot. If the task has too many edge cases, consider using Agents. {: .best-practice }
```python
class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]
    def exec(self, topic):
        return call_llm(f"Create a detailed outline for an article about {topic}")
    def post(self, shared, prep_res, exec_res):
        shared["outline"] = exec_res

class WriteSection(Node):
    def prep(self, shared):
        return shared["outline"]
    def exec(self, outline):
        return call_llm(f"Write content based on this outline: {outline}")
    def post(self, shared, prep_res, exec_res):
        shared["draft"] = exec_res

class ReviewAndRefine(Node):
    def prep(self, shared):
        return shared["draft"]
    def exec(self, draft):
        return call_llm(f"Review and improve this draft: {draft}")
    def post(self, shared, prep_res, exec_res):
        shared["final_article"] = exec_res

# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review

# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)
```
For dynamic cases, consider using Agents.
We don't provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a
call_llm function that takes a prompt and returns the LLM response," you should get something like:
```python
def call_llm(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Example usage
call_llm("How are you?")
```
Store the API key in an environment variable like OPENAI_API_KEY for security. {: .note }
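For example, a sketch of the same wrapper reading the key from the environment instead of hardcoding it:

```python
import os
from openai import OpenAI

def call_llm(prompt):
    # Expects OPENAI_API_KEY to be set in the environment
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
```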
Feel free to enhance your
call_llm function as needed. Here are examples:
```python
def call_llm(messages):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return r.choices[0].message.content
```
```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def call_llm(prompt):
    # Your implementation here
    pass
```
⚠️ Caching conflicts with Node retries, as retries yield the same result.
To address this, you could use cached results only if not retried. {: .warning }
```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_call(prompt):
    pass

def call_llm(prompt, use_cache):
    if use_cache:
        return cached_call(prompt)
    # Call the underlying function directly
    return cached_call.__wrapped__(prompt)

class SummarizeNode(Node):
    def exec(self, text):
        return call_llm(f"Summarize: {text}", self.cur_retry == 0)
```
```python
def call_llm(prompt):
    import logging
    logging.info(f"Prompt: {prompt}")
    response = ...  # Your implementation here
    logging.info(f"Response: {response}")
    return response
```
I believe it is a bad practice to provide LLM-specific implementations in a general framework: APIs change frequently, you may want to switch vendors or use your own fine-tuned models, and you may need optimizations like prompt caching, request batching, or response streaming.
Similar to LLM wrappers, we don't provide built-in tools. Here, we recommend some minimal (and incomplete) implementations of commonly used tools. These examples can serve as a starting point for your own tooling.
```python
def get_embedding(text):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return r.data[0].embedding

get_embedding("What's the meaning of life?")
```
```python
import faiss
import numpy as np

def create_index(embeddings):
    dim = len(embeddings[0])
    index = faiss.IndexFlatL2(dim)
    index.add(np.array(embeddings).astype('float32'))
    return index

def search_index(index, query_embedding, top_k=5):
    D, I = index.search(
        np.array([query_embedding]).astype('float32'),
        top_k
    )
    return I, D

index = create_index(embeddings)
search_index(index, query_embedding)
```
```python
import sqlite3

def execute_sql(query):
    conn = sqlite3.connect("mydb.db")
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    conn.commit()
    conn.close()
    return result
```
⚠️ Beware of SQL injection risk {: .warning }
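One common mitigation (a sketch; the table and values are placeholders) is to bind user-supplied values as parameters rather than formatting them into the SQL string:

```python
import sqlite3

def execute_sql_safe(query, params=()):
    # "?" placeholders keep user-supplied values out of the SQL text itself
    conn = sqlite3.connect("mydb.db")
    cursor = conn.cursor()
    cursor.execute(query, params)
    result = cursor.fetchall()
    conn.commit()
    conn.close()
    return result

# The name is bound as a parameter, not interpolated into the query
execute_sql_safe("SELECT * FROM users WHERE name = ?", ("alice",))
```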
```python
def run_code(code_str):
    env = {}
    exec(code_str, env)
    return env

run_code("print('Hello, world!')")
```
⚠️ exec() is dangerous with untrusted input {: .warning }
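If you must run untrusted snippets, one partial mitigation (a sketch, not a real sandbox) is to execute them in a separate interpreter process with a timeout:

```python
import subprocess
import sys

def run_code_isolated(code_str, timeout=5):
    # A crash or infinite loop in the snippet can't take down the main app,
    # and the timeout bounds its runtime. This is NOT a security boundary;
    # use containers or jails for real isolation.
    result = subprocess.run(
        [sys.executable, "-c", code_str],
        capture_output=True, text=True, timeout=timeout
    )
    return result.stdout, result.stderr

run_code_isolated("print('Hello, world!')")
```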
If your PDFs are text-based, use PyMuPDF:
```python
import fitz  # PyMuPDF

def extract_text(pdf_path):
    doc = fitz.open(pdf_path)
    text = ""
    for page in doc:
        text += page.get_text()
    doc.close()
    return text

extract_text("document.pdf")
```
For image-based PDFs (e.g., scanned), OCR is needed. An easy and fast option is to use an LLM with vision capabilities:
```python
from openai import OpenAI
import base64

def call_llm_vision(prompt, image_data):
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    img_base64 = base64.b64encode(image_data).decode('utf-8')
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "image_url",
                 "image_url": {"url": f"data:image/png;base64,{img_base64}"}}
            ]
        }]
    )
    return response.choices[0].message.content

pdf_document = fitz.open("document.pdf")
page_num = 0
page = pdf_document[page_num]
pix = page.get_pixmap()
img_data = pix.tobytes("png")

call_llm_vision("Extract text from this image", img_data)
```
```python
def crawl_web(url):
    import requests
    from bs4 import BeautifulSoup
    html = requests.get(url).text
    soup = BeautifulSoup(html, "html.parser")
    return soup.title.string, soup.get_text()
```
```python
def search_google(query):
    import requests
    params = {
        "engine": "google",
        "q": query,
        "api_key": "YOUR_API_KEY"
    }
    r = requests.get("https://serpapi.com/search", params=params)
    return r.json()
```
```python
def transcribe_audio(file_path):
    import openai
    audio_file = open(file_path, "rb")
    transcript = openai.Audio.transcribe("whisper-1", audio_file)
    return transcript["text"]
```
```python
def text_to_speech(text):
    import pyttsx3
    engine = pyttsx3.init()
    engine.say(text)
    engine.runAndWait()
```
```python
def send_email(to_address, subject, body, from_address, password):
    import smtplib
    from email.mime.text import MIMEText

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = from_address
    msg["To"] = to_address

    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login(from_address, password)
        server.sendmail(from_address, [to_address], msg.as_string())
```
Similar to LLM wrappers, we don't provide built-in visualization and debugging. Here, we recommend some minimal (and incomplete) implementations. These examples can serve as a starting point for your own tooling.
This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization.
{% raw %}
```python
def build_mermaid(start):
    ids, visited, lines = {}, set(), ["graph LR"]
    ctr = 1
    def get_id(n):
        nonlocal ctr
        return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0]
    def link(a, b):
        lines.append(f"    {a} --> {b}")
    def walk(node, parent=None):
        if node in visited:
            return parent and link(parent, get_id(node))
        visited.add(node)
        if isinstance(node, Flow):
            node.start and parent and link(parent, get_id(node.start))
            lines.append(f"\n    subgraph sub_flow_{get_id(node)}[{type(node).__name__}]")
            node.start and walk(node.start)
            for nxt in node.successors.values():
                node.start and walk(nxt, get_id(node.start)) or (parent and link(parent, get_id(nxt))) or walk(nxt)
            lines.append("    end\n")
        else:
            lines.append(f"    {(nid := get_id(node))}['{type(node).__name__}']")
            parent and link(parent, nid)
            [walk(nxt, nid) for nxt in node.successors.values()]
    walk(start)
    return "\n".join(lines)
```
{% endraw %}
For example, suppose we have a complex Flow for data science:
```python
class DataPrepBatchNode(BatchNode):
    def prep(self, shared): return []
class ValidateDataNode(Node): pass
class FeatureExtractionNode(Node): pass
class TrainModelNode(Node): pass
class EvaluateModelNode(Node): pass
class ModelFlow(Flow): pass
class DataScienceFlow(Flow): pass

feature_node = FeatureExtractionNode()
train_node = TrainModelNode()
evaluate_node = EvaluateModelNode()
feature_node >> train_node >> evaluate_node
model_flow = ModelFlow(start=feature_node)

data_prep_node = DataPrepBatchNode()
validate_node = ValidateDataNode()
data_prep_node >> validate_node >> model_flow
data_science_flow = DataScienceFlow(start=data_prep_node)

result = build_mermaid(start=data_science_flow)
```
The code generates a Mermaid diagram:
```mermaid
graph LR
    subgraph sub_flow_N1[DataScienceFlow]
    N2['DataPrepBatchNode']
    N3['ValidateDataNode']
    N2 --> N3
    N3 --> N4

    subgraph sub_flow_N5[ModelFlow]
    N4['FeatureExtractionNode']
    N6['TrainModelNode']
    N4 --> N6
    N7['EvaluateModelNode']
    N6 --> N7
    end

    end
```
It would be useful to print the Node call stacks for debugging. This can be achieved by inspecting the runtime call stack:
```python
import inspect

def get_node_call_stack():
    stack = inspect.stack()
    node_names = []
    seen_ids = set()
    for frame_info in stack[1:]:
        local_vars = frame_info.frame.f_locals
        if 'self' in local_vars:
            caller_self = local_vars['self']
            if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids:
                seen_ids.add(id(caller_self))
                node_names.append(type(caller_self).__name__)
    return node_names
```
For example, suppose we have a complex Flow for data science:
```python
class DataPrepBatchNode(BatchNode):
    def prep(self, shared): return []
class ValidateDataNode(Node): pass
class FeatureExtractionNode(Node): pass
class TrainModelNode(Node): pass
class EvaluateModelNode(Node):
    def prep(self, shared):
        stack = get_node_call_stack()
        print("Call stack:", stack)
class ModelFlow(Flow): pass
class DataScienceFlow(Flow): pass

feature_node = FeatureExtractionNode()
train_node = TrainModelNode()
evaluate_node = EvaluateModelNode()
feature_node >> train_node >> evaluate_node
model_flow = ModelFlow(start=feature_node)

data_prep_node = DataPrepBatchNode()
validate_node = ValidateDataNode()
data_prep_node >> validate_node >> model_flow
data_science_flow = DataScienceFlow(start=data_prep_node)

data_science_flow.run({})
```
The output would be:
```
Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']
```