# UK Biobank preprocessing script.
# Builds (participant id, day offset, label) event triples from the raw UKB
# export and writes train/val/test splits as uint32 binary files.
import csv  # stdlib CSV parser for the field-mapping file

import numpy as np  # Numerical operations
import pandas as pd  # Pandas for data manipulation
import tqdm  # Progress bar for chunk processing
# Participant-level split fractions (train / validation / test); they sum to 1.
train_frac, val_frac, test_frac = 0.7, 0.15, 0.15
# CSV mapping field IDs to human-readable names
field_map_file = "../field_ids_enriched.csv"

field_dict = {}  # original field ID (e.g. "34-0.0") -> new column name
with open(field_map_file, "r", encoding="utf-8", newline="") as f:
    # Fix: csv.reader handles quoted fields that a naive split(",") breaks on.
    reader = csv.reader(f)
    next(reader, None)  # skip header row (fix: tolerate an empty file)
    for parts in reader:
        # Need at least the id (column 0) and the name (column 2)
        if len(parts) >= 3:
            field_dict[parts[0]] = parts[2]  # record the mapping
# TSV mapping field IDs to ICD10-related date columns
field_to_icd_map = "../icd10_codes_mod.tsv"

date_vars = []  # date-like column names, later converted to day offsets
with open(field_to_icd_map, "r", encoding="utf-8") as f:
    for raw_line in f:
        # NOTE(review): whitespace split assumes no field contains spaces —
        # confirm against the actual TSV format.
        fields = raw_line.strip().split()
        if len(fields) >= 6:  # skip malformed rows
            date_col = fields[5]
            field_dict[fields[0]] = date_col  # field ID -> date column name
            date_vars.append(date_col)        # preserve file order
# Register the 17 cancer entry slots: one diagnosis-date and one type/code
# column per slot.
for slot in range(17):
    field_dict.update({
        f'40005-{slot}.0': f'cancer_date_{slot}',  # diagnosis date slot
        f'40006-{slot}.0': f'cancer_type_{slot}',  # type/code slot
    })
# Number of ICD-related date columns before the extras added below.
len_icd = len(date_vars)

# Outcome date, assessment date, then the 17 cancer date slots.
extra_date_cols = ['Death', 'date_of_assessment']
extra_date_cols += [f'cancer_date_{slot}' for slot in range(17)]
date_vars.extend(extra_date_cols)
labels_file = "labels.csv"  # file listing label codes, one per line

label_dict = {}  # code string -> integer label id (its line number)
with open(labels_file, "r", encoding="utf-8") as f:
    for line_no, text in enumerate(f):
        tokens = text.strip().split(' ')
        # Skip blank lines; note their line numbers still consume label ids.
        if tokens and tokens[0]:
            label_dict[tokens[0]] = line_no
event_list = []  # event arrays (eid, days, label) accumulated across chunks

# Stream the UK Biobank CSV in 10k-row chunks to bound memory usage.
ukb_iterator = pd.read_csv(
    "../ukb_data.csv",
    sep=',',
    index_col=0,       # first column (participant ID) becomes the index
    chunksize=10000,
    low_memory=False,  # consistent dtypes at the cost of memory
)
# Iterate chunks with progress; each chunk is transformed into several
# (eid, day-offset, label) event arrays appended to event_list.
for ukb_chunk in tqdm.tqdm(ukb_iterator, desc="Processing UK Biobank data"):
    # Rename raw field IDs (e.g. "40005-0.0") to friendly column names.
    ukb_chunk = ukb_chunk.rename(columns=field_dict)
    # Require sex to be present.
    ukb_chunk.dropna(subset=['sex'], inplace=True)
    ukb_chunk['sex'] += 2  # recode sex: 0 -> 2, 1 -> 3

    # Construct date of birth from year and month (day fixed to 1);
    # to_datetime matches the 'year'/'month'/'DAY' columns case-insensitively.
    ukb_chunk['dob'] = pd.to_datetime(
        # errors='coerce' guards against malformed dates (yields NaT)
        ukb_chunk[['year', 'month']].assign(DAY=1), errors='coerce'
    )

    # Use only date variables that actually exist in the current chunk.
    present_date_vars = [c for c in date_vars if c in ukb_chunk.columns]

    # Convert date-like columns to datetime and compute day offsets from dob.
    if present_date_vars:
        date_cols = ukb_chunk[present_date_vars].apply(
            pd.to_datetime, format="%Y-%m-%d", errors='coerce'  # parse dates safely
        )
        date_cols_days = date_cols.sub(
            ukb_chunk['dob'], axis=0)  # timedelta relative to dob
        ukb_chunk[present_date_vars] = date_cols_days.apply(
            lambda x: x.dt.days)  # integer days since dob (NaN if either side missing)

    # Process disease events from ICD10-related date columns: the first
    # len_icd entries of date_vars are ICD columns, followed by 'Death'.
    # NOTE(review): this slice assumes every ICD date column is present in the
    # chunk; if any are missing, the +1 may pull in a non-ICD column — confirm.
    icd10_cols = present_date_vars[:len_icd + 1]
    # Melt to long form: participant id, event code (column name), days offset.
    melted_df = ukb_chunk.reset_index().melt(
        id_vars=['eid'],
        value_vars=icd10_cols,
        var_name='event_code',
        value_name='days',
    )
    # Require non-missing day offsets.
    melted_df.dropna(subset=['days'], inplace=True)
    if not melted_df.empty:
        melted_df['label'] = melted_df['event_code'].map(
            label_dict)  # map event code to numeric label (NaN if unknown)
        # Ensure labels exist before the int cast below.
        melted_df.dropna(subset=['label'], inplace=True)
        if not melted_df.empty:
            event_list.append(
                melted_df[['eid', 'days', 'label']]
                .astype(int)  # safe now: label and days are non-null
                .to_numpy()
            )

    # --- Cancer events: reshape the 17 (date, type) slot pairs to long form ---
    df_res = ukb_chunk.reset_index()  # bring participant ID out of the index
    # wide_to_long needs "stub<num>" column names, so strip the underscores.
    rename_dict = {f'cancer_date_{j}': f'cancerdate{j}' for j in range(17)}
    rename_dict.update(
        # Rename type stubs
        {f'cancer_type_{j}': f'cancertype{j}' for j in range(17)})
    df_renamed = df_res.rename(columns=rename_dict)  # apply renaming
    stubs_to_use = []  # collect stubs that actually occur in this chunk
    if any('cancerdate' in col for col in df_renamed.columns):
        stubs_to_use.append('cancerdate')  # date stub present
    if any('cancertype' in col for col in df_renamed.columns):
        stubs_to_use.append('cancertype')  # type stub present

    if len(stubs_to_use) == 2:  # proceed only if both date and type exist
        long_cancer = pd.wide_to_long(
            df_renamed,
            stubnames=stubs_to_use,
            i=['eid'],       # participant ID identifier
            j='cancer_num'   # index over cancer record number (0..16)
            # NOTE(review): dropna() below removes rows with NaN in ANY column
            # of the long frame, not just the cancer stubs — confirm intended.
        ).dropna()
        if not long_cancer.empty:
            long_cancer['cancer'] = long_cancer['cancertype'].str.slice(
                0, 3)  # use first 3 chars as the code
            long_cancer['cancer_label'] = long_cancer['cancer'].map(
                label_dict)  # map to label id (NaN if code unknown)
            cancer_array = (
                long_cancer.reset_index(
                )[['eid', 'cancerdate', 'cancer_label']]
                .dropna()    # drop unknown labels / missing dates
                .astype(int)
                .to_numpy()
            )
            if cancer_array.size > 0:
                event_list.append(cancer_array)  # append cancer events

    # --- Lifestyle events at assessment: BMI, smoking, alcohol ---
    ukb_bmi = ukb_chunk[['date_of_assessment', 'bmi']].dropna().reset_index()
    if not ukb_bmi.empty:
        # BMI buckets: >28 -> 6, >22 -> 5, else 4 (np.select takes first match)
        ukb_bmi['bmi_status'] = np.select(
            [ukb_bmi['bmi'] > 28, ukb_bmi['bmi'] > 22],
            [6, 5],
            default=4
        )
        event_list.append(
            ukb_bmi[['eid', 'date_of_assessment', 'bmi_status']]
            .astype(int)
            .to_numpy()
        )

    ukb_sm = ukb_chunk[['date_of_assessment', 'smoking']].dropna().reset_index()
    ukb_sm = ukb_sm[ukb_sm['smoking'] != -3]  # exclude unknown smoking status
    if not ukb_sm.empty:
        # Smoking buckets: 1 -> 9, 2 -> 8, else 7
        ukb_sm['smoking_status'] = np.select(
            [ukb_sm['smoking'] == 1, ukb_sm['smoking'] == 2],
            [9, 8],
            default=7
        )
        event_list.append(
            ukb_sm[['eid', 'date_of_assessment', 'smoking_status']]
            .astype(int)
            .to_numpy()
        )

    ukb_al = ukb_chunk[['date_of_assessment', 'alcohol']].dropna().reset_index()
    ukb_al = ukb_al[ukb_al['alcohol'] != -3]  # exclude unknown alcohol status
    if not ukb_al.empty:
        # Alcohol buckets: ==1 -> 12, <4 -> 11, else 10 (first match wins)
        ukb_al['alcohol_status'] = np.select(
            [ukb_al['alcohol'] == 1, ukb_al['alcohol'] < 4],
            [12, 11],
            default=10
        )
        event_list.append(
            ukb_al[['eid', 'date_of_assessment', 'alcohol_status']]
            .astype(int)
            .to_numpy()
        )
# Combine all per-chunk event arrays into a single (N, 3) array.
data = np.vstack(event_list)

# Order events by participant id, then by day offset
# (np.lexsort: the LAST key is the primary sort key).
order = np.lexsort((data[:, 1], data[:, 0]))
data = data[order]

# Discard events dated before birth (negative day offsets).
data = data[data[:, 1] >= 0]

# Keep only the first — i.e. earliest, thanks to the sort above — event per
# (participant id, label) pair.
deduped = pd.DataFrame(data).drop_duplicates([0, 2])
data = deduped.values

# Store compactly as unsigned 32-bit integers.
data = data.astype(np.uint32)
# Split into train/val/test by participant ID so no participant appears in
# more than one split (fractions defined at the top of the file).
unique_ids = np.unique(data[:, 0])  # sorted unique participant IDs
train_split_id = unique_ids[int(len(unique_ids) * train_frac)]
val_split_id = unique_ids[int(len(unique_ids) * (train_frac + val_frac))]

# Fix: ndarray.tofile() returns None, so the previous
# `train_data = data[...].tofile(...)` bindings were always None.
# Bind the actual split arrays, then write each as raw uint32 triples.
train_data = data[data[:, 0] <= train_split_id]
train_data.tofile("ukb_real_train.bin")

val_data = data[(data[:, 0] > train_split_id) &
                (data[:, 0] <= val_split_id)]
val_data.tofile("ukb_real_val.bin")

test_data = data[data[:, 0] > val_split_id]
test_data.tofile("ukb_real_test.bin")