Markdown Converter
Agent skill for markdown-converter
This is an AI-powered marketing automation platform that makes creating and managing marketing campaigns as easy as having a conversation. Built with LangGraph for multi-agent orchestration and Context Engineering for reliability.
Vision: "Claude Code for Marketing" - where non-technical users can create, manage, and optimize campaigns using natural language.
Before implementing any features:
- Run `pip install -r requirements.txt` and `npm install`
- Make sure `.env` has the required keys (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.)
- Add new agents under `src/agents/` as a Python file
- Follow the pattern in `examples/agents/base_agent.py`

Project structure:

```
src/
├── agents/       # Python agent implementations
├── workflows/    # LangGraph workflow definitions
├── app/          # Next.js frontend
├── components/   # React components
├── lib/          # Utilities and integrations
├── db/           # Database schemas
└── types/        # TypeScript/Python type definitions
```
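For reference, a minimal `.env` sketch with the keys named above (values are placeholders; any extra keys your integrations need are not shown):

```
# .env (placeholder values, never commit real keys)
OPENAI_API_KEY=...
ANTHROPIC_API_KEY=...
```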
```python
from typing import Dict, Any, List
import logging

from langchain.tools import tool  # available for agent-specific tools in _setup_tools
from pydantic import BaseModel, Field


class AgentInput(BaseModel):
    """Input schema for agent"""
    task: str = Field(description="Task description")
    context: Dict[str, Any] = Field(default_factory=dict)


class MarketingAgent:
    """Base agent implementation"""

    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(name)
        self.tools = self._setup_tools()

    async def process_task(self, input: AgentInput) -> Dict[str, Any]:
        """Main agent entry point"""
        try:
            result = None  # TODO: agent-specific implementation
            return {"status": "success", "result": result}
        except Exception as e:
            self.logger.error(f"Error in {self.name}: {e}")
            return {"status": "error", "message": str(e)}

    def _setup_tools(self) -> List:
        """Define agent-specific tools"""
        return []
```
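A quick usage sketch (the agent name and task string are placeholders):

```python
import asyncio

# Instantiate the base agent and run one task end-to-end
agent = MarketingAgent("campaign_planner")
output = asyncio.run(
    agent.process_task(AgentInput(task="Draft a subject line for the spring launch email"))
)
print(output["status"], output.get("result"))
```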
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator


class WorkflowState(TypedDict):
    messages: Annotated[list, operator.add]
    current_agent: str
    task_status: str
    results: dict


# Build workflow
workflow = StateGraph(WorkflowState)

# Add nodes
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("specialist", specialist_agent)

# Add edges
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_decision,
    {
        "continue": "specialist",
        "complete": END,
    },
)
workflow.add_edge("specialist", "supervisor")  # return control to the supervisor after each turn

# Compile
app = workflow.compile(checkpointer=checkpointer)
```
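The snippet above references `supervisor_agent`, `specialist_agent`, `route_decision`, and `checkpointer` without defining them; they need to exist before the graph is built. A minimal sketch with placeholder logic (not the project's real agents):

```python
from langgraph.checkpoint.memory import MemorySaver


def supervisor_agent(state: WorkflowState) -> dict:
    # Placeholder: hand the task to the specialist unless it is already done
    if state.get("task_status") == "completed":
        return {"current_agent": "supervisor"}
    return {"current_agent": "specialist", "task_status": "in_progress"}


def specialist_agent(state: WorkflowState) -> dict:
    # Placeholder: do the delegated work and record its output
    return {"results": {"specialist": "done"}, "task_status": "completed"}


def route_decision(state: WorkflowState) -> str:
    # Loop to the specialist until the task is marked completed
    return "complete" if state["task_status"] == "completed" else "continue"


# In-memory checkpointer for local runs; swap for a persistent backend in production
checkpointer = MemorySaver()
```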
```python
from datetime import datetime
from typing import TypedDict, Annotated, Sequence, Literal, Dict, Any, List
import operator

from langchain_core.messages import BaseMessage


class AgentState(TypedDict):
    # Message history
    messages: Annotated[Sequence[BaseMessage], operator.add]

    # Workflow control
    current_agent: str
    next_agent: str

    # Task tracking
    task_id: str
    task_status: Literal["pending", "in_progress", "completed", "failed"]

    # Results
    results: Dict[str, Any]
    errors: List[str]

    # Metadata
    user_id: str
    session_id: str
    timestamp: datetime
```
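One way to seed this state at the start of a run (the user and session values are placeholders):

```python
import uuid
from datetime import datetime, timezone

initial_state: AgentState = {
    "messages": [],
    "current_agent": "supervisor",
    "next_agent": "",
    "task_id": str(uuid.uuid4()),
    "task_status": "pending",
    "results": {},
    "errors": [],
    "user_id": "user_123",            # placeholder
    "session_id": str(uuid.uuid4()),  # placeholder
    "timestamp": datetime.now(timezone.utc),
}
```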
```bash
# Python environment
python -m pytest tests/
python -m mypy src/

# Frontend
npm run lint
npm run typecheck
npm run build

# Integration tests
python tests/integration/test_workflows.py

# LangSmith validation
python scripts/validate_langsmith.py
```
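A minimal pytest sketch of the kind of check these commands run (the module path and the `pytest-asyncio` plugin are assumptions, not confirmed by the repo):

```python
# tests/test_base_agent.py (illustrative only)
import pytest

from src.agents.base_agent import MarketingAgent, AgentInput  # assumed module path


@pytest.mark.asyncio  # requires pytest-asyncio
async def test_process_task_reports_status():
    agent = MarketingAgent("test_agent")
    result = await agent.process_task(AgentInput(task="noop"))
    assert result["status"] in {"success", "error"}
```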
See the `examples/` directory for patterns.

```python
from functools import lru_cache
import hashlib
import json


@lru_cache(maxsize=100)
def get_cached_response(query_hash: str):
    """Cache frequently requested data"""
    pass


def hash_query(query: dict) -> str:
    """Create cache key from query"""
    return hashlib.md5(json.dumps(query, sort_keys=True).encode()).hexdigest()
```
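Usage sketch (the query shape is hypothetical). Serializing with `sort_keys=True` keeps the cache key stable regardless of dict ordering:

```python
query = {"channel": "email", "segment": "new_signups"}  # hypothetical query
key = hash_query(query)
# Identical queries produce the same key, so repeat lookups hit the LRU cache
response = get_cached_response(key)
```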
```python
import json


async def stream_agent_response(agent, input_data):
    """Stream responses for better UX"""
    async for chunk in agent.astream(input_data):
        yield f"data: {json.dumps(chunk)}\n\n"
```
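In the app this generator would back a streaming HTTP endpoint; as a quick local check of the SSE framing, a toy runnable can stand in for a real agent (the runnable and input below are made up for illustration):

```python
import asyncio

from langchain_core.runnables import RunnableLambda

# Toy runnable whose streamed chunks are JSON-serializable, so the SSE framing works as-is
toy_agent = RunnableLambda(lambda x: {"echo": x})


async def demo():
    async for line in stream_agent_response(toy_agent, {"task": "hello"}):
        print(line, end="")


asyncio.run(demo())
```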
Problem: Railway services maintain their initial configuration type. A service configured for Next.js will not properly run Python apps even if you push Python code.
Solution: run the Python backend on a Railway service that was created as a Python service (here, metaads-python-api) rather than repurposing the Next.js service.
Working Python Config:
```python
# app.py
from flask import Flask, jsonify
import os

app = Flask(__name__)


@app.route('/')
def health():
    return jsonify({"status": "healthy"})


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=port)
```
```
# requirements.txt
flask==3.0.0
gunicorn==21.2.0
```
```
# Procfile
web: gunicorn app:app --bind 0.0.0.0:$PORT
```
```bash
# 1. Commit changes
git add -A
git commit -m "Your change description"
git push origin main

# 2. Deploy to Railway (Python backend)
cd /Users/jaimeortiz/Test\ Main/metaads-new
railway link -p 88cfcbd9-fe82-4bda-bb9b-fd1cf5f5688e
railway up --service metaads-python-api

# 3. Monitor deployment
railway logs --service metaads-python-api

# 4. Verify deployment
curl https://metaads-python-api-production.up.railway.app
```
Troubleshooting:

- Check environment variables: `railway variables`
- Confirm the linked service: `railway status` (should show metaads-python-api)
- Inspect logs: `railway logs`
- Specify the Python runtime as `python-3.11`, not a patch version like `python-3.11.8`

Remember: We're building the future of marketing automation. Every line of code should make marketing easier for our users.