
pydantic_ai.ext

TemporalAgent

Bases: WrapperAgent[AgentDepsT, OutputDataT]

Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/_agent.py
class TemporalAgent(WrapperAgent[AgentDepsT, OutputDataT]):
    def __init__(
        self,
        wrapped: AbstractAgent[AgentDepsT, OutputDataT],
        *,
        activity_config: ActivityConfig | None = None,
        model_activity_config: ActivityConfig | None = None,
        toolset_activity_config: dict[str, ActivityConfig] | None = None,
        tool_activity_config: dict[str, dict[str, ActivityConfig | Literal[False]]] | None = None,
        run_context_type: type[TemporalRunContext] = TemporalRunContext,
        temporalize_toolset_func: Callable[
            [
                AbstractToolset[Any],
                str,
                ActivityConfig,
                dict[str, ActivityConfig | Literal[False]],
                type[TemporalRunContext],
            ],
            AbstractToolset[Any],
        ] = temporalize_toolset,
    ):
        """Wrap an agent to make it compatible with Temporal.

        Args:
            wrapped: The agent to wrap.
            activity_config: The Temporal activity config to use.
            model_activity_config: The Temporal activity config to use for model requests.
            toolset_activity_config: The Temporal activity config to use for specific toolsets identified by ID.
            tool_activity_config: The Temporal activity config to use for specific tools identified by toolset ID and tool name.
            run_context_type: The type of run context to use to serialize and deserialize the run context.
            temporalize_toolset_func: The function to use to prepare the toolsets for Temporal.
        """
        super().__init__(wrapped)

        # start_to_close_timeout is required
        activity_config = activity_config or ActivityConfig(start_to_close_timeout=timedelta(seconds=60))

        # pydantic_ai.exceptions.UserError is not retryable
        retry_policy = activity_config.get('retry_policy') or RetryPolicy()
        retry_policy.non_retryable_error_types = [*(retry_policy.non_retryable_error_types or []), UserError.__name__]
        activity_config['retry_policy'] = retry_policy

        model_activity_config = model_activity_config or {}
        toolset_activity_config = toolset_activity_config or {}
        tool_activity_config = tool_activity_config or {}

        agent = wrapped

        if agent.name is None:
            raise UserError(
                "An agent needs to have a unique `name` in order to be used with Temporal. The name will be used to identify the agent's activities within the workflow."
            )

        activity_name_prefix = f'agent__{agent.name}'

        activities: list[Callable[..., Any]] = []
        if not isinstance(agent.model, Model):
            raise UserError(
                'An agent needs to have a `model` in order to be used with Temporal, it cannot be set at agent run time.'
            )

        temporal_model = TemporalModel(
            agent.model,
            activity_name_prefix=activity_name_prefix,
            activity_config=activity_config | model_activity_config,
            run_context_type=run_context_type,
            event_stream_handler=agent.event_stream_handler,
        )
        activities.extend(temporal_model.temporal_activities)

        def temporalize_toolset(toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT]:
            id = toolset.id
            if id is None:
                raise UserError(
                    "Toolsets that are 'leaves' (i.e. those that implement their own tool listing and calling) need to have a unique `id` in order to be used with Temporal. The ID will be used to identify the toolset's activities within the workflow."
                )

            toolset = temporalize_toolset_func(
                toolset,
                activity_name_prefix,
                activity_config | toolset_activity_config.get(id, {}),
                tool_activity_config.get(id, {}),
                run_context_type,
            )
            if isinstance(toolset, TemporalWrapperToolset):
                activities.extend(toolset.temporal_activities)
            return toolset

        temporal_toolsets = [toolset.visit_and_replace(temporalize_toolset) for toolset in agent.toolsets]

        self._model = temporal_model
        self._toolsets = temporal_toolsets
        self._temporal_activities = activities

    @property
    def model(self) -> Model:
        return self._model

    @property
    def toolsets(self) -> Sequence[AbstractToolset[AgentDepsT]]:
        with self._temporal_overrides():
            return super().toolsets

    @property
    def temporal_activities(self) -> list[Callable[..., Any]]:
        return self._temporal_activities

    @contextmanager
    def _temporal_overrides(self) -> Iterator[None]:
        # We reset tools here as the temporalized function toolset is already in self._toolsets.
        with super().override(model=self._model, toolsets=self._toolsets, tools=[]):
            yield

    @overload
    async def run(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[OutputDataT]: ...

    @overload
    async def run(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[RunOutputDataT]: ...

    async def run(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
        **_deprecated_kwargs: Never,
    ) -> AgentRunResult[Any]:
        """Run the agent with a user prompt in async mode.

        This method builds an internal agent graph (using system prompts, tools and result schemas) and then
        runs the graph to completion. The result of the run is returned.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            agent_run = await agent.run('What is the capital of France?')
            print(agent_run.output)
            #> The capital of France is Paris.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.
            event_stream_handler: Optional event stream handler to use for this run.

        Returns:
            The result of the run.
        """
        _check_no_run_event_stream_handler_in_workflow(event_stream_handler)

        return await super().run(
            user_prompt,
            output_type=output_type,
            message_history=message_history,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            toolsets=toolsets,
            event_stream_handler=event_stream_handler,
            **_deprecated_kwargs,
        )

    @overload
    def run_sync(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[OutputDataT]: ...

    @overload
    def run_sync(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[RunOutputDataT]: ...

    def run_sync(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
        **_deprecated_kwargs: Never,
    ) -> AgentRunResult[Any]:
        """Synchronously run the agent with a user prompt.

        This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] with `loop.run_until_complete(...)`.
        You therefore can't use this method inside async code or if there's an active event loop.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        result_sync = agent.run_sync('What is the capital of Italy?')
        print(result_sync.output)
        #> The capital of Italy is Rome.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.
            event_stream_handler: Optional event stream handler to use for this run.

        Returns:
            The result of the run.
        """
        _check_no_run_event_stream_handler_in_workflow(event_stream_handler)

        return super().run_sync(
            user_prompt,
            output_type=output_type,
            message_history=message_history,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            toolsets=toolsets,
            event_stream_handler=event_stream_handler,
            **_deprecated_kwargs,
        )

    @overload
    def run_stream(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AbstractAsyncContextManager[StreamedRunResult[AgentDepsT, OutputDataT]]: ...

    @overload
    def run_stream(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AbstractAsyncContextManager[StreamedRunResult[AgentDepsT, RunOutputDataT]]: ...

    @asynccontextmanager
    async def run_stream(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
        **_deprecated_kwargs: Never,
    ) -> AsyncIterator[StreamedRunResult[AgentDepsT, Any]]:
        """Run the agent with a user prompt in async mode, returning a streamed response.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            async with agent.run_stream('What is the capital of the UK?') as response:
                print(await response.get_output())
                #> The capital of the UK is London.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.
            event_stream_handler: Optional event stream handler to use for this run. It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.

        Returns:
            The result of the run.
        """
        _check_no_run_event_stream_handler_in_workflow(event_stream_handler)

        async with super().run_stream(
            user_prompt,
            output_type=output_type,
            message_history=message_history,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            toolsets=toolsets,
            event_stream_handler=event_stream_handler,
            **_deprecated_kwargs,
        ) as result:
            yield result

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        **_deprecated_kwargs: Never,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, OutputDataT]]: ...

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        **_deprecated_kwargs: Never,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, RunOutputDataT]]: ...

    @asynccontextmanager
    async def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.Usage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        **_deprecated_kwargs: Never,
    ) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
        """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

        This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
        `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
        executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
        stream of events coming from the execution of tools.

        The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
        and the final result of the run once it has completed.

        For more details, see the documentation of `AgentRun`.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            nodes = []
            async with agent.iter('What is the capital of France?') as agent_run:
                async for node in agent_run:
                    nodes.append(node)
            print(nodes)
            '''
            [
                UserPromptNode(
                    user_prompt='What is the capital of France?',
                    instructions=None,
                    instructions_functions=[],
                    system_prompts=(),
                    system_prompt_functions=[],
                    system_prompt_dynamic_functions={},
                ),
                ModelRequestNode(
                    request=ModelRequest(
                        parts=[
                            UserPromptPart(
                                content='What is the capital of France?',
                                timestamp=datetime.datetime(...),
                            )
                        ]
                    )
                ),
                CallToolsNode(
                    model_response=ModelResponse(
                        parts=[TextPart(content='The capital of France is Paris.')],
                        usage=Usage(
                            requests=1, request_tokens=56, response_tokens=7, total_tokens=63
                        ),
                        model_name='gpt-4o',
                        timestamp=datetime.datetime(...),
                    )
                ),
                End(data=FinalResult(output='The capital of France is Paris.')),
            ]
            '''
            print(agent_run.result.output)
            #> The capital of France is Paris.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.

        Returns:
            The result of the run.
        """
        if not workflow.in_workflow():
            async with super().iter(
                user_prompt=user_prompt,
                output_type=output_type,
                message_history=message_history,
                model=model,
                deps=deps,
                model_settings=model_settings,
                usage_limits=usage_limits,
                usage=usage,
                infer_name=infer_name,
                toolsets=toolsets,
                **_deprecated_kwargs,
            ) as run:
                yield run
            return

        if model is not None:
            raise UserError(
                'Model cannot be set at agent run time when using Temporal, it must be set at agent creation time.'
            )
        if toolsets is not None:
            raise UserError(
                'Toolsets cannot be set at agent run time when using Temporal, it must be set at agent creation time.'
            )

        with self._temporal_overrides():
            async with super().iter(
                user_prompt=user_prompt,
                output_type=output_type,
                message_history=message_history,
                deps=deps,
                model_settings=model_settings,
                usage_limits=usage_limits,
                usage=usage,
                infer_name=infer_name,
                **_deprecated_kwargs,
            ) as run:
                yield run

    @contextmanager
    def override(
        self,
        *,
        deps: AgentDepsT | _utils.Unset = _utils.UNSET,
        model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
    ) -> Iterator[None]:
        """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

        This is particularly useful when testing.
        You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

        Args:
            deps: The dependencies to use instead of the dependencies passed to the agent run.
            model: The model to use instead of the model passed to the agent run.
            toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
            tools: The tools to use instead of the tools registered with the agent.
        """
        if workflow.in_workflow():
            if _utils.is_set(model):
                raise UserError(
                    'Model cannot be contextually overridden when using Temporal, it must be set at agent creation time.'
                )
            if _utils.is_set(toolsets):
                raise UserError(
                    'Toolsets cannot be contextually overridden when using Temporal, they must be set at agent creation time.'
                )
            if _utils.is_set(tools):
                raise UserError(
                    'Tools cannot be contextually overridden when using Temporal, they must be set at agent creation time.'
                )

        with super().override(deps=deps, model=model, toolsets=toolsets, tools=tools):
            yield

__init__

```python
__init__(
    wrapped: AbstractAgent[AgentDepsT, OutputDataT],
    *,
    activity_config: ActivityConfig | None = None,
    model_activity_config: ActivityConfig | None = None,
    toolset_activity_config: dict[str, ActivityConfig] | None = None,
    tool_activity_config: dict[str, dict[str, ActivityConfig | Literal[False]]] | None = None,
    run_context_type: type[TemporalRunContext] = TemporalRunContext,
    temporalize_toolset_func: Callable[
        [
            AbstractToolset[Any],
            str,
            ActivityConfig,
            dict[str, ActivityConfig | Literal[False]],
            type[TemporalRunContext],
        ],
        AbstractToolset[Any],
    ] = temporalize_toolset
)
```

Wrap an agent to make it compatible with Temporal.

Parameters:

- wrapped (AbstractAgent[AgentDepsT, OutputDataT], required): The agent to wrap.
- activity_config (ActivityConfig | None, default None): The Temporal activity config to use.
- model_activity_config (ActivityConfig | None, default None): The Temporal activity config to use for model requests.
- toolset_activity_config (dict[str, ActivityConfig] | None, default None): The Temporal activity config to use for specific toolsets identified by ID.
- tool_activity_config (dict[str, dict[str, ActivityConfig | Literal[False]]] | None, default None): The Temporal activity config to use for specific tools identified by toolset ID and tool name.
- run_context_type (type[TemporalRunContext], default TemporalRunContext): The type of run context to use to serialize and deserialize the run context.
- temporalize_toolset_func (Callable[[AbstractToolset[Any], str, ActivityConfig, dict[str, ActivityConfig | Literal[False]], type[TemporalRunContext]], AbstractToolset[Any]], default temporalize_toolset): The function to use to prepare the toolsets for Temporal.
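As a usage illustration (a sketch, not taken from the library documentation): the snippet below wraps an agent and registers the generated activities on a Temporal worker. It assumes ActivityConfig is importable from the same module; the server address, task queue name, and MyAgentWorkflow class are placeholders (the workflow is sketched under run below). Real setups also need the client and worker configured with a data converter that can serialize Pydantic models, which is omitted here.

```python
from datetime import timedelta

from temporalio.client import Client
from temporalio.worker import Worker

from pydantic_ai import Agent
from pydantic_ai.ext.temporal import ActivityConfig, TemporalAgent

# `name` and `model` must both be set at agent creation time; the constructor
# raises UserError otherwise, since activities are named after the agent.
agent = Agent('openai:gpt-4o', name='my_agent')

temporal_agent = TemporalAgent(
    agent,
    # Override the default 60s start_to_close_timeout for all activities.
    activity_config=ActivityConfig(start_to_close_timeout=timedelta(minutes=5)),
)

async def main():
    client = await Client.connect('localhost:7233')  # placeholder address
    worker = Worker(
        client,
        task_queue='agent-task-queue',  # placeholder
        workflows=[MyAgentWorkflow],  # hypothetical workflow, sketched under `run` below
        # Register the model-request and tool-call activities created by the wrapper.
        activities=temporal_agent.temporal_activities,
    )
    await worker.run()
```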
run async

```python
run(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None
) -> AgentRunResult[OutputDataT]
run(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None
) -> AgentRunResult[RunOutputDataT]
run(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    **_deprecated_kwargs: Never
) -> AgentRunResult[Any]
```

Run the agent with a user prompt in async mode.

This method builds an internal agent graph (using system prompts, tools and result schemas) and then runs the graph to completion. The result of the run is returned.

Example:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    agent_run = await agent.run('What is the capital of France?')
    print(agent_run.output)
    #> The capital of France is Paris.
```

Parameters:

- user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
- output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; output_type may only be used if the agent has no output validators, since output validators would expect an argument that matches the agent's output type.
- message_history (list[ModelMessage] | None, default None): History of the conversation so far.
- model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if model was not set when creating the agent.
- deps (AgentDepsT, default None): Optional dependencies to use for this run.
- model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
- usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
- usage (Usage | None, default None): Optional usage to start with, useful for resuming a conversation or agents used in tools.
- infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
- toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
- event_stream_handler (EventStreamHandler[AgentDepsT] | None, default None): Optional event stream handler to use for this run.

Returns:

- AgentRunResult[Any]: The result of the run.
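Inside a Temporal workflow the model and toolsets are fixed at agent creation time and a per-run event_stream_handler is rejected, so the call site is typically a plain await. A minimal sketch, assuming temporal_agent is the module-level wrapper from the constructor example above and MyAgentWorkflow is the placeholder name used there:

```python
from temporalio import workflow

@workflow.defn
class MyAgentWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # Model requests and tool calls execute as activities behind the scenes,
        # so the workflow function itself stays deterministic and replayable.
        result = await temporal_agent.run(prompt)
        return result.output
```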


run_sync

```python
run_sync(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None
) -> AgentRunResult[OutputDataT]
run_sync(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None
) -> AgentRunResult[RunOutputDataT]
run_sync(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    **_deprecated_kwargs: Never
) -> AgentRunResult[Any]
```

Synchronously run the agent with a user prompt.

This is a convenience method that wraps self.run with loop.run_until_complete(...). You therefore can't use this method inside async code or if there's an active event loop.

Example:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_sync = agent.run_sync('What is the capital of Italy?')
print(result_sync.output)
#> The capital of Italy is Rome.
```

Parameters:

- user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
- output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; output_type may only be used if the agent has no output validators, since output validators would expect an argument that matches the agent's output type.
- message_history (list[ModelMessage] | None, default None): History of the conversation so far.
- model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if model was not set when creating the agent.
- deps (AgentDepsT, default None): Optional dependencies to use for this run.
- model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
- usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
- usage (Usage | None, default None): Optional usage to start with, useful for resuming a conversation or agents used in tools.
- infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
- toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
- event_stream_handler (EventStreamHandler[AgentDepsT] | None, default None): Optional event stream handler to use for this run.

Returns:

- AgentRunResult[Any]: The result of the run.


run_stream async

```python
run_stream(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None
) -> AbstractAsyncContextManager[StreamedRunResult[AgentDepsT, OutputDataT]]
run_stream(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None
) -> AbstractAsyncContextManager[StreamedRunResult[AgentDepsT, RunOutputDataT]]
run_stream(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    **_deprecated_kwargs: Never
) -> AsyncIterator[StreamedRunResult[AgentDepsT, Any]]
```

Run the agent with a user prompt in async mode, returning a streamed response.

Example:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('What is the capital of the UK?') as response:
        print(await response.get_output())
        #> The capital of the UK is London.
```

Parameters:

- user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
- output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; output_type may only be used if the agent has no output validators, since output validators would expect an argument that matches the agent's output type.
- message_history (list[ModelMessage] | None, default None): History of the conversation so far.
- model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if model was not set when creating the agent.
- deps (AgentDepsT, default None): Optional dependencies to use for this run.
- model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
- usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
- usage (Usage | None, default None): Optional usage to start with, useful for resuming a conversation or agents used in tools.
- infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
- toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
- event_stream_handler (EventStreamHandler[AgentDepsT] | None, default None): Optional event stream handler to use for this run. It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.

Returns:

- AsyncIterator[StreamedRunResult[AgentDepsT, Any]]: The result of the run.
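Since a per-run event_stream_handler raises an error inside a workflow, streaming events out of a workflow has to go through a handler attached to the agent at creation time; the constructor above passes agent.event_stream_handler through to the wrapped model. A rough sketch, assuming the agent constructor accepts an event_stream_handler and that the handler is called with a run context plus an async iterable of events:

```python
from collections.abc import AsyncIterable
from typing import Any

from pydantic_ai import Agent

async def handle_events(ctx: Any, events: AsyncIterable[Any]) -> None:
    # Assumed signature: run context plus a stream of events.
    # This runs inside the model-request activity, not in workflow code.
    async for event in events:
        print(event)  # forward to a queue, websocket, etc. in real use

agent = Agent('openai:gpt-4o', name='my_agent', event_stream_handler=handle_events)
```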

Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/_agent.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
@asynccontextmanager
async def run_stream(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    **_deprecated_kwargs: Never,
) -> AsyncIterator[StreamedRunResult[AgentDepsT, Any]]:
    """Run the agent with a user prompt in async mode, returning a streamed response.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        async with agent.run_stream('What is the capital of the UK?') as response:
            print(await response.get_output())
            #> The capital of the UK is London.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.
        event_stream_handler: Optional event stream handler to use for this run. It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.

    Returns:
        The result of the run.
    """
    _check_no_run_event_stream_handler_in_workflow(event_stream_handler)

    async with super().run_stream(
        user_prompt,
        output_type=output_type,
        message_history=message_history,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        infer_name=infer_name,
        toolsets=toolsets,
        event_stream_handler=event_stream_handler,
        **_deprecated_kwargs,
    ) as result:
        yield result
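
Note the `_check_no_run_event_stream_handler_in_workflow` guard above: inside a Temporal workflow, an event stream handler cannot be passed per run and must instead be attached to the wrapped agent when it is created. A minimal sketch of the creation-time pattern (assuming the wrapped `Agent` accepts `event_stream_handler` at construction; the handler body is illustrative):

```python
from collections.abc import AsyncIterable

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import AgentStreamEvent

async def stream_handler(
    ctx: RunContext[None], stream: AsyncIterable[AgentStreamEvent]
) -> None:
    # Called with the stream of events as the run progresses; with Temporal,
    # this would execute inside an activity rather than the workflow.
    async for event in stream:
        print(type(event).__name__)

agent = Agent('openai:gpt-4o', event_stream_handler=stream_handler)
```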

iter async

iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    **_deprecated_kwargs: Never
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, OutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    **_deprecated_kwargs: Never
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, RunOutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: Usage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    **_deprecated_kwargs: Never
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]

A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an AgentRun object. The AgentRun can be used to async-iterate over the nodes of the graph as they are executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the stream of events coming from the execution of tools.

The AgentRun also provides methods to access the full message history, new messages, and usage statistics, and the final result of the run once it has completed.

For more details, see the documentation of AgentRun.

Example:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    nodes = []
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            nodes.append(node)
    print(nodes)
    '''
    [
        UserPromptNode(
            user_prompt='What is the capital of France?',
            instructions=None,
            instructions_functions=[],
            system_prompts=(),
            system_prompt_functions=[],
            system_prompt_dynamic_functions={},
        ),
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                    )
                ]
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='The capital of France is Paris.')],
                usage=Usage(
                    requests=1, request_tokens=56, response_tokens=7, total_tokens=63
                ),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            )
        ),
        End(data=FinalResult(output='The capital of France is Paris.')),
    ]
    '''
    print(agent_run.result.output)
    #> The capital of France is Paris.
```

Parameters:

- `user_prompt` (`str | Sequence[UserContent] | None`, default `None`): User input to start/continue the conversation.
- `output_type` (`OutputSpec[RunOutputDataT] | None`, default `None`): Custom output type to use for this run; `output_type` may only be used if the agent has no output validators, since output validators would expect an argument that matches the agent's output type.
- `message_history` (`list[ModelMessage] | None`, default `None`): History of the conversation so far.
- `model` (`Model | KnownModelName | str | None`, default `None`): Optional model to use for this run; required if `model` was not set when creating the agent.
- `deps` (`AgentDepsT`, default `None`): Optional dependencies to use for this run.
- `model_settings` (`ModelSettings | None`, default `None`): Optional settings to use for this model's request.
- `usage_limits` (`UsageLimits | None`, default `None`): Optional limits on model request count or token usage.
- `usage` (`Usage | None`, default `None`): Optional usage to start with, useful for resuming a conversation or agents used in tools.
- `infer_name` (`bool`, default `True`): Whether to try to infer the agent name from the call frame if it's not set.
- `toolsets` (`Sequence[AbstractToolset[AgentDepsT]] | None`, default `None`): Optional additional toolsets for this run.

Returns:

- `AsyncIterator[AgentRun[AgentDepsT, Any]]`: The result of the run.

Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/_agent.py
@asynccontextmanager
async def iter(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.Usage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    **_deprecated_kwargs: Never,
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
    """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
    `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
    executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
    stream of events coming from the execution of tools.

    The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
    and the final result of the run once it has completed.

    For more details, see the documentation of `AgentRun`.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        nodes = []
        async with agent.iter('What is the capital of France?') as agent_run:
            async for node in agent_run:
                nodes.append(node)
        print(nodes)
        '''
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=Usage(
                        requests=1, request_tokens=56, response_tokens=7, total_tokens=63
                    ),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        '''
        print(agent_run.result.output)
        #> The capital of France is Paris.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run; `output_type` may only be used if the agent has no
            output validators, since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.

    Returns:
        The result of the run.
    """
    if not workflow.in_workflow():
        async with super().iter(
            user_prompt=user_prompt,
            output_type=output_type,
            message_history=message_history,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            toolsets=toolsets,
            **_deprecated_kwargs,
        ) as run:
            yield run
        return

    if model is not None:
        raise UserError(
            'Model cannot be set at agent run time when using Temporal; it must be set at agent creation time.'
        )
    if toolsets is not None:
        raise UserError(
            'Toolsets cannot be set at agent run time when using Temporal; they must be set at agent creation time.'
        )

    with self._temporal_overrides():
        async with super().iter(
            user_prompt=user_prompt,
            output_type=output_type,
            message_history=message_history,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            **_deprecated_kwargs,
        ) as run:
            yield run
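
As the guards above show, when running inside a Temporal workflow the model and toolsets must be fixed when the agent is created rather than passed to `iter()`. A minimal sketch of the creation-time pattern (model name illustrative):

```python
from pydantic_ai import Agent
from pydantic_ai.ext.temporal import TemporalAgent

# Fix the model at creation time; inside a workflow, passing `model=` or
# `toolsets=` to `iter()` raises UserError.
agent = TemporalAgent(Agent('openai:gpt-4o'))

async def main():
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            ...
```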

override

override(
    *,
    deps: AgentDepsT | Unset = UNSET,
    model: Model | KnownModelName | str | Unset = UNSET,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | Unset
    ) = UNSET,
    tools: (
        Sequence[
            Tool[AgentDepsT]
            | ToolFuncEither[AgentDepsT, ...]
        ]
        | Unset
    ) = UNSET
) -> Iterator[None]

Context manager to temporarily override agent dependencies, model, toolsets, or tools.

This is particularly useful when testing. You can find an example of this in the testing documentation.
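
A minimal sketch of the pattern (with a hypothetical `MyDeps` dependency type; note that inside a Temporal workflow only `deps` may be overridden, while `model`, `toolsets`, and `tools` raise `UserError`, as the source below shows):

```python
from dataclasses import dataclass

from pydantic_ai import Agent

@dataclass
class MyDeps:
    api_key: str

agent = Agent('openai:gpt-4o', deps_type=MyDeps)

async def main():
    # Within the block, runs see the overridden deps, e.g. a test double.
    with agent.override(deps=MyDeps(api_key='fake-key-for-tests')):
        result = await agent.run('Ping?', deps=MyDeps(api_key='real-key'))
        print(result.output)
```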

Parameters:

- `deps` (`AgentDepsT | Unset`, default `UNSET`): The dependencies to use instead of the dependencies passed to the agent run.
- `model` (`Model | KnownModelName | str | Unset`, default `UNSET`): The model to use instead of the model passed to the agent run.
- `toolsets` (`Sequence[AbstractToolset[AgentDepsT]] | Unset`, default `UNSET`): The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
- `tools` (`Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | Unset`, default `UNSET`): The tools to use instead of the tools registered with the agent.
Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/_agent.py
@contextmanager
def override(
    self,
    *,
    deps: AgentDepsT | _utils.Unset = _utils.UNSET,
    model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
) -> Iterator[None]:
    """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

    This is particularly useful when testing.
    You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

    Args:
        deps: The dependencies to use instead of the dependencies passed to the agent run.
        model: The model to use instead of the model passed to the agent run.
        toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
        tools: The tools to use instead of the tools registered with the agent.
    """
    if workflow.in_workflow():
        if _utils.is_set(model):
            raise UserError(
                'Model cannot be contextually overridden when using Temporal; it must be set at agent creation time.'
            )
        if _utils.is_set(toolsets):
            raise UserError(
                'Toolsets cannot be contextually overridden when using Temporal; they must be set at agent creation time.'
            )
        if _utils.is_set(tools):
            raise UserError(
                'Tools cannot be contextually overridden when using Temporal; they must be set at agent creation time.'
            )

    with super().override(deps=deps, model=model, toolsets=toolsets, tools=tools):
        yield

LogfirePlugin

Bases: ClientPlugin

Temporal client plugin for Logfire.

Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/_logfire.py
class LogfirePlugin(ClientPlugin):
    """Temporal client plugin for Logfire."""

    def __init__(self, setup_logfire: Callable[[], Logfire] = _default_setup_logfire, *, metrics: bool = True):
        self.setup_logfire = setup_logfire
        self.metrics = metrics

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        interceptors = config.get('interceptors', [])
        config['interceptors'] = [*interceptors, TracingInterceptor(get_tracer('temporal'))]
        return super().configure_client(config)

    async def connect_service_client(self, config: ConnectConfig) -> ServiceClient:
        logfire = self.setup_logfire()

        if self.metrics:
            logfire_config = logfire.config
            token = logfire_config.token
            if token is not None:
                base_url = logfire_config.advanced.generate_base_url(token)
                metrics_url = base_url + '/v1/metrics'
                headers = {'Authorization': f'Bearer {token}'}

                config.runtime = Runtime(
                    telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url, headers=headers))
                )

        return await super().connect_service_client(config)
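
A sketch of wiring the plugin into a Temporal client (assuming a Temporal server at `localhost:7233` and a `temporalio` version with plugin support; `PydanticAIPlugin` is documented below):

```python
from temporalio.client import Client

from pydantic_ai.ext.temporal import LogfirePlugin, PydanticAIPlugin

async def main():
    # LogfirePlugin adds a tracing interceptor (and, by default, metrics
    # export); PydanticAIPlugin sets up the Pydantic data converter.
    client = await Client.connect(
        'localhost:7233',
        plugins=[PydanticAIPlugin(), LogfirePlugin()],
    )
```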

PydanticAIPlugin

Bases: ClientPlugin, WorkerPlugin

Temporal client and worker plugin for Pydantic AI.

Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/__init__.py
class PydanticAIPlugin(ClientPlugin, WorkerPlugin):
    """Temporal client and worker plugin for Pydantic AI."""

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        if (data_converter := config.get('data_converter')) and data_converter.payload_converter_class not in (
            DefaultPayloadConverter,
            PydanticPayloadConverter,
        ):
            warnings.warn(  # pragma: no cover
                'A non-default Temporal data converter was used which has been replaced with the Pydantic data converter.'
            )

        config['data_converter'] = pydantic_data_converter
        return super().configure_client(config)

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        runner = config.get('workflow_runner')  # pyright: ignore[reportUnknownMemberType]
        if isinstance(runner, SandboxedWorkflowRunner):  # pragma: no branch
            config['workflow_runner'] = replace(
                runner,
                restrictions=runner.restrictions.with_passthrough_modules(
                    'pydantic_ai',
                    'logfire',
                    'rich',
                    'httpx',
                    # Imported inside `logfire._internal.json_encoder` when running `logfire.info` inside an activity with attributes to serialize
                    'attrs',
                    # Imported inside `logfire._internal.json_schema` when running `logfire.info` inside an activity with attributes to serialize
                    'numpy',
                    'pandas',
                ),
            )

        # pydantic_ai.exceptions.UserError is not retryable
        config['workflow_failure_exception_types'] = [*config.get('workflow_failure_exception_types', []), UserError]  # pyright: ignore[reportUnknownMemberType]

        return super().configure_worker(config)
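
A sketch of client-side usage (server address is a placeholder): the plugin swaps in the Pydantic data converter so agent inputs and outputs survive Temporal's payload serialization, and it can likewise be passed to a worker, where the sandbox and failure-type configuration above applies.

```python
from temporalio.client import Client

from pydantic_ai.ext.temporal import PydanticAIPlugin

async def main():
    client = await Client.connect('localhost:7233', plugins=[PydanticAIPlugin()])
```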

AgentPlugin

Bases: WorkerPlugin

Temporal worker plugin for a specific Pydantic AI agent.

Source code in pydantic_ai_slim/pydantic_ai/ext/temporal/__init__.py
class AgentPlugin(WorkerPlugin):
    """Temporal worker plugin for a specific Pydantic AI agent."""

    def __init__(self, agent: TemporalAgent[Any, Any]):
        self.agent = agent

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        activities: Sequence[Callable[..., Any]] = config.get('activities', [])  # pyright: ignore[reportUnknownMemberType]
        # Activities are checked for name conflicts by Temporal.
        config['activities'] = [*activities, *self.agent.temporal_activities]
        return super().configure_worker(config)
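
A sketch of registering an agent's activities on a worker (`client`, `agent`, `workflow_cls`, and the task queue name are placeholders; `agent` would be a `TemporalAgent`):

```python
from temporalio.worker import Worker

from pydantic_ai.ext.temporal import AgentPlugin

def build_worker(client, agent, workflow_cls):
    # AgentPlugin appends the agent's temporal_activities to the worker's
    # activity list.
    return Worker(
        client,
        task_queue='pydantic-ai-task-queue',
        workflows=[workflow_cls],
        plugins=[AgentPlugin(agent)],
    )
```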

tool_from_langchain

tool_from_langchain(langchain_tool: LangChainTool) -> Tool

Creates a Pydantic AI tool proxy from a LangChain tool.

Parameters:

- `langchain_tool` (`LangChainTool`, required): The LangChain tool to wrap.

Returns:

- `Tool`: A Pydantic AI tool that corresponds to the LangChain tool.

Source code in pydantic_ai_slim/pydantic_ai/ext/langchain.py
def tool_from_langchain(langchain_tool: LangChainTool) -> Tool:
    """Creates a Pydantic AI tool proxy from a LangChain tool.

    Args:
        langchain_tool: The LangChain tool to wrap.

    Returns:
        A Pydantic AI tool that corresponds to the LangChain tool.
    """
    function_name = langchain_tool.name
    function_description = langchain_tool.description
    inputs = langchain_tool.args.copy()
    required = sorted({name for name, detail in inputs.items() if 'default' not in detail})
    schema: JsonSchemaValue = langchain_tool.get_input_jsonschema()
    if 'additionalProperties' not in schema:
        schema['additionalProperties'] = False
    if required:
        schema['required'] = required

    defaults = {name: detail['default'] for name, detail in inputs.items() if 'default' in detail}

    # restructures the arguments to match langchain tool run
    def proxy(*args: Any, **kwargs: Any) -> str:
        assert not args, 'This should always be called with kwargs'
        kwargs = defaults | kwargs
        return langchain_tool.run(kwargs)

    return Tool.from_schema(
        function=proxy,
        name=function_name,
        description=function_description,
        json_schema=schema,
    )
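
A typical use, as a sketch (assumes the `langchain-community` package and its DuckDuckGo search dependency are installed):

```python
from langchain_community.tools import DuckDuckGoSearchRun

from pydantic_ai import Agent
from pydantic_ai.ext.langchain import tool_from_langchain

# Wrap the LangChain tool so a Pydantic AI agent can call it.
search = tool_from_langchain(DuckDuckGoSearchRun())

agent = Agent('openai:gpt-4o', tools=[search])
```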

LangChainToolset

Bases: FunctionToolset

A toolset that wraps LangChain tools.

Source code in pydantic_ai_slim/pydantic_ai/ext/langchain.py
67
68
69
70
71
class LangChainToolset(FunctionToolset):
    """A toolset that wraps LangChain tools."""

    def __init__(self, tools: list[LangChainTool], *, id: str | None = None):
        super().__init__([tool_from_langchain(tool) for tool in tools], id=id)
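
A sketch of the toolset variant (same `langchain-community` assumption as above):

```python
from langchain_community.tools import DuckDuckGoSearchRun

from pydantic_ai import Agent
from pydantic_ai.ext.langchain import LangChainToolset

toolset = LangChainToolset([DuckDuckGoSearchRun()])
agent = Agent('openai:gpt-4o', toolsets=[toolset])
```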

tool_from_aci

tool_from_aci(
    aci_function: str, linked_account_owner_id: str
) -> Tool

Creates a Pydantic AI tool proxy from an ACI.dev function.

Parameters:

- `aci_function` (`str`, required): The ACI.dev function to wrap.
- `linked_account_owner_id` (`str`, required): The ACI user ID to execute the function on behalf of.

Returns:

- `Tool`: A Pydantic AI tool that corresponds to the ACI.dev tool.

Source code in pydantic_ai_slim/pydantic_ai/ext/aci.py
def tool_from_aci(aci_function: str, linked_account_owner_id: str) -> Tool:
    """Creates a Pydantic AI tool proxy from an ACI.dev function.

    Args:
        aci_function: The ACI.dev function to wrap.
        linked_account_owner_id: The ACI user ID to execute the function on behalf of.

    Returns:
        A Pydantic AI tool that corresponds to the ACI.dev tool.
    """
    aci = ACI()
    function_definition = aci.functions.get_definition(aci_function)
    function_name = function_definition['function']['name']
    function_description = function_definition['function']['description']
    inputs = function_definition['function']['parameters']

    json_schema = {
        'additionalProperties': inputs.get('additionalProperties', False),
        'properties': inputs.get('properties', {}),
        'required': inputs.get('required', []),
        # Default to 'object' if not specified
        'type': inputs.get('type', 'object'),
    }

    # Clean the schema
    json_schema = _clean_schema(json_schema)

    def implementation(*args: Any, **kwargs: Any) -> str:
        if args:
            raise TypeError('Positional arguments are not allowed')
        return aci.handle_function_call(
            function_name,
            kwargs,
            linked_account_owner_id=linked_account_owner_id,
            allowed_apps_only=True,
        )

    return Tool.from_schema(
        function=implementation,
        name=function_name,
        description=function_description,
        json_schema=json_schema,
    )
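
A sketch (the function name and environment variable are illustrative; the `aci-sdk` package and an ACI API key are assumed):

```python
import os

from pydantic_ai import Agent
from pydantic_ai.ext.aci import tool_from_aci

# 'TAVILY__SEARCH' is an illustrative ACI.dev function name.
tavily_search = tool_from_aci(
    'TAVILY__SEARCH',
    linked_account_owner_id=os.environ['LINKED_ACCOUNT_OWNER_ID'],
)

agent = Agent('openai:gpt-4o', tools=[tavily_search])
```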

ACIToolset

Bases: FunctionToolset

A toolset that wraps ACI.dev tools.

Source code in pydantic_ai_slim/pydantic_ai/ext/aci.py
class ACIToolset(FunctionToolset):
    """A toolset that wraps ACI.dev tools."""

    def __init__(self, aci_functions: Sequence[str], linked_account_owner_id: str, *, id: str | None = None):
        super().__init__(
            [tool_from_aci(aci_function, linked_account_owner_id) for aci_function in aci_functions], id=id
        )
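
A sketch of the toolset variant (function names and the environment variable are illustrative):

```python
import os

from pydantic_ai import Agent
from pydantic_ai.ext.aci import ACIToolset

toolset = ACIToolset(
    ['OPEN_WEATHER_MAP__CURRENT_WEATHER', 'OPEN_WEATHER_MAP__FORECAST'],
    linked_account_owner_id=os.environ['LINKED_ACCOUNT_OWNER_ID'],
)

agent = Agent('openai:gpt-4o', toolsets=[toolset])
```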