The Cohort API provides programmatic access to patient cohort queries, counts, and demographic breakdowns.
Access the Cohort API through the client.alpha.cohort namespace:
from komodo import Client

client = Client()
cohort_api = client.alpha.cohort
Get a fast approximate patient count for a cohort query.
def approx_count(
    logic: Dict[str, Any],
    patient_type: str = "adv",
    date_range: Optional[DateRange] = None,
    validate_values: bool = False,
) -> ApproxCountResponse
| Name | Type | Description |
| --- | --- | --- |
| logic | Dict[str, Any] | The cohort logic tree (e.g., {"mx_diagnosis_code": "C34%"}). |
| patient_type | str | Patient type to query (default: "adv"). |
| date_range | Optional[DateRange] | Optional date range filter. |
| validate_values | bool | Whether to validate code values against the database. |

| Type | Description |
| --- | --- |
| ApproxCountResponse | Response containing total_count and optional warnings. |
from komodo import Client

client = Client()

# Simple query with single diagnosis code
response = client.alpha.cohort.approx_count(
    logic={"mx_diagnosis_code": "C34%"},
)
print(f"Approximate count: {response.total_count}")

# Query with multiple codes (OR logic)
response = client.alpha.cohort.approx_count(
    logic={"mx_diagnosis_code": ["C34%", "C50%"]},
)
print(f"Patients with lung or breast cancer: {response.total_count}")
Get an exact distinct patient count for a cohort query. Requires a date range.
def distinct_count(
    logic: Dict[str, Any],
    date_range: DateRange,
    patient_type: str = "adv",
    granularity: Optional[Literal["year", "quarter", "month"]] = None,
    patient_criteria: Optional[Dict[str, Any]] = None,
) -> DistinctCountResponse
| Name | Type | Description |
| --- | --- | --- |
| logic | Dict[str, Any] | The cohort logic tree. |
| date_range | DateRange | Required date range filter. |
| patient_type | str | Patient type to query (default: "adv"). |
| granularity | Optional[Literal["year", "quarter", "month"]] | Optional time granularity for bucketed counts. |
| patient_criteria | Optional[Dict[str, Any]] | Optional patient-level filter criteria. |

| Type | Description |
| --- | --- |
| DistinctCountResponse | Response containing total, optional counts (time-bucketed), and warnings. |
from komodo import Client
from komodo.alpha.models import DateRange

client = Client()

# Get distinct count with date range
response = client.alpha.cohort.distinct_count(
    logic={"mx_diagnosis_code": "C34%"},
    date_range=DateRange(from_date="2023-01-01", to_date="2023-12-31"),
)
print(f"Distinct patients: {response.total}")

# Get yearly counts over a multi-year range
response = client.alpha.cohort.distinct_count(
    logic={"mx_diagnosis_code": "C34%"},
    date_range=DateRange(from_date="2020-01-01", to_date="2023-12-31"),
    granularity="year",
)
print(f"Total: {response.total}")
for year, count in response.counts.items():
    print(f"  {year}: {count}")
Get the list of patient IDs matching a cohort query.
def patient_ids(
    logic: Dict[str, Any],
    patient_type: str = "adv",
    date_range: Optional[DateRange] = None,
    validate_values: bool = False,
    limit: Optional[int] = None,
) -> PatientIdsResponse
| Name | Type | Description |
| --- | --- | --- |
| logic | Dict[str, Any] | The cohort logic tree. |
| patient_type | str | Patient type to query (default: "adv"). |
| date_range | Optional[DateRange] | Optional date range filter. |
| validate_values | bool | Whether to validate code values. |
| limit | Optional[int] | Maximum number of patient IDs to return. |

| Type | Description |
| --- | --- |
| PatientIdsResponse | Response containing total_count, patient_ids list, and warnings. |
from komodo import Client

client = Client()

response = client.alpha.cohort.patient_ids(
    logic={"mx_diagnosis_code": "C34%"},
)
print(f"Found {response.total_count} patients")
print(f"Returned {len(response.patient_ids)} IDs")
Start an asynchronous export of patient IDs to S3.
def export_patient_ids(
    logic: Dict[str, Any],
    patient_type: str = "adv",
    date_range: Optional[DateRange] = None,
    validate_values: bool = False,
) -> ExportStatusResponse
| Type | Description |
| --- | --- |
| ExportStatusResponse | Response containing job_id and initial status. |
from komodo import Client

client = Client()

response = client.alpha.cohort.export_patient_ids(
    logic={"mx_diagnosis_code": "C34%"},
)
print(f"Export job started: {response.job_id}")
print(f"Status: {response.status}")
Get the status of an export job.
def get_export_status(
    job_id: str,
    verbose: bool = False,
) -> ExportStatusResponse
| Name | Type | Description |
| --- | --- | --- |
| job_id | str | The export job ID. |
| verbose | bool | If True, include detailed worker timing info. |

| Type | Description |
| --- | --- |
| ExportStatusResponse | Response with job status, s3_directory (when complete), and optional error info. |
from komodo import Client
from komodo.alpha.models import ExportStatus

client = Client()

# Start an export and get the job_id
export_response = client.alpha.cohort.export_patient_ids(
    logic={"mx_diagnosis_code": "C34%"},
)
job_id = export_response.job_id

# Check the job status
status = client.alpha.cohort.get_export_status(job_id)
print(f"Status: {status.status}")

if status.status == ExportStatus.COMPLETED:
    print(f"Files at: {status.s3_directory}")
elif status.status == ExportStatus.FAILED:
    print(f"Export failed: {status.error}")
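Because the export runs asynchronously, a single status check will usually not be enough. The following is a minimal polling sketch, not part of the SDK: `wait_for_export`, its parameters, and the default intervals are hypothetical names chosen for illustration. It assumes only that the status call returns an object with a `status` attribute, as in the example above.

```python
import time
from typing import Any, Callable, Iterable


def wait_for_export(
    fetch_status: Callable[[str], Any],
    job_id: str,
    terminal_states: Iterable[Any],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll fetch_status(job_id) until .status is terminal or the timeout elapses.

    Hypothetical helper: fetch_status stands in for a call such as
    client.alpha.cohort.get_export_status.
    """
    terminal = set(terminal_states)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_status(job_id)
        if status.status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Export {job_id} still {status.status!r} after {timeout_seconds}s"
            )
        sleep(poll_seconds)
```

With the SDK, this could be called as `wait_for_export(client.alpha.cohort.get_export_status, job_id, {ExportStatus.COMPLETED, ExportStatus.FAILED})`, after which the caller inspects `s3_directory` or `error` on the returned object.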
Look up available codes for a selector type.
def lookup_codes(
    selector_type: str,
    code: Optional[str] = None,
    pattern: Optional[str] = None,
    include_indices: bool = False,
    limit: Optional[int] = None,
) -> CodesResponse
| Name | Type | Description |
| --- | --- | --- |
| selector_type | str | The selector type (e.g., "mx_diagnosis_code"). |
| code | Optional[str] | Exact code to look up. |
| pattern | Optional[str] | Wildcard pattern with % suffix for prefix matching. |
| include_indices | bool | If True, include index ranges in the response. |
| limit | Optional[int] | Maximum results to return (1-100, default: 100). |

| Type | Description |
| --- | --- |
| CodesResponse | Response containing codes (list of CodeEntry objects) and truncated (bool indicating if more results exist). |
from komodo import Client

client = Client()
# Find all lung cancer diagnosis codes
codes = client.alpha.cohort.lookup_codes(
    selector_type="mx_diagnosis_code",
    pattern="C34%",
)
for entry in codes.codes:
    print(entry)
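To make the `%`-suffix prefix matching concrete, here is a small local sketch of the semantics described for `pattern`. It is an illustration only: `matches_pattern` is a hypothetical helper, the sample codes are illustrative, and the server-side matching may differ in details such as case handling.

```python
def matches_pattern(code: str, pattern: str) -> bool:
    """Return True if code matches pattern; a trailing % means prefix match."""
    if pattern.endswith("%"):
        return code.startswith(pattern[:-1])
    return code == pattern


# Illustrative codes only, not a lookup result.
sample_codes = ["C34.10", "C34.90", "C50.1", "Z51.11"]
lung = [c for c in sample_codes if matches_pattern(c, "C34%")]
print(lung)  # ['C34.10', 'C34.90']
```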
List available selector types.
def list_selectors() -> SelectorsResponse
| Type | Description |
| --- | --- |
| SelectorsResponse | Response containing selectors (list of SelectorInfo objects with name and description fields). |
from komodo import Client

client = Client()

selectors = client.alpha.cohort.list_selectors()
for s in selectors.selectors:
    print(f"{s.name}: {s.description}")
Convenience method that gets patient count using either approximate or distinct counting based on the specified count type.
def get_total_count(
    logic: Dict[str, Any],
    count_type: CountType,
    date_range: Optional[DateRange] = None,
    patient_type: str = "adv",
) -> Tuple[int, Union[ApproxCountResponse, DistinctCountResponse]]
| Name | Type | Description |
| --- | --- | --- |
| logic | Dict[str, Any] | The cohort logic tree. |
| count_type | CountType | Whether to use approximate or distinct counting. |
| date_range | Optional[DateRange] | Date range filter (required for distinct counts). |
| patient_type | str | Patient type to query (default: "adv"). |

| Type | Description |
| --- | --- |
| Tuple[int, Union[ApproxCountResponse, DistinctCountResponse]] | Tuple of (count, response) where response contains additional info. |
from komodo import Client
from komodo.alpha.models import CountType, DateRange

client = Client()

# Get approximate count
count, response = client.alpha.cohort.get_total_count(
    logic={"mx_diagnosis_code": "C34%"},
    count_type=CountType.APPROXIMATE,
)
print(f"Approximate count: {count}")

# Get distinct count (requires date range)
count, response = client.alpha.cohort.get_total_count(
    logic={"mx_diagnosis_code": "C34%"},
    count_type=CountType.DISTINCT,
    date_range=DateRange(from_date="2023-01-01", to_date="2023-12-31"),
)
print(f"Distinct count: {count}")
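The dispatch this convenience method performs can be pictured roughly as follows. This is a sketch under stated assumptions, not the SDK's implementation: `CountKind`, `total_count`, and the injected `approx_fn`/`distinct_fn` callables are hypothetical stand-ins, and raising `ValueError` for a missing date range is an assumed behavior. The only facts taken from this page are that approximate responses expose `total_count`, distinct responses expose `total`, and distinct counts require a date range.

```python
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple


class CountKind(Enum):
    """Hypothetical stand-in for CountType."""
    APPROXIMATE = "approximate"
    DISTINCT = "distinct"


def total_count(
    logic: Dict[str, Any],
    kind: CountKind,
    approx_fn: Callable[..., Any],
    distinct_fn: Callable[..., Any],
    date_range: Optional[Any] = None,
) -> Tuple[int, Any]:
    """Return (count, response), picking the counting call based on kind."""
    if kind is CountKind.DISTINCT:
        # Distinct counts need a date range; fail early (assumed behavior).
        if date_range is None:
            raise ValueError("date_range is required for distinct counts")
        response = distinct_fn(logic=logic, date_range=date_range)
        return response.total, response
    response = approx_fn(logic=logic, date_range=date_range)
    return response.total_count, response
```

Returning the raw response alongside the integer lets callers read warnings or time-bucketed counts without issuing a second request.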
Check the Cohort API orchestrator health status.
| Name | Type | Description |
| --- | --- | --- |
| deep | bool | If True, return detailed health information about workers, Redis, and indexers. |

| Type | Description |
| --- | --- |
| HealthResponse | Response containing status and optional detailed health info (workers, redis, indexer). |
from komodo import Client

client = Client()

# Basic health check
health = client.alpha.cohort.health()
print(f"Status: {health.status}")

# Deep health check with worker details
health = client.alpha.cohort.health(deep=True)
print(f"Status: {health.status}")
print(f"Workers: {health.workers.connected}/{health.workers.total}")
Get aggregated request metrics for Cohort API endpoints.
def get_stats(
    time_range: Optional[Literal["1h", "6h", "24h"]] = None,
    interval: Optional[Literal["1m", "5m", "10m", "1h"]] = None,
) -> StatsResponse
| Name | Type | Description |
| --- | --- | --- |
| time_range | Optional[Literal["1h", "6h", "24h"]] | Time range for metrics (default: "1h"). |
| interval | Optional[Literal["1m", "5m", "10m", "1h"]] | Bucket interval for time-series data (default: "1m"). |

| Type | Description |
| --- | --- |
| StatsResponse | Response containing current_minute, last_hour, optional buckets, and response_cache stats. |
from komodo import Client

client = Client()

# Get metrics for the last hour
stats = client.alpha.cohort.get_stats(time_range="1h")
print(f"Current minute: {stats.current_minute}")
print(f"Last hour: {stats.last_hour}")

# Get 24h metrics with hourly buckets
stats = client.alpha.cohort.get_stats(time_range="24h", interval="1h")
for bucket in stats.buckets:
    print(bucket)
Query audit logs for Cohort API requests.
def list_audit_logs(
    from_time: Optional[str] = None,
    to_time: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[AuditEntry]
| Name | Type | Description |
| --- | --- | --- |
| from_time | Optional[str] | Filter entries after this time (RFC3339 format, e.g., "2023-01-01T00:00:00Z"). |
| to_time | Optional[str] | Filter entries before this time (RFC3339 format). |
| limit | Optional[int] | Maximum entries to return (default: 100, max: 1000). |

| Type | Description |
| --- | --- |
| List[AuditEntry] | List of audit entries sorted by timestamp descending (newest first). Each entry contains endpoint, response_code, latency_ms, timestamp, and optional fields. |
from komodo import Client

client = Client()
# Get the 50 most recent entries
logs = client.alpha.cohort.list_audit_logs(limit=50)
for entry in logs:
    print(f"{entry.timestamp}: {entry.endpoint} - {entry.response_code} ({entry.latency_ms}ms)")

# Get logs for a specific time range
logs = client.alpha.cohort.list_audit_logs(
    from_time="2023-06-01T00:00:00Z",
    to_time="2023-06-30T23:59:59Z",
)
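Since `from_time` and `to_time` are RFC3339 strings, it can help to build them from `datetime` objects rather than by hand. The helpers below are a standard-library sketch; `to_rfc3339` and `last_hours_window` are hypothetical names, not SDK functions.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def to_rfc3339(dt: datetime) -> str:
    """Format a timezone-aware datetime as an RFC3339 UTC string ending in Z."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def last_hours_window(hours: int, now: datetime) -> Tuple[str, str]:
    """Return (from_time, to_time) strings covering the last `hours` hours."""
    return to_rfc3339(now - timedelta(hours=hours)), to_rfc3339(now)


now = datetime(2023, 6, 30, 23, 59, 59, tzinfo=timezone.utc)
from_time, to_time = last_hours_window(24, now)
print(from_time, to_time)  # 2023-06-29T23:59:59Z 2023-06-30T23:59:59Z
```

The resulting pair can be passed as `from_time=from_time, to_time=to_time` to `list_audit_logs`.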
The logic parameter accepts a dictionary representing the query logic:
{"mx_diagnosis_code": "C34%"}
# Multiple values (OR logic)
{"mx_diagnosis_code": ["C34%", "C50%"]}
{"mx_diagnosis_code": "C34%"},
{"cpt_hcpcs_code": "96413"},
{"mx_diagnosis_code": "C34%"},
{"mx_diagnosis_code": "C50%"},
{"mx_diagnosis_code": "C34%"},
{"cpt_hcpcs_code": "96413"},
{"cpt_hcpcs_code": "96415"},
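The single-value and multiple-value forms shown above differ only in whether the selector maps to a string or a list. A small helper can produce whichever form fits a given set of codes; `selector_logic` is a hypothetical name used here for illustration, not an SDK function.

```python
from typing import Any, Dict, Sequence, Union


def selector_logic(selector: str, codes: Union[str, Sequence[str]]) -> Dict[str, Any]:
    """Build a one-selector logic dict: a string for one code, a list (OR) for several."""
    if isinstance(codes, str):
        codes = [codes]
    unique = list(dict.fromkeys(codes))  # drop duplicates, keep first-seen order
    if not unique:
        raise ValueError("at least one code is required")
    return {selector: unique[0] if len(unique) == 1 else unique}


print(selector_logic("mx_diagnosis_code", ["C34%", "C50%", "C34%"]))
# {'mx_diagnosis_code': ['C34%', 'C50%']}
```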
For type-safe query building, use the model classes:
from komodo.alpha.models import Condition, ConditionGroup, SelectorType
selector_type=SelectorType.MX_DIAGNOSIS_CODE,
selector_type=SelectorType.CPT_HCPCS_CODE,
items=[diagnosis, procedure]
from datetime import date
from komodo.alpha.models import DateRange

# From date objects
date_range = DateRange(
    from_date=date(2023, 1, 1),
    to_date=date(2023, 12, 31),
)

# From strings (ISO format)
date_range = DateRange(from_date="2023-01-01", to_date="2023-12-31")

# From strings (mm/dd/yy format)
date_range = DateRange.from_strings("01/01/23", "12/31/23")
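For readers unsure how mm/dd/yy strings map to calendar dates, the standard library gives a reference point. This sketch uses only `datetime.strptime`; how `DateRange.from_strings` parses its arguments internally is not stated here, so treat `parse_mmddyy` as a hypothetical illustration rather than a description of the SDK.

```python
from datetime import date, datetime


def parse_mmddyy(text: str) -> date:
    """Parse a mm/dd/yy string into a date using the stdlib two-digit-year rule."""
    return datetime.strptime(text, "%m/%d/%y").date()


start = parse_mmddyy("01/01/23")
end = parse_mmddyy("12/31/23")
print(start.isoformat(), end.isoformat())  # 2023-01-01 2023-12-31
```

Two-digit years are ambiguous across centuries, so the ISO form is the safer choice when the date range is built from user input.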
Available selector types:
| Value | Description | Examples |
| --- | --- | --- |
| mx_diagnosis_code | ICD-10 diagnosis codes | C34%, C50.1, Z51.11 |
| cpt_hcpcs_code | CPT/HCPCS procedure codes | 96413, 99213, J9271 |
| ndc | National Drug Codes | 12345678901 |
| drg_code | DRG codes | 470, 871 |
| revenue_code | Revenue codes | 0636, 0250 |
from komodo.alpha.models import CountType
CountType.APPROXIMATE  # Fast approximate count
CountType.DISTINCT     # Exact distinct count (requires date range)
| Model | Fields |
| --- | --- |
| ApproxCountResponse | total_count, warnings |
| DistinctCountResponse | total, counts, warnings |
| PatientIdsResponse | total_count, patient_ids, returned_count, warnings |
| ExportStatusResponse | job_id, status, s3_directory, error, total_patient_count |
| CodesResponse | codes, truncated |
| SelectorsResponse | selectors |
| HealthResponse | status, workers, redis, indexer |
| StatsResponse | current_minute, last_hour, buckets, response_cache |
| AuditEntry | endpoint, response_code, latency_ms, timestamp, request_id, error |
All API methods have async variants with the _async suffix:
import asyncio
from komodo import Client

client = Client()
async def get_count():
    response = await client.alpha.cohort.approx_count_async(
        logic={"mx_diagnosis_code": "C34%"},
    )
    return response.total_count
count = asyncio.run(get_count())
Available async methods:
approx_count_async
distinct_count_async
patient_ids_async
export_patient_ids_async
get_export_status_async
lookup_codes_async
list_selectors_async
health_async
get_stats_async
list_audit_logs_async
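One practical use of the async variants is issuing several counts concurrently. The sketch below shows the pattern with `asyncio.gather`; `count_many` is a hypothetical helper, and it assumes only that the injected coroutine function accepts a `logic` keyword and returns an object with `total_count`, as `approx_count_async` does in the example above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def count_many(
    count_fn: Callable[..., Awaitable[Any]],
    logics: List[Dict[str, Any]],
) -> List[int]:
    """Run one count per logic dict concurrently; results keep the input order."""
    responses = await asyncio.gather(*(count_fn(logic=logic) for logic in logics))
    return [response.total_count for response in responses]
```

With the SDK this might be invoked as `asyncio.run(count_many(client.alpha.cohort.approx_count_async, [{"mx_diagnosis_code": "C34%"}, {"mx_diagnosis_code": "C50%"}]))`. If any single request raises, `asyncio.gather` propagates that exception to the caller by default.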