
Overview

The AI Workflows component uses Kestra to orchestrate automated scripts that generate insights, forecasts, and competitive intelligence with AI models from OpenAI and Google Gemini.

Technology Stack

  • Node.js: JavaScript runtime for script execution
  • OpenAI: GPT models for text generation and analysis
  • Google Gemini: Advanced AI for multi-modal insights
  • Puppeteer: Web automation and crawling

Key Capabilities

AI-Powered Insights

  • Sales Forecasting
  • Trend Detection
  • Automated Reporting
Generate predictions for future sales using historical data and AI models:
  • Time Series Analysis: Trend identification and seasonality
  • Multi-variate Predictions: Factor in marketing, inventory, pricing
  • Confidence Intervals: Probabilistic forecasts with uncertainty
  • Brand-specific Models: Customized for each brand’s patterns
Output: Daily/weekly/monthly forecast reports
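
To make the trend and confidence-interval bullets concrete, here is a minimal sketch of a trend-based forecast. It is not the AI model the workflows use: it swaps in an ordinary least-squares straight line and a rough band of plus or minus z residual standard deviations, which ignores seasonality and understates uncertainty for distant horizons. The function names and the input shape (a plain array of daily revenue values) are assumptions made for illustration.

```javascript
// Fit y = intercept + slope * t by ordinary least squares, with t = 0..n-1.
function fitLinearTrend(values) {
  const n = values.length
  if (n < 3) throw new Error('need at least 3 observations')
  const meanT = (n - 1) / 2
  const meanY = values.reduce((sum, y) => sum + y, 0) / n
  let sTT = 0
  let sTY = 0
  values.forEach((y, t) => {
    sTT += (t - meanT) ** 2
    sTY += (t - meanT) * (y - meanY)
  })
  const slope = sTY / sTT
  const intercept = meanY - slope * meanT
  // Residual standard deviation with n - 2 degrees of freedom.
  const sse = values.reduce(
    (sum, y, t) => sum + (y - (intercept + slope * t)) ** 2,
    0
  )
  return { slope, intercept, residualStd: Math.sqrt(sse / (n - 2)), n }
}

// Project `horizon` steps past the last observation.
// The band is a rough approximation, not a full prediction interval.
function forecastTrend(values, horizon, z = 1.96) {
  const { slope, intercept, residualStd, n } = fitLinearTrend(values)
  const points = []
  for (let h = 1; h <= horizon; h++) {
    const point = intercept + slope * (n - 1 + h)
    points.push({
      step: h,
      point,
      lower: point - z * residualStd,
      upper: point + z * residualStd
    })
  }
  return points
}
```

A baseline like this can also serve as a sanity check: if an AI-generated forecast lands far outside the band, that is worth flagging before the report goes out.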

Web Crawling & Intelligence

Monitor competitor prices across the web:
  • Automated price scraping from competitor sites
  • Price change detection and alerting
  • Historical price tracking
  • Margin impact analysis
Frequency: Daily crawls for key products
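
Price change detection can be sketched as a comparison between the previous and the current crawl. The snippet below is an illustration under assumptions: prices are held in plain objects keyed by product ID, and the 5% default threshold is a made-up value, not one taken from the project.

```javascript
// Compare two crawls and return the products whose price moved by at least
// `thresholdPct` percent in either direction.
// previousPrices / currentPrices: { [productId]: number } (assumed shape)
function detectPriceChanges(previousPrices, currentPrices, thresholdPct = 5) {
  const alerts = []
  for (const [productId, current] of Object.entries(currentPrices)) {
    const previous = previousPrices[productId]
    // Skip products with no usable baseline (new or previously unpriced).
    if (typeof previous !== 'number' || previous <= 0) continue
    const changePct = ((current - previous) / previous) * 100
    if (Math.abs(changePct) >= thresholdPct) {
      alerts.push({ productId, previous, current, changePct })
    }
  }
  return alerts
}
```

The returned list could feed both the alerting step and the historical price table, since each entry carries the old price, the new price, and the signed percentage change.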
Track product availability across channels:
  • Stock status monitoring
  • Out-of-stock duration tracking
  • Multi-marketplace availability
  • Fulfillment speed comparison
Use Case: Identify stock-out opportunities
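
Out-of-stock duration tracking comes down to turning a series of stock checks into periods. The following sketch assumes each crawl records an object with a `checkedAt` ISO timestamp and an `inStock` flag; both field names are hypothetical.

```javascript
// observations: [{ checkedAt: ISO-8601 string, inStock: boolean }], any order.
// Returns one entry per out-of-stock period; a period that is still open has
// `to` and `hours` set to null.
function outOfStockPeriods(observations) {
  const sorted = [...observations].sort(
    (a, b) => Date.parse(a.checkedAt) - Date.parse(b.checkedAt)
  )
  const periods = []
  let startedAt = null
  for (const obs of sorted) {
    if (!obs.inStock && startedAt === null) {
      startedAt = obs.checkedAt // first check that saw the product missing
    } else if (obs.inStock && startedAt !== null) {
      const hours = (Date.parse(obs.checkedAt) - Date.parse(startedAt)) / 3600000
      periods.push({ from: startedAt, to: obs.checkedAt, hours })
      startedAt = null
    }
  }
  if (startedAt !== null) periods.push({ from: startedAt, to: null, hours: null })
  return periods
}
```

Because the durations are measured between checks, their precision is bounded by the crawl frequency: with daily crawls, a period is only known to within about a day.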
Aggregate and analyze customer reviews:
  • Sentiment analysis across platforms
  • Feature extraction from reviews
  • Competitive review comparison
  • Product improvement insights
AI Models: GPT-4 for sentiment, Gemini for summarization

Script Architecture

Kestra Workflows

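Scripts are organized as Kestra workflows. The flow below is a simplified outline: fetchSalesData() and openai.forecast() are placeholders rather than real library calls, and because each task runs as its own process, a real flow has to pass data and forecast between tasks as Kestra task outputs: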
id: daily-sales-forecast
namespace: trendteller.insights

tasks:
  - id: fetch-data
    type: io.kestra.plugin.scripts.node.Script
    script: |
      const data = await fetchSalesData()
      return { data }

  - id: generate-forecast
    type: io.kestra.plugin.scripts.node.Script
    script: |
      const forecast = await openai.forecast(data)
      return { forecast }

  - id: save-results
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: INSERT INTO forecasts ...

Script Organization

kestra-scripts/
├── insights/
│   ├── sales-forecast.js      # Sales predictions
│   ├── trend-analysis.js      # Trend detection
│   └── anomaly-detection.js   # Outlier identification
├── crawlers/
│   ├── price-monitor.js       # Competitive pricing
│   ├── availability-check.js  # Stock monitoring
│   └── review-scraper.js      # Customer reviews
├── reports/
│   ├── daily-summary.js       # Daily reports
│   ├── weekly-analysis.js     # Weekly deep dive
│   └── executive-dashboard.js # Executive KPIs
└── shared/
    ├── bigquery-client.js     # BigQuery utilities
    ├── ai-helpers.js          # AI model wrappers
    └── email-sender.js        # Report distribution

AI Integration

OpenAI Integration

1. Initialize Client

import OpenAI from 'openai'

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY
})
2. Generate Insights

const completion = await openai.chat.completions.create({
  model: "gpt-4-turbo",
  messages: [
    {
      role: "system",
      content: "You are a data analyst specializing in e-commerce."
    },
    {
      role: "user",
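      // Serialize the data first; interpolating an object directly yields "[object Object]"
      content: `Analyze this sales data and provide insights: ${JSON.stringify(data)}`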
    }
  ]
})
3. Process Results

const insights = completion.choices[0].message.content
await saveToBigQuery(insights)
await sendEmailReport(insights)

Google Gemini Integration

import { GoogleGenerativeAI } from '@google/generative-ai'

const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY)
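// "gemini-pro" accepts text only; image input needs a multimodal model
// (for example "gemini-1.5-pro"; check which model IDs are currently available)
const model = genAI.getGenerativeModel({ model: "gemini-1.5-pro" })

// Generate multi-modal insights
const result = await model.generateContent([
  "Analyze this sales trend chart and provide insights",
  { inlineData: { data: chartImageBase64, mimeType: "image/png" } }
])
const insights = result.response.text()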
Gemini’s multi-modal capabilities allow analysis of charts, images, and text together for richer insights.

Data Access

BigQuery Integration

Scripts access BigQuery for historical data:
import { BigQuery } from '@google-cloud/bigquery'

const bigquery = new BigQuery({
  projectId: 'togo-425319',
  keyFilename: process.env.GOOGLE_APPLICATION_CREDENTIALS
})

// Query sales data
const query = `
  SELECT date, brand_id, total_revenue
  FROM \`togo-425319.gold.daily_sales\`
  WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
  ORDER BY date
`

const [rows] = await bigquery.query(query)
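
Before the rows go to a forecasting step, it usually helps to reshape them into one series per brand. The sketch below assumes the column names from the query above; `groupByBrand` is a hypothetical helper, and the handling of `date` reflects that the BigQuery client typically returns DATE columns as objects with a `value` string rather than as plain strings.

```javascript
// Group query rows into one chronological series per brand.
// rows: [{ date, brand_id, total_revenue }], already ordered by date.
function groupByBrand(rows) {
  const seriesByBrand = new Map()
  for (const row of rows) {
    // Accept either a plain string or a wrapper object with a `value` field.
    const date = typeof row.date === 'string' ? row.date : row.date.value
    if (!seriesByBrand.has(row.brand_id)) seriesByBrand.set(row.brand_id, [])
    seriesByBrand.get(row.brand_id).push({
      date,
      revenue: Number(row.total_revenue) // coerce NUMERIC wrappers or strings
    })
  }
  return seriesByBrand
}
```

Each entry of the resulting map can then be serialized and sent to the model on its own, which keeps prompts small and matches the brand-specific forecasting described earlier.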

PostgreSQL Access

For operational data stored in Postgres:
import { Client } from 'pg'

const client = new Client({
  host: process.env.PG_HOST,
  database: 'trendteller',
  user: process.env.PG_USER,
  password: process.env.PG_PASSWORD
})

await client.connect()
const result = await client.query('SELECT * FROM users WHERE active = true')
await client.end() // close the connection once the query is done

Web Crawling

Puppeteer Automation

import puppeteer from 'puppeteer'

const browser = await puppeteer.launch({ headless: true })
const page = await browser.newPage()

// Navigate to competitor site
const productId = '12345' // example ID, reused for the screenshot filename below
await page.goto(`https://competitor.com/product/${productId}`)

// Extract price
const price = await page.$eval('.product-price', el => el.textContent)

// Take screenshot for verification
await page.screenshot({ path: `screenshots/${productId}.png` })

await browser.close()
Always respect robots.txt and implement rate limiting to avoid overloading target sites.
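
One way to implement the rate-limiting half of that advice is a per-host throttle that tells the crawler how long to wait before each request. This is a minimal sketch: `createHostThrottle` is a hypothetical helper, the interval is up to the caller, and robots.txt handling is not covered here.

```javascript
// Returns a `reserve(url)` function. Each call books the next free slot for
// the URL's host and returns how many milliseconds to wait before using it.
function createHostThrottle(minIntervalMs) {
  const nextSlotByHost = new Map()
  return function reserve(url, nowMs = Date.now()) {
    const host = new URL(url).host
    // The earliest allowed time is now, or the slot after the last booking.
    const slot = Math.max(nowMs, nextSlotByHost.get(host) ?? 0)
    nextSlotByHost.set(host, slot + minIntervalMs)
    return slot - nowMs
  }
}
```

In a crawler loop this would be used as `await new Promise(resolve => setTimeout(resolve, reserve(url)))` just before `page.goto(url)`. Requests to different hosts do not delay each other, while repeated requests to the same host are spaced at least `minIntervalMs` apart.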

Anti-Bot Evasion

// Use stealth plugin to avoid detection
import puppeteer from 'puppeteer-extra'
import StealthPlugin from 'puppeteer-extra-plugin-stealth'

puppeteer.use(StealthPlugin())

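// Random delays to mimic human behavior
// (page.waitForTimeout has been removed from recent Puppeteer releases; a plain timer works in any version)
await new Promise(resolve => setTimeout(resolve, Math.random() * 2000 + 1000))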

// Rotate user agents
const userAgents = [/* ... */]
await page.setUserAgent(userAgents[Math.floor(Math.random() * userAgents.length)])

Scheduling & Orchestration

Kestra Schedules

  • Daily Jobs
  • Weekly Jobs
  • On-Demand
Run every day at specific times:
triggers:
  - id: daily
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 6 * * *"  # 6 AM daily
  • Sales forecasts (6 AM)
  • Daily summary reports (7 AM)
  • Price monitoring (8 AM)

Output & Distribution

Report Generation

AI-generated reports are distributed via:

  • Email: Automated email delivery using SendGrid or AWS SES
  • Dashboard: Reports saved to BigQuery and displayed in frontend
  • Slack: Critical alerts and summaries posted to Slack
  • API: Insights exposed via REST API for integrations

Storage

  • Reports: Stored in BigQuery insights dataset
  • Forecasts: Saved to forecasts table with confidence intervals
  • Screenshots: Uploaded to Google Cloud Storage
  • Logs: Kestra execution logs retained for 90 days

Monitoring & Alerting

Execution Monitoring

Track workflow execution:
  • Success rate: Percentage of successful runs
  • Duration: Average and P95 execution time
  • Resource usage: CPU, memory, API credits
  • Error rates: Failed tasks and reasons
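
The success-rate and duration metrics above can be computed from a list of execution records. The sketch below assumes each record has a `state` string (Kestra reports states such as SUCCESS and FAILED) and a `durationMs` number; the record shape and the nearest-rank percentile method are illustrative choices, not the project's actual monitoring code.

```javascript
// runs: [{ state: 'SUCCESS' | 'FAILED' | ..., durationMs: number }]
function summarizeExecutions(runs) {
  if (runs.length === 0) {
    return { successRate: null, avgDurationMs: null, p95DurationMs: null }
  }
  const succeeded = runs.filter(run => run.state === 'SUCCESS').length
  const durations = runs.map(run => run.durationMs).sort((a, b) => a - b)
  const total = durations.reduce((sum, d) => sum + d, 0)
  // Nearest-rank P95: the smallest value with at least 95% of runs at or below it.
  const rank = Math.ceil((95 * durations.length) / 100)
  return {
    successRate: succeeded / runs.length,
    avgDurationMs: total / durations.length,
    p95DurationMs: durations[rank - 1]
  }
}
```

Reporting P95 next to the average matters here because a single slow crawl or API call can hide behind a healthy-looking mean.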

Alerts

Automated alerts for:
  • Workflow failures or timeouts
  • API quota exceeded (OpenAI, Gemini)
  • Anomalous insights detected
  • Price changes exceeding thresholds

Cost Management

AI API Costs

Monthly AI API spending is monitored and budgeted:
  • OpenAI: ~$500/month (GPT-4 Turbo)
  • Google Gemini: ~$200/month (Gemini Pro)
  • Total: ~$700/month for AI services
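
A simple way to monitor spend against these figures is to project month-end cost from the spend so far. The sketch below treats the approximate monthly amounts listed above as budget ceilings, which is an assumption; the function name and the linear projection are illustrative as well.

```javascript
// Approximate monthly figures from this page, treated here as budget ceilings.
const MONTHLY_BUDGET_USD = { openai: 500, gemini: 200 }

// spendToDateUsd: { openai: number, gemini: number } for the current month.
// Projects month-end spend linearly from the daily average so far.
function budgetStatus(spendToDateUsd, dayOfMonth, daysInMonth, budgets = MONTHLY_BUDGET_USD) {
  return Object.entries(budgets).map(([provider, budget]) => {
    const spent = spendToDateUsd[provider] ?? 0
    const projected = (spent / dayOfMonth) * daysInMonth
    return { provider, budget, projected, overBudget: projected > budget }
  })
}
```

For example, $300 of OpenAI spend by day 15 of a 30-day month projects to $600, which would be flagged as over the $500 figure.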

Optimization Strategies

  • Use GPT-3.5 for simple tasks
  • Batch API requests when possible
  • Cache AI responses for repeated queries
  • Implement token limits per request
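
The caching strategy can be sketched as a thin wrapper around whatever function performs the API call. This is an in-memory illustration under assumptions: `createCachedCompleter` is a hypothetical helper, the cache lives only for the lifetime of one script run, and the key is the serialized request, so two requests match only if their fields appear in the same order.

```javascript
// Wrap an async `complete(request)` function so identical requests share one call.
function createCachedCompleter(complete, maxEntries = 500) {
  const cache = new Map()
  return function cachedComplete(request) {
    const key = JSON.stringify(request)
    if (!cache.has(key)) {
      // Evict the oldest entry first; a Map iterates in insertion order.
      if (cache.size >= maxEntries) cache.delete(cache.keys().next().value)
      const pending = Promise.resolve(complete(request))
      pending.catch(() => cache.delete(key)) // do not keep failed calls
      cache.set(key, pending)
    }
    return cache.get(key)
  }
}
```

With the OpenAI client this could be wired up as `createCachedCompleter(request => openai.chat.completions.create(request))`. Setting `max_tokens` in the request caps the response length, and choosing a cheaper model for simple tasks is then just a different `model` value in the same request object. Storing the promise rather than the result means concurrent identical requests also share a single API call.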

Next Steps