Thursday, January 15, 2026

How to Build a Stateless, Secure, and Asynchronous MCP-Style Protocol for Scalable Agent Workflows


In this tutorial, we build a clean, advanced demonstration of modern MCP design by focusing on three core ideas: stateless communication, strict SDK-level validation, and asynchronous, long-running operations. We implement a minimal MCP-like protocol using structured envelopes, signed requests, and Pydantic-validated tools to show how agents and services can interact safely without relying on persistent sessions.

import asyncio, time, json, uuid, hmac, hashlib
from dataclasses import dataclass
from typing import Any, Dict, Optional, Literal, List
from pydantic import BaseModel, Field, ValidationError, ConfigDict


def _now_ms():
    # Current wall-clock time in milliseconds.
    return int(time.time() * 1000)


def _uuid():
    # Random identifier used for request IDs, nonces, and job IDs.
    return str(uuid.uuid4())


def _canonical_json(obj):
    # Sorted keys and fixed separators give one byte string per logical payload.
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode()


def _hmac_hex(secret, payload):
    # HMAC-SHA256 over the canonical JSON encoding of the payload.
    return hmac.new(secret, _canonical_json(payload), hashlib.sha256).hexdigest()

We set up the core utilities required across the entire system, including time helpers, UUID generation, canonical JSON serialization, and cryptographic signing. We ensure that all requests and responses can be deterministically signed and verified using HMAC.
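To make the determinism claim concrete, here is a minimal, self-contained sketch of signing and verifying a payload. The helper names `sign` and `verify` and the demo secret are assumptions made for illustration; they are not part of the tutorial's code. The sketch verifies with `hmac.compare_digest`, which compares in constant time.

```python
import hashlib
import hmac
import json


def canonical_json(obj):
    # Sorted keys and fixed separators give one byte string per logical payload.
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode()


def sign(secret: bytes, payload: dict) -> str:
    return hmac.new(secret, canonical_json(payload), hashlib.sha256).hexdigest()


def verify(secret: bytes, payload: dict, signature: str) -> bool:
    # compare_digest runs in constant time, so it does not leak how many
    # leading characters of a forged signature happened to be correct.
    return hmac.compare_digest(sign(secret, payload), signature)


secret = b"demo-secret"  # hypothetical key, for illustration only
a = {"tool": "batch_sum", "args": {"numbers": [1, 2, 3]}}
b = {"args": {"numbers": [1, 2, 3]}, "tool": "batch_sum"}  # same data, different key order

sig = sign(secret, a)
same_bytes = canonical_json(a) == canonical_json(b)
valid = verify(secret, b, sig)
tampered = verify(secret, {**a, "tool": "poll_job"}, sig)
print(same_bytes, valid, tampered)  # True True False
```

Because key order no longer affects the serialized bytes, both sides can rebuild the payload independently and still arrive at the same signature.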

class MCPEnvelope(BaseModel):
    model_config = ConfigDict(extra="forbid")  # reject unknown fields
    v: Literal["mcp/0.1"] = "mcp/0.1"
    request_id: str = Field(default_factory=_uuid)
    ts_ms: int = Field(default_factory=_now_ms)
    client_id: str
    server_id: str
    tool: str
    args: Dict[str, Any] = Field(default_factory=dict)
    nonce: str = Field(default_factory=_uuid)
    signature: str


class MCPResponse(BaseModel):
    model_config = ConfigDict(extra="forbid")
    v: Literal["mcp/0.1"] = "mcp/0.1"
    request_id: str
    ts_ms: int = Field(default_factory=_now_ms)
    ok: bool
    server_id: str
    status: Literal["ok", "accepted", "running", "done", "error"]
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    signature: str

We define the structured MCP envelope and response formats that every interaction follows. We enforce strict schemas using Pydantic so that malformed or unexpected fields are rejected early. This ensures consistent contracts between clients and servers, which is essential for SDK standardization.
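The following sketch shows what early rejection looks like in practice. It uses a trimmed-down, hypothetical `Envelope` model rather than the full envelope, and the `error_types` helper is an assumption added for illustration. It relies on the Pydantic v2 API (`ConfigDict`, `ValidationError.errors()`).

```python
from typing import Any, Dict, Literal

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class Envelope(BaseModel):
    # Trimmed-down, hypothetical stand-in for the tutorial's envelope.
    model_config = ConfigDict(extra="forbid")
    v: Literal["mcp/0.1"] = "mcp/0.1"
    client_id: str
    tool: str
    args: Dict[str, Any] = Field(default_factory=dict)


def error_types(raw: dict) -> list:
    """Return the sorted Pydantic error types raised by `raw`, or [] if it is valid."""
    try:
        Envelope(**raw)
    except ValidationError as exc:
        return sorted(err["type"] for err in exc.errors())
    return []


ok_result = error_types({"client_id": "c1", "tool": "batch_sum"})
extra_result = error_types({"client_id": "c1", "tool": "batch_sum", "session": "abc"})
missing_result = error_types({"tool": "batch_sum"})
version_result = error_types({"v": "mcp/9.9", "client_id": "c1", "tool": "batch_sum"})
print(ok_result, extra_result, missing_result, version_result)
```

An unexpected `session` key, a missing `client_id`, and an unsupported protocol version each fail at construction time, before any tool logic runs.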

class ServerIdentityOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    server_id: str
    fingerprint: str
    capabilities: Dict[str, Any]


class BatchSumIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    numbers: List[float] = Field(min_length=1)  # at least one number


class BatchSumOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int
    total: float


class StartLongTaskIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    seconds: int = Field(ge=1, le=20)  # bounded duration
    payload: Dict[str, Any] = Field(default_factory=dict)


class PollJobIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    job_id: str

We declare the validated input and output models for each tool exposed by the server. We use Pydantic constraints to clearly express what each tool accepts and returns. This makes tool behavior predictable and safe, even when invoked by LLM-driven agents.
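One way to put such models to work is a small registry that validates arguments before dispatch and turns failures into a structured error instead of an exception. The sketch below is an assumption-laden illustration: `TOOL_INPUTS`, `validate_args`, and the simplified `SumIn` and `LongTaskIn` models are hypothetical names, not part of the tutorial's server.

```python
from typing import Any, Dict, List, Type

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class SumIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    numbers: List[float] = Field(min_length=1)


class LongTaskIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    seconds: int = Field(ge=1, le=20)


# Hypothetical registry: tool name -> input model.
TOOL_INPUTS: Dict[str, Type[BaseModel]] = {
    "batch_sum": SumIn,
    "start_long_task": LongTaskIn,
}


def validate_args(tool: str, args: Dict[str, Any]) -> Dict[str, Any]:
    model = TOOL_INPUTS.get(tool)
    if model is None:
        return {"ok": False, "error": f"unknown tool: {tool}"}
    try:
        parsed = model(**args)
    except ValidationError as exc:
        # Keep only the field path and message so the caller gets a compact error.
        problems = [f"{'.'.join(map(str, e['loc']))}: {e['msg']}" for e in exc.errors()]
        return {"ok": False, "error": "; ".join(problems)}
    return {"ok": True, "args": parsed.model_dump()}


good = validate_args("batch_sum", {"numbers": [1, 2, 3]})
empty = validate_args("batch_sum", {"numbers": []})
too_long = validate_args("start_long_task", {"seconds": 99})
unknown = validate_args("delete_everything", {})
for outcome in (good, empty, too_long, unknown):
    print(outcome)
```

Returning a compact error string gives an LLM-driven caller something it can read and correct, rather than a stack trace.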

@dataclass
class JobState:
    job_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class MCPServer:
    def __init__(self, server_id, secret):
        self.server_id = server_id
        self.secret = secret
        self.jobs = {}   # job_id -> JobState
        self.tasks = {}  # job_id -> asyncio.Task (keeps a reference to each running task)

    def _fingerprint(self):
        return hashlib.sha256(self.secret).hexdigest()[:16]

    async def handle(self, env_dict, client_secret):
        # Validate the envelope shape, then verify the client's signature
        # over every field except the signature itself.
        env = MCPEnvelope(**env_dict)
        payload = env.model_dump()
        sig = payload.pop("signature")
        if not hmac.compare_digest(_hmac_hex(client_secret, payload), sig):
            return {"error": "bad signature"}

        if env.tool == "server_identity":
            out = ServerIdentityOut(
                server_id=self.server_id,
                fingerprint=self._fingerprint(),
                capabilities={"async": True, "stateless": True},
            )
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="ok",
                result=out.model_dump(),
                signature="",
            )

        elif env.tool == "batch_sum":
            args = BatchSumIn(**env.args)
            out = BatchSumOut(count=len(args.numbers), total=sum(args.numbers))
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="ok",
                result=out.model_dump(),
                signature="",
            )

        elif env.tool == "start_long_task":
            args = StartLongTaskIn(**env.args)
            jid = _uuid()
            self.jobs[jid] = JobState(jid, "running")

            async def run():
                # Simulated long-running work; the result is the echoed payload.
                await asyncio.sleep(args.seconds)
                self.jobs[jid].status = "done"
                self.jobs[jid].result = args.payload

            self.tasks[jid] = asyncio.create_task(run())
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="accepted",
                result={"job_id": jid},
                signature="",
            )

        elif env.tool == "poll_job":
            args = PollJobIn(**env.args)
            # An unknown job_id yields an error response instead of a KeyError.
            job = self.jobs.get(args.job_id)
            resp = MCPResponse(
                request_id=env.request_id,
                ok=job is not None,
                server_id=self.server_id,
                status=job.status if job else "error",
                result=job.result if job else None,
                error=None if job else "unknown job_id",
                signature="",
            )

        else:
            # Without this branch an unknown tool would leave `resp` undefined.
            resp = MCPResponse(
                request_id=env.request_id,
                ok=False,
                server_id=self.server_id,
                status="error",
                error=f"unknown tool: {env.tool}",
                signature="",
            )

        # Sign the response. The dump still carries signature="", so a verifier
        # must blank that field before recomputing the HMAC.
        payload = resp.model_dump()
        resp.signature = _hmac_hex(self.secret, payload)
        return resp.model_dump()

We implement the stateless MCP server along with its async task management logic. We handle request verification, tool dispatch, and long-running job execution without relying on session state. By returning job identifiers and allowing polling, we demonstrate non-blocking, scalable task execution.
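A job that raises partway through should end in an error state rather than appear to run forever. The sketch below isolates the start-then-poll pattern using only the standard library and records failures explicitly. `Job` and `JobRunner` are hypothetical names introduced for illustration, and the status strings are assumed to follow the response schema's `running`, `done`, and `error` values.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional


@dataclass
class Job:
    job_id: str
    status: str = "running"
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class JobRunner:
    """Hypothetical helper: runs coroutines in the background and records outcomes."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Job] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, work: Callable[[], Awaitable[Dict[str, Any]]]) -> str:
        job = Job(job_id=str(uuid.uuid4()))
        self.jobs[job.job_id] = job

        async def run() -> None:
            try:
                job.result = await work()
                job.status = "done"
            except Exception as exc:  # record the failure instead of losing it
                job.error = f"{type(exc).__name__}: {exc}"
                job.status = "error"

        # Keep a reference so the task is not garbage-collected mid-flight.
        self._tasks[job.job_id] = asyncio.create_task(run())
        return job.job_id

    def poll(self, job_id: str) -> Job:
        job = self.jobs.get(job_id)
        if job is None:
            return Job(job_id=job_id, status="error", error="unknown job_id")
        return job


async def main():
    runner = JobRunner()

    async def good():
        await asyncio.sleep(0.01)
        return {"task": "demo"}

    async def bad():
        await asyncio.sleep(0.01)
        raise RuntimeError("boom")

    ids = [runner.start(good), runner.start(bad)]
    first = runner.poll(ids[0]).status  # polled before the task has had a chance to run
    await asyncio.sleep(0.1)
    return first, runner.poll(ids[0]), runner.poll(ids[1]), runner.poll("missing")


first_status, good_job, bad_job, missing_job = asyncio.run(main())
print(first_status, good_job.status, bad_job.status, missing_job.error)
```

The caller gets a job ID back immediately, and every later poll is a cheap dictionary lookup, so slow work never blocks request handling.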

class MCPClient:
    def __init__(self, client_id, secret, server):
        self.client_id = client_id
        self.secret = secret
        self.server = server

    async def call(self, tool, args=None):
        env = MCPEnvelope(
            client_id=self.client_id,
            server_id=self.server.server_id,
            tool=tool,
            args=args or {},
            signature="",
        ).model_dump()
        # Sign every field except the signature itself.
        env["signature"] = _hmac_hex(self.secret, {k: v for k, v in env.items() if k != "signature"})
        return await self.server.handle(env, self.secret)


async def demo():
    server_secret = b"server_secret"
    client_secret = b"client_secret"
    server = MCPServer("mcp-server-001", server_secret)
    client = MCPClient("client-001", client_secret, server)

    # Synchronous-style tools return their result in the same response.
    print(await client.call("server_identity"))
    print(await client.call("batch_sum", {"numbers": [1, 2, 3]}))

    # The long task returns a job ID immediately; we then poll until it is done.
    start = await client.call("start_long_task", {"seconds": 2, "payload": {"process": "demo"}})
    jid = start["result"]["job_id"]

    while True:
        poll = await client.call("poll_job", {"job_id": jid})
        if poll["status"] == "done":
            print(poll)
            break
        await asyncio.sleep(0.5)


# In a notebook, where an event loop is already running, use `await demo()` instead.
if __name__ == "__main__":
    asyncio.run(demo())

We build a lightweight stateless client that signs each request and interacts with the server through structured envelopes. We demonstrate synchronous calls and asynchronous task polling in a single flow; input validation failures surface as a Pydantic ValidationError when a tool receives malformed arguments. This shows how clients can reliably consume MCP-style services in real agent pipelines.
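A polling loop with no deadline can spin forever if a job never completes. As a hedged sketch, the helper below adds a timeout and capped exponential backoff around any async poll function. `poll_until_done` and its parameters are hypothetical, and `fake_poll` is a stand-in for the tutorial's poll request, assuming responses carry a `status` field whose terminal values are `done` and `error`.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


async def poll_until_done(
    poll: Callable[[], Awaitable[Dict[str, Any]]],
    timeout_s: float = 10.0,
    initial_delay_s: float = 0.05,
    max_delay_s: float = 1.0,
) -> Dict[str, Any]:
    """Call `poll` until it reports a terminal status or the deadline would be missed."""
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        resp = await poll()
        if resp.get("status") in ("done", "error"):
            return resp
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"job still {resp.get('status')!r} after {timeout_s}s")
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay_s)  # exponential backoff, capped


async def main():
    calls = {"n": 0}

    async def fake_poll():
        # Stand-in for the tutorial's poll request: reports done on the third poll.
        calls["n"] += 1
        status = "done" if calls["n"] >= 3 else "running"
        return {"status": status, "result": {"task": "demo"} if status == "done" else None}

    finished = await poll_until_done(fake_poll)

    async def never_done():
        return {"status": "running"}

    try:
        await poll_until_done(never_done, timeout_s=0.2)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return finished, calls["n"], timed_out


finished, n_calls, timed_out = asyncio.run(main())
print(finished, n_calls, timed_out)
```

Backing off between polls keeps request volume low for long jobs, while the deadline guarantees the caller regains control.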

In conclusion, we showed how MCP evolves from a simple tool-calling interface into a robust protocol suitable for real-world systems. We start tasks asynchronously and poll for results without blocking execution, enforce clear contracts through schema validation, and rely on stateless, signed messages to preserve security and flexibility. Together, these patterns demonstrate how modern MCP-style systems support reliable, enterprise-ready agent workflows while remaining simple, transparent, and easy to extend.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.
