Skip to content

Instantly share code, notes, and snippets.

@t3yamoto
Last active December 15, 2025 13:36
Show Gist options
  • Select an option

  • Save t3yamoto/cae9eb847b1626e5164d2914f56055aa to your computer and use it in GitHub Desktop.

Select an option

Save t3yamoto/cae9eb847b1626e5164d2914f56055aa to your computer and use it in GitHub Desktop.
aws-durable-execution-sdk-python で wait_for_condition を使うサンプル
from aws_durable_execution_sdk_python import (
durable_execution,
durable_step,
DurableContext,
StepContext,
)
from aws_durable_execution_sdk_python.waits import (
WaitForConditionConfig,
WaitForConditionDecision,
)
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
from typing import TypedDict, NotRequired
import boto3
# Module-level EC2 client, created once at import time and shared by
# start_instance and check_status.
ec2 = boto3.client("ec2")
class State(TypedDict):
instance_id: str
status: NotRequired[str]
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    """Start an EC2 instance and poll until it reports the ``running`` state.

    Args:
        event: Invocation payload; must contain an ``"instance_id"`` key.
        context: Durable execution context used to run the start step and
            the wait-for-condition poll.

    Returns:
        Whatever ``context.wait_for_condition`` returns.
        NOTE(review): presumably the final ``State`` dict produced by
        ``check_status`` -- confirm against the SDK documentation.

    Raises:
        KeyError: If ``event`` has no ``"instance_id"`` entry.
    """
    print("Durable function の実行を開始します")

    # Read the instance ID from the event payload.
    instance_id = event["instance_id"]

    # Call the (asynchronous) EC2 start-instance API as a durable step.
    context.step(start_instance(instance_id))

    # Poll until the instance status becomes "running".
    polling_config = WaitForConditionConfig(
        initial_state={"instance_id": instance_id},
        wait_strategy=wait_strategy,
    )
    result = context.wait_for_condition(
        check=check_status,
        config=polling_config,
    )

    print("Durable function の実行を終了します")
    return result
@durable_step
def start_instance(context: StepContext, instance_id: str) -> None:
    """Ask EC2 to start the given instance.

    Only issues the ``start_instances`` request; it does not wait for the
    instance to change state.

    Args:
        context: Step context supplied by the durable SDK (unused here).
        instance_id: ID of the EC2 instance to start.
    """
    ec2.start_instances(InstanceIds=[instance_id])
def check_status(state: State, context: WaitForConditionCheckContext) -> State:
    """Fetch the instance's current EC2 state and return a refreshed poll state.

    Args:
        state: Current poll state; only ``state["instance_id"]`` is read.
        context: Check context supplied by the durable SDK (unused here).

    Returns:
        A new dict holding the same ``instance_id`` and the state name
        reported by ``describe_instances`` under ``"status"``.
    """
    print(f"ステータスチェック開始: {state}")

    instance_id = state["instance_id"]
    res = ec2.describe_instances(InstanceIds=[instance_id])

    # A single instance ID was requested, so only the first instance of the
    # first reservation is inspected.
    instance_state = res["Reservations"][0]["Instances"][0]["State"]["Name"]

    state = {"instance_id": instance_id, "status": instance_state}
    print(f"ステータスチェック結果: {state}")
    return state
def wait_strategy(state: State, attempt: int) -> WaitForConditionDecision:
    """Decide whether polling should stop or continue after a status check.

    Args:
        state: Latest poll state; ``state.get("status")`` is compared
            against ``"running"``.
        attempt: Attempt counter supplied by the durable SDK (unused here).

    Returns:
        A stop-polling decision once the status is ``"running"``; otherwise
        a continue-waiting decision with a 10-second delay.
    """
    if state.get("status") == "running":
        print("インスタンスが running 状態になりました。ポーリングを終了します")
        return WaitForConditionDecision.stop_polling()

    # NOTE(review): `attempt` is never consulted, so this function itself
    # places no upper bound on polling if the instance never reaches
    # "running". Consider a maximum attempt count -- confirm desired behavior.
    print("インスタンスが running 状態ではありません。ポーリングを継続します")
    return WaitForConditionDecision.continue_waiting(Duration.from_seconds(10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment