On this tutorial, we construct an ultra-advanced agentic AI workflow that behaves like a production-grade analysis and reasoning system somewhat than a single immediate name. We ingest actual net sources asynchronously, break up them into provenance-tracked chunks, and run hybrid retrieval utilizing each TF-IDF (sparse) and OpenAI embeddings (dense), then fuse outcomes for greater recall and stability. We orchestrate a number of brokers, planning, synthesis, and restore, whereas imposing strict guardrails so each main declare is grounded in retrieved proof, and we persist episodic reminiscence. Therefore, the system improves its technique over time. Take a look at the FULL CODES right here.
!pip -q set up openai openai-agents pydantic httpx beautifulsoup4 lxml scikit-learn numpy
import os, re, json, time, getpass, asyncio, sqlite3, hashlib
from typing import Record, Dict, Tuple, Optionally available, Any
import numpy as np
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel, Discipline
from sklearn.feature_extraction.textual content import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from openai import AsyncOpenAI
from brokers import Agent, Runner, SQLiteSession
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
if not os.environ.get("OPENAI_API_KEY"):
elevate RuntimeError("OPENAI_API_KEY not supplied.")
print("✅ OpenAI API key loaded securely.")
oa = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])
def sha1(s: str) -> str:
return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()
def normalize_url(u: str) -> str:
u = (u or "").strip()
return u.rstrip(").,]"'")
def clean_html_to_text(html: str) -> str:
soup = BeautifulSoup(html, "lxml")
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
txt = soup.get_text("n")
txt = re.sub(r"n{3,}", "nn", txt).strip()
txt = re.sub(r"[ t]+", " ", txt)
return txt
def chunk_text(textual content: str, chunk_chars: int = 1600, overlap_chars: int = 320) -> Record[str]:
if not textual content:
return []
textual content = re.sub(r"s+", " ", textual content).strip()
n = len(textual content)
step = max(1, chunk_chars - overlap_chars)
chunks = []
i = 0
whereas i < n:
chunks.append(textual content[i:i + chunk_chars])
i += step
return chunks
def canonical_chunk_id(s: str) -> str:
if s is None:
return ""
s = str(s).strip()
s = s.strip("<>"'()[]{}")
s = s.rstrip(".,;:")
return s
def inject_exec_summary_citations(exec_summary: str, citations: Record[str], allowed_chunk_ids: Record[str]) -> str:
exec_summary = exec_summary or ""
cset = []
for c in citations:
c = canonical_chunk_id(c)
if c and c in allowed_chunk_ids and c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) < 2:
for c in allowed_chunk_ids:
if c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) >= 2:
wanted = [c for c in cset if c not in exec_summary]
if wanted:
exec_summary = exec_summary.strip()
if exec_summary and never exec_summary.endswith("."):
exec_summary += "."
exec_summary += f" (cite: {cset[0]}) (cite: {cset[1]})"
return exec_summary
We arrange the atmosphere, securely load the OpenAI API key, and initialize core utilities that the whole lot else will depend on. We outline hashing, URL normalization, HTML cleansing, and chunking so all downstream steps function on clear, constant textual content. We additionally add deterministic helpers to normalize and inject citations, guaranteeing guardrails are all the time glad. Take a look at the FULL CODES right here.
async def fetch_many(urls: Record[str], timeout_s: float = 25.0, per_url_char_limit: int = 60000) -> Dict[str, str]:
headers = {"Person-Agent": "Mozilla/5.0 (AgenticAI/4.2)"}
urls = [normalize_url(u) for u in urls]
urls = [u for u in urls if u.startswith("http")]
urls = listing(dict.fromkeys(urls))
out: Dict[str, str] = {}
async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as shopper:
async def _one(url: str):
strive:
r = await shopper.get(url)
r.raise_for_status()
out[url] = clean_html_to_text(r.textual content)[:per_url_char_limit]
besides Exception as e:
out[url] = f"__FETCH_ERROR__ {sort(e).__name__}: {e}"
await asyncio.collect(*[_one(u) for u in urls])
return out
def dedupe_texts(sources: Dict[str, str]) -> Dict[str, str]:
seen = set()
out = {}
for url, txt in sources.objects():
if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
proceed
h = sha1(txt[:25000])
if h in seen:
proceed
seen.add(h)
out[url] = txt
return out
class ChunkRecord(BaseModel):
chunk_id: str
url: str
chunk_index: int
textual content: str
class RetrievalHit(BaseModel):
chunk_id: str
url: str
chunk_index: int
score_sparse: float = 0.0
score_dense: float = 0.0
score_fused: float = 0.0
textual content: str
class EvidencePack(BaseModel):
question: str
hits: Record[RetrievalHit]
We asynchronously fetch a number of net sources in parallel and aggressively deduplicate content material to keep away from redundant proof. We convert uncooked pages into structured textual content and outline the core knowledge fashions that signify chunks and retrieval hits. We guarantee every bit of textual content is traceable again to a selected supply and chunk index. Take a look at the FULL CODES right here.
EPISODE_DB = "agentic_episode_memory.db"
def episode_db_init():
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
query TEXT NOT NULL,
urls_json TEXT NOT NULL,
retrieval_queries_json TEXT NOT NULL,
useful_sources_json TEXT NOT NULL
)
""")
con.commit()
con.shut()
def episode_store(query: str, urls: Record[str], retrieval_queries: Record[str], useful_sources: Record[str]):
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute(
"INSERT INTO episodes(ts, query, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)",
(int(time.time()), query, json.dumps(urls), json.dumps(retrieval_queries), json.dumps(useful_sources)),
)
con.commit()
con.shut()
def episode_recall(query: str, top_k: int = 2) -> Record[Dict[str, Any]]:
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute("SELECT ts, query, urls_json, retrieval_queries_json, useful_sources_json FROM episodes ORDER BY ts DESC LIMIT 200")
rows = cur.fetchall()
con.shut()
q_tokens = set(re.findall(r"[A-Za-z]{3,}", (query or "").decrease()))
scored = []
for ts, q2, u, rq, us in rows:
t2 = set(re.findall(r"[A-Za-z]{3,}", (q2 or "").decrease()))
if not t2:
proceed
rating = len(q_tokens & t2) / max(1, len(q_tokens))
if rating > 0:
scored.append((rating, {
"ts": ts,
"query": q2,
"urls": json.hundreds(u),
"retrieval_queries": json.hundreds(rq),
"useful_sources": json.hundreds(us),
}))
scored.kind(key=lambda x: x[0], reverse=True)
return [x[1] for x in scored[:top_k]]
episode_db_init()
We introduce episodic reminiscence backed by SQLite so the system can recall what labored in earlier runs. We retailer questions, retrieval methods, and helpful sources to information future planning. We additionally implement light-weight similarity-based recall to bias the system towards traditionally efficient patterns. Take a look at the FULL CODES right here.
class HybridIndex:
def __init__(self):
self.information: Record[ChunkRecord] = []
self.tfidf: Optionally available[TfidfVectorizer] = None
self.tfidf_mat = None
self.emb_mat: Optionally available[np.ndarray] = None
def build_sparse(self):
corpus = [r.text for r in self.records] if self.information else [""]
self.tfidf = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=80000)
self.tfidf_mat = self.tfidf.fit_transform(corpus)
def search_sparse(self, question: str, okay: int) -> Record[Tuple[int, float]]:
if not self.information or self.tfidf is None or self.tfidf_mat is None:
return []
qv = self.tfidf.rework([query])
sims = cosine_similarity(qv, self.tfidf_mat).flatten()
high = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in high]
def set_dense(self, mat: np.ndarray):
self.emb_mat = mat.astype(np.float32)
def search_dense(self, q_emb: np.ndarray, okay: int) -> Record[Tuple[int, float]]:
if self.emb_mat is None or not self.information:
return []
M = self.emb_mat
q = q_emb.astype(np.float32).reshape(1, -1)
M_norm = M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-9)
q_norm = q / (np.linalg.norm(q) + 1e-9)
sims = (M_norm @ q_norm.T).flatten()
high = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in high]
def rrf_fuse(rankings: Record[List[int]], okay: int = 60) -> Dict[int, float]:
scores: Dict[int, float] = {}
for r in rankings:
for pos, idx in enumerate(r, begin=1):
scores[idx] = scores.get(idx, 0.0) + 1.0 / (okay + pos)
return scores
HYBRID = HybridIndex()
ALLOWED_URLS: Record[str] = []
EMBED_MODEL = "text-embedding-3-small"
async def embed_batch(texts: Record[str]) -> np.ndarray:
resp = await oa.embeddings.create(mannequin=EMBED_MODEL, enter=texts, encoding_format="float")
vecs = [np.array(item.embedding, dtype=np.float32) for item in resp.data]
return np.vstack(vecs) if vecs else np.zeros((0, 0), dtype=np.float32)
async def embed_texts(texts: Record[str], batch_size: int = 96, max_concurrency: int = 3) -> np.ndarray:
sem = asyncio.Semaphore(max_concurrency)
mats: Record[Tuple[int, np.ndarray]] = []
async def _one(begin: int, batch: Record[str]):
async with sem:
m = await embed_batch(batch)
mats.append((begin, m))
duties = []
for begin in vary(0, len(texts), batch_size):
batch = [t[:7000] for t in texts[start:start + batch_size]]
duties.append(_one(begin, batch))
await asyncio.collect(*duties)
mats.kind(key=lambda x: x[0])
emb = np.vstack([m for _, m in mats]) if mats else np.zeros((len(texts), 0), dtype=np.float32)
if emb.form[0] != len(texts):
elevate RuntimeError(f"Embedding rows mismatch: received {emb.form[0]} anticipated {len(texts)}")
return emb
async def embed_query(question: str) -> np.ndarray:
m = await embed_batch([query[:7000]])
return m[0] if m.form[0] else np.zeros((0,), dtype=np.float32)
async def build_index(urls: Record[str], max_chunks_per_url: int = 60):
international ALLOWED_URLS
fetched = await fetch_many(urls)
fetched = dedupe_texts(fetched)
information: Record[ChunkRecord] = []
allowed: Record[str] = []
for url, txt in fetched.objects():
if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
proceed
allowed.append(url)
chunks = chunk_text(txt)[:max_chunks_per_url]
for i, ch in enumerate(chunks):
cid = f"{sha1(url)}:{i}"
information.append(ChunkRecord(chunk_id=cid, url=url, chunk_index=i, textual content=ch))
if not information:
err_view = {normalize_url(u): fetched.get(normalize_url(u), "") for u in urls}
elevate RuntimeError("No sources fetched efficiently.n" + json.dumps(err_view, indent=2)[:4000])
ALLOWED_URLS = allowed
HYBRID.information = information
HYBRID.build_sparse()
texts = [r.text for r in HYBRID.records]
emb = await embed_texts(texts, batch_size=96, max_concurrency=3)
HYBRID.set_dense(emb)
We construct a hybrid retrieval index that mixes sparse TF-IDF search with dense OpenAI embeddings. We allow reciprocal rank fusion, in order that sparse and dense alerts complement one another somewhat than compete. We assemble the index as soon as per run and reuse it throughout all retrieval queries for effectivity. Take a look at the FULL CODES right here.
def build_evidence_pack(question: str, sparse: Record[Tuple[int,float]], dense: Record[Tuple[int,float]], okay: int = 10) -> EvidencePack:
sparse_rank = [i for i,_ in sparse]
dense_rank = [i for i,_ in dense]
sparse_scores = {i:s for i,s in sparse}
dense_scores = {i:s for i,s in dense}
fused = rrf_fuse([sparse_rank, dense_rank], okay=60) if dense_rank else rrf_fuse([sparse_rank], okay=60)
high = sorted(fused.keys(), key=lambda i: fused[i], reverse=True)[:k]
hits: Record[RetrievalHit] = []
for idx in high:
r = HYBRID.information[idx]
hits.append(RetrievalHit(
chunk_id=r.chunk_id, url=r.url, chunk_index=r.chunk_index,
score_sparse=float(sparse_scores.get(idx, 0.0)),
score_dense=float(dense_scores.get(idx, 0.0)),
score_fused=float(fused.get(idx, 0.0)),
textual content=r.textual content
))
return EvidencePack(question=question, hits=hits)
async def gather_evidence(queries: Record[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60):
proof: Record[EvidencePack] = []
useful_sources_count: Dict[str, int] = {}
all_chunk_ids: Record[str] = []
for q in queries:
sparse = HYBRID.search_sparse(q, okay=sparse_k)
q_emb = await embed_query(q)
dense = HYBRID.search_dense(q_emb, okay=dense_k)
pack = build_evidence_pack(q, sparse, dense, okay=per_query_k)
proof.append(pack)
for h in pack.hits[:6]:
useful_sources_count[h.url] = useful_sources_count.get(h.url, 0) + 1
for h in pack.hits:
all_chunk_ids.append(h.chunk_id)
useful_sources = sorted(useful_sources_count.keys(), key=lambda u: useful_sources_count[u], reverse=True)
all_chunk_ids = sorted(listing(dict.fromkeys(all_chunk_ids)))
return proof, useful_sources[:8], all_chunk_ids
class Plan(BaseModel):
goal: str
subtasks: Record[str]
retrieval_queries: Record[str]
acceptance_checks: Record[str]
class UltraAnswer(BaseModel):
title: str
executive_summary: str
structure: Record[str]
retrieval_strategy: Record[str]
agent_graph: Record[str]
implementation_notes: Record[str]
risks_and_limits: Record[str]
citations: Record[str]
sources: Record[str]
def normalize_answer(ans: UltraAnswer, allowed_chunk_ids: Record[str]) -> UltraAnswer:
knowledge = ans.model_dump()
knowledge["citations"] = [canonical_chunk_id(x) for x in (data.get("citations") or [])]
knowledge["citations"] = [x for x in data["citations"] if x in allowed_chunk_ids]
knowledge["executive_summary"] = inject_exec_summary_citations(knowledge.get("executive_summary",""), knowledge["citations"], allowed_chunk_ids)
return UltraAnswer(**knowledge)
def validate_ultra(ans: UltraAnswer, allowed_chunk_ids: Record[str]) -> None:
extras = [u for u in ans.sources if u not in ALLOWED_URLS]
if extras:
elevate ValueError(f"Non-allowed sources in output: {extras}")
cset = set(ans.citations or [])
lacking = [cid for cid in cset if cid not in set(allowed_chunk_ids)]
if lacking:
elevate ValueError(f"Citations reference unknown chunk_ids (not retrieved): {lacking}")
if len(cset) < 6:
elevate ValueError("Want at the least 6 distinct chunk_id citations in extremely mode.")
es_text = ans.executive_summary or ""
es_count = sum(1 for cid in cset if cid in es_text)
if es_count < 2:
elevate ValueError("Govt abstract should embrace at the least 2 chunk_id citations verbatim.")
PLANNER = Agent(
title="Planner",
mannequin="gpt-4o-mini",
directions=(
"Return a technical Plan schema.n"
"Make 10-16 retrieval_queries.n"
"Acceptance should embrace: at the least 6 citations and exec_summary incorporates at the least 2 citations verbatim."
),
output_type=Plan,
)
SYNTHESIZER = Agent(
title="Synthesizer",
mannequin="gpt-4o-mini",
directions=(
"Return UltraAnswer schema.n"
"Laborious constraints:n"
"- executive_summary MUST embrace at the least TWO citations verbatim as: (cite: ).n"
"- citations should be chosen ONLY from ALLOWED_CHUNK_IDS listing.n"
"- citations listing should embrace at the least 6 distinctive chunk_ids.n"
"- sources should be subset of allowed URLs.n"
),
output_type=UltraAnswer,
)
FIXER = Agent(
title="Fixer",
mannequin="gpt-4o-mini",
directions=(
"Restore to fulfill guardrails.n"
"Guarantee executive_summary consists of at the least TWO citations verbatim.n"
"Select citations ONLY from ALLOWED_CHUNK_IDS listing.n"
"Return UltraAnswer schema."
),
output_type=UltraAnswer,
)
session = SQLiteSession("ultra_agentic_user", "ultra_agentic_session.db")
We collect proof by operating a number of focused queries, fusing sparse and dense outcomes, and assembling proof packs with scores and provenance. We outline strict schemas for plans and remaining solutions, then normalize and validate citations towards retrieved chunk IDs. We implement arduous guardrails so each reply stays grounded and auditable. Take a look at the FULL CODES right here.
async def run_ultra_agentic(query: str, urls: Record[str], max_repairs: int = 2) -> UltraAnswer:
await build_index(urls)
recall_hint = json.dumps(episode_recall(query, top_k=2), indent=2)[:2000]
plan_res = await Runner.run(
PLANNER,
f"Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nnRecall:n{recall_hint}n",
session=session
)
plan: Plan = plan_res.final_output
queries = (plan.retrieval_queries or [])[:16]
evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)
evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]
allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)
draft_res = await Runner.run(
SYNTHESIZER,
f"Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn"
f"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn"
f"Proof packs:n{evidence_json}nn"
"Return UltraAnswer.",
session=session
)
draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)
last_err = None
for i in vary(max_repairs + 1):
strive:
validate_ultra(draft, allowed_chunk_ids)
episode_store(query, ALLOWED_URLS, plan.retrieval_queries, useful_sources)
return draft
besides Exception as e:
last_err = str(e)
if i >= max_repairs:
draft = normalize_answer(draft, allowed_chunk_ids)
validate_ultra(draft, allowed_chunk_ids)
return draft
fixer_res = await Runner.run(
FIXER,
f"Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn"
f"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn"
f"Guardrail error:n{last_err}nn"
f"Draft:n{json.dumps(draft.model_dump(), indent=2)[:12000]}nn"
f"Proof packs:n{evidence_json}nn"
"Return corrected UltraAnswer that passes guardrails.",
session=session
)
draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)
elevate RuntimeError(f"Surprising failure: {last_err}")
query = (
"Design a production-lean however superior agentic AI workflow in Python with hybrid retrieval, "
"provenance-first citations, critique-and-repair loops, and episodic reminiscence. "
"Clarify why every layer issues, failure modes, and analysis."
)
urls = [
"https://openai.github.io/openai-agents-python/",
"https://openai.github.io/openai-agents-python/agents/",
"https://openai.github.io/openai-agents-python/running_agents/",
"https://github.com/openai/openai-agents-python",
]
ans = await run_ultra_agentic(query, urls, max_repairs=2)
print("nTITLE:n", ans.title)
print("nEXECUTIVE SUMMARY:n", ans.executive_summary)
print("nARCHITECTURE:")
for x in ans.structure:
print("-", x)
print("nRETRIEVAL STRATEGY:")
for x in ans.retrieval_strategy:
print("-", x)
print("nAGENT GRAPH:")
for x in ans.agent_graph:
print("-", x)
print("nIMPLEMENTATION NOTES:")
for x in ans.implementation_notes:
print("-", x)
print("nRISKS & LIMITS:")
for x in ans.risks_and_limits:
print("-", x)
print("nCITATIONS (chunk_ids):")
for c in ans.citations:
print("-", c)
print("nSOURCES:")
for s in ans.sources:
print("-", s)
We orchestrate the complete agentic loop by chaining planning, synthesis, validation, and restore in an async-safe pipeline. We mechanically retry and repair outputs till they cross all constraints with out human intervention. We end by operating a full instance and printing a totally grounded, production-ready agentic response.
In conclusion, we developed a complete agentic pipeline sturdy to widespread failure modes: unstable embedding shapes, quotation drift, and lacking grounding in government summaries. We validated outputs towards allowlisted sources, retrieved chunk IDs, mechanically normalized citations, and injected deterministic citations when wanted to ensure compliance with out sacrificing correctness. By combining hybrid retrieval, critique-and-repair loops, and episodic reminiscence, we created a reusable basis we will lengthen with stronger evaluations (claim-to-evidence protection scoring, adversarial red-teaming, and regression checks) to constantly harden the system because it scales to new domains and bigger corpora.
Take a look at the FULL CODES right here. Additionally, be at liberty to observe us on Twitter and don’t neglect to hitch our 100k+ ML SubReddit and Subscribe to our E-newsletter. Wait! are you on telegram? now you may be part of us on telegram as effectively.
