From c78fde86a0a8e7bceab7a7284cd7664849ef5208 Mon Sep 17 00:00:00 2001 From: Austin Ho Date: Thu, 4 Apr 2024 00:03:28 +0000 Subject: [PATCH] #5480: Add support for read/write sharded. Add unit tests. TODO: Add stress tests --- ...queueWriteBuffer_and_EnqueueReadBuffer.cpp | 329 ++++++++------- tt_metal/impl/dispatch/command_queue.cpp | 380 +++++++++++++----- .../impl/dispatch/kernels/cq_prefetch.cpp | 4 +- .../impl/dispatch/kernels/cq_prefetch.hpp | 4 +- 4 files changed, 472 insertions(+), 245 deletions(-) diff --git a/tests/tt_metal/tt_metal/unit_tests_fast_dispatch/command_queue/test_EnqueueWriteBuffer_and_EnqueueReadBuffer.cpp b/tests/tt_metal/tt_metal/unit_tests_fast_dispatch/command_queue/test_EnqueueWriteBuffer_and_EnqueueReadBuffer.cpp index fdd17ae70f4..d4078c58352 100644 --- a/tests/tt_metal/tt_metal/unit_tests_fast_dispatch/command_queue/test_EnqueueWriteBuffer_and_EnqueueReadBuffer.cpp +++ b/tests/tt_metal/tt_metal/unit_tests_fast_dispatch/command_queue/test_EnqueueWriteBuffer_and_EnqueueReadBuffer.cpp @@ -7,9 +7,9 @@ #include "command_queue_fixture.hpp" #include "command_queue_test_utils.hpp" #include "gtest/gtest.h" +#include "tt_metal/detail/tt_metal.hpp" #include "tt_metal/host_api.hpp" #include "tt_metal/test_utils/env_vars.hpp" -#include "tt_metal/detail/tt_metal.hpp" #include "tt_metal/test_utils/print_helpers.hpp" using namespace tt::tt_metal; @@ -27,66 +27,54 @@ struct BufferStressTestConfig { uint32_t num_unique_vectors; }; -class BufferStressTestConfigSharded{ - public: - uint32_t seed; - uint32_t num_iterations = 100; - - const std::array max_num_pages_per_core; - const std::array max_num_cores; - - std::array num_pages_per_core; - std::array num_cores; - std::array page_shape = {32,32}; - uint32_t element_size = 1; - TensorMemoryLayout mem_config = TensorMemoryLayout::HEIGHT_SHARDED; - ShardOrientation shard_orientation = ShardOrientation::ROW_MAJOR; - bool halo = false; - - BufferStressTestConfigSharded(std::array pages_per_core, - std::array cores): - max_num_pages_per_core(pages_per_core), max_num_cores(cores) - { - this->num_pages_per_core = pages_per_core; - this->num_cores = cores; - } +class BufferStressTestConfigSharded { + public: + uint32_t seed; + uint32_t num_iterations = 100; - std::array tensor2d_shape(){ - return {num_pages_per_core[0]*num_cores[0], - num_pages_per_core[1]*num_cores[1]}; - } + const std::array max_num_pages_per_core; + const std::array max_num_cores; - uint32_t num_pages(){ - return tensor2d_shape()[0] * tensor2d_shape()[1]; - } + std::array num_pages_per_core; + std::array num_cores; + std::array page_shape = {32, 32}; + uint32_t element_size = 1; + TensorMemoryLayout mem_config = TensorMemoryLayout::HEIGHT_SHARDED; + ShardOrientation shard_orientation = ShardOrientation::ROW_MAJOR; + bool halo = false; - std::array shard_shape(){ - return {num_pages_per_core[0] * page_shape[0], num_pages_per_core[1] * page_shape[1]}; - } + BufferStressTestConfigSharded(std::array pages_per_core, std::array cores) : + max_num_pages_per_core(pages_per_core), max_num_cores(cores) { + this->num_pages_per_core = pages_per_core; + this->num_cores = cores; + } - CoreRangeSet shard_grid(){ - return CoreRangeSet(std::set( - { - CoreRange(CoreCoord(0, 0), - CoreCoord(this->num_cores[0] -1, this->num_cores[1] - 1)) - })); + std::array tensor2d_shape() { + return {num_pages_per_core[0] * num_cores[0], num_pages_per_core[1] * num_cores[1]}; + } - } + uint32_t num_pages() { return tensor2d_shape()[0] * tensor2d_shape()[1]; } - ShardSpecBuffer shard_parameters(){ 
- return ShardSpecBuffer( - this->shard_grid(), - this->shard_shape(), - this->shard_orientation, - this->halo, - this->page_shape, - this->tensor2d_shape() - ); - } + std::array shard_shape() { + return {num_pages_per_core[0] * page_shape[0], num_pages_per_core[1] * page_shape[1]}; + } - uint32_t page_size(){ - return page_shape[0] * page_shape[1] * element_size; - } + CoreRangeSet shard_grid() { + return CoreRangeSet(std::set( + {CoreRange(CoreCoord(0, 0), CoreCoord(this->num_cores[0] - 1, this->num_cores[1] - 1))})); + } + + ShardSpecBuffer shard_parameters() { + return ShardSpecBuffer( + this->shard_grid(), + this->shard_shape(), + this->shard_orientation, + this->halo, + this->page_shape, + this->tensor2d_shape()); + } + + uint32_t page_size() { return page_shape[0] * page_shape[1] * element_size; } }; namespace local_test_functions { @@ -102,23 +90,26 @@ vector generate_arange_vector(uint32_t size_bytes) { } template -void test_EnqueueWriteBuffer_and_EnqueueReadBuffer(Device* device, CommandQueue& cq, const TestBufferConfig& config) { - +void test_EnqueueWriteBuffer_and_EnqueueReadBuffer(Device *device, CommandQueue &cq, const TestBufferConfig &config) { // Clear out command queue uint16_t channel = tt::Cluster::instance().get_assigned_channel_for_device(device->id()); chip_id_t mmio_device_id = tt::Cluster::instance().get_associated_mmio_device(device->id()); uint32_t cq_size = tt::Cluster::instance().get_host_channel_size(device->id(), channel) / device->num_hw_cqs(); - std::vector cq_zeros((cq_size - CQ_START)/sizeof(uint32_t), 0); + std::vector cq_zeros((cq_size - CQ_START) / sizeof(uint32_t), 0); tt::Cluster::instance().write_sysmem(cq_zeros.data(), (cq_size - CQ_START), CQ_START, mmio_device_id, channel); - for (const bool cq_write: {true, false}) { - for (const bool cq_read: {true, false}) { + for (const bool cq_write : {true, false}) { + for (const bool cq_read : {true, false}) { if constexpr (cq_dispatch_only) { - if (not (cq_write and cq_read)) { continue; } + if (not(cq_write and cq_read)) { + continue; + } + } + if (not cq_write and not cq_read) { + continue; } - if (not cq_write and not cq_read) { continue; } size_t buf_size = config.num_pages * config.page_size; Buffer bufa(device, buf_size, config.page_size, config.buftype); @@ -155,7 +146,7 @@ void test_EnqueueWriteBuffer_and_EnqueueReadBuffer(Device* device, CommandQueue& template bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer( - Device* device, CommandQueue& cq, const BufferStressTestConfig& config) { + Device *device, CommandQueue &cq, const BufferStressTestConfig &config) { srand(config.seed); bool pass = true; uint32_t num_pages_left = config.num_pages_total; @@ -179,15 +170,14 @@ bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer( buftype = BufferType::L1; } - unique_ptr buf; try { buf = std::make_unique(device, buf_size, config.page_size, buftype); } catch (...) 
{ Finish(cq); size_t i = 0; - for (const auto& dst: dsts) { - EXPECT_EQ(srcs[i++], dst); + for (const auto &dst : dsts) { + EXPECT_EQ(srcs[i++], dst); } srcs.clear(); dsts.clear(); @@ -198,11 +188,11 @@ bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer( vector dst; if constexpr (blocking) { EnqueueReadBuffer(cq, *buf, dst, true); - EXPECT_EQ(src, dst); + EXPECT_EQ(src, dst); } else { srcs.push_back(std::move(src)); dsts.push_back(dst); - buffers.push_back(std::move(buf)); // Ensures that buffer not destroyed when moved out of scope + buffers.push_back(std::move(buf)); // Ensures that buffer not destroyed when moved out of scope EnqueueReadBuffer(cq, *buffers[buffers.size() - 1], dsts[dsts.size() - 1], false); } } @@ -210,51 +200,67 @@ bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer( if constexpr (not blocking) { Finish(cq); size_t i = 0; - for (const auto& dst: dsts) { - EXPECT_EQ(srcs[i++], dst); + for (const auto &dst : dsts) { + EXPECT_EQ(srcs[i++], dst); } } return pass; } -bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_sharded( - Device* device, CommandQueue& cq, BufferStressTestConfigSharded config) { +void stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_sharded( + Device *device, CommandQueue &cq, BufferStressTestConfigSharded config) { srand(config.seed); - bool pass = true; - // first keep num_pages_per_core consistent and increase num_cores - for(uint32_t iteration_id = 0; iteration_id < config.num_iterations; iteration_id++){ - uint32_t num_cores_outer = rand() % (config.max_num_cores[1]) + 1; + for (const bool cq_write : {true, false}) { + for (const bool cq_read : {true, false}) { + if (not cq_write and not cq_read) { + continue; + } + // first keep num_pages_per_core consistent and increase num_cores + for (uint32_t iteration_id = 0; iteration_id < config.num_iterations; iteration_id++) { + auto shard_spec = config.shard_parameters(); - config.num_cores[1] = num_cores_outer; - auto shard_spec = config.shard_parameters(); + // explore a tensor_shape , keeping inner pages constant + uint32_t num_pages = config.num_pages(); - // explore a tensor_shape , keeping inner pages constant - uint32_t num_pages = config.num_pages(); + uint32_t buf_size = num_pages * config.page_size(); + vector src(buf_size / sizeof(uint32_t), 0); - uint32_t buf_size = num_pages * config.page_size(); - vector src(buf_size / sizeof(uint32_t), 0); + uint32_t page_size = config.page_size(); + for (uint32_t i = 0; i < src.size(); i++) { + src.at(i) = i; + } + BufferType buftype = BufferType::L1; - uint32_t page_size = config.page_size(); - for (uint32_t i = 0; i < src.size(); i++) { - src.at(i) = i; - } + Buffer buf(device, buf_size, config.page_size(), buftype, config.mem_config, shard_spec); + vector src2 = src; + if (cq_write) { + EnqueueWriteBuffer(cq, buf, src2.data(), false); + } else { + ::detail::WriteToBuffer(buf, src); + tt::Cluster::instance().l1_barrier(device->id()); + } - BufferType buftype = BufferType::L1; + if (cq_write and not cq_read) { + Finish(cq); + } - Buffer buf(device, buf_size, config.page_size(), buftype, config.mem_config, shard_spec); - EnqueueWriteBuffer(cq, buf, src, false); + vector res; + res.resize(buf_size / sizeof(uint32_t)); - vector res; - EnqueueReadBuffer(cq, buf, res, true); - pass &= src == res; + if (cq_read) { + EnqueueReadBuffer(cq, buf, res.data(), true); + } else { + ::detail::ReadFromBuffer(buf, res); + } + EXPECT_EQ(src, res); + } + } } - return pass; } - -void test_EnqueueWrap_on_EnqueueReadBuffer(Device* device, 
CommandQueue& cq, const TestBufferConfig& config) { +void test_EnqueueWrap_on_EnqueueReadBuffer(Device *device, CommandQueue &cq, const TestBufferConfig &config) { auto [buffer, src] = EnqueueWriteBuffer_prior_to_wrap(device, cq, config); vector dst; EnqueueReadBuffer(cq, buffer, dst, true); @@ -263,8 +269,7 @@ void test_EnqueueWrap_on_EnqueueReadBuffer(Device* device, CommandQueue& cq, con } bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_wrap( - Device* device, CommandQueue& cq, const BufferStressTestConfig& config) { - + Device *device, CommandQueue &cq, const BufferStressTestConfig &config) { srand(config.seed); vector> unique_vectors; @@ -275,27 +280,24 @@ bool stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_wrap( buf_size, 100, std::chrono::system_clock::now().time_since_epoch().count())); } - vector< std::shared_ptr > bufs; + vector> bufs; uint32_t start = 0; - for (uint32_t i = 0; i < config.num_iterations; i++) { size_t buf_size = unique_vectors[i % unique_vectors.size()].size() * sizeof(uint32_t); tt::tt_metal::InterleavedBufferConfig dram_config{ - .device= device, - .size = buf_size, - .page_size = config.page_size, - .buffer_type = tt::tt_metal::BufferType::DRAM - }; + .device = device, + .size = buf_size, + .page_size = config.page_size, + .buffer_type = tt::tt_metal::BufferType::DRAM}; try { bufs.push_back(CreateBuffer(dram_config)); - } catch (const std::exception& e) { + } catch (const std::exception &e) { tt::log_info("Deallocating on iteration {}", i); bufs.clear(); start = i; bufs = {CreateBuffer(dram_config)}; } - EnqueueWriteBuffer(cq, bufs[bufs.size() - 1], unique_vectors[i % unique_vectors.size()], false); } @@ -327,9 +329,7 @@ TEST_F(CommandQueueSingleCardFixture, WriteOneTileToDramBank0) { TEST_F(CommandQueueSingleCardFixture, WriteOneTileToAllDramBanks) { for (Device *device : devices_) { TestBufferConfig config = { - .num_pages = uint32_t(device->num_banks(BufferType::DRAM)), - .page_size = 2048, - .buftype = BufferType::DRAM}; + .num_pages = uint32_t(device->num_banks(BufferType::DRAM)), .page_size = 2048, .buftype = BufferType::DRAM}; local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config); } @@ -348,10 +348,7 @@ TEST_F(CommandQueueSingleCardFixture, WriteOneTileAcrossAllDramBanksTwiceRoundRo TEST_F(CommandQueueSingleCardFixture, Sending131072Pages) { for (Device *device : devices_) { - TestBufferConfig config = { - .num_pages = 131072, - .page_size = 128, - .buftype = BufferType::DRAM}; + TestBufferConfig config = {.num_pages = 131072, .page_size = 128, .buftype = BufferType::DRAM}; local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config); } @@ -382,17 +379,17 @@ TEST_F(CommandQueueSingleCardFixture, TestNon32BAlignedPageSizeForDram2) { TEST_F(CommandQueueFixture, TestPageSizeTooLarge) { if (this->arch_ == tt::ARCH::WORMHOLE_B0) { - GTEST_SKIP(); // This test hanging on wormhole b0 + GTEST_SKIP(); // This test hanging on wormhole b0 } // Should throw a host error due to the page size not fitting in the consumer CB TestBufferConfig config = {.num_pages = 1024, .page_size = 250880 * 2, .buftype = BufferType::DRAM}; - EXPECT_ANY_THROW((local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer(this->device_, this->device_->command_queue(), config))); + EXPECT_ANY_THROW((local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer( + this->device_, this->device_->command_queue(), config))); } // Requires enqueue write buffer 
TEST_F(CommandQueueSingleCardFixture, TestWrapHostHugepageOnEnqueueReadBuffer) { - GTEST_SKIP() << "Re-enable with Austin's fixes for wrapping issue queue if no space availables"; for (Device *device : this->devices_) { uint32_t page_size = 2048; uint16_t channel = tt::Cluster::instance().get_assigned_channel_for_device(device->id()); @@ -420,14 +417,15 @@ TEST_F(CommandQueueSingleCardFixture, DISABLED_TestIssueMultipleReadWriteCommand TestBufferConfig config = {.num_pages = num_pages, .page_size = page_size, .buftype = BufferType::DRAM}; - local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config); + local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer( + device, device->command_queue(), config); } } // Test that command queue wraps when buffer available space in completion region is less than a page TEST_F(CommandQueueSingleCardFixture, TestWrapCompletionQOnInsufficientSpace) { - uint32_t large_page_size = 8192; // page size for first and third read - uint32_t small_page_size = 2048; // page size for second read + uint32_t large_page_size = 8192; // page size for first and third read + uint32_t small_page_size = 2048; // page size for second read for (Device *device : devices_) { uint16_t channel = tt::Cluster::instance().get_assigned_channel_for_device(device->id()); @@ -464,7 +462,8 @@ TEST_F(CommandQueueSingleCardFixture, TestWrapCompletionQOnInsufficientSpace) { } } -// Test that command queue wraps when buffer read needs to be split into multiple enqueue_read_buffer commands and available space in completion region is less than a page +// Test that command queue wraps when buffer read needs to be split into multiple enqueue_read_buffer commands and +// available space in completion region is less than a page TEST_F(CommandQueueSingleCardFixture, TestWrapCompletionQOnInsufficientSpace2) { // Using default 75-25 issue and completion queue split for (Device *device : devices_) { @@ -539,7 +538,9 @@ TEST_F(CommandQueueSingleCardFixture, TestNon32BAlignedPageSizeForL1) { TestBufferConfig config = {.num_pages = 1250, .page_size = 200, .buftype = BufferType::L1}; for (Device *device : devices_) { - if (device->is_mmio_capable()) { continue; } + if (device->is_mmio_capable()) { + continue; + } local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config); } } @@ -572,10 +573,7 @@ TEST_F(CommandQueueSingleCardFixture, TestLargeBuffer4096BPageSize) { constexpr BufferType buff_type = BufferType::L1; for (Device *device : devices_) { - TestBufferConfig config = { - .num_pages = 512, - .page_size = 4096, - .buftype = BufferType::L1}; + TestBufferConfig config = {.num_pages = 512, .page_size = 4096, .buftype = BufferType::L1}; local_test_functions::test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config); } @@ -612,53 +610,92 @@ TEST_F(CommandQueueSingleCardFixture, TestNonblockingReads) { namespace stress_tests { TEST_F(CommandQueueSingleCardFixture, WritesToRandomBufferTypeAndThenReadsBlocking) { - GTEST_SKIP() << "Re-test with Austin's fixes for wrapping"; BufferStressTestConfig config = { .seed = 0, .num_pages_total = 50000, .page_size = 2048, .max_num_pages_per_buffer = 16}; for (Device *device : devices_) { - EXPECT_TRUE( - local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config)); + EXPECT_TRUE(local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer( + device, 
device->command_queue(), config)); } } TEST_F(CommandQueueSingleCardFixture, WritesToRandomBufferTypeAndThenReadsNonblocking) { - GTEST_SKIP() << "Re-test with Austin's fixes for wrapping"; BufferStressTestConfig config = { .seed = 0, .num_pages_total = 50000, .page_size = 2048, .max_num_pages_per_buffer = 16}; for (Device *device : devices_) { - if (not device->is_mmio_capable()) continue; - EXPECT_TRUE( - local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer(device, device->command_queue(), config)); + if (not device->is_mmio_capable()) + continue; + EXPECT_TRUE(local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer( + device, device->command_queue(), config)); } } - +// TODO: Split this into separate tests TEST_F(CommandQueueSingleCardFixture, ShardedBufferReadWrites) { - GTEST_SKIP() << "Sharded buffer is currently unsupported in FD2.0"; - BufferStressTestConfigSharded config({2,2}, {4,2}); - config.seed = 0; - config.num_iterations = 100; - for (Device *device : devices_) { - EXPECT_TRUE( - local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_sharded(device, device->command_queue(), config)); + for (const std::array cores : + {std::array{1, 1}, + std::array{5, 1}, + std::array{1, 5}, + std::array{5, 3}, + std::array{3, 5}, + std::array{5, 5}, + std::array{ + static_cast(device->compute_with_storage_grid_size().x), + static_cast(device->compute_with_storage_grid_size().y)}}) { + for (const std::array num_pages : { + std::array{1, 1}, + std::array{2, 1}, + std::array{1, 2}, + std::array{2, 2}, + std::array{7, 11}, + std::array{3, 65}, + std::array{67, 4}, + std::array{3, 137}, + }) { + for (const std::array page_shape : { + std::array{32, 32}, + std::array{1, 4}, + std::array{1, 120}, + std::array{1, 1024}, + std::array{1, 2048}, + }) { + for (const TensorMemoryLayout shard_strategy : + {TensorMemoryLayout::HEIGHT_SHARDED, + TensorMemoryLayout::WIDTH_SHARDED, + TensorMemoryLayout::BLOCK_SHARDED}) { + for (const uint32_t num_iterations : { + 1, + }) { + BufferStressTestConfigSharded config(num_pages, cores); + config.seed = 0; + config.num_iterations = num_iterations; + config.mem_config = shard_strategy; + config.page_shape = page_shape; + tt::log_info(tt::LogTest, fmt::format("cores: [{},{}] num_pages: [{},{}] page_shape: [{},{}], shard_strategy: {}, num_iterations: {}", cores[0],cores[1], num_pages[0],num_pages[1], page_shape[0],page_shape[1], magic_enum::enum_name(shard_strategy).data(), num_iterations).c_str()); + local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_sharded( + device, device->command_queue(), config); + } + } + } + } + } } } TEST_F(CommandQueueFixture, StressWrapTest) { - const char* arch = getenv("ARCH_NAME"); - if ( strcasecmp(arch,"wormhole_b0") == 0 ) { - tt::log_info("cannot run this test on WH B0"); - GTEST_SKIP(); - return; //skip for WH B0 + const char *arch = getenv("ARCH_NAME"); + if (strcasecmp(arch, "wormhole_b0") == 0) { + tt::log_info("cannot run this test on WH B0"); + GTEST_SKIP(); + return; // skip for WH B0 } BufferStressTestConfig config = { .page_size = 4096, .max_num_pages_per_buffer = 2000, .num_iterations = 10000, .num_unique_vectors = 20}; - EXPECT_TRUE( - local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_wrap(this->device_, this->device_->command_queue(), config)); + EXPECT_TRUE(local_test_functions::stress_test_EnqueueWriteBuffer_and_EnqueueReadBuffer_wrap( + this->device_, this->device_->command_queue(), config)); } } // end 
namespace stress_tests diff --git a/tt_metal/impl/dispatch/command_queue.cpp b/tt_metal/impl/dispatch/command_queue.cpp index c8a53dad5cc..9416b493f01 100644 --- a/tt_metal/impl/dispatch/command_queue.cpp +++ b/tt_metal/impl/dispatch/command_queue.cpp @@ -129,13 +129,21 @@ EnqueueReadBufferCommand::EnqueueReadBufferCommand( } CQPrefetchCmd relay_buffer; - relay_buffer.base.cmd_id = CQ_PREFETCH_CMD_RELAY_PAGED; + if (is_sharded(buffer.buffer_layout())) { + relay_buffer.base.cmd_id = CQ_PREFETCH_CMD_RELAY_LINEAR; - relay_buffer.relay_paged.is_dram = 0; - relay_buffer.relay_paged.start_page = 0; - relay_buffer.relay_paged.base_addr = 0; - relay_buffer.relay_paged.page_size = 0; - relay_buffer.relay_paged.pages = 0; + relay_buffer.relay_linear.length = 0; + relay_buffer.relay_linear.noc_xy_addr = 0; + relay_buffer.relay_linear.addr = 0; + } else { + relay_buffer.base.cmd_id = CQ_PREFETCH_CMD_RELAY_PAGED; + + relay_buffer.relay_paged.is_dram = 0; + relay_buffer.relay_paged.start_page = 0; + relay_buffer.relay_paged.base_addr = 0; + relay_buffer.relay_paged.page_size = 0; + relay_buffer.relay_paged.pages = 0; + } uint32_t *relay_buffer_ptr = (uint32_t *)&relay_buffer; for (int i = 0; i < sizeof(CQPrefetchCmd) / sizeof(uint32_t); i++) { @@ -171,11 +179,21 @@ const void EnqueueReadBufferCommand::assemble_device_commands(uint32_t dst_addre uint32_t relay_buffer_cmd_idx = relay_paged_cmd_offset / sizeof(uint32_t); CQPrefetchCmd *relay_buffer = (CQPrefetchCmd*)(this->commands.data() + relay_buffer_cmd_idx); - relay_buffer->relay_paged.is_dram = (this->buffer.buffer_type() == BufferType::DRAM); - relay_buffer->relay_paged.start_page = this->src_page_index; - relay_buffer->relay_paged.base_addr = this->buffer.address(); - relay_buffer->relay_paged.page_size = padded_page_size; - relay_buffer->relay_paged.pages = this->pages_to_read; + + if (is_sharded(this->buffer.buffer_layout())) { + relay_buffer->base.cmd_id = CQ_PREFETCH_CMD_RELAY_LINEAR; + auto core = this->buffer.device()->worker_core_from_logical_core(buffer.get_core_from_dev_page_id((this->src_page_index))); + relay_buffer->relay_linear.length = padded_page_size * this->pages_to_read; + relay_buffer->relay_linear.noc_xy_addr = get_noc_unicast_encoding(core); + relay_buffer->relay_linear.addr = this->buffer.address() + buffer.get_host_page_to_local_shard_page_mapping()[buffer.get_dev_to_host_mapped_page_id(this->src_page_index)] * padded_page_size; + } else { + relay_buffer->base.cmd_id = CQ_PREFETCH_CMD_RELAY_PAGED; + relay_buffer->relay_paged.is_dram = (this->buffer.buffer_type() == BufferType::DRAM); + relay_buffer->relay_paged.start_page = this->src_page_index; + relay_buffer->relay_paged.base_addr = this->buffer.address(); + relay_buffer->relay_paged.page_size = padded_page_size; + relay_buffer->relay_paged.pages = this->pages_to_read; + } } void EnqueueReadBufferCommand::process() { @@ -190,6 +208,12 @@ void EnqueueReadBufferCommand::process() { this->manager.fetch_queue_reserve_back(this->command_queue_id); uint32_t write_ptr = this->manager.get_issue_queue_write_ptr(this->command_queue_id); + // Wrap issue queue + const uint32_t command_issue_limit = this->manager.get_issue_queue_limit(this->command_queue_id); + if (write_ptr + align(fetch_size_bytes, 32) > command_issue_limit) { + this->manager.wrap_issue_queue_wr_ptr(this->command_queue_id); + } + this->manager.cq_write(this->commands.data(), fetch_size_bytes, write_ptr); this->manager.issue_queue_push_back(fetch_size_bytes, this->command_queue_id); @@ -260,18 +284,26 @@ 
EnqueueWriteBufferCommand::EnqueueWriteBufferCommand( this->commands.push_back(*relay_write_ptr++); } - CQDispatchCmd write_paged_cmd; - write_paged_cmd.base.cmd_id = CQ_DISPATCH_CMD_WRITE_PAGED; - // write_paged attributes set in assemble_device_commands - write_paged_cmd.write_paged.is_dram = 0; - write_paged_cmd.write_paged.start_page = 0; - write_paged_cmd.write_paged.base_addr = 0; - write_paged_cmd.write_paged.page_size = 0; - write_paged_cmd.write_paged.pages = 0; + CQDispatchCmd write_cmd; + if (is_sharded(buffer.buffer_layout())) { + write_cmd.base.cmd_id = CQ_DISPATCH_CMD_WRITE_LINEAR; + write_cmd.write_linear.addr = 0; + write_cmd.write_linear.length = 0; + write_cmd.write_linear.noc_xy_addr = 0; + write_cmd.write_linear.num_mcast_dests = 0; + } else { + write_cmd.base.cmd_id = CQ_DISPATCH_CMD_WRITE_PAGED; + // write_paged attributes set in assemble_device_commands + write_cmd.write_paged.is_dram = 0; + write_cmd.write_paged.start_page = 0; + write_cmd.write_paged.base_addr = 0; + write_cmd.write_paged.page_size = 0; + write_cmd.write_paged.pages = 0; + } - uint32_t *write_paged_cmd_ptr = (uint32_t *)&write_paged_cmd; + uint32_t *write_cmd_ptr = (uint32_t *)&write_cmd; for (int i = 0; i < sizeof(CQDispatchCmd) / sizeof(uint32_t); i++) { - this->commands.push_back(*write_paged_cmd_ptr++); + this->commands.push_back(*write_cmd_ptr++); } // no need to add padding @@ -291,16 +323,24 @@ const void EnqueueWriteBufferCommand::assemble_device_commands(uint32_t) { relay_write_cmd->relay_inline.length = payload_size_bytes; relay_write_cmd->relay_inline.stride = align(sizeof(CQPrefetchCmd) + payload_size_bytes, HUGEPAGE_ALIGNMENT); - uint32_t write_paged_cmd_idx = relay_write_idx + (sizeof(CQPrefetchCmd) / sizeof(uint32_t)); - CQDispatchCmd *write_paged_cmd = (CQDispatchCmd*)(this->commands.data() + write_paged_cmd_idx); - write_paged_cmd->write_paged.is_dram = uint8_t(this->buffer.buffer_type() == BufferType::DRAM); + uint32_t write_cmd_idx = relay_write_idx + (sizeof(CQPrefetchCmd) / sizeof(uint32_t)); + CQDispatchCmd *write_cmd = (CQDispatchCmd*)(this->commands.data() + write_cmd_idx); - TT_ASSERT(this->dst_page_index <= 0xFFFF, "Page offset needs to fit within range of uint16_t, bank_base_address was computed incorrectly!"); - - write_paged_cmd->write_paged.start_page = uint16_t(this->dst_page_index & 0xFFFF); - write_paged_cmd->write_paged.base_addr = this->bank_base_address; - write_paged_cmd->write_paged.page_size = padded_page_size; - write_paged_cmd->write_paged.pages = this->pages_to_write; + if (is_sharded(buffer.buffer_layout())) { + write_cmd->base.cmd_id = CQ_DISPATCH_CMD_WRITE_LINEAR; + auto core = this->buffer.device()->worker_core_from_logical_core(buffer.get_core_from_dev_page_id((this->dst_page_index))); + write_cmd->write_linear.addr = this->bank_base_address; + write_cmd->write_linear.length = this->pages_to_write * padded_page_size; + write_cmd->write_linear.noc_xy_addr = get_noc_unicast_encoding(core); + } else { + write_cmd->base.cmd_id = CQ_DISPATCH_CMD_WRITE_PAGED; + write_cmd->write_paged.is_dram = uint8_t(this->buffer.buffer_type() == BufferType::DRAM); + TT_ASSERT(this->dst_page_index <= 0xFFFF, "Page offset needs to fit within range of uint16_t, bank_base_address was computed incorrectly!"); + write_cmd->write_paged.start_page = uint16_t(this->dst_page_index & 0xFFFF); + write_cmd->write_paged.base_addr = this->bank_base_address; + write_cmd->write_paged.page_size = padded_page_size; + write_cmd->write_paged.pages = this->pages_to_write; + } } void 
EnqueueWriteBufferCommand::process() { @@ -321,13 +361,15 @@ void EnqueueWriteBufferCommand::process() { uint32_t write_ptr = this->manager.get_issue_queue_write_ptr(this->command_queue_id); + // We already checked for issue queue wrap when setting up the cmd, no need to check again here + uint32_t commands_offset_idx = this->issue_wait ? 0 : (wait_size_bytes / sizeof(uint32_t)); this->manager.cq_write(this->commands.data() + commands_offset_idx, commands_size_bytes, write_ptr); uint32_t data_write_ptr = write_ptr + commands_size_bytes; uint32_t buffer_addr_offset = this->bank_base_address - this->buffer.address(); - uint32_t num_banks = this->device->num_banks(this->buffer.buffer_type()); + uint32_t num_banks = is_sharded(this->buffer.buffer_layout()) ? 0 : this->device->num_banks(this->buffer.buffer_type()); uint32_t unpadded_src_offset = ( ((buffer_addr_offset/padded_page_size) * num_banks) + this->dst_page_index) * this->buffer.page_size(); if (this->buffer.page_size() % 32 != 0 and this->buffer.page_size() != this->buffer.size()) { // If page size is not 32B-aligned, we cannot do a contiguous write @@ -498,6 +540,12 @@ void EnqueueRecordEventCommand::process() { this->manager.fetch_queue_reserve_back(this->command_queue_id); uint32_t write_ptr = this->manager.get_issue_queue_write_ptr(this->command_queue_id); + // Wrap issue queue + const uint32_t command_issue_limit = this->manager.get_issue_queue_limit(this->command_queue_id); + if (write_ptr + align(fetch_size_bytes, 32) > command_issue_limit) { + this->manager.wrap_issue_queue_wr_ptr(this->command_queue_id); + } + this->manager.cq_write(this->commands.data(), fetch_size_bytes, write_ptr); this->manager.issue_queue_push_back(fetch_size_bytes, this->command_queue_id); @@ -607,8 +655,6 @@ void HWCommandQueue::enqueue_read_buffer(std::shared_ptr buffer, void* d void HWCommandQueue::enqueue_read_buffer(Buffer& buffer, void* dst, bool blocking) { ZoneScopedN("HWCommandQueue_read_buffer"); - TT_ASSERT(not is_sharded(buffer.buffer_layout()), "Sharded buffer is not supported in FD 2.0"); - chip_id_t mmio_device_id = tt::Cluster::instance().get_associated_mmio_device(this->device->id()); uint16_t channel = tt::Cluster::instance().get_assigned_channel_for_device(this->device->id()); @@ -617,20 +663,56 @@ void HWCommandQueue::enqueue_read_buffer(Buffer& buffer, void* dst, bool blockin uint32_t unpadded_dst_offset = 0; uint32_t src_page_index = 0; - // this is a streaming command so we don't need to break down to multiple - auto command = EnqueueReadInterleavedBufferCommand( - this->id, this->device, buffer, dst, this->manager, this->expected_num_workers_completed, src_page_index, pages_to_read); - - this->issued_completion_q_reads.push( - detail::ReadBufferDescriptor(buffer, padded_page_size, dst, unpadded_dst_offset, pages_to_read, src_page_index) - ); - this->num_entries_in_completion_q++; - - this->enqueue_command(command, blocking); + if (is_sharded(buffer.buffer_layout())) { + constexpr uint32_t half_scratch_space = SCRATCH_DB_SIZE / 2; + // Note that the src_page_index is the device page idx, not the host page idx + // Since we read core by core we are reading the device pages sequentially + for (uint32_t core_id = 0; core_id < buffer.num_cores(); ++core_id) { + auto core_pages = buffer.dev_pages_in_shard(core_id); + uint32_t num_pages = core_pages.size(); + uint32_t max_pages_in_scratch = half_scratch_space / buffer.page_size(); + TT_ASSERT(max_pages_in_scratch > 0); + uint32_t curr_page_idx_in_shard = 0; + while (num_pages != 0) { + 
uint32_t num_pages_to_read = std::min(num_pages, max_pages_in_scratch); + src_page_index = core_pages[curr_page_idx_in_shard]; + // Unused. Remove? + unpadded_dst_offset = buffer.get_dev_to_host_mapped_page_id(src_page_index) * buffer.page_size(); + + auto command = EnqueueReadShardedBufferCommand( + this->id, this->device, buffer, dst, this->manager, this->expected_num_workers_completed, src_page_index, num_pages_to_read); + + this->issued_completion_q_reads.push( + detail::ReadBufferDescriptor(buffer, padded_page_size, dst, unpadded_dst_offset, num_pages_to_read, src_page_index) + ); + this->num_entries_in_completion_q++; - if (not blocking) { // should this be unconditional? - std::shared_ptr event = std::make_shared(); - this->enqueue_record_event(event); + this->enqueue_command(command, false); + curr_page_idx_in_shard += num_pages_to_read; + num_pages -= num_pages_to_read; + } + } + if (blocking) { + this->finish(); + } else { + std::shared_ptr event = std::make_shared(); + this->enqueue_record_event(event); + } + } else { + // this is a streaming command so we don't need to break down to multiple + auto command = EnqueueReadInterleavedBufferCommand( + this->id, this->device, buffer, dst, this->manager, this->expected_num_workers_completed, src_page_index, pages_to_read); + + this->issued_completion_q_reads.push( + detail::ReadBufferDescriptor(buffer, padded_page_size, dst, unpadded_dst_offset, pages_to_read, src_page_index) + ); + this->num_entries_in_completion_q++; + + this->enqueue_command(command, blocking); + if (not blocking) { // should this be unconditional? + std::shared_ptr event = std::make_shared(); + this->enqueue_record_event(event); + } } } @@ -665,10 +747,6 @@ CoreType HWCommandQueue::get_dispatch_core_type() { void HWCommandQueue::enqueue_write_buffer(const Buffer& buffer, const void* src, bool blocking) { ZoneScopedN("HWCommandQueue_write_buffer"); - if (is_sharded(buffer.buffer_layout())) { - TT_THROW("Sharded buffers are currently unsupported in FD2.0"); - } - if (buffer.buffer_layout() == TensorMemoryLayout::WIDTH_SHARDED or buffer.buffer_layout() == TensorMemoryLayout::BLOCK_SHARDED) { convert_interleaved_to_sharded_on_host(src, buffer); @@ -679,39 +757,78 @@ void HWCommandQueue::enqueue_write_buffer(const Buffer& buffer, const void* src, const uint32_t num_banks = this->device->num_banks(buffer.buffer_type()); const uint32_t command_issue_limit = this->manager.get_issue_queue_limit(this->id); + TT_ASSERT(int32_t(MAX_PREFETCH_COMMAND_SIZE) - int32_t(sizeof(CQPrefetchCmd) + sizeof(CQDispatchCmd)) >= padded_page_size); uint32_t dst_page_index = 0; - uint32_t bank_base_address = buffer.address(); - while (total_pages_to_write > 0) { - uint32_t data_offset_bytes = (sizeof(CQPrefetchCmd) + sizeof(CQDispatchCmd)); // data appended after CQ_PREFETCH_CMD_RELAY_INLINE + CQ_DISPATCH_CMD_WRITE_PAGED - bool issue_wait = (dst_page_index == 0 and bank_base_address == buffer.address()); // only stall for the first write of the buffer - if (issue_wait) { - data_offset_bytes *= 2; // commands prefixed with CQ_PREFETCH_CMD_RELAY_INLINE + CQ_DISPATCH_CMD_WAIT + + if (is_sharded(buffer.buffer_layout())) { + // Since we read core by core we are reading the device pages sequentially + for (uint32_t core_id = 0; core_id < buffer.num_cores(); ++core_id) { + auto core_pages = buffer.dev_pages_in_shard(core_id); + uint32_t num_pages = core_pages.size(); + uint32_t curr_page_idx_in_shard = 0; + while (num_pages != 0) { + uint32_t data_offset_bytes = (sizeof(CQPrefetchCmd) + 
sizeof(CQDispatchCmd)); // data appended after CQ_PREFETCH_CMD_RELAY_INLINE + CQ_DISPATCH_CMD_WRITE_PAGED + bool issue_wait = dst_page_index == 0; // only stall for the first write of the buffer + if (issue_wait) { + data_offset_bytes *= 2; // commands prefixed with CQ_PREFETCH_CMD_RELAY_INLINE + CQ_DISPATCH_CMD_WAIT + } + uint32_t space_available_bytes = std::min(command_issue_limit - this->manager.get_issue_queue_write_ptr(this->id), MAX_PREFETCH_COMMAND_SIZE); + int32_t num_pages_available = + (int32_t(space_available_bytes) - int32_t(data_offset_bytes)) / int32_t(padded_page_size); + + uint32_t pages_to_write = std::min(num_pages, (uint32_t)num_pages_available); + if (pages_to_write > 0) { + uint32_t bank_base_address = buffer.address() + curr_page_idx_in_shard * padded_page_size; + // Technically we are going through dst pages in order + dst_page_index = core_pages[curr_page_idx_in_shard]; + + tt::log_debug(tt::LogDispatch, "EnqueueWriteBuffer for channel {}", this->id); + + auto command = EnqueueWriteShardedBufferCommand( + this->id, this->device, buffer, src, this->manager, issue_wait, this->expected_num_workers_completed, bank_base_address, dst_page_index, pages_to_write); + + this->enqueue_command(command, false); + curr_page_idx_in_shard += pages_to_write; + num_pages -= pages_to_write; + } else { + this->manager.wrap_issue_queue_wr_ptr(this->id); + } + } } - uint32_t space_available_bytes = std::min(command_issue_limit - this->manager.get_issue_queue_write_ptr(this->id), MAX_PREFETCH_COMMAND_SIZE); - int32_t num_pages_available = (int32_t(space_available_bytes) - int32_t(data_offset_bytes)) / int32_t(padded_page_size); - if (num_pages_available != 0) { - uint32_t pages_to_write = std::min(total_pages_to_write, (uint32_t)num_pages_available); - - if (dst_page_index > 0xFFFF) { - // Page offset in CQ_DISPATCH_CMD_WRITE_PAGED is uint16_t - // To handle larger page offsets move bank base address up and update page offset to be relative to the new bank address - uint32_t residual = dst_page_index % num_banks; - uint32_t num_full_pages_written_per_bank = dst_page_index / num_banks; - bank_base_address += num_full_pages_written_per_bank * padded_page_size; - dst_page_index = residual; + } else { + uint32_t bank_base_address = buffer.address(); + while (total_pages_to_write > 0) { + uint32_t data_offset_bytes = (sizeof(CQPrefetchCmd) + sizeof(CQDispatchCmd)); // data appended after CQ_PREFETCH_CMD_RELAY_INLINE + CQ_DISPATCH_CMD_WRITE_PAGED + bool issue_wait = (dst_page_index == 0 and bank_base_address == buffer.address()); // only stall for the first write of the buffer + if (issue_wait) { + data_offset_bytes *= 2; // commands prefixed with CQ_PREFETCH_CMD_RELAY_INLINE + CQ_DISPATCH_CMD_WAIT } + uint32_t space_available_bytes = std::min(command_issue_limit - this->manager.get_issue_queue_write_ptr(this->id), MAX_PREFETCH_COMMAND_SIZE); + int32_t num_pages_available = (int32_t(space_available_bytes) - int32_t(data_offset_bytes)) / int32_t(padded_page_size); + if (num_pages_available != 0) { + uint32_t pages_to_write = std::min(total_pages_to_write, (uint32_t)num_pages_available); + + if (dst_page_index > 0xFFFF) { + // Page offset in CQ_DISPATCH_CMD_WRITE_PAGED is uint16_t + // To handle larger page offsets move bank base address up and update page offset to be relative to the new bank address + uint32_t residual = dst_page_index % num_banks; + uint32_t num_full_pages_written_per_bank = dst_page_index / num_banks; + bank_base_address += num_full_pages_written_per_bank * padded_page_size; 
+ dst_page_index = residual; + } - tt::log_debug(tt::LogDispatch, "EnqueueWriteBuffer for channel {}", this->id); + tt::log_debug(tt::LogDispatch, "EnqueueWriteBuffer for channel {}", this->id); - auto command = EnqueueWriteInterleavedBufferCommand( - this->id, this->device, buffer, src, this->manager, issue_wait, this->expected_num_workers_completed, bank_base_address, dst_page_index, pages_to_write); - this->enqueue_command(command, false); // don't block until the entire src data is enqueued in the issue queue + auto command = EnqueueWriteInterleavedBufferCommand( + this->id, this->device, buffer, src, this->manager, issue_wait, this->expected_num_workers_completed, bank_base_address, dst_page_index, pages_to_write); + this->enqueue_command(command, false); // don't block until the entire src data is enqueued in the issue queue - total_pages_to_write -= pages_to_write; - dst_page_index += pages_to_write; - } else { - this->manager.wrap_issue_queue_wr_ptr(this->id); + total_pages_to_write -= pages_to_write; + dst_page_index += pages_to_write; + } else { + this->manager.wrap_issue_queue_wr_ptr(this->id); + } } } @@ -763,14 +880,20 @@ void HWCommandQueue::enqueue_trace() { } void HWCommandQueue::copy_into_user_space(const detail::ReadBufferDescriptor &read_buffer_descriptor, uint32_t read_ptr, chip_id_t mmio_device_id, uint16_t channel) { - const auto& [buffer_layout, page_size, padded_page_size, dev_page_to_host_page_mapping, dst, dst_offset, num_pages_read, cur_host_page_id] = read_buffer_descriptor; + const auto& [buffer_layout, page_size, padded_page_size, dev_page_to_host_page_mapping, dst, dst_offset, num_pages_read, cur_dev_page_id] = read_buffer_descriptor; uint32_t padded_num_bytes = (num_pages_read * padded_page_size) + sizeof(CQDispatchCmd); uint32_t contig_dst_offset = dst_offset; uint32_t remaining_bytes_to_read = padded_num_bytes; + uint32_t dev_page_id = cur_dev_page_id; // track the amount of bytes read in the last non-aligned page uint32_t remaining_bytes_of_nonaligned_page = 0; + uint32_t offset_in_completion_q_data = (sizeof(CQDispatchCmd) / sizeof(uint32_t)); + + uint32_t pad_size_bytes = padded_page_size - page_size; + uint32_t padded_page_increment = (padded_page_size / sizeof(uint32_t)); + uint32_t page_increment = (page_size / sizeof(uint32_t)); static std::vector completion_q_data; @@ -798,8 +921,7 @@ void HWCommandQueue::copy_into_user_space(const detail::ReadBufferDescriptor &re } // completion queue write ptr on device could have wrapped but our read ptr is lagging behind - uint32_t bytes_xfered = std::min(padded_num_bytes, bytes_avail_in_completion_queue); - bytes_xfered = std::min(bytes_xfered, remaining_bytes_to_read); + uint32_t bytes_xfered = std::min(remaining_bytes_to_read, bytes_avail_in_completion_queue); uint32_t num_pages_xfered = (bytes_xfered + TRANSFER_PAGE_SIZE - 1) / TRANSFER_PAGE_SIZE; completion_q_data.resize(bytes_xfered / sizeof(uint32_t)); @@ -814,38 +936,47 @@ void HWCommandQueue::copy_into_user_space(const detail::ReadBufferDescriptor &re if (buffer_layout == TensorMemoryLayout::INTERLEAVED or buffer_layout == TensorMemoryLayout::HEIGHT_SHARDED) { void* contiguous_dst = (void*)(uint64_t(dst) + contig_dst_offset); - uint32_t offset_in_completion_q_data = (contig_dst_offset == 0) ? (sizeof(CQDispatchCmd) / sizeof(uint32_t)) : 0; if ((page_size % 32) == 0) { - uint32_t data_bytes_xfered = (contig_dst_offset == 0) ? 
(bytes_xfered - sizeof(CQDispatchCmd)) : bytes_xfered; + uint32_t data_bytes_xfered = bytes_xfered - offset_in_completion_q_data * sizeof(uint32_t); memcpy(contiguous_dst, completion_q_data.data() + offset_in_completion_q_data, data_bytes_xfered); contig_dst_offset += data_bytes_xfered; + offset_in_completion_q_data = 0; } else { uint32_t src_offset = offset_in_completion_q_data; + offset_in_completion_q_data = 0; uint32_t dst_offset_bytes = 0; - uint32_t pad_size_bytes = padded_page_size - page_size; - while (src_offset < completion_q_data.size()) { - uint32_t src_offset_increment = (padded_page_size / sizeof(uint32_t)); + uint32_t src_offset_increment = padded_page_increment; uint32_t num_bytes_to_copy; if (remaining_bytes_of_nonaligned_page > 0) { // Case 1: Portion of the page was copied into user buffer on the previous completion queue pop. - if (remaining_bytes_of_nonaligned_page <= pad_size_bytes) { - num_bytes_to_copy = page_size; - src_offset += (remaining_bytes_of_nonaligned_page / sizeof(uint32_t)); - } else { - num_bytes_to_copy = remaining_bytes_of_nonaligned_page - pad_size_bytes; + uint32_t num_bytes_remaining = (completion_q_data.size() - src_offset) * sizeof(uint32_t); + num_bytes_to_copy = std::min(remaining_bytes_of_nonaligned_page, num_bytes_remaining); + remaining_bytes_of_nonaligned_page -= num_bytes_to_copy; + src_offset_increment = (num_bytes_to_copy/sizeof(uint32_t)); + // We finished copying the page + if (remaining_bytes_of_nonaligned_page == 0) { + uint32_t rem_bytes_in_cq = num_bytes_remaining - remaining_bytes_of_nonaligned_page; + // There is more data after padding + if (rem_bytes_in_cq >= pad_size_bytes) { + src_offset_increment += pad_size_bytes / sizeof(uint32_t); + // Only pad data left in queue + } else { + offset_in_completion_q_data = (pad_size_bytes - rem_bytes_in_cq) / sizeof(uint32_t); + } } - src_offset_increment = (num_bytes_to_copy/sizeof(uint32_t)) + (pad_size_bytes/sizeof(uint32_t)); - - remaining_bytes_of_nonaligned_page = 0; - - } else if (src_offset + src_offset_increment >= completion_q_data.size()) { + } else if (src_offset + padded_page_increment >= completion_q_data.size()) { // Case 2: Last page of data that was popped off the completion queue + // Don't need to compute src_offset_increment since this is end of loop uint32_t num_bytes_remaining = (completion_q_data.size() - src_offset) * sizeof(uint32_t); - num_bytes_to_copy = std::min(num_bytes_remaining, page_size); - remaining_bytes_of_nonaligned_page = padded_page_size - (num_bytes_to_copy == page_size ? 
num_bytes_remaining : num_bytes_to_copy); + num_bytes_to_copy = std::min(num_bytes_remaining, page_size ); + remaining_bytes_of_nonaligned_page = page_size - num_bytes_to_copy; + // We've copied needed data, start of next read is offset due to remaining pad bytes + if (remaining_bytes_of_nonaligned_page == 0) { + offset_in_completion_q_data = padded_page_increment - num_bytes_remaining / sizeof(uint32_t); + } } else { num_bytes_to_copy = page_size; } @@ -864,7 +995,62 @@ void HWCommandQueue::copy_into_user_space(const detail::ReadBufferDescriptor &re } else if ( buffer_layout == TensorMemoryLayout::WIDTH_SHARDED or buffer_layout == TensorMemoryLayout::BLOCK_SHARDED) { - TT_THROW("Reading width sharded or block sharded buffers is unsupported in FD2.0"); + uint32_t src_offset = offset_in_completion_q_data; + offset_in_completion_q_data = 0; + uint32_t dst_offset_bytes = contig_dst_offset; + + while(src_offset < completion_q_data.size()) { + + uint32_t src_offset_increment = padded_page_increment; + uint32_t num_bytes_to_copy; + if (remaining_bytes_of_nonaligned_page > 0) { + // Case 1: Portion of the page was copied into user buffer on the previous completion queue pop. + uint32_t num_bytes_remaining = (completion_q_data.size() - src_offset) * sizeof(uint32_t); + num_bytes_to_copy = std::min(remaining_bytes_of_nonaligned_page, num_bytes_remaining); + remaining_bytes_of_nonaligned_page -= num_bytes_to_copy; + src_offset_increment = (num_bytes_to_copy/sizeof(uint32_t)); + // We finished copying the page + if (remaining_bytes_of_nonaligned_page == 0) { + dev_page_id++; + uint32_t rem_bytes_in_cq = num_bytes_remaining - remaining_bytes_of_nonaligned_page; + // There is more data after padding + if (rem_bytes_in_cq >= pad_size_bytes) { + src_offset_increment += pad_size_bytes / sizeof(uint32_t); + offset_in_completion_q_data = 0; + // Only pad data left in queue + } else { + offset_in_completion_q_data = (pad_size_bytes - rem_bytes_in_cq) / sizeof(uint32_t); + } + } + } else if (src_offset + padded_page_increment >= completion_q_data.size()) { + // Case 2: Last page of data that was popped off the completion queue + // Don't need to compute src_offset_increment since this is end of loop + uint32_t host_page_id = dev_page_to_host_page_mapping[dev_page_id]; + dst_offset_bytes = host_page_id * page_size; + uint32_t num_bytes_remaining = (completion_q_data.size() - src_offset) * sizeof(uint32_t); + num_bytes_to_copy = std::min(num_bytes_remaining, page_size); + remaining_bytes_of_nonaligned_page = page_size - num_bytes_to_copy; + // We've copied needed data, start of next read is offset due to remaining pad bytes + if (remaining_bytes_of_nonaligned_page == 0) { + offset_in_completion_q_data = padded_page_increment - num_bytes_remaining / sizeof(uint32_t); + dev_page_id++; + } + } else { + num_bytes_to_copy = page_size; + uint32_t host_page_id = dev_page_to_host_page_mapping[dev_page_id]; + dst_offset_bytes = host_page_id * page_size; + dev_page_id++; + } + + memcpy( + (char*)(uint64_t(dst) + dst_offset_bytes), + completion_q_data.data() + src_offset, + num_bytes_to_copy + ); + src_offset += src_offset_increment; + dst_offset_bytes += num_bytes_to_copy; + } + contig_dst_offset = dst_offset_bytes; } } } diff --git a/tt_metal/impl/dispatch/kernels/cq_prefetch.cpp b/tt_metal/impl/dispatch/kernels/cq_prefetch.cpp index 682a3fff004..51a685f5d8c 100644 --- a/tt_metal/impl/dispatch/kernels/cq_prefetch.cpp +++ b/tt_metal/impl/dispatch/kernels/cq_prefetch.cpp @@ -306,7 +306,9 @@ static uint32_t 
write_pages_to_dispatcher(uint32_t& downstream_data_ptr, } uint64_t noc_addr = get_noc_addr_helper(downstream_noc_xy, downstream_data_ptr); - if (downstream_data_ptr + amt_to_write > downstream_cb_end) { // wrap + if (downstream_data_ptr == downstream_cb_end) { + downstream_data_ptr = downstream_cb_base; + } else if (downstream_data_ptr + amt_to_write > downstream_cb_end) { // wrap uint32_t last_chunk_size = downstream_cb_end - downstream_data_ptr; noc_async_write(scratch_write_addr, noc_addr, last_chunk_size); downstream_data_ptr = downstream_cb_base; diff --git a/tt_metal/impl/dispatch/kernels/cq_prefetch.hpp b/tt_metal/impl/dispatch/kernels/cq_prefetch.hpp index 3f9e7787e88..825a644effd 100644 --- a/tt_metal/impl/dispatch/kernels/cq_prefetch.hpp +++ b/tt_metal/impl/dispatch/kernels/cq_prefetch.hpp @@ -310,7 +310,9 @@ static uint32_t write_pages_to_dispatcher(uint32_t& dispatch_data_ptr, } uint64_t noc_addr = get_noc_addr_helper(dispatch_noc_xy, dispatch_data_ptr); - if (dispatch_data_ptr + amt_to_write > dispatch_cb_end) { // wrap + if (dispatch_data_ptr == dispatch_cb_end) { + dispatch_data_ptr = dispatch_cb_base; + } else if (dispatch_data_ptr + amt_to_write > dispatch_cb_end) { // wrap uint32_t last_chunk_size = dispatch_cb_end - dispatch_data_ptr; noc_async_write(scratch_write_addr, noc_addr, last_chunk_size); dispatch_data_ptr = dispatch_cb_base;
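
Notes (not part of the patch): for reference, a minimal host-side round trip through the newly enabled sharded path might look like the sketch below. It is assembled from the pattern exercised by the ShardedBufferReadWrites test added above (a height-sharded L1 buffer, non-blocking EnqueueWriteBuffer followed by a blocking EnqueueReadBuffer). The concrete sizes, the 4x2 core grid, and the helper name sharded_round_trip are illustrative assumptions, not part of this change; the device is assumed to expose at least a 4x2 worker grid.

    #include <array>
    #include <numeric>
    #include <set>
    #include <vector>

    #include "tt_metal/host_api.hpp"

    using namespace tt::tt_metal;

    // Sketch only: write and read back a height-sharded L1 buffer through the
    // command queue, mirroring the unit test in this patch.
    void sharded_round_trip(Device *device, CommandQueue &cq) {
        // One byte per element, 32x32 elements per page, 2x2 pages per core, 4x2 cores.
        std::array<uint32_t, 2> page_shape = {32, 32};
        std::array<uint32_t, 2> pages_per_core = {2, 2};
        std::array<uint32_t, 2> cores = {4, 2};

        uint32_t page_size = page_shape[0] * page_shape[1];  // element_size == 1
        uint32_t num_pages = pages_per_core[0] * cores[0] * pages_per_core[1] * cores[1];
        uint32_t buf_size = num_pages * page_size;

        CoreRangeSet shard_grid(std::set<CoreRange>(
            {CoreRange(CoreCoord(0, 0), CoreCoord(cores[0] - 1, cores[1] - 1))}));

        // Same argument order as BufferStressTestConfigSharded::shard_parameters() above:
        // grid, shard shape (elements), orientation, halo, page shape, tensor2d shape (pages).
        ShardSpecBuffer shard_spec(
            shard_grid,
            {pages_per_core[0] * page_shape[0], pages_per_core[1] * page_shape[1]},
            ShardOrientation::ROW_MAJOR,
            false,
            page_shape,
            {pages_per_core[0] * cores[0], pages_per_core[1] * cores[1]});

        Buffer buf(device, buf_size, page_size, BufferType::L1,
                   TensorMemoryLayout::HEIGHT_SHARDED, shard_spec);

        std::vector<uint32_t> src(buf_size / sizeof(uint32_t));
        std::iota(src.begin(), src.end(), 0);
        std::vector<uint32_t> dst(buf_size / sizeof(uint32_t));

        // Non-blocking sharded write, then blocking sharded read: the paths this
        // patch routes through per-core linear prefetch/dispatch commands.
        EnqueueWriteBuffer(cq, buf, src.data(), false);
        EnqueueReadBuffer(cq, buf, dst.data(), true);
    }

Under the hood, the write side issues one CQ_DISPATCH_CMD_WRITE_LINEAR per chunk of a shard (addressed by the owning core's NOC coordinates), and the read side issues CQ_PREFETCH_CMD_RELAY_LINEAR commands sized to fit half of the prefetcher scratch DB, with copy_into_user_space re-scattering device pages to host pages for width- and block-sharded layouts.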