Excursions in composition: Sequential stream concatenation


As we've seen a few times already (when building context menus and exploring fiber-based enumeration), composition is an important concept in object-oriented programming. Today, we're going to compose two sequential streams by concatenation.

There really isn't much to it. The idea is to take two streams and start by reading from the first one. When that runs out, switch to reading from the second one. Most of this is just typing. (As usual, I am using plain C++; in real life, you can save yourselves a lot of typing by using a class library of your choosing.)

We'll start with a base class that does all the boring typing related to implementing a read-only sequential stream.

class CROSequentialStreamBase : public ISequentialStream
{
public:
 // *** IUnknown ***
 STDMETHODIMP QueryInterface(REFIID riid, void **ppv)
 {
  if (riid == IID_IUnknown || riid == IID_ISequentialStream) {
    *ppv = static_cast<IUnknown*>(this);
    AddRef();
    return S_OK;
  }
  *ppv = NULL;
  return E_NOINTERFACE;
 }

 STDMETHODIMP_(ULONG) AddRef()
 {
  return InterlockedIncrement(&m_cRef);
 }

 STDMETHODIMP_(ULONG) Release()
 {
  LONG cRef = InterlockedDecrement(&m_cRef);
  if (cRef == 0) delete this;
  return cRef;
 }

 // *** ISequentialStream ***
 STDMETHODIMP Write(const void *pv, ULONG cb, ULONG *pcbWritten)
 {
  if (pcbWritten) *pcbWritten = 0;
  return STG_E_ACCESSDENIED;
 }

protected:
 CROSequentialStreamBase() : m_cRef(1) { }
 virtual ~CROSequentialStreamBase() { }

 LONG m_cRef;
};

There's nothing exciting here at all. In addition to the boring IUnknown methods, we also implement ISequentialStream::Write by saying, "Sorry, you can't write to a read-only stream." The ISequentialStream::Read method is left unimplemented.

We can now cook up our CConcatStream class:

class CConcatStream : public CROSequentialStreamBase
{
public:
 CConcatStream(ISequentialStream *pstm1,
               ISequentialStream *pstm2);

 // *** ISequentialStream ***
 STDMETHODIMP Read(void *pv, ULONG cb, ULONG *pcbRead);

protected:
 ~CConcatStream();

 bool m_fFirst;
 ISequentialStream *m_pstm1;
 ISequentialStream *m_pstm2;
};

CConcatStream::CConcatStream(ISequentialStream *pstm1,
                             ISequentialStream *pstm2)
 : m_fFirst(true), m_pstm1(pstm1), m_pstm2(pstm2) // same order as the member declarations
{
 assert(pstm1 != pstm2);
 m_pstm1->AddRef();
 m_pstm2->AddRef();
}

CConcatStream::~CConcatStream()
{
 m_pstm1->Release();
 m_pstm2->Release();
}

Our CConcatStream takes two sequential streams and saves them in the member variables m_pstm1 and m_pstm2. The real work happens in the ISequentialStream::Read method:

HRESULT CConcatStream::Read(void *pv, ULONG cb, ULONG *pcbRead)
{
 ULONG cbRead = 0; // stays zero if a failing Read never sets it
 HRESULT hr;
 if (m_fFirst) {
  hr = m_pstm1->Read(pv, cb, &cbRead);
 } else {
  hr = m_pstm2->Read(pv, cb, &cbRead);
 }
 if ((FAILED(hr) || cbRead == 0) && m_fFirst) {
  m_fFirst = false;
  hr = m_pstm2->Read(pv, cb, &cbRead);
 }
 if (pcbRead) *pcbRead = cbRead;
 return hr;
}

If we are still reading the first stream, then read from the first stream. Otherwise, read from the second stream. If the first stream reaches the end, then switch to the second stream. (Checking whether the end of the stream has been reached is very annoying since ISequentialStream implementations are inconsistent in the way they report the condition. Some return S_FALSE on a partial read; others return S_OK; still others return an error code. We need to check for all of these possibilities.)
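The switch-over logic can be tried out without any COM plumbing. What follows is only a minimal sketch under stated assumptions: ByteSource, MemorySource, ConcatSource, and ReadAll are hypothetical stand-ins invented for illustration (they are not part of any Windows header), a bool return value replaces the HRESULT, and the sources are not reference-counted. It also adds one thing the Read method above does not have: a guard for zero-byte requests, since a request for zero bytes comes back with zero bytes whether or not the first stream has ended.

```cpp
// Portable model of the switch-over logic. This is not COM code:
// ByteSource, MemorySource, ConcatSource and ReadAll are hypothetical
// stand-ins for ISequentialStream and CConcatStream.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Stand-in for ISequentialStream::Read. Returns false on error and
// reports the number of bytes produced through cbRead.
struct ByteSource {
    virtual ~ByteSource() {}
    virtual bool Read(void *pv, std::size_t cb, std::size_t *cbRead) = 0;
};

// Serves bytes from an in-memory string. A read at the end succeeds
// with zero bytes.
class MemorySource : public ByteSource {
public:
    explicit MemorySource(const std::string &data) : m_data(data), m_pos(0) {}
    bool Read(void *pv, std::size_t cb, std::size_t *cbRead) override {
        std::size_t n = std::min(cb, m_data.size() - m_pos);
        std::memcpy(pv, m_data.data() + m_pos, n);
        m_pos += n;
        *cbRead = n;
        return true;
    }
private:
    std::string m_data;
    std::size_t m_pos;
};

// Reads from the first source until it fails or comes back empty,
// then reads from the second source from that point on.
class ConcatSource : public ByteSource {
public:
    ConcatSource(ByteSource *first, ByteSource *second)
        : m_first(first), m_second(second), m_onFirst(true) {
        assert(first != second);
    }
    bool Read(void *pv, std::size_t cb, std::size_t *cbRead) override {
        *cbRead = 0;
        if (cb == 0) return true;  // a zero-byte request says nothing about the end
        if (m_onFirst) {
            bool ok = m_first->Read(pv, cb, cbRead);
            if (ok && *cbRead != 0) return true;
            m_onFirst = false;     // error or empty read: treat as end of first
        }
        return m_second->Read(pv, cb, cbRead);
    }
private:
    ByteSource *m_first;   // not owned; the caller keeps both sources alive
    ByteSource *m_second;
    bool m_onFirst;
};

// Drains a source into a string, reading at most chunk bytes at a time.
std::string ReadAll(ByteSource &src, std::size_t chunk) {
    std::string out;
    std::string buf(chunk, '\0');
    std::size_t n = 0;
    while (src.Read(&buf[0], chunk, &n) && n != 0) out.append(buf, 0, n);
    return out;
}
```

Because a ConcatSource is itself a ByteSource, it can be passed as either argument to another ConcatSource, which is one way to chain three or more sources in this model.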

And that's all there is. If you give this object two sequential streams, it will compose those two streams and act like one giant stream that is the concatenation of the two.

Let's illustrate with a simple program:

#include <stdio.h>
#include <windows.h>
#include <ole2.h>
#include <assert.h>
#include <shlwapi.h>
#include <tchar.h>

... insert CConcatStream class here ...

void PrintStream(ISequentialStream *pstm)
{
 ULONG cb;
 BYTE buf[256];
 while (SUCCEEDED(pstm->Read(buf, 255, &cb)) && cb) {
  buf[cb] = 0;
  printf("%s", buf);
 }
}

int __cdecl _tmain(int argc, TCHAR **argv)
{
 if (argc < 3) return 1; // two file names are required
 if (SUCCEEDED(CoInitialize(NULL))) {
  IStream *pstm1;
  if (SUCCEEDED(SHCreateStreamOnFile(argv[1], STGM_READ, &pstm1))) {
   IStream *pstm2;
   if (SUCCEEDED(SHCreateStreamOnFile(argv[2], STGM_READ, &pstm2))) {
    CConcatStream *pstm = new CConcatStream(pstm1, pstm2);
    if (pstm) {
     PrintStream(pstm);
     pstm->Release();
    }
    pstm2->Release();
   }
   pstm1->Release();
  }
  CoUninitialize();
 }
 return 0;
}

This program takes two file names on the command line and creates a stream for each one, then creates a CConcatStream out of them both, resulting in a composite stream that produces the contents of the first file, followed by the contents of the second file. When you run this program, you should see both files printed to the screen, one after the other.

Okay, there really wasn't much going on here, but we'll use this as groundwork for next time.

Exercise: What is the significance of the assertion in the constructor?

Comments (18)
  1. Bob says:

    There’s no rewind being performed on the streams (not that this is usually desired), so if you pass the same stream in twice you’re not going to get anything useful when concatenating. Stream2 will EOF as soon as you try to read from it.

  2. richard says:

    Never liked assert and other debug macros used during development. If it is important enough to check during development, it is important enough to check at runtime.

  3. I think what Raymond is trying to uphold with the assertion in the constructor is concat semantics. An example:

    1. string c = string("a") + string("b");
    2. string c = string("a") + string("a");

    In both cases you’d expect the string c to be two characters long.

    So, if the assertion is not there, an attempt to concatenate two identical streams will not return twice the information.

    Rgrds /Henry

  4. Nathan says:

    Bob appears correct.. But all the comparison is doing is seeing if the two pointers are the same object (doubting someone overrode the != operator for the base class…)..

    And given the reference counting, that should in theory prevent serious problems on release/use of a now-null obj.

  5. prevent serious problems on release/use of a now-null obj

    I don’t see that.  If pstm1 and pstm2 are the same, there are two AddRef() calls on it and two Release() calls.  No problem there.

  6. Bob is correct.

    If pstm1 == pstm2 then running read on the second one will immediately fail because you’ve already read to EOF on the first one (which is the second one).

    Unrelated to the “challenge”

    Raymond-

    Just a quick question, your code will obviously work, but if each stream is 300bytes and I ask to read it in blocks of 255, I will get a read of 255, followed by a read of 45, followed by a read of 255, followed by a read of 45. This seems it would be unexpected behavior, any reason you didn’t write it to read the 45 from stream one and another 210 from stream two to create reads of: 255, 255, 90?

    [You are free to add that feature if you want. It would have distracted from the point of the article. -Raymond]
  7. anonymous says:

    I think that this does not fully follow the contract for ISequentialStream::Read

    From MSDN:

    The actual number of bytes read can be less than the number of bytes requested if an error occurs or if the end of the stream is reached during the read operation…

    This implementation can also return less than the number of bytes requested in the middle, creating a seam between the concatenated streams.

    [“X if Y” is not the same as “X if and only if Y”. If you read further, you’ll also see the word “usually”. The documentation is just calling out some common cases for you. It’s trying to go beyond “just the facts ma’am” to also give some programming tips. Apparently people are misinterpreting the helpful tips… Maybe we should remove them. -Raymond]
  8. C Gomez says:

    Please don’t.  They are helpful.

  9. > What is the significance of the assertion in the constructor?

    You’re attempting to enforce the implicit contract that callers should not concatenate a stream with itself directly — although presumably they could concatenate indirectly via the following:

    IStream *pOne = …;
    IStream *pTwo = …;
    CConcatStream *pOneTwo = new CConcatStream(pOne, pTwo); // fine
    CConcatStream *pOneTwoOne = new CConcatStream(pOneTwo, pOne); // third stream will be EOF before it’s even read

    Making pTwo be an empty string allows a caller to work around the assert if they really want to.

    I have to agree with richard though that this makes a significant difference between runtime and debug checking.  Why not check at runtime?  Have the constructor set an HRESULT out parameter:

    CConcatStream::CConcatStream(
     ISequentialStream *pstm1,
     ISequentialStream *pstm2,
     HRESULT &hr
    )
    : m_pstm1(pstm1), m_pstm2(pstm2), m_fFirst(true), m_bValid(true)
    {
     // assert(pstm1 != pstm2);
     if (pstm1 == pstm2) {
      hr = E_INVALIDARG;
      m_bValid = false;
     }
     m_pstm1->AddRef();
     m_pstm2->AddRef();
    }

    [You are free to use whatever error reporting mechanism you want. I didn’t want to wade into those dangerous waters since it was irrelevant to the article. Note also that the runtime error won’t catch the example you gave at the top of the comment. -Raymond]
  10. Steve says:

    This bit in QueryInterface() confused me a little:

       *ppv = static_cast<IUnknown*>(this);

    Since ppv is void **, the pointer returned by static_cast will be converted to void *. So why the intermediate cast to IUnknown *?

    [This is a critical detail. See your favorite COM textbook for an explanation. -Raymond]
  11. Arlie Davis says:

    Do people come here *just* to nitpick?  Are you the same people who, in college, stuck up their hands at every chance, so they could "correct" the professor on some trivial point, ignoring the real subject?

    Isn’t it obvious that this stream-concatenator is an *EXAMPLE* and not production code that Raymond dug out of SQL or somewhere and held up for your scrutiny?  It’s there for RHETORICAL purposes — to demonstrate an idea, and to lay the groundwork for future posts.

    As he said, this post is about composition — the fact that you can build a "concatenator" stream on top of 2 (or any number of) streams.  It’s not a forum for you to flex your inner language lawyer, or just to prat on about some totally trivial, irrelevant point.

    Just go back to Slashdot, already.

  12. Alex says:

    > This bit in QueryInterface() confused me a little:
    >
    >   *ppv = static_cast<IUnknown*>(this);
    >
    > Since ppv is void **, the pointer returned by static_cast will be converted to void *. So why the intermediate cast to IUnknown *?

    To get the right pointer, because generally (void*)(static_cast<IUnknown*>(this)) != (void*) this.

  13. Norman Diamond says:

    Some return S_FALSE on a partial read; others return S_OK; still others return an error code.

    Valuable information.  It also triggered an old joke, "Consistency is the last refuge of an uncreative person."

    > but if each stream is 300bytes and I ask to read it in blocks of 255, I will get a read of 255, followed by a read of 45, followed by a read of 255, followed by a read of 45. This seems it would be unexpected behavior,

    It would have distracted from the point of the article.

    I think Mr. Anderson’s observation looks right on target, equally valuable.  Consumers will have to experiment and/or reverse engineer, to figure out whether a short read is an error, etc.

    Sorry if this looks like I knitted something, but the two strands really wanted to be concatenated to each other.

  14. Wow, it’s hardcore to use only one space of indentation! I thought I was tough only using two, but now I see I have a lot to learn!

  15. Dave Harris says:

    It seems to me that if you attempt to read 0 bytes followed by 10 bytes, it will throw away the entire contents of the first stream and read 10 bytes from the second stream. There should be a test for cb != 0 somewhere. (Probably just after the test for cbRead == 0.)

  16. Ken Hagan says:

    “Do people come here *just* to nitpick?”

    Probably. Programmers have to have more than the usual concern for detail. Having said that, it would be nice if it could be switched off for social purposes.

    But it can’t, so I’ll jump in anyway. The assertion doesn’t do what some have claimed. Techniques like tear-offs can mean that pstm1!=pstm2 even if both interfaces were torn-off the same object. If you want to test for identity, query for IUnknown first.

    [Testing IUnknown still isn’t good enough. The streams could be connected through ways other than identity. -Raymond]
  17. Jules says:

    Since ppv is void **, the pointer returned by static_cast will be converted to void *. So why the intermediate cast to IUnknown *?

    [This is a critical detail. See your favorite COM textbook for an explanation. -Raymond]

    Or derive the reason from your knowledge of C++ without needing to know anything about COM.  The clue is that it’s static_cast, not reinterpret_cast which is what the implicit conversion would do.

  18. Neil says:

    m_fFirst doesn’t change until after it’s been tested twice, so you can hoist the second test into the then branch of the first test like so:

    HRESULT CConcatStream::Read(void *pv, ULONG cb, ULONG *pcbRead)
    {
     ULONG cbRead;
     HRESULT hr;
     if (m_fFirst) {
      hr = m_pstm1->Read(pv, cb, &cbRead);
      if (FAILED(hr) || cbRead == 0) {
       m_fFirst = false;
       hr = m_pstm2->Read(pv, cb, &cbRead);
      }
     }
     else {
      hr = m_pstm2->Read(pv, cb, &cbRead);
     }
     if (pcbRead) *pcbRead = cbRead;
     return hr;
    }

    A good optimising compiler will deal with the apparent redundancy, of course, but I still prefer this rewrite:

    HRESULT CConcatStream::Read(void *pv, ULONG cb, ULONG *pcbRead)
    {
     if (m_fFirst) {
      ULONG cbRead;
      HRESULT hr = m_pstm1->Read(pv, cb, &cbRead);
      if (SUCCEEDED(hr) && cbRead != 0) {
       if (pcbRead) *pcbRead = cbRead;
       return hr;
      }
      m_fFirst = false;
     }
     return m_pstm2->Read(pv, cb, pcbRead);
    }

    (not that I’ve tested or even compiled this…)

    Then again, I’d probably reuse m_pstm1 as m_fFirst by setting it to null at EOF.

Comments are closed.