Skip to content

Instantly share code, notes, and snippets.

@mriddle
Created March 10, 2026 14:57
Show Gist options
  • Select an option

  • Save mriddle/275c3a9df1b464bc570b90176960907a to your computer and use it in GitHub Desktop.

Select an option

Save mriddle/275c3a9df1b464bc570b90176960907a to your computer and use it in GitHub Desktop.
Interactive CLI for Linear documents and comment threads.
#!/usr/bin/env python3
"""
Interactive CLI for Linear documents and comment threads.
Features:
- List recent documents
- Search documents by title
- Open document by ID/slug
- Read thread summaries in a screen-reader friendly format
- Navigate threads (`next`, `prev`, `show`, `current`)
- Reply to threads
- React to threads with emoji
Usage:
export LINEAR_API_KEY=lin_api_...
python3 scripts/linear-document-cli.py
"""
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.request
from collections import defaultdict
from datetime import datetime
# Endpoint that graphql() sends every request to.
LINEAR_API_URL = "https://api.linear.app/graphql"
# Query: up to $first documents ordered by updatedAt, with an optional
# DocumentFilter. Selects the fields the document list prints (title, slugId,
# updatedAt) plus id, url and the creator's name.
LIST_DOCUMENTS_QUERY = """
query ListDocuments($first: Int!, $filter: DocumentFilter) {
documents(first: $first, orderBy: updatedAt, filter: $filter) {
nodes {
id
slugId
title
url
updatedAt
creator { name }
}
}
}
"""
# Query: one document's metadata together with one page of its comments
# (ordered by createdAt). pageInfo.hasNextPage / endCursor drive the
# pagination loop in fetch_document_with_comments().
GET_DOCUMENT_WITH_COMMENTS_QUERY = """
query GetDocumentWithComments($documentId: String!, $commentsFirst: Int!, $commentsAfter: String) {
document(id: $documentId) {
id
slugId
title
url
documentContentId
updatedAt
comments(first: $commentsFirst, after: $commentsAfter, orderBy: createdAt) {
nodes {
id
body
createdAt
updatedAt
url
quotedText
parentId
resolvedAt
reactionData
user { id name }
}
pageInfo {
hasNextPage
endCursor
}
}
}
}
"""
# Mutation: create a comment from a CommentCreateInput; returns the success
# flag and the new comment's id and url.
COMMENT_CREATE_MUTATION = """
mutation CreateComment($input: CommentCreateInput!) {
commentCreate(input: $input) {
success
comment {
id
url
}
}
}
"""
# Mutation: create a reaction from a ReactionCreateInput; returns the success
# flag and the reaction's id, emoji, target comment id and reacting user name.
REACTION_CREATE_MUTATION = """
mutation CreateReaction($input: ReactionCreateInput!) {
reactionCreate(input: $input) {
success
reaction {
id
emoji
comment { id }
user { name }
}
}
}
"""
def graphql(token: str, query: str, variables: dict | None = None, *, timeout: float = 30.0) -> dict:
    """Send one GraphQL operation to the Linear API and return its ``data`` payload.

    Args:
        token: Value sent verbatim in the ``Authorization`` header.
        query: GraphQL query or mutation document.
        variables: Variables for the operation; ``None`` is sent as ``{}``.
        timeout: Seconds to wait on the connection before giving up.

    Returns:
        The ``data`` object of the response, or ``{}`` when it is missing or null.

    Raises:
        RuntimeError: On an HTTP error status, a network failure or timeout,
            a response body that is not a JSON object, or a response that
            carries a non-empty GraphQL ``errors`` list.
    """
    payload = json.dumps({"query": query, "variables": variables or {}}).encode()
    req = urllib.request.Request(
        LINEAR_API_URL,
        data=payload,
        headers={
            "Authorization": token,
            "Content-Type": "application/json",
        },
    )
    try:
        # Without a timeout a stalled connection would hang the prompt forever.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace") if e.fp else ""
        raise RuntimeError(f"HTTP {e.code}: {e.reason}\n{body}") from e
    except OSError as e:
        # URLError (DNS failure, refused connection) and socket timeouts are
        # OSError subclasses. Convert them so callers that only handle
        # RuntimeError report the failure instead of crashing.
        raise RuntimeError(f"Network error: {e}") from e
    try:
        data = json.loads(raw)
    except ValueError as e:
        # JSONDecodeError is a ValueError; re-raise it as RuntimeError so it is
        # not mistaken for a bad number typed by the user.
        raise RuntimeError("Response from Linear API was not valid JSON.") from e
    if not isinstance(data, dict):
        raise RuntimeError("Unexpected response from Linear API (not a JSON object).")
    errors = data.get("errors")
    if errors:
        msg = "; ".join(
            err.get("message", str(err)) if isinstance(err, dict) else str(err)
            for err in errors
        )
        raise RuntimeError(f"GraphQL error: {msg}")
    # `"data": null` must not leak None to callers that immediately call .get().
    return data.get("data") or {}
def fmt_ts(ts: str | None) -> str:
    """Format an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM``.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. No timezone
    conversion is applied. An empty or ``None`` value yields ``""``; a value
    that cannot be parsed is returned unchanged.
    """
    if ts:
        try:
            iso_text = ts.replace("Z", "+00:00")
            parsed = datetime.fromisoformat(iso_text)
            return parsed.strftime("%Y-%m-%d %H:%M")
        except Exception:
            # Best effort: show the raw value rather than fail the listing.
            return ts
    return ""
def list_documents(token: str, limit: int = 25, title_query: str | None = None) -> list[dict]:
    """Fetch documents via ``LIST_DOCUMENTS_QUERY``.

    Args:
        token: API token passed through to ``graphql``.
        limit: Value sent as the query's ``first`` variable.
        title_query: When non-empty, sent as a case-insensitive "title
            contains" filter; otherwise the filter variable is ``None``.

    Returns:
        The ``documents.nodes`` list of the response, or ``[]`` when absent.
    """
    title_filter = {"title": {"containsIgnoreCase": title_query}} if title_query else None
    variables = {"first": limit, "filter": title_filter}
    response = graphql(token, LIST_DOCUMENTS_QUERY, variables)
    connection = response.get("documents") or {}
    return connection.get("nodes") or []
def fetch_document_with_comments(token: str, document_id: str) -> dict | None:
    """Load a document's metadata and all of its comments, following pagination.

    Comments are requested 100 at a time. Paging stops when the response
    reports no next page or supplies no end cursor.

    Args:
        token: API token passed through to ``graphql``.
        document_id: Sent as the query's ``documentId`` variable.

    Returns:
        ``{"document": {...}, "comments": [...]}`` with metadata taken from the
        last page fetched, or ``None`` as soon as a response has no document.
    """
    page_size = 100
    collected: list[dict] = []
    after: str | None = None
    while True:
        variables = {"documentId": document_id, "commentsFirst": page_size, "commentsAfter": after}
        doc = graphql(token, GET_DOCUMENT_WITH_COMMENTS_QUERY, variables).get("document")
        if not doc:
            return None
        connection = doc.get("comments") or {}
        collected += connection.get("nodes") or []
        page_info = connection.get("pageInfo") or {}
        # Continue only when another page is advertised and a cursor is given.
        after = page_info.get("endCursor") if page_info.get("hasNextPage") else None
        if not after:
            break
    metadata_keys = ("id", "slugId", "title", "url", "documentContentId", "updatedAt")
    return {
        "document": {key: doc.get(key) for key in metadata_keys},
        "comments": collected,
    }
def build_threads(comments: list[dict]) -> list[dict]:
    """Group a flat comment list into threads keyed by their root comment.

    A root is a comment whose ``parentId`` is ``None``. Each thread holds the
    root followed by every comment reachable from it through ``parentId``
    links, so replies to replies are included. Replies and roots are both
    ordered by ``createdAt``. Comments whose parent chain never reaches a
    root are left out.

    Args:
        comments: Comment dicts with ``id``, ``parentId`` and ``createdAt`` keys.

    Returns:
        A list of ``{"root": comment, "comments": [root, *replies]}`` dicts.
    """

    def created_at(comment: dict) -> str:
        return comment.get("createdAt") or ""

    roots: list[dict] = []
    children_of: dict[str, list[dict]] = defaultdict(list)
    for comment in comments:
        parent_id = comment.get("parentId")
        if parent_id is None:
            roots.append(comment)
        else:
            children_of[parent_id].append(comment)

    threads = []
    for root in sorted(roots, key=created_at):
        replies: list[dict] = []
        expanded: set[str] = set()
        pending = [root.get("id")]
        while pending:
            comment_id = pending.pop()
            # Skip id-less comments (nothing can point at them) and ids already
            # expanded, which also guards against malformed parent cycles.
            if comment_id is None or comment_id in expanded:
                continue
            expanded.add(comment_id)
            for child in children_of.get(comment_id, ()):
                replies.append(child)
                pending.append(child.get("id"))
        replies.sort(key=created_at)
        threads.append({"root": root, "comments": [root, *replies]})
    return threads
def format_reactions(reaction_data: dict | None) -> str:
    """Summarise reaction data as ``"<emoji> x<count>, ..."`` or ``"none"``.

    Two shapes are handled:

    * a list of dicts with an ``emoji`` key and an optional ``reactions``
      list, counted by that list's length (1 when it is not a list);
      non-dict items and items without an emoji are skipped;
    * a dict mapping emoji to a value, counted by the value's length when it
      is a list or dict, otherwise 1.

    Any other truthy value is returned as ``str(value)``; falsy input gives
    ``"none"``.
    """
    if not reaction_data:
        return "none"
    if isinstance(reaction_data, list):
        labels = []
        for entry in reaction_data:
            if not isinstance(entry, dict):
                continue
            emoji = entry.get("emoji")
            if not emoji:
                continue
            reactors = entry.get("reactions")
            tally = len(reactors) if isinstance(reactors, list) else 1
            labels.append(f"{emoji} x{tally}")
    elif isinstance(reaction_data, dict):
        labels = [
            f"{emoji} x{len(value) if isinstance(value, (list, dict)) else 1}"
            for emoji, value in reaction_data.items()
        ]
    else:
        return str(reaction_data)
    if not labels:
        return "none"
    return ", ".join(labels)
def print_document_list(docs: list[dict]) -> None:
    """Print a numbered list of documents, or a notice when there are none.

    Each entry shows the title (``(untitled)`` when missing) followed by a
    line with its slug and formatted ``updatedAt`` timestamp.
    """
    print()
    if docs:
        print("Documents:")
        for number, doc in enumerate(docs, start=1):
            heading = doc.get("title") or "(untitled)"
            slug_text = doc.get("slugId") or ""
            updated_text = fmt_ts(doc.get("updatedAt"))
            print(f" {number}. {heading}")
            print(f" slug: {slug_text} | updated: {updated_text}")
        print()
    else:
        print("No documents found.")
def print_doc_header(document: dict, visible_count: int, total_count: int, thread_filter: str, search_query: str) -> None:
    """Print the document header shown above a thread summary.

    Args:
        document: Document metadata dict (``title``, ``url``, ``updatedAt``).
        visible_count: Number of threads passing the current filter/search.
        total_count: Number of threads on the document.
        thread_filter: Name of the active thread filter.
        search_query: Active thread search text; shown as ``(none)`` when empty.
    """
    # Use `or` rather than a .get() default: the keys are normally present but
    # may hold None, which would otherwise be printed as the text "None".
    title = document.get("title") or "(untitled)"
    url = document.get("url") or ""
    print()
    print(f"Document: {title}")
    print(f"URL: {url}")
    print(f"Updated: {fmt_ts(document.get('updatedAt'))}")
    print(f"Visible threads: {visible_count} / {total_count}")
    print(f"Filter: {thread_filter} | Search: {search_query or '(none)'}")
    print()
def thread_short_context(root: dict, max_len: int = 120) -> str:
    """Return a one-line preview for a thread's root comment.

    Prefers the quoted document text and falls back to the comment body.
    Newlines become spaces, and text longer than ``max_len`` is cut and
    suffixed with ``...``. Returns ``"(no context)"`` when both are empty.
    """
    for field in ("quotedText", "body"):
        text = (root.get(field) or "").replace("\n", " ").strip()
        if text:
            suffix = "..." if len(text) > max_len else ""
            return text[:max_len] + suffix
    return "(no context)"
def print_thread_summary(
    document: dict,
    visible_threads: list[dict],
    total_threads: int,
    selected_thread_idx: int | None,
    thread_filter: str,
    search_query: str,
) -> None:
    """Print the document header followed by a one-entry-per-thread summary.

    Args:
        document: Document metadata dict passed to ``print_doc_header``.
        visible_threads: Threads passing the current filter/search.
        total_threads: Number of threads on the document before filtering.
        selected_thread_idx: Zero-based index of the selected thread in
            ``visible_threads``; that entry is marked with ``>``.
        thread_filter: Name of the active thread filter.
        search_query: Active thread search text.
    """
    print_doc_header(document, len(visible_threads), total_threads, thread_filter, search_query)
    if not visible_threads:
        # Distinguish "document has no threads" from "threads exist but the
        # filter/search hides them all"; the old single message was wrong in
        # the second case.
        if total_threads:
            print("No threads match the current filter/search.")
        else:
            print("No comment threads on this document.")
        print()
        return
    print("Threads (summary):")
    for idx, thread in enumerate(visible_threads, 1):
        root = thread["root"]
        status = "RESOLVED" if root.get("resolvedAt") else "OPEN"
        author = (root.get("user") or {}).get("name") or "Unknown"
        # thread["comments"] includes the root itself, so replies are one fewer.
        reply_count = max(len(thread["comments"]) - 1, 0)
        cursor = ">" if selected_thread_idx == idx - 1 else " "
        print(
            f" {cursor} {idx}. [{status}] {author} | replies={reply_count} | "
            f"reactions={format_reactions(root.get('reactionData'))}"
        )
        print(f" {thread_short_context(root)}")
    print()
def print_thread_detail(document: dict, visible_threads: list[dict], selected_thread_idx: int | None) -> None:
    """Print every message of the selected thread in a linear, readable layout.

    Args:
        document: Document metadata dict; only ``title`` is read.
        visible_threads: Threads passing the current filter/search.
        selected_thread_idx: Zero-based index into ``visible_threads``. When it
            is ``None`` or out of range, a notice is printed instead.
    """
    print()
    # `or` instead of a .get() default: a present-but-None title would
    # otherwise be printed as the text "None".
    print(f"Document: {document.get('title') or '(untitled)'}")
    if selected_thread_idx is None:
        print("No thread selected. Use 'show <n>' or 'next'.")
        print()
        return
    if selected_thread_idx < 0 or selected_thread_idx >= len(visible_threads):
        print("Selected thread is out of range for current filters/search.")
        print()
        return
    thread = visible_threads[selected_thread_idx]
    root = thread["root"]
    thread_comments = thread["comments"]
    status = "RESOLVED" if root.get("resolvedAt") else "OPEN"
    author = (root.get("user") or {}).get("name") or "Unknown"
    quoted = (root.get("quotedText") or "").strip()
    print(f"Thread {selected_thread_idx + 1} [{status}]")
    print(f" Root comment id: {root.get('id')}")
    print(f" Started by: {author} at {fmt_ts(root.get('createdAt'))}")
    print(f" Reactions: {format_reactions(root.get('reactionData'))}")
    if quoted:
        # Flatten to one line and cap the quoted context at 220 characters.
        q = quoted.replace("\n", " ").strip()
        q = q[:220] + ("..." if len(q) > 220 else "")
        print(f' Context text: "{q}"')
    else:
        print(" Context text: (none)")
    print(" Messages:")
    for c_i, c in enumerate(thread_comments, 1):
        c_author = (c.get("user") or {}).get("name") or "Unknown"
        c_time = fmt_ts(c.get("createdAt"))
        body = (c.get("body") or "").strip().replace("\n", "\n ")
        print(f" {c_i}. {c_author} at {c_time}")
        print(f" {body if body else '(empty)'}")
    print()
def thread_matches(thread: dict, search_query: str) -> bool:
    """Report whether a thread contains the search text, ignoring case.

    The searched text is the root's quoted text, body and author name plus
    every comment's body and author name, joined with newlines. A blank
    query matches every thread.
    """
    needle = search_query.lower().strip()
    if not needle:
        return True

    def author_of(comment: dict) -> str:
        return (comment.get("user") or {}).get("name") or ""

    root = thread["root"]
    haystack_parts = [root.get("quotedText") or "", root.get("body") or "", author_of(root)]
    for comment in thread["comments"]:
        haystack_parts += [comment.get("body") or "", author_of(comment)]
    return needle in "\n".join(haystack_parts).lower()
def derive_visible_threads(current_threads: list[dict], thread_filter: str, search_query: str) -> list[dict]:
    """Apply the resolved/open filter and then the text search to the threads.

    ``thread_filter`` of ``"open"`` keeps threads whose root has no
    ``resolvedAt`` and ``"resolved"`` keeps those that have one. Any other
    value keeps everything. A non-blank ``search_query`` further narrows the
    result with ``thread_matches``. With no filtering at all, the input list
    object itself is returned.
    """

    def is_resolved(thread: dict) -> bool:
        return bool(thread["root"].get("resolvedAt"))

    if thread_filter == "open":
        shown = [thread for thread in current_threads if not is_resolved(thread)]
    elif thread_filter == "resolved":
        shown = [thread for thread in current_threads if is_resolved(thread)]
    else:
        shown = current_threads
    if search_query.strip():
        shown = [thread for thread in shown if thread_matches(thread, search_query)]
    return shown
def selected_root_id(visible_threads: list[dict], selected_thread_idx: int | None) -> str | None:
    """Return the root comment id of the selected thread.

    Yields ``None`` when nothing is selected or the index falls outside
    ``visible_threads``.
    """
    if selected_thread_idx is not None and 0 <= selected_thread_idx < len(visible_threads):
        return visible_threads[selected_thread_idx]["root"].get("id")
    return None
def reselect_index_by_root_id(
    visible_threads: list[dict], selected_root_comment_id: str | None, fallback_idx: int | None
) -> int | None:
    """Work out which thread index should be selected after the list changed.

    Prefers the position of the thread whose root id equals
    ``selected_root_comment_id``. Otherwise ``fallback_idx`` is clamped into
    the valid range, with ``None`` treated as 0. Returns ``None`` when there
    are no visible threads.
    """
    if not visible_threads:
        return None
    if selected_root_comment_id:
        position = next(
            (
                pos
                for pos, thread in enumerate(visible_threads)
                if thread["root"].get("id") == selected_root_comment_id
            ),
            None,
        )
        if position is not None:
            return position
    if fallback_idx is None:
        return 0
    last_valid = len(visible_threads) - 1
    return max(0, min(fallback_idx, last_valid))
def prompt_multiline() -> str:
    """Read a multi-line message from standard input.

    Input ends at a line consisting of a single ``.`` or at end-of-file
    (for example Ctrl-D, or a closed pipe). Ctrl-C abandons the message.

    Returns:
        The entered lines joined with newlines and stripped of surrounding
        whitespace, or ``""`` when the user interrupted with Ctrl-C.
    """
    print("Enter message text. End with a single '.' on its own line.")
    lines: list[str] = []
    while True:
        try:
            line = input()
        except EOFError:
            # End-of-file finishes the message instead of crashing the CLI.
            break
        except KeyboardInterrupt:
            # An empty result lets the caller treat the message as cancelled.
            print()
            return ""
        if line == ".":
            break
        lines.append(line)
    return "\n".join(lines).strip()
def create_reply(token: str, document_content_id: str, parent_comment_id: str, body: str) -> dict:
    """Post a reply comment under an existing comment.

    Args:
        token: API token passed through to ``graphql``.
        document_content_id: Sent as the input's ``documentContentId``.
        parent_comment_id: Sent as the input's ``parentId``.
        body: Text of the reply.

    Returns:
        The ``comment`` object of the mutation payload (``{}`` when absent).

    Raises:
        RuntimeError: When the payload does not report ``success``.
    """
    variables = {
        "input": {"documentContentId": document_content_id, "parentId": parent_comment_id, "body": body}
    }
    result = graphql(token, COMMENT_CREATE_MUTATION, variables).get("commentCreate") or {}
    if result.get("success"):
        return result.get("comment") or {}
    raise RuntimeError("Failed to create reply.")
def create_reaction(token: str, comment_id: str, emoji: str) -> dict:
    """Add an emoji reaction to a comment.

    Args:
        token: API token passed through to ``graphql``.
        comment_id: Sent as the input's ``commentId``.
        emoji: Sent as the input's ``emoji``.

    Returns:
        The ``reaction`` object of the mutation payload (``{}`` when absent).

    Raises:
        RuntimeError: When the payload does not report ``success``.
    """
    variables = {"input": {"commentId": comment_id, "emoji": emoji}}
    result = graphql(token, REACTION_CREATE_MUTATION, variables).get("reactionCreate") or {}
    if result.get("success"):
        return result.get("reaction") or {}
    raise RuntimeError("Failed to create reaction.")
def print_help() -> None:
    """Print the list of supported commands followed by a blank line."""
    command_lines = (
        " help Show commands",
        " list List recent documents",
        " search <text> Search documents by title",
        " open <id-or-slug> Open document by ID or slug",
        " pick <number> Open a document from the last list/search results",
        " threads Show thread summary list",
        " show <n> Show full detail for thread number n",
        " current Show detail for selected thread",
        " next / prev Move selected thread and show detail",
        " all-threads Show all threads",
        " open-threads Filter to unresolved/open threads",
        " resolved-threads Filter to resolved threads",
        " search-threads <text> Search thread content",
        " clear-search Clear thread search",
        " reply [thread-number] Reply (defaults to selected thread)",
        " react [thread-number] <emoji> React (defaults to selected thread)",
        " quit Exit",
    )
    print("Commands:")
    for entry in command_lines:
        print(entry)
    print()
def _split_react_args(args: list[str]) -> tuple[int | None, str]:
    """Split ``react`` arguments into an optional zero-based thread index and the emoji text."""
    # Only a plain run of ASCII digits counts as a thread number. int() also
    # accepts signed text, so emoji names such as "+1" or "-1" used to be
    # mistaken for thread numbers.
    if args and args[0].isascii() and args[0].isdigit():
        return int(args[0]) - 1, " ".join(args[1:]).strip()
    return None, " ".join(args).strip()


def _reload_current_document(token: str, document: dict) -> dict | None:
    """Re-fetch the open document after a write, printing a notice if it is gone."""
    refreshed = fetch_document_with_comments(token, document["slugId"] or document["id"])
    if not refreshed:
        print("Document could not be reloaded; use 'open' to load it again.")
    return refreshed


def main() -> None:
    """Run the interactive prompt loop of the Linear document CLI.

    The API token is read from the ``LINEAR_API_KEY`` environment variable;
    the process exits with status 1 when it is missing. Each input line is
    split on whitespace into a command and its arguments. The loop ends on
    ``quit``/``exit``/``q`` or on end-of-file / Ctrl-C at the prompt.

    Failures inside a command are reported and the prompt continues:
    ``ValueError`` (a non-numeric thread/document number), ``RuntimeError``
    (API errors), and ``EOFError``/``KeyboardInterrupt`` raised while the
    command was running.
    """
    token = os.environ.get("LINEAR_API_KEY", "").strip()
    if not token:
        print("Set LINEAR_API_KEY first (export LINEAR_API_KEY=lin_api_...).", file=sys.stderr)
        sys.exit(1)
    print("Linear Document CLI")
    print("Type 'help' for commands.")
    # Session state shared by all commands.
    last_docs: list[dict] = []  # results of the most recent list/search
    current: dict | None = None  # loaded document + comments, if any
    current_threads: list[dict] = []  # all threads of the loaded document
    visible_threads: list[dict] = []  # threads after filter + search
    selected_thread_idx: int | None = None  # index into visible_threads
    thread_filter = "all"
    search_query = ""
    while True:
        try:
            raw = input("linear-doc> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not raw:
            continue
        parts = raw.split()
        cmd = parts[0].lower()
        args = parts[1:]
        try:
            if cmd in {"quit", "exit", "q"}:
                break
            if cmd in {"help", "h", "?"}:
                print_help()
                continue
            if cmd == "list":
                last_docs = list_documents(token, limit=25)
                print_document_list(last_docs)
                continue
            if cmd == "search":
                query = " ".join(args).strip()
                if not query:
                    print("Usage: search <text>")
                    continue
                last_docs = list_documents(token, limit=25, title_query=query)
                print_document_list(last_docs)
                continue
            if cmd in {"pick", "open"}:
                if cmd == "pick":
                    if not args:
                        print("Usage: pick <number>")
                        continue
                    if not last_docs:
                        print("No previous list/search results.")
                        continue
                    idx = int(args[0])
                    if idx < 1 or idx > len(last_docs):
                        print("Number out of range.")
                        continue
                    picked = last_docs[idx - 1]
                    doc_id = picked.get("slugId") or picked.get("id")
                    if not doc_id:
                        print("Selected document has no ID/slug.")
                        continue
                else:
                    if not args:
                        print("Usage: open <id-or-slug>")
                        continue
                    doc_id = args[0]
                # Fetch into a temporary so a failed lookup keeps the
                # previously loaded document instead of discarding it.
                loaded = fetch_document_with_comments(token, doc_id)
                if not loaded:
                    print("Document not found or no access.")
                    continue
                current = loaded
                thread_filter = "all"
                search_query = ""
                current_threads = build_threads(current["comments"])
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(visible_threads, None, 0)
                print("Document loaded. Use 'threads' for summary, 'show <n>' for detail.")
                continue
            if cmd == "threads":
                if not current:
                    print("No document selected. Use list/search/open first.")
                    continue
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(visible_threads, None, selected_thread_idx)
                print_thread_summary(
                    current["document"],
                    visible_threads,
                    len(current_threads),
                    selected_thread_idx,
                    thread_filter,
                    search_query,
                )
                continue
            if cmd == "show":
                if not current:
                    print("No document selected.")
                    continue
                if not args:
                    print("Usage: show <thread-number>")
                    continue
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                t_idx = int(args[0]) - 1
                if t_idx < 0 or t_idx >= len(visible_threads):
                    print("Thread number out of range.")
                    continue
                selected_thread_idx = t_idx
                print_thread_detail(current["document"], visible_threads, selected_thread_idx)
                continue
            if cmd == "current":
                if not current:
                    print("No document selected.")
                    continue
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(visible_threads, None, selected_thread_idx)
                print_thread_detail(current["document"], visible_threads, selected_thread_idx)
                continue
            if cmd in {"next", "prev"}:
                if not current:
                    print("No document selected.")
                    continue
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                if not visible_threads:
                    selected_thread_idx = None
                    print("No visible threads with current filter/search.")
                    continue
                if selected_thread_idx is None:
                    selected_thread_idx = 0
                elif cmd == "next":
                    selected_thread_idx = min(selected_thread_idx + 1, len(visible_threads) - 1)
                else:
                    selected_thread_idx = max(selected_thread_idx - 1, 0)
                print_thread_detail(current["document"], visible_threads, selected_thread_idx)
                continue
            if cmd in {"all-threads", "open-threads", "resolved-threads"}:
                if not current:
                    print("No document selected.")
                    continue
                # Remember the selection by root id so it survives re-filtering.
                selected_root_comment_id = selected_root_id(visible_threads, selected_thread_idx)
                if cmd == "all-threads":
                    thread_filter = "all"
                elif cmd == "open-threads":
                    thread_filter = "open"
                else:
                    thread_filter = "resolved"
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(
                    visible_threads, selected_root_comment_id, selected_thread_idx
                )
                print(f"Thread filter set to '{thread_filter}'.")
                print_thread_summary(
                    current["document"],
                    visible_threads,
                    len(current_threads),
                    selected_thread_idx,
                    thread_filter,
                    search_query,
                )
                continue
            if cmd == "search-threads":
                if not current:
                    print("No document selected.")
                    continue
                query = " ".join(args).strip()
                if not query:
                    print("Usage: search-threads <text>")
                    continue
                search_query = query
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(visible_threads, None, 0)
                print(f"Thread search set to: {search_query}")
                print_thread_summary(
                    current["document"],
                    visible_threads,
                    len(current_threads),
                    selected_thread_idx,
                    thread_filter,
                    search_query,
                )
                continue
            if cmd == "clear-search":
                if not current:
                    print("No document selected.")
                    continue
                selected_root_comment_id = selected_root_id(visible_threads, selected_thread_idx)
                search_query = ""
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(
                    visible_threads, selected_root_comment_id, selected_thread_idx
                )
                print("Thread search cleared.")
                continue
            if cmd == "reply":
                if not current:
                    print("No document selected.")
                    continue
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                if not visible_threads:
                    print("No visible threads to reply to.")
                    continue
                if args:
                    t_idx = int(args[0]) - 1
                    if t_idx < 0 or t_idx >= len(visible_threads):
                        print("Thread number out of range.")
                        continue
                    selected_thread_idx = t_idx
                else:
                    selected_thread_idx = reselect_index_by_root_id(visible_threads, None, selected_thread_idx)
                if selected_thread_idx is None:
                    print("No selected thread. Use 'show <n>' first.")
                    continue
                root = visible_threads[selected_thread_idx]["root"]
                body = prompt_multiline()
                if not body:
                    print("Reply cancelled (empty body).")
                    continue
                new_comment = create_reply(
                    token,
                    current["document"]["documentContentId"],
                    root["id"],
                    body,
                )
                print(f"Reply posted: {new_comment.get('url', '')}")
                prev_selected_root_id = root.get("id")
                current = _reload_current_document(token, current["document"])
                current_threads = build_threads(current["comments"]) if current else []
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(
                    visible_threads, prev_selected_root_id, selected_thread_idx
                )
                continue
            if cmd == "react":
                if not current:
                    print("No document selected.")
                    continue
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                if not visible_threads:
                    print("No visible threads to react to.")
                    continue
                t_idx, emoji = _split_react_args(args)
                if t_idx is not None:
                    if t_idx < 0 or t_idx >= len(visible_threads):
                        print("Thread number out of range.")
                        continue
                    selected_thread_idx = t_idx
                else:
                    selected_thread_idx = reselect_index_by_root_id(visible_threads, None, selected_thread_idx)
                if selected_thread_idx is None:
                    print("No selected thread. Use 'show <n>' first.")
                    continue
                if not emoji:
                    print("Usage: react [thread-number] <emoji>")
                    continue
                root = visible_threads[selected_thread_idx]["root"]
                reaction = create_reaction(token, root["id"], emoji)
                print(f"Reaction added: {reaction.get('emoji', emoji)}")
                prev_selected_root_id = root.get("id")
                current = _reload_current_document(token, current["document"])
                current_threads = build_threads(current["comments"]) if current else []
                visible_threads = derive_visible_threads(current_threads, thread_filter, search_query)
                selected_thread_idx = reselect_index_by_root_id(
                    visible_threads, prev_selected_root_id, selected_thread_idx
                )
                continue
            print("Unknown command. Type 'help' for commands.")
        except ValueError:
            print("Invalid number. Try again.")
        except RuntimeError as e:
            print(f"Error: {e}")
        except (EOFError, KeyboardInterrupt):
            # Raised while a command was reading input or waiting on the
            # network; abandon that command but keep the session alive.
            print()
            print("Command cancelled.")
# Start the interactive CLI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment