This repository contains a streaming-first RAG pipeline built from three applications:
- apps/cdc – Flink 1.19 session cluster that uses Flink CDC to capture data from MySQL and publish it to Kafka.
- apps/knowledge – Flink 2.0 DataStream job that reads CDC events from Kafka and asynchronously enriches Qdrant with fresh embeddings.
- apps/ask – Flink 2.0 DataStream job that consumes questions from Kafka, retrieves context from Qdrant, calls OpenAI, and emits answers back to Kafka.
The overall flow is:
```
MySQL -> (Flink CDC) -> Kafka topic `knowledge_topic`
      -> (Knowledge Job) -> Qdrant with metadata = context ID

Ask topic `ask` -> (Ask Job) -> Answer topic `answer`
```
Repository layout:

```
apps/
  ask/                 # Flink 2.0 ask processor
  knowledge/           # Flink 2.0 knowledge ingestor
  cdc/                 # Session-cluster CDC stack (Flink 1.19 + MySQL + Kafka)
libs/
  core/                # Shared LangChain/Qdrant services
Dockerfile             # Multi-target builder for ask & knowledge images
docker-compose.yml     # Full stack (CDC + jobs + infra)
up.sh                  # One-command bootstrap
```
Prerequisites:

- Docker 24+ with Compose v2
- Java tooling is handled inside the Docker builds (Gradle 8, JDK 21)
- An OpenAI API key with access to `gpt-4o-mini` and `text-embedding-3-small`
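A quick sanity check for the Docker requirement (both are standard Docker CLI commands):

```bash
# Should report Docker 24+ and a Compose v2 plugin.
docker --version
docker compose version
```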
To bring up the stack:

- Copy the environment template and add your secrets:

  ```bash
  cp .env.example .env   # edit .env and set OPENAI_API_KEY at minimum
  ```
- Launch the entire stack:

  ```bash
  ./up.sh
  ```

  This builds the Flink fat jars, creates the Docker images, and starts MySQL, Kafka, Qdrant, and all Flink jobs.
- Once everything is healthy you can inspect the UIs:
  - CDC Flink UI: http://localhost:8081
  - Knowledge Flink UI: http://localhost:8082
  - Ask Flink UI: http://localhost:8083
  - AKHQ (Kafka UI): http://localhost:8085
  - Qdrant UI/API: http://localhost:6333
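As a quick command-line check (a minimal sketch using the default port above; the collection name comes from `QDRANT_COLLECTION` in the configuration table below):

```bash
# List Qdrant collections over its HTTP API; the default collection
# `flink_streaming_ai_docs` should appear once the knowledge job has written embeddings.
curl -s http://localhost:6333/collections
```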
Use the helper script to write knowledge rows into MySQL; the CDC job picks them up and publishes them to Kafka automatically:
```bash
./run.sh --context-id main --context "Paris is the capital of France."
```

The CDC job publishes `{id, context, knowledge}` JSON events to `knowledge_topic`. The knowledge job embeds the text and stores it in Qdrant under the metadata key defined by `CONTEXT_METADATA_KEY` (default `context`).
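To watch those CDC events as they arrive, you can tail the topic directly. This is only a sketch: the `kafka` service name, the bootstrap address, and the presence of `kafka-console-consumer.sh` on the container PATH are assumptions, so adjust them to your docker-compose.yml:

```bash
# Tail the CDC events the knowledge job consumes (service name, script path, and
# bootstrap address are assumptions). Each record should be a JSON object with
# id, context, and knowledge fields.
docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic knowledge_topic \
  --from-beginning
```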
Send questions through the same script. The job accepts either the `context` key or the legacy `sessionId` key:

```bash
./run.sh --context-id main --ask "What is the capital of France?"
```

Stream answers from the `answer` topic (Ctrl+C to stop):
```bash
./run.sh --list
```

Messages include `context`, `question`, `answer`, and a timestamp.
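If you prefer to bypass the helper script, you can also publish a question straight to the `ask` topic. This is a sketch only: the `question` field name, the `kafka` service name, and the bootstrap address are assumptions, so check the ask job and docker-compose.yml for the exact schema and endpoints:

```bash
# Publish a question directly to the `ask` topic (field names, service name, and
# bootstrap address are assumptions; `sessionId` may be used instead of `context`).
echo '{"context":"main","question":"What is the capital of France?"}' | \
  docker compose exec -T kafka kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic ask
```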
Most values have sensible defaults but can be overridden via .env or the host environment.
| Variable | Default | Description |
|---|---|---|
| `OPENAI_API_KEY` | required | API key used by both Flink 2.0 jobs |
| `QDRANT_COLLECTION` | `flink_streaming_ai_docs` | Collection that stores embeddings |
| `CONTEXT_METADATA_KEY` | `context` | Metadata key used for Qdrant filtering |
| `ASK_TOPIC` | `ask` | Kafka topic producing questions |
| `ANSWER_TOPIC` | `answer` | Kafka topic receiving answers |
| `ASK_GROUP_ID` | `ask-processor` | Consumer group for the ask job |
| `KNOWLEDGE_TOPIC` | `knowledge_topic` | Kafka topic with CDC updates |
| `KNOWLEDGE_GROUP_ID` | `knowledge-to-qdrant` | Consumer group for the knowledge job |
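For example, a minimal `.env` might look like the sketch below; the variable names come from the table above and the key value is a placeholder:

```bash
# .env — only OPENAI_API_KEY is required; everything else falls back to the defaults above.
OPENAI_API_KEY=sk-your-key-here
QDRANT_COLLECTION=flink_streaming_ai_docs
ASK_TOPIC=ask
ANSWER_TOPIC=answer
```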
Internal services (MySQL, Kafka, AKHQ, Qdrant) can also be modified directly in docker-compose.yml.
- Both Flink 2.0 applications rely on shaded JARs created with the Shadow plugin; the multi-target `Dockerfile` builds them automatically.
- Shared LangChain services live under `libs/core` and are reused by both jobs.
- The CDC stack still runs on Flink 1.19 because Flink CDC 3.5 depends on the classic SourceFunction API.
- If you need to reset state, use `docker compose down -v` to remove the Docker volumes (Kafka/MySQL/Qdrant data).
- Ensure `OPENAI_API_KEY` is set before running `./up.sh`; otherwise the Flink jobs will continuously restart.
- Use `docker compose logs -f knowledge-jobmanager` or `docker compose logs -f ask-jobmanager` to inspect failures in the jobs.
- If you change Java code, rerun `docker compose build` to rebuild the shaded JARs.
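If the stack ever ends up in a bad state, one way to combine the commands above into a full reset (this drops all Kafka, MySQL, and Qdrant data) is:

```bash
# Full reset: stop the stack and remove the data volumes, then bootstrap again
# (./up.sh rebuilds the fat jars and images before starting everything).
docker compose down -v
./up.sh
```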