// Thread body override: all of the work is delegated to the owning object's runThread().
29 void run()
override { owner.runThread(); }
37 : useMessageThread (callbacksOnMessageThread),
38 magicMessageHeader (magicMessageHeaderNumber)
45 callbackConnectionState =
false;
47 masterReference.clear();
53 int portNumber,
int timeOutMillisecs)
60 if (socket->connect (hostName, portNumber, timeOutMillisecs))
62 threadIsRunning =
true;
64 thread->startThread();
76 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
78 if (newPipe->openExisting (pipeName))
81 pipeReceiveMessageTimeout = timeoutMs;
82 initialiseWithPipe (newPipe.release());
93 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
95 if (newPipe->createNewPipe (pipeName, mustNotExist))
98 pipeReceiveMessageTimeout = timeoutMs;
99 initialiseWithPipe (newPipe.release());
108 thread->signalThreadShouldExit();
112 if (socket !=
nullptr) socket->close();
113 if (pipe !=
nullptr) pipe->close();
116 thread->stopThread (4000);
117 deletePipeAndSocket();
121void InterprocessConnection::deletePipeAndSocket()
132 return ((socket !=
nullptr && socket->isConnected())
133 || (pipe !=
nullptr && pipe->isOpen()))
142 if (pipe ==
nullptr && socket ==
nullptr)
145 if (socket !=
nullptr && ! socket->isLocal())
146 return socket->getHostName();
149 return IPAddress::local().toString();
159 messageData.
copyFrom (messageHeader, 0,
sizeof (messageHeader));
162 return writeData (messageData.
getData(), (
int) messageData.
getSize()) == (int) messageData.
getSize();
165int InterprocessConnection::writeData (
void* data,
int dataSize)
169 if (socket !=
nullptr)
170 return socket->write (data, dataSize);
173 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
179void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
181 jassert (socket ==
nullptr && pipe ==
nullptr);
182 socket.reset (newSocket);
184 threadIsRunning =
true;
186 thread->startThread();
189void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
191 jassert (socket ==
nullptr && pipe ==
nullptr);
192 pipe.reset (newPipe);
194 threadIsRunning =
true;
196 thread->startThread();
203 : owner (ipc), connectionMade (connected)
206 void messageCallback()
override
208 if (
auto* ipc = owner.get())
223void InterprocessConnection::connectionMadeInt()
225 if (! callbackConnectionState)
227 callbackConnectionState =
true;
229 if (useMessageThread)
236void InterprocessConnection::connectionLostInt()
238 if (callbackConnectionState)
240 callbackConnectionState =
false;
242 if (useMessageThread)
243 (
new ConnectionStateMessage (
this,
false))->post();
252 : owner (ipc), data (d)
255 void messageCallback()
override
257 if (
auto* ipc = owner.get())
265void InterprocessConnection::deliverDataInt (
const MemoryBlock& data)
267 jassert (callbackConnectionState);
269 if (useMessageThread)
276int InterprocessConnection::readData (
void* data,
int num)
278 if (socket !=
nullptr)
279 return socket->read (data, num,
true);
282 return pipe->read (data, num, pipeReceiveMessageTimeout);
288bool InterprocessConnection::readNextMessage()
290 uint32 messageHeader[2];
291 auto bytes = readData (messageHeader,
sizeof (messageHeader));
293 if (bytes == (
int)
sizeof (messageHeader)
298 if (bytesInMessage > 0)
300 MemoryBlock messageData ((
size_t) bytesInMessage,
true);
303 while (bytesInMessage > 0)
305 if (thread->threadShouldExit())
308 auto numThisTime = jmin (bytesInMessage, 65536);
309 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
314 bytesRead += bytesIn;
315 bytesInMessage -= bytesIn;
319 deliverDataInt (messageData);
327 if (socket !=
nullptr)
328 deletePipeAndSocket();
336void InterprocessConnection::runThread()
338 while (! thread->threadShouldExit())
340 if (socket !=
nullptr)
342 auto ready = socket->waitUntilReady (
true, 100);
346 deletePipeAndSocket();
357 else if (pipe !=
nullptr)
359 if (! pipe->isOpen())
361 deletePipeAndSocket();
371 if (thread->threadShouldExit() || ! readNextMessage())
375 threadIsRunning =
false;
static Type swapIfBigEndian(Type value) noexcept
Swaps the byte order of a signed or unsigned integer if the CPU is big-endian.
Automatically locks and unlocks a mutex object.
Manages a simple two-way messaging connection to another process, using either a socket or a named pipe.
virtual void connectionMade()=0
Called when the connection is first connected.
virtual void messageReceived(const MemoryBlock &message)=0
Called when a message arrives.
String getConnectedHostName() const
Returns the name of the machine at the other end of this connection.
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
Creates a connection.
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
Tries to create a new pipe for other processes to connect to.
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
Tries to connect the object to an existing named pipe.
void disconnect()
Disconnects and closes any currently-open sockets or pipes.
bool isConnected() const
True if a socket or pipe is currently active.
virtual ~InterprocessConnection()
Destructor.
virtual void connectionLost()=0
Called when the connection is broken.
bool sendMessage(const MemoryBlock &message)
Tries to send a message to the other end of this connection.
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
Tries to connect this object to a socket.
A class to hold a resizable block of raw data.
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
Copies data into this MemoryBlock from a memory address.
void * getData() noexcept
Returns a void pointer to the data.
size_t getSize() const noexcept
Returns the block's current allocated size, in bytes.
Internal class used as the base class for all message objects.
The base class for objects that can be sent to a MessageListener.
A cross-process pipe that can have data written to and read from it.
A wrapper for a streaming (TCP) socket.
Thread(const String &threadName, size_t threadStackSize=0)
Creates a thread.
This class acts as a pointer which will automatically become null if the object to which it points is deleted.
void run() override
Must be implemented to perform the thread's actual code.