Skip to content

Instantly share code, notes, and snippets.

@irajnazamdev
Created August 25, 2025 12:50
Show Gist options
  • Select an option

  • Save irajnazamdev/c5b8091aa0fb799263f789f599b2e733 to your computer and use it in GitHub Desktop.

Select an option

Save irajnazamdev/c5b8091aa0fb799263f789f599b2e733 to your computer and use it in GitHub Desktop.
Orchestrated Informatica Lift-and-Shift migration with optimized ETL workflows, pushdown optimization, parameterized templates, automated validation, and governed deployments.
import json
import os
import time

import requests
# Configuration
# Each setting may be overridden through an environment variable so that real
# credentials never need to be committed to source control. When a variable is
# unset, the original placeholder value is used.
# A trailing slash on the base URL is stripped because the endpoint paths
# appended to it already start with "/".
IICS_BASE_URL = os.environ.get(
    "IICS_BASE_URL", "https://api.informatica.com"
).rstrip("/")
CLIENT_ID = os.environ.get("IICS_CLIENT_ID", "your_client_id")
CLIENT_SECRET = os.environ.get("IICS_CLIENT_SECRET", "your_client_secret")
TASKFLOW_ID = os.environ.get("IICS_TASKFLOW_ID", "your_taskflow_id")
# Authentication
def get_token():
    """Request an access token from the login endpoint.

    Posts ``CLIENT_ID`` and ``CLIENT_SECRET`` as form data to
    ``{IICS_BASE_URL}/identity-service/api/v1/login``.

    Returns:
        The ``access_token`` value from the JSON response body.

    Raises:
        RuntimeError: If the response status is not 200, or the response
            body contains no ``access_token``.
        requests.RequestException: If the HTTP call itself fails
            (connection error, timeout).
    """
    url = f"{IICS_BASE_URL}/identity-service/api/v1/login"
    payload = {"client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
    # Bound the call so a stalled connection cannot hang the whole run.
    resp = requests.post(url, data=payload, timeout=30)
    if resp.status_code != 200:
        # Only the status code is reported; the body is left out on purpose
        # because a login response may echo sensitive data.
        raise RuntimeError(f"Failed to get token (HTTP {resp.status_code})")
    token = resp.json().get("access_token")
    if not token:
        # Without this check a None token would be sent as "Bearer None".
        raise RuntimeError("Failed to get token: no access_token in response")
    return token
# Start Taskflow
def start_task(token):
    """Start the taskflow identified by ``TASKFLOW_ID``.

    Sends a POST to
    ``{IICS_BASE_URL}/process-service/api/v1/processes/{TASKFLOW_ID}/start``
    with the token as a Bearer ``Authorization`` header.

    Args:
        token: Access token used for the ``Authorization`` header.

    Returns:
        The ``runId`` value from the JSON response body.

    Raises:
        RuntimeError: If the response status is not 200, or the response
            body contains no ``runId``.
        requests.RequestException: If the HTTP call itself fails
            (connection error, timeout).
    """
    url = f"{IICS_BASE_URL}/process-service/api/v1/processes/{TASKFLOW_ID}/start"
    headers = {"Authorization": f"Bearer {token}"}
    # Bound the call so a stalled connection cannot hang the whole run.
    resp = requests.post(url, headers=headers, timeout=30)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to start task (HTTP {resp.status_code})")
    run_id = resp.json().get("runId")
    if run_id is None:
        # A missing runId would otherwise be interpolated into the status
        # URL as the literal text "None".
        raise RuntimeError("Failed to start task: no runId in response")
    return run_id
# Poll Status
def poll_status(token, run_id, poll_interval=10, max_wait=None):
    """Poll a run until it reports ``SUCCESS`` or ``FAILED``.

    Repeatedly sends a GET to
    ``{IICS_BASE_URL}/process-service/api/v1/processes/runs/{run_id}`` and
    prints each status that comes back.

    Args:
        token: Access token used for the ``Authorization`` header.
        run_id: Identifier of the run to watch.
        poll_interval: Seconds to sleep between polls. Defaults to 10.
        max_wait: Optional cap, in seconds, on the total time spent
            polling. ``None`` (the default) polls indefinitely.

    Returns:
        The final status string, either ``"SUCCESS"`` or ``"FAILED"``.

    Raises:
        RuntimeError: If a status request returns a non-200 response.
        TimeoutError: If ``max_wait`` elapses before a final status is seen.
        requests.RequestException: If an HTTP call itself fails
            (connection error, timeout).
    """
    url = f"{IICS_BASE_URL}/process-service/api/v1/processes/runs/{run_id}"
    headers = {"Authorization": f"Bearer {token}"}
    final_statuses = ("SUCCESS", "FAILED")
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while True:
        # Per-request timeout so one stalled call cannot block the loop.
        resp = requests.get(url, headers=headers, timeout=30)
        if resp.status_code != 200:
            raise RuntimeError(
                f"Failed to fetch status (HTTP {resp.status_code})"
            )
        status = resp.json().get("status")
        print(f"Current Status: {status}")
        if status in final_statuses:
            return status
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Run {run_id} did not finish within {max_wait} seconds "
                f"(last status: {status})"
            )
        time.sleep(poll_interval)
# Data Validation
def validate_counts(source_count, target_count):
    """Compare source and target counts and report the result.

    Prints a success or failure message to standard output.

    Args:
        source_count: Count taken from the source side.
        target_count: Count taken from the target side.

    Returns:
        True if the two counts are equal, False otherwise.
    """
    if source_count != target_count:
        print(f"Validation failed: Source={source_count}, Target={target_count}")
        return False
    print("Validation successful: Counts match")
    return True
# Main Orchestration
def main():
    """Run the taskflow end to end and report the outcome.

    Acquires a token, starts the taskflow, polls it until it finishes and,
    on success, runs the count validation (currently with dummy values).

    Returns:
        0 if the taskflow finished with status ``SUCCESS`` and the count
        validation passed; 1 if any step raised, the taskflow did not
        succeed, or the validation failed.
    """
    try:
        token = get_token()
        print("Token acquired")
        run_id = start_task(token)
        print(f"Taskflow started with runId: {run_id}")
        status = poll_status(token, run_id)
        print(f"Final Status: {status}")
    except Exception as e:
        # Top-level boundary: report the error and signal failure to the
        # caller instead of finishing as if the run had succeeded.
        print(f"Error: {e}")
        return 1

    if status != "SUCCESS":
        print("Taskflow failed, check logs")
        return 1

    # Dummy values for demonstration
    source_count, target_count = 1000, 1000
    # A count mismatch is a failed run, so it is reflected in the exit code.
    return 0 if validate_counts(source_count, target_count) else 1


if __name__ == "__main__":
    # Propagate the result as the process exit code so schedulers and CI
    # can detect a failed run.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment