The specific problem we're facing is a lack of parallelism on data loads. Our usage does not quite fit standard database models, of which two are typical - OLAP (on-line analytical processing - essentially read-only database that is updated, say, nightly, contains tons of data, and is queried frequently using specific set of queries for which it was designed) and OLTP (on-line transaction processing - where the "hot" subset of the data is smaller, but is read and updated very frequently).
Our database is the worst of both worlds - it writes about 1 million rows a minute, and the reads are rather infrequent. (An argument that SQL server is not the right technology for this can be made, but this discussion is outside the context of this post).
There are two basic problems.
First is the lack of parallelism in bulk transfers on the SQL server - on an 8-core, 32GB machine with 3 10k RPM data drives (which is what $4k currently buys at Dell) the load (and index update) part is completely single-threaded. Which means that one core runs at 100% CPU, and the rest are doing nothing.
I wrote a very simple program that does all the data transforms in memory, and this part is completely parallel - the server runs at 60-70% CPU utilization - and is very fast. Unfortunately, the very last insert into SQL - Amdahl's law! - is now controls the overall performance.
The second - bigger - problem is that eventually the index no longer fits in the RAM, and due to the nature of the data it seems to be rewritten almost entirely on every upload. If I restrict SQL memory to 6GB, this covers roughly 2 hours of input, the disk starts thrashing really badly, and the load times go to hell - what initially would take only 30 seconds becomes 3 minutes.
One potential solution is to partition the database, but once we do it, it starts negatively affect the performance of our queries.
The data flow through the system is really quite small relative to the power of the hardware. The rows we're writing are only a few dozens of bytes each, and the aggregate data flow is less than 1MBps. But because the entire index is being rewritten all the time, the box keeps writing at at the rate of 20-30 MBps for several minutes.
This seemed kinda slow (although speeding it up is of course not going to be a part of the solution - we need to figure out a more global approach) - so I wanted to check what the hardware is capable of doing in terms of disk throughput, so I quickly typed up a piece of code that also makes a reasonable tutorial on how to use overlapped I/O on Windows. Hence this article.
The essential steps as as follows.
First, open the file using FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING:
HANDLE hFile = CreateFileW(szFileName, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, NULL);
Second, you want to use DMA for the data transfer, to avoid copying, and for that the data buffer should be aligned on a sector boundary. The easiest way is to VirtualAlloc it, since this will force the buffer to be contiguous and aligned on 64k:
buffer = VirtualAlloc(NULL, bufferSize, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
Use WriteFileEx to schedule your I/O. The hEvent part of the overlapped is not used, so a developer can use it to pass context to the completion routine. The API is quite weird this way - you'd expect that hEvent would take an event to signal on I/O completion - but that's not how it works.
overlapped.hEvent = (HANDLE)1;
overlapped.Offset = (DWORD)offset;
overlapped.OffsetHigh = (DWORD)(offset >> 32);
offset += bufferSize;
WriteFileEx(hFile, buffers, bufferSize, &overlapped, WriteFinished);
where WriteFinished gets called when the write is done:
static void CALLBACK WriteFinished(DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
lpOverlapped->hEvent = (HANDLE)2;
}
Finally, once the write is scheduled, the thread must be put into alertable sleep (or wait - use the SleepEx/WaitForSingleObjectEx/WaitForMultipleObjectEx functions that take the alertable flag:
SleepEx(INFINITE, TRUE);
or
DWORD dwRes = WaitForSingleObjectEx(hSomeEvent, INFINITE, TRUE);
if (dwRes == WAIT_IO_COMPLETION)
{
// write is done
...
Obviously, you will be scheduling multiple I/Os - using an array of OVERLAPPED structures and hEvent fields inside them to keep track of what has finished and what has not is handy.
Below is the full text.
Oh, yes, and that server does ~110 MBps writes on a 10K RPM disk using the code below (and ~80 MBps writes on 7200 RPM disk), with practically zero CPU utilization.
//-----------------------------------------------------------------------
// <copyright>
// Copyright (C) Sergey Solyanik. All rights reserved.
//
// This software is in public domain and is "free as in beer". It can be
// redistributed in full or in parts for free and without any preconditions.
// </copyright>
//-----------------------------------------------------------------------
#include <windows.h>
#include <stdio.h>
#define MAX_OUTSTANDING_WRITES 64
enum WriteProgress
{
WriteScheduled = 1,
WriteSucceeded = 2,
WriteError = 3
};
static void CALLBACK WriteFinished(DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
if (dwErrorCode == 0)
{
lpOverlapped->hEvent = (HANDLE)WriteSucceeded;
return;
}
wprintf(L"Error: %d\n", dwErrorCode);
lpOverlapped->hEvent = (HANDLE)WriteError;
}
int wmain(int argc, WCHAR* argv[])
{
if (argc != 5)
{
wprintf(L"Usage: writedata filename total_size chunk_size "
L"number_of_writes\n");
wprintf(L"Note: total_size is in megabytes\n");
wprintf(L" chunk_size is in bytes and must be a power of 2 "
L"greater than 2048\n");
wprintf(L" number_of_writes is the number of writes "
L"that are scheduled simultaneously\n");
return 1;
}
if (GetFileAttributesW(argv[1]) != 0xffffffff)
{
wprintf(L"%s already exists.", argv[1]);
return 2;
}
int size = _wtoi(argv[2]);
if (size <= 0)
{
wprintf(L"Size must be a positive number.");
return 3;
}
LARGE_INTEGER bytes;
bytes.QuadPart = (__int64)size * 1024L * 1024L;
int bufferSize = _wtoi(argv[3]);
if (bufferSize <= 0)
{
wprintf(L"Buffer size should be a positive number.");
return 4;
}
if (bufferSize & (bufferSize - 1))
{
wprintf(L"Buffer size must be power of 2");
return 4;
}
if (bufferSize < 4096)
{
wprintf(L"Buffer size is too small");
return 4;
}
int simwrites = _wtoi(argv[4]);
if (simwrites <= 0)
{
wprintf(L"Number of simultaneous writes should be a "
L"positive number.");
return 5;
}
if (simwrites > MAX_OUTSTANDING_WRITES)
{
wprintf(L"Number of simultaneous writes is too large.");
return 5;
}
HANDLE hFile = CreateFileW(argv[1], GENERIC_WRITE, 0, NULL,
CREATE_ALWAYS, FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING,
NULL);
SetFilePointerEx(hFile, bytes, NULL, FILE_BEGIN);
SetEndOfFile(hFile);
OVERLAPPED overlappeds[MAX_OUTSTANDING_WRITES];
memset(overlappeds, 0, sizeof(overlappeds));
void *buffers[MAX_OUTSTANDING_WRITES];
for (int i = 0 ; i < simwrites ; ++i)
{
buffers[i] = VirtualAlloc(NULL, bufferSize,
MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
memset(buffers[i], i, bufferSize);
}
DWORD tick = GetTickCount();
__int64 totalScheduled = 0;
__int64 totalWritten = 0;
__int64 nWrites = bytes.QuadPart / bufferSize;
int nOutstanding = 0;
for (;;)
{
int nScheduled = 0;
for (int i = 0; i < simwrites; ++i)
{
if ((int)overlappeds[i].hEvent == WriteScheduled)
{
++nScheduled;
continue;
}
if ((int)overlappeds[i].hEvent == WriteSucceeded)
{
totalWritten += bufferSize;
wprintf(L"\r%I64d", totalWritten);
memset(&overlappeds[i], 0, sizeof(OVERLAPPED));
}
if ((int)overlappeds[i].hEvent == WriteError)
{
CancelIo(hFile);
goto finished;
}
if (nWrites > 0)
{
overlappeds[i].hEvent = (HANDLE)WriteScheduled;
overlappeds[i].Offset = (DWORD)totalScheduled;
overlappeds[i].OffsetHigh =
(DWORD)(totalScheduled >> 32);
totalScheduled += bufferSize;
--nWrites;
++nScheduled;
if (!WriteFileEx(hFile, buffers[i], bufferSize,
&overlappeds[i], WriteFinished))
{
DWORD dwErr = GetLastError();
wprintf(L"Write error %d\n", dwErr);
CancelIo(hFile);
goto finished;
}
}
}
if (nScheduled == 0)
break;
SleepEx(INFINITE, TRUE);
}
finished:
for (int i = 0 ; i < simwrites; ++i)
VirtualFree(buffers[i], 0, MEM_RELEASE);
CloseHandle(hFile);
int seconds = (GetTickCount() - tick) / 1000;
if (seconds <= 0)
seconds = 1;
wprintf (L" in %d seconds (%d MBps)\n", seconds,
size / seconds);
return 0;
}
4 comments:
Did you try SSIS for the transforms? You can (albeit laboriously) parallelize the writes.
Yes, that's where we started :-) (parenthetically, in the beginning we thought that the whole reason for SSIS is that they handle parallelism for you, but no). We're still investigating it, but it does not look super promising (the bottleneck is still SQL and SSIS does not really posess any special magic).
Is it bing-something?
If you do it with SQL, Google wins because it doesn't.
nice tutorial Sergey :) Helped me understand overlapped IO. Thank you!
Post a Comment