Skip to content

Cohort SDK Reference

The Cohort API provides programmatic access to patient cohort queries, counts, and demographic breakdowns.

Access the Cohort API through the client.alpha.cohort namespace:

from komodo import Client
client = Client()
cohort_api = client.alpha.cohort

Get a fast approximate patient count for a cohort query.

from komodo import Client
def approx_count(
logic: Dict[str, Any],
patient_type: str = "adv",
date_range: Optional[DateRange] = None,
validate_values: bool = False,
) -> ApproxCountResponse
| Name | Type | Description |
| --- | --- | --- |
| `logic` | `Dict[str, Any]` | The cohort logic tree (e.g., `{"mx_diagnosis_code": "C34%"}`). |
| `patient_type` | `str` | Patient type to query (default: `"adv"`). |
| `date_range` | `DateRange \| None` | Optional date range filter. |
| `validate_values` | `bool` | Whether to validate code values against the database. |

| Type | Description |
| --- | --- |
| `ApproxCountResponse` | Response containing `total_count` and optional `warnings`. |
from komodo import Client
client = Client()
# Simple query with single diagnosis code
response = client.alpha.cohort.approx_count(
logic={"mx_diagnosis_code": "C34%"},
)
print(f"Approximate count: {response.total_count}")
# Query with multiple codes (OR logic)
response = client.alpha.cohort.approx_count(
logic={"mx_diagnosis_code": ["C34%", "C50%"]},
)
print(f"Patients with lung or breast cancer: {response.total_count}")

Get an exact distinct patient count for a cohort query. Requires a date range.

def distinct_count(
logic: Dict[str, Any],
date_range: DateRange,
patient_type: str = "adv",
granularity: Optional[Literal["year", "quarter", "month"]] = None,
patient_criteria: Optional[Dict[str, Any]] = None,
) -> DistinctCountResponse
| Name | Type | Description |
| --- | --- | --- |
| `logic` | `Dict[str, Any]` | The cohort logic tree. |
| `date_range` | `DateRange` | Required date range filter. |
| `patient_type` | `str` | Patient type to query (default: `"adv"`). |
| `granularity` | `Literal["year", "quarter", "month"] \| None` | Optional time granularity for bucketed counts. |
| `patient_criteria` | `Dict[str, Any] \| None` | Optional patient-level filter criteria. |

| Type | Description |
| --- | --- |
| `DistinctCountResponse` | Response containing `total`, optional `counts` (time-bucketed), and `warnings`. |
from komodo import Client
from komodo.alpha.models import DateRange
client = Client()
# Get distinct count with date range
response = client.alpha.cohort.distinct_count(
logic={"mx_diagnosis_code": "C34%"},
date_range=DateRange(from_date="2023-01-01", to_date="2023-12-31"),
)
print(f"Distinct patients: {response.total}")
# Get counts by year
response = client.alpha.cohort.distinct_count(
logic={"mx_diagnosis_code": "C34%"},
date_range=DateRange(from_date="2020-01-01", to_date="2023-12-31"),
granularity="year",
)
print(f"Total: {response.total}")
for year, count in response.counts.items():
print(f" {year}: {count}")

Get the list of patient IDs matching a cohort query.

def patient_ids(
logic: Dict[str, Any],
patient_type: str = "adv",
date_range: Optional[DateRange] = None,
validate_values: bool = False,
limit: Optional[int] = None,
) -> PatientIdsResponse
| Name | Type | Description |
| --- | --- | --- |
| `logic` | `Dict[str, Any]` | The cohort logic tree. |
| `patient_type` | `str` | Patient type to query (default: `"adv"`). |
| `date_range` | `DateRange \| None` | Optional date range filter. |
| `validate_values` | `bool` | Whether to validate code values. |
| `limit` | `int \| None` | Maximum number of patient IDs to return. |

| Type | Description |
| --- | --- |
| `PatientIdsResponse` | Response containing `total_count`, `patient_ids` list, and `warnings`. |
from komodo import Client
client = Client()
response = client.alpha.cohort.patient_ids(
logic={"mx_diagnosis_code": "C34%"},
limit=1000,
)
print(f"Found {response.total_count} patients")
print(f"Returned {len(response.patient_ids)} IDs")

Start an asynchronous export of patient IDs to S3.

def export_patient_ids(
logic: Dict[str, Any],
patient_type: str = "adv",
date_range: Optional[DateRange] = None,
validate_values: bool = False,
) -> ExportStatusResponse
| Type | Description |
| --- | --- |
| `ExportStatusResponse` | Response containing `job_id` and initial `status`. |
from komodo import Client
client = Client()
# Start export
response = client.alpha.cohort.export_patient_ids(
logic={"mx_diagnosis_code": "C34%"},
)
print(f"Export job started: {response.job_id}")
print(f"Status: {response.status}")

Get the status of an export job.

def get_export_status(
job_id: str,
verbose: bool = False,
) -> ExportStatusResponse
| Name | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | The export job ID. |
| `verbose` | `bool` | If True, include detailed worker timing info. |

| Type | Description |
| --- | --- |
| `ExportStatusResponse` | Response with job `status`, `s3_directory` (when complete), and optional `error` info. |
import time
from komodo import Client
from komodo.alpha.models import ExportStatus
client = Client()
# Start an export and get the job_id
export_response = client.alpha.cohort.export_patient_ids(
logic={"mx_diagnosis_code": "C34%"},
)
job_id = export_response.job_id
# Poll for completion
while True:
status = client.alpha.cohort.get_export_status(job_id)
print(f"Status: {status.status}")
if status.status == ExportStatus.COMPLETED:
print(f"Files at: {status.s3_directory}")
break
elif status.status == ExportStatus.FAILED:
print(f"Export failed: {status.error}")
break
time.sleep(5)

Look up available codes for a selector type.

def lookup_codes(
selector_type: str,
code: Optional[str] = None,
pattern: Optional[str] = None,
include_indices: bool = False,
limit: Optional[int] = None,
) -> CodesResponse
| Name | Type | Description |
| --- | --- | --- |
| `selector_type` | `str` | The selector type (e.g., `"mx_diagnosis_code"`). |
| `code` | `str \| None` | Exact code to look up. |
| `pattern` | `str \| None` | Wildcard pattern with `%` suffix for prefix matching. |
| `include_indices` | `bool` | If True, include index ranges in the response. |
| `limit` | `int \| None` | Maximum results to return (1-100, default: 100). |

| Type | Description |
| --- | --- |
| `CodesResponse` | Response containing `codes` (list of `CodeEntry` objects) and `truncated` (bool indicating if more results exist). |
from komodo import Client
client = Client()
# Find all lung cancer diagnosis codes
codes = client.alpha.cohort.lookup_codes(
selector_type="mx_diagnosis_code",
pattern="C34%",
limit=20,
)
for entry in codes.codes:
print(entry.code)

List available selector types.

def list_selectors() -> SelectorsResponse
| Type | Description |
| --- | --- |
| `SelectorsResponse` | Response containing `selectors` (list of `SelectorInfo` objects with `name` and `description` fields). |
from komodo import Client
client = Client()
selectors = client.alpha.cohort.list_selectors()
for s in selectors.selectors:
print(f"{s.name}: {s.description}")

Convenience method that gets patient count using either approximate or distinct counting based on the specified count type.

def get_total_count(
logic: Dict[str, Any],
count_type: CountType,
date_range: Optional[DateRange] = None,
patient_type: str = "adv",
) -> Tuple[int, Union[ApproxCountResponse, DistinctCountResponse]]
| Name | Type | Description |
| --- | --- | --- |
| `logic` | `Dict[str, Any]` | The cohort logic tree. |
| `count_type` | `CountType` | Whether to use approximate or distinct counting. |
| `date_range` | `DateRange \| None` | Date range filter (required for distinct counts). |
| `patient_type` | `str` | Patient type to query (default: `"adv"`). |

| Type | Description |
| --- | --- |
| `Tuple[int, ApproxCountResponse \| DistinctCountResponse]` | Tuple of `(count, response)` where `response` contains additional info. |
from komodo import Client
from komodo.alpha.models import CountType, DateRange
client = Client()
# Get approximate count
count, response = client.alpha.cohort.get_total_count(
logic={"mx_diagnosis_code": "C34%"},
count_type=CountType.APPROXIMATE,
)
print(f"Approximate count: {count}")
# Get distinct count (requires date range)
count, response = client.alpha.cohort.get_total_count(
logic={"mx_diagnosis_code": "C34%"},
count_type=CountType.DISTINCT,
date_range=DateRange(from_date="2023-01-01", to_date="2023-12-31"),
)
print(f"Distinct count: {count}")

Check the Cohort API orchestrator health status.

def health(
deep: bool = False,
) -> HealthResponse
| Name | Type | Description |
| --- | --- | --- |
| `deep` | `bool` | If True, return detailed health information about workers, Redis, and indexers. |

| Type | Description |
| --- | --- |
| `HealthResponse` | Response containing `status` and optional detailed health info (`workers`, `redis`, `indexer`). |
from komodo import Client
client = Client()
# Basic health check
health = client.alpha.cohort.health()
print(f"Status: {health.status}")
# Detailed health check
health = client.alpha.cohort.health(deep=True)
print(f"Status: {health.status}")
if health.workers:
print(f"Workers: {health.workers.connected}/{health.workers.total}")

Get aggregated request metrics for Cohort API endpoints.

def get_stats(
time_range: Optional[Literal["1h", "6h", "24h"]] = None,
interval: Optional[Literal["1m", "5m", "10m", "1h"]] = None,
) -> StatsResponse
| Name | Type | Description |
| --- | --- | --- |
| `time_range` | `Literal["1h", "6h", "24h"] \| None` | Time range for metrics (default: `"1h"`). |
| `interval` | `Literal["1m", "5m", "10m", "1h"] \| None` | Bucket interval for time-series data (default: `"1m"`). |

| Type | Description |
| --- | --- |
| `StatsResponse` | Response containing `current_minute`, `last_hour`, optional `buckets`, and `response_cache` stats. |
from komodo import Client
client = Client()
# Get last hour metrics
stats = client.alpha.cohort.get_stats(time_range="1h")
print(f"Current minute: {stats.current_minute}")
print(f"Last hour: {stats.last_hour}")
# Get 24h metrics with hourly buckets
stats = client.alpha.cohort.get_stats(time_range="24h", interval="1h")
if stats.buckets:
for bucket in stats.buckets:
print(bucket)

Query audit logs for Cohort API requests.

def list_audit_logs(
from_time: Optional[str] = None,
to_time: Optional[str] = None,
limit: Optional[int] = None,
) -> List[AuditEntry]
| Name | Type | Description |
| --- | --- | --- |
| `from_time` | `str \| None` | Filter entries after this time (RFC3339 format, e.g., `"2023-01-01T00:00:00Z"`). |
| `to_time` | `str \| None` | Filter entries before this time (RFC3339 format). |
| `limit` | `int \| None` | Maximum entries to return (default: 100, max: 1000). |

| Type | Description |
| --- | --- |
| `List[AuditEntry]` | List of audit entries sorted by timestamp descending (newest first). Each entry contains `endpoint`, `response_code`, `latency_ms`, `timestamp`, and optional fields. |
from komodo import Client
client = Client()
# Get recent audit logs
logs = client.alpha.cohort.list_audit_logs(limit=50)
for entry in logs:
print(f"{entry.timestamp}: {entry.endpoint} - {entry.response_code} ({entry.latency_ms}ms)")
# Get logs for a specific time range
logs = client.alpha.cohort.list_audit_logs(
from_time="2023-06-01T00:00:00Z",
to_time="2023-06-30T23:59:59Z",
limit=100,
)

The logic parameter accepts a dictionary representing the query logic:

# Single condition
{"mx_diagnosis_code": "C34%"}
# Multiple values (OR logic)
{"mx_diagnosis_code": ["C34%", "C50%"]}
# AND logic
{
"op": "and",
"conditions": [
{"mx_diagnosis_code": "C34%"},
{"cpt_hcpcs_code": "96413"},
]
}
# OR logic
{
"op": "or",
"conditions": [
{"mx_diagnosis_code": "C34%"},
{"mx_diagnosis_code": "C50%"},
]
}
# Nested logic
{
"op": "and",
"conditions": [
{"mx_diagnosis_code": "C34%"},
{
"op": "or",
"conditions": [
{"cpt_hcpcs_code": "96413"},
{"cpt_hcpcs_code": "96415"},
]
}
]
}

Using Condition and ConditionGroup Classes

For type-safe query building, use the model classes:

from komodo.alpha.models import Condition, ConditionGroup, SelectorType
# Create conditions
diagnosis = Condition(
selector_type=SelectorType.MX_DIAGNOSIS_CODE,
values=["C34%", "C50%"]
)
procedure = Condition(
selector_type=SelectorType.CPT_HCPCS_CODE,
values=["96413"]
)
# Combine with AND logic
query = ConditionGroup(
operator="and",
items=[diagnosis, procedure]
)
# Convert to API format
logic = query.to_logic()

from datetime import date
from komodo.alpha.models import DateRange
# From date objects
date_range = DateRange(
from_date=date(2023, 1, 1),
to_date=date(2023, 12, 31)
)
# From strings (ISO format)
date_range = DateRange(
from_date="2023-01-01",
to_date="2023-12-31"
)
# From strings (mm/dd/yy format)
date_range = DateRange.from_strings("01/01/23", "12/31/23")

Available selector types:

| Value | Description | Examples |
| --- | --- | --- |
| `mx_diagnosis_code` | ICD-10 diagnosis codes | C34%, C50.1, Z51.11 |
| `cpt_hcpcs_code` | CPT/HCPCS procedure codes | 96413, 99213, J9271 |
| `ndc` | National Drug Codes | 12345678901 |
| `drg_code` | DRG codes | 470, 871 |
| `revenue_code` | Revenue codes | 0636, 0250 |
from komodo.alpha.models import CountType
CountType.APPROXIMATE # Fast approximate count
CountType.DISTINCT # Exact distinct count (requires date range)
| Model | Fields |
| --- | --- |
| `ApproxCountResponse` | `total_count`, `warnings` |
| `DistinctCountResponse` | `total`, `counts`, `warnings` |
| `PatientIdsResponse` | `total_count`, `patient_ids`, `returned_count`, `warnings` |
| `ExportStatusResponse` | `job_id`, `status`, `s3_directory`, `error`, `total_patient_count` |
| `CodesResponse` | `codes`, `truncated` |
| `SelectorsResponse` | `selectors` |
| `HealthResponse` | `status`, `workers`, `redis`, `indexer` |
| `StatsResponse` | `current_minute`, `last_hour`, `buckets`, `response_cache` |
| `AuditEntry` | `endpoint`, `response_code`, `latency_ms`, `timestamp`, `request_id`, `error` |

API methods have async variants with the `_async` suffix (the available async methods are listed after the example):

import asyncio
from komodo import Client
async def get_count():
client = Client()
response = await client.alpha.cohort.approx_count_async(
logic={"mx_diagnosis_code": "C34%"},
)
return response.total_count
count = asyncio.run(get_count())

Available async methods:

  • approx_count_async
  • distinct_count_async
  • patient_ids_async
  • export_patient_ids_async
  • get_export_status_async
  • lookup_codes_async
  • list_selectors_async
  • health_async
  • get_stats_async
  • list_audit_logs_async