Skip to content

vllm.v1.core.sched.scheduler

logger module-attribute

logger = init_logger(__name__)

Scheduler

Bases: SchedulerInterface

Source code in vllm/v1/core/sched/scheduler.py
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
class Scheduler(SchedulerInterface):
    def __init__(
        self,
        vllm_config: VllmConfig,
        kv_cache_config: KVCacheConfig,
        structured_output_manager: StructuredOutputManager,
        block_size: int,
        mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
        include_finished_set: bool = False,
        log_stats: bool = False,
    ) -> None:
        self.vllm_config = vllm_config
        self.scheduler_config = vllm_config.scheduler_config
        self.cache_config = vllm_config.cache_config
        self.lora_config = vllm_config.lora_config
        self.kv_cache_config = kv_cache_config
        self.kv_events_config = vllm_config.kv_events_config
        self.parallel_config = vllm_config.parallel_config
        self.log_stats = log_stats
        self.structured_output_manager = structured_output_manager
        self.is_encoder_decoder = vllm_config.model_config.is_encoder_decoder

        # include_finished_set controls whether a separate set of finished
        # request ids should be included in the EngineCoreOutputs returned
        # by update_from_outputs(). This is currently used in the multi-engine
        # case to track request lifetimes efficiently.
        self.finished_req_ids_dict: dict[int, set[str]] | None = (
            defaultdict(set) if include_finished_set else None
        )

        # Scheduling constraints.
        self.max_num_running_reqs = self.scheduler_config.max_num_seqs
        self.max_num_scheduled_tokens = self.scheduler_config.max_num_batched_tokens
        self.max_model_len = self.scheduler_config.max_model_len

        # Create KVConnector for the Scheduler. Note that each Worker
        # will have a corresponding KVConnector with Role=WORKER.
        # KV Connector pushes/pull of remote KVs for P/D and offloading.
        self.connector = None
        self.connector_prefix_cache_stats: PrefixCacheStats | None = None
        if self.vllm_config.kv_transfer_config is not None:
            assert len(self.kv_cache_config.kv_cache_groups) == 1, (
                "Multiple KV cache groups are not currently supported "
                "with KV connectors"
            )
            assert not self.is_encoder_decoder, (
                "Encoder-decoder models are not currently supported with KV connectors"
            )
            self.connector = KVConnectorFactory.create_connector(
                config=self.vllm_config, role=KVConnectorRole.SCHEDULER
            )
            if self.log_stats:
                self.connector_prefix_cache_stats = PrefixCacheStats()

        self.kv_event_publisher = EventPublisherFactory.create(
            self.kv_events_config,
            self.parallel_config.data_parallel_rank,
        )

        num_gpu_blocks = self.cache_config.num_gpu_blocks
        assert num_gpu_blocks is not None and num_gpu_blocks > 0

        self.block_size = block_size
        self.dcp_world_size = vllm_config.parallel_config.decode_context_parallel_size

        # req_id -> Request
        self.requests: dict[str, Request] = {}
        # Scheduling policy
        if self.scheduler_config.policy == "priority":
            self.policy = SchedulingPolicy.PRIORITY
        elif self.scheduler_config.policy == "fcfs":
            self.policy = SchedulingPolicy.FCFS
        else:
            raise ValueError(
                f"Unknown scheduling policy: {self.scheduler_config.policy}"
            )
        # Priority queues for requests.
        self.waiting = create_request_queue(self.policy)
        self.running: list[Request] = []

        # The request IDs that are finished in between the previous and the
        # current steps. This is used to notify the workers about the finished
        # requests so that they can free the cached states for those requests.
        # This is flushed at the end of each scheduling step.
        self.finished_req_ids: set[str] = set()

        # KV Connector: requests in process of async KV loading or recving
        self.finished_recving_kv_req_ids: set[str] = set()
        self.failed_recving_kv_req_ids: set[str] = set()

        # Encoder-related.
        # Calculate encoder cache size if applicable
        # NOTE: For now we use the same budget for both compute and space.
        # This can be changed when we make encoder cache for embedding caching
        # across requests.
        encoder_compute_budget, encoder_cache_size = compute_encoder_budget(
            model_config=vllm_config.model_config,
            scheduler_config=vllm_config.scheduler_config,
            mm_registry=mm_registry,
        )

        # NOTE(woosuk): Here, "encoder" includes the vision encoder (and
        # projector if needed) for MM models as well as encoder-decoder
        # transformers.
        self.max_num_encoder_input_tokens = encoder_compute_budget
        # NOTE: For the models without encoder (e.g., text-only models),
        # the encoder cache will not be initialized because cache size is 0
        # for these models.
        self.encoder_cache_manager = EncoderCacheManager(cache_size=encoder_cache_size)

        speculative_config = vllm_config.speculative_config
        self.use_eagle = False
        self.num_spec_tokens = self.num_lookahead_tokens = 0
        if speculative_config:
            self.num_spec_tokens = speculative_config.num_speculative_tokens
            if speculative_config.use_eagle():
                self.use_eagle = True
                self.num_lookahead_tokens = self.num_spec_tokens

        # Create the KV cache manager.
        self.kv_cache_manager = KVCacheManager(
            kv_cache_config=kv_cache_config,
            max_model_len=self.max_model_len,
            enable_caching=self.cache_config.enable_prefix_caching,
            use_eagle=self.use_eagle,
            log_stats=self.log_stats,
            enable_kv_cache_events=self.kv_event_publisher is not None,
            dcp_world_size=self.dcp_world_size,
        )
        self.use_pp = self.parallel_config.pipeline_parallel_size > 1

    def schedule(self) -> SchedulerOutput:
        # NOTE(woosuk) on the scheduling algorithm:
        # There's no "decoding phase" nor "prefill phase" in the scheduler.
        # Each request just has the num_computed_tokens and
        # num_tokens_with_spec. num_tokens_with_spec =
        # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
        # At each step, the scheduler tries to assign tokens to the requests
        # so that each request's num_computed_tokens can catch up its
        # num_tokens_with_spec. This is general enough to cover
        # chunked prefills, prefix caching, speculative decoding,
        # and the "jump decoding" optimization in the future.

        scheduled_new_reqs: list[Request] = []
        scheduled_resumed_reqs: list[Request] = []
        scheduled_running_reqs: list[Request] = []
        preempted_reqs: list[Request] = []

        req_to_new_blocks: dict[str, KVCacheBlocks] = {}
        num_scheduled_tokens: dict[str, int] = {}
        token_budget = self.max_num_scheduled_tokens
        # Encoder-related.
        scheduled_encoder_inputs: dict[str, list[int]] = {}
        encoder_compute_budget = self.max_num_encoder_input_tokens
        # Spec decode-related.
        scheduled_spec_decode_tokens: dict[str, list[int]] = {}

        # For logging.
        scheduled_timestamp = time.monotonic()

        # First, schedule the RUNNING requests.
        req_index = 0
        while req_index < len(self.running) and token_budget > 0:
            request = self.running[req_index]

            num_new_tokens = (
                request.num_tokens_with_spec
                + request.num_output_placeholders
                - request.num_computed_tokens
            )
            if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
                num_new_tokens = self.scheduler_config.long_prefill_token_threshold
            num_new_tokens = min(num_new_tokens, token_budget)

            # Make sure the input position does not exceed the max model len.
            # This is necessary when using spec decoding.
            num_new_tokens = min(
                num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
            )

            # Schedule encoder inputs.
            encoder_inputs_to_schedule = None
            new_encoder_compute_budget = encoder_compute_budget
            if request.has_encoder_inputs:
                (
                    encoder_inputs_to_schedule,
                    num_new_tokens,
                    new_encoder_compute_budget,
                ) = self._try_schedule_encoder_inputs(
                    request,
                    request.num_computed_tokens,
                    num_new_tokens,
                    encoder_compute_budget,
                )

            if num_new_tokens == 0:
                # The request cannot be scheduled because one of the following
                # reasons:
                # 1. No new tokens to schedule. This may happen when
                #    (1) PP>1 and we have already scheduled all prompt tokens
                #    but they are not finished yet.
                #    (2) Async scheduling and the request has reached to either
                #    its max_total_tokens or max_model_len.
                # 2. The encoder budget is exhausted.
                # 3. The encoder cache is exhausted.
                # NOTE(woosuk): Here, by doing `continue` instead of `break`,
                # we do not strictly follow the FCFS scheduling policy and
                # allow the lower-priority requests to be scheduled.
                req_index += 1
                continue

            # Schedule newly needed KV blocks for the request.
            while True:
                new_blocks = self.kv_cache_manager.allocate_slots(
                    request,
                    num_new_tokens,
                    num_lookahead_tokens=self.num_lookahead_tokens,
                )

                if new_blocks is not None:
                    # The request can be scheduled.
                    break

                # The request cannot be scheduled.
                # Preempt the lowest-priority request.
                if self.policy == SchedulingPolicy.PRIORITY:
                    preempted_req = max(
                        self.running,
                        key=lambda r: (r.priority, r.arrival_time),
                    )
                    self.running.remove(preempted_req)
                    if preempted_req in scheduled_running_reqs:
                        scheduled_running_reqs.remove(preempted_req)
                        token_budget += num_scheduled_tokens[preempted_req.request_id]
                        req_to_new_blocks.pop(preempted_req.request_id)
                        num_scheduled_tokens.pop(preempted_req.request_id)
                        req_index -= 1
                else:
                    preempted_req = self.running.pop()

                self.kv_cache_manager.free(preempted_req)
                self.encoder_cache_manager.free(preempted_req)
                preempted_req.status = RequestStatus.PREEMPTED
                preempted_req.num_computed_tokens = 0
                preempted_req.num_preemptions += 1
                if self.log_stats:
                    preempted_req.record_event(
                        EngineCoreEventType.PREEMPTED, scheduled_timestamp
                    )

                self.waiting.prepend_request(preempted_req)
                preempted_reqs.append(preempted_req)
                if preempted_req == request:
                    # No more request to preempt. Cannot schedule this request.
                    break

            if new_blocks is None:
                # Cannot schedule this request.
                break

            # Schedule the request.
            scheduled_running_reqs.append(request)
            req_to_new_blocks[request.request_id] = new_blocks
            num_scheduled_tokens[request.request_id] = num_new_tokens
            token_budget -= num_new_tokens
            req_index += 1

            # Speculative decode related.
            if request.spec_token_ids:
                num_scheduled_spec_tokens = (
                    num_new_tokens + request.num_computed_tokens - request.num_tokens
                )
                if num_scheduled_spec_tokens > 0:
                    # Trim spec_token_ids list to num_scheduled_spec_tokens.
                    del request.spec_token_ids[num_scheduled_spec_tokens:]
                    scheduled_spec_decode_tokens[request.request_id] = (
                        request.spec_token_ids
                    )

            # Encoder-related.
            if encoder_inputs_to_schedule:
                scheduled_encoder_inputs[request.request_id] = (
                    encoder_inputs_to_schedule
                )
                # Allocate the encoder cache.
                for i in encoder_inputs_to_schedule:
                    self.encoder_cache_manager.allocate(request, i)
                encoder_compute_budget = new_encoder_compute_budget

        # Record the LoRAs in scheduled_running_reqs
        scheduled_loras: set[int] = set()
        if self.lora_config:
            scheduled_loras = set(
                req.lora_request.lora_int_id
                for req in scheduled_running_reqs
                if req.lora_request and req.lora_request.lora_int_id > 0
            )
            assert len(scheduled_loras) <= self.lora_config.max_loras

        # Use a temporary RequestQueue to collect requests that need to be
        # skipped and put back at the head of the waiting queue later
        skipped_waiting_requests = create_request_queue(self.policy)

        # Next, schedule the WAITING requests.
        if not preempted_reqs:
            while self.waiting and token_budget > 0:
                if len(self.running) == self.max_num_running_reqs:
                    break

                request = self.waiting.peek_request()

                # KVTransfer: skip request if still waiting for remote kvs.
                if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                    is_ready = self._update_waiting_for_remote_kv(request)
                    if is_ready:
                        request.status = RequestStatus.WAITING
                    else:
                        logger.debug(
                            "%s is still in WAITING_FOR_REMOTE_KVS state.",
                            request.request_id,
                        )
                        self.waiting.pop_request()
                        skipped_waiting_requests.prepend_request(request)
                        continue

                # Skip request if the structured output request is still waiting
                # for FSM compilation.
                if request.status == RequestStatus.WAITING_FOR_FSM:
                    structured_output_req = request.structured_output_request
                    if structured_output_req and structured_output_req.grammar:
                        request.status = RequestStatus.WAITING
                    else:
                        self.waiting.pop_request()
                        skipped_waiting_requests.prepend_request(request)
                        continue

                # Check that adding the request still respects the max_loras
                # constraint.
                if (
                    self.lora_config
                    and request.lora_request
                    and (
                        len(scheduled_loras) == self.lora_config.max_loras
                        and request.lora_request.lora_int_id not in scheduled_loras
                    )
                ):
                    # Scheduling would exceed max_loras, skip.
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

                num_external_computed_tokens = 0
                load_kv_async = False

                # Get already-cached tokens.
                if request.num_computed_tokens == 0:
                    # Get locally-cached tokens.
                    new_computed_blocks, num_new_local_computed_tokens = (
                        self.kv_cache_manager.get_computed_blocks(request)
                    )

                    # Get externally-cached tokens if using a KVConnector.
                    if self.connector is not None:
                        num_external_computed_tokens, load_kv_async = (
                            self.connector.get_num_new_matched_tokens(
                                request, num_new_local_computed_tokens
                            )
                        )

                        if num_external_computed_tokens is None:
                            # The request cannot be scheduled because
                            # the KVConnector couldn't determine
                            # the number of matched tokens.
                            self.waiting.pop_request()
                            skipped_waiting_requests.prepend_request(request)
                            continue

                    # Total computed tokens (local + external).
                    num_computed_tokens = (
                        num_new_local_computed_tokens + num_external_computed_tokens
                    )
                # KVTransfer: WAITING reqs have num_computed_tokens > 0
                # after async KV recvs are completed.
                else:
                    new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
                    num_new_local_computed_tokens = 0
                    num_computed_tokens = request.num_computed_tokens

                encoder_inputs_to_schedule = None
                new_encoder_compute_budget = encoder_compute_budget

                # KVTransfer: loading remote KV, do not allocate for new work.
                if load_kv_async:
                    assert num_external_computed_tokens > 0
                    num_new_tokens = 0
                # Number of tokens to be scheduled.
                else:
                    # We use `request.num_tokens` instead of
                    # `request.num_prompt_tokens` to consider the resumed
                    # requests, which have output tokens.
                    num_new_tokens = request.num_tokens - num_computed_tokens
                    if (
                        0
                        < self.scheduler_config.long_prefill_token_threshold
                        < num_new_tokens
                    ):
                        num_new_tokens = (
                            self.scheduler_config.long_prefill_token_threshold
                        )

                    # chunked prefill has to be enabled explicitly to allow
                    # pooling requests to be chunked
                    if (
                        not self.scheduler_config.chunked_prefill_enabled
                        and num_new_tokens > token_budget
                    ):
                        self.waiting.pop_request()
                        skipped_waiting_requests.prepend_request(request)
                        continue

                    num_new_tokens = min(num_new_tokens, token_budget)
                    assert num_new_tokens > 0

                    # Schedule encoder inputs.
                    if request.has_encoder_inputs:
                        (
                            encoder_inputs_to_schedule,
                            num_new_tokens,
                            new_encoder_compute_budget,
                        ) = self._try_schedule_encoder_inputs(
                            request,
                            num_computed_tokens,
                            num_new_tokens,
                            encoder_compute_budget,
                        )
                        if num_new_tokens == 0:
                            # The request cannot be scheduled.
                            break

                # Handles an edge case when P/D Disaggregation
                # is used with Spec Decoding where an
                # extra block gets allocated which
                # creates a mismatch between the number
                # of local and remote blocks.
                effective_lookahead_tokens = (
                    0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens
                )

                # Determine if we need to allocate cross-attention blocks.
                if self.is_encoder_decoder and request.has_encoder_inputs:
                    # TODO(russellb): For Whisper, we know that the input is
                    # always padded to the maximum length. If we support other
                    # encoder-decoder models, this will need to be updated if we
                    # want to only allocate what is needed.
                    num_encoder_tokens = (
                        self.scheduler_config.max_num_encoder_input_tokens
                    )
                else:
                    num_encoder_tokens = 0

                new_blocks = self.kv_cache_manager.allocate_slots(
                    request,
                    num_new_tokens + num_external_computed_tokens,
                    num_new_local_computed_tokens,
                    new_computed_blocks,
                    num_lookahead_tokens=effective_lookahead_tokens,
                    delay_cache_blocks=load_kv_async,
                    num_encoder_tokens=num_encoder_tokens,
                )

                if new_blocks is None:
                    # The request cannot be scheduled.
                    break

                # KVTransfer: the connector uses this info to determine
                # if a load is needed. Note that
                # This information is used to determine if a load is
                # needed for this request.
                if self.connector is not None:
                    self.connector.update_state_after_alloc(
                        request,
                        new_computed_blocks + new_blocks,
                        num_external_computed_tokens,
                    )
                    self._update_connector_prefix_cache_stats(
                        request, num_external_computed_tokens
                    )

                # Request was already popped from self.waiting
                # unless it was re-added above due to new_blocks being None.
                request = self.waiting.pop_request()
                if load_kv_async:
                    # If loading async, allocate memory and put request
                    # into the WAITING_FOR_REMOTE_KV state.
                    skipped_waiting_requests.prepend_request(request)
                    request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
                    continue

                req_index += 1
                self.running.append(request)
                if self.log_stats:
                    request.record_event(
                        EngineCoreEventType.SCHEDULED, scheduled_timestamp
                    )
                if request.status == RequestStatus.WAITING:
                    scheduled_new_reqs.append(request)
                elif request.status == RequestStatus.PREEMPTED:
                    scheduled_resumed_reqs.append(request)
                else:
                    raise RuntimeError(f"Invalid request status: {request.status}")

                if self.lora_config and request.lora_request:
                    scheduled_loras.add(request.lora_request.lora_int_id)
                req_to_new_blocks[request.request_id] = (
                    self.kv_cache_manager.get_blocks(request.request_id)
                )
                num_scheduled_tokens[request.request_id] = num_new_tokens
                token_budget -= num_new_tokens
                request.status = RequestStatus.RUNNING
                request.num_computed_tokens = num_computed_tokens
                # Count the number of prefix cached tokens.
                if request.num_cached_tokens < 0:
                    request.num_cached_tokens = num_computed_tokens
                # Encoder-related.
                if encoder_inputs_to_schedule:
                    scheduled_encoder_inputs[request.request_id] = (
                        encoder_inputs_to_schedule
                    )
                    # Allocate the encoder cache.
                    for i in encoder_inputs_to_schedule:
                        self.encoder_cache_manager.allocate(request, i)
                    encoder_compute_budget = new_encoder_compute_budget

        # Put back any skipped requests at the head of the waiting queue
        if skipped_waiting_requests:
            self.waiting.prepend_requests(skipped_waiting_requests)

        # Check if the scheduling constraints are satisfied.
        total_num_scheduled_tokens = sum(num_scheduled_tokens.values())
        assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens
        assert token_budget >= 0
        assert len(self.running) <= self.max_num_running_reqs
        # Since some requests in the RUNNING queue may not be scheduled in
        # this step, the total number of scheduled requests can be smaller than
        # len(self.running).
        assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len(
            scheduled_running_reqs
        ) <= len(self.running)

        # Get the longest common prefix among all requests in the running queue.
        # This can be potentially used for cascade attention.
        num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
        if self.running:
            any_request = self.running[0]
            num_common_prefix_blocks = (
                self.kv_cache_manager.get_num_common_prefix_blocks(
                    any_request.request_id
                )
            )

        # Construct the scheduler output.
        new_reqs_data = [
            NewRequestData.from_request(
                req, req_to_new_blocks[req.request_id].get_block_ids()
            )
            for req in scheduled_new_reqs
        ]
        cached_reqs_data = self._make_cached_request_data(
            scheduled_running_reqs,
            scheduled_resumed_reqs,
            num_scheduled_tokens,
            scheduled_spec_decode_tokens,
            req_to_new_blocks,
        )
        structured_output_request_ids, grammar_bitmask = self.get_grammar_bitmask(
            num_scheduled_tokens.keys(), scheduled_spec_decode_tokens
        )
        scheduler_output = SchedulerOutput(
            scheduled_new_reqs=new_reqs_data,
            scheduled_cached_reqs=cached_reqs_data,
            num_scheduled_tokens=num_scheduled_tokens,
            total_num_scheduled_tokens=total_num_scheduled_tokens,
            scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
            scheduled_encoder_inputs=scheduled_encoder_inputs,
            num_common_prefix_blocks=num_common_prefix_blocks,
            # finished_req_ids is an existing state in the scheduler,
            # instead of being newly scheduled in this step.
            # It contains the request IDs that are finished in between
            # the previous and the current steps.
            finished_req_ids=self.finished_req_ids,
            free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
            structured_output_request_ids=structured_output_request_ids,
            grammar_bitmask=grammar_bitmask,
        )

        # NOTE(Kuntai): this function is designed for multiple purposes:
        # 1. Plan the KV cache store
        # 2. Wrap up all the KV cache load / save ops into an opaque object
        # 3. Clear the internal states of the connector
        if self.connector is not None:
            meta = self.connector.build_connector_meta(scheduler_output)
            scheduler_output.kv_connector_metadata = meta

        # collect KV cache events from KV cache manager
        events = self.kv_cache_manager.take_events()

        # collect KV cache events from connector
        if self.connector is not None:
            connector_events = self.connector.take_events()
            if connector_events:
                if events is None:
                    events = list(connector_events)
                else:
                    events.extend(connector_events)

        # publish collected KV cache events
        if events:
            assert self.kv_event_publisher is not None
            batch = KVEventBatch(ts=time.time(), events=events)
            self.kv_event_publisher.publish(batch)

        self._update_after_schedule(scheduler_output)
        return scheduler_output

    def _update_after_schedule(
        self,
        scheduler_output: SchedulerOutput,
    ) -> None:
        # Advance the number of computed tokens for the request AFTER
        # the request is scheduled.
        # 1. The scheduler_output of the current step has to include the
        #    original number of scheduled tokens to determine input IDs.
        # 2. Advance the number of computed tokens here allowing us to
        #    schedule the prefill request again immediately in the next
        #    scheduling step.
        # 3. If some tokens (e.g. spec tokens) are rejected later, the number of
        #    computed tokens will be adjusted in update_from_output.
        num_scheduled_tokens = scheduler_output.num_scheduled_tokens
        for req_id, num_scheduled_token in num_scheduled_tokens.items():
            request = self.requests[req_id]
            request.num_computed_tokens += num_scheduled_token

            # NOTE: _free_encoder_inputs relies on num_computed_tokens, which
            # may be updated again in _update_from_output for speculative
            # decoding. However, it is safe to call the method here because
            # encoder inputs are always part of the prompt, not the output,
            # and thus are unaffected by speculative decoding.
            if request.has_encoder_inputs:
                self._free_encoder_inputs(request)

        # Clear the finished request IDs.
        # NOTE: We shouldn't do self.finished_req_ids.clear() here because
        # it will also affect the scheduler output.
        self.finished_req_ids = set()

    def _make_cached_request_data(
        self,
        running_reqs: list[Request],
        resumed_reqs: list[Request],
        num_scheduled_tokens: dict[str, int],
        spec_decode_tokens: dict[str, list[int]],
        req_to_new_blocks: dict[str, KVCacheBlocks],
    ) -> CachedRequestData:
        req_ids: list[str] = []
        new_token_ids: list[list[int]] = []
        new_block_ids: list[tuple[list[int], ...] | None] = []
        resumed_req_token_ids: list[list[int] | None] = []
        num_computed_tokens: list[int] = []
        num_output_tokens: list[int] = []

        # Because resumed_reqs is usually empty, it is more efficient to do
        # in-place appending so that we don't need to allocate a new list.
        resumed_from_preemption = [False] * len(running_reqs)
        resumed_from_preemption += [True] * len(resumed_reqs)
        for idx, req in enumerate(itertools.chain(running_reqs, resumed_reqs)):
            req_id = req.request_id
            req_ids.append(req_id)
            num_tokens = num_scheduled_tokens[req_id] - len(
                spec_decode_tokens.get(req_id, ())
            )
            if self.use_pp:
                # When using PP, the scheduler sends the sampled tokens back,
                # because there's no direct communication between the first-
                # stage worker and the last-stage worker. Otherwise, we don't
                # need to send the sampled tokens back because the model runner
                # will cache them.
                token_ids = req.all_token_ids[
                    req.num_computed_tokens : req.num_computed_tokens + num_tokens
                ]
                new_token_ids.append(token_ids)
            resumed_token_ids = None
            if resumed_from_preemption[idx]:
                resumed_token_ids = req.all_token_ids[
                    : req.num_computed_tokens + num_tokens
                ]
            resumed_req_token_ids.append(resumed_token_ids)
            new_block_ids.append(
                req_to_new_blocks[req_id].get_block_ids(allow_none=True)
            )
            num_computed_tokens.append(req.num_computed_tokens)
            num_output_tokens.append(
                req.num_output_tokens + req.num_output_placeholders
            )

        return CachedRequestData(
            req_ids=req_ids,
            resumed_from_preemption=resumed_from_preemption,
            new_token_ids=new_token_ids,
            resumed_req_token_ids=resumed_req_token_ids,
            new_block_ids=new_block_ids,
            num_computed_tokens=num_computed_tokens,
            num_output_tokens=num_output_tokens,
        )

    def _try_schedule_encoder_inputs(
        self,
        request: Request,
        num_computed_tokens: int,
        num_new_tokens: int,
        encoder_compute_budget: int,
    ) -> tuple[list[int], int, int]:
        """
        Determine which encoder inputs need to be scheduled in the current step,
        and update `num_new_tokens` and encoder token budget accordingly.

        An encoder input will be scheduled if:
        - Its output tokens overlap with the range of tokens being computed
        in this step, i.e.,
        [num_computed_tokens, num_computed_tokens + num_new_tokens).
        - It is not already computed and stored in the encoder cache.
        - There is sufficient encoder token budget to process it.
        - The encoder cache has space to store it.

        If an encoder input cannot be scheduled due to cache or budget
        limitations, the method adjusts `num_new_tokens` to schedule only the
        decoder tokens up to just before the unschedulable encoder input.

        Note that num_computed_tokens includes both locally cached
        blocks and externally cached blocks (via KVConnector).
        """
        if num_new_tokens == 0 or not request.has_encoder_inputs:
            return [], num_new_tokens, encoder_compute_budget
        encoder_inputs_to_schedule: list[int] = []
        mm_features = request.mm_features
        assert mm_features is not None
        assert len(mm_features) > 0

        # NOTE: since scheduler operates on the request level (possibly with
        # multiple encoder inputs per request), we need to create temporary
        # trackers for accounting at the encoder input level.
        mm_hashes_to_schedule = set()
        num_tokens_to_schedule = 0
        for i, mm_feature in enumerate(mm_features):
            start_pos = mm_feature.mm_position.offset
            num_encoder_tokens = mm_feature.mm_position.length

            # The encoder output is needed if the two ranges overlap:
            # [num_computed_tokens, num_computed_tokens + num_new_tokens) and
            # [start_pos, start_pos + num_encoder_tokens)
            if start_pos >= num_computed_tokens + num_new_tokens:
                # The encoder input is not needed in this step.
                break

            if self.is_encoder_decoder and num_computed_tokens > 0:
                assert start_pos == 0, (
                    "Encoder input should be processed at the beginning of "
                    "the sequence when encoder-decoder models are used."
                )
                # Encoder input has already been computed
                # The calculation here is a bit different. We don't turn encoder
                # output into tokens that get processed by the decoder and
                # reflected in num_computed_tokens. Instead, start_pos reflects
                # the position where we need to ensure we calculate encoder
                # inputs. This should always be 0 to ensure we calculate encoder
                # inputs before running the decoder.  Once we've calculated some
                # decoder tokens (num_computed_tokens > 0), then we know we
                # already calculated encoder inputs and can skip here.
                continue
            elif start_pos + num_encoder_tokens <= num_computed_tokens:
                # The encoder input is already computed and stored
                # in the decoder's KV cache.
                continue

            if not self.is_encoder_decoder:
                # We are not using the encoder cache for encoder-decoder models,
                # yet.
                if request.mm_features[i].identifier in mm_hashes_to_schedule:
                    # The same encoder input has already been scheduled in the
                    # current step.
                    continue

                if self.encoder_cache_manager.check_and_update_cache(request, i):
                    # The encoder input is already computed and cached from a
                    # previous step.
                    continue

            # If no encoder input chunking is allowed, we do not want to
            # partially schedule a multimodal item. If the scheduled range would
            # only cover part of the mm input, roll back to before the mm item.
            if (
                self.scheduler_config.disable_chunked_mm_input
                and num_computed_tokens < start_pos
                and (num_computed_tokens + num_new_tokens)
                < (start_pos + num_encoder_tokens)
            ):
                num_new_tokens = start_pos - num_computed_tokens
                break

            if not self.encoder_cache_manager.can_allocate(
                request, i, encoder_compute_budget, num_tokens_to_schedule
            ):
                # The encoder cache is full or the encoder budget is exhausted.
                # NOTE(woosuk): We assume that the encoder input tokens should
                # be processed altogether, as the encoder usually uses
                # bidirectional attention.
                if num_computed_tokens < start_pos:
                    # We only schedule the decoder tokens just before the
                    # encoder input.
                    num_new_tokens = start_pos - num_computed_tokens
                else:
                    # Because of prefix caching, num_computed_tokens is greater
                    # than start_pos even though its encoder input is not
                    # available. In this case, we can't schedule any token for
                    # the request in this step.
                    num_new_tokens = 0
                break

            num_tokens_to_schedule += num_encoder_tokens
            encoder_compute_budget -= num_encoder_tokens
            mm_hashes_to_schedule.add(request.mm_features[i].identifier)
            encoder_inputs_to_schedule.append(i)

        return (
            encoder_inputs_to_schedule,
            num_new_tokens,
            encoder_compute_budget,
        )

    def get_grammar_bitmask(
        self,
        scheduled_request_ids: Iterable[str],
        scheduled_spec_decode_tokens: dict[str, list[int]],
    ) -> tuple[list[str], "npt.NDArray[np.int32] | None"]:
        # Collect list of scheduled request ids that use structured output.
        # The corresponding rows of the bitmask will be in this order.
        # PERF: in case of chunked prefill,
        # request might not include any new tokens.
        # Therefore, we might introduce some additional
        # cycle to fill in the bitmask, which could be a big no-op.
        structured_output_request_ids = [
            req_id
            for req_id in scheduled_request_ids
            if (req := self.requests.get(req_id)) and req.use_structured_output
        ]
        if not structured_output_request_ids:
            return structured_output_request_ids, None

        bitmask = self.structured_output_manager.grammar_bitmask(
            self.requests,
            structured_output_request_ids,
            scheduled_spec_decode_tokens,
        )
        return structured_output_request_ids, bitmask

    def update_from_output(
        self,
        scheduler_output: SchedulerOutput,
        model_runner_output: ModelRunnerOutput,
    ) -> dict[int, EngineCoreOutputs]:
        sampled_token_ids = model_runner_output.sampled_token_ids
        logprobs = model_runner_output.logprobs
        prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict
        num_scheduled_tokens = scheduler_output.num_scheduled_tokens
        pooler_outputs = model_runner_output.pooler_output
        num_nans_in_logits = model_runner_output.num_nans_in_logits
        kv_connector_output = model_runner_output.kv_connector_output

        outputs: dict[int, list[EngineCoreOutput]] = defaultdict(list)
        spec_decoding_stats: SpecDecodingStats | None = None
        kv_connector_stats = (
            kv_connector_output.kv_connector_stats if kv_connector_output else None
        )
        if kv_connector_stats and self.connector:
            stats = self.connector.get_kv_connector_stats()
            if stats:
                kv_connector_stats = kv_connector_stats.aggregate(stats)

        failed_kv_load_req_ids = None
        if kv_connector_output and kv_connector_output.invalid_block_ids:
            # These blocks contain externally computed tokens that failed to
            # load. Identify affected requests and adjust their computed token
            # count to trigger recomputation of the invalid blocks.
            failed_kv_load_req_ids = self._handle_invalid_blocks(
                kv_connector_output.invalid_block_ids
            )

        # NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more,
        # the below loop can be a performance bottleneck. We should do our best
        # to avoid expensive operations inside the loop.
        stopped_running_reqs: set[Request] = set()
        stopped_preempted_reqs: set[Request] = set()
        for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
            assert num_tokens_scheduled > 0
            if failed_kv_load_req_ids and req_id in failed_kv_load_req_ids:
                # Skip requests that were recovered from KV load failure
                continue
            request = self.requests.get(req_id)
            if request is None:
                # The request is already finished. This can happen if the
                # request is aborted while the model is executing it (e.g.,
                # in pipeline parallelism).
                continue

            req_index = model_runner_output.req_id_to_index[req_id]
            generated_token_ids = (
                sampled_token_ids[req_index] if sampled_token_ids else []
            )

            scheduled_spec_token_ids = (
                scheduler_output.scheduled_spec_decode_tokens.get(req_id)
            )
            if scheduled_spec_token_ids:
                num_draft_tokens = len(scheduled_spec_token_ids)
                num_accepted = len(generated_token_ids) - 1
                num_rejected = num_draft_tokens - num_accepted
                # num_computed_tokens represents the number of tokens
                # processed in the current step, considering scheduled
                # tokens and rejections. If some tokens are rejected,
                # num_computed_tokens is decreased by the number of rejected
                # tokens.
                request.num_computed_tokens -= num_rejected
                spec_decoding_stats = self.make_spec_decoding_stats(
                    spec_decoding_stats,
                    num_draft_tokens=num_draft_tokens,
                    num_accepted_tokens=num_accepted,
                )

            stopped = False
            new_logprobs = None
            new_token_ids = generated_token_ids
            kv_transfer_params = None
            status_before_stop = request.status

            # Check for stop and update request status.
            if new_token_ids:
                new_token_ids, stopped = self._update_request_with_output(
                    request, new_token_ids
                )

            # Stop checking for pooler models.
            pooler_output = None
            if pooler_outputs:
                pooler_output = pooler_outputs[req_index]
                stopped = check_stop(request, self.max_model_len, pooler_output)

            if stopped:
                kv_transfer_params = self._free_request(request)
                if status_before_stop == RequestStatus.RUNNING:
                    stopped_running_reqs.add(request)
                else:
                    stopped_preempted_reqs.add(request)

            # Extract sample logprobs if needed.
            if (
                request.sampling_params is not None
                and request.sampling_params.logprobs is not None
                and logprobs
            ):
                # NOTE: once we support N tokens per step (spec decode),
                # the outer lists can be of length > 1.
                new_logprobs = logprobs.slice(req_index, req_index + 1)

            if new_token_ids and self.structured_output_manager.should_advance(request):
                struct_output_request = request.structured_output_request
                assert struct_output_request is not None
                assert struct_output_request.grammar is not None
                struct_output_request.grammar.accept_tokens(req_id, new_token_ids)

            if num_nans_in_logits is not None and req_id in num_nans_in_logits:
                request.num_nans_in_logits = num_nans_in_logits[req_id]

            # Get prompt logprobs for this request.
            prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id)
            if new_token_ids or pooler_output is not None or kv_transfer_params:
                # Add EngineCoreOutput for this Request.
                outputs[request.client_index].append(
                    EngineCoreOutput(
                        request_id=req_id,
                        new_token_ids=new_token_ids,
                        finish_reason=request.get_finished_reason(),
                        new_logprobs=new_logprobs,
                        new_prompt_logprobs_tensors=prompt_logprobs_tensors,
                        pooling_output=pooler_output,
                        stop_reason=request.stop_reason,
                        events=request.take_events(),
                        kv_transfer_params=kv_transfer_params,
                        trace_headers=request.trace_headers,
                        num_cached_tokens=request.num_cached_tokens,
                    )
                )
            else:
                # Invariant: EngineCore returns no partial prefill outputs.
                assert not prompt_logprobs_tensors

        # Remove the stopped requests from the running and waiting queues.
        if stopped_running_reqs:
            self.running = remove_all(self.running, stopped_running_reqs)
        if stopped_preempted_reqs:
            # This is a rare case and unlikely to impact performance.
            self.waiting.remove_requests(stopped_preempted_reqs)

        # KV Connector: update state for finished KV Transfers.
        if kv_connector_output:
            self._update_from_kv_xfer_finished(kv_connector_output)

        # Create EngineCoreOutputs for all clients that have requests with
        # outputs in this step.
        engine_core_outputs = {
            client_index: EngineCoreOutputs(outputs=outs)
            for client_index, outs in outputs.items()
        }

        finished_req_ids = self.finished_req_ids_dict
        if finished_req_ids:
            # Include ids of requests that finished since last outputs
            # were sent.
            for client_index, finished_set in finished_req_ids.items():
                # Set finished request set in EngineCoreOutputs for this client.
                if (eco := engine_core_outputs.get(client_index)) is not None:
                    eco.finished_requests = finished_set
                else:
                    engine_core_outputs[client_index] = EngineCoreOutputs(
                        finished_requests=finished_set
                    )
            finished_req_ids.clear()

        if (
            stats := self.make_stats(spec_decoding_stats, kv_connector_stats)
        ) is not None:
            # Return stats to only one of the front-ends.
            if (eco := next(iter(engine_core_outputs.values()), None)) is None:
                # We must return the stats even if there are no request
                # outputs this step.
                engine_core_outputs[0] = eco = EngineCoreOutputs()
            eco.scheduler_stats = stats

        return engine_core_outputs

    def _update_request_with_output(
        self,
        request: Request,
        new_token_ids: list[int],
    ) -> tuple[list[int], bool]:
        # Append generated tokens and check for stop. Note that if
        # a request is still being prefilled, we expect the model runner
        # to return empty token ids for the request.
        stopped = False
        for num_new, output_token_id in enumerate(new_token_ids, 1):
            request.append_output_token_ids(output_token_id)

            # Check for stop and update request state.
            # This must be called before we make the EngineCoreOutput.
            stopped = check_stop(request, self.max_model_len)
            if stopped:
                del new_token_ids[num_new:]  # Trim new tokens if needed.
                break
        return new_token_ids, stopped

    def _free_encoder_inputs(self, request: Request) -> None:
        cached_encoder_input_ids = self.encoder_cache_manager.get_cached_input_ids(
            request
        )
        # OPTIMIZATION: Avoid list(set) if the set is empty.
        if not cached_encoder_input_ids:
            return

        # Here, we use list(set) to avoid modifying the set while iterating
        # over it.
        for input_id in list(cached_encoder_input_ids):
            mm_feature = request.mm_features[input_id]
            start_pos = mm_feature.mm_position.offset
            num_tokens = mm_feature.mm_position.length
            if self.is_encoder_decoder and request.num_computed_tokens > 0:
                # With Whisper, as soon as we've generated a single token,
                # we know we're done with the encoder input. Cross Attention
                # KVs have been calculated and cached already.
                self.encoder_cache_manager.free_encoder_input(request, input_id)
            elif start_pos + num_tokens <= request.num_computed_tokens:
                # The encoder output is already processed and stored
                # in the decoder's KV cache.
                self.encoder_cache_manager.free_encoder_input(request, input_id)

    def update_draft_token_ids(
        self,
        draft_token_ids: DraftTokenIds,
    ) -> None:
        for req_id, spec_token_ids in zip(
            draft_token_ids.req_ids,
            draft_token_ids.draft_token_ids,
        ):
            request = self.requests.get(req_id)
            if request is None or request.is_finished():
                # The request may have been finished. Skip.
                continue

            # Add newly generated spec token ids to the request.
            if not spec_token_ids:
                # NOTE(woosuk): request.spec_token_ids should be updated.
                request.spec_token_ids.clear()
            elif self.structured_output_manager.should_advance(request):
                metadata = request.structured_output_request
                request.spec_token_ids = metadata.grammar.validate_tokens(  # type: ignore[union-attr]
                    spec_token_ids
                )
            else:
                request.spec_token_ids = spec_token_ids

    def get_request_counts(self) -> tuple[int, int]:
        """Returns (num_running_reqs, num_waiting_reqs)."""
        return len(self.running), len(self.waiting)

    def add_request(self, request: Request) -> None:
        self.waiting.add_request(request)
        self.requests[request.request_id] = request
        if self.log_stats:
            request.record_event(EngineCoreEventType.QUEUED)

    def finish_requests(
        self,
        request_ids: str | Iterable[str],
        finished_status: RequestStatus,
    ) -> None:
        """Handles the finish signal from outside the scheduler.

        For example, the API server can abort a request when the client
        disconnects.
        """
        assert RequestStatus.is_finished(finished_status)
        if isinstance(request_ids, str):
            request_ids = (request_ids,)
        else:
            request_ids = set(request_ids)

        running_requests_to_remove = set()
        waiting_requests_to_remove = []
        valid_requests = []

        # First pass: collect requests to remove from queues
        for req_id in request_ids:
            request = self.requests.get(req_id)
            if request is None or request.is_finished():
                # Invalid request ID.
                continue

            valid_requests.append(request)
            if request.status == RequestStatus.RUNNING:
                running_requests_to_remove.add(request)
            else:
                waiting_requests_to_remove.append(request)

        # Remove all requests from queues at once for better efficiency
        if running_requests_to_remove:
            self.running = remove_all(self.running, running_requests_to_remove)
        if waiting_requests_to_remove:
            self.waiting.remove_requests(waiting_requests_to_remove)

        # Second pass: set status and free requests
        for request in valid_requests:
            request.status = finished_status
            self._free_request(request)

    def _free_request(self, request: Request) -> dict[str, Any] | None:
        assert request.is_finished()

        delay_free_blocks, kv_xfer_params = self._connector_finished(request)
        self.encoder_cache_manager.free(request)
        request_id = request.request_id
        self.finished_req_ids.add(request_id)
        if self.finished_req_ids_dict is not None:
            self.finished_req_ids_dict[request.client_index].add(request_id)

        if not delay_free_blocks:
            self._free_blocks(request)

        return kv_xfer_params

    def _free_blocks(self, request: Request):
        assert request.is_finished()
        self.kv_cache_manager.free(request)
        del self.requests[request.request_id]

    def get_num_unfinished_requests(self) -> int:
        return len(self.waiting) + len(self.running)

    def has_finished_requests(self) -> bool:
        return len(self.finished_req_ids) > 0

    def reset_prefix_cache(self) -> bool:
        return self.kv_cache_manager.reset_prefix_cache()

    def make_stats(
        self,
        spec_decoding_stats: SpecDecodingStats | None = None,
        kv_connector_stats: KVConnectorStats | None = None,
    ) -> SchedulerStats | None:
        if not self.log_stats:
            return None
        prefix_cache_stats = self.kv_cache_manager.make_prefix_cache_stats()
        assert prefix_cache_stats is not None
        connector_prefix_cache_stats = self._make_connector_prefix_cache_stats()
        return SchedulerStats(
            num_running_reqs=len(self.running),
            num_waiting_reqs=len(self.waiting),
            kv_cache_usage=self.kv_cache_manager.usage,
            prefix_cache_stats=prefix_cache_stats,
            connector_prefix_cache_stats=connector_prefix_cache_stats,
            spec_decoding_stats=spec_decoding_stats,
            num_corrupted_reqs=sum(req.is_output_corrupted for req in self.running),
            kv_connector_stats=kv_connector_stats.data if kv_connector_stats else None,
        )

    def make_spec_decoding_stats(
        self,
        spec_decoding_stats: SpecDecodingStats | None,
        num_draft_tokens: int,
        num_accepted_tokens: int,
    ) -> SpecDecodingStats | None:
        if not self.log_stats:
            return None
        if spec_decoding_stats is None:
            spec_decoding_stats = SpecDecodingStats.new(self.num_spec_tokens)
        spec_decoding_stats.observe_draft(
            num_draft_tokens=num_draft_tokens, num_accepted_tokens=num_accepted_tokens
        )
        return spec_decoding_stats

    def shutdown(self) -> None:
        if self.kv_event_publisher:
            self.kv_event_publisher.shutdown()
        if self.connector is not None:
            self.connector.shutdown()

    ########################################################################
    # KV Connector Related Methods
    ########################################################################

    def _update_connector_prefix_cache_stats(
        self, request: Request, num_external_tokens: int
    ) -> None:
        if self.connector_prefix_cache_stats is None:
            return

        self.connector_prefix_cache_stats.record(
            num_tokens=request.num_tokens,
            num_hits=num_external_tokens,
            preempted=request.num_preemptions > 0,
        )

    def _make_connector_prefix_cache_stats(self) -> PrefixCacheStats | None:
        if self.connector_prefix_cache_stats is None:
            return None
        stats = self.connector_prefix_cache_stats
        self.connector_prefix_cache_stats = PrefixCacheStats()
        return stats

    def get_kv_connector(self) -> KVConnectorBase_V1 | None:
        return self.connector

    def _connector_finished(
        self, request: Request
    ) -> tuple[bool, dict[str, Any] | None]:
        """
        Invoke the KV connector request_finished() method if applicable.

        Returns optional kv transfer parameters to be included with the
        request outputs.
        """
        if self.connector is None:
            return False, None

        (block_ids,) = self.kv_cache_manager.get_block_ids(request.request_id)
        return self.connector.request_finished(request, block_ids)

    def _update_waiting_for_remote_kv(self, request: Request) -> bool:
        """
        KV Connector: check if the request_id is finished_recving.

        The finished_recving_kv_req_ids list is populated
        on the previous steps()'s update_from_output based
        on the worker side connector.

        When the kv transfer is ready, we cache the blocks
        and the request state will be moved back to WAITING from
        WAITING_FOR_REMOTE_KV.
        """
        assert self.connector is not None
        if request.request_id not in self.finished_recving_kv_req_ids:
            return False

        if request.request_id in self.failed_recving_kv_req_ids:
            # Request had KV load failures; num_computed_tokens was already
            # updated in _update_requests_with_invalid_blocks
            if request.num_computed_tokens:
                # Cache any valid computed tokens.
                self.kv_cache_manager.cache_blocks(request, request.num_computed_tokens)
            else:
                # No valid computed tokens, release allocated blocks.
                # There may be a local cache hit on retry.
                self.kv_cache_manager.free(request)

            self.failed_recving_kv_req_ids.remove(request.request_id)
        else:
            # Now that the blocks are ready, actually cache them.
            (block_ids,) = self.kv_cache_manager.get_block_ids(request.request_id)
            num_computed_tokens = len(block_ids) * self.block_size
            # Handle the case where num request tokens less than one block.
            num_computed_tokens = min(num_computed_tokens, request.num_tokens)
            if num_computed_tokens == request.num_tokens:
                num_computed_tokens -= 1
            # This will cache the blocks iff caching is enabled.
            self.kv_cache_manager.cache_blocks(request, num_computed_tokens)

            # Update the request state for scheduling.
            request.num_computed_tokens = num_computed_tokens

        # Return that we are ready.
        self.finished_recving_kv_req_ids.remove(request.request_id)
        return True

    def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
        """
        KV Connector: update the scheduler state based on the output.

        The Worker side connectors add finished_recving and
        finished_sending reqs to the output.
        * if finished_sending: free the blocks
        # if finished_recving: add to state so we can
            schedule the request during the next step.
        """

        if self.connector is not None:
            self.connector.update_connector_output(kv_connector_output)

        # KV Connector:: update recv and send status from last step.
        for req_id in kv_connector_output.finished_recving or ():
            logger.debug("Finished recving KV transfer for request %s", req_id)
            self.finished_recving_kv_req_ids.add(req_id)
        for req_id in kv_connector_output.finished_sending or ():
            logger.debug("Finished sending KV transfer for request %s", req_id)
            assert req_id in self.requests
            self._free_blocks(self.requests[req_id])

    def _update_requests_with_invalid_blocks(
        self, requests: Iterable[Request], invalid_block_ids: set[int]
    ) -> tuple[set[str], int]:
        """
        Identify and update requests affected by invalid KV cache blocks.

        This method scans the given requests, detects those with invalid blocks
        and adjusts their `num_computed_tokens` to the longest valid prefix.
        For observability, it also accumulates the total number of tokens that
        will need to be recomputed across all affected requests.

        Args:
            requests: The set of requests to scan for invalid blocks.
            invalid_block_ids: IDs of invalid blocks.

        Returns:
            tuple:
                - affected_req_ids (set[str]): IDs of requests impacted by
                invalid blocks.
                - total_affected_tokens (int): Total number of tokens that must
                be recomputed across all affected requests (for observability).
        """
        affected_req_ids: set[str] = set()
        total_affected_tokens = 0
        # If a block is invalid and shared by multiple requests in the batch,
        # these requests must be rescheduled, but only the first will recompute
        # it. This set tracks blocks already marked for recomputation.
        marked_invalid_block_ids: set[int] = set()
        for request in requests:
            is_affected = False
            marked_invalid_block = False
            req_id = request.request_id
            # TODO (davidb): add support for hybrid memory allocator
            (req_block_ids,) = self.kv_cache_manager.get_block_ids(req_id)
            # We iterate only over blocks that may contain externally computed
            # tokens
            if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                # Async loading. If num_computed_tokens is set it implies we
                # already processed some block failures for it in a prior step
                req_num_computed_tokens = (
                    request.num_computed_tokens
                    if req_id in self.failed_recving_kv_req_ids
                    else len(req_block_ids) * self.block_size
                )
            else:
                # Sync loading. num_computed_tokens includes new tokens
                req_num_computed_tokens = request.num_cached_tokens

            req_num_computed_blocks = (
                req_num_computed_tokens + self.block_size - 1
            ) // self.block_size
            for idx, block_id in zip(range(req_num_computed_blocks), req_block_ids):
                if block_id not in invalid_block_ids:
                    continue

                is_affected = True

                if block_id in marked_invalid_block_ids:
                    # This invalid block is shared with a previous request
                    # and was already marked for recomputation.
                    # This means this request can still consider this block
                    # as computed when rescheduled.
                    # Currently this only applies to sync loading; Async
                    # loading does not yet support block sharing
                    continue

                marked_invalid_block_ids.add(block_id)

                if marked_invalid_block:
                    # This request has already marked an invalid block for
                    # recomputation and updated its num_computed_tokens.
                    continue

                marked_invalid_block = True
                # Truncate the computed tokens at the first failed block
                request.num_computed_tokens = idx * self.block_size
                total_affected_tokens += (
                    req_num_computed_tokens - request.num_computed_tokens
                )

            if is_affected:
                if not marked_invalid_block:
                    # All invalid blocks of this request are shared with
                    # previous requests and will be recomputed by them.
                    # Revert to considering only cached tokens as computed.
                    # Currently this only applies to sync loading; Async
                    # loading does not yet support block sharing
                    total_affected_tokens += (
                        request.num_computed_tokens - request.num_cached_tokens
                    )
                    request.num_computed_tokens = request.num_cached_tokens

                affected_req_ids.add(request.request_id)

        return affected_req_ids, total_affected_tokens

    def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
        total_requests_to_reschedule = 0
        total_tokens_to_reschedule = 0

        # --- Handle async KV loads (WAITING_FOR_REMOTE_KVS) ---
        async_load_reqs = (
            req
            for req in self.waiting
            if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS
        )
        async_affected_req_ids, num_tokens_to_reschedule = (
            self._update_requests_with_invalid_blocks(
                async_load_reqs, invalid_block_ids
            )
        )

        total_requests_to_reschedule += len(async_affected_req_ids)
        total_tokens_to_reschedule += num_tokens_to_reschedule

        # Mark requests with async KV load failures; they will be rescheduled
        # once loading completes.
        self.failed_recving_kv_req_ids |= async_affected_req_ids

        # --- Handle sync KV loads (running requests) ---
        sync_affected_req_ids, num_tokens_to_reschedule = (
            self._update_requests_with_invalid_blocks(self.running, invalid_block_ids)
        )

        total_requests_to_reschedule += len(sync_affected_req_ids)
        total_tokens_to_reschedule += num_tokens_to_reschedule

        if total_requests_to_reschedule:
            logger.warning(
                "Recovered from KV load failure: "
                "%d request(s) rescheduled (%d tokens affected).",
                total_requests_to_reschedule,
                total_tokens_to_reschedule,
            )

        # Return the IDs of affected running requests to skip in
        # update_from_output.
        return sync_affected_req_ids

block_size instance-attribute

block_size = block_size

cache_config instance-attribute

cache_config = cache_config

connector instance-attribute

connector = None

connector_prefix_cache_stats instance-attribute

connector_prefix_cache_stats: PrefixCacheStats | None = None

dcp_world_size instance-attribute

dcp_world_size = decode_context_parallel_size

encoder_cache_manager instance-attribute

encoder_cache_manager = EncoderCacheManager(
    cache_size=encoder_cache_size
)

failed_recving_kv_req_ids instance-attribute

failed_recving_kv_req_ids: set[str] = set()

finished_recving_kv_req_ids instance-attribute

finished_recving_kv_req_ids: set[str] = set()

finished_req_ids instance-attribute

finished_req_ids: set[str] = set()

finished_req_ids_dict instance-attribute

finished_req_ids_dict: dict[int, set[str]] | None = (
    defaultdict(set) if include_finished_set else None
)

is_encoder_decoder instance-attribute

is_encoder_decoder = is_encoder_decoder

kv_cache_config instance-attribute

kv_cache_config = kv_cache_config

kv_cache_manager instance-attribute

kv_cache_manager = KVCacheManager(
    kv_cache_config=kv_cache_config,
    max_model_len=max_model_len,
    enable_caching=enable_prefix_caching,
    use_eagle=use_eagle,
    log_stats=log_stats,
    enable_kv_cache_events=kv_event_publisher is not None,
    dcp_world_size=dcp_world_size,
)

kv_event_publisher instance-attribute

kv_event_publisher = create(
    kv_events_config, data_parallel_rank
)

kv_events_config instance-attribute

kv_events_config = kv_events_config

log_stats instance-attribute

log_stats = log_stats

lora_config instance-attribute

lora_config = lora_config

max_model_len instance-attribute

max_model_len = max_model_len

max_num_encoder_input_tokens instance-attribute

max_num_encoder_input_tokens = encoder_compute_budget

max_num_running_reqs instance-attribute

max_num_running_reqs = max_num_seqs

max_num_scheduled_tokens instance-attribute

max_num_scheduled_tokens = max_num_batched_tokens

num_lookahead_tokens instance-attribute

num_lookahead_tokens = 0

num_spec_tokens instance-attribute

num_spec_tokens = 0

parallel_config instance-attribute

parallel_config = parallel_config

policy instance-attribute

policy = PRIORITY

requests instance-attribute

requests: dict[str, Request] = {}

running instance-attribute

running: list[Request] = []

scheduler_config instance-attribute

scheduler_config = scheduler_config

structured_output_manager instance-attribute

structured_output_manager = structured_output_manager

use_eagle instance-attribute

use_eagle = False

use_pp instance-attribute

use_pp = pipeline_parallel_size > 1

vllm_config instance-attribute

vllm_config = vllm_config

waiting instance-attribute

waiting = create_request_queue(policy)

__init__

__init__(
    vllm_config: VllmConfig,
    kv_cache_config: KVCacheConfig,
    structured_output_manager: StructuredOutputManager,
    block_size: int,
    mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
    include_finished_set: bool = False,
    log_stats: bool = False,
) -> None
Source code in vllm/v1/core/sched/scheduler.py
def __init__(
    self,
    vllm_config: VllmConfig,
    kv_cache_config: KVCacheConfig,
    structured_output_manager: StructuredOutputManager,
    block_size: int,
    mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
    include_finished_set: bool = False,
    log_stats: bool = False,
) -> None:
    self.vllm_config = vllm_config
    self.scheduler_config = vllm_config.scheduler_config
    self.cache_config = vllm_config.cache_config
    self.lora_config = vllm_config.lora_config
    self.kv_cache_config = kv_cache_config
    self.kv_events_config = vllm_config.kv_events_config
    self.parallel_config = vllm_config.parallel_config
    self.log_stats = log_stats
    self.structured_output_manager = structured_output_manager
    self.is_encoder_decoder = vllm_config.model_config.is_encoder_decoder

    # include_finished_set controls whether a separate set of finished
    # request ids should be included in the EngineCoreOutputs returned
    # by update_from_outputs(). This is currently used in the multi-engine
    # case to track request lifetimes efficiently.
    self.finished_req_ids_dict: dict[int, set[str]] | None = (
        defaultdict(set) if include_finished_set else None
    )

    # Scheduling constraints.
    self.max_num_running_reqs = self.scheduler_config.max_num_seqs
    self.max_num_scheduled_tokens = self.scheduler_config.max_num_batched_tokens
    self.max_model_len = self.scheduler_config.max_model_len

    # Create KVConnector for the Scheduler. Note that each Worker
    # will have a corresponding KVConnector with Role=WORKER.
    # KV Connector pushes/pull of remote KVs for P/D and offloading.
    self.connector = None
    self.connector_prefix_cache_stats: PrefixCacheStats | None = None
    if self.vllm_config.kv_transfer_config is not None:
        assert len(self.kv_cache_config.kv_cache_groups) == 1, (
            "Multiple KV cache groups are not currently supported "
            "with KV connectors"
        )
        assert not self.is_encoder_decoder, (
            "Encoder-decoder models are not currently supported with KV connectors"
        )
        self.connector = KVConnectorFactory.create_connector(
            config=self.vllm_config, role=KVConnectorRole.SCHEDULER
        )
        if self.log_stats:
            self.connector_prefix_cache_stats = PrefixCacheStats()

    self.kv_event_publisher = EventPublisherFactory.create(
        self.kv_events_config,
        self.parallel_config.data_parallel_rank,
    )

    num_gpu_blocks = self.cache_config.num_gpu_blocks
    assert num_gpu_blocks is not None and num_gpu_blocks > 0

    self.block_size = block_size
    self.dcp_world_size = vllm_config.parallel_config.decode_context_parallel_size

    # req_id -> Request
    self.requests: dict[str, Request] = {}
    # Scheduling policy
    if self.scheduler_config.policy == "priority":
        self.policy = SchedulingPolicy.PRIORITY
    elif self.scheduler_config.policy == "fcfs":
        self.policy = SchedulingPolicy.FCFS
    else:
        raise ValueError(
            f"Unknown scheduling policy: {self.scheduler_config.policy}"
        )
    # Priority queues for requests.
    self.waiting = create_request_queue(self.policy)
    self.running: list[Request] = []

    # The request IDs that are finished in between the previous and the
    # current steps. This is used to notify the workers about the finished
    # requests so that they can free the cached states for those requests.
    # This is flushed at the end of each scheduling step.
    self.finished_req_ids: set[str] = set()

    # KV Connector: requests in process of async KV loading or recving
    self.finished_recving_kv_req_ids: set[str] = set()
    self.failed_recving_kv_req_ids: set[str] = set()

    # Encoder-related.
    # Calculate encoder cache size if applicable
    # NOTE: For now we use the same budget for both compute and space.
    # This can be changed when we make encoder cache for embedding caching
    # across requests.
    encoder_compute_budget, encoder_cache_size = compute_encoder_budget(
        model_config=vllm_config.model_config,
        scheduler_config=vllm_config.scheduler_config,
        mm_registry=mm_registry,
    )

    # NOTE(woosuk): Here, "encoder" includes the vision encoder (and
    # projector if needed) for MM models as well as encoder-decoder
    # transformers.
    self.max_num_encoder_input_tokens = encoder_compute_budget
    # NOTE: For the models without encoder (e.g., text-only models),
    # the encoder cache will not be initialized because cache size is 0
    # for these models.
    self.encoder_cache_manager = EncoderCacheManager(cache_size=encoder_cache_size)

    speculative_config = vllm_config.speculative_config
    self.use_eagle = False
    self.num_spec_tokens = self.num_lookahead_tokens = 0
    if speculative_config:
        self.num_spec_tokens = speculative_config.num_speculative_tokens
        if speculative_config.use_eagle():
            self.use_eagle = True
            self.num_lookahead_tokens = self.num_spec_tokens

    # Create the KV cache manager.
    self.kv_cache_manager = KVCacheManager(
        kv_cache_config=kv_cache_config,
        max_model_len=self.max_model_len,
        enable_caching=self.cache_config.enable_prefix_caching,
        use_eagle=self.use_eagle,
        log_stats=self.log_stats,
        enable_kv_cache_events=self.kv_event_publisher is not None,
        dcp_world_size=self.dcp_world_size,
    )
    self.use_pp = self.parallel_config.pipeline_parallel_size > 1

_connector_finished

_connector_finished(
    request: Request,
) -> tuple[bool, dict[str, Any] | None]

Invoke the KV connector request_finished() method if applicable.

Returns optional kv transfer parameters to be included with the request outputs.

Source code in vllm/v1/core/sched/scheduler.py
def _connector_finished(
    self, request: Request
) -> tuple[bool, dict[str, Any] | None]:
    """
    Invoke the KV connector request_finished() method if applicable.

    Returns optional kv transfer parameters to be included with the
    request outputs.
    """
    if self.connector is None:
        return False, None

    (block_ids,) = self.kv_cache_manager.get_block_ids(request.request_id)
    return self.connector.request_finished(request, block_ids)

_free_blocks

_free_blocks(request: Request)
Source code in vllm/v1/core/sched/scheduler.py
def _free_blocks(self, request: Request):
    assert request.is_finished()
    self.kv_cache_manager.free(request)
    del self.requests[request.request_id]

_free_encoder_inputs

_free_encoder_inputs(request: Request) -> None
Source code in vllm/v1/core/sched/scheduler.py
def _free_encoder_inputs(self, request: Request) -> None:
    cached_encoder_input_ids = self.encoder_cache_manager.get_cached_input_ids(
        request
    )
    # OPTIMIZATION: Avoid list(set) if the set is empty.
    if not cached_encoder_input_ids:
        return

    # Here, we use list(set) to avoid modifying the set while iterating
    # over it.
    for input_id in list(cached_encoder_input_ids):
        mm_feature = request.mm_features[input_id]
        start_pos = mm_feature.mm_position.offset
        num_tokens = mm_feature.mm_position.length
        if self.is_encoder_decoder and request.num_computed_tokens > 0:
            # With Whisper, as soon as we've generated a single token,
            # we know we're done with the encoder input. Cross Attention
            # KVs have been calculated and cached already.
            self.encoder_cache_manager.free_encoder_input(request, input_id)
        elif start_pos + num_tokens <= request.num_computed_tokens:
            # The encoder output is already processed and stored
            # in the decoder's KV cache.
            self.encoder_cache_manager.free_encoder_input(request, input_id)

_free_request

_free_request(request: Request) -> dict[str, Any] | None
Source code in vllm/v1/core/sched/scheduler.py
def _free_request(self, request: Request) -> dict[str, Any] | None:
    assert request.is_finished()

    delay_free_blocks, kv_xfer_params = self._connector_finished(request)
    self.encoder_cache_manager.free(request)
    request_id = request.request_id
    self.finished_req_ids.add(request_id)
    if self.finished_req_ids_dict is not None:
        self.finished_req_ids_dict[request.client_index].add(request_id)

    if not delay_free_blocks:
        self._free_blocks(request)

    return kv_xfer_params

_handle_invalid_blocks

_handle_invalid_blocks(
    invalid_block_ids: set[int],
) -> set[str]
Source code in vllm/v1/core/sched/scheduler.py
def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
    total_requests_to_reschedule = 0
    total_tokens_to_reschedule = 0

    # --- Handle async KV loads (WAITING_FOR_REMOTE_KVS) ---
    async_load_reqs = (
        req
        for req in self.waiting
        if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS
    )
    async_affected_req_ids, num_tokens_to_reschedule = (
        self._update_requests_with_invalid_blocks(
            async_load_reqs, invalid_block_ids
        )
    )

    total_requests_to_reschedule += len(async_affected_req_ids)
    total_tokens_to_reschedule += num_tokens_to_reschedule

    # Mark requests with async KV load failures; they will be rescheduled
    # once loading completes.
    self.failed_recving_kv_req_ids |= async_affected_req_ids

    # --- Handle sync KV loads (running requests) ---
    sync_affected_req_ids, num_tokens_to_reschedule = (
        self._update_requests_with_invalid_blocks(self.running, invalid_block_ids)
    )

    total_requests_to_reschedule += len(sync_affected_req_ids)
    total_tokens_to_reschedule += num_tokens_to_reschedule

    if total_requests_to_reschedule:
        logger.warning(
            "Recovered from KV load failure: "
            "%d request(s) rescheduled (%d tokens affected).",
            total_requests_to_reschedule,
            total_tokens_to_reschedule,
        )

    # Return the IDs of affected running requests to skip in
    # update_from_output.
    return sync_affected_req_ids

_make_cached_request_data

_make_cached_request_data(
    running_reqs: list[Request],
    resumed_reqs: list[Request],
    num_scheduled_tokens: dict[str, int],
    spec_decode_tokens: dict[str, list[int]],
    req_to_new_blocks: dict[str, KVCacheBlocks],
) -> CachedRequestData
Source code in vllm/v1/core/sched/scheduler.py
def _make_cached_request_data(
    self,
    running_reqs: list[Request],
    resumed_reqs: list[Request],
    num_scheduled_tokens: dict[str, int],
    spec_decode_tokens: dict[str, list[int]],
    req_to_new_blocks: dict[str, KVCacheBlocks],
) -> CachedRequestData:
    req_ids: list[str] = []
    new_token_ids: list[list[int]] = []
    new_block_ids: list[tuple[list[int], ...] | None] = []
    resumed_req_token_ids: list[list[int] | None] = []
    num_computed_tokens: list[int] = []
    num_output_tokens: list[int] = []

    # Because resumed_reqs is usually empty, it is more efficient to do
    # in-place appending so that we don't need to allocate a new list.
    resumed_from_preemption = [False] * len(running_reqs)
    resumed_from_preemption += [True] * len(resumed_reqs)
    for idx, req in enumerate(itertools.chain(running_reqs, resumed_reqs)):
        req_id = req.request_id
        req_ids.append(req_id)
        num_tokens = num_scheduled_tokens[req_id] - len(
            spec_decode_tokens.get(req_id, ())
        )
        if self.use_pp:
            # When using PP, the scheduler sends the sampled tokens back,
            # because there's no direct communication between the first-
            # stage worker and the last-stage worker. Otherwise, we don't
            # need to send the sampled tokens back because the model runner
            # will cache them.
            token_ids = req.all_token_ids[
                req.num_computed_tokens : req.num_computed_tokens + num_tokens
            ]
            new_token_ids.append(token_ids)
        resumed_token_ids = None
        if resumed_from_preemption[idx]:
            resumed_token_ids = req.all_token_ids[
                : req.num_computed_tokens + num_tokens
            ]
        resumed_req_token_ids.append(resumed_token_ids)
        new_block_ids.append(
            req_to_new_blocks[req_id].get_block_ids(allow_none=True)
        )
        num_computed_tokens.append(req.num_computed_tokens)
        num_output_tokens.append(
            req.num_output_tokens + req.num_output_placeholders
        )

    return CachedRequestData(
        req_ids=req_ids,
        resumed_from_preemption=resumed_from_preemption,
        new_token_ids=new_token_ids,
        resumed_req_token_ids=resumed_req_token_ids,
        new_block_ids=new_block_ids,
        num_computed_tokens=num_computed_tokens,
        num_output_tokens=num_output_tokens,
    )

_make_connector_prefix_cache_stats

_make_connector_prefix_cache_stats() -> (
    PrefixCacheStats | None
)
Source code in vllm/v1/core/sched/scheduler.py
def _make_connector_prefix_cache_stats(self) -> PrefixCacheStats | None:
    if self.connector_prefix_cache_stats is None:
        return None
    stats = self.connector_prefix_cache_stats
    self.connector_prefix_cache_stats = PrefixCacheStats()
    return stats

_try_schedule_encoder_inputs

_try_schedule_encoder_inputs(
    request: Request,
    num_computed_tokens: int,
    num_new_tokens: int,
    encoder_compute_budget: int,
) -> tuple[list[int], int, int]

Determine which encoder inputs need to be scheduled in the current step, and update num_new_tokens and encoder token budget accordingly.

An encoder input will be scheduled if: - Its output tokens overlap with the range of tokens being computed in this step, i.e., [num_computed_tokens, num_computed_tokens + num_new_tokens). - It is not already computed and stored in the encoder cache. - There is sufficient encoder token budget to process it. - The encoder cache has space to store it.

If an encoder input cannot be scheduled due to cache or budget limitations, the method adjusts num_new_tokens to schedule only the decoder tokens up to just before the unschedulable encoder input.

Note that num_computed_tokens includes both locally cached blocks and externally cached blocks (via KVConnector).

Source code in vllm/v1/core/sched/scheduler.py
def _try_schedule_encoder_inputs(
    self,
    request: Request,
    num_computed_tokens: int,
    num_new_tokens: int,
    encoder_compute_budget: int,
) -> tuple[list[int], int, int]:
    """
    Determine which encoder inputs need to be scheduled in the current step,
    and update `num_new_tokens` and encoder token budget accordingly.

    An encoder input will be scheduled if:
    - Its output tokens overlap with the range of tokens being computed
    in this step, i.e.,
    [num_computed_tokens, num_computed_tokens + num_new_tokens).
    - It is not already computed and stored in the encoder cache.
    - There is sufficient encoder token budget to process it.
    - The encoder cache has space to store it.

    If an encoder input cannot be scheduled due to cache or budget
    limitations, the method adjusts `num_new_tokens` to schedule only the
    decoder tokens up to just before the unschedulable encoder input.

    Note that num_computed_tokens includes both locally cached
    blocks and externally cached blocks (via KVConnector).
    """
    if num_new_tokens == 0 or not request.has_encoder_inputs:
        return [], num_new_tokens, encoder_compute_budget
    encoder_inputs_to_schedule: list[int] = []
    mm_features = request.mm_features
    assert mm_features is not None
    assert len(mm_features) > 0

    # NOTE: since scheduler operates on the request level (possibly with
    # multiple encoder inputs per request), we need to create temporary
    # trackers for accounting at the encoder input level.
    mm_hashes_to_schedule = set()
    num_tokens_to_schedule = 0
    for i, mm_feature in enumerate(mm_features):
        start_pos = mm_feature.mm_position.offset
        num_encoder_tokens = mm_feature.mm_position.length

        # The encoder output is needed if the two ranges overlap:
        # [num_computed_tokens, num_computed_tokens + num_new_tokens) and
        # [start_pos, start_pos + num_encoder_tokens)
        if start_pos >= num_computed_tokens + num_new_tokens:
            # The encoder input is not needed in this step.
            break

        if self.is_encoder_decoder and num_computed_tokens > 0:
            assert start_pos == 0, (
                "Encoder input should be processed at the beginning of "
                "the sequence when encoder-decoder models are used."
            )
            # Encoder input has already been computed
            # The calculation here is a bit different. We don't turn encoder
            # output into tokens that get processed by the decoder and
            # reflected in num_computed_tokens. Instead, start_pos reflects
            # the position where we need to ensure we calculate encoder
            # inputs. This should always be 0 to ensure we calculate encoder
            # inputs before running the decoder.  Once we've calculated some
            # decoder tokens (num_computed_tokens > 0), then we know we
            # already calculated encoder inputs and can skip here.
            continue
        elif start_pos + num_encoder_tokens <= num_computed_tokens:
            # The encoder input is already computed and stored
            # in the decoder's KV cache.
            continue

        if not self.is_encoder_decoder:
            # We are not using the encoder cache for encoder-decoder models,
            # yet.
            if request.mm_features[i].identifier in mm_hashes_to_schedule:
                # The same encoder input has already been scheduled in the
                # current step.
                continue

            if self.encoder_cache_manager.check_and_update_cache(request, i):
                # The encoder input is already computed and cached from a
                # previous step.
                continue

        # If no encoder input chunking is allowed, we do not want to
        # partially schedule a multimodal item. If the scheduled range would
        # only cover part of the mm input, roll back to before the mm item.
        if (
            self.scheduler_config.disable_chunked_mm_input
            and num_computed_tokens < start_pos
            and (num_computed_tokens + num_new_tokens)
            < (start_pos + num_encoder_tokens)
        ):
            num_new_tokens = start_pos - num_computed_tokens
            break

        if not self.encoder_cache_manager.can_allocate(
            request, i, encoder_compute_budget, num_tokens_to_schedule
        ):
            # The encoder cache is full or the encoder budget is exhausted.
            # NOTE(woosuk): We assume that the encoder input tokens should
            # be processed altogether, as the encoder usually uses
            # bidirectional attention.
            if num_computed_tokens < start_pos:
                # We only schedule the decoder tokens just before the
                # encoder input.
                num_new_tokens = start_pos - num_computed_tokens
            else:
                # Because of prefix caching, num_computed_tokens is greater
                # than start_pos even though its encoder input is not
                # available. In this case, we can't schedule any token for
                # the request in this step.
                num_new_tokens = 0
            break

        num_tokens_to_schedule += num_encoder_tokens
        encoder_compute_budget -= num_encoder_tokens
        mm_hashes_to_schedule.add(request.mm_features[i].identifier)
        encoder_inputs_to_schedule.append(i)

    return (
        encoder_inputs_to_schedule,
        num_new_tokens,
        encoder_compute_budget,
    )

_update_after_schedule

_update_after_schedule(
    scheduler_output: SchedulerOutput,
) -> None
Source code in vllm/v1/core/sched/scheduler.py
def _update_after_schedule(
    self,
    scheduler_output: SchedulerOutput,
) -> None:
    # Advance the number of computed tokens for the request AFTER
    # the request is scheduled.
    # 1. The scheduler_output of the current step has to include the
    #    original number of scheduled tokens to determine input IDs.
    # 2. Advance the number of computed tokens here allowing us to
    #    schedule the prefill request again immediately in the next
    #    scheduling step.
    # 3. If some tokens (e.g. spec tokens) are rejected later, the number of
    #    computed tokens will be adjusted in update_from_output.
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    for req_id, num_scheduled_token in num_scheduled_tokens.items():
        request = self.requests[req_id]
        request.num_computed_tokens += num_scheduled_token

        # NOTE: _free_encoder_inputs relies on num_computed_tokens, which
        # may be updated again in _update_from_output for speculative
        # decoding. However, it is safe to call the method here because
        # encoder inputs are always part of the prompt, not the output,
        # and thus are unaffected by speculative decoding.
        if request.has_encoder_inputs:
            self._free_encoder_inputs(request)

    # Clear the finished request IDs.
    # NOTE: We shouldn't do self.finished_req_ids.clear() here because
    # it will also affect the scheduler output.
    self.finished_req_ids = set()

_update_connector_prefix_cache_stats

_update_connector_prefix_cache_stats(
    request: Request, num_external_tokens: int
) -> None
Source code in vllm/v1/core/sched/scheduler.py
def _update_connector_prefix_cache_stats(
    self, request: Request, num_external_tokens: int
) -> None:
    if self.connector_prefix_cache_stats is None:
        return

    self.connector_prefix_cache_stats.record(
        num_tokens=request.num_tokens,
        num_hits=num_external_tokens,
        preempted=request.num_preemptions > 0,
    )

_update_from_kv_xfer_finished

_update_from_kv_xfer_finished(
    kv_connector_output: KVConnectorOutput,
)

KV Connector: update the scheduler state based on the output.

The Worker side connectors add finished_recving and finished_sending reqs to the output. * if finished_sending: free the blocks

if finished_recving: add to state so we can

schedule the request during the next step.
Source code in vllm/v1/core/sched/scheduler.py
def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
    """
    KV Connector: update the scheduler state based on the output.

    The Worker side connectors add finished_recving and
    finished_sending reqs to the output.
    * if finished_sending: free the blocks
    # if finished_recving: add to state so we can
        schedule the request during the next step.
    """

    if self.connector is not None:
        self.connector.update_connector_output(kv_connector_output)

    # KV Connector:: update recv and send status from last step.
    for req_id in kv_connector_output.finished_recving or ():
        logger.debug("Finished recving KV transfer for request %s", req_id)
        self.finished_recving_kv_req_ids.add(req_id)
    for req_id in kv_connector_output.finished_sending or ():
        logger.debug("Finished sending KV transfer for request %s", req_id)
        assert req_id in self.requests
        self._free_blocks(self.requests[req_id])

_update_request_with_output

_update_request_with_output(
    request: Request, new_token_ids: list[int]
) -> tuple[list[int], bool]
Source code in vllm/v1/core/sched/scheduler.py
def _update_request_with_output(
    self,
    request: Request,
    new_token_ids: list[int],
) -> tuple[list[int], bool]:
    # Append generated tokens and check for stop. Note that if
    # a request is still being prefilled, we expect the model runner
    # to return empty token ids for the request.
    stopped = False
    for num_new, output_token_id in enumerate(new_token_ids, 1):
        request.append_output_token_ids(output_token_id)

        # Check for stop and update request state.
        # This must be called before we make the EngineCoreOutput.
        stopped = check_stop(request, self.max_model_len)
        if stopped:
            del new_token_ids[num_new:]  # Trim new tokens if needed.
            break
    return new_token_ids, stopped

_update_requests_with_invalid_blocks

_update_requests_with_invalid_blocks(
    requests: Iterable[Request], invalid_block_ids: set[int]
) -> tuple[set[str], int]

Identify and update requests affected by invalid KV cache blocks.

This method scans the given requests, detects those with invalid blocks and adjusts their num_computed_tokens to the longest valid prefix. For observability, it also accumulates the total number of tokens that will need to be recomputed across all affected requests.

Parameters:

Name Type Description Default
requests Iterable[Request]

The set of requests to scan for invalid blocks.

required
invalid_block_ids set[int]

IDs of invalid blocks.

required

Returns:

Name Type Description
tuple tuple[set[str], int]
  • affected_req_ids (set[str]): IDs of requests impacted by invalid blocks.
  • total_affected_tokens (int): Total number of tokens that must be recomputed across all affected requests (for observability).
Source code in vllm/v1/core/sched/scheduler.py
def _update_requests_with_invalid_blocks(
    self, requests: Iterable[Request], invalid_block_ids: set[int]
) -> tuple[set[str], int]:
    """
    Identify and update requests affected by invalid KV cache blocks.

    This method scans the given requests, detects those with invalid blocks
    and adjusts their `num_computed_tokens` to the longest valid prefix.
    For observability, it also accumulates the total number of tokens that
    will need to be recomputed across all affected requests.

    Args:
        requests: The set of requests to scan for invalid blocks.
        invalid_block_ids: IDs of invalid blocks.

    Returns:
        tuple:
            - affected_req_ids (set[str]): IDs of requests impacted by
            invalid blocks.
            - total_affected_tokens (int): Total number of tokens that must
            be recomputed across all affected requests (for observability).
    """
    affected_req_ids: set[str] = set()
    total_affected_tokens = 0
    # If a block is invalid and shared by multiple requests in the batch,
    # these requests must be rescheduled, but only the first will recompute
    # it. This set tracks blocks already marked for recomputation.
    marked_invalid_block_ids: set[int] = set()
    for request in requests:
        is_affected = False
        marked_invalid_block = False
        req_id = request.request_id
        # TODO (davidb): add support for hybrid memory allocator
        (req_block_ids,) = self.kv_cache_manager.get_block_ids(req_id)
        # We iterate only over blocks that may contain externally computed
        # tokens
        if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
            # Async loading. If num_computed_tokens is set it implies we
            # already processed some block failures for it in a prior step
            req_num_computed_tokens = (
                request.num_computed_tokens
                if req_id in self.failed_recving_kv_req_ids
                else len(req_block_ids) * self.block_size
            )
        else:
            # Sync loading. num_computed_tokens includes new tokens
            req_num_computed_tokens = request.num_cached_tokens

        req_num_computed_blocks = (
            req_num_computed_tokens + self.block_size - 1
        ) // self.block_size
        for idx, block_id in zip(range(req_num_computed_blocks), req_block_ids):
            if block_id not in invalid_block_ids:
                continue

            is_affected = True

            if block_id in marked_invalid_block_ids:
                # This invalid block is shared with a previous request
                # and was already marked for recomputation.
                # This means this request can still consider this block
                # as computed when rescheduled.
                # Currently this only applies to sync loading; Async
                # loading does not yet support block sharing
                continue

            marked_invalid_block_ids.add(block_id)

            if marked_invalid_block:
                # This request has already marked an invalid block for
                # recomputation and updated its num_computed_tokens.
                continue

            marked_invalid_block = True
            # Truncate the computed tokens at the first failed block
            request.num_computed_tokens = idx * self.block_size
            total_affected_tokens += (
                req_num_computed_tokens - request.num_computed_tokens
            )

        if is_affected:
            if not marked_invalid_block:
                # All invalid blocks of this request are shared with
                # previous requests and will be recomputed by them.
                # Revert to considering only cached tokens as computed.
                # Currently this only applies to sync loading; Async
                # loading does not yet support block sharing
                total_affected_tokens += (
                    request.num_computed_tokens - request.num_cached_tokens
                )
                request.num_computed_tokens = request.num_cached_tokens

            affected_req_ids.add(request.request_id)

    return affected_req_ids, total_affected_tokens

_update_waiting_for_remote_kv

_update_waiting_for_remote_kv(request: Request) -> bool

KV Connector: check if the request_id is finished_recving.

The finished_recving_kv_req_ids list is populated on the previous steps()'s update_from_output based on the worker side connector.

When the kv transfer is ready, we cache the blocks and the request state will be moved back to WAITING from WAITING_FOR_REMOTE_KV.

Source code in vllm/v1/core/sched/scheduler.py
def _update_waiting_for_remote_kv(self, request: Request) -> bool:
    """
    KV Connector: check if the request_id is finished_recving.

    The finished_recving_kv_req_ids list is populated
    on the previous steps()'s update_from_output based
    on the worker side connector.

    When the kv transfer is ready, we cache the blocks
    and the request state will be moved back to WAITING from
    WAITING_FOR_REMOTE_KV.
    """
    assert self.connector is not None
    if request.request_id not in self.finished_recving_kv_req_ids:
        return False

    if request.request_id in self.failed_recving_kv_req_ids:
        # Request had KV load failures; num_computed_tokens was already
        # updated in _update_requests_with_invalid_blocks
        if request.num_computed_tokens:
            # Cache any valid computed tokens.
            self.kv_cache_manager.cache_blocks(request, request.num_computed_tokens)
        else:
            # No valid computed tokens, release allocated blocks.
            # There may be a local cache hit on retry.
            self.kv_cache_manager.free(request)

        self.failed_recving_kv_req_ids.remove(request.request_id)
    else:
        # Now that the blocks are ready, actually cache them.
        (block_ids,) = self.kv_cache_manager.get_block_ids(request.request_id)
        num_computed_tokens = len(block_ids) * self.block_size
        # Handle the case where num request tokens less than one block.
        num_computed_tokens = min(num_computed_tokens, request.num_tokens)
        if num_computed_tokens == request.num_tokens:
            num_computed_tokens -= 1
        # This will cache the blocks iff caching is enabled.
        self.kv_cache_manager.cache_blocks(request, num_computed_tokens)

        # Update the request state for scheduling.
        request.num_computed_tokens = num_computed_tokens

    # Return that we are ready.
    self.finished_recving_kv_req_ids.remove(request.request_id)
    return True

add_request

add_request(request: Request) -> None
Source code in vllm/v1/core/sched/scheduler.py
def add_request(self, request: Request) -> None:
    self.waiting.add_request(request)
    self.requests[request.request_id] = request
    if self.log_stats:
        request.record_event(EngineCoreEventType.QUEUED)

finish_requests

finish_requests(
    request_ids: str | Iterable[str],
    finished_status: RequestStatus,
) -> None

Handles the finish signal from outside the scheduler.

For example, the API server can abort a request when the client disconnects.

Source code in vllm/v1/core/sched/scheduler.py
def finish_requests(
    self,
    request_ids: str | Iterable[str],
    finished_status: RequestStatus,
) -> None:
    """Handles the finish signal from outside the scheduler.

    For example, the API server can abort a request when the client
    disconnects.
    """
    assert RequestStatus.is_finished(finished_status)
    if isinstance(request_ids, str):
        request_ids = (request_ids,)
    else:
        request_ids = set(request_ids)

    running_requests_to_remove = set()
    waiting_requests_to_remove = []
    valid_requests = []

    # First pass: collect requests to remove from queues
    for req_id in request_ids:
        request = self.requests.get(req_id)
        if request is None or request.is_finished():
            # Invalid request ID.
            continue

        valid_requests.append(request)
        if request.status == RequestStatus.RUNNING:
            running_requests_to_remove.add(request)
        else:
            waiting_requests_to_remove.append(request)

    # Remove all requests from queues at once for better efficiency
    if running_requests_to_remove:
        self.running = remove_all(self.running, running_requests_to_remove)
    if waiting_requests_to_remove:
        self.waiting.remove_requests(waiting_requests_to_remove)

    # Second pass: set status and free requests
    for request in valid_requests:
        request.status = finished_status
        self._free_request(request)

get_grammar_bitmask

get_grammar_bitmask(
    scheduled_request_ids: Iterable[str],
    scheduled_spec_decode_tokens: dict[str, list[int]],
) -> tuple[list[str], NDArray[int32] | None]
Source code in vllm/v1/core/sched/scheduler.py
def get_grammar_bitmask(
    self,
    scheduled_request_ids: Iterable[str],
    scheduled_spec_decode_tokens: dict[str, list[int]],
) -> tuple[list[str], "npt.NDArray[np.int32] | None"]:
    # Collect list of scheduled request ids that use structured output.
    # The corresponding rows of the bitmask will be in this order.
    # PERF: in case of chunked prefill,
    # request might not include any new tokens.
    # Therefore, we might introduce some additional
    # cycle to fill in the bitmask, which could be a big no-op.
    structured_output_request_ids = [
        req_id
        for req_id in scheduled_request_ids
        if (req := self.requests.get(req_id)) and req.use_structured_output
    ]
    if not structured_output_request_ids:
        return structured_output_request_ids, None

    bitmask = self.structured_output_manager.grammar_bitmask(
        self.requests,
        structured_output_request_ids,
        scheduled_spec_decode_tokens,
    )
    return structured_output_request_ids, bitmask

get_kv_connector

get_kv_connector() -> KVConnectorBase_V1 | None
Source code in vllm/v1/core/sched/scheduler.py
def get_kv_connector(self) -> KVConnectorBase_V1 | None:
    return self.connector

get_num_unfinished_requests

get_num_unfinished_requests() -> int
Source code in vllm/v1/core/sched/scheduler.py
def get_num_unfinished_requests(self) -> int:
    return len(self.waiting) + len(self.running)

get_request_counts

get_request_counts() -> tuple[int, int]

Returns (num_running_reqs, num_waiting_reqs).

Source code in vllm/v1/core/sched/scheduler.py
def get_request_counts(self) -> tuple[int, int]:
    """Returns (num_running_reqs, num_waiting_reqs)."""
    return len(self.running), len(self.waiting)

has_finished_requests

has_finished_requests() -> bool
Source code in vllm/v1/core/sched/scheduler.py
def has_finished_requests(self) -> bool:
    return len(self.finished_req_ids) > 0

make_spec_decoding_stats

make_spec_decoding_stats(
    spec_decoding_stats: SpecDecodingStats | None,
    num_draft_tokens: int,
    num_accepted_tokens: int,
) -> SpecDecodingStats | None
Source code in vllm/v1/core/sched/scheduler.py
def make_spec_decoding_stats(
    self,
    spec_decoding_stats: SpecDecodingStats | None,
    num_draft_tokens: int,
    num_accepted_tokens: int,
) -> SpecDecodingStats | None:
    if not self.log_stats:
        return None
    if spec_decoding_stats is None:
        spec_decoding_stats = SpecDecodingStats.new(self.num_spec_tokens)
    spec_decoding_stats.observe_draft(
        num_draft_tokens=num_draft_tokens, num_accepted_tokens=num_accepted_tokens
    )
    return spec_decoding_stats

make_stats

make_stats(
    spec_decoding_stats: SpecDecodingStats | None = None,
    kv_connector_stats: KVConnectorStats | None = None,
) -> SchedulerStats | None
Source code in vllm/v1/core/sched/scheduler.py
def make_stats(
    self,
    spec_decoding_stats: SpecDecodingStats | None = None,
    kv_connector_stats: KVConnectorStats | None = None,
) -> SchedulerStats | None:
    if not self.log_stats:
        return None
    prefix_cache_stats = self.kv_cache_manager.make_prefix_cache_stats()
    assert prefix_cache_stats is not None
    connector_prefix_cache_stats = self._make_connector_prefix_cache_stats()
    return SchedulerStats(
        num_running_reqs=len(self.running),
        num_waiting_reqs=len(self.waiting),
        kv_cache_usage=self.kv_cache_manager.usage,
        prefix_cache_stats=prefix_cache_stats,
        connector_prefix_cache_stats=connector_prefix_cache_stats,
        spec_decoding_stats=spec_decoding_stats,
        num_corrupted_reqs=sum(req.is_output_corrupted for req in self.running),
        kv_connector_stats=kv_connector_stats.data if kv_connector_stats else None,
    )

reset_prefix_cache

reset_prefix_cache() -> bool
Source code in vllm/v1/core/sched/scheduler.py
def reset_prefix_cache(self) -> bool:
    return self.kv_cache_manager.reset_prefix_cache()

schedule

schedule() -> SchedulerOutput
Source code in vllm/v1/core/sched/scheduler.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
def schedule(self) -> SchedulerOutput:
    # NOTE(woosuk) on the scheduling algorithm:
    # There's no "decoding phase" nor "prefill phase" in the scheduler.
    # Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. This is general enough to cover
    # chunked prefills, prefix caching, speculative decoding,
    # and the "jump decoding" optimization in the future.

    scheduled_new_reqs: list[Request] = []
    scheduled_resumed_reqs: list[Request] = []
    scheduled_running_reqs: list[Request] = []
    preempted_reqs: list[Request] = []

    req_to_new_blocks: dict[str, KVCacheBlocks] = {}
    num_scheduled_tokens: dict[str, int] = {}
    token_budget = self.max_num_scheduled_tokens
    # Encoder-related.
    scheduled_encoder_inputs: dict[str, list[int]] = {}
    encoder_compute_budget = self.max_num_encoder_input_tokens
    # Spec decode-related.
    scheduled_spec_decode_tokens: dict[str, list[int]] = {}

    # For logging.
    scheduled_timestamp = time.monotonic()

    # First, schedule the RUNNING requests.
    req_index = 0
    while req_index < len(self.running) and token_budget > 0:
        request = self.running[req_index]

        num_new_tokens = (
            request.num_tokens_with_spec
            + request.num_output_placeholders
            - request.num_computed_tokens
        )
        if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
            num_new_tokens = self.scheduler_config.long_prefill_token_threshold
        num_new_tokens = min(num_new_tokens, token_budget)

        # Make sure the input position does not exceed the max model len.
        # This is necessary when using spec decoding.
        num_new_tokens = min(
            num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
        )

        # Schedule encoder inputs.
        encoder_inputs_to_schedule = None
        new_encoder_compute_budget = encoder_compute_budget
        if request.has_encoder_inputs:
            (
                encoder_inputs_to_schedule,
                num_new_tokens,
                new_encoder_compute_budget,
            ) = self._try_schedule_encoder_inputs(
                request,
                request.num_computed_tokens,
                num_new_tokens,
                encoder_compute_budget,
            )

        if num_new_tokens == 0:
            # The request cannot be scheduled because one of the following
            # reasons:
            # 1. No new tokens to schedule. This may happen when
            #    (1) PP>1 and we have already scheduled all prompt tokens
            #    but they are not finished yet.
            #    (2) Async scheduling and the request has reached to either
            #    its max_total_tokens or max_model_len.
            # 2. The encoder budget is exhausted.
            # 3. The encoder cache is exhausted.
            # NOTE(woosuk): Here, by doing `continue` instead of `break`,
            # we do not strictly follow the FCFS scheduling policy and
            # allow the lower-priority requests to be scheduled.
            req_index += 1
            continue

        # Schedule newly needed KV blocks for the request.
        while True:
            new_blocks = self.kv_cache_manager.allocate_slots(
                request,
                num_new_tokens,
                num_lookahead_tokens=self.num_lookahead_tokens,
            )

            if new_blocks is not None:
                # The request can be scheduled.
                break

            # The request cannot be scheduled.
            # Preempt the lowest-priority request.
            if self.policy == SchedulingPolicy.PRIORITY:
                preempted_req = max(
                    self.running,
                    key=lambda r: (r.priority, r.arrival_time),
                )
                self.running.remove(preempted_req)
                if preempted_req in scheduled_running_reqs:
                    scheduled_running_reqs.remove(preempted_req)
                    token_budget += num_scheduled_tokens[preempted_req.request_id]
                    req_to_new_blocks.pop(preempted_req.request_id)
                    num_scheduled_tokens.pop(preempted_req.request_id)
                    req_index -= 1
            else:
                preempted_req = self.running.pop()

            self.kv_cache_manager.free(preempted_req)
            self.encoder_cache_manager.free(preempted_req)
            preempted_req.status = RequestStatus.PREEMPTED
            preempted_req.num_computed_tokens = 0
            preempted_req.num_preemptions += 1
            if self.log_stats:
                preempted_req.record_event(
                    EngineCoreEventType.PREEMPTED, scheduled_timestamp
                )

            self.waiting.prepend_request(preempted_req)
            preempted_reqs.append(preempted_req)
            if preempted_req == request:
                # No more request to preempt. Cannot schedule this request.
                break

        if new_blocks is None:
            # Cannot schedule this request.
            break

        # Schedule the request.
        scheduled_running_reqs.append(request)
        req_to_new_blocks[request.request_id] = new_blocks
        num_scheduled_tokens[request.request_id] = num_new_tokens
        token_budget -= num_new_tokens
        req_index += 1

        # Speculative decode related.
        if request.spec_token_ids:
            num_scheduled_spec_tokens = (
                num_new_tokens + request.num_computed_tokens - request.num_tokens
            )
            if num_scheduled_spec_tokens > 0:
                # Trim spec_token_ids list to num_scheduled_spec_tokens.
                del request.spec_token_ids[num_scheduled_spec_tokens:]
                scheduled_spec_decode_tokens[request.request_id] = (
                    request.spec_token_ids
                )

        # Encoder-related.
        if encoder_inputs_to_schedule:
            scheduled_encoder_inputs[request.request_id] = (
                encoder_inputs_to_schedule
            )
            # Allocate the encoder cache.
            for i in encoder_inputs_to_schedule:
                self.encoder_cache_manager.allocate(request, i)
            encoder_compute_budget = new_encoder_compute_budget

    # Record the LoRAs in scheduled_running_reqs
    scheduled_loras: set[int] = set()
    if self.lora_config:
        scheduled_loras = set(
            req.lora_request.lora_int_id
            for req in scheduled_running_reqs
            if req.lora_request and req.lora_request.lora_int_id > 0
        )
        assert len(scheduled_loras) <= self.lora_config.max_loras

    # Use a temporary RequestQueue to collect requests that need to be
    # skipped and put back at the head of the waiting queue later
    skipped_waiting_requests = create_request_queue(self.policy)

    # Next, schedule the WAITING requests.
    if not preempted_reqs:
        while self.waiting and token_budget > 0:
            if len(self.running) == self.max_num_running_reqs:
                break

            request = self.waiting.peek_request()

            # KVTransfer: skip request if still waiting for remote kvs.
            if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                is_ready = self._update_waiting_for_remote_kv(request)
                if is_ready:
                    request.status = RequestStatus.WAITING
                else:
                    logger.debug(
                        "%s is still in WAITING_FOR_REMOTE_KVS state.",
                        request.request_id,
                    )
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

            # Skip request if the structured output request is still waiting
            # for FSM compilation.
            if request.status == RequestStatus.WAITING_FOR_FSM:
                structured_output_req = request.structured_output_request
                if structured_output_req and structured_output_req.grammar:
                    request.status = RequestStatus.WAITING
                else:
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

            # Check that adding the request still respects the max_loras
            # constraint.
            if (
                self.lora_config
                and request.lora_request
                and (
                    len(scheduled_loras) == self.lora_config.max_loras
                    and request.lora_request.lora_int_id not in scheduled_loras
                )
            ):
                # Scheduling would exceed max_loras, skip.
                self.waiting.pop_request()
                skipped_waiting_requests.prepend_request(request)
                continue

            num_external_computed_tokens = 0
            load_kv_async = False

            # Get already-cached tokens.
            if request.num_computed_tokens == 0:
                # Get locally-cached tokens.
                new_computed_blocks, num_new_local_computed_tokens = (
                    self.kv_cache_manager.get_computed_blocks(request)
                )

                # Get externally-cached tokens if using a KVConnector.
                if self.connector is not None:
                    num_external_computed_tokens, load_kv_async = (
                        self.connector.get_num_new_matched_tokens(
                            request, num_new_local_computed_tokens
                        )
                    )

                    if num_external_computed_tokens is None:
                        # The request cannot be scheduled because
                        # the KVConnector couldn't determine
                        # the number of matched tokens.
                        self.waiting.pop_request()
                        skipped_waiting_requests.prepend_request(request)
                        continue

                # Total computed tokens (local + external).
                num_computed_tokens = (
                    num_new_local_computed_tokens + num_external_computed_tokens
                )
            # KVTransfer: WAITING reqs have num_computed_tokens > 0
            # after async KV recvs are completed.
            else:
                new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
                num_new_local_computed_tokens = 0
                num_computed_tokens = request.num_computed_tokens

            encoder_inputs_to_schedule = None
            new_encoder_compute_budget = encoder_compute_budget

            # KVTransfer: loading remote KV, do not allocate for new work.
            if load_kv_async:
                assert num_external_computed_tokens > 0
                num_new_tokens = 0
            # Number of tokens to be scheduled.
            else:
                # We use `request.num_tokens` instead of
                # `request.num_prompt_tokens` to consider the resumed
                # requests, which have output tokens.
                num_new_tokens = request.num_tokens - num_computed_tokens
                if (
                    0
                    < self.scheduler_config.long_prefill_token_threshold
                    < num_new_tokens
                ):
                    num_new_tokens = (
                        self.scheduler_config.long_prefill_token_threshold
                    )

                # chunked prefill has to be enabled explicitly to allow
                # pooling requests to be chunked
                if (
                    not self.scheduler_config.chunked_prefill_enabled
                    and num_new_tokens > token_budget
                ):
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

                num_new_tokens = min(num_new_tokens, token_budget)
                assert num_new_tokens > 0

                # Schedule encoder inputs.
                if request.has_encoder_inputs:
                    (
                        encoder_inputs_to_schedule,
                        num_new_tokens,
                        new_encoder_compute_budget,
                    ) = self._try_schedule_encoder_inputs(
                        request,
                        num_computed_tokens,
                        num_new_tokens,
                        encoder_compute_budget,
                    )
                    if num_new_tokens == 0:
                        # The request cannot be scheduled.
                        break

            # Handles an edge case when P/D Disaggregation
            # is used with Spec Decoding where an
            # extra block gets allocated which
            # creates a mismatch between the number
            # of local and remote blocks.
            effective_lookahead_tokens = (
                0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens
            )

            # Determine if we need to allocate cross-attention blocks.
            if self.is_encoder_decoder and request.has_encoder_inputs:
                # TODO(russellb): For Whisper, we know that the input is
                # always padded to the maximum length. If we support other
                # encoder-decoder models, this will need to be updated if we
                # want to only allocate what is needed.
                num_encoder_tokens = (
                    self.scheduler_config.max_num_encoder_input_tokens
                )
            else:
                num_encoder_tokens = 0

            new_blocks = self.kv_cache_manager.allocate_slots(
                request,
                num_new_tokens + num_external_computed_tokens,
                num_new_local_computed_tokens,
                new_computed_blocks,
                num_lookahead_tokens=effective_lookahead_tokens,
                delay_cache_blocks=load_kv_async,
                num_encoder_tokens=num_encoder_tokens,
            )

            if new_blocks is None:
                # The request cannot be scheduled.
                break

            # KVTransfer: the connector uses this info to determine
            # if a load is needed. Note that
            # This information is used to determine if a load is
            # needed for this request.
            if self.connector is not None:
                self.connector.update_state_after_alloc(
                    request,
                    new_computed_blocks + new_blocks,
                    num_external_computed_tokens,
                )
                self._update_connector_prefix_cache_stats(
                    request, num_external_computed_tokens
                )

            # Request was already popped from self.waiting
            # unless it was re-added above due to new_blocks being None.
            request = self.waiting.pop_request()
            if load_kv_async:
                # If loading async, allocate memory and put request
                # into the WAITING_FOR_REMOTE_KV state.
                skipped_waiting_requests.prepend_request(request)
                request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
                continue

            req_index += 1
            self.running.append(request)
            if self.log_stats:
                request.record_event(
                    EngineCoreEventType.SCHEDULED, scheduled_timestamp
                )
            if request.status == RequestStatus.WAITING:
                scheduled_new_reqs.append(request)
            elif request.status == RequestStatus.PREEMPTED:
                scheduled_resumed_reqs.append(request)
            else:
                raise RuntimeError(f"Invalid request status: {request.status}")

            if self.lora_config and request.lora_request:
                scheduled_loras.add(request.lora_request.lora_int_id)
            req_to_new_blocks[request.request_id] = (
                self.kv_cache_manager.get_blocks(request.request_id)
            )
            num_scheduled_tokens[request.request_id] = num_new_tokens
            token_budget -= num_new_tokens
            request.status = RequestStatus.RUNNING
            request.num_computed_tokens = num_computed_tokens
            # Count the number of prefix cached tokens.
            if request.num_cached_tokens < 0:
                request.num_cached_tokens = num_computed_tokens
            # Encoder-related.
            if encoder_inputs_to_schedule:
                scheduled_encoder_inputs[request.request_id] = (
                    encoder_inputs_to_schedule
                )
                # Allocate the encoder cache.
                for i in encoder_inputs_to_schedule:
                    self.encoder_cache_manager.allocate(request, i)
                encoder_compute_budget = new_encoder_compute_budget

    # Put back any skipped requests at the head of the waiting queue
    if skipped_waiting_requests:
        self.waiting.prepend_requests(skipped_waiting_requests)

    # Check if the scheduling constraints are satisfied.
    total_num_scheduled_tokens = sum(num_scheduled_tokens.values())
    assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens
    assert token_budget >= 0
    assert len(self.running) <= self.max_num_running_reqs
    # Since some requests in the RUNNING queue may not be scheduled in
    # this step, the total number of scheduled requests can be smaller than
    # len(self.running).
    assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len(
        scheduled_running_reqs
    ) <= len(self.running)

    # Get the longest common prefix among all requests in the running queue.
    # This can be potentially used for cascade attention.
    num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
    if self.running:
        any_request = self.running[0]
        num_common_prefix_blocks = (
            self.kv_cache_manager.get_num_common_prefix_blocks(
                any_request.request_id
            )
        )

    # Construct the scheduler output.
    new_reqs_data = [
        NewRequestData.from_request(
            req, req_to_new_blocks[req.request_id].get_block_ids()
        )
        for req in scheduled_new_reqs
    ]
    cached_reqs_data = self._make_cached_request_data(
        scheduled_running_reqs,
        scheduled_resumed_reqs,
        num_scheduled_tokens,
        scheduled_spec_decode_tokens,
        req_to_new_blocks,
    )
    structured_output_request_ids, grammar_bitmask = self.get_grammar_bitmask(
        num_scheduled_tokens.keys(), scheduled_spec_decode_tokens
    )
    scheduler_output = SchedulerOutput(
        scheduled_new_reqs=new_reqs_data,
        scheduled_cached_reqs=cached_reqs_data,
        num_scheduled_tokens=num_scheduled_tokens,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
        scheduled_encoder_inputs=scheduled_encoder_inputs,
        num_common_prefix_blocks=num_common_prefix_blocks,
        # finished_req_ids is an existing state in the scheduler,
        # instead of being newly scheduled in this step.
        # It contains the request IDs that are finished in between
        # the previous and the current steps.
        finished_req_ids=self.finished_req_ids,
        free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
        structured_output_request_ids=structured_output_request_ids,
        grammar_bitmask=grammar_bitmask,
    )

    # NOTE(Kuntai): this function is designed for multiple purposes:
    # 1. Plan the KV cache store
    # 2. Wrap up all the KV cache load / save ops into an opaque object
    # 3. Clear the internal states of the connector
    if self.connector is not None:
        meta = self.connector.build_connector_meta(scheduler_output)
        scheduler_output.kv_connector_metadata = meta

    # collect KV cache events from KV cache manager
    events = self.kv_cache_manager.take_events()

    # collect KV cache events from connector
    if self.connector is not None:
        connector_events = self.connector.take_events()
        if connector_events:
            if events is None:
                events = list(connector_events)
            else:
                events.extend(connector_events)

    # publish collected KV cache events
    if events:
        assert self.kv_event_publisher is not None
        batch = KVEventBatch(ts=time.time(), events=events)
        self.kv_event_publisher.publish(batch)

    self._update_after_schedule(scheduler_output)
    return scheduler_output

shutdown

shutdown() -> None
Source code in vllm/v1/core/sched/scheduler.py
def shutdown(self) -> None:
    if self.kv_event_publisher:
        self.kv_event_publisher.shutdown()
    if self.connector is not None:
        self.connector.shutdown()

update_draft_token_ids

update_draft_token_ids(
    draft_token_ids: DraftTokenIds,
) -> None
Source code in vllm/v1/core/sched/scheduler.py
def update_draft_token_ids(
    self,
    draft_token_ids: DraftTokenIds,
) -> None:
    for req_id, spec_token_ids in zip(
        draft_token_ids.req_ids,
        draft_token_ids.draft_token_ids,
    ):
        request = self.requests.get(req_id)
        if request is None or request.is_finished():
            # The request may have been finished. Skip.
            continue

        # Add newly generated spec token ids to the request.
        if not spec_token_ids:
            # NOTE(woosuk): request.spec_token_ids should be updated.
            request.spec_token_ids.clear()
        elif self.structured_output_manager.should_advance(request):
            metadata = request.structured_output_request
            request.spec_token_ids = metadata.grammar.validate_tokens(  # type: ignore[union-attr]
                spec_token_ids
            )
        else:
            request.spec_token_ids = spec_token_ids

update_from_output

update_from_output(
    scheduler_output: SchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]
Source code in vllm/v1/core/sched/scheduler.py
def update_from_output(
    self,
    scheduler_output: SchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]:
    sampled_token_ids = model_runner_output.sampled_token_ids
    logprobs = model_runner_output.logprobs
    prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    pooler_outputs = model_runner_output.pooler_output
    num_nans_in_logits = model_runner_output.num_nans_in_logits
    kv_connector_output = model_runner_output.kv_connector_output

    outputs: dict[int, list[EngineCoreOutput]] = defaultdict(list)
    spec_decoding_stats: SpecDecodingStats | None = None
    kv_connector_stats = (
        kv_connector_output.kv_connector_stats if kv_connector_output else None
    )
    if kv_connector_stats and self.connector:
        stats = self.connector.get_kv_connector_stats()
        if stats:
            kv_connector_stats = kv_connector_stats.aggregate(stats)

    failed_kv_load_req_ids = None
    if kv_connector_output and kv_connector_output.invalid_block_ids:
        # These blocks contain externally computed tokens that failed to
        # load. Identify affected requests and adjust their computed token
        # count to trigger recomputation of the invalid blocks.
        failed_kv_load_req_ids = self._handle_invalid_blocks(
            kv_connector_output.invalid_block_ids
        )

    # NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more,
    # the below loop can be a performance bottleneck. We should do our best
    # to avoid expensive operations inside the loop.
    stopped_running_reqs: set[Request] = set()
    stopped_preempted_reqs: set[Request] = set()
    for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
        assert num_tokens_scheduled > 0
        if failed_kv_load_req_ids and req_id in failed_kv_load_req_ids:
            # Skip requests that were recovered from KV load failure
            continue
        request = self.requests.get(req_id)
        if request is None:
            # The request is already finished. This can happen if the
            # request is aborted while the model is executing it (e.g.,
            # in pipeline parallelism).
            continue

        req_index = model_runner_output.req_id_to_index[req_id]
        generated_token_ids = (
            sampled_token_ids[req_index] if sampled_token_ids else []
        )

        scheduled_spec_token_ids = (
            scheduler_output.scheduled_spec_decode_tokens.get(req_id)
        )
        if scheduled_spec_token_ids:
            num_draft_tokens = len(scheduled_spec_token_ids)
            num_accepted = len(generated_token_ids) - 1
            num_rejected = num_draft_tokens - num_accepted
            # num_computed_tokens represents the number of tokens
            # processed in the current step, considering scheduled
            # tokens and rejections. If some tokens are rejected,
            # num_computed_tokens is decreased by the number of rejected
            # tokens.
            request.num_computed_tokens -= num_rejected
            spec_decoding_stats = self.make_spec_decoding_stats(
                spec_decoding_stats,
                num_draft_tokens=num_draft_tokens,
                num_accepted_tokens=num_accepted,
            )

        stopped = False
        new_logprobs = None
        new_token_ids = generated_token_ids
        kv_transfer_params = None
        status_before_stop = request.status

        # Check for stop and update request status.
        if new_token_ids:
            new_token_ids, stopped = self._update_request_with_output(
                request, new_token_ids
            )

        # Stop checking for pooler models.
        pooler_output = None
        if pooler_outputs:
            pooler_output = pooler_outputs[req_index]
            stopped = check_stop(request, self.max_model_len, pooler_output)

        if stopped:
            kv_transfer_params = self._free_request(request)
            if status_before_stop == RequestStatus.RUNNING:
                stopped_running_reqs.add(request)
            else:
                stopped_preempted_reqs.add(request)

        # Extract sample logprobs if needed.
        if (
            request.sampling_params is not None
            and request.sampling_params.logprobs is not None
            and logprobs
        ):
            # NOTE: once we support N tokens per step (spec decode),
            # the outer lists can be of length > 1.
            new_logprobs = logprobs.slice(req_index, req_index + 1)

        if new_token_ids and self.structured_output_manager.should_advance(request):
            struct_output_request = request.structured_output_request
            assert struct_output_request is not None
            assert struct_output_request.grammar is not None
            struct_output_request.grammar.accept_tokens(req_id, new_token_ids)

        if num_nans_in_logits is not None and req_id in num_nans_in_logits:
            request.num_nans_in_logits = num_nans_in_logits[req_id]

        # Get prompt logprobs for this request.
        prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id)
        if new_token_ids or pooler_output is not None or kv_transfer_params:
            # Add EngineCoreOutput for this Request.
            outputs[request.client_index].append(
                EngineCoreOutput(
                    request_id=req_id,
                    new_token_ids=new_token_ids,
                    finish_reason=request.get_finished_reason(),
                    new_logprobs=new_logprobs,
                    new_prompt_logprobs_tensors=prompt_logprobs_tensors,
                    pooling_output=pooler_output,
                    stop_reason=request.stop_reason,
                    events=request.take_events(),
                    kv_transfer_params=kv_transfer_params,
                    trace_headers=request.trace_headers,
                    num_cached_tokens=request.num_cached_tokens,
                )
            )
        else:
            # Invariant: EngineCore returns no partial prefill outputs.
            assert not prompt_logprobs_tensors

    # Remove the stopped requests from the running and waiting queues.
    if stopped_running_reqs:
        self.running = remove_all(self.running, stopped_running_reqs)
    if stopped_preempted_reqs:
        # This is a rare case and unlikely to impact performance.
        self.waiting.remove_requests(stopped_preempted_reqs)

    # KV Connector: update state for finished KV Transfers.
    if kv_connector_output:
        self._update_from_kv_xfer_finished(kv_connector_output)

    # Create EngineCoreOutputs for all clients that have requests with
    # outputs in this step.
    engine_core_outputs = {
        client_index: EngineCoreOutputs(outputs=outs)
        for client_index, outs in outputs.items()
    }

    finished_req_ids = self.finished_req_ids_dict
    if finished_req_ids:
        # Include ids of requests that finished since last outputs
        # were sent.
        for client_index, finished_set in finished_req_ids.items():
            # Set finished request set in EngineCoreOutputs for this client.
            if (eco := engine_core_outputs.get(client_index)) is not None:
                eco.finished_requests = finished_set
            else:
                engine_core_outputs[client_index] = EngineCoreOutputs(
                    finished_requests=finished_set
                )
        finished_req_ids.clear()

    if (
        stats := self.make_stats(spec_decoding_stats, kv_connector_stats)
    ) is not None:
        # Return stats to only one of the front-ends.
        if (eco := next(iter(engine_core_outputs.values()), None)) is None:
            # We must return the stats even if there are no request
            # outputs this step.
            engine_core_outputs[0] = eco = EngineCoreOutputs()
        eco.scheduler_stats = stats

    return engine_core_outputs