Fading Coder

One Final Commit for the Last Sprint


Building Enterprise-Grade AI Agents with Dify, DMS Notebook, and Airflow


Background and Challenges

Dify, as a low-code AI application development platform, significantly lowers the barrier to entry for large language model applications through its intuitive visual workflow orchestration capabilities. However, during enterprise-level implementation, we've identified two critical limitations in its native capabilities:

• Restricted code execution: While Dify's built-in Sandbox node supports basic Python code execution, it cannot install custom Python packages, making it difficult to support complex business logic, data processing, or algorithm calls;
• Lack of automation scheduling: Dify's native architecture doesn't support timed triggering, periodic execution, or dependency orchestration for Agents or Agentic Workflows, preventing integration into enterprise automation systems.

These issues severely limit Dify's deep application in production environments: when building intelligent Agents with closed-loop "perception-decision-execution-feedback" capabilities, relying solely on Dify often falls short.

To overcome these limitations, we've developed a comprehensive "Dify + DMS Notebook + DMS Airflow" architecture that effectively addresses Dify's execution and scheduling shortcomings:

• ✅ DMS Notebook: Provides a complete, customizable Python runtime environment supporting third-party library installation, interactive development, and complex logic implementation as a powerful supplement to Dify's Sandbox;
• ✅ DMS Airflow: Serves as a unified scheduling engine for timed triggering, dependency management, and reliable execution of Dify workflows, Notebook scripts, or Agent tasks;
• ✅ DMS Platform Integration: Enables end-to-end management from development, debugging, and deployment to scheduling and monitoring, significantly enhancing Dify's engineering capabilities in enterprise scenarios.

This article demonstrates how to build a schedulable, scalable, and maintainable Agent system using the DMS platform through a complete sales data analysis robot development case.

Extending Dify's Code Execution with DMS Notebook

Why Notebook?

In Dify, when you need to call Pandas for data cleaning, use Prophet for time series forecasting, or integrate enterprise SDKs, the Sandbox node often proves inadequate. DMS Notebook provides a complete Python environment supporting:

• Custom pip package installation;
• Environment variable configuration (e.g., AK/SK, API Keys);
• Asynchronous service development (e.g., FastAPI);
• Secure communication with other services within the VPC.

This makes it an ideal "execution unit" for extending Dify's external capabilities.

Step-by-Step: Building a Sales Data Analysis API Service

1. Create DMS Notebook Session

• Navigate to DMS Console > Notebook Sessions > Create Session;

• Configure parameters as follows:

• Select an appropriate Python image version;
• In Configuration > Edit Settings:

  • PyPI Package Management: Enter dependencies in requirements.txt format (e.g., pandas, fastapi, uvicorn, nest-asyncio);
  • Environment Variables: Set ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, model API Keys, etc.;
  • Key configurations:
    • The fastapi, uvicorn, and nest-asyncio libraries must be installed;
    • Set resource release time = 0 (prevents the service from being auto-released);
    • Set the environment variable DMS_KERNEL_IDLE_TIMEOUT=0 (avoids the Jupyter Kernel being killed due to inactivity).
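For reference, the PyPI dependency list used throughout this walkthrough can be entered as a plain requirements.txt-style fragment (httpx is included because the startup check later in this article uses it; pin versions as your environment requires):

```
pandas
fastapi
uvicorn
nest-asyncio
httpx
```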

Pitfall tip: Without setting DMS_KERNEL_IDLE_TIMEOUT=0, long-running API services may be recycled after minutes of inactivity, causing subsequent call failures.
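Once the session is running, you can confirm the keep-alive setting took effect from inside a code block. This snippet only reads the environment; the "not set" fallback string is illustrative:

```python
import os

# Read the idle-timeout setting configured in Edit Settings; "0" means the
# kernel is never recycled for inactivity.
timeout = os.environ.get("DMS_KERNEL_IDLE_TIMEOUT", "not set")
print(f"DMS_KERNEL_IDLE_TIMEOUT = {timeout}")
```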

• After creation, click Start in the notebook session window.

2. Write and Launch FastAPI Service

• Click the folder icon, right-click the default library, then click New Notebook file;

• Write Python code in code blocks; refer to the template below for the API service:

```python
import os
from fastapi import FastAPI, HTTPException, Request, File, UploadFile, Path, Query, Form, Header
from fastapi.staticfiles import StaticFiles
from typing import Optional
import nest_asyncio
import asyncio
import httpx
import io

'''
Note: Jupyter runs in an asyncio event loop. We cannot run another loop
directly in an existing one, but nest_asyncio allows us to patch this.
'''
nest_asyncio.apply()

app = FastAPI(
    title="Revenue Analytics Service",
    description="Provides data analysis and chart generation for Dify integration",
)

# Create static directory if it doesn't exist
static_dir = "static"
if not os.path.exists(static_dir):
    os.makedirs(static_dir)

# Mount static file service
app.mount("/static", StaticFiles(directory=static_dir), name="static")

@app.get("/")
async def root():
    """
    Root endpoint returning service information
    # --- How to call with curl ---
    curl -X GET "http://127.0.0.1:8000/"
    """
    return {
        "message": "Service is operational",
        "documentation": "/docs",
        "note": "..."
    }

@app.post("/process-revenue/{item_id}")
async def process_revenue(
    request: Request,
    # Path parameters
    item_id: int = Path(..., title="Item ID", ge=1),
    # Query parameters
    is_premium: bool = Query(False, description="Premium item flag"),
    # Request headers
    x_token: Optional[str] = Header(None, description="Custom authentication token")
):
    """
    Receives JSON request body, path parameters, query parameters, and headers.
    # --- How to call with curl ---
    # -X POST: Specify request method
    # URL: Contains path parameter {item_id} and query parameter ?is_premium=true
    # -H: Add headers
    # -d: Send request body (JSON string)
    curl -X POST "http://127.0.0.1:8000/process-revenue/101?is_premium=true" \
         -H "Content-Type: application/json" \
         -H "X-Token: my-secret-token" \
         -d '{"name": "Laptop", "price": 7999.9, "tags": ["electronics", "office"]}'
    """
    if x_token != "my-secret-token":
        raise HTTPException(status_code=401, detail="Invalid X-Token")
    try:
        # Manually parse JSON request body
        json_body = await request.json()
        name = json_body.get("name")
        price = json_body.get("price")
        # Your business logic
        if not name or not price:
            raise HTTPException(status_code=400, detail="Missing 'name' or 'price' in request body")
        return {
            "message": "Data processing successful",
            "received_data": {
                "item_id": item_id,
                "is_premium": is_premium,
                "x_token": x_token,
                "body": json_body
            }
        }
    except HTTPException:
        # Re-raise explicit HTTP errors unchanged
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Service error: {str(e)}")

@app.post("/upload-document")
async def upload_document(
    # Form data
    token: str = Form(...),
    # Uploaded file
    file: UploadFile = File(...)
):
    """
    Upload file and accompanying text via form-data.
    # --- How to call with curl ---
    # -F: For sending multipart/form-data
    # -F "file=@/path/to/your/file.txt": @ indicates a file path for curl to read
    # -F "token=user123": Send form field named token
    # Note: Replace ./test_upload.txt with an actual local file path
    curl -X POST "http://127.0.0.1:8000/upload-document" \
         -F "file=@./test_upload.txt" \
         -F "token=my-form-token"
    """
    # Create sample file for the curl example
    if not os.path.exists("test_upload.txt"):
        with open("test_upload.txt", "w") as f:
            f.write("This is a test file for curl upload.")
    try:
        contents = await file.read()
        file_location = os.path.join(static_dir, file.filename)
        with open(file_location, "wb") as f:
            f.write(contents)
        return {
            "message": "File upload successful!",
            "token": token,
            "filename": file.filename,
            "file_size": len(contents),
            "file_url": f"/static/{file.filename}"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"File processing error: {str(e)}")

@app.get("/status")
async def get_server_status():
    """
    Get server status.
    # --- How to call with curl ---
    curl -X GET "http://127.0.0.1:8000/status"
    """
    return {"status": "running"}

async def run_server(host="127.0.0.1", port=8000):
    """Run uvicorn server in background"""
    import uvicorn
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()

task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))

# Wait for service startup
await asyncio.sleep(2)

# Create async HTTP client
async with httpx.AsyncClient() as client:
    print("Sending request to http://127.0.0.1:8000/status...")
    # Send GET request
    response = await client.get("http://127.0.0.1:8000/status")
    # Print result
    if response.status_code == 200:
        print("Service started successfully")
    else:
        print("Service startup failed, check error messages")
```

• Next, let's build a daily revenue analysis API:

```python
import os
import pandas as pd
from fastapi import FastAPI, HTTPException, Request, File, UploadFile
from fastapi.staticfiles import StaticFiles
import nest_asyncio
import asyncio
import httpx
import io
'''
Note: Jupyter runs in an asyncio event loop. We cannot run another loop directly in an existing one, but nest_asyncio allows us to patch this.
'''
nest_asyncio.apply()
app = FastAPI(title="Revenue Analytics Service", description="Provides data analysis and chart generation for Dify integration")
# Create static directory if it doesn't exist
static_dir = "static"
if not os.path.exists(static_dir):
    os.makedirs(static_dir)
# Mount static file service
app.mount("/static", StaticFiles(directory=static_dir), name="static")
def load_revenue_data_from_file(file_content: bytes):
    """Load revenue data from uploaded file content"""
    try:
        # Convert bytes to StringIO object
        csv_string = file_content.decode('utf-8')
        df = pd.read_csv(io.StringIO(csv_string))
        
        # Verify required columns exist
        required_columns = ['Date', 'Product', 'Price', 'Quantity', 'Region']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"CSV file must contain columns: {', '.join(required_columns)}")
        
        # Convert data types
        df['Date'] = pd.to_datetime(df['Date'])
        df['Price'] = pd.to_numeric(df['Price'], errors='coerce')
        df['Quantity'] = pd.to_numeric(df['Quantity'], errors='coerce')
        
        # Calculate revenue (Revenue = Price × Quantity)
        df['Revenue'] = df['Price'] * df['Quantity']
        
        return df
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing CSV file: {str(e)}")
@app.get("/")
async def root():
    """Root endpoint, returns service information"""
    return {
        "message": "Revenue Analytics Service is running",
        "documentation": "/docs",
        "endpoints": [
            "POST /analysis/daily_revenue_analysis"
        ],
        "note": "Requires CSV file upload with columns: Date, Product, Price, Quantity, Region"
    }
@app.post("/analysis/daily_revenue_analysis")
async def daily_revenue_analysis(
    file: UploadFile = File(...)
):
    """Daily revenue analysis - analyze sales data in uploaded file"""
    try:
        # Validate file type
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="File must be CSV format")
        
        # Read uploaded file
        file_content = await file.read()
        df = load_revenue_data_from_file(file_content)
        
        # Get date range from data
        df['Date'] = pd.to_datetime(df['Date']).dt.date
        unique_dates = sorted(df['Date'].unique())
        
        if len(unique_dates) == 0:
            raise HTTPException(status_code=400, detail="No valid date data in file")
        
        # If multiple dates, use latest as analysis target
        target_date = unique_dates[-1] if len(unique_dates) > 1 else unique_dates[0]
        
        # Filter data for target date
        daily_data = df[df['Date'] == target_date].copy()
        
        if daily_data.empty:
            raise HTTPException(status_code=400, detail=f"No sales data found for date {target_date}")
        
        # Basic statistics
        total_revenue = daily_data['Revenue'].sum()
        total_orders = len(daily_data)
        total_quantity = daily_data['Quantity'].sum()
        avg_order_value = total_revenue / total_orders if total_orders > 0 else 0
        
        # Product analysis
        product_analysis = daily_data.groupby('Product').agg({
            'Revenue': 'sum',
            'Quantity': 'sum',
            'Price': 'mean'
        }).round(2)
        
        # Sort by revenue, get top 5 products
        top_products = product_analysis.sort_values('Revenue', ascending=False).head(5)
        top_products_list = []
        for product, row in top_products.iterrows():
            top_products_list.append({
                "product": product,
                "revenue": float(row['Revenue']),
                "quantity": int(row['Quantity']),
                "avg_price": float(row['Price'])
            })
        
        # Region analysis
        region_analysis = daily_data.groupby('Region').agg({
            'Revenue': 'sum',
            'Quantity': 'sum'
        }).round(2)
        
        # Sort by revenue
        top_regions = region_analysis.sort_values('Revenue', ascending=False)
        region_list = []
        for region, row in top_regions.iterrows():
            region_list.append({
                "region": region,
                "revenue": float(row['Revenue']),
                "quantity": int(row['Quantity']),
                "percentage": round(float(row['Revenue']) / total_revenue * 100, 2)
            })
        
        # Price range analysis
        daily_data['price_range'] = pd.cut(daily_data['Price'], 
                                         bins=[0, 100, 500, 1000, 5000, float('inf')], 
                                         labels=['0-100', '100-500', '500-1000', '1000-5000', '5000+'])
        
        price_range_analysis = daily_data.groupby('price_range').agg({
            'Revenue': 'sum',
            'Quantity': 'sum'
        }).round(2)
        
        price_ranges = []
        for price_range, row in price_range_analysis.iterrows():
            if not pd.isna(row['Revenue']) and row['Revenue'] > 0:
                price_ranges.append({
                    "range": str(price_range),
                    "revenue": float(row['Revenue']),
                    "quantity": int(row['Quantity'])
                })
        
        # Generate insights
        insights = []
        
        # Revenue insights
        if total_revenue > 100000:
            insights.append(f"Excellent daily performance with total revenue of {total_revenue:,.2f}")
        elif total_revenue > 50000:
            insights.append(f"Good daily performance with revenue of {total_revenue:,.2f}")
        else:
            insights.append(f"Daily revenue of {total_revenue:,.2f}, consider reviewing sales strategy")
        
        # Product insights
        if len(top_products_list) > 0:
            best_product = top_products_list[0]
            insights.append(f"Top selling product is {best_product['product']} with revenue {best_product['revenue']:,.2f}")
        
        # Region insights
        if len(region_list) > 0:
            best_region = region_list[0]
            insights.append(f"Best performing region is {best_region['region']} with {best_region['percentage']}% of total revenue")
        
        # Order insights
        if avg_order_value > 1000:
            insights.append(f"High average order value of {avg_order_value:,.2f}, indicating strong customer purchasing power")
        
        return {
            "analysis_date": str(target_date),
            "summary": {
                "total_revenue": round(float(total_revenue), 2),
                "total_orders": int(total_orders),
                "total_quantity": int(total_quantity),
                "average_order_value": round(float(avg_order_value), 2)
            },
            "top_products": top_products_list,
            "region_analysis": region_list,
            "price_range_analysis": price_ranges,
            "insights": insights,
            "data_info": {
                "date_range": f"{unique_dates[0]} to {unique_dates[-1]}" if len(unique_dates) > 1 else str(unique_dates[0]),
                "total_records": len(daily_data),
                "unique_products": len(daily_data['Product'].unique()),
                "unique_regions": len(daily_data['Region'].unique())
            }
        }
        
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Daily revenue analysis error: {str(e)}")
@app.get("/status")
async def get_server_status():
    """Get server status"""
    try:
        return {
            "status": "running"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting server status: {str(e)}")
async def run_server(host="127.0.0.1", port=8000):
    """Run uvicorn server in background"""
    import uvicorn
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()
task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))
# Wait for service startup
await asyncio.sleep(2)
# Create async HTTP client
async with httpx.AsyncClient() as client:
    print("Sending request to http://127.0.0.1:8000/status...")
    
    # Send GET request
    response = await client.get("http://127.0.0.1:8000/status")
    
    # Print result
    if response.status_code == 200:
        print("Service started successfully")
    else:
        print("Service startup failed, check error messages")
```
• Asynchronous support: Jupyter's built-in asyncio event loop allows direct use of async/await.
• Run the relevant code blocks to see the API service startup success in the output.

3. Check IP Address

• In notebook code blocks, prefix a terminal command with an English exclamation mark to execute it (for example, !pip install xxx installs additional Python packages);
• Create a new code block, enter !ifconfig, and run it to view the Notebook session's IP address within the VPC. With an address of 172.16.0.252, for example, the daily analysis API is reachable at: http://172.16.0.252:8000/analysis/daily_revenue_analysis
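If !ifconfig is unavailable in your image, a rough Python alternative is to ask the OS for the host's address. In a DMS Notebook session this should resolve to the VPC-internal IP; elsewhere it may return a loopback address, so treat the output as a hint rather than a guarantee:

```python
import socket

# Look up this host's IP by resolving its own hostname.
try:
    ip = socket.gethostbyname(socket.gethostname())
except socket.gaierror:
    ip = "127.0.0.1"  # fallback when the hostname cannot be resolved
print(f"Notebook session IP: {ip}")
```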

Accessing the Service from Dify on DMS

Now we use this simulated sales data file to access the API service:

• Add an HTTP Request node in the workflow;

• Access the created API service via http://<IP>:<port>/xxx and pass parameters in the BODY;

• A test run shows the request successfully returns a response;

• You can also see the service being called in the Notebook session.
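For debugging outside Dify, the multipart request that the HTTP Request node issues can be sketched with requests. The IP below is the example session address from earlier and must be replaced with your own; the request is only prepared here, not sent:

```python
import requests

# Hypothetical service address; substitute your Notebook session's VPC IP.
url = "http://172.16.0.252:8000/analysis/daily_revenue_analysis"

# A one-row CSV matching the schema the analysis endpoint expects.
csv_bytes = b"Date,Product,Price,Quantity,Region\n2024-05-09,Laptop,7999.9,3,East\n"

req = requests.Request(
    "POST", url,
    files={"file": ("daily_revenue_data.csv", csv_bytes, "text/csv")},
).prepare()

print(req.method, req.url)
print(req.headers["Content-Type"].split(";")[0])  # multipart/form-data
```

Calling requests.post(url, files=...) with the same arguments sends it for real once the service is reachable from your network.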

Next, let's demonstrate complete service calls using this Dify workflow example: sales_analysis.yml

Add a custom robot to a DingTalk group and refer to https://open.dingtalk.com/document/orgapp/robot-overview to obtain the DingTalk robot's access_token and sign_secret.
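If your robot uses a signed webhook URL, the signature algorithm described in the linked DingTalk docs (HMAC-SHA256 over "<timestamp_ms>\n<secret>", then Base64 and URL-encoding) can be sketched as follows; the secret value shown is a placeholder:

```python
import base64
import hashlib
import hmac
import time
import urllib.parse

def dingtalk_sign(secret: str, timestamp_ms: int) -> str:
    """Sign "<timestamp_ms>\\n<secret>" with HMAC-SHA256 keyed by the secret,
    then Base64- and URL-encode the digest."""
    string_to_sign = f"{timestamp_ms}\n{secret}"
    digest = hmac.new(secret.encode("utf-8"), string_to_sign.encode("utf-8"),
                      hashlib.sha256).digest()
    return urllib.parse.quote_plus(base64.b64encode(digest))

ts = int(time.time() * 1000)
sign = dingtalk_sign("my-sign-secret", ts)  # placeholder secret
print(f"&timestamp={ts}&sign={sign}")
```

The resulting timestamp and sign are appended as query parameters to the robot's webhook URL.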

After filling in your DingTalk robot parameters, click Run in the top right corner -> upload the sample sales data from your local machine, and click Start Run.

The robot in the DingTalk group successfully sends the analysis report for the sales data

Click Publish in the top right corner -> Publish Update to publish the workflow for later scheduled calls.

Implementing Timed Scheduling with DMS Airflow

Create DMS Airflow Instance

Refer to the following links to create an Airflow instance in DMS:

• Purchase Airflow resources: https://help.aliyun.com/zh/dms/purchase-airflow-resources
• Create and manage an Airflow instance: https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-instance

For more Airflow operations, refer to https://airflow.apache.org/docs/apache-airflow/stable/index.html

Write DAG: Daily Automated Sales Analysis Trigger

• Below is sample Python code for timed Dify workflow API calls

```python
import pendulum
import requests
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

CSV_FILE_PATH = "/yourpath/daily_revenue_data.csv"
DIFY_API_URL = "https://dify-dms.aliyuncs.com/v1"  # Replace with your Dify Workflow API URL

# Safely get API Keys from Airflow Variables
DIFY_API_KEY = Variable.get("dify_api_key")
APP_API_KEY = Variable.get("app_api_key")

def call_dify_workflow_with_csv(**kwargs):
    """
    Read CSV file content and call Dify workflow as file upload.
    """
    print(f"Preparing to read from '{CSV_FILE_PATH}'...")
    try:
        with open(CSV_FILE_PATH, 'rb') as f:
            files_to_upload = {
                'file': ('daily_revenue_data.csv', f, 'text/csv')
            }
            # Prepare API request headers
            headers = {
                'Authorization': f'Bearer {APP_API_KEY}',
                'DifyApiKey': f'{DIFY_API_KEY}',
            }
            file_upload_response = requests.post(
                DIFY_API_URL + '/files/upload',
                headers=headers,
                data={'user': 'airflow-user-demo'},
                files=files_to_upload,
            )
            print(file_upload_response.json())

        file_id = file_upload_response.json().get('id')
        headers.update({'Content-Type': 'application/json'})

        # 'inputs' carries the workflow's input variables
        # 'user' is required, representing the end user identifier
        input_data = {
            'revenue_data': {
                "type": "document",
                "transfer_method": "local_file",
                "upload_file_id": file_id
            }
        }
        data = {
            'inputs': input_data,
            'user': 'airflow-user-demo',
            'response_mode': 'blocking',
        }
        print("Starting Dify API call...")
        print(f"URL: {DIFY_API_URL}")

        response = requests.post(
            DIFY_API_URL + '/workflows/run',
            headers=headers,
            json=data,
        )

        # Check if request was successful
        response.raise_for_status()

        print(f"API call successful! Status code: {response.status_code}")

        # Print response content
        print("--- Dify API Response ---")
        print(response.json()["data"]["outputs"]["answer"])
        print("\n--- End of Response ---")

        # You can also push the complete response to XComs for downstream tasks
        # ti = kwargs['ti']
        # ti.xcom_push(key='dify_response', value=response.json())
    except FileNotFoundError:
        print(f"Error: File not found at '{CSV_FILE_PATH}'")
        raise
    except requests.exceptions.RequestException as e:
        print(f"API call failed: {e}")
        raise

with DAG(
    dag_id="dify_workflow",
    start_date=pendulum.datetime(2023, 10, 27, tz="Asia/Shanghai"),
    # '0 8 * * *' represents 8:00 AM daily, interpreted in the DAG's
    # timezone (Asia/Shanghai, UTC+8); Airflow defaults to UTC otherwise
    schedule="0 8 * * *",
    catchup=False,
    tags=["dify", "api", "example"],
    doc_md="""
    ### Dify Workflow Call DAG
    This DAG runs daily at 8:00 AM, performing:
    1. Reads a CSV file from the local filesystem.
    2. Calls a Dify workflow with the CSV as an attachment.
    3. Prints the Dify API response.
    """,
) as dag:
    run_dify_workflow = PythonOperator(
        task_id="call_dify",
        python_callable=call_dify_workflow_with_csv,
    )
```




Note: To upload a local file via the API when calling a Dify workflow, you must first upload the file via /files/upload to obtain a file ID, then pass that file ID to the workflow.

• After creation, open the Airflow instance to see the scheduled task

• Daily at 8:00, the system automatically calls the Dify workflow, with the DingTalk robot pushing the analysis report

