Optimizing Workflows in Distributed Systems

A Case Study for the Lleida Population Cancer Registry

Didac Florensa

University of Lleida

Jordi Mateo

University of Lleida

Pablo Fraile

University of Lleida

Whoami

  • Industrial PhD Student at GFT
  • Working on industrial implementations of federated learning

Introduction

Context

This work was part of an Industrial PhD (Florensa Cazorla 2023), a collaboration between the Population Cancer Registry, Arnau de Vilanova Hospital, and the University of Lleida.

Purpose

Develop an optimized platform that provides high-quality data for identifying associations between medications and cancer types in the Lleida population registry.

Related Publications

  1. Florensa, Mateo, Solsona, et al. (2023) -> Low-dose acetylsalicylic acid for cancer prevention
  2. Florensa, Mateo, Miret, et al. (2023) -> Metformin and association with pancreas cancer

Team

  • Dídac Florensa: PhD student, responsible for data management, analysis, and requirements gathering.
  • Pablo Fraile: PhD student, responsible for implementing and developing the platform.
  • Jordi Mateo: Professor at the University of Lleida, leading the project.

Problem Statement

Goal

Analyze associations between medications and cancer types, i.e., their effects on patient survival (protective or harmful).

Challenge

Analyzing 79,931 combinations of medications and cancer types from 2007-2019.

Initial Approach

A single machine would require 61 days to complete this analysis, with each combination consuming 66 seconds.
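The 61-day figure is simply the per-combination cost multiplied by the number of combinations; a quick Python check of the arithmetic:

```python
# Sequential estimate: every combination processed one after another
COMBINATIONS = 79_931
SECONDS_PER_COMBINATION = 66

total_seconds = COMBINATIONS * SECONDS_PER_COMBINATION
total_days = total_seconds / 86_400  # seconds per day

print(f"{total_seconds:,} s ≈ {total_days:.1f} days")
```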

Profiling

Goal

  • Identify bottlenecks
  • Identify inefficiencies
  • Propose optimizations

Findings

  • Data not retrieved in a single query
  • Join-like operations applied on a non-relational DB
  • Queries misaligned with schema structure

Proposals

  • Schema redesign based on query access patterns.

Yearly Schema

{
  "expositions": {
    "2017": {
      "J01FA09": 100,
      "J01FA10": 200
    },
    "2018": {
      "J01FA09": 150,
      "J01FA10": 250
    },...
  }
}

ATC Code Schema

{
  "expositions": {
    "J01FA09": {
      "2017": 100,
      "2018": 150
    },
    "J01FA10": {
      "2017": 200,
      "2018": 250
    },...
  }
}

Flattened Schema

{
  "expositions": [
    { "atc": "J01FA09", 
      "year": 2017, 
      "dose": 100 },
    { "atc": "J01FA09", 
      "year": 2018, 
      "dose": 150 },
    ...
  ]
}
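A minimal sketch, in plain Python, of how a document in the yearly layout maps onto the flattened layout. The helper name flatten_expositions is hypothetical and the doses are the illustrative values from the examples above. The flattened form stores the ATC code and the year as values rather than as field names, so both can be queried and indexed as ordinary fields.

```python
def flatten_expositions(yearly: dict[str, dict[str, int]]) -> list[dict]:
    """Turn {"2017": {"J01FA09": 100, ...}, ...} into a list of records."""
    records = []
    for year, doses_by_atc in yearly.items():
        for atc, dose in doses_by_atc.items():
            # The year key becomes an int field, as in the flattened schema
            records.append({"atc": atc, "year": int(year), "dose": dose})
    # Order by (atc, year) to mirror the flattened example
    records.sort(key=lambda r: (r["atc"], r["year"]))
    return records


yearly_doc = {
    "expositions": {
        "2017": {"J01FA09": 100, "J01FA10": 200},
        "2018": {"J01FA09": 150, "J01FA10": 250},
    }
}
flat_doc = {"expositions": flatten_expositions(yearly_doc["expositions"])}
print(flat_doc["expositions"][:2])
```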

Data schema impact

Observation

The proposed schemas reduce query time (at the cost of disk space for indexes). However, deserialization time increases with all of them.

Next Steps

How can we minimize deserialization time while also reducing query execution time?

Deserialization

Findings

  • PyMongo returns Python dictionaries → slow for large result sets
  • PyMongoArrow improves typing, but still memory-heavy
  • Optimal performance requires columnar layout with primitive types

Solution

  • Split the data into three DataFrames: patients, expositions, and cancers.
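A sketch of the split, assuming documents shaped roughly like the flattened schema. Apart from MongoDB's _id, the field names (age, bmi, cancers, loc3) and the helper split_documents are hypothetical. Each nested list becomes its own long table of primitive columns, which is the columnar layout the findings call for.

```python
import pandas as pd

# Hypothetical documents shaped like PyMongo results (one per patient)
docs = [
    {"_id": 1, "age": 64, "bmi": 27.1,
     "expositions": [{"atc": "J01FA09", "year": 2017, "dose": 100}],
     "cancers": [{"loc3": "C19", "year": 2018}]},
    {"_id": 2, "age": 58, "bmi": 31.4,
     "expositions": [{"atc": "J01FA10", "year": 2018, "dose": 250}],
     "cancers": []},
]


def split_documents(docs):
    """Split nested patient documents into three flat, columnar DataFrames."""
    patients = pd.DataFrame(
        [{"patient_id": d["_id"], "age": d["age"], "bmi": d["bmi"]} for d in docs]
    ).set_index("patient_id")
    # One row per (patient, exposition)
    expositions = pd.DataFrame(
        [{"patient_id": d["_id"], **e} for d in docs for e in d["expositions"]],
        columns=["patient_id", "atc", "year", "dose"],
    )
    # One row per (patient, cancer); patients without cancer contribute no rows
    cancers = pd.DataFrame(
        [{"patient_id": d["_id"], **c} for d in docs for c in d["cancers"]],
        columns=["patient_id", "loc3", "year"],
    )
    return patients, expositions, cancers


patients_df, expositions_df, cancers_df = split_documents(docs)
print(patients_df.shape, expositions_df.shape, cancers_df.shape)
```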

Memory Optimization

Findings

  • Most patient features are invariant across combinations (age, BMI, diabetes…)
  • Sorting the data and downcasting types can significantly reduce memory usage.
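A sketch of the downcasting idea on a synthetic table; the column names and value ranges are assumptions, not the registry's data. Note that sorting does not by itself shrink the in-memory size: it groups equal values together, which generally helps columnar compression (for example Parquet's dictionary and run-length encodings) and range lookups.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 100_000

# Synthetic stand-in for a patient table; column names are hypothetical
df = pd.DataFrame({
    "age": rng.integers(18, 100, n),                # int64 by default
    "bmi": rng.normal(27, 4, n),                    # float64
    "diabetes": rng.integers(0, 2, n),              # 0/1 flag stored as int64
    "loc3": rng.choice(["C18", "C19", "C50"], n),   # repeated string codes
})


def shrink(df: pd.DataFrame) -> pd.DataFrame:
    # Sorting groups repeated values; it does not reduce memory on its own
    out = df.sort_values(["loc3", "age"]).reset_index(drop=True)
    out["age"] = pd.to_numeric(out["age"], downcast="unsigned")  # -> uint8
    out["bmi"] = pd.to_numeric(out["bmi"], downcast="float")     # -> float32
    out["diabetes"] = out["diabetes"].astype(bool)               # 1 byte per row
    out["loc3"] = out["loc3"].astype("category")                 # few distinct codes
    return out


small = shrink(df)
before = df.memory_usage(deep=True).sum()
after = small.memory_usage(deep=True).sum()
print(f"{before / 1e6:.1f} MB -> {after / 1e6:.1f} MB")
```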

Proposal

  • Precompute static features.
  • Load data once into memory.
  • Save shared data in Apache Parquet.
  • Minimize queries to the database.

Upgrade

Mechanism

  1. Precalculate DataFrames
  2. Read the DataFrames
  3. Generate a CSV with the features and the event column
  4. Read the CSV and preprocess the data
  5. Run the Cox proportional hazards (CoxPH) analysis in R, using a file as stdout
  6. Read and parse the results
  7. Save structured results for later queries

Improvements

  • Query-driven -> Reduce time to get data.
  • Precalculate shared data -> Avoid repeated queries.
  • Use Parquet files -> Efficient storage and fast access.

Code Optimizations (Baziotis, Kang, and Mendis 2024)

Non-index-aware

cancers_df[cancers_df["loc3"] == "C19"]

Index-aware

cancers_df.loc[(slice(None), "C19"), :]

Results

  • Non-index-aware: 1.86 ms
  • Index-aware: 247 μs
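A self-contained sketch of the two selection styles on a toy table. It assumes cancers_df is indexed by (patient_id, loc3); the column name year and the data are hypothetical. The index-aware version pays a one-time set_index/sort_index cost, which is amortized when the same frame is queried for many combinations.

```python
import pandas as pd

# Hypothetical cancer records: one row per (patient, tumour location)
records = pd.DataFrame({
    "patient_id": [3, 1, 2, 1, 4],
    "loc3": ["C19", "C18", "C19", "C50", "C18"],
    "year": [2015, 2012, 2018, 2019, 2010],
})

# Non-index-aware: builds a boolean mask over the whole column on every call
by_mask = records[records["loc3"] == "C19"]

# Index-aware: build and sort the MultiIndex once; pandas expects a sorted
# MultiIndex for efficient slicing such as (slice(None), "C19")
cancers_df = records.set_index(["patient_id", "loc3"]).sort_index()
by_index = cancers_df.loc[(slice(None), "C19"), :]

print(sorted(by_mask["patient_id"]),
      list(by_index.index.get_level_values("patient_id")))
```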

Using isin()

cox_df["has_cancer"] = cox_df.index.isin(with_cancer_df.index)

Using loc with prefill

cox_df["has_cancer"] = False
cox_df.loc[with_cancer_df.index, "has_cancer"] = True

Results

  • Using isin(): 3.25 ms
  • Using loc with prefill: 464 μs
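A small runnable check that both variants produce the same has_cancer column; the toy frames are hypothetical. One caveat worth keeping in mind: the prefill variant assumes every label in with_cancer_df also exists in cox_df, whereas isin() has no such requirement.

```python
import numpy as np
import pandas as pd

# Hypothetical cohort indexed by patient id, and the subset with the cancer
cox_df = pd.DataFrame({"age": [64, 58, 71, 49]},
                      index=pd.Index([1, 2, 3, 4], name="patient_id"))
with_cancer_df = pd.DataFrame({"loc3": ["C19", "C19"]},
                              index=pd.Index([3, 1], name="patient_id"))

# Variant 1: membership test of every cox_df label against with_cancer_df
via_isin = cox_df.index.isin(with_cancer_df.index)

# Variant 2: prefill with False, then set True by label lookup
# (assumes all with_cancer_df labels are present in cox_df)
cox_df["has_cancer"] = False
cox_df.loc[with_cancer_df.index, "has_cancer"] = True

assert np.array_equal(cox_df["has_cancer"].to_numpy(), via_isin)
print(cox_df["has_cancer"].tolist())
```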

Summary of Optimizations

The total reduction in time is around 52 ms per combination. Over 79,931 combinations, this amounts to roughly 1 hour in total (79,931 × 52 ms ≈ 69 min).

Eliminating Communications

Problem

Disk I/O for inter-process communication (IPC) with R is a significant bottleneck.

Results

We reduce the processing time from 66 seconds to less than 1 second per combination.

Function                  Time (ms)
get_cox_df                       52
calculate_cox_analysis          776
parse_cox_analysis               22
save_results                     21
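Summing the stage timings from the table confirms the per-combination total stays under one second and shows that the Cox analysis itself accounts for almost 90% of it:

```python
# Per-combination stage timings from the table, in milliseconds
stage_ms = {
    "get_cox_df": 52,
    "calculate_cox_analysis": 776,
    "parse_cox_analysis": 22,
    "save_results": 21,
}

total_ms = sum(stage_ms.values())
for name, ms in stage_ms.items():
    print(f"{name:<24}{ms:>5} ms  {ms / total_ms:6.1%}")
print(f"{'total':<24}{total_ms:>5} ms")
```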

Multithreading

Technical Insights

  1. Processes outperform threads for CPU-bound tasks -> Python’s Global Interpreter Lock (GIL) limits threads’ performance.
  2. Memory usage is higher with processes, which can lead to out-of-memory errors if too many processes are spawned.
  3. Threads are more efficient for I/O-bound tasks, allowing for faster startup and lower memory usage.

Hybrid Strategy

Since threads and processes are not mutually exclusive, we adopted a hybrid approach:

  • Threads: Efficient for I/O and lightweight parallelism. Used with 2× the number of CPU cores.
  • Processes: Bypass GIL for CPU-bound tasks. Limited by available RAM.
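A minimal sketch of the hybrid layout with the standard library's concurrent.futures, assuming threads are nested inside each worker process. The names analyse, run_chunk, round_robin and run_hybrid are hypothetical, and the sleep plus loop only simulate the I/O and CPU phases of a real combination. The `__main__` guard is required when processes are started with the spawn method (the default on Windows and macOS).

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def analyse(combination):
    """Hypothetical stand-in for processing one (ATC code, cancer) combination."""
    time.sleep(0.01)                               # simulated I/O wait; releases the GIL
    checksum = sum(i * i for i in range(20_000))   # simulated CPU-bound work
    return combination, checksum


def run_chunk(chunk, threads_per_process):
    # Threads inside one process overlap the I/O waits of several combinations
    with ThreadPoolExecutor(max_workers=threads_per_process) as pool:
        return list(pool.map(analyse, chunk))


def round_robin(items, n):
    """Split items into n interleaved chunks, one per process."""
    return [items[i::n] for i in range(n)]


def run_hybrid(combinations, n_processes, threads_per_process):
    # Each process has its own interpreter and GIL, so CPU-bound work
    # runs in parallel across processes
    with ProcessPoolExecutor(max_workers=n_processes) as pool:
        futures = [pool.submit(run_chunk, chunk, threads_per_process)
                   for chunk in round_robin(combinations, n_processes)]
        return [result for f in futures for result in f.result()]


if __name__ == "__main__":
    combos = [(f"ATC{i:02d}", "C19") for i in range(16)]
    results = run_hybrid(combos, n_processes=2, threads_per_process=4)
    print(len(results), "combinations processed")
```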

Resource Calibration

The hybrid approach allows fine-tuned calibration of threads and processes, adapting to the device’s CPU and memory capacity. This ensures optimal throughput without exceeding hardware limits.
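One possible calibration heuristic, sketched under stated assumptions: it follows the two rules from the Hybrid Strategy slide (processes limited by available RAM, threads at 2× the CPU cores), while the function name calibrate, the per-process RAM figure and the extra cap of one process per core are hypothetical choices for illustration.

```python
import os


def calibrate(cpu_cores: int, available_ram_gb: float,
              ram_per_process_gb: float) -> tuple[int, int]:
    """Hypothetical heuristic for sizing the process and thread pools.

    - processes: bounded by RAM (each process adds its own memory footprint)
      and, as an extra assumption, by the number of cores
    - threads: 2x CPU cores in total, spread across the processes
    """
    by_ram = int(available_ram_gb // ram_per_process_gb)
    processes = max(1, min(cpu_cores, by_ram))
    threads_per_process = max(1, (2 * cpu_cores) // processes)
    return processes, threads_per_process


print(calibrate(os.cpu_count() or 1, available_ram_gb=16, ram_per_process_gb=3))
print(calibrate(cpu_cores=32, available_ram_gb=16, ram_per_process_gb=3))
```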

Task distribution

Architecture

  • Task independence: Each task is a particular combination of medication and cancer type -> can be processed independently.
  • Task Queue: Distributes tasks to worker processes (RabbitMQ).
  • Worker Processes: Can be configured to run on different machines, allowing for distributed computing.
  • Task Management: Each worker fetches tasks from the queue, processes them, and returns results to the main process.
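The queue-based pattern can be sketched in-process, with the standard library's queue.Queue standing in for RabbitMQ and threads standing in for workers on separate machines. The function names, the None shutdown sentinel and the placeholder result string are assumptions for illustration, not the project's code.

```python
import queue
import threading


def worker(tasks: queue.Queue, results: queue.Queue) -> None:
    """Fetch combinations until a None sentinel arrives, then stop."""
    while True:
        task = tasks.get()
        if task is None:
            tasks.task_done()
            break
        atc, loc3 = task
        # Hypothetical placeholder for the per-combination Cox analysis
        results.put((atc, loc3, f"analysed {atc} x {loc3}"))
        tasks.task_done()


def run(combinations, n_workers=4):
    tasks, results = queue.Queue(), queue.Queue()
    for combo in combinations:       # producer: enqueue independent tasks
        tasks.put(combo)
    for _ in range(n_workers):       # one stop sentinel per worker
        tasks.put(None)
    workers = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    tasks.join()                     # every task (and sentinel) acknowledged
    for w in workers:
        w.join()
    return [results.get() for _ in range(results.qsize())]


combos = [("J01FA09", "C19"), ("J01FA10", "C19"), ("J01FA09", "C50")]
print(sorted(run(combos)))
```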

Deployment

Requirements

  • Independent combinations to process.
  • Scalable and reproducible execution.

Rationale (Telenyk et al. 2021)

Feature               Traditional MPI Cluster         Kubernetes
Resource Allocation   Static (fixed per job)          Dynamic (per-task)
Scaling               Manual intervention required    Auto-scaling (Horizontal Pod Autoscaler + Cluster Autoscaler)
Fault Tolerance       Job fails if a worker crashes   Self-healing

Scalability

Comparative Analysis

Cloud        Instance type   CoreMark  Workers  vCPUs  Tasks/s  Total time
GKE          e2-highcpu-4    51937     1        4      1.0      22h 12min
GKE          e2-highcpu-4    51937     2        8      1.9      11h 41min
GKE          e2-highcpu-4    51937     4        16     3.6      06h 10min
GKE          e2-highcpu-4    51937     8        32     7.0      03h 10min
GKE          c2d-highcpu-4   86953     4        16     17.0     01h 18min
On-premise   opteron_6247    9634      1        10     0.4      2d 7h 30min
On-premise   opteron_6247    9634      2        20     0.88     1d 1h 13min
On-premise   opteron_6247    9634      4        40     2        11h 6min
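The Total time column is consistent with dividing the 79,931 combinations by the measured throughput (total time ≈ combinations / tasks per second). The sketch below reproduces a few rows; the configuration labels are just shorthand for the table rows.

```python
COMBINATIONS = 79_931

# (configuration, tasks per second) taken from the table
throughput = {
    "GKE e2-highcpu-4 x1": 1.0,
    "GKE e2-highcpu-4 x2": 1.9,
    "GKE e2-highcpu-4 x8": 7.0,
    "GKE c2d-highcpu-4 x4": 17.0,
    "On-premise opteron_6247 x1": 0.4,
}


def fmt(seconds: float) -> str:
    """Format a duration as days, hours and minutes (seconds truncated)."""
    days, rem = divmod(int(seconds), 86_400)
    hours, rem = divmod(rem, 3_600)
    minutes = rem // 60
    return f"{days}d {hours:02d}h {minutes:02d}min"


for config, tasks_per_s in throughput.items():
    print(f"{config:<28}{fmt(COMBINATIONS / tasks_per_s)}")
```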

Conclusions

Key Optimizations

  • Schema Optimization
    Query-driven design and better deserialization.

  • Precomputation & Storage
    Eliminated redundant calculations and migrated from CSV to Parquet for columnar efficiency.

  • Compute Efficiency and Communication Overhead
    Index-aware queries and optimized pipelines.

  • Parallel Execution
    Hybrid threading/multiprocessing to maximize resource utilization.

  • Distributed Scaling
    Kubernetes-orchestrated workers with queue-based load balancing.

Take Home Messages

  • 61 Days → ~Hours
    Computational throughput improved through systematic optimization.

References

Baziotis, Stefanos, Daniel Kang, and Charith Mendis. 2024. “Dias: Dynamic Rewriting of Pandas Code.” Proceedings of the ACM on Management of Data 2 (1): 58:1–27. https://doi.org/10.1145/3639313.
Florensa Cazorla, Dídac. 2023. “Machine Learning Approaches for Comprehensive Analysis of Population Cancer Registry Data.”
Florensa, Dídac, J Mateo, C Miret, S Godoy, and P Godoy. 2023. “DIABETES, EXCESS WEIGHT, METFORMIN AND ASSOCIATION WITH PANCREAS CANCER.” In GACETA SANITARIA, 37:S198–98. ELSEVIER 685 ROUTE 202-206, BRIDGEWATER, NJ 08807 USA.
Florensa, Dídac, Jordi Mateo, Francesc Solsona, Leonardo Galván, Miquel Mesas, Ramon Piñol, Leonardo Espinosa-Leal, and Pere Godoy. 2023. “Low-Dose Acetylsalicylic Acid for Cancer Prevention Considering Risk Factors: A Retrospective Cohort Study.” Annals of Epidemiology 84: 60–66.
Telenyk, Sergii, Oleksii Sopov, Eduard Zharikov, and Grzegorz Nowakowski. 2021. “A Comparison of Kubernetes and Kubernetes-Compatible Platforms.” In 2021 11th IEEE International Conference on Intelligent Data Acquisition and Advanced Computing Systems: Technology and Applications (IDAACS), 313–17. Cracow, Poland: IEEE. https://doi.org/10.1109/IDAACS53288.2021.9660392.