Building Enterprise-Grade AI Agents with Dify, DMS Notebook, and Airflow
Background and Challenges
Dify, as a low-code AI application development platform, significantly lowers the barrier to entry for large language model applications through its intuitive visual workflow orchestration capabilities. However, during enterprise-level implementation, we've identified two critical limitations in its native capabilities:
• Restricted code execution: While Dify's built-in Sandbox node supports basic Python code execution, it cannot install custom Python packages, making it difficult to support complex business logic, data processing, or algorithm calls;
• Lack of automation scheduling: Dify's native architecture doesn't support timed triggering, periodic execution, or dependency orchestration for Agents or Agentic Workflows, preventing integration into enterprise automation systems.
These issues severely limit Dify's deep use in production environments. In particular, when building intelligent Agents with closed-loop "perception-decision-execution-feedback" capabilities, relying on Dify alone often falls short.
To overcome these limitations, we've developed a comprehensive "Dify + DMS Notebook + DMS Airflow" architecture that effectively addresses Dify's execution and scheduling shortcomings:
• ✅ DMS Notebook: Provides a complete, customizable Python runtime environment supporting third-party library installation, interactive development, and complex logic implementation as a powerful supplement to Dify's Sandbox;
• ✅ DMS Airflow: Serves as a unified scheduling engine for timed triggering, dependency management, and reliable execution of Dify workflows, Notebook scripts, or Agent tasks;
• ✅ DMS Platform Integration: Enables end-to-end management from development and debugging to deployment, scheduling, and monitoring, significantly enhancing Dify's engineering capabilities in enterprise scenarios.
This article demonstrates how to build a schedulable, scalable, and maintainable Agent system using the DMS platform through a complete sales data analysis robot development case.
Extending Dify's Code Execution with DMS Notebook
Why Notebook?
In Dify, the Sandbox node often proves inadequate when you need to call Pandas for data cleaning, use Prophet for time-series forecasting, or integrate enterprise SDKs. DMS Notebook provides a complete Python environment supporting:
• Custom pip package installation;
• Environment variable configuration (e.g., AK/SK, API Keys);
• Asynchronous service development (e.g., FastAPI);
• Secure communication with other services within the VPC.
This makes it an ideal "execution unit" for extending Dify's external capabilities.
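For instance, a single Notebook cell can install a third-party library on the fly and use it immediately, something the Sandbox node cannot do. The sketch below is illustrative only (the Prophet package and the toy dataset are not part of the sales case that follows):

```python
# Hypothetical cell: install a custom package, then use it for a quick forecast.
!pip install prophet pandas

import pandas as pd
from prophet import Prophet

# Toy dataset; Prophet expects a date column "ds" and a value column "y".
df = pd.DataFrame({
    "ds": pd.date_range("2024-01-01", periods=30, freq="D"),
    "y": range(30),
})
model = Prophet().fit(df)
future = model.make_future_dataframe(periods=7)
forecast = model.predict(future)
print(forecast[["ds", "yhat"]].tail())
```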
Step-by-Step: Building a Sales Data Analysis API Service
1. Create DMS Notebook Session
• Navigate to DMS Console > Notebook Sessions > Create Session;
• Configure parameters as follows:
• Select an appropriate Python image version;
• In Configuration > Edit Settings:
  • PyPI Package Management: enter dependencies in requirements.txt format (e.g., pandas, fastapi, uvicorn, nest-asyncio); a sample list is shown at the end of this step;
  • Environment Variables: set ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, model API Keys, etc.
• Key configurations:
  • The fastapi, uvicorn, and nest-asyncio libraries must be installed;
  • Set resource release time = 0 (prevents the service from being auto-released);
  • Set the environment variable DMS_KERNEL_IDLE_TIMEOUT=0 (avoids the Jupyter Kernel being killed due to inactivity).
Pitfall tip: Without setting DMS_KERNEL_IDLE_TIMEOUT=0, long-running API services may be recycled after minutes of inactivity, causing subsequent call failures.
• After creation, click Start in the notebook session window.
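For reference, the PyPI Package Management field accepts a plain requirements.txt-style list, one package per line (pin versions if you need reproducible environments). A minimal list matching this walkthrough might look like the following; httpx is included only because the template below uses it for its startup check:

```
pandas
fastapi
uvicorn
nest-asyncio
httpx
```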
2. Write and Launch FastAPI Service
• Click the folder icon, right-click the default library, and select New Notebook file.
• Write Python code in code blocks; refer to the template below for the API service:
```python
import os
from fastapi import FastAPI, HTTPException, Request, File, UploadFile, Path, Query, Form, Header
from fastapi.staticfiles import StaticFiles
from typing import Optional
import nest_asyncio
import asyncio
import httpx
import io

'''
Note: Jupyter runs in an asyncio event loop. We cannot run another loop
directly inside an existing one, but nest_asyncio allows us to patch this.
'''
nest_asyncio.apply()

app = FastAPI(
    title="Revenue Analytics Service",
    description="Provides data analysis and chart generation for Dify integration"
)

# Create static directory if it doesn't exist
static_dir = "static"
if not os.path.exists(static_dir):
    os.makedirs(static_dir)

# Mount static file service
app.mount("/static", StaticFiles(directory=static_dir), name="static")


@app.get("/")
async def root():
    """
    Root endpoint returning service information
    # --- How to call with curl ---
    curl -X GET "http://127.0.0.1:8000/"
    """
    return {
        "message": "Service is operational",
        "documentation": "/docs",
        "note": "..."
    }


@app.post("/process-revenue/{item_id}")
async def process_revenue(
    request: Request,
    # Path parameters
    item_id: int = Path(..., title="Item ID", ge=1),
    # Query parameters
    is_premium: bool = Query(False, description="Premium item flag"),
    # Request headers
    x_token: Optional[str] = Header(None, description="Custom authentication token")
):
    """
    Receives JSON request body, path parameters, query parameters, and headers.
    # --- How to call with curl ---
    # -X POST: Specify request method
    # URL: Contains path parameter {item_id} and query parameter ?is_premium=true
    # -H: Add headers
    # -d: Send request body (JSON string)
    curl -X POST "http://127.0.0.1:8000/process-revenue/101?is_premium=true" \
      -H "Content-Type: application/json" \
      -H "X-Token: my-secret-token" \
      -d '{"name": "Laptop", "price": 7999.9, "tags": ["electronics", "office"]}'
    """
    if x_token != "my-secret-token":
        raise HTTPException(status_code=401, detail="Invalid X-Token")
    try:
        # Manually parse JSON request body
        json_body = await request.json()
        name = json_body.get("name")
        price = json_body.get("price")
        # Your business logic
        if not name or not price:
            raise HTTPException(status_code=400, detail="Missing 'name' or 'price' in request body")
        return {
            "message": "Data processing successful",
            "received_data": {
                "item_id": item_id,
                "is_premium": is_premium,
                "x_token": x_token,
                "body": json_body
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Service error: {str(e)}")


@app.post("/upload-document")
async def upload_document(
    # Form data
    token: str = Form(...),
    # Uploaded file
    file: UploadFile = File(...)
):
    """
    Upload file and accompanying text via form-data.
    # --- How to call with curl ---
    # -F: For sending multipart/form-data
    # -F "file=@/path/to/your/file.txt": @ indicates a file path for curl to read
    # -F "token=user123": Send form field named token
    # Note: Replace /path/to/your/file.txt with an actual local file path
    curl -X POST "http://127.0.0.1:8000/upload-document" \
      -F "file=@./test_upload.txt" \
      -F "token=my-form-token"
    """
    # Create a sample file for the curl example
    if not os.path.exists("test_upload.txt"):
        with open("test_upload.txt", "w") as f:
            f.write("This is a test file for curl upload.")
    try:
        contents = await file.read()
        file_location = os.path.join(static_dir, file.filename)
        with open(file_location, "wb") as f:
            f.write(contents)
        return {
            "message": "File upload successful!",
            "token": token,
            "filename": file.filename,
            "file_size": len(contents),
            "file_url": f"/static/{file.filename}"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"File processing error: {str(e)}")


@app.get("/status")
async def get_server_status():
    """
    Get server status.
    # --- How to call with curl ---
    curl -X GET "http://127.0.0.1:8000/status"
    """
    return {"status": "running"}


async def run_server(host="127.0.0.1", port=8000):
    """Run uvicorn server in background"""
    import uvicorn
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()

task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))

# Wait for service startup
await asyncio.sleep(2)

# Create async HTTP client
async with httpx.AsyncClient() as client:
    print("Sending request to http://127.0.0.1:8000/status...")
    # Send GET request
    response = await client.get("http://127.0.0.1:8000/status")
    # Print result
    if response.status_code == 200:
        print("Service started successfully")
    else:
        print("Service startup failed, check error messages")
```
• Next, let's build a daily revenue analysis API:
```python
import os
import pandas as pd
from fastapi import FastAPI, HTTPException, Request, File, UploadFile
from fastapi.staticfiles import StaticFiles
import nest_asyncio
import asyncio
import httpx
import io

'''
Note: Jupyter runs in an asyncio event loop. We cannot run another loop
directly inside an existing one, but nest_asyncio allows us to patch this.
'''
nest_asyncio.apply()

app = FastAPI(title="Revenue Analytics Service", description="Provides data analysis and chart generation for Dify integration")

# Create static directory if it doesn't exist
static_dir = "static"
if not os.path.exists(static_dir):
    os.makedirs(static_dir)

# Mount static file service
app.mount("/static", StaticFiles(directory=static_dir), name="static")


def load_revenue_data_from_file(file_content: bytes):
    """Load revenue data from uploaded file content"""
    try:
        # Convert bytes to StringIO object
        csv_string = file_content.decode('utf-8')
        df = pd.read_csv(io.StringIO(csv_string))
        # Verify required columns exist
        required_columns = ['Date', 'Product', 'Price', 'Quantity', 'Region']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"CSV file must contain columns: {', '.join(required_columns)}")
        # Convert data types
        df['Date'] = pd.to_datetime(df['Date'])
        df['Price'] = pd.to_numeric(df['Price'], errors='coerce')
        df['Quantity'] = pd.to_numeric(df['Quantity'], errors='coerce')
        # Calculate revenue (Revenue = Price × Quantity)
        df['Revenue'] = df['Price'] * df['Quantity']
        return df
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing CSV file: {str(e)}")


@app.get("/")
async def root():
    """Root endpoint, returns service information"""
    return {
        "message": "Revenue Analytics Service is running",
        "documentation": "/docs",
        "endpoints": [
            "POST /analysis/daily_revenue_analysis"
        ],
        "note": "Requires CSV file upload with columns: Date, Product, Price, Quantity, Region"
    }


@app.post("/analysis/daily_revenue_analysis")
async def daily_revenue_analysis(
    file: UploadFile = File(...)
):
    """Daily revenue analysis - analyze sales data in uploaded file"""
    try:
        # Validate file type
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="File must be CSV format")
        # Read uploaded file
        file_content = await file.read()
        df = load_revenue_data_from_file(file_content)
        # Get date range from data
        df['Date'] = pd.to_datetime(df['Date']).dt.date
        unique_dates = sorted(df['Date'].unique())
        if len(unique_dates) == 0:
            raise HTTPException(status_code=400, detail="No valid date data in file")
        # If multiple dates, use latest as analysis target
        target_date = unique_dates[-1] if len(unique_dates) > 1 else unique_dates[0]
        # Filter data for target date
        daily_data = df[df['Date'] == target_date].copy()
        if daily_data.empty:
            raise HTTPException(status_code=400, detail=f"No sales data found for date {target_date}")
        # Basic statistics
        total_revenue = daily_data['Revenue'].sum()
        total_orders = len(daily_data)
        total_quantity = daily_data['Quantity'].sum()
        avg_order_value = total_revenue / total_orders if total_orders > 0 else 0
        # Product analysis
        product_analysis = daily_data.groupby('Product').agg({
            'Revenue': 'sum',
            'Quantity': 'sum',
            'Price': 'mean'
        }).round(2)
        # Sort by revenue, get top 5 products
        top_products = product_analysis.sort_values('Revenue', ascending=False).head(5)
        top_products_list = []
        for product, row in top_products.iterrows():
            top_products_list.append({
                "product": product,
                "revenue": float(row['Revenue']),
                "quantity": int(row['Quantity']),
                "avg_price": float(row['Price'])
            })
        # Region analysis
        region_analysis = daily_data.groupby('Region').agg({
            'Revenue': 'sum',
            'Quantity': 'sum'
        }).round(2)
        # Sort by revenue
        top_regions = region_analysis.sort_values('Revenue', ascending=False)
        region_list = []
        for region, row in top_regions.iterrows():
            region_list.append({
                "region": region,
                "revenue": float(row['Revenue']),
                "quantity": int(row['Quantity']),
                "percentage": round(float(row['Revenue']) / total_revenue * 100, 2)
            })
        # Price range analysis
        daily_data['price_range'] = pd.cut(daily_data['Price'],
                                           bins=[0, 100, 500, 1000, 5000, float('inf')],
                                           labels=['0-100', '100-500', '500-1000', '1000-5000', '5000+'])
        price_range_analysis = daily_data.groupby('price_range').agg({
            'Revenue': 'sum',
            'Quantity': 'sum'
        }).round(2)
        price_ranges = []
        for price_range, row in price_range_analysis.iterrows():
            if not pd.isna(row['Revenue']) and row['Revenue'] > 0:
                price_ranges.append({
                    "range": str(price_range),
                    "revenue": float(row['Revenue']),
                    "quantity": int(row['Quantity'])
                })
        # Generate insights
        insights = []
        # Revenue insights
        if total_revenue > 100000:
            insights.append(f"Excellent daily performance with total revenue of {total_revenue:,.2f}")
        elif total_revenue > 50000:
            insights.append(f"Good daily performance with revenue of {total_revenue:,.2f}")
        else:
            insights.append(f"Daily revenue of {total_revenue:,.2f}, consider reviewing sales strategy")
        # Product insights
        if len(top_products_list) > 0:
            best_product = top_products_list[0]
            insights.append(f"Top selling product is {best_product['product']} with revenue {best_product['revenue']:,.2f}")
        # Region insights
        if len(region_list) > 0:
            best_region = region_list[0]
            insights.append(f"Best performing region is {best_region['region']} with {best_region['percentage']}% of total revenue")
        # Order insights
        if avg_order_value > 1000:
            insights.append(f"High average order value of {avg_order_value:,.2f}, indicating strong customer purchasing power")
        return {
            "analysis_date": str(target_date),
            "summary": {
                "total_revenue": round(float(total_revenue), 2),
                "total_orders": int(total_orders),
                "total_quantity": int(total_quantity),
                "average_order_value": round(float(avg_order_value), 2)
            },
            "top_products": top_products_list,
            "region_analysis": region_list,
            "price_range_analysis": price_ranges,
            "insights": insights,
            "data_info": {
                "date_range": f"{unique_dates[0]} to {unique_dates[-1]}" if len(unique_dates) > 1 else str(unique_dates[0]),
                "total_records": len(daily_data),
                "unique_products": len(daily_data['Product'].unique()),
                "unique_regions": len(daily_data['Region'].unique())
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Daily revenue analysis error: {str(e)}")


@app.get("/status")
async def get_server_status():
    """Get server status"""
    try:
        return {
            "status": "running"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting server status: {str(e)}")


async def run_server(host="127.0.0.1", port=8000):
    """Run uvicorn server in background"""
    import uvicorn
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()

task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))

# Wait for service startup
await asyncio.sleep(2)

# Create async HTTP client
async with httpx.AsyncClient() as client:
    print("Sending request to http://127.0.0.1:8000/status...")
    # Send GET request
    response = await client.get("http://127.0.0.1:8000/status")
    # Print result
    if response.status_code == 200:
        print("Service started successfully")
    else:
        print("Service startup failed, check error messages")
```
• Asynchronous support: Jupyter's built-in asyncio event loop allows direct use of async/await.
• Run the relevant code blocks; the output should confirm that the API service started successfully.
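Once the service is up, you can smoke-test the analysis endpoint from another Notebook cell. The sketch below fabricates a couple of rows that match the required CSV columns (Date, Product, Price, Quantity, Region) and posts them to the local endpoint; the sample values are made up:

```python
# Smoke test from another Notebook cell (sample rows are illustrative).
import io
import httpx
import pandas as pd

sample = pd.DataFrame({
    "Date": ["2024-06-01", "2024-06-01"],
    "Product": ["Laptop", "Mouse"],
    "Price": [7999.9, 99.0],
    "Quantity": [2, 10],
    "Region": ["East", "North"],
})
buf = io.BytesIO(sample.to_csv(index=False).encode("utf-8"))

async with httpx.AsyncClient() as client:
    resp = await client.post(
        "http://127.0.0.1:8000/analysis/daily_revenue_analysis",
        files={"file": ("daily_revenue_data.csv", buf, "text/csv")},
        timeout=60,
    )
    print(resp.json())
```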
3. Check the IP Address
• In notebook code blocks, prefix a terminal command with an English exclamation mark to run it; for example, !pip install xxx installs additional Python packages (see the example cell below).
• Create a new code block, enter !ifconfig, and run it to view the Notebook session's IP address within the VPC. The IP 172.16.0.252 shown in the image is the one we need, so the address of the analysis endpoint defined above is: http://172.16.0.252:8000/analysis/daily_revenue_analysis
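A minimal example of such a cell (the interface name in the output depends on the session image):

```python
# "!" runs a terminal command from the notebook; look for the inet address
# of the primary interface (e.g., eth0) in the output.
!ifconfig
```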
Accessing the Service from Dify on DMS
Now we use this simulated sales data file to access the API service:
• Add HTTP Request node in workflow
• Access the created API service via http://<IP>:<port>/<path> and pass parameters in the BODY
• A test run shows the request successfully returns a response
• You can also see the service being called in the Notebook session
Next, let's demonstrate complete service calls using this Dify workflow example: sales_analysis.yml
Add a custom robot to a DingTalk group and refer to https://open.dingtalk.com/document/orgapp/robot-overview to obtain the DingTalk robot's access_token and sign_secret.
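The workflow uses these two values to sign and call the DingTalk webhook. If you ever need to push a message to the robot outside of Dify, a minimal sketch based on DingTalk's documented signing scheme looks like this (the function name and message content are illustrative):

```python
import base64
import hashlib
import hmac
import time
import urllib.parse

import requests

def send_dingtalk_markdown(access_token: str, sign_secret: str, title: str, text: str):
    # Sign the request: HMAC-SHA256 over "timestamp\nsecret", then base64- and URL-encode.
    timestamp = str(round(time.time() * 1000))
    string_to_sign = f"{timestamp}\n{sign_secret}"
    digest = hmac.new(sign_secret.encode("utf-8"),
                      string_to_sign.encode("utf-8"), hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(digest))
    url = (f"https://oapi.dingtalk.com/robot/send?access_token={access_token}"
           f"&timestamp={timestamp}&sign={sign}")
    payload = {"msgtype": "markdown", "markdown": {"title": title, "text": text}}
    return requests.post(url, json=payload, timeout=10).json()
```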
After filling in your DingTalk robot parameters, click Run in the top right corner -> upload the sample sales data from your local machine, and click Start Run.
The robot in the DingTalk group successfully sends the analysis report for the sales data
Click Publish in the top right corner -> Publish Update to publish the workflow for later scheduled calls.
Implementing Timed Scheduling with DMS Airflow
Create DMS Airflow Instance
Refer to the following links to create an Airflow instance in DMS:
• https://help.aliyun.com/zh/dms/purchase-airflow-resources?spm=a2c4g.11186623.help-menu-26437.d_2_8_1_2_0.3aa88c02Z9PHKz
• https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-instance?spm=a2c4g.11186623.help-menu-26437.d_2_8_1_2_1.1edc167ePCMXpI&scm=20140722.H_2881043._.OR_help-T_cn~zh-V_1
For more Airflow operations, refer to https://airflow.apache.org/docs/apache-airflow/stable/index.html
Write DAG: Daily Automated Sales Analysis Trigger
• Below is sample Python code for timed Dify workflow API calls
```python
import pendulum
import requests
import json

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

CSV_FILE_PATH = "/yourpath/daily_revenue_data.csv"
DIFY_API_URL = "https://dify-dms.aliyuncs.com/v1"  # Replace with your Dify Workflow API URL

# Safely get API Keys from Airflow Variables
DIFY_API_KEY = Variable.get("dify_api_key")
APP_API_KEY = Variable.get("app_api_key")


def call_dify_workflow_with_csv(**kwargs):
    """
    Read CSV file content and call the Dify workflow as a file upload.
    """
    print(f"Preparing to read from '{CSV_FILE_PATH}'...")
    try:
        with open(CSV_FILE_PATH, 'rb') as f:
            files_to_upload = {
                'file': ('daily_revenue_data.csv', f, 'text/csv')
            }
            # Prepare API request headers and body
            headers = {
                'Authorization': f'Bearer {APP_API_KEY}',
                'DifyApiKey': f'{DIFY_API_KEY}',
            }
            file_upload_response = requests.post(
                DIFY_API_URL + '/files/upload',
                headers=headers,
                data={'user': 'airflow-user-demo'},
                files=files_to_upload,
            )
            print(file_upload_response.json())
            file_id = file_upload_response.json().get('id')

        headers.update({'Content-Type': 'application/json'})
        # 'inputs' is usually a JSON string
        # 'user' is required, representing the end user identifier
        input_data = {
            'revenue_data': {
                "type": "document",
                "transfer_method": "local_file",
                "upload_file_id": file_id
            }
        }
        data = {
            'inputs': input_data,
            'user': 'airflow-user-demo',
            'response_mode': 'blocking',
        }
        print("Starting Dify API call...")
        print(f"URL: {DIFY_API_URL}")
        response = requests.post(
            DIFY_API_URL + '/workflows/run',
            headers=headers,
            json=data,
        )
        # Check if the request was successful
        response.raise_for_status()
        print(f"API call successful! Status code: {response.status_code}")
        # Print response content
        print("--- Dify API Response ---")
        print(response.json()["data"]["outputs"]["answer"])
        print("\n--- End of Response ---")
        # You can also push the complete response to XComs for downstream tasks
        # ti = kwargs['ti']
        # ti.xcom_push(key='dify_response', value=full_response)
    except FileNotFoundError:
        print(f"Error: File not found at '{CSV_FILE_PATH}'")
        raise
    except requests.exceptions.RequestException as e:
        print(f"API call failed: {e}")
        raise


with DAG(
    dag_id="dify_workflow",
    start_date=pendulum.datetime(2023, 10, 27, tz="Asia/Shanghai"),
    # '0 8 * * *' represents 8:00 AM daily (UTC+8)
    # Airflow uses UTC by default, but cron expressions themselves are timezone-agnostic
    schedule="0 8 * * *",
    catchup=False,
    tags=["dify", "api", "example"],
    doc_md="""
    ### Dify Workflow Call DAG
    This DAG runs daily at 8:00 AM, performing:
    1. Reads a CSV file from the local filesystem.
    2. Calls a Dify workflow with the CSV as an attachment.
    3. Prints the Dify API response.
    """,
) as dag:
    run_dify_workflow = PythonOperator(
        task_id="call_dify",
        python_callable=call_dify_workflow_with_csv,
    )
```
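The DAG reads its credentials with Variable.get(), so those Variables must exist before the first run. They can be created in the Airflow UI under Admin > Variables, or programmatically as in the one-off sketch below (the key names match the sample DAG; the values are placeholders):

```python
# One-time setup sketch: create the Variables the DAG reads via Variable.get().
from airflow.models import Variable

Variable.set("dify_api_key", "<your DMS Dify API key>")
Variable.set("app_api_key", "<your Dify application API key>")
```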
Note: To upload local files via API when calling Dify workflows, you need to first upload the file via /files/upload to get file ID, then pass the file ID to the workflow.
• After creation, open the Airflow instance to see the scheduled task
• Every day at 8:00 AM, the system automatically calls the Dify workflow, and the DingTalk robot pushes the analysis report