Build AI with Confluent Intelligence in Confluent Cloud¶
Confluent Intelligence enables you to integrate large language models (LLMs), machine learning (ML), retrieval-augmented generation (RAG), and agentic AI into your streaming data workflows.
Confluent Intelligence is a suite of the following features:
- Streaming Agents: Build AI workflows that can invoke tools to interact with external systems, perform actions, or retrieve information.
- Real-time Context Engine: Enable Streaming Agents to query the most up-to-date context, grounding their responses in real-time data. The engine supports storing and searching vector embeddings, like those generated from text or images, allowing for semantic similarity lookups that enhance the agent’s ability to retrieve relevant information.
- External Tables: Enable data streams to be enriched with non-Kafka data sources to provide the most current and complete views possible. Confluent Cloud for Apache Flink provides functions for searching over external tables, like KEY_SEARCH_AGG, TEXT_SEARCH_AGG, and VECTOR_SEARCH_AGG.
- Real-time embedding support: Confluent Cloud for Apache Flink provides built-in functions for creating embeddings, like AI_EMBEDDING.
- Remote Model Support: Connect to remote AI and ML models hosted on various platforms, like OpenAI, AWS Bedrock, AWS SageMaker, Google Cloud’s Vertex AI, and Azure AI. This means you can use models hosted outside Confluent Cloud for real-time predictions and inference.
- Built-in machine learning (ML) functions: Confluent Cloud for Apache Flink provides built-in functions for building ML workflows, like ML_DETECT_ANOMALIES, ML_EVALUATE, and ML_PREDICT.
- Secure Connections: Provide a secure, reusable way to integrate and manage connectivity with external systems.
Streaming Agents¶
Streaming Agents with Confluent Intelligence¶
The key to agentic AI isn’t building better LLMs – it’s data readiness.
Streaming Agents bridge the gap between enterprise data and AI capabilities by providing:
- Real-time data access: Fresh, contextualized data for AI decision-making
- Unified data processing: Seamless integration of streaming and batch data
- Enterprise data utilization: Effective use of existing enterprise data assets
- Context-aware automation: Agents that understand and act on current business context
With Streaming Agents, you can:
- Unify stream processing and agentic AI workflows using familiar Flink APIs, simplifying development and enabling every engineer to be an AI engineer.
- Integrate seamlessly with any tool, model, and data system.
- Access real-time context to enable agents to operate dynamically on live operational events and effectively use LLMs as reasoning engines to plan, decide, and act.
- Ensure agents are secure and trustworthy with full visibility, control, and secure, governed event flows.
For more information, see Streaming Agents.
External tables and search¶
External tables and search with Confluent Cloud for Apache Flink¶
External tables and search enable data streams to be enriched with non-Apache Kafka® data sources to provide the most current and complete views possible. External tables enable capabilities like:
- Real-time data enrichment: Join real-time data streams with data from relational databases, vector databases, and REST APIs to enable more accurate AI decision-making and agentic RAG.
- Unified search: Use Flink SQL to perform both vector search for RAG and quick external table lookups, eliminating complex data synchronization.
- Native integration: Ensure reliability and scalability with a native Flink integration that eliminates custom-built failure points while leveraging Confluent Cloud’s security features and public and private networking capabilities.
- Context-aware AI: Provide agents with the most current and complete data for accurate decision-making.
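To make the idea of a vector search against an external table concrete, the following is a minimal Python sketch of a top-k cosine-similarity lookup. It is only a conceptual illustration: the table contents, the function names `cosine_similarity` and `top_k`, and the three-dimensional embeddings are all hypothetical, and the sketch does not represent how Confluent Cloud for Apache Flink or any vector database implements search.

```python
import math


def cosine_similarity(a, b):
    """Return the cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as matching nothing
    return dot / (norm_a * norm_b)


def top_k(query, rows, k=2):
    """Return the ids of the k rows whose embeddings are closest to query."""
    scored = [(cosine_similarity(query, emb), doc_id) for doc_id, emb in rows]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc_id for _, doc_id in scored[:k]]


# Hypothetical stand-in for an external table of (id, embedding) rows.
EXTERNAL_TABLE = [
    ("refund-policy", [0.9, 0.1, 0.0]),
    ("shipping-times", [0.1, 0.9, 0.0]),
    ("returns-faq", [0.8, 0.2, 0.1]),
]

# Hypothetical embedding of an incoming stream event.
event_embedding = [1.0, 0.0, 0.0]
matches = top_k(event_embedding, EXTERNAL_TABLE, k=2)
print(matches)
```

In a streaming pipeline, the lookup would run once per incoming event, and the matched rows would be joined onto the event as context for the model or agent.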
For more information, see Search External Tables.
Real-time embedding support¶
Real-time embedding support with Confluent Cloud for Apache Flink¶
Provide fresh, accurate context with real-time embeddings, which continuously turn unstructured enterprise data into vector embeddings to enable RAG and mitigate LLM hallucinations.
Use any embedding model, like those from OpenAI, Amazon, or Google Gemini, with any vector database, like MongoDB Atlas, Pinecone, Elastic, or Couchbase, across any cloud. Save time with the Create Embeddings Action, which is a no-code shortcut for vectorizing data in just a few clicks.
For more information, see Create Embeddings.
Remote model inference¶
Remote model inference with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink enables running inference with remote AI/ML models in Flink SQL statements.
AI model inference enables agents to call LLMs for real-time reasoning and helps implement retrieval-augmented generation (RAG) by using AI models as first-class resources in Flink SQL.
- Utilize AI models directly within Flink SQL queries
- Manage remotely hosted AI models with SQL DDL statements
- Invoke remote AI model endpoints, like OpenAI, Azure ML, and AWS SageMaker
Model inference allows you to simplify the development and deployment of AI/ML applications by providing a unified platform for both data processing and AI/ML tasks.
- Simplify development by using familiar SQL syntax to enable agents to interact directly with AI/ML models, including LLMs, reducing the need for specialized ML tools and languages.
- Seamlessly coordinate between data processing and AI workflows to improve efficiency and reduce operational complexity.
- Enable accurate, real-time AI decision-making by leveraging fresh, contextual streaming data to support patterns like retrieval-augmented generation (RAG), which supplies LLMs with real-time information at inference time.
By working with AI/ML models directly as first-class resources within Flink, you can use them within your SQL queries with the same familiar syntax you use for data processing. This approach allows you to create and manage remotely hosted models using SQL Data Definition Language (DDL) statements, eliminating the need to interact with the underlying infrastructure.
You can call remote model endpoints, like OpenAI, GCP Vertex AI, AWS SageMaker, and Azure, and receive inference results in your Flink jobs. This integration enables seamless use of remote AI/ML models in your real-time data pipeline, enhancing the flexibility and scalability of your AI applications.
CREATE MODEL Statement¶
The CREATE MODEL statement registers a remote model in your Flink environment for real-time prediction and inference over streaming data.
- Model inference in streaming queries: Once a model is registered, you can use it in your Flink SQL statements to run inference. Confluent Cloud for Apache Flink provides built-in functions for AI/ML workflows with your registered models.
- Model versioning: Basic model versioning capabilities enable you to manage different versions of models.
The following platforms are supported for remote model inference:
- AWS Bedrock: Navigate to the AWS Bedrock site, select a foundation model, and use the provided model ID and endpoint for inference.
- AWS SageMaker: Similar to AWS Bedrock, but using SageMaker Studio.
- OpenAI: Use OpenAI’s API for model inference.
- Google Cloud Vertex AI: Integrate models hosted on Vertex AI.
- Azure AI Foundry: Use models hosted on Azure AI and Azure ML.
Flink SQL integration¶
Confluent Cloud for Apache Flink provides a Flink SQL interface for creating and managing model, agent, and tool resources. You can use a SQL statement to create a model resource and pass it on for inference in streaming queries. The SQL interface is available in Cloud Console and the Flink SQL shell.
- CREATE MODEL: Create a model resource.
- CREATE AGENT: Create an agent resource.
- CREATE TOOL: Create a tool resource.
Model inference and agentic AI functions like these are available for building AI/ML workflows.
- AI_TOOL_INVOKE: Invoke Model Context Protocol (MCP) tools and user-defined functions (UDFs) in your streaming agents.
- AI_COMPLETE: Generate text completions.
- AI_EMBEDDING: Create embeddings.
For more information, see Run an AI Model.
Built-in machine learning (ML) functions¶
Built-in ML functions with Confluent Cloud for Apache Flink¶
Simplify complex data science tasks into Flink SQL statements. Built-in ML functions enable forecasting and anomaly detection with Flink SQL functions to derive real-time insights, with no ML expertise or model building needed.
- Do continuous forecasting on time-series streaming data, with out-of-the-box configuration (Auto-ARIMA) or custom user configuration, like training size, seasonality, and forecast horizon.
- Perform anomaly detection with Boolean value prediction for each new event.
- See real-time visualizations, like time-series charts and graphs showing forecasted values and anomalies.
Built-in ML functions provide time-series forecasting and anomaly detection SQL functions for streaming data, enabling you to derive real-time insights. These functions simplify complex data science tasks into Flink SQL, providing a familiar yet powerful way to apply AI to streaming data. The functions are built on top of popular ML algorithms, like ARIMA, that are optimized for real-time performance, and they deliver forecasts and anomaly detection on each new event.
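The following Python sketch illustrates the general shape of per-event anomaly detection that returns a Boolean for each new value. It deliberately swaps in a simple rolling z-score for the ARIMA-based approach mentioned above, so it is not the algorithm behind ML_DETECT_ANOMALIES. The class name `RollingAnomalyDetector`, the window size, the threshold, and the sample readings are all hypothetical.

```python
from collections import deque
import statistics


class RollingAnomalyDetector:
    """Flag values that deviate strongly from a rolling window of history.

    Rolling z-score stand-in, not ARIMA: a value is anomalous when it lies
    more than `threshold` sample standard deviations from the window mean.
    """

    def __init__(self, window=5, threshold=3.0):
        self.history = deque(maxlen=window)
        self.threshold = threshold

    def observe(self, value):
        """Return True if value is anomalous, then add it to the history."""
        is_anomaly = False
        if len(self.history) >= 2:  # need two points for a sample stdev
            mean = statistics.mean(self.history)
            stdev = statistics.stdev(self.history)
            if stdev > 0:
                is_anomaly = abs(value - mean) > self.threshold * stdev
            else:
                is_anomaly = value != mean
        self.history.append(value)
        return is_anomaly


# Hypothetical sensor readings arriving one event at a time.
readings = [10.0, 10.2, 9.9, 10.1, 10.0, 25.0, 10.1]
detector = RollingAnomalyDetector(window=5, threshold=3.0)
flags = [detector.observe(r) for r in readings]
print(flags)
```

Only the spike at 25.0 is flagged. Because the sketch keeps the spike in its history, the window's spread widens for the next few events, which is one reason production detectors typically use more robust models.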
With built-in ML functions, you can:
- Eliminate the need for batch processes
- Bridge the gap between data analysis and machine learning
- Gain real-time, actionable insights
Built-in ML functions make it easier for you to harness the full potential of AI-driven analytics. SQL functions enable real-time analysis, reduce complexity, and speed up decision-making by delivering insights immediately as the data is ingested. Built-in forecasting and anomaly detection make real-time AI accessible to everyone, enabling agents and teams to make smarter decisions faster.
Common use cases include:
- Operational monitoring: Detect system failures or performance issues in real time, minimizing downtime.
- Financial forecasting: Predict trends and identify irregular transactions in streaming financial data.
- IoT analytics: Monitor sensor data in industrial settings to detect equipment malfunctions or predict maintenance needs.
- Retail analytics: Forecast demand and optimize inventory by identifying purchasing trends in real time.
Built-in ML functions¶
The following functions are available for building ML workflows.
- ML_DETECT_ANOMALIES: Detect anomalies in your data.
- ML_EVALUATE: Evaluate the performance of an AI/ML model.
- ML_FORECAST: Forecast trends in your data.
- ML_PREDICT: Run a remote AI/ML model for tasks like predicting outcomes, generating text, and classification.
ML preprocessing utility functions like these are available for building ML workflows.
- ML_BUCKETIZE: Bucketize a column.
- ML_CHARACTER_TEXT_SPLITTER, ML_FILE_FORMAT_TEXT_SPLITTER, ML_RECURSIVE_TEXT_SPLITTER: Split a column into multiple columns.
- ML_LABEL_ENCODER, ML_ONE_HOT_ENCODER: Encode a column.
- ML_MAX_ABS_SCALER, ML_MIN_MAX_SCALER: Scale a column.
- ML_NGRAMS: Create n-grams.
- ML_NORMALIZER: Normalize a column.
- ML_ROBUST_SCALER, ML_STANDARD_SCALER: Scale a column.
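As a rough illustration of what several of these preprocessing steps conventionally compute, the following Python sketch shows min-max scaling, bucketizing, one-hot encoding, and n-gram creation on plain values. The function names, argument order, and boundary handling here are assumptions made for the example; they are not the signatures or exact semantics of the Flink SQL functions listed above.

```python
from bisect import bisect_right


def min_max_scale(value, data_min, data_max):
    """Map value from [data_min, data_max] onto [0, 1]."""
    if data_max == data_min:
        return 0.0  # degenerate range: assumed convention for this sketch
    return (value - data_min) / (data_max - data_min)


def bucketize(value, split_points):
    """Return the index of the bucket that value falls into.

    split_points must be sorted; a value equal to a split point is placed
    in the higher bucket (an assumption of this sketch).
    """
    return bisect_right(split_points, value)


def one_hot_encode(value, categories):
    """Return a 0/1 vector with a 1 at the position of value."""
    return [1 if value == category else 0 for category in categories]


def ngrams(tokens, n):
    """Return every run of n consecutive tokens."""
    return [tokens[i:i + n] for i in range(len(tokens) - n + 1)]


print(min_max_scale(15, 10, 20))
print(bucketize(7, [5, 10]))
print(one_hot_encode("b", ["a", "b", "c"]))
print(ngrams(["real", "time", "data"], 2))
```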
For more information, see Built-in AI/ML Functions.
Secure connections¶
Secure connections with Confluent Cloud for Apache Flink¶
Reusable connection resources provide a secure way to seamlessly integrate and manage connectivity with external systems. Connections on Confluent Cloud enable secure connections to models, vector databases, and MCP directly using Flink SQL.
- Secure integration: Connect to relational and vector databases, models, REST APIs, and MCP servers using Flink SQL.
- Credential management: Securely pass secrets for external systems, ensuring sensitive credentials are stored separately and never exposed in catalog metadata, logs, or configuration files.
- Reusability: Enable the same connection to be shared across multiple tables, models, and functions.
- Centralized management: Centralize connection management for large-scale deployments.
Connecting to models, vector databases, and MCP is crucial for building agents. Connections in Flink provide a secure, reusable way to manage external connectivity. Sensitive credentials are stored separately from connection metadata, enabling seamless and secure integration with external systems.
For more information, see Reuse Confluent Cloud Connections With External Services.
RBAC for model inference¶
The following table shows the model actions that are available for different RBAC permissions.
| Role | CREATE MODEL | Invoke model for prediction | List/Describe Models | DROP MODEL | Grant permissions on models |
|---|---|---|---|---|---|
| CloudClusterAdmin | Yes [1] | Yes [1] | Yes [1] | Yes [1] | Yes [1] |
| EnvironmentAdmin | Yes | Yes | Yes | Yes | Yes |
| DeveloperManage | Yes | No | Yes | Yes | No |
| DeveloperRead | No | Yes | Yes | No | No |
| DeveloperWrite | Yes | Yes | Yes | No | No |
| ResourceOwner | Yes | Yes | Yes | Yes | Yes |
| OrganizationAdmin | Yes | Yes | Yes | Yes | Yes |

[1] For own Kafka cluster.
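For quick scripting or review, the permissions in the table can be transcribed into a small lookup. The following Python sketch does only that: the action keys (`create`, `invoke`, `list`, `drop`, `grant`) and the `is_allowed` helper are hypothetical names chosen for this example, and the sketch does not call any Confluent Cloud API or enforce anything.

```python
ALL_ACTIONS = {"create", "invoke", "list", "drop", "grant"}

# Transcribed from the table above. CloudClusterAdmin permissions apply to
# the admin's own Kafka cluster only (footnote [1]).
MODEL_PERMISSIONS = {
    "CloudClusterAdmin": set(ALL_ACTIONS),
    "EnvironmentAdmin": set(ALL_ACTIONS),
    "DeveloperManage": {"create", "list", "drop"},
    "DeveloperRead": {"invoke", "list"},
    "DeveloperWrite": {"create", "invoke", "list"},
    "ResourceOwner": set(ALL_ACTIONS),
    "OrganizationAdmin": set(ALL_ACTIONS),
}


def is_allowed(role, action):
    """Return True if the table grants the action to the role."""
    if action not in ALL_ACTIONS:
        raise ValueError(f"unknown action: {action}")
    return action in MODEL_PERMISSIONS.get(role, set())


print(is_allowed("DeveloperRead", "invoke"))
print(is_allowed("DeveloperManage", "invoke"))
```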