001    package org.hd.d.pg2k.svrCore.datasource;
002    
003    import java.io.BufferedOutputStream;
004    import java.io.ByteArrayInputStream;
005    import java.io.ByteArrayOutputStream;
006    import java.io.DataInputStream;
007    import java.io.DataOutputStream;
008    import java.io.FilterInputStream;
009    import java.io.IOException;
010    import java.io.InputStream;
011    import java.io.InterruptedIOException;
012    import java.io.InvalidObjectException;
013    import java.io.ObjectInputStream;
014    import java.io.ObjectOutputStream;
015    import java.io.OutputStream;
016    import java.io.Serializable;
017    import java.nio.ByteBuffer;
018    import java.security.InvalidKeyException;
019    import java.security.Key;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.BitSet;
023    import java.util.Collections;
024    import java.util.Date;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Properties;
029    import java.util.concurrent.Callable;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicReference;
032    import java.util.concurrent.locks.ReentrantLock;
033    import java.util.regex.Pattern;
034    
035    import javax.crypto.Mac;
036    import javax.crypto.SecretKey;
037    import javax.management.RuntimeErrorException;
038    
039    import org.hd.d.pg2k.svrCore.AllExhibitImmutableData;
040    import org.hd.d.pg2k.svrCore.AllExhibitProperties;
041    import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta;
042    import org.hd.d.pg2k.svrCore.AllExhibitPropertiesDelta.DiffException;
043    import org.hd.d.pg2k.svrCore.CompressionLevel;
044    import org.hd.d.pg2k.svrCore.CoreConsts;
045    import org.hd.d.pg2k.svrCore.DefInputStream;
046    import org.hd.d.pg2k.svrCore.DefOutputStream;
047    import org.hd.d.pg2k.svrCore.ExhibitName;
048    import org.hd.d.pg2k.svrCore.ExhibitStaticAttr;
049    import org.hd.d.pg2k.svrCore.ExhibitThumbnails;
050    import org.hd.d.pg2k.svrCore.FileTools;
051    import org.hd.d.pg2k.svrCore.GenUtils;
052    import org.hd.d.pg2k.svrCore.MemoryTools;
053    import org.hd.d.pg2k.svrCore.Name;
054    import org.hd.d.pg2k.svrCore.Name.ExhibitFull;
055    import org.hd.d.pg2k.svrCore.PGMasterNotInServiceException;
056    import org.hd.d.pg2k.svrCore.ROByteArray;
057    import org.hd.d.pg2k.svrCore.Rnd;
058    import org.hd.d.pg2k.svrCore.SimpleLoggerIF;
059    import org.hd.d.pg2k.svrCore.Stratum;
060    import org.hd.d.pg2k.svrCore.TextUtils;
061    import org.hd.d.pg2k.svrCore.Tuple;
062    import org.hd.d.pg2k.svrCore.WrappedByteArrayCharSequence;
063    import org.hd.d.pg2k.svrCore.collections.SoftReferenceMap;
064    import org.hd.d.pg2k.svrCore.props.GenProps;
065    import org.hd.d.pg2k.svrCore.stats.StatsLogger;
066    import org.hd.d.pg2k.svrCore.vars.BasicVarMgr;
067    import org.hd.d.pg2k.svrCore.vars.EventPeriod;
068    import org.hd.d.pg2k.svrCore.vars.EventVariableValue;
069    import org.hd.d.pg2k.svrCore.vars.InstanceID;
070    import org.hd.d.pg2k.svrCore.vars.SimpleVariableDefinition;
071    import org.hd.d.pg2k.svrCore.vars.SimpleVariableValue;
072    import org.hd.d.pg2k.svrCore.vars.SystemVariables;
073    
074    import ORG.hd.d.IsDebug;
075    
076    /**Exhibit pipeline stage that fetches its data across a master/slave tunnel.
077     * Some of the visible types and values are used by the implementations
078     * of both ends of the tunnel (eg the tunnel servlet)
079     * to help implement the shared protocol.
080     * <p>
081     * This takes data from the server's own DataSourceBean by default.
082     */
083    public abstract class ExhibitDataTunnelSource implements SimpleExhibitPipelineIF
084        {
085        /**Create an instance, passing in a (non-null) logger to use. */
086        public ExhibitDataTunnelSource(final SimpleLoggerIF logger)
087            {
088            if(logger == null) { throw new IllegalArgumentException(); }
089            this.logger = logger;
090            statsIDTS =
091                new StatsLogger.StatsConfig("TUNNELSOURCE",
092                                            logger, // Use servlet log if poss.
093                                            false, // Only dump summaries...
094                                            12 * 3600, // About every 12 hours.
095                                            true); // Adaptive.
096            }
097    
098        /**Thrown when RPC fails because of connection throttling on upstream side.
099         * Such exceptions do not represent true RPC failures,
100         * so need not put off another RPC attempt.
101         */
102        public static class TunnelBusyIOException extends InterruptedIOException
103            {
104            /**Unique serialisation ID. */
105            private static final long serialVersionUID = 5573883733879369445L;
106            public TunnelBusyIOException() { super(); }
107            //public TunnelBusyIOException(final String message, final Throwable cause)  { super(message, cause); }
108            public TunnelBusyIOException(final String message) { super(message); }
109            //public TunnelBusyIOException(final Throwable cause) { super(cause); }
110            }
111    
112        /**If true, try GC and running finalizers after some connection failures.
113         * If we think that connection failure may have been caused by
114         * failure to release resources (elsewhere in the VM!)
115         * then try to force a clean-up from time to time.
116         * But, in case we are wrong, do not do this too often anyway.
117         */
118        private static final boolean FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL = true;
119    
120        /**Minimum wait after failed tunnel call before retry (ms); strictly positive.
121         * A few tens of milliseconds to a second is probably best,
122         * to recover from transient I/O problems quickly,
123         * but without actually busy-waiting...
124         */
125        public static final int FAIL_RETRY_WAIT_MIN_MS = 101;
126    
127        /**Maximum wait after failed tunnel call before retry (ms).
128         * Several minutes is probably good,
129         * to cope with a crashed master server (and/or to wait for it to reboot),
130         * or a failed connection,
131         * but too long and our distributed variables will fail needlessly.
132         * <p>
133         * Most requests during the wait will be vetoed cleanly and quickly
134         * allowing timely service to the slave's clients.
135         * <p>
136         * We vary this limit slightly between slaves/instances to help avoid them
137         * colliding with one another especially if hitting this upper bound.
138         */
139        public static final int FAIL_RETRY_WAIT_MAX_MS = 2 * 60 * 1000 +
140            (1 | Rnd.fastRnd.nextInt(1 * 60 * 1000));
141    
142        /**If true, allow connection/resource sharing between calls if possible.
143         * We may also periodically check that we can still contact the master.
144         * <p>
145         * An open connection would probably improve TCP efficiency
146         * and reduce latency for an underlying HTTP[S] connection, for example.
147         * (We might allow HTTP/1.1 streaming and connection sharing to be
148         * done for us by the JDK runtime if this is true.)
149         */
150        protected static final boolean KEEP_SERVER_CONNECTION_ALIVE = true;
151    
152        /**If true, extra instrumentation for protocol debug; definitely not for release code. */
153        protected static final boolean _protocolDebug = false /* && ORG.hd.d.IsDebug.isDebug */ ;
154    
155        /**Our unique client-side end-point identifier.
156         * The ID goes into all outgoing variable updates (and possibly other data)
157         * going upstream to the master to uniquely identify this client.
158         * <p>
159         * This is unique to each tunnel client end-point instance
160         * and is created afresh on each run,
161         * so a rebooted client gets a new ID.
162         * <p>
163         * Uses secure/good generator to try to ensure global uniqueness.
164         * (This may be expensive to compute;
165         * we might be able to defer this until first needed.)
166         */
167        public final SimpleVariableValue uniqueClientID =
168            new SimpleVariableValue(SystemVariables.LOCAL_SYS_ID, InstanceID.createInstanceID());
169    
170    
171        /**Our logger which falls back to System.out if servlet log not available; never null. */
172        protected final SimpleLoggerIF logger;
173    
174        /**The stats set to which we log general tunnel source stats.
175         * The unique codes are the constants TSNAME_XXX.
176         */
177        private final StatsLogger.StatsConfig statsIDTS;
178    
179        /**General stats event name: RPC request. */
180        public static final String TSNAME_RPCREQUEST = "RPC-request";
181    
182        /**General stats event name: RPC failure due to an IOException. */
183        public static final String TSNAME_RPCIOEX = "RPC-IOException";
184    
185        /**General stats event name prefix: RPC request packet type. */
186        public static final String TSNAMEPR_RPCTYPE = "RPC-type-";
187    
188        /**General stats event name: "short" raw data read, less than bulk-transfer block size.. */
189        public static final String TSNAME_SHORTREAD = "shortRead";
190    
191        /**Convenience value; (immutable) zero-length byte array for use as an empty packet payload.
192         * Used by local classes and TunnelServlet; package visible.
193         */
194        public static final byte[] EMPTY_PAYLOAD = new byte[0];
195    
196        /**Time when last successful connection was made, or 0 if no connection..
197         * Declared volatile so that it can be accessed without a lock.
198         * <p>
199         * Set by _noteResult() after a good exchange with the master;
200         * never cleared and never null.
201         */
202        private volatile long lastSuccessfulConnectionTime;
203    
204        /**Get time of last successful connection, or 0 if none. */
205        public long getLastSuccessfulConnectionTime()
206            { return(lastSuccessfulConnectionTime); }
207    
208        /**Time before which we should not again try to contact the master; null if master is fine.
209         * Declared volatile so that it can be accessed without a lock.
210         * <p>
211         * Cleared to null by _noteResult() after a good exchange with the master;
212         * the wait to another attempt is approximately doubled after a failed exchange.
213         */
214        private volatile Long doNotTryMasterUntil;
215    
216        /**Returns true if we currently cannot talk to the master, false otherwise. */
217        public boolean isBroken()
218            {
219            final Long notUntil = doNotTryMasterUntil;
220            return((notUntil != null) &&
221                   (notUntil.longValue() > System.currentTimeMillis()));
222            }
223    
224        /**Successive failure count; reset to zero upon success.
225         * Private to _noteResult() and accessed under its private lock.
226         * <p>
227         * Cleared to zero on a successful packet exchange;
228         * incremented after each failure.
229         */
230        private int successiveFailureCount;
231    
232        /**Private lock for _noteResult. */
233        private final Object _nR_lock = new Object();
234    
235        /**Immutable empty result indicating no event values available. */
236        private static final EventVariableValue[] NO_EVENT_VALUES = new EventVariableValue[0];
237    
238        /**Routine to note success or failure of RPC call and adjust control variables.
239         * Has its own private lock to ensure atomic update of control variables.
240         *
241         * @param success  call with this true immediately after successful RPC exchange with master;
242         *     call with false otherwise
243         */
244        private void _noteResult(final boolean success)
245            {
246            synchronized(_nR_lock)
247                {
248                final long now = System.currentTimeMillis();
249                if(success)
250                    {
251                    // Successful RPC exchange with master.
252                    lastSuccessfulConnectionTime = now;
253                    doNotTryMasterUntil = null;
254                    successiveFailureCount = 0;
255                    return;
256                    }
257    
258                // Failed RPC exchange with the master...
259                if(++successiveFailureCount < 0)
260                    { successiveFailureCount = Integer.MAX_VALUE; }
261    
262                // Compute first-cut back-off time.
263                // Double for each successive failure.
264                final long approxBackoff = ((long) ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS) <<
265                    Math.min(32, successiveFailureCount);
266    
267                // Coerce to reasonable bounds...
268                final int waitBounded = (int) Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
269                    Math.max(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MIN_MS, approxBackoff));
270    
271                // Roughly double the pause before the next connection attempt,
272                // with a random component to help avoid inter-slave collisions.
273                final long newWaitTime = Math.min(ExhibitDataTunnelSource.FAIL_RETRY_WAIT_MAX_MS,
274                    waitBounded/2 + Rnd.fastRnd.nextInt(waitBounded));
275    
276                // Store the new "do not try before" time...
277                doNotTryMasterUntil = new Long(now + newWaitTime);
278                }
279            }
280    
281        /**Does a NO-OP on the server.
282         * If this returns without throwing an exception,
283         * then the connection to the back-end (master) server is probably OK.
284         *
285         * @param unguarded  if true, this ignores any recent connection problems
286         *     and immediately tries to contact the master,
287         *     else it behaves like other operations and is quickly vetoed
288         *     while the master is failing
289         *
290         * @throws java.io.IOException  in case of trouble communicating with master.
291         */
292        public void doNOOP(final boolean unguarded)
293            throws IOException
294            {
295            final RawPacket response;
296            if(unguarded)
297                { response = doRPCUnguarded(FixedEmptyFrames.RAW_PACKET_NOOP); }
298            else
299                { response = doRPC(FixedEmptyFrames.RAW_PACKET_NOOP); }
300            if(response.getActualPayloadLength() != 0)
301                { throw new IOException("invalid non-empty response to NOOP request"); }
302            return;
303            }
304    
305        /**Get the static attributes for a given exhibit.
306         * Returns null if the named exhibit does not exist.
307         * <p>
308         * Sends name as a UTF string and allows compression.
309         * (We use UTF since we know that the length is limited and
310         * the all-ASCII nature of valid names should yield
311         * one-byte-per-char encoding before compression.)
312         * <p>
313         * This will fail with a null name argument.
314         * <p>
315         * (Big secret: if this class is used behind, for example,
316         * ExhibitDataSimpleCache, this method will never be called
317         * because all such calls are answered direct from the cache.)
318         */
319        public ExhibitStaticAttr getStaticAttr(final ExhibitFull name)
320            throws IOException
321            {
322            final ByteArrayOutputStream baos =
323                new ByteArrayOutputStream(name.length());
324            final DataOutputStream dos = new DataOutputStream(baos);
325            dos.writeUTF(name.toString());
326            final byte data[] = baos.toByteArray();
327            dos.close(); // Attempt to free some resources quickly.
328    
329            final RawPacket request = new RawPacket(
330                RawPacket.OpCode.GetStaticAttr,
331                data);
332    
333            final RawPacket response = doRPC(request);
334            return((ExhibitStaticAttr) response.getSerializedObjectPayload());
335            }
336    
337        /**Get a chunk of the raw exhibit binary.
338         * The start position must be non-negative.
339         * <p>
340         * Sends name as a UTF string and allows compression of the request.
341         * (We use UTF since we know that the length is limited and
342         * the all-ASCII nature of valid names should yield
343         * one-byte-per-char encoding before compression.)
344         * <p>
345         * We send the start and (computed) afterEnd arguments as int values.
346         * <p>
347         * We return the data as-is.
348         *
349         *
350         * @throws java.lang.IllegalArgumentException  for blatantly invalid name,
351         *     or non-positive length
352         */
353        public void getRawFile(final ByteBuffer buf,
354                               final Name.ExhibitFull exhibitName,
355                               final int position,
356                               final boolean dontCache)
357            throws IOException
358            {
359            // We check that that the name is not blatantly illegal...
360            if(!ExhibitName.validNameSyntaxBasic(exhibitName))
361                { throw new IllegalArgumentException("invalid name"); }
362    
363            // We don't allow zero-byte requests over the expensive tunnel.
364            int len = buf.limit() - buf.position();
365            if(len < 1)
366                { throw new IllegalArgumentException("request length must be positive"); }
367    
368            // Limit any request to a size that the back-end would allow.
369            if(len > MAX_USER_READ_SIZE)
370                { len = MAX_USER_READ_SIZE; }
371    
372            if(ExhibitDataTunnelSource._protocolDebug)
373                {
374                System.err.println("ExhibitDataTunnelSource.getRawFile(" +
375                    "len="+len+", dontCache="+dontCache+")");
376                }
377    
378            // Do the transfer...
379            final int afterEnd = position + len;
380    
381    //        final ByteArrayOutputStream baos =
382    //            new ByteArrayOutputStream(11 + exhibitName.length());
383    //        final DataOutputStream dos = new DataOutputStream(baos);
384    //        dos.writeUTF(exhibitName.toString());
385    //        dos.writeInt(position);
386    //        dos.writeInt(afterEnd);
387    //        dos.writeBoolean(dontCache);
388    //        final byte data[] = baos.toByteArray();
389    //        dos.close(); // Attempt to free some resources quickly.
390    
391            // Optimised creation of outgoing packet.
392            final int nameLength = exhibitName.length();
393            final byte[] data = new byte[11 + nameLength];
394            data[0] = (byte) (nameLength >>> 8);
395            data[1] = (byte) nameLength;
396            exhibitName.writeToByteArray(data, 2);
397            data[2 + nameLength] = (byte) (position >>> 24);
398            data[3 + nameLength] = (byte) (position >>> 16);
399            data[4 + nameLength] = (byte) (position >>>  8);
400            data[5 + nameLength] = (byte) position;
401            data[6 + nameLength] = (byte) (afterEnd >>> 24);
402            data[7 + nameLength] = (byte) (afterEnd >>> 16);
403            data[8 + nameLength] = (byte) (afterEnd >>>  8);
404            data[9 + nameLength] = (byte) afterEnd;
405            data[10 + nameLength] = (byte) (dontCache ? 1 : 0);
406    
407            final RawPacket request = new RawPacket(
408                RawPacket.OpCode.GetRawFile,
409                data);
410    
411            final RawPacket response = doRPC(request);
412            response.getPayloadCopy(buf); // Straight into buffer for efficiency.
413            }
414    
415        /**Gets all static exhibit data if its timestamp is not that specified.
416         * If the time specified is negative the object will be returned unconditionally.
417         * <p>
418         * If no exhibits are currently installed a default set with a zero
419         * timestamp is returned.
420         * <p>
421         * If the caller's copy appears to be up-to-date (eg the oldStamp
422         * matches that that we would have been returned) null is returned.
423         */
424        public AllExhibitImmutableData getAllExhibitImmutableData(final long oldStamp)
425            throws IOException
426            {
427            final RawPacket response = doRPC(
428                new RawPacket(RawPacket.OpCode.GetAllExhibitImmutableData, ExhibitDataTunnelSource.longSer(oldStamp)));
429            return((AllExhibitImmutableData) response.getSerializedObjectPayload());
430            }
431    
432        /**Gets set of all exhibit properties if its hash is not that specified.
433         * If the hash specified is negative the object will be returned unconditionally.
434         * <p>
435         * If no exhibits are currently installed
436         * then a default set with a zero timestamp is returned.
437         * <p>
438         * If the caller's copy appears to be up-to-date
439         * (eg the oldHash matches that that would have been returned)
440         * then null is returned.
441         * <p>
442         * Because the AEP drives the entire Gallery,
443         * we always let the request go over the tunnel even if recent calls have failed.
444         * For example, we don't want mirrors to remain out of sync for too long.
445         * <p>
446         * This also typically takes a lot of bandwidth and CPU/wallclock time,
447         * so is optimised and stripped down to a minimum,
448         * and relies on the AEP deserialisation for serious error checking.
449         * <p>
450         * It is hoped that using the streamed form will overlap I/O and CPU work
451         * and thus get the new AEP into the client quicker and smoother.
452         * <p>
453         * We attempt to be good citizens and note most RPC successes/failures
454         * to help maintain the tunnel's view of the master's status.
455         * <p>
456         * (Because satisfying these calls may take a long time,
457         * the upstream server may veto concurrent queries that return anything
458         * other than null.)
459         * <p>
460         * As a further attempt to minimise time and bandwidth,
461         * we may try the "diff" version of this call,
462         * falling back to the normal version if this fails.
463         */
464        public AllExhibitProperties getAllExhibitProperties(final long oldHash)
465            throws IOException
466            {
467            final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitProperties, ExhibitDataTunnelSource.longSer(oldHash));
468    
469            // The RPC may take a long time to compute the response to, so have a big read timeout.
470            final InputStream stream = doRPCRawWithStreamResponse(packetOut, true);
471            if(stream == null)
472                {
473                // An empty response means a null AEP response.
474                _noteResult(true); /* Note RPC success. */
475                return(null);
476                }
477    
478            // Actually deserialise from the input stream.
479            final ObjectInputStream ois = new ObjectInputStream(stream);
480            // For robustness, treat a class mismatch problem like a data problem.
481            // We rely on the AllExhibitProperties deserialisation for most error checking.
482            try
483                {
484                final AllExhibitProperties result = (AllExhibitProperties) ois.readObject();
485                _noteResult(true); /* Note RPC success. */
486    
487                // Warn if the remote end of the tunnel seems to have sent us a redundant value,
488                // ie sent us something we already have with the hash code that we specified.
489                if((result != null) && (result.longHash == oldHash))
490                    {
491                    final String message = "WARNING: tunnel fetched AEP with existing hash: " + oldHash;
492                    System.err.println(message);
493                    logger.log(message);
494                    }
495    
496                return(result);
497                }
498            catch(final ClassNotFoundException e) { throw new IOException(e.getMessage()); }
499            catch(final InterruptedIOException e)
500                {
501                // Not a failure: can retry later...
502                throw e; /* Rethrow error as-is. */
503                }
504            catch(final IOException e)
505                {
506                _noteResult(false); /* Note RPC failure. */
507                throw e; /* Rethrow error as-is. */
508                }
509            finally { ois.close(); }
510            }
511    
512        /**Maximum level of compression in AEP "diff" RPC supported in this implementation, client or server side. */
513        public static final CompressionLevel MAX_AEP_DIFF_COMP_LEVEL = CompressionLevel.LZMA; // GenUtils.MAX_SUPPORTED_COMPRESSION_LEVEL;
514    
515        /**Gets set of all exhibit properties if not that specified, attempting minimise data transferred across the tunnel.
516         * This is a tunnel-specific optimisation to minimise bandwidth.
517         * For some clients this will save money and time.
518         * <p>
519         * If the AEP specified is null then the remote AEP will be fetched and returned unconditionally.
520         * <p>
521         * If no exhibits are currently installed
522         * then a default set with a zero timestamp is returned.
523         * <p>
524         * If the caller's copy appears to be up-to-date
525         * (eg the oldHash matches that that would have been returned)
526         * then null is returned.
527         * <p>
528         * This is an attempted optimisation of the getAllExhibitProperties(long oldHash),
529         * returning a diff or extra-highly-compressed AEP representation.
530         * <p>
531         * This may not be supported at all by the remote end of the connection,
532         * or may fail for lack of resources, etc,
533         * so we can automatically fall back to the usual call in case of difficulty.
534         * <p>
535         * (We do not record failures as connection/tunnel problems,
536         * though we do note successful calls in favour of the connection status.)
537         *
538         * @param oldAEP  current latest AEP held by the caller
539         * @param allowAutoRecovery  if true then allow fallback to generic AEP fetch method
540         */
541        public AllExhibitProperties getAllExhibitProperties(final AllExhibitProperties oldAEP,
542                                                            final boolean allowAutoRecovery)
543            throws IOException
544            {
545            final long oldHash = (oldAEP == null) ? -1 : oldAEP.longHash;
546    
547            try
548                {
549                // Construct the header.
550                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
551                final DataOutputStream dos = new DataOutputStream(baos);
552                // Write existing AEP longHash, or -1 if no extant AEP.
553                dos.writeLong(oldHash);
554                // Write current maximum supported AEP compression level as a UTF String.
555                dos.writeUTF(MAX_AEP_DIFF_COMP_LEVEL.name());
556                dos.close();
557                final byte data[] = baos.toByteArray();
558                final RawPacket packetOut = new RawPacket(RawPacket.OpCode.GetAllExhibitPropertiesDiff, data);
559    
560                // An empty response (zero bytes) means that our AEP is up-to-date...
561                // The RPC may take a long time to compute the response to, so have a big read timeout.
562                final InputStream stream = doRPCRawWithStreamResponse(packetOut, true);
563                if(null == stream)
564                    {
565                    // An empty response means a null AEP response.
566                    _noteResult(true); /* Note RPC success. */
567                    return(null);
568                    }
569    
570                // Else read from the stream.
571                //  1) A UTF string for the compression level/type (abort if unsupported).
572                //  2) A compressed object stream (in the appropriate format) which may be:
573                //  2a) AllExhibitProperties, in which case return as-is.
574                //  2b) AllExhibitPropertiesDelta, in which case apply to the input AEP and return.
575                try
576                    {
577                    final DataInputStream dis = new DataInputStream(stream);
578                    final CompressionLevel cl = CompressionLevel.valueOf(dis.readUTF());
579                    if(cl.getLevel() > MAX_AEP_DIFF_COMP_LEVEL.getLevel())
580                        { throw new IOException("server returned data compressed at too high a level"); }
581                    final InputStream is = GenUtils.wrapForDecompression(stream, cl);
582                    final ObjectInputStream ois = new ObjectInputStream(is);
583                    // Read in the object from the stream on the fly.
584                    // TODO: we may want to add a data pump to keep data flowing during processing.
585                    final Object o = ois.readObject();
586    
587                    // If the result is a full (and hopefully super-compressed) AEP
588                    // then unpack it and return it here.
589                    if(o instanceof AllExhibitProperties)
590                        {
591                        final AllExhibitProperties result = (AllExhibitProperties) o;
592                        _noteResult(true); /* Note RPC success. */
593    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) full: "+cl+".]"); }
594    
595                        // Warn if the remote end of the tunnel seems to have sent us a redundant value,
596                        // ie sent us something we already have with the hash code that we specified.
597                        if((result != null) && (result.longHash == oldHash))
598                            {
599                            final String message = "WARNING: tunnel fetched AEP (supercompressed) with existing hash: " + oldHash;
600                            System.err.println(message);
601                            logger.log(message);
602                            }
603    
604                        return(result);
605                        }
606    
607                    // If the result is a diff then apply it to the input AEP.
608                    if(o instanceof AllExhibitPropertiesDelta)
609                        {
610                        final AllExhibitPropertiesDelta delta = (AllExhibitPropertiesDelta) o;
611                        final AllExhibitProperties result = AllExhibitPropertiesDelta.applyDiff(oldAEP, delta);
612    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource: successful getAEP(AEP) diff: #before/after="+delta.lengthAEIDBefore+"/"+delta.lengthAEIDAfter+", aep before/after="+oldAEP+"/"+result+", "+cl+".]"); }
613    
614    //                    /* TEMPORARY SAFETY MEASURE: REJECT RESULT WITH FEWER EPL/EPC ENTRIES... */
615    //                    if(result.getExhibitPropsLoadableMap().size() < oldAEP.getExhibitPropsLoadableMap().size())
616    //                        { throw new IOException("WARNING: rejecting AEP diff result with fewer EPLs..."); }
617    //                    if(result.getExhibitPropsComputableMap().size() < oldAEP.getExhibitPropsComputableMap().size())
618    //                        { throw new IOException("WARNING: rejecting AEP diff result with fewer EPCs..."); }
619    
620                        _noteResult(true); /* Note RPC success. */
621    
622                        // Warn if the remote end of the tunnel seems to have sent us a redundant value,
623                        // ie sent us something we already have with the hash code that we specified.
624                        if((result != null) && (result.longHash == oldHash))
625                            {
626                            final String message = "WARNING: tunnel fetched AEP (via delta) with existing hash: " + oldHash;
627                            System.err.println(message);
628                            logger.log(message);
629                            }
630    
631                        return(result);
632                        }
633    
634                    throw new IOException("unexpected return type");
635                    }
636                finally { stream.close(); /* Free (network) resources. */ }
637                }
638            catch(final InterruptedIOException e)
639                { throw e; } // Rethrow as-is: don't attempt recovery yet...
640            catch(final Exception e)
641                {
642                // Always log the problem/exception.
643                e.printStackTrace();
644    
645                if(!allowAutoRecovery)
646                    {
647                    if(e instanceof IOException) { throw (IOException) e; }
648                    final IOException err = new IOException("aborting after failed AEP diff RPC");
649                    err.initCause(e);
650                    throw err;
651                    }
652    
653                logger.log("ERROR: ExhibitDataTunnelSource: automatically recovering from AEP diff error: "+e.getMessage()+".");
654    
655                // In case of error, automatically fall back to generic hash-arg call.
656                // (Iff allowing auto-recovery...)
657                return(getAllExhibitProperties(oldHash));
658                }
659            }
660    
661        /**Helper method to serialise a single free-standing long value.
662         * The long value is serialised just as DataOutputStream would,
663         * as 8 bytes with the high-byte first.
664         */
665        public static byte[] longSer(final long value)
666            {
667            final byte[] result = new byte[8];
668            result[0] = (byte) (value >> 56);
669            result[1] = (byte) (value >> 48);
670            result[2] = (byte) (value >> 40);
671            result[3] = (byte) (value >> 32);
672            result[4] = (byte) (value >> 24);
673            result[5] = (byte) (value >> 16);
674            result[6] = (byte) (value >>  8);
675            result[7] = (byte) (value      );
676            return(result);
677            }
678    
679        /**Helper method to serialise a single free-standing int value.
680         * The int value is serialised just as DataOutputStream would,
681         * as 4 bytes with the high-byte first.
682         */
683        public static byte[] intSer(final int value)
684            {
685            final byte[] result = new byte[4];
686            result[0] = (byte) (value >> 24);
687            result[1] = (byte) (value >> 16);
688            result[2] = (byte) (value >>  8);
689            result[3] = (byte) (value      );
690            return(result);
691            }
692    
693        /**Gets the general properties as a GenProps object if its timestamp is not that specified.
694         * If the time specified is negative the object will be returned unconditionally.
695         * <p>
696         * If no props are currently installed/available a default set with a zero
697         * timestamp is returned.
698         * <p>
699         * If the caller's copy appears to be up-to-date (eg the oldStamp
700         * matches that that we would have been returned) null is returned.
701         * <p>
702         * On the wire this is (outbound) a 8-byte timestamp and
703         * by return should be an empty payload (corresponding to null)
704         * or a serialised GenProps object (uncompressed for now).
705         * <p>
706         * Because GenProps are so important to the functioning of the entire Gallery,
707         * we always let the request go over the tunnel even if recent calls have failed.
708         */
709        public org.hd.d.pg2k.svrCore.props.GenProps getGenProps(final long oldStamp)
710            throws IOException
711            {
712    //System.err.println("[Fetching GenProps over HTTP tunnel...]");
713    
714            final RawPacket response = doRPCUnguarded(
715                new RawPacket(RawPacket.OpCode.GetGenProps, longSer(oldStamp)));
716    
717            final GenProps result = (GenProps) response.getSerializedObjectPayload();
718    //System.err.println(" [Fetched GenProps over HTTP tunnel.]");
719            return(result);
720            }
721    
722        /**Gets the security properties as a Properties object if its timestamp is not that specified.
723         * If the time specified is negative the object will be returned unconditionally.
724         * <p>
725         * If no props are currently installed/available a default set with a zero
726         * timestamp is returned.
727         * <p>
728         * If the caller's copy appears to be up-to-date (eg the oldStamp
729         * matches that that would have been returned) null is returned.
730         */
731        public java.util.Properties getGenSecProps(final long oldStamp)
732            throws IOException
733            {
734    //System.err.println("[Fetching GenSecProps over HTTP tunnel...]");
735    
736            final RawPacket response = doRPC(
737                new RawPacket(RawPacket.OpCode.GetGenSecProps, longSer(oldStamp)));
738    
739            final Properties result = (Properties) response.getSerializedObjectPayload();
740    //System.err.println(" [Fetched GenSecProps over HTTP tunnel.]");
741            return(result);
742            }
743    
744        /**Gets the thumbnails for an exhibit.
745         * A data source is at liberty to refuse to compute thumbnails
746         * in which case it may return null, else it returns a
747         * non-null value which may include the `could-not-compute'
748         * value to indicate that a thumbnail/sample cannot be made
749         * for this exhibit and no attempt need be made in future.
750         * <p>
751         * Because it is important for a new mirror to gather thumbnails ASAP
752         * to show to a user, we make this call immune to being blocked
753         * when the tunnel connection appears poor.
754         *
755         * @param create  if true, and no thumbnail yet exists, try to
756         *     create one if possible, else only return an existing one
757         */
758        public ExhibitThumbnails getThumbnails(final ExhibitFull name, final boolean create)
759            throws IOException
760            {
761    //System.err.println("[Fetching Thumbnails over HTTP tunnel...]");
762    
763    //        final ByteArrayOutputStream baos =
764    //            new ByteArrayOutputStream(name.length());
765    //        final DataOutputStream dos = new DataOutputStream(baos);
766    //        dos.writeUTF(name.toString());
767    //        dos.writeBoolean(create);
768    //        final byte data[] = baos.toByteArray();
769    //        dos.close(); // Attempt to free some resources quickly.
770    
771            final int nameLength = name.length();
772            final byte[] data = new byte[3 + nameLength];
773            data[0] = (byte) (nameLength >>> 8);
774            data[1] = (byte) nameLength;
775            name.writeToByteArray(data, 2);
776            data[2 + nameLength] = (byte) (create ? 1 : 0);
777    
778            final RawPacket response = doRPCUnguarded(
779                new RawPacket(RawPacket.OpCode.GetThumbnails, data));
780    
781            final ExhibitThumbnails result = (ExhibitThumbnails) response.getSerializedObjectPayload();
782    //System.err.println(" [Fetched Thumbnails over HTTP tunnel.]");
783            return(result);
784            }
785    
786        /**Set variable.
787         * Only non-local values are propagated upstream;
788         * others are explicitly rejected with an UnsupportedOperationException.
789         *
790         * @throws java.io.IOException in case of I/O problems
791         * @throws java.lang.UnsupportedOperationException  when attempting to set locals
792         */
793        public void setVariable(final SimpleVariableValue newValue)
794            throws IOException,
795            UnsupportedOperationException
796            {
797            if(newValue == null) { throw new IllegalArgumentException(); }
798            if(newValue.getDef().isLocal())
799                { throw new UnsupportedOperationException("cannot send local variables over the tunnel"); }
800            // Implement in terms of setVariables().
801            setVariables(new SimpleVariableValue[]{newValue});
802            }
803    
804        /**Adjust globalMap for outgoing (upstream) set operation.
805         * This expects the value passed to be:
806         * <ul>
807         * <li>Non-null
808         * <li>Non-local
809         * <li>Read/write
810         * <li>To have an empty globalMap or one with exactly one entry
811         *     whose key is the system ID for this system;
812         *     if the map is empty then this system is put in the global map
813         * </ul>
814         * <p>
815         * If the argument is in the correct form, it is returned unchanged.
816         * <p>
817         * If the argument is illegal, ie not suitable to send upstream,
818         * then an exception is thrown to veto the send.
819         *
820         * @throws java.lang.IllegalArgumentException  if an invalid argument is passed,
821         *     eg unable to be converted to be sent upstream because it has
822         *     an invalid globalMap
823         */
824        private SimpleVariableValue adjustGlobalMapForSet(final SimpleVariableValue svv)
825            {
826            if(svv == null)
827                { throw new IllegalArgumentException(); }
828    
829            final SimpleVariableDefinition def = svv.getDef();
830            if(def.isLocal() ||
831               def.isReadOnly() ||
832               !SystemVariables.defs.contains(def))
833                { throw new IllegalArgumentException(); }
834    
835            final Map<InstanceID,SimpleVariableValue> globalMap = svv.getGlobalMap();
836            if((globalMap != null) && (globalMap.size() > 1))
837                { throw new IllegalArgumentException(); }
838    
839            // If no globalMap,
840            // then make sure that we make an explicit map entry for this system.
841            final InstanceID sysID = (InstanceID) uniqueClientID.getValue();
842            if(globalMap == null)
843                {
844                // We're done.
845                return(svv.put(sysID, svv, true));
846                }
847    
848            if(!Collections.singleton(sysID).equals(globalMap.keySet()))
849                {
850                throw new IllegalArgumentException("globalMap contains ID that is not ours during set: " + sysID);
851                }
852    
853            // Value is fine as it is, so return it untouched...
854            return(svv);
855            }
856    
857        /**Set variables; must not be null or contain nulls.
858         * Only non-local variables are sent upstream;
859         * locals are silently discarded.
860         * <p>
861         * Variables not defined in SystemVariables are rejected.
862         * <p>
863         * Duplicates may be discarded or rejected,
864         * or sent as is in which case it is undefined in which order they
865         * are applied.
866         * <p>
867         * We try to set all values that we can for
868         * <p>
869         * Format on the wire:
870         * <ul>
871         * <li>Outbound: serialised SimpleVariableValue[]
872         * <li>Inbound: number of values set (int)
873         * </ul>
874         *
875         * @throws java.io.IOException in case of I/O problems
876         */
877        public int setVariables(final SimpleVariableValue[] newValues)
878            throws IOException,
879            UnsupportedOperationException
880            {
881            if(newValues == null) { throw new IllegalArgumentException(); }
882    
883            // Make a set of all non-local (and writable) values to pass upstream.
884            // Preserve order...
885            final int nvLen = newValues.length;
886            final List<SimpleVariableValue> toGo = new ArrayList<SimpleVariableValue>(nvLen);
887            for(int i = 0; i < nvLen; ++i)
888                {
889                final SimpleVariableValue svv = newValues[i];
890    
891                // Reject null values...
892                if(svv == null)
893                    { throw new IllegalArgumentException(); }
894    
895                final SimpleVariableDefinition def = svv.getDef();
896    
897                // Skip local values;
898                // they should not be propagated out of the machine.
899                // Silently ignore this so that any globals can be sent.
900                if(def.isLocal())
901                    { continue; }
902    
903                // Adjust globalMap if necessary for sending upstream.
904                // Veto invalid values with an IllegalArgumentException.
905                toGo.add(adjustGlobalMapForSet(svv));
906                }
907    
908            // If nothing to send, then return immediately.
909            if(toGo.size() == 0)
910                { return(0); }
911    
912            // TODO: eliminate duplicates
913            // TODO: sort into name order to help compression (but preserving order for the same variable)
914    
915            // Send set of values upstream
916            // by putting them into an array and serialising it.
917            final SimpleVariableValue r[] = new SimpleVariableValue[toGo.size()];
918            toGo.toArray(r);
919    
920            // Make the RPC.
921            final RawPacket response = doRPC(new RawPacket(
922                RawPacket.OpCode.SetVariables,
923                r,
924                true)); // Compression may be useful...
925    
926            // Extract return value...
927            final DataInputStream dis = new DataInputStream(response.getPayloadAsInputStream());
928            try
929                {
930                final int nSet = dis.readInt();
931                // Validate result returned.
932                if((nSet < 0) || (nSet > nvLen))
933                    { throw new IOException("corrupt reply packet: nSet=" + Integer.toHexString(nSet)); }
934                return(nSet);
935                }
936            finally { dis.close(); }
937            }
938    
939        /**Get variable, or returns null if no such non-local variable.
940         * We attempt only to fetch non-local variables; return null for others
941         * (except for the system ID).
942         * <p>
943         * When asked for the the system ID,
944         * we always answer the question locally.
945         * <p>
946         * Note that the local system ID can be retrieved whether or not
947         * the master server is responding.
948         * <p>
949         * Format on the wire is:
950         * <ul>
951         * <li>Outbound: variable name in UTF format
952         * <li>Inbound: serialised form of SimpleVariableValue (or empty for null)
953         * </ul>
954         *
955         * @throws java.io.IOException  in case of I/O error
956         * @throws UnsupportedOperationException  if a local variable
957         *     other than the system ID is requested
958         */
959        public SimpleVariableValue getVariable(final SimpleVariableDefinition var)
960            throws IOException,
961                   UnsupportedOperationException
962            {
963            if(var == null) { throw new IllegalArgumentException(); }
964    
965            // If the request is for the system ID,
966            // return it here.
967            if(var.equals(SystemVariables.LOCAL_SYS_ID))
968                { return(uniqueClientID); }
969    
970            // Always return null to requests for local data (other than system ID);
971            // local variables should not be propagated over the tunnel.
972            if(var.isLocal())
973                { throw new UnsupportedOperationException("local variables not available at tunnel source"); }
974    
975            // Fetch individual variable value over tunnel...
976            // We only send the name,
977            // and get back a whole variable value,
978            // but on return check for a compatible definition,
979            // returning null if the definitions don't match.
980            final String name = var.getName();
981            final ByteArrayOutputStream baos =
982                new ByteArrayOutputStream(name.length());
983            final DataOutputStream dos = new DataOutputStream(baos);
984            dos.writeUTF(name);
985            final byte data[] = baos.toByteArray();
986            dos.close(); // Attempt to free some resources quickly.
987    
988            final RawPacket request = new RawPacket(
989                RawPacket.OpCode.GetVariable,
990                data);
991    
992            final RawPacket response = doRPC(request);
993            final SimpleVariableValue svv =
994                (SimpleVariableValue) response.getSerializedObjectPayload();
995    
996            // If there was no variable,
997            // then return null immediately.
998            if(svv == null)
999                { return(null); }
1000    
1001            // Discard incompatible types,
1002            // and local variables.
1003            if(!var.equals(svv.getDef()))
1004                { return(null); }
1005    
1006            return(svv);
1007            }
1008    
1009        /**Fetch variable values from the master.
1010         * We fetch values from the master
1011         * (from which we eliminate any illegal values such as local variables
1012         * or values which are not locally valid)
1013         * and insert our unique local system ID
1014         * if the request stamp is -1 or older than our creation.
1015         * <p>
1016         * Note that the local system ID can be retrieved whether or not
1017         * the master server is responding providing the stamp is -1,
1018         * though in that case it might be the only value returned.
1019         * The caller should periodically use -1 to ensure that they
1020         * have a full set of global variables, eg in case the server rebooted.
1021         * <p>
1022         * Format on the wire:
1023         * <ul>
1024         * <li>Outbound: serialised timestamp
1025         * <li>Inbound: SimpleVariableValue[]
1026         * </ul>
1027         *
1028         * @throws java.io.IOException
1029         */
1030        public SimpleVariableValue[] getVariables(final long changedSince)
1031            throws IOException
1032            {
1033            final List<SimpleVariableValue> incoming = new ArrayList<SimpleVariableValue>();
1034    
1035            // Fetch all (global) variables over the tunnel.
1036            try
1037                {
1038                final RawPacket response = doRPC(
1039                    new RawPacket(RawPacket.OpCode.GetVariables, longSer(changedSince)));
1040                // Copy incoming array to List...
1041                incoming.addAll(Arrays.asList(
1042                    (SimpleVariableValue[]) response.getSerializedObjectPayload()));
1043                }
1044            catch(final IOException e)
1045                {
1046                // If changedSince == -1 we guarantee to return the local ID,
1047                // else we rethrow the exception...
1048                if(changedSince != -1) { throw e; }
1049                }
1050    
1051            // Remove any extant system ID value,
1052            // and any local variable (should not have come up the wire),
1053            // and any value that does not have an identical local definition.
1054            // We do not expect to find/remove any of these in normal operation.
1055            final SimpleVariableDefinition uCIDDef = uniqueClientID.getDef();
1056            for(int i = incoming.size(); --i >= 0; )
1057                {
1058                final SimpleVariableValue svv =
1059                        incoming.get(i);
1060                final SimpleVariableDefinition def = svv.getDef();
1061                if(def.equals(uCIDDef) ||
1062                   def.isLocal() ||
1063                   !SystemVariables.defs.contains(def) ||
1064                   !def.equals(SystemVariables.nameToDef.get(def.getName())))
1065                    {
1066                    incoming.remove(i);
1067                    continue;
1068                    }
1069                }
1070    
1071            // Insert the system ID variable value into the result if appropriate.
1072            if(changedSince <= uniqueClientID.getTimestamp()) // Handles -1 case...
1073                {
1074                incoming.add(uniqueClientID);
1075                }
1076    
1077            final SimpleVariableValue result[] =
1078                new SimpleVariableValue[incoming.size()];
1079            incoming.toArray(result);
1080            return(result);
1081            }
1082    
1083        /**Get the current partial, or previous full, event set at the specified interval; never null.
1084         * This is a simplified interface to return either the current event set
1085         * that is being collected, or the previous completed set.
1086         * <p>
1087         * The current set is the most timely, but may not contain enough data
1088         * to be meaningful if the new interval has just started.
1089         * <p>
1090         * The previous set is complete and thus most likely to have enough samples
1091         * to be useful, but is not completely current.
1092         * <p>
1093         * Implemented in terms of the more general call
1094         * in the hope that batched calls for several values will be
1095         * more common and more efficient.
1096         * This also reduces the number of distinct RPC calls that
1097         * we have to implement.
1098         *
1099         * @param def  event definition (must be for an event); never null
1100         * @param intervalSelector  one of EVENT_INTERVAL_SELECTOR_xxx values
1101         * @param current  if true the current event set is returned,
1102         *     else the previous complete set is returned
1103         *
1104         * @return  requested event set; may be empty but never null if requested set not available
1105         */
1106        public EventVariableValue getEventValue(final SimpleVariableDefinition def,
1107                                                final EventPeriod intervalSelector,
1108                                                final boolean current)
1109            {
1110            if((def == null) || (intervalSelector == null))
1111                { throw new IllegalArgumentException(); }
1112    
1113            final long now = System.currentTimeMillis();
1114            final long currentIntervalNumber = intervalSelector.getIntervalNumber(now);
1115            final long intervalNumber = (current ? currentIntervalNumber : (currentIntervalNumber-1));
1116    
1117            final EventVariableValue evv[] = getEventValues(def,
1118                                                            intervalSelector,
1119                                                            intervalNumber,
1120                                                            null);
1121    
1122            // If we got some real (non-null) data back, then return it...
1123            if((evv.length > 0) && (evv[0] != null))
1124                { return(evv[0]); }
1125    
1126            // Else get empty non-authoritative return value (with no events)...
1127            return(BasicVarMgr.getEmptyNonAuthEVV(def, intervalSelector, intervalNumber));
1128            }
1129    
1130        /**Get the specified global event sets for the specified intervals; never null.
1131         * This allows retrieval of zero or more event sets for the specified
1132         * interval size.
1133         * <p>
1134         * Requests for more than SystemVariables.EVENT_SAMPLES_RETAINED in the
1135         * past (or for the future!) cannot be satisfied and data will not be
1136         * returned for them.
1137         * <p>
1138         * Usually not more than SystemVariables.EVENT_SAMPLES_RETAINED samples
1139         * will be returned in response to any one request as a safety measure.
1140         * <p>
1141         * We do various small optimisations to reduce fruitless traffic over
1142         * what may be a slow and/or expensive connection:
1143         * <ul>
1144         * <li>This will reject requests will huge sets that go back to far so as
1145         *     to avoid huge RPC arguments and since the request is unlikely to be
1146         *     satisfiable.  The upstream implementation may be even more restrictive.
1147         * <li>This ignores requests too far in the future or the past.
1148         * <li>Trim leading empty slots from request sent over the tunnel.
1149         * </ul>
1150         * <p>
1151         * The responses from the server may be large and slow,
1152         * so we allow streaming to give better incremental CPU/resource consumption
1153         * and better throughput.
1154         * <p>
1155         * Returns an empty result if asked for a 'local' value.
1156         * <p>
1157         * Returns an empty result if the tunnel is currently broken
1158         * or in case of other non-permanent error.
1159         * <p>
1160         * TODO: Optimise RPC call by allowing for sparse requests with different request/response format.
1161         *
1162         * @param def  event definition (must be for an event); never null
1163         * @param intervalSelector  one of EVENT_INTERVAL_SELECTOR_xxx values
1164         * @param intervalNumber  a time (as from System.currentTimeMillis())
1165         *     which identifies the first interval for which data is potentially
1166         *     required; if too far in the past or future then possibly no data
1167         *     will be available,
1168         *     zero is used to access the "all" bucket
1169         * @param whichValues  each true bit represents a slot for which data is
1170         *     required, bit 0 indicating data from the slot within which
1171         *     firstIntervalTime is located, bit 1 the previous slot, etc;
1172         *     null is treated as the common case equivalent to just bit 0 set
1173         *
1174         * @return as many of the requested values as available,
1175         *     at least long enough to return all the available values,
1176         *     with [0] corresponding to bit 0 in the BitSet;
1177         *     may contain nulls or be zero-length (eg in case of error) but is never null
1178         */
1179        public EventVariableValue[] getEventValues(final SimpleVariableDefinition def,
1180                                                   final EventPeriod intervalSelector,
1181                                                   final long intervalNumber,
1182                                                   BitSet whichValues)
1183            {
1184            if((def == null) ||
1185                (intervalSelector == null) ||
1186                (intervalNumber < 0))
1187                { throw new IllegalArgumentException(); }
1188    
1189            if(def.isLocal())
1190                { return(NO_EVENT_VALUES); }
1191    
1192            // Don't bother trying this if the connection is currently known to be broken.
1193            if(isBroken()) // { throw new PGMasterNotInServiceException(); }
1194                { return(NO_EVENT_VALUES); }
1195    
1196            // See if we can optimise the whichValues argument by replacing it
1197            // with a null value...
1198            // This will reduce heap churn, data on the wire, etc.
1199            if((whichValues != null) && (whichValues.length() == 1) && whichValues.get(0))
1200                { whichValues = null; }
1201    
1202            // Various optimisations are possible for a null BitSet arg.
1203            final boolean nullWV = (whichValues == null);
1204    
1205            // Take copy of BitSet to ensure that it is minimal size,
1206            // and to reduce thread/race/safety issues.
1207            final BitSet wvCopy = nullWV ? null : new BitSet(whichValues.length());
1208            if(!nullWV) { wvCopy.or(whichValues); }
1209    
1210            final int wvl = nullWV ? 1 : wvCopy.length();
1211    
1212            // Avoid pointless call for no request bits.
1213            if(wvl < 1)
1214                { return(NO_EVENT_VALUES); }
1215    
1216            // Avoid a huge request/response packet.
1217            if(wvl > 2*SystemVariables.EVENT_SAMPLES_RETAINED)
1218                { throw new IllegalArgumentException("request set too large to send over the wire"); }
1219    
1220            // Avoid a call for values too far in the past to be likely to be available.
1221            // (This also ensures that the intervalNumber is large and positive for later.)
1222            // Although we allow a zero value for the "all" bucket.
1223            final long currentInterval = intervalSelector.getIntervalNumber(System.currentTimeMillis());
1224            if(intervalNumber == 0)
1225                {
1226                if(!nullWV && (!whichValues.get(0) || (wvl != 1)))
1227                    { throw new IllegalArgumentException(); }
1228                }
1229            else if(intervalNumber < currentInterval - 2*SystemVariables.EVENT_SAMPLES_RETAINED - 1)
1230                { return(NO_EVENT_VALUES); }
1231    
1232            // Avoid a call for values far too far in the future
1233            // for any skew to account for, etc, so we're sure no data is available.
1234            if(intervalNumber > currentInterval + 4*SystemVariables.EVENT_SAMPLES_RETAINED + 1)
1235                { return(NO_EVENT_VALUES); }
1236    
1237            // Get the offset of the first item in the request...
1238            // If this is greater than zero
1239            // then we can in principle optimise the request/response
1240            // by omitting the leading empty slots.
1241            final int firstItemOffset = nullWV ? 0 : wvCopy.nextSetBit(0);
1242            assert(firstItemOffset >= 0);
1243    
1244            // Reduce the BitSet that we send if the request can be trimmed...
1245            final BitSet wvToSend = (firstItemOffset == 0) ? wvCopy :
1246                (wvCopy.get(firstItemOffset, wvl));
1247    
1248            // Request is:
1249            //   * A byte consisting of the ordinal of period/interval enum ordinal.
1250            //   * The UTF-8 representation of the name of the definition.
1251            //   * The interval number (8 bytes).
1252            //   * The serialised form of the request BitSet if non-null.
1253            // Result is:
1254            //   * The in-order array of EventVariableValues (never null).
1255            final String name = def.getName();
1256            final ByteArrayOutputStream baos = new ByteArrayOutputStream(32 + name.length() + wvl/2);
1257            final DataOutputStream dos = new DataOutputStream(baos);
1258            final byte data[];
1259            try
1260                {
1261                dos.write(intervalSelector.ordinal());
1262                dos.writeUTF(name);
1263                dos.writeLong(intervalNumber - firstItemOffset);
1264                if(wvToSend != null)
1265                    {
1266                    final ObjectOutputStream oos = new ObjectOutputStream(dos);
1267                    oos.writeObject(wvToSend);
1268                    oos.flush();
1269                    }
1270                dos.flush();
1271                data = baos.toByteArray();
1272                dos.close(); // Attempt to free some resources quickly.
1273                }
1274            catch(final Exception e) { throw new Error(e); /* Should never happen. */ }
1275    
1276            final RawPacket request = new RawPacket(
1277                RawPacket.OpCode.GetEventValues,
1278                data);
1279    
1280            // Stream the result, since it may be large enough to be on the wire for a while.
1281            final EventVariableValue[] evvsRaw;
1282            try
1283                {
1284                final InputStream stream = doRPCRawWithStreamResponse(request, false);
1285                if(stream == null)
1286                    { throw new IOException("unexpected null return value"); }
1287                try
1288                    {
1289                    final ObjectInputStream ois = new ObjectInputStream(stream);
1290                    evvsRaw = (EventVariableValue[]) ois.readObject();
1291                    ois.close();
1292                    }
1293                finally { stream.close(); /* Ensure underlying resources are released. */ }
1294    
1295                // Check for unexpected return values.
1296                if(evvsRaw == null)
1297                    { throw new IOException("unexpected null return value"); }
1298    
1299                // If we offset the request then make a copy set back.
1300                final EventVariableValue[] evvs;
1301                if(firstItemOffset != 0)
1302                    {
1303                    evvs = new EventVariableValue[evvsRaw.length + firstItemOffset];
1304                    System.arraycopy(evvsRaw, 0, evvs, firstItemOffset, evvsRaw.length);
1305                    }
1306                else
1307                    {
1308                    // Did not offset the request...
1309                    evvs = evvsRaw;
1310                    }
1311    
1312                // Check for unexpected/incorrect return values.
1313                for(int i = evvs.length; --i >= 0; )
1314                    {
1315                    final EventVariableValue evv = evvs[i];
1316                    if(evv == null) { continue; }
1317                    if(!def.equals(evv.getDef()))
1318                        { throw new IOException("unexpected return value with wrong def"); }
1319                    if(!intervalSelector.equals(evv.getPeriod()))
1320                        { throw new IOException("unexpected return value with wrong period"); }
1321                    if(evv.getIntervalNumber() != intervalNumber - i)
1322                        { throw new IOException("unexpected return value with wrong interval number"); }
1323                    }
1324    
1325                _noteResult(true); // Seems to have succeeded.
1326                return(evvs);
1327                }
1328            // Deal with non-permanent errors as empty result value.
1329    //        catch(final IOException e) { _noteResult(false); throw e; }
1330            catch(final Exception e) { _noteResult(false); return(NO_EVENT_VALUES); }
1331            }
1332    
1333        /**Synchronise variables with upstream values.
1334         * Pushes updated values upstream to the source,
1335         * calls sync on the source if called with the "force" argument true,
1336         * and then retrieves changed values from upstream.
1337         * <p>
1338         * When called with force==true, this acts like a full "memory barrier",
1339         * flushing all write-cached items downstream immediately and afterwards
1340         * getting the value of all upstream values with getVariables(-1),
1341         * but may be expensive in terms of CPU or bandwidth, so use sparingly.
1342         * <p>
1343         * When called with force=false, this returns immediately
1344         * and the operation is not propagated across the tunnel.
1345         * <p>
1346         * In any case, it is rarely the right thing for a casual user
1347         * to vall this as it may be very expensive.
1348         *
1349         * @param force  if true, this will force a full write flush,
1350         *     a full sync upstream,
1351         *     then full read with getVariables(-1),
1352         *     to get the effect of a full "barrier";
1353         *     otherwise, do nothing
1354         *
1355         * @throws IOException if one is received from upstream
1356         */
1357        public void syncVariables(final boolean force)
1358            throws IOException
1359            {
1360            // Do not propagate across tunnel unless "force"d.
1361            if(!force)
1362                { return; }
1363    
1364            // Propagate the "force"d sync upstream,
1365            // and return when done...
1366            doRPC(FixedEmptyFrames.RAW_PACKET_SV);
1367            }
1368    
1369    
1370        /**Get requested Properties selected by key and versionID.
1371         * Fetches a Properties set unconditionally (versionID == -1)
1372         * else if the versionID presented is not current.
1373         *
1374         * @param key  selector (with possible embedded sub-key)
1375         *     for desired properties set; never null
1376         * @param versionID  if -1 then map is always returned if available,
1377         *     else must be non-negative and null is returned if the versionID
1378         *     presented matches that of the current version
1379         *     (ie if the caller has presumably got the up-to-date version);
1380         *     may be a timestamp or a hash or other value,
1381         *     and by convention is zero only for an empty properties set
1382         *
1383         * @return null, or Properties map guaranteed to contain only
1384         *     String keys and values
1385         */
1386        public java.util.Properties getProperties(final PropsKey key,
1387                                                  final long versionID)
1388            throws IOException
1389            {
1390            throw new IOException("NOT IMPLEMENTED");
1391            }
1392    
1393        /**Returns the (incremented) upstream stratum adjusted to include transit time; never null.
1394         * Returns Stratum.UNKNOWN if the upstream stratum is unknown
1395         * or the upstream instance is already at the maximum stratum.
1396         */
1397        public Stratum getStratum()
1398            throws IOException
1399            {
1400            final long before = System.currentTimeMillis();
1401            final RawPacket response = doRPCUnguarded(FixedEmptyFrames.RAW_PACKET_GETSTRATUM);
1402            final long after = System.currentTimeMillis();
1403    
1404            // Get raw result from upstream host.
1405            final Stratum upstream = (Stratum) response.getSerializedObjectPayload();
1406            assert(null != upstream);
1407    
1408            // If 'unknown' upstream stratum then return built-in 'unknown' value.
1409            if(upstream.isUnknownStratum()) { return(Stratum.UNKNOWN); }
1410    
1411            // If upstream is already at maximum stratum then this instance must be regarded as unknown stratum.
1412            if(upstream.getStratum() >= Stratum.MAX_STRATUM) { return(Stratum.UNKNOWN); }
1413    
1414            // Sanitised round-trip time.
1415            final int rtt = (int) Math.min(2*(int)Stratum.MAX_ROOT_DELAY, Math.max(0, after - before));
1416    
1417            // Create a new value for this instance with an incremented stratum
1418            // and one half the RTT to our immediate upstream peer added to the root delay.
1419            final Stratum processed = new Stratum(1 + upstream.getStratum(),
1420                                                  (rtt>>1) + upstream.getRootDelay(),
1421                                                  _getStratumUpstreamName(),
1422                                                  upstream.isUpstreamConserving());
1423    
1424            return(processed);
1425            }
1426    
1427        /**Return short unique name of upstream peer/server suitable for Stratum; never null but can be "" for 'unknown'.
1428         * Override this in implementations that know the upstream name.
1429         */
1430        protected String _getStratumUpstreamName()
1431            {
1432            return(""); // Unknown upstream peer/server name.
1433            }
1434    
1435        /**Poll periodically.
1436         * We can use this to attempt to repair ailing connections, etc,
1437         * as well as poll for asynchronous messages being sent to us by the master.
1438         * <p>
1439         * However, our general policy is not to force traffic if we need not.
1440         * <p>
1441         * This can be overridden by a derived class,
1442         * though it is suggested that this be called with super.poll(gp) if so.
1443         */
1444        public void poll(final GenProps gp)
1445            {
1446            if(KEEP_SERVER_CONNECTION_ALIVE)
1447                {
1448                // If we have a broken connection to the master,
1449                // then a little while before other users can retry it,
1450                // we retry it to see if the master is reachable again.
1451                //
1452                // This enables us to repair a connection silently,
1453                // so that a normal caller is never blocked until
1454                // it is repaired (other calls are vetoed quickly).
1455                //
1456                // Our calls are still backed off if the master is really unwell.
1457                //
1458                // This does not try to force an initial connection,
1459                // just repair a failing one.
1460                final Long notUntil = doNotTryMasterUntil;
1461                if((notUntil != null) &&
1462                   (notUntil.longValue() - _REPAIR_RETRY_LEAD_MS < System.currentTimeMillis()))
1463                    {
1464                    try { doNOOP(true); }
1465                    catch(final Exception e)
1466                        {
1467                        // It is possible that our connection problem is caused
1468                        // by a resource leak (eg of unclosed descriptors) elsewhere
1469                        // so try and force come cleanup now.
1470                        //
1471                        // We rely on our normal pacing/backoff of retries
1472                        // to avoid us hurting the system too much if we are wrong!
1473                        if(FORCE_GC_AND_FINALISERS_AFTER_POLL_FAIL)
1474                            {
1475    logger.log("[ExhibitDataTunnelSource: WARNING: failing to re-establish connection so attempting to free resources with gc(), etc...]");
1476                            System.runFinalization();
1477                            System.gc();
1478                            System.runFinalization();
1479                            }
1480                        }
1481                    }
1482                }
1483            }
1484    
1485        /**May attempt to free up outbound connections and/or prevent new ones. */
1486        public void destroy()
1487            {
1488            // Prevent most new outgoing connections.
1489            doNotTryMasterUntil = new Long(Long.MAX_VALUE);
1490            }
1491    
1492    
1493        /**How soon ahead of other callers we try to silently repair a broken connection, ms; non-negative. */
1494        private final int _REPAIR_RETRY_LEAD_MS = 17001 + Rnd.fastRnd.nextInt(3007);
1495    
1496    
1497        /**Make an RPC call over the underlying medium with the given outgoing packet; never null.
1498         * This collects the response packet and will object
1499         * if it sees any IOException
1500         * or if the packets come back with the wrong op code.
1501         * <p>
1502         * This can be locked on the instance lock to serialise RPCs,
1503         * if the tunnel can only usefully handle one call at once.
1504         * <p>
1505         * Must be implemented by the deriving class
1506         * to suit its transmission medium.
1507         *
1508         * @param  packetOut  request packet; never null
1509         * @return response packet; never null
1510         * @throws java.io.IOException  in case of I/O difficulties
1511         */
1512        protected abstract RawPacket doRPCRaw(final RawPacket packetOut)
1513            throws IOException;
1514    
1515        /**Optimised RPC call with the given outgoing packet and returning packet body as an InputStream; null if an empty stream.
1516         * This is an <em>optimisation</em> for performance-critical cases <em>only</em>,
1517         * and foregoes some error checking/handling for speed
1518         * (and thus the caller should ensure that it performs integrity checks).
1519         * There is an onus on the streaming-related code to behave safely
1520         * even if fed bogus/corrupt data,
1521         * and it must be able to safely/easily undo any work done
1522         * if the message is found to be bogus as late as the final byte(s).
1523         * <p>
1524         * This streams the content of the response packet
1525         * and will object if it sees any IOException
1526         * or if the packets come back with the wrong op code.
1527         * <p>
1528         * A terminating trailer byte may or may not be visible on the returned stream
1529         * thus allowing the implementation to be as efficient as possible.
1530         * <p>
1531         * This may return after all the input data has been collected,
1532         * or while some or all is still to come,
1533         * and thus the returned stream may fail and throw an exception.
1534         * <p>
1535         * The first element of the result is the length of the response data
1536         * (not including any non-data trailer bytes from the packet even if present)
1537         * but may be null if this length is not known when the packet header is seen,
1538         * eg because the packet body was compressed.
1539         * <p>
1540         * This may be implemented/overridden by the deriving class
1541         * to suit its transmission medium,
1542         * and as an optimisation to reduce copying and allow streaming,
1543         * ie starting to process the input before it is all received.
1544         * <p>
1545         * The data stream is always of uncompressed data,
1546         * regardless of whether the data was sent compressed on the wire,
1547         * ie this routine will correctly decompress data on the fly as/when needed.
1548         * <p>
1549         * The caller <strong>must close</strong> the stream promptly
1550         * to release resources such as file handles and non-Java memory.
1551         *
1552         * @param  packetOut  request packet; never null
1553         * @param allowBigReadTimeout TODO
1554         * @return response  length and data stream; never null
1555         * @throws java.io.IOException  in case of I/O difficulties
1556         */
1557        protected InputStream doRPCRawWithStreamResponse(final RawPacket packetOut, final boolean allowBigReadTimeout)
1558            throws IOException
1559            {
1560            // Simple implementation in terms of basic doRPCRaw().
1561            final RawPacket result = doRPCRaw(packetOut);
1562    
1563            // Check for mismatched response packets
1564            // or remote exceptions codes with special response op-codes.
1565            if(result.opCode != packetOut.opCode)
1566                {
1567                // Deal specially with some fixed remote exception types.
1568                switch(result.opCode)
1569                    {
1570                    case PGMNISEX:
1571                        { throw new PGMasterNotInServiceException(); }
1572                    case RUNTEX:
1573                        { throw new RuntimeException("remote threw RuntimeException"); }
1574                    case REMEX:
1575                        { throw new IOException("remote threw RemoteException"); }
1576                    case INTEX:
1577                        { throw new InterruptedIOException("remote operation interrupted"); }
1578                    }
1579                throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode.getCode());
1580                }
1581    
1582            if(0 == result.getActualPayloadLength()) { return(null); }
1583            return(result.getPayloadAsInputStream());
1584            }
1585    
1586    
1587        /**Make an RPC call over HTTP[S] with the given outgoing packet.
1588         * This provides error recovery, back-off, stats, etc,
1589         * and serves as a wrapper for the medium-specific doRPCRaw().
1590         * <p>
1591         * Called by all the public data-pipeline methods to transport information
1592         * over the tunnel.
1593         *
1594         * @param  packetOut  request packet; never null
1595         * @return  response packet; never null
1596         * @throws java.io.IOException  in case of I/O difficulties
1597         */
1598        protected RawPacket doRPC(final RawPacket packetOut)
1599            throws IOException
1600            {
1601            // Note RPC request...
1602            StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCREQUEST);
1603            // Note its type...
1604            StatsLogger.captureDataPoint(statsIDTS, TSNAMEPR_RPCTYPE + ((int) packetOut.opCode.getCode()));
1605    
1606            // If last communication with master failed too recently,
1607            // quickly veto a new connection attempt for now.
1608            if(isBroken())
1609                {
1610    if(_protocolDebug) { System.err.println("[doRPC(): vetoing connection to broken master.]"); }
1611                throw new PGMasterNotInServiceException("no connection to master");
1612                }
1613    
1614            return(doRPCUnguarded(packetOut));
1615            }
1616    
1617        /**Just like doRPC() but does not back off in face of previous failures; never null.
1618         * Can be used for calls that must attempt to contact the master anyway,
1619         * or are probing for it to be alive.
1620         * <p>
1621         * Most callers should use the normal doRPC().
1622         *
1623         * @param packetOut  never null
1624         * @return  response to RPC call; never null
1625         */
1626        protected RawPacket doRPCUnguarded(final RawPacket packetOut)
1627            throws IOException
1628            {
1629            if(_protocolDebug) { System.err.println("[doRPC(): connecting to master.]"); }
1630    
1631            try {
1632                if(_protocolDebug) { System.err.println(" [doRPC(): about to do raw RPC.]"); }
1633    
1634                // Do the raw, medium-specific RPC.
1635                final RawPacket result = doRPCRaw(packetOut);
1636    
1637                if(_protocolDebug) { System.err.println(" [doRPC(): about to check result type.]"); }
1638    
1639                // Check for mismatched response packets
1640                // or remote exceptions codes with special response op-codes.
1641                if(result.opCode != packetOut.opCode)
1642                    {
1643                    // Deal specially with some fixed remote exception types.
1644                    // Note that we don't count a OP__RUNTEX as a success OR failure
1645                    // for the purposes of link monitoring;
1646                    // this may be the master gently rebuffing a request for some reasons.
1647                    switch(result.opCode)
1648                        {
1649                        case PGMNISEX:
1650                            { throw new PGMasterNotInServiceException(); }
1651                        case RUNTEX:
1652                            { throw new RuntimeException("remote threw RuntimeException"); }
1653                        case REMEX:
1654                            { throw new IOException("remote threw RemoteException"); }
1655                        case INTEX:
1656                            { throw new InterruptedIOException("remote operation interrupted"); }
1657                        }
1658                    throw new IOException("packet response-type mismatch; got "+result.opCode+" expected "+packetOut.opCode);
1659                    }
1660    
1661                // Note last successful use.
1662                _noteResult(true);
1663    
1664                if(_protocolDebug) { System.err.println(" [doRPC(): about to return result.]"); }
1665    
1666                // Return result successfully.
1667                return(result);
1668                }
1669            catch(final TunnelBusyIOException e)
1670                {
1671                // Not an error (nor a success); voluntary rejection of excess connection on our side.
1672                logger.log("ExhibitDataTunnelSource: deferred RPC connection attempt upstream, packet: " + packetOut + ", reason: " + e.getMessage());
1673                // Rethrow the error that we encountered...
1674                throw e;
1675                }
1676            catch(final InterruptedIOException e)
1677                {
1678                // Not an error (nor a success): can retry later.
1679                throw e;
1680                }
1681            catch(final IOException e)
1682                {
1683                // Regard (only) IOException as indicator or some sort of connection problem.
1684    
1685                // Note RPC request failure with IOException...
1686                StatsLogger.captureDataPoint(statsIDTS, TSNAME_RPCIOEX);
1687    
1688                // Note failed connection to the master...
1689                // This will defer our next attempt...
1690                _noteResult(false);
1691    
1692                // Note failure in the logs:
1693                logger.log("ExhibitDataTunnelSource: RPC FAILED connection attempt upstream, failed packet: " + packetOut + ", reason: " + e.getMessage());
1694                final long lastS = lastSuccessfulConnectionTime;
1695                if(lastS > 0)
1696                    { logger.log("[doRPC(): last success at "+(new Date(lastS))+".]"); }
1697                final Long notUntil2 = doNotTryMasterUntil;
1698                if(notUntil2 != null)
1699                    { logger.log("[doRPC(): deferring next attempt until " + (new Date(notUntil2.longValue()))  + ".]"); }
1700    
1701                // Rethrow the error that we encountered...
1702                throw e;
1703                }
1704            }
1705    
1706        /**Minimum time (ms) for which we will hold latest newly-created AEP response; strictly positive.
1707         * A client that automatically retries after an interrupted IO response
1708         * to some sort of AEP request indicating 'already in progress'
1709         * should retry within this time to avoid starting its creation all over again.
1710         * <p>
1711         * Note that in dire straits this lower limit may be ignored.
1712         * <p>
1713         * Should typically be of the order of many minutes.
1714         */
1715        public static final int MIN_AEP_RETENTION_MS = 17 * 60 * 1000;
1716    
1717        /**Cache/lock to improve performance of inbound RPC.
1718         * Meant to be opaque to all but handleInboundRPC()
1719         * and is used to provide a better overall responsiveness,
1720         * especially from expensive calls
1721         * by vetoing concurrent expensive calls
1722         * and by cacheing the responses from especially expensive calls.
1723         * <p>
1724         * Not all possibilities are exploited,
1725         * just those that in practice seem to be important.
1726         * <p>
1727         * Can be registered to free its own content automagically
1728         * if the system is very short of memory.
1729         */
1730        public static final class HIRPCCache implements MemoryTools.RecurrentEmergencyFreeHandle
1731            {
1732            /**Lock to prevent servicing more than one very expensive call at once.
1733             * We do this to reduce strain on the memory/GC and CPU of the server.
1734             */
1735            private final ReentrantLock slowResponseLock = new ReentrantLock();
1736    
1737            /**Time of creation / last use of basic AEP response; initially zero.
1738             * This covers the GZIPped / extra-compressed / delta responses for the latest AEP,
1739             * but no previous AEP values which are not essential to minimal client progress.
1740             */
1741            private volatile long currentAEPResponseCreatedOrLastUsed;
1742    
1743            /**Cache of current AEP as pair of longHash and response packet for simple AEP fetch RPC; mutable.
1744             * Initially null.
1745             * <p>
1746             * Elements of this pair are never null.
1747             * <p>
1748             * Marked volatile for safe multi-threaded access.
1749             * <p>
1750             * Currently held for at least for a minimum period,
1751             * until the AEP fetched has a new hash,
1752             * not for example held via a SoftReference
1753             * since a SoftReference would probably be cleared too soon
1754             * for this to be effective.
1755             * <p>
1756             * This is likely to be many MB in size.
1757             */
1758            private volatile Tuple.Pair<Long, RawPacket> _AEP_response;
1759    
1760            /**Cache of full extra-compressed AEP as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1761             * Initially null.
1762             * <p>
1763             * Elements of this tuple are never null.
1764             * <p>
1765             * Marked volatile for safe multi-threaded access.
1766             * <p>
1767             * Currently held for at least for a minimum period,
1768             * until the AEP fetched has a new hash,
1769             * not for example held via a SoftReference
1770             * since a SoftReference would probably be cleared too soon
1771             * for this to be effective.
1772             * <p>
1773             * This is likely to be many MB in size.
1774             */
1775            private volatile Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> _AEP_extracomp_response;
1776    
1777            /**Cache of extra-compressed AEP diff as tuple of longHash, compression format, and response packet for diff AEP fetch RPC; mutable.
1778             * Initially null.
1779             * <p>
1780             * Elements of this tuple are never null.
1781             * <p>
1782             * Marked volatile for safe multi-threaded access.
1783             * <p>
1784             * Currently held for at least for a minimum period,
1785             * until the AEP fetched has a new hash,
1786             * not for example held via a SoftReference
1787             * since a SoftReference would probably be cleared too soon
1788             * for this to be effective.
1789             * <p>
1790             * This is likely relatively small.
1791             */
1792            private volatile Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> _AEP_diff_response;
1793    
1794            /**Cache of previous AEP value to assist with AEP diffs; initially null.
1795             * Currently held indefinitely (or until very short of free memory),
1796             * until the AEP fetched has a new hash,
1797             * not for example held via a SoftReference
1798             * since a SoftReference would probably be cleared too soon
1799             * for this to be effective.
1800             * <p>
1801             * This should generally share most of its state with the current AEP
1802             * and therefore should not represent a significant extra memory burden.
1803             */
1804            private volatile AllExhibitProperties aepPrev;
1805    
1806            /**Soft cache of older AEP values to assist with AEP diffs; never null.
1807             * This is present to help with requests for diffs against AEPs
1808             * older than the previous value, if any such requests are made.
1809             * <p>
1810             * Thread-safe cache of SoftReferences to older AEPs,
1811             * mapped from the Long hash values.
1812             * <p>
1813             * Map is not auto-clear()ed on memory stress
1814             * other than the individual SoftReferences themselves during GC.
1815             * <p>
1816             * It is possible to synchronise on this instance to exclude other activity.
1817             */
1818            private final SoftReferenceMap<Long, AllExhibitProperties> prevAEPs =
1819                SoftReferenceMap.<Long, AllExhibitProperties>create("prevAEPs");
1820    
1821            /**Can be called to purge most or all internal state when very short of memory.
1822             * Implements the emergency-free handler.
1823             * <p>
1824             * May not purge some elements which are not (or likely not) using significant memory
1825             * but usually save a lot of time for us and for clients.
1826             * To that end we try and estimate the space they are taking
1827             * vs actual current free heap space.
1828             */
1829            public void run()
1830                {
1831                // Clear strongly-referenced previous AEP value if very short of free memory.
1832                // Clearing this will prevent us from creating and sending space-efficient AEP deltas.
1833                if((null != aepPrev) && (MemoryTools.percentFreeWithinTarget() <= 5))
1834                    {
1835                    aepPrev = null;
1836                    System.err.println("HIRPCCache: emergency free of previous AEP value");
1837                    }
1838    
1839                // Clear the list of previous AEPs if very short of free memory.
1840                // Note that the softly-referenced content will be released automatically in dire straits.
1841                // We only forcefully clear this if no longer retaining the previous AEP strongly any more.
1842                if(!prevAEPs.isEmpty() && (null == aepPrev))
1843                   {
1844                   prevAEPs.clear();
1845                   System.err.println("HIRPCCache: emergency free of old AEP entries: "+prevAEPs.size());
1846                   }
1847    
1848                // If any basic of the AEP responses is only very recently created or used
1849                // then prevent them from being freed to avoid client starvation.
1850                final long lastUsedBAR = currentAEPResponseCreatedOrLastUsed;
1851                if(lastUsedBAR + MIN_AEP_RETENTION_MS > System.currentTimeMillis())
1852                    { return; }
1853    
1854                final long freeMem = Runtime.getRuntime().freeMemory();
1855    
1856                // This GZIP-compressed response is a bit bulky and shouldn't usually be needed
1857                // unless we are too short of memory to super-compress a response
1858                // and/or a client doesn't have enough memory to receive a super-compressed response.
1859                // We hold onto it grimly for a minimum amount of time
1860                // as being forced to continually regenerate it asynchronously causes starvation
1861                // since this is required for the minimum level of response to support clients.
1862                Tuple.Pair<Long, RawPacket> r = _AEP_response;
1863                if((null != r) && (null != r.second) &&
1864                        (r.second.getFrameLength() >= (freeMem>>1)))
1865                    {
1866                    _AEP_response = null;
1867                    System.err.println("HIRPCCache: emergency free of AEP response "+r.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1868                    }
1869                r = null; // Help GC.
1870    
1871                // This extra-compressed response is smaller and saves network bandwidth and is thus more valuable
1872                // and so we'd like to hang onto it if possible.
1873                Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> ecr = _AEP_extracomp_response;
1874                if((null != ecr) && (null != ecr.second) && (null != ecr.second.second) &&
1875                        (ecr.second.second.getFrameLength() >= (freeMem>>1)))
1876                    {
1877                    _AEP_extracomp_response = null;
1878                    System.err.println("HIRPCCache: emergency free of super-compressed AEP response "+ecr.second.second+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1879                    }
1880                ecr = null; // Help GC.
1881    
1882                // This extra-compressed diff response is the smallest and most valuable and most likely to be needed
1883                // and so we'd like to hang onto it if at all possible.
1884                // Though holding onto the delta is potentially more problematic.
1885                Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> dr = _AEP_diff_response;
1886                if((null != dr) && (null != dr.third) &&
1887                        (dr.third.getFrameLength() >= (freeMem>>1)))
1888                    {
1889                    _AEP_diff_response = null;
1890                    System.err.println("HIRPCCache: emergency free of super-compressed AEP diff response "+dr.third+" @"+TextUtils.sizeAsText(freeMem, true)+" free");
1891                    }
1892                dr = null; // Help GC.
1893                }
1894            }
1895    
1896        /**Allows lazy instantiation of some constant frame values. */
1897        private static final class FixedEmptyFrames
1898            {
1899            /**Constant empty NOOP return frame; non-null. */
1900            static final RawPacket RAW_PACKET_NOOP = new RawPacket(
1901                RawPacket.OpCode.NOOP);
1902    
1903            /**Constant empty/null GetGenProps return frame; non-null. */
1904            static final RawPacket RAW_PACKET_GP_NULL = new RawPacket(
1905                RawPacket.OpCode.GetGenProps);
1906    
1907            /**Constant empty/null GetAllExhibitPropertiesDiff return frame; non-null. */
1908            static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_GAEPD_NULL = new RawPacket(
1909                RawPacket.OpCode.GetAllExhibitPropertiesDiff);
1910    
1911            /**Constant empty/null GetGenSecProps return frame; non-null. */
1912            static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_GSP_NULL = new RawPacket(
1913                RawPacket.OpCode.GetGenSecProps);
1914    
1915            /**Constant empty/null SyncVariables return/request frame; non-null. */
1916            static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_SV = new RawPacket(
1917                RawPacket.OpCode.SyncVariables);
1918    
1919            /**Constant empty/null SyncVariables request frame; non-null. */
1920            static final ExhibitDataTunnelSource.RawPacket RAW_PACKET_GETSTRATUM = new RawPacket(
1921                RawPacket.OpCode.GetStratum);
1922            }
1923    
1924        /**Handles input request packet from slave across a tunnel.
1925         * Generates the response packet or throws an exception.
1926         *
1927         * @param reqPacket  the request packet; never null
1928         * @param clientAddr  the tunnel client's address as seen by us;
1929         *     may be null if not applicable or available
1930         * @param cache  if not null, then selected routines with
1931         *     slow/expensive responses will not be serviced concurrently
1932         *     but concurrent calls will be vetoed quickly instead
1933         *     and expensive-to-compute values may be (partially) cached
1934         * @param logger  logging area for warnings, etc; never null.
1935         *
1936         * @return the response packet; never null
1937         *
1938         * @throws java.io.IOException  in case of difficulty upstream
1939         * @throws PGMasterNotInServiceException  if the upstream data source
1940         *     is down/unavailable
1941         */
1942        public static RawPacket handleInboundRPC(final SimpleExhibitPipelineIF source,
1943                                                 final RawPacket reqPacket,
1944                                                 final String clientAddr,
1945                                                 final HIRPCCache cache,
1946                                                 final SimpleLoggerIF logger)
1947            throws IOException,
1948                   PGMasterNotInServiceException
1949            {
1950            if((source == null) || (reqPacket == null))
1951                { throw new IllegalArgumentException(); }
1952    
1953            switch(reqPacket.opCode)
1954                {
1955                // For NO-OP we expect an empty data section
1956                // (we simply ignore it even if not empty)
1957                // and reply with an empty data section.
1958                case NOOP:
1959                    {
1960                    // Return a fixed (empty) response packet.
1961                    // We don't attempt to compress this.
1962                    return(FixedEmptyFrames.RAW_PACKET_NOOP);
1963                    }
1964    
1965                case GetGenProps:
1966                    { return(handleGetGenProps(source, reqPacket)); }
1967    
1968                case GetGenSecProps:
1969                    { return(handleGetGenSecProps(source, reqPacket)); }
1970    
1971                case GetAllExhibitImmutableData:
1972                    { return(handleGetAllExhibitImmutableData(source, reqPacket, cache, logger)); }
1973    
1974                case GetAllExhibitProperties:
1975                    { return(handleGetAllExhibitProperties(source, reqPacket, clientAddr, cache, logger)); }
1976    
1977                case GetAllExhibitPropertiesDiff:
1978                    { return(handleGetAllExhibitPropertiesDiff(source, reqPacket, clientAddr, cache, logger)); }
1979    
1980                case GetThumbnails:
1981                    { return(handleGetThumbnails(source, reqPacket)); }
1982    
1983                case GetStaticAttr:
1984                    { return(handleGetStaticAttr(source, reqPacket)); }
1985    
1986                case GetRawFile:
1987                    { return(handleGetRawFile(source, reqPacket)); }
1988    
1989                case GetVariable:
1990                    { return(handleGetVariable(source, reqPacket)); }
1991    
1992                case SetVariables:
1993                    { return(handleSetVariables(source, reqPacket, clientAddr, logger)); }
1994    
1995                case GetVariables:
1996                    { return(handleGetVariables(source, reqPacket)); }
1997    
1998                case SyncVariables:
1999                    { return(handleSyncVariables(source, reqPacket)); }
2000    
2001                case GetEventValues:
2002                    { return(handleGetEventValues(source, reqPacket)); }
2003    
2004                case GetStratum:
2005                    { return(handleGetStratum(source, reqPacket)); }
2006    
2007                // For unrecognised requests,
2008                // pretend to throw a RuntimeException.
2009                default:
2010                    {
2011                    final String message = "unrecognised opcode " +reqPacket.opCode+ " in request from tunnel user " + clientAddr;
2012                    logger.log(message);
2013                    System.err.println(message);
2014                    return(new RawPacket(
2015                            RawPacket.OpCode.RUNTEX,
2016                            EMPTY_PAYLOAD,
2017                            false));
2018                    }
2019                }
2020            }
2021    
2022    
2023        /**Handle an incoming GetGenSecProps request; never null. */
2024        private static RawPacket handleGetGenSecProps(
2025                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2026            throws IOException
2027            {
2028            final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2029                    readLong();
2030            final Properties gsp = source.getGenSecProps(stamp);
2031            // Create and send the response packet
2032            // (empty for null response,
2033            // else serialised form, compressed when helpful).
2034            if(null == gsp) { return(FixedEmptyFrames.RAW_PACKET_GSP_NULL); }
2035            return(new RawPacket(
2036                RawPacket.OpCode.GetGenSecProps,
2037                gsp,
2038                true)); // Attempt compression...
2039            }
2040    
2041        /**Handle an incoming SyncVariables request; never null. */
2042        private static RawPacket handleSyncVariables(
2043                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2044            throws IOException
2045            {
2046            // No args, no return value.
2047            // The tunnel call is only made when "forced".
2048            source.syncVariables(true);
2049            return(FixedEmptyFrames.RAW_PACKET_SV);
2050            }
2051    
2052        /**Handle an incoming GetAllExhibitPropertiesDiff request; never null.
2053         * Holds the 'slow response' lock while running
2054         * to exclude other such intensive/slow responses concurrently.
2055         */
2056        private static RawPacket handleGetAllExhibitPropertiesDiff(
2057                final SimpleExhibitPipelineIF source,
2058                final RawPacket reqPacket,
2059                final String clientAddr,
2060                final HIRPCCache cache,
2061                final SimpleLoggerIF logger)
2062            throws IOException, InterruptedIOException
2063            {
2064            // Prime a new empty cache with an extant non-empty AEP
2065            // so that we are likely to be able to service
2066            // the first responses after an AEP change with deltas.
2067            if(cache != null)
2068                {
2069                if((cache.aepPrev == null) || (cache.aepPrev.aeid.length == 0))
2070                    {
2071                    final AllExhibitProperties first = source.getAllExhibitProperties(-1);
2072                    if((null != first) && (first.aeid.length != 0))
2073                        {
2074                        cache.aepPrev = first;
2075                        cache.prevAEPs.put(first.longHash, first);
2076                        }
2077                    }
2078                }
2079    
2080            final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2081            final long oldHash = dis.readLong();
2082            // We read the compression level, but need not parse it yet.
2083            final String compression = dis.readUTF();
2084            AllExhibitProperties _aep =
2085                source.getAllExhibitProperties(oldHash);
2086    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: client="+clientAddr+" hash="+oldHash+" compression="+compression+" response null/hash="+((null==_aep)?"null":String.valueOf(_aep.longHash)));
2087    
2088            // Correct for expensive aberrant behaviour further up the stack.
2089            // Whine loudly...
2090            if((null != _aep) && (oldHash == _aep.longHash))
2091                {
2092                final String message = "ERROR: ExhibitDataTunnelSource.handleInboundRPC(): non-null return with identical hash "+oldHash+ " from source "+source;
2093                logger.log(message);
2094                System.err.println(message);
2095                _aep = null; // Pretend that no AEP was in fact returned...
2096                }
2097    
2098            // Simple case; nothing to return since the caller is up to date.
2099            if(_aep == null) { return(FixedEmptyFrames.RAW_PACKET_GAEPD_NULL); }
2100    
2101            final AllExhibitProperties aepFromUpstream = _aep;
2102    
2103            // Full implementation is too expensive sans cache.
2104            if(cache == null)
2105                { throw new IOException("operation not fully implemented due to resource constraints"); }
2106    
2107            // Decode the client's maximum supported compression level.
2108            // This will throw an IllegalArgumentException if unparseable.
2109            final CompressionLevel cl = CompressionLevel.valueOf(compression);
2110    
2111            // If we already have a suitable response cached
2112            // ie for the very same AEP diff with the same old/new hashes.
2113            // then return it immediately.
2114            // We assume therefore that the entire packet will
2115            // be the same for the same response data
2116            // regardless of client, etc.
2117            //
2118            // (This does mean that the caller may miss some
2119            // internally cached updates within the AEP instance, etc,
2120            // but nothing that it cannot recompute if needed.)
2121            //
2122            // Try first for a cached diff/delta.
2123            final Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel, RawPacket> cachedDiff = cache._AEP_diff_response;
2124            if((oldHash != -1) &&
2125                (cachedDiff != null) &&
2126                (aepFromUpstream.longHash == cachedDiff.first.longHashAEPAfter) &&
2127                (oldHash == cachedDiff.first.longHashAEPBefore))
2128                {
2129                // If result is compressed at level higher than caller can handle
2130                // then we abort.
2131                // (We could chose to try for the full AEP rather than abort now,
2132                // but it's unlikely to work if this didn't.)
2133                if(cachedDiff.second.getLevel() > cl.getLevel())
2134                    { throw new IOException("compression level too high for client"); }
2135    
2136                // Good, can use cached diff packet...
2137                logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) delta response packet: "+cachedDiff.third+" to client "+clientAddr+".]");
2138                return(cachedDiff.third);
2139                }
2140    
2141            // If the response is other than null
2142            // and we reject concurrent expensive calls,
2143            // then veto it if another such call is already in progress.
2144            // We allow a very limited wait for the lock.
2145            try
2146                {
2147                if(!cache.slowResponseLock.tryLock(CoreConsts.MAX_INTERACTIVE_DELAY_MS, TimeUnit.MILLISECONDS))
2148                    {
2149    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitPropertiesDiff: already in progress: aborting...");
2150                    throw new InterruptedIOException("expensive call already in progress, please retry");
2151                    }
2152                }
2153            catch(final InterruptedException e)
2154                {
2155                final InterruptedIOException err = new InterruptedIOException(e.getMessage());
2156                err.initCause(e);
2157                Thread.currentThread().interrupt(); // Don't lose "interrupt" status.
2158                throw err;
2159                }
2160    
2161            // Create and send the response packet
2162            // (empty for null response,
2163            // else serialised form, full or delta/diff, compressed when helpful).
2164            try
2165                {
2166                // Note whatever was the previous AEP that we had sequestered,
2167                // partly to stop it being GCed until we finish handling this RPC.
2168                final AllExhibitProperties oldAepPrev;
2169                // If the AEP has changed then cache it for deltas, etc.
2170                if((null == (oldAepPrev = cache.aepPrev)) || (aepFromUpstream.longHash != oldAepPrev.longHash))
2171                    {
2172                    cache.aepPrev = aepFromUpstream;
2173                    cache.prevAEPs.put(aepFromUpstream.longHash, aepFromUpstream);
2174                    }
2175    
2176                // See if we have the correct cached extra-compressed full AEP.
2177                RawPacket cachedFullAEPResponse = null; // Will null out as soon as known not useful to aid GC.
2178                final Tuple.Pair<Long, Tuple.Pair<CompressionLevel, RawPacket>> cachedFull; // Minimise scope to aid GC.
2179                if(((cachedFull  = cache._AEP_extracomp_response) != null) && (aepFromUpstream.longHash == cachedFull.first.longValue()))
2180                    { cachedFullAEPResponse = cachedFull.second.second; }
2181    
2182                if(oldHash != -1)
2183                    {
2184                    // Caller has supplied their current hash...
2185                    // Look for caller's AEP cached here for us to diff against.
2186                    // (Zap any dead cache entries that we find in passing.)
2187                    AllExhibitProperties callerAEP = null;
2188                    final Long hashKey = Long.valueOf(oldHash);
2189                    final AllExhibitProperties caep = cache.prevAEPs.get(hashKey);
2190                    if(caep != null)
2191                        {
2192                        assert(caep.longHash == oldHash);
2193                        // Good, we have the same AEP that the caller does!
2194                        callerAEP = caep;
2195                        }
2196    
2197    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): old AEP cache size:"+cache.prevAEPs.size()+", caep="+callerAEP+".]"); }
2198                    // Do not hold lock on cache.prevAEPs
2199                    // while trying to create and return diff/delta.
2200                    if(callerAEP != null)
2201                        {
2202                        // Computing a diff (if possible).
2203                        // This generally should not take an enormous amount of working memory.
2204                        try
2205                            {
2206    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating AEP diff, caep="+callerAEP+".]"); }
2207                            final long diffStartTime = System.currentTimeMillis();
2208                            // We can do a diff/delta!
2209                            final AllExhibitPropertiesDelta diff =
2210                                AllExhibitPropertiesDelta.createDiff(callerAEP, aepFromUpstream, false);
2211                            final long diffEndTime = System.currentTimeMillis();
2212    
2213                            // Prepare the output packet...
2214                            final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2215                            ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2216                            final DataOutputStream dos = new DataOutputStream(boas);
2217                            dos.writeUTF(levelUsed.name());
2218                            final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2219                            oos.writeObject(diff);
2220                            oos.close();
2221                            final long compEndTime = System.currentTimeMillis();
2222                            RawPacket response = new RawPacket(
2223                                            RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2224                                            boas.toByteArray(),
2225                                            false); // Already compressed.
2226                            boas = null; // Help GC.
2227    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed AEP diff ("+diff+") response packet "+response+", caep="+callerAEP+".]"); }
2228    
2229                            // Abort if we have a full (non-delta) response
2230                            // and the delta seems larger than the full response.
2231                            if((null != cachedFullAEPResponse) &&
2232                               (cachedFullAEPResponse.getActualPayloadLength() <= response.getActualPayloadLength()))
2233                                { throw new IOException("AEP delta larger than full response"); }
2234    
2235                            // Check that correct client AEP is held.
2236                            assert(callerAEP.longHash == oldHash);
2237                            // Check that we can construct the correct new AEP.
2238                            assert(aepFromUpstream.equals(AllExhibitPropertiesDelta.applyDiff(callerAEP, diff)));
2239    
2240                            // We intern() the (probably large) response
2241                            // to try to avoid holding any duplicates
2242                            // and to reduce old-generation heap churn.
2243                            response = MemoryTools.intern(response);
2244                            cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2245                            cache._AEP_diff_response = new Tuple.Triple<AllExhibitPropertiesDelta, CompressionLevel,RawPacket>(diff, levelUsed, response);
2246    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached AEP (diff) delta response packet: "+response+"; times: delta/comp="+(diffEndTime-diffStartTime)+"ms/"+(compEndTime-diffEndTime)+"ms; full: "+cachedFullAEPResponse+"; last full extra compressed response: "+cachedFull+".]"); }
2247    
2248                            // If the result is compressed at a level higher than the caller can handle
2249                            // then we must abort.
2250                            // The client will have to retry with the non-diff AEP call.
2251                            if(levelUsed.getLevel() > cl.getLevel())
2252                                { throw new IOException("compression level too high for client"); }
2253    
2254                            // Return it...
2255                            return(response);
2256                            }
2257                        catch(final DiffException e)
2258                            {
2259    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): could not compute AEP (diff) delta: "+e.getMessage()+".]"); }
2260                            /* Fall through to send full AEP. */
2261                            }
2262                        catch(final Exception e)
2263                            {
2264                            logger.log("ERROR: unexpected exception creating AEP diff: " + e.getMessage());
2265                            /* Fall through to send full AEP. */
2266                            }
2267                        }
2268                    }
2269    
2270                // CANNOT COMPUTE A DIFF...
2271                // So try to return/compute/cache a full response.
2272                if(cachedFullAEPResponse != null)
2273                    {
2274                    // If result is compressed at level higher than caller can handle
2275                    // then we must abort.
2276                    if(cachedFull.second.first.getLevel() > cl.getLevel())
2277                        { throw new IOException("compression level too high for client"); }
2278    
2279                    // Good, can use cached packet...
2280                    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP (diff) response packet: "+cachedFull.second+" to client "+clientAddr+".]");
2281                    return(cachedFull.second.second);
2282                    }
2283    
2284                // Compute a new full AEP response packet...
2285                // Super-compressed full AEP; null if not (yet) available/computed.
2286    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computing super-compressed AEP response packet: (longHash="+aepFromUpstream.longHash+") for client "+clientAddr+".]");
2287    
2288                // Discard known-defunct old value possibly allowing GC.
2289                cache._AEP_extracomp_response = null;
2290                cachedFullAEPResponse = null; // Help GC.
2291    
2292                final CompressionLevel levelUsed = MAX_AEP_DIFF_COMP_LEVEL;
2293                // If result would be compressed at level higher than caller can handle
2294                // then we must abort.
2295                if(levelUsed.getLevel() > cl.getLevel())
2296                    { throw new IOException("compression level too high for client"); }
2297    
2298                // Computing a full super-compressed AEP may be very memory intensive.
2299                // We do not assume that unlimited resources are available.
2300                // We estimate the working memory required as a fixed overhead
2301                // plus about twice the estimated uncompressed size of the serialised data
2302                // which allows for compression dictionaries plus the actual output size.
2303                final AtomicReference<RawPacket> responseAR = new AtomicReference<RawPacket>();
2304                if(!MemoryTools.runMemoryIntensiveOperation(new Runnable(){
2305                    public void run()
2306                        {
2307                        // Prepare the output packet...
2308                        final ByteArrayOutputStream boas = new ByteArrayOutputStream(1 << 10);
2309                        final DataOutputStream dos = new DataOutputStream(boas);
2310                        try
2311                            {
2312                            dos.writeUTF(levelUsed.name());
2313                            final ObjectOutputStream oos = new ObjectOutputStream(GenUtils.wrapForCompression(dos, levelUsed));
2314                            oos.writeObject(aepFromUpstream);
2315                            oos.close();
2316                            }
2317                        catch(final IOException e) { throw new Error("unexpected IOException when building super-compressed AEP", e); }
2318                        // We intern() the (probably very large) response
2319                        // to try to avoid holding duplicates
2320                        // and to reduce old-generation heap churn.
2321                        responseAR.set(MemoryTools.intern(new RawPacket(
2322                                        RawPacket.OpCode.GetAllExhibitPropertiesDiff,
2323                                        boas.toByteArray(),
2324                                        false))); // Already compressed.
2325    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed super-compressed AEP response packet: (length="+boas.size()+", compression="+levelUsed+") for client "+clientAddr+".]");
2326                        }
2327                    },
2328                        false, // Definitely not unlimited resources on the server!
2329                        (1<<20) /* 1MB overhead */ + 2*aepFromUpstream.estimateSerialBytes()))
2330                    { throw new IOException("not currently enough memory to attempt to build super-compressed AEP"); }
2331    
2332                final RawPacket response = responseAR.get();
2333                if(null == response) { throw new IOException("failed to build super-compressed AEP"); }
2334                final Tuple.Pair<CompressionLevel, RawPacket> newFull = new Tuple.Pair<CompressionLevel,RawPacket>(levelUsed, response);
2335                cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2336                cache._AEP_extracomp_response = new Tuple.Pair<Long, Tuple.Pair<CompressionLevel,RawPacket>>(new Long(aepFromUpstream.longHash), newFull);
2337    
2338    /* if(IsDebug.isDebug) */ { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): computed and cached supecompressed AEP (length="+levelUsed+") response packet: "+newFull+".]"); }
2339    
2340    //                    // If result is compressed at level higher than caller can handle
2341    //                    // then we must abort.
2342    //                    if(tooCompressedForClient)
2343    //                        { throw new IOException("compression level too high for client"); }
2344    
2345                // Return it...
2346                return(response);
2347                }
2348    
2349            finally { cache.slowResponseLock.unlock(); }
2350            }
2351    
2352        /**Handle an incoming GetAllExhibitProperties request; never null.
2353         * Holds the 'slow response' lock while running
2354         * to exclude other such intensive/slow responses concurrently.
2355         */
2356        private static RawPacket handleGetAllExhibitProperties(
2357                final SimpleExhibitPipelineIF source,
2358                final RawPacket reqPacket,
2359                final String clientAddr,
2360                final HIRPCCache cache,
2361                final SimpleLoggerIF logger)
2362            throws IOException
2363            {
2364            final long hash = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2365                    readLong();
2366            final AllExhibitProperties aep = source.getAllExhibitProperties(hash);
2367    
2368            // Simple case; nothing to return since caller is up to date,
2369            // or we allow concurrent expensive calls.
2370            if(aep == null)
2371                {
2372                // Trivial null case.
2373                return(new RawPacket(RawPacket.OpCode.GetAllExhibitProperties));
2374                }
2375            if(cache == null)
2376                {
2377                try
2378                    {
2379                    // Where the AEP is not null, force compressed format,
2380                    // stream-serialised to minimise peak/intermediate memory footprint.
2381                    return(MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2382                        public RawPacket call() throws IOException
2383                            {
2384                            return(RawPacket.streamSerialiseObject(
2385                                    RawPacket.OpCode.GetAllExhibitProperties,
2386                                    aep));
2387                            }
2388                        }),
2389                        aep.estimateSerialBytes())); // Take a guess as to memory space required.
2390                    }
2391                catch(final IOException e) { throw e; }
2392                catch(final RuntimeErrorException e) { throw e; }
2393                catch(final Exception e) { throw new Error("unexpected exception", e); }
2394                }
2395    
2396    
2397            // If we already have a suitable response cached
2398            // ie for the very same AEP logHash
2399            // then return it immediately.
2400            // We assume therefore that the entire packet will
2401            // be the same for the same response data
2402            // regardless of client, etc.
2403            //
2404            // (This does mean that the caller may miss some
2405            // internally cached updates within the AEP instance, etc,
2406            // but nothing that it cannot recompute if needed.)
2407            final Tuple.Pair<Long, RawPacket> cached; // Don't retain value preventing GC...
2408            if(((cached = cache._AEP_response) != null) && (aep.longHash == cached.first.longValue()))
2409                {
2410                // Good, can use cached packet...
2411                assert(cached.second != null);
2412                cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2413                logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): returning cached AEP response packet: "+cached.second+" to client "+clientAddr+".]");
2414                return(cached.second);
2415                }
2416    
2417            // If the response is other than null
2418            // and we reject concurrent expensive calls,
2419            // then veto it if another such call is already in progress.
2420            if(!cache.slowResponseLock.tryLock())
2421                {
2422    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitProperties: already in progress: aborting...");
2423                throw new IOException("expensive call already in progress, please retry");
2424                }
2425            // Create and send the response packet
2426            // (empty for null response,
2427            // else serialised form, compressed when helpful).
2428            try
2429                {
2430                // Clear defunct value while computing the new one.
2431                cache._AEP_response = null;
2432    
2433    if(aep != null) { logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): creating/cacheing/returning packet-compressed AEP response packet (longHash="+aep.longHash+") to client "+clientAddr+".]"); }
2434                // Compute the response packet...
2435                // Where the AEP is not null, force compressed format,
2436                // stream-serialised to minimise peak/intermediate memory footprint.
2437                RawPacket response;
2438                try
2439                    {
2440                    // Where the AEP is not null, force compressed format,
2441                    // stream-serialised to minimise peak/intermediate memory footprint.
2442                    response = MemoryTools.runMemoryIntensiveOperation((new Callable<RawPacket>(){
2443                        public RawPacket call() throws IOException
2444                            {
2445                            return(RawPacket.streamSerialiseObject(
2446                                    RawPacket.OpCode.GetAllExhibitProperties,
2447                                    aep));
2448                            }
2449                        }),
2450                        aep.estimateSerialBytes()); // Take a guess as to memory space required.
2451                    }
2452                catch(final IOException e) { throw e; }
2453                catch(final RuntimeErrorException e) { throw e; }
2454                catch(final Exception e) { throw new Error("unexpected exception", e); }
2455    
2456    
2457                // Cache it if the AEP is not null.
2458                if(aep != null)
2459                    {
2460                    // We intern() the (probably very large) response
2461                    // to try to avoid holding duplicates
2462                    // and to reduce old-generation heap churn.
2463                    response = MemoryTools.intern(response);
2464                    cache.currentAEPResponseCreatedOrLastUsed = System.currentTimeMillis(); // Keep cached response below alive...
2465                    cache._AEP_response = new Tuple.Pair<Long,RawPacket>(new Long(aep.longHash), response);
2466    logger.log("[ExhibitDataTunnelSource.handleInboundRPC(): created/cached/returning packet-compressed AEP response packet (longHash="+aep.longHash+", actual payload length="+response.getActualPayloadLength()+") to client "+clientAddr+".]");
2467                    }
2468                // Return it...
2469                return(response);
2470                }
2471            finally { cache.slowResponseLock.unlock(); }
2472            }
2473    
2474        /**Handle an incoming GetGenProps request; never null. */
2475        private static RawPacket handleGetGenProps(
2476                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2477            throws IOException
2478            {
2479            final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2480                    readLong();
2481            final GenProps gp = source.getGenProps(stamp);
2482            // Create and send the response packet
2483            // (fixed empty result for efficient null response,
2484            // else serialised form, compressed when helpful).
2485            if(null == gp) { return(FixedEmptyFrames.RAW_PACKET_GP_NULL);}
2486            // Non-null value to return.
2487            return(new RawPacket(
2488                RawPacket.OpCode.GetGenProps,
2489                gp,
2490                true)); // Attempt compression...
2491            }
2492    
2493        /**Handle an incoming GetStaticAttr request; never null. */
2494        private static RawPacket handleGetStaticAttr(
2495                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2496            throws IOException
2497            {
2498            final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2499                    readUTF();
2500            final ExhibitStaticAttr esa;
2501            try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2502            catch(final IllegalArgumentException e)
2503                { throw new IOException("invalid argument: bad exhibit name"); }
2504            if(esa == null)
2505                { throw new IOException("invalid argument: non-existent exhibit"); }
2506            // Create and send the response packet
2507            // (empty for null response,
2508            // else serialised form, compressed when helpful).
2509            return(new RawPacket(
2510                RawPacket.OpCode.GetStaticAttr,
2511                esa,
2512                true)); // Allow compression.
2513            }
2514    
2515        /**Handle an incoming GetEventValues request; never null.
2516         * Request is:
2517         * <ul>
2518         * <li>A byte consisting of the ordinal of period/interval enum val.
2519         * <li>The UTF-8 representation of the name of the definition.
2520         * <li>The interval number (8 bytes).
2521         * <li>The serialised form of the request BitSet.
2522         * </ul>
2523         * <p>
2524         * Result is:
2525         * <ul>
2526         * <li>The in-order array of EventVariableValues (never null).
2527         * </ul>
2528         */
2529        private static RawPacket handleGetEventValues(
2530                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2531            throws IOException
2532            {
2533            final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2534            final int ordinal = dis.readUnsignedByte();
2535            final EventPeriod epv[] = EventPeriod.values();
2536            if(ordinal >= epv.length)
2537                { throw new IOException("corrupt arguments: bad period ordinal"); }
2538            final EventPeriod intervalSelector = epv[ordinal];
2539            final String name = dis.readUTF();
2540            final SimpleVariableDefinition def = SystemVariables.nameToDef.get(name);
2541            // Deal kindly with possibly-new definition
2542            // (ie if there are newer clients than us)
2543            // by just returning an empty result.
2544            if(def == null)
2545                {
2546                return(new RawPacket(
2547                    RawPacket.OpCode.GetEventValues,
2548                    NO_EVENT_VALUES,
2549                    false));
2550                }
2551            if(!def.isEvent() || def.isLocal())
2552                { throw new IOException("corrupt arguments: bad definition type: "+def); }
2553            final long intervalNumber = dis.readLong();
2554            if(intervalNumber < 0)
2555                { throw new IOException("corrupt arguments: bad intervalNumber"); }
2556    
2557            // If there is trailing data then it is the serialised BitSet
2558            // else this is a null BitSet equivalent to just bit-0 true.
2559            final BitSet whichValues;
2560            if(dis.available() != 0)
2561                {
2562                final ObjectInputStream ois = new ObjectInputStream(dis);
2563                final Object o; // = null;
2564                try { o = ois.readObject(); }
2565                catch(final ClassNotFoundException e)
2566                    { throw new IOException("corrupt arguments: class not found: " + e.getMessage()); }
2567                if((o != null) && !(o instanceof BitSet)) // Allow null.
2568                    { throw new IOException("corrupt arguments: bad whichValues class on decode"); }
2569                whichValues = (BitSet) o;
2570                }
2571            else
2572                { whichValues = null; }
2573            assert(dis.available() == 0) : "There must be no excess/trailing data in getEventValues() RPC";
2574    
2575            // Call upstream to actually get any values that we have...
2576            final EventVariableValue result[] =
2577                source.getEventValues(def, intervalSelector, intervalNumber, whichValues);
2578    
2579            // Return the serialised result array.
2580            return(new RawPacket(
2581                RawPacket.OpCode.GetEventValues,
2582                result,
2583                true)); // Compression may be very effective.
2584            }
2585    
2586        /**Handle an incoming GetVariables request; never null. */
2587        private static RawPacket handleGetVariables(
2588                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2589            throws IOException
2590            {
2591            final long changedSince = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2592                    readLong();
2593            // Get values.
2594            final List<SimpleVariableValue> l = new ArrayList<SimpleVariableValue>(Arrays.asList(
2595                source.getVariables(changedSince)));
2596            // Return only non-local variables.
2597            for(final Iterator<SimpleVariableValue> it = l.iterator(); it.hasNext(); )
2598                {
2599                final SimpleVariableValue svv =
2600                        it.next();
2601                if(svv.getDef().isLocal())
2602                    { it.remove(); }
2603                }
2604            // Sort by variable name for better compression on the wire.
2605            Collections.sort(l, SimpleVariableValue.compByDef);
2606            // Convert to array.
2607            final SimpleVariableValue svvs[] =
2608                new SimpleVariableValue[l.size()];
2609            l.toArray(svvs);
2610            return(new RawPacket(
2611                RawPacket.OpCode.GetVariables,
2612                svvs,
2613                true)); // Compression may be very effective.
2614            }
2615    
2616        /**Handle an incoming SetVariables request; never null.
2617         * NOTE: for security/sanity reasons,
2618         *     reject any attempt to set where the value does not
2619         *     have a credible globalMap of exactly size == 1.
2620         * <p>
2621         * We'll validate the entire set of values,
2622         * and reject the whole lot if any are dubious.
2623         */
2624        private static RawPacket handleSetVariables(
2625                final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2626                final String clientAddr, final SimpleLoggerIF logger)
2627            throws IOException
2628            {
2629            // Decode array of variable values to apply.
2630            final SimpleVariableValue svvs[] = (SimpleVariableValue[])
2631                reqPacket.getSerializedObjectPayload();
2632    
2633            // Get our local ID so that we can veto changes
2634            // claiming to be from us for sanity/security.
2635            // If we get a null (not definition currently)
2636            // this will not cause an exception to be thrown,
2637            // we just won't be able to screen.
2638            final SimpleVariableValue ourID =
2639                source.getVariable(SystemVariables.LOCAL_SYS_ID);
2640            final InstanceID iid = (ourID == null) ? null:
2641                ((InstanceID) ourID.getValue());
2642    
2643            // Validate the requests.
2644            // Silently ignore:
2645            //   * null entries
2646            //   * locals
2647            //   * anything with a definition that we don't have
2648            //   * anything without a globalMap of size 1
2649            //   * TODO: anything claiming to be from our system ID
2650            //   * claims to be setting values from multiple client IDs
2651            //
2652            // We ignore invalid individual entries rather than
2653            // rejecting the entire request for robustness if talking to
2654            // a slightly different age client to eliminate any
2655            // variable value that has no local definition,
2656            // but still apply the rest of the values,
2657            // though we still reject obviously bogus values.
2658            //
2659            // Capture the client system ID in passing
2660            // (each variable value should have a globalMap with
2661            // exactly one entry, which is the client's ID).
2662            final InstanceID clientID = null;
2663            for(int i = svvs.length; --i >= 0; )
2664                {
2665                final SimpleVariableValue svv = svvs[i];
2666    
2667                // Reject obviously bogus values.
2668                if((svv == null) ||
2669                   svv.getDef().isLocal() ||
2670                   (svv.getGlobalMap() == null) ||
2671                        (svv.getGlobalMap().size() != 1))
2672                    { throw new IllegalArgumentException(); }
2673    
2674                try {
2675                    // Quietly ignore variables that we don't (currently)
2676                    // have definitions for locally.
2677                    if(!SystemVariables.defs.contains(svv.getDef()))
2678                        {
2679                        logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with no local definition");
2680                        continue;
2681                        }
2682    
2683                    // Get the system that *is* in the globalMap...
2684                    final InstanceID putativeClientID =
2685                        svv.getGlobalMap().keySet().iterator().next();
2686                    // Abort if it is our ID!
2687                    if(clientID == null)
2688                        {
2689                        // OK, this must be from the first variable,
2690                        // so trigger some one-off processing...
2691    
2692                        // Discard obviously-forged / looped values.
2693                        if((iid != null) &&
2694                            iid.equals(putativeClientID))
2695                            {
2696                            logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") with bad/looped IID: "+iid);
2697                            continue;
2698                            }
2699    
2700                        // Note the client's remote address
2701                        // (ie make an entry in an upstream global Map)...
2702                        final SimpleVariableValue newValue = new SimpleVariableValue(
2703                                                    SystemVariables.TunnelServlet_SLAVE_ADDRS,
2704                                                    clientAddr);
2705                        source.setVariable(newValue.put(putativeClientID,
2706                                                        newValue,
2707                                                        true));
2708                        }
2709                    else
2710                        {
2711                        if(!clientID.equals(putativeClientID))
2712                            {
2713                            logger.log("Stopping before accepting tunnelled setVariable("+svv.getDef()+"): client claiming multiple InstanceIDs!");
2714                            break; // Give up now...
2715                            }
2716                        }
2717                    }
2718                catch(final IllegalArgumentException e)
2719                    {
2720                    logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got IllegalArgumentException: " + e.getMessage());
2721                    continue;
2722                    }
2723                catch(final UnsupportedOperationException e)
2724                    {
2725                    logger.log("Ignoring tunnelled setVariable("+svv.getDef()+") that got UnsupportedOperationException: " + e.getMessage());
2726                    continue;
2727                    }
2728                }
2729    
2730            // Apply the values, in the order received...
2731            final int nSet = source.setVariables(svvs);
2732    
2733            // Create and send the response packet.
2734            return(new RawPacket(
2735                RawPacket.OpCode.SetVariables,
2736                intSer(nSet),
2737                false)); // Don't try compression; none to be had...
2738            }
2739    
2740        /**Handle an incoming GetVariable request; never null. */
2741        private static RawPacket handleGetVariable(
2742                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2743            throws IOException
2744            {
2745            final String name = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2746                    readUTF();
2747            // Look up variable definition from name.
2748            final SimpleVariableDefinition def =
2749                SystemVariables.nameToDef.get(name);
2750            // If no such variable,
2751            // or it is a local,
2752            // then immediately return an empty result.
2753            if((def == null) || def.isLocal())
2754                {
2755                return(new RawPacket(RawPacket.OpCode.GetVariable));
2756                }
2757            // Get the variable value (if any)...
2758            // Note that this may include a big globalMap.
2759            final SimpleVariableValue svv =
2760                source.getVariable(def);
2761            // Create and send the response packet
2762            // (empty for null response,
2763            // else serialised form, compressed when helpful).
2764            return(new RawPacket(
2765                RawPacket.OpCode.GetVariable,
2766                svv,
2767                true)); // Allow compression.
2768            }
2769    
2770        /**If true then never pass through a request to cache content locally.
2771         * Content may be cached anyway,
2772         * but avoiding passing through a cache request here may help break request 'loops'
2773         * and/or conserve master cache space for example.
2774         */
2775        private static final boolean BLOCK_RAW_FILE_CACHEING = true;
2776    
2777        /**Handle an incoming GetRawFile request; never null.
2778         * We satisfy the initial portion of over-length requests.
2779         * <p>
2780         * FIXME: decide whether to honour a false dontCache flag or not
2781         */
2782        private static RawPacket handleGetRawFile(
2783                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2784            throws IOException
2785            {
2786            // Decode the request directly from the payload (without a copy operation if possible).
2787            final byte[] payload = reqPacket.getPayload();
2788    
2789            // Validate for reasonable-size request packet.
2790            if(payload.length < 11 + ExhibitName.MIN_NAME_LENGTH)
2791                { throw new IOException("invalid argument: bad exhibit name"); }
2792    
2793            // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2794            final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2795            if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2796                { throw new IOException("invalid argument: bad exhibit name length"); }
2797            final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2798            // Look up (validate) name
2799            // and efficiently convert to (or get) Name.ExhibitFull
2800            // and get size bound
2801            // all in one step...
2802            final ExhibitStaticAttr esa;
2803            try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2804            catch(final IllegalArgumentException e)
2805                { throw new IOException("invalid argument: bad exhibit name"); }
2806            if(esa == null)
2807                { throw new IOException("invalid argument: non-existent exhibit"); }
2808            final int start =
2809                ((payload[2 + nameLen]       ) << 24) +
2810                ((payload[3 + nameLen] & 0xff) << 16) +
2811                ((payload[4 + nameLen] & 0xff) <<  8) +
2812                ((payload[5 + nameLen] & 0xff)      );
2813            final int afterEndRaw =
2814                ((payload[6 + nameLen]       ) << 24) +
2815                ((payload[7 + nameLen] & 0xff) << 16) +
2816                ((payload[8 + nameLen] & 0xff) <<  8) +
2817                ((payload[9 + nameLen] & 0xff)      );
2818            // Coerce afterEnd into range, and allow zero-length requests.
2819            final int afterEnd = (int) Math.min(afterEndRaw, esa.length);
2820            final boolean dontCache = BLOCK_RAW_FILE_CACHEING || (payload[10 + nameLen] != 0);
2821    
2822            // Disallow patently absurd values, eg due to request-packet corruption.
2823            if((start < 0) || (afterEnd < start))
2824                { throw new IOException("invalid argument: bad range"); }
2825            final int len = afterEnd - start;
2826            if(len > MAX_USER_READ_SIZE)
2827                { throw new IOException("invalid argument: request too large"); }
2828    
2829            // Fetch the data from upstream...
2830            final byte rawData[] = new byte[len];
2831            final ByteBuffer buf = ByteBuffer.wrap(rawData);
2832            // Fetch whatever we quickly can.
2833            source.getRawFile(buf, esa.getExhibitFullName(), start, dontCache);
2834            // Flip the buffer to nominally switch to reading from it.
2835            buf.flip();
2836    
2837            // Create and send the response packet containing raw exhibit data.
2838            //
2839            // Useful compression may be possible for some exhibit fragments/types,
2840            // but here we are slightly uncomfortable with
2841            // the potential vulnerability to invisible corruption of binary data
2842            // on the core (and non-re-compressible) exhibit type (JPEG)
2843            // with no application-level checksum on these frames,
2844            // so we simply avoid trying to compress *.jpg file data for now.
2845            return(new RawPacket(
2846                RawPacket.OpCode.GetRawFile,
2847                rawData, buf.limit(),
2848                !TextUtils.endsWith(esa.getCharSequence(), ".jpg")));
2849            }
2850    
2851        /**Handle an incoming GetThumbnails request; never null. */
2852        private static RawPacket handleGetThumbnails(
2853                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2854            throws IOException
2855            {
2856            // Decode the request directly from the payload (without a copy operation if possible).
2857            final byte[] payload = reqPacket.getPayload();
2858            // Validate for reasonable-size request packet.
2859            if(payload.length < 3 + ExhibitName.MIN_NAME_LENGTH)
2860                { throw new IOException("invalid argument: bad exhibit name"); }
2861            // First the exhibit name as-if a UTF String value (but we know it to be all 7-bit).
2862            final int nameLen = (((payload[0] & 0xff) << 8) + (payload[1] & 0xff));
2863            if((nameLen < ExhibitName.MIN_NAME_LENGTH) || (nameLen > ExhibitName.MAX_NAME_LENGTH))
2864                { throw new IOException("invalid argument: bad exhibit name length"); }
2865            final CharSequence name = new WrappedByteArrayCharSequence(payload, 2, nameLen);
2866            final boolean create = (payload[2 + nameLen] != 0);
2867    
2868    //        final DataInputStream dis = new DataInputStream(reqPacket.getPayloadAsInputStream());
2869    //        final String name = dis.readUTF();
2870    //        final boolean create = dis.readBoolean();
2871    //        dis.close(); // Hopefully free some resources quickly.
2872    
2873            final ExhibitStaticAttr esa;
2874            try { esa = source.getAllExhibitImmutableData(-1).getStaticAttr(name); }
2875            catch(final IllegalArgumentException e)
2876                { throw new IOException("invalid argument: bad exhibit name"); }
2877            if(esa == null)
2878                { throw new IOException("invalid argument: non-existent exhibit"); }
2879    
2880            final ExhibitThumbnails eth =
2881                source.getThumbnails(esa.getExhibitFullName(), create);
2882    
2883            // Create and send the response packet
2884            // (empty for null response, else serialised form).
2885            // We allow compression because it may save a little bandwidth.
2886            return(new RawPacket(
2887                RawPacket.OpCode.GetThumbnails,
2888                eth,
2889                true)); // Allow compression to be attempted, though probably marginal at best.
2890            }
2891    
2892        /**Handle an incoming GetAllExhibitImmutableData request; never null.
2893         * Holds the 'slow response' lock while running
2894         * to exclude other such intensive/slow responses concurrently.
2895         */
2896        private static RawPacket handleGetAllExhibitImmutableData(
2897                final SimpleExhibitPipelineIF source, final RawPacket reqPacket,
2898                final HIRPCCache cache, final SimpleLoggerIF logger)
2899            throws IOException
2900            {
2901            final long stamp = (new DataInputStream(reqPacket.getPayloadAsInputStream())).
2902                    readLong();
2903            final AllExhibitImmutableData aeid =
2904                source.getAllExhibitImmutableData(stamp);
2905    
2906            // Simple case; nothing to return since caller is up to date,
2907            // or we allow concurrent expensive calls.
2908            if((aeid == null) || (cache == null))
2909                {
2910                return(new RawPacket(
2911                    RawPacket.OpCode.GetAllExhibitImmutableData,
2912                    aeid,
2913                    true)); // Allow compression.
2914                }
2915    
2916            // If the response is other than null
2917            // and we reject concurrent expensive calls,
2918            // then veto it if another such call is already in progress.
2919            if(!cache.slowResponseLock.tryLock())
2920                {
2921    logger.log("INFO: ExhibitDataTunnelSource.handleInboundRPC(): GetAllExhibitImmutableData: already in progress: aborting...");
2922                throw new IOException("expensive call already in progress, please retry");
2923                }
2924            // Create and send the response packet
2925            // (empty for null response,
2926            // else serialised form, compressed when helpful).
2927            try
2928                {
2929                return(new RawPacket(
2930                    RawPacket.OpCode.GetAllExhibitImmutableData,
2931                    aeid,
2932                    true)); // Allow compression.
2933                }
2934            finally { cache.slowResponseLock.unlock(); }
2935            }
2936    
2937        /**Handle an incoming GetStratum request; never null. */
2938        private static RawPacket handleGetStratum(
2939                final SimpleExhibitPipelineIF source, final RawPacket reqPacket)
2940            throws IOException
2941            {
2942            final Stratum stRaw = source.getStratum();
2943    
2944            // Just before returning the Stratum info,
2945            // generate an amended value to reflect our conservation status if necessary
2946            // so that it's logically right 'on the wire'.
2947            final Stratum st;
2948            if(GenUtils.mustConservePower() == stRaw.isUpstreamConserving())
2949                { st = stRaw; } // Will do as-is!
2950            else
2951                {
2952                // Flip conserving flag to correctly reflect this instance's state as the 'upstream' server.
2953                st = new Stratum(stRaw.getStratum(), stRaw.getRootDelay(), stRaw.getUpstreamName(),
2954                                 !stRaw.isUpstreamConserving());
2955                }
2956    
2957            // Create and send the response packet.
2958            return(new RawPacket(
2959                RawPacket.OpCode.GetStratum,
2960                st,
2961                false)); // Don't attempt compression (probably won't work and could inflate RTT estimates).
2962            }
2963    
2964    
2965        /**Immutable raw packet to send in either direction over a byte stream connection.
2966         * This consists of an op-code byte,
2967         * a byte-array payload,
2968         * and a trailer byte (different to any op-code byte).
2969         * <p>
2970         * We informally attempt to keep a decent Hamming distance
2971         * between different op-codes (ie more than one bit at a time
2972         * should have to change to mutate one into another).
2973         * <p>
2974         * The byte array can be sent compressed (using the gzip
2975         * algorithm), and this compression is adaptive, ie if
2976         * compression does not save space then it is not used.
2977         * The assumption if compression is requested is that it
2978         * is worthwhile burning lots of CPU cycles to compress
2979         * the data as far as possible, and so we may attempt very
2980         * CPU-hungry settings.  Compressed data is sent on the
2981         * wire with the length actually -compressedDataLength.
2982         * <p>
2983         * This NOT intended to be directly Serializable.
2984         * <p>
2985         * NOTE: this is only guaranteed immutable if the payload is not tampered with
2986         * after being passed to the constructor.
2987         */
2988        public static final class RawPacket implements MemoryTools.Internable
2989            {
2990            /**Operation code: NO-OP (no operation). */
2991            public static final byte OP_NOOP = 0;
2992    
2993            /**Operation code: getGenProps(). */
2994            public static final byte OP_getGenProps = 3;
2995    
2996            /**Operation code: getAllExhibitImmutableData(). */
2997            public static final byte OP_getAllExhibitImmutableData = 5;
2998    
2999            /**Operation code: getStaticAttr(). */
3000            public static final byte OP_getStaticAttr = 9;
3001    
3002            /**Operation code: getRawFile(). */
3003            public static final byte OP_getRawFile = 10;
3004    
3005            /**Operation code: getGenSecProps(). */
3006            public static final byte OP_getGenSecProps = 13;
3007    
3008            /**Operation code: getAllExhibitProperties(). */
3009            public static final byte OP_getAllExhibitProperties = 17;
3010            /**Operation code: getAllExhibitPropertiesDiff(). */
3011            public static final byte OP_getAllExhibitPropertiesDiff = 18;
3012    
3013            /**Operation code: getThumbnails(). */
3014            public static final byte OP_getThumbnails = 23;
3015    
3016            /**Operation code: getVariable(). */
3017            public static final byte OP_getVariable = 26;
3018    
3019            /**Operation code: getVariables(). */
3020            public static final byte OP_getVariables = 29;
3021    
3022            /**Operation code: setVariables(). */
3023            public static final byte OP_setVariables = 30;
3024    
3025            /**Operation code: syncVariables(). */
3026            public static final byte OP_syncVariables = 33;
3027    
3028    //        /**Operation code: getEventValue(). */
3029    //        public static final byte OP_getEventValue = 38;
3030    
3031            /**Operation code: getEventValues(). */
3032            public static final byte OP_getEventValues = 41;
3033    
3034            /**Operation code: getStratum(). */
3035            public static final byte OP_getStratum = 46;
3036    
3037    
3038            /**Reserved op-code for use in response packets to indicate that the operation was interrupted and can be retried (not an error). */
3039            public static final byte OP__INTEX = 119;
3040    
3041            /**Reserved op-code for use in response packets to indicate a PGMasterNotInServiceException. */
3042            public static final byte OP__PGMNISEX = 121;
3043    
3044            /**Reserved op-code for use in response packets to indicate a RemoteException. */
3045            public static final byte OP__REMEX = 125;
3046    
3047            /**Reserved op-code for use in response packets to indicate a RuntimeException.
3048             * Receipt of one of these DOES NOT indicate a link failure
3049             * (and thus need to back off use of the link).
3050             */
3051            public static final byte OP__RUNTEX = 126;
3052    
3053            public static enum OpCode
3054                {
3055                NOOP(OP_NOOP),
3056                GetGenProps(OP_getGenProps),
3057                GetAllExhibitImmutableData(OP_getAllExhibitImmutableData),
3058                GetStaticAttr(OP_getStaticAttr),
3059                GetRawFile(OP_getRawFile),
3060                GetGenSecProps(OP_getGenSecProps),
3061                GetAllExhibitProperties(OP_getAllExhibitProperties),
3062                GetAllExhibitPropertiesDiff(OP_getAllExhibitPropertiesDiff),
3063                GetThumbnails(OP_getThumbnails),
3064                GetVariable(OP_getVariable),
3065                GetVariables(OP_getVariables),
3066                SetVariables(OP_setVariables),
3067                SyncVariables(OP_syncVariables),
3068                GetEventValues(OP_getEventValues),
3069                GetStratum(OP_getStratum),
3070                INTEX(OP__INTEX),
3071                PGMNISEX(OP__PGMNISEX),
3072                REMEX(OP__REMEX),
3073                RUNTEX(OP__RUNTEX);
3074    
3075                private OpCode(final byte code) { this.code = code; }
3076    
3077                /**The byte code; normal values are small and positive.
3078                 * We try to maintain a good Hamming distance between any two codes.
3079                 */
3080                private final byte code;
3081    
3082                /**Get the "closeness" factor higher meaning closer; strictly positive. */
3083                public final byte getCode() { return(code); }
3084    
3085                /**Constant-time lookup from (unsigned) byte value to enum.
3086                 * Created on first use.
3087                 */
3088                private static final class LookupCache
3089                    {
3090                    private LookupCache() { /* No instances needed. */ }
3091                    static final OpCode lookup[] = new OpCode[256];
3092                    static
3093                        {
3094                        for(final OpCode oc : OpCode.values())
3095                            {
3096                            final int index = oc.getCode() & 0xff;
3097                            assert(lookup[index] == null) : "all OpCode values must be unique";
3098                            lookup[index] = oc;
3099                            }
3100                        }
3101                    }
3102    
3103                /**Look up from byte to enum value, else null if no matching enum value. */
3104                public static OpCode lookupCode(final byte code)
3105                    {
3106                    return(LookupCache.lookup[code & 0xff]); // Constant-time lookup.
3107    //                // Above should be equivalent to this (slow) linear lookup...
3108    //                for(final OpCode oc : OpCode.values())
3109    //                    { if(oc.code == code) { return(oc); } }
3110    //                return(null); // None found.
3111                    }
3112                }
3113    
3114    
3115            /**Trailer byte; different to all valid op-code byte values. */
3116            public static final byte TRAILER = (byte) 0xda;
3117            /**Verify that TRAILER is indeed not a valid op-code. */
3118            static { assert(null == OpCode.lookupCode(TRAILER)); }
3119    
3120    
3121            /**Minimum size of payload for which we will try heaviest compression modes; strictly positive.
3122             * This is for modes such as BZIP2 or LZMA with heavy memory, CPU and
3123             * start-up costs (ie much heavier than GZIP).
3124             * <p>
3125             * Below this we assume that the absolute bandwidth savings
3126             * are not worth the pain.
3127             * <p>
3128             * Probably bigger than a "bulk transfer" block too,
3129             * so that the heavy guns are wielded only for the biggest frames.
3130             */
3131            public static final int MIN_PLLENGTH_FOR_HEAVY_COMP_ALGS = 64 * 1024;
3132    
3133            /**Opcode (single byte); never null.
3134             * Set first in the raw frame on the wire.
3135             * <p>
3136             * Never numerically equal to TRAILER.
3137             */
3138            public final OpCode opCode;
3139    
3140            /**Frame overhead (header and trailer) in bytes; strictly positive. */
3141            public static final int FRAME_OVERHEAD_BYTES = 6;
3142    
3143            /**Maximum permitted total frame length including header/trailer; strictly positive power of two minus the frame overhead.
3144             * Set much larger than the user-level transfer-size limit
3145             * to allow for some control data overhead on top of the given data,
3146             * and for much larger non-user-controlled items such as the AEP.
3147             * <p>
3148             * This limit mainly exists to prevent DoS-style out-of-memory problems
3149             * arising from corrupt (huge) length values.
3150             */
3151            public static final int MAX_FRAME_SIZE = 128*1024*1024;
3152    
3153            /**Maximum permitted payload length; strictly positive power of two minus the frame overhead.
3154             * Set much larger than the user-level transfer-size limit
3155             * to allow for some control data overhead on top of the given data,
3156             * and for much larger non-user-controlled items such as the AEP.
3157             * <p>
3158             * This mainly exists to prevent DoS-style out-of-memory problems
3159             * arising from corrupt (huge) length values.
3160             */
3161            public static final int MAX_PAYLOAD_SIZE = MAX_FRAME_SIZE - FRAME_OVERHEAD_BYTES;
3162            /**Check that we really do allow room for the maximum user-data transfer and then some. */
3163            static { assert(MAX_PAYLOAD_SIZE > 1024+SimpleExhibitPipelineIF.MAX_USER_READ_SIZE); }
3164    
3165            /**Payload data, non-null unless the data is stored compressed.
3166             * Private in order to keep the whole packet immutable.
3167             */
3168            private final byte payload[];
3169    
3170            /**Compressed payload data, null unless the data is stored compressed.
3171             * Private in order to keep the whole packet immutable.
3172             */
3173            private final byte payloadCompressed[];
3174    
3175            /**Trailer byte; not a valid op-code. */
3176            public static final byte trailer = TRAILER;
3177    
3178            /**Depends on the whole packet being identical.
3179             * This depends on the header and data being identical,
3180             * including being compressed in the same way if at all.
3181             */
3182            @Override public boolean equals(final Object obj)
3183                {
3184                if(obj == this) { return(true); }
3185                if(!(obj instanceof RawPacket)) { return(false); }
3186                final RawPacket other = (RawPacket) obj;
3187    
3188                if(opCode != other.opCode) { return(false); }
3189                if(getActualPayloadLength() != other.getActualPayloadLength()) { return(false); }
3190    
3191                // Checking the data for equality may be very slow...
3192                if(!Arrays.equals(payloadCompressed, other.payloadCompressed)) { return(false); }
3193                if(!Arrays.equals(payload, other.payload)) { return(false); }
3194    
3195                return(true); // Yes, identical.
3196                }
3197    
3198            /**Checks for equivalent raw packets, ignoring internal representation details.
3199             * Mainly this means ignoring whether the data is held compressed or not,
3200             * as long as the values that the user can see are identical.
3201             * <p>
3202             * May be very slow and memory intensive.
3203             */
3204            public boolean equivalentTo(final RawPacket other)
3205                {
3206                if(opCode != other.opCode) { return(false); }
3207    
3208                // Check that the (uncompressed) data appears identical to the user.
3209                return(Arrays.equals(payloadCompressed, other.payloadCompressed) ||
3210                        Arrays.equals(payload, other.payload) ||
3211                        Arrays.equals(getPayload(), other.getPayload()));
3212                }
3213    
3214            /**Hash is based on just packet type and (actual) packet length. */
3215            @Override public int hashCode()
3216                { return(((opCode.getCode() * 69069) + getActualPayloadLength())); }
3217    
3218            /**Construct a new raw packet with an empty payload.
3219             *
3220             * @param op  the op-code of the RPC; never null
3221             */
3222            public RawPacket(final OpCode opCode)
3223                {
3224                if(null == opCode) { throw new IllegalArgumentException(); }
3225                this.opCode = opCode;
3226                this.payload = EMPTY_PAYLOAD;
3227                this.payloadCompressed = null;
3228                }
3229    
3230            /**Construct a new raw packet with the whole data array as the payload.
3231             * Note that the data is <em>not</em> copied (nor modified);
3232             * the array should not subsequently be modified.
3233             * <p>
3234             * This assumes that it is worth trying to compress the payload,
3235             * if other conditions support it.
3236             *
3237             * @param op  the op-code of the RPC; never null
3238             * @param data  the payload bytes; never null
3239             */
3240            public RawPacket(final OpCode op, final byte data[])
3241                { this(op, data, true); }
3242    
3243            /**Construct a new raw packet with the whole data array as the payload.
3244             * Note that the data is <em>not</em> copied (nor modified);
3245             * the array should not subsequently be modified.
3246             *
3247             * @param op  the op-code of the RPC; never null
3248             * @param data  the payload bytes; never null
3249             * @param attemptCompression  if true, it is probably worth attempting
3250             *     to compress the payload data unless very short
3251             */
3252            public RawPacket(final OpCode op, final byte data[],
3253                             final boolean attemptCompression)
3254                { this(op, data, data.length, attemptCompression); }
3255    
3256            /**Construct a new raw packet with the initial portion of the data array as the payload.
3257             * Note that the data is <em>not</em> copied (nor modified),
3258             * unless it is compressed or the data does not occupy the whole array;
3259             * the array should not subsequently be modified.
3260             * <p>
3261             * If compression is requested and is possible
3262             * then the compression may be performed at construction time
3263             * and the compressed form of the data held to save space.
3264             *
3265             * @param op  the op-code of the RPC; never null
3266             * @param data  the (uncompressed) payload bytes; never null
3267             * @param initialPortion  the initial portion of data[] containing
3268             *     payload data; never negative or larger than data.length or MAX_PAYLOAD_SIZE
3269             * @param attemptCompression  if true, it is probably worth attempting
3270             *     to compress the payload data unless very short
3271             */
3272            public RawPacket(final OpCode op, final byte data[], final int initialPortion,
3273                             final boolean attemptCompression)
3274                {
3275                if((op == null) ||
3276                   (data == null) ||
3277                   (initialPortion > MAX_PAYLOAD_SIZE) ||
3278                   (initialPortion > data.length) || (initialPortion < 0))
3279                    { throw new IllegalArgumentException(); }
3280    
3281                // Capture the opcode and the raw payload length immediately.
3282                opCode = op;
3283    
3284                // See if the data needs compressing and can be compressed.
3285                // If not, then keep it uncompressed.
3286                //
3287                // If compression is requested and the payload size is significant
3288                // (allowing for for data overheads of the compression scheme)
3289                // then check out the possibility of compression,
3290                // else ignore, and send the data uncompressed.
3291                //
3292                // We assume that trying to deflate anything smaller than
3293                // (say) 256 bytes is a waste of effort given the
3294                // deflate overhead and, indeed, the packet overhead.
3295                if(attemptCompression &&
3296                   (initialPortion >= FileTools.TYPICAL_DEFLATE_MIN_TEXT_SIZE_COMPRESSABLE))
3297                    {
3298                    final byte compressed[] = FileTools.compressDeflatableData(data, 0, initialPortion);
3299                    final boolean savedSpace = (compressed.length < initialPortion);
3300    
3301                    // If successful then store the compressed data.
3302                    if(savedSpace)
3303                        {
3304                        payload = null;
3305                        payloadCompressed = compressed;
3306    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[RawPacket: compressed payload from "+initialPortion+" to "+(payloadCompressed.length)+" bytes.]"); }
3307                        return;
3308                        }
3309                    }
3310    
3311                // Store uncompressed data.
3312                // Store without copying if data is entire array.
3313                if(data.length == initialPortion)
3314                    { payload = data; }
3315                else
3316                    {
3317                    // It is inefficient to copy,
3318                    // so we want to avoid this where possible,
3319                    // and in debug will warn where it is happening.
3320    if(IsDebug.isDebug) { System.err.println("[RawPacket: WARNING: copying uncompressed payload from oversize buffer.]"); Thread.dumpStack(); }
3321                    payload = Arrays.copyOf(data, initialPortion);
3322                    }
3323                payloadCompressed = null;
3324                }
3325    
3326            /**Construct a new raw packet with data provided.
3327             * This accepts whatever it is passed
3328             * (we check for some errors with run-time assertions in debug code),
3329             * and is intended only to be called from other constructors
3330             * or factory methods.
3331             *
3332             * @param op  the op-code of the RPC; never null
3333             */
3334            private RawPacket(final OpCode opCode,
3335                              final byte[] payload,
3336                              final byte[] payloadCompressed)
3337                {
3338                if(IsDebug.isDebug)
3339                    {
3340                    assert(opCode != null);
3341                    assert((payload == null) != (payloadCompressed == null)); // Exactly one must be null.
3342                    }
3343    
3344                this.opCode = opCode;
3345                this.payload = payload;
3346                this.payloadCompressed = payloadCompressed;
3347                }
3348    
3349            /**Construct a new raw packet with a serialised object as the payload.
3350             * This will use an uncompressed form if more efficient on the wire.
3351             * <p>
3352             * This will have the entire uncompressed form in memory at peak,
3353             * so may be unsuitable for very large (and highly-compressible)
3354             * objects.
3355             *
3356             * @param op  the op-code of the RPC; never null
3357             * @param obj  the Serializable object to send,
3358             *     or null to send empty payload
3359             * @param attemptCompression  if true, it is probably worth attempting
3360             *     to compress the payload data if not very short
3361             */
3362            public RawPacket(final OpCode op, final Serializable obj,
3363                             final boolean attemptCompression)
3364                throws IOException
3365                {
3366                this(op, _serObjForPayload(obj), attemptCompression);
3367                }
3368    
3369            /**Construct a new raw packet with a stream-serialised object as the payload.
3370             * This will always compress the supplied object
3371             * (unless null, in which case an empty payload is used)
3372             * to try to avoid ever having the full unserialised form in memory.
3373             * <p>
3374             * This will veto with an IOException any attempt to write more than
3375             * MAX_PAYLOAD_SIZE pre-compression or pre-compression bytes.
3376             *
3377             * @param op  the op-code of the RPC; one of the OP_XXX values
3378             * @param obj  the Serializable object to send,
3379             *     or null to send empty payload
3380             * @param attemptCompression  if true, it is probably worth attempting
3381             *     to compress the payload data if not very short
3382             */
3383            public static RawPacket streamSerialiseObject(final OpCode op, final Serializable obj)
3384                throws IOException
3385                {
3386                if(op == null) { throw new IllegalArgumentException(); }
3387    
3388                // If the object to be serialised is null
3389                // then return a result using an empty payload.
3390                if(null == obj) { return(new RawPacket(op)); }
3391    
3392                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3393                // We veto any attempt to construct a frame
3394                // whose compressed or uncompressed payload would exceed MAX_PAYLOAD_SIZE bytes.
3395                // We apply exactly the same header-less deflate as in other places we compress.
3396                final DefOutputStream dos = new DefOutputStream(new GenUtils.LengthLimitedOutputStream(baos, MAX_PAYLOAD_SIZE));
3397                final GenUtils.LengthLimitedOutputStream llos = new GenUtils.LengthLimitedOutputStream(dos, MAX_PAYLOAD_SIZE); // Must not throw an exception...
3398                ObjectOutputStream oos = null;
3399                try
3400                    {
3401                    oos = new ObjectOutputStream(llos);
3402                    oos.writeObject(obj);
3403                    oos.flush();
3404                    }
3405                finally
3406                    { dos.finish(); } // Finish compression and ensure that native resources are released.
3407                if(oos != null) { oos.close(); } // Hopefully free resources quickly.
3408    
3409                // Construct packet with compressed data...
3410                return(new RawPacket(op, null, baos.toByteArray()));
3411                }
3412    
3413            /**Serialise object for payload.
3414             * Returns a zero-length array if the object is null.
3415             */
3416            private static byte[] _serObjForPayload(final Serializable obj)
3417                throws IOException
3418                {
3419                if(obj == null)
3420                    { return(ExhibitDataTunnelSource.EMPTY_PAYLOAD); }
3421    
3422                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3423                final ObjectOutputStream oos = new ObjectOutputStream(baos);
3424                oos.writeObject(obj);
3425                oos.flush();
3426                final byte[] responsePayload = baos.toByteArray();
3427                oos.close(); // Hopefully free resources quickly.
3428                return(responsePayload);
3429                }
3430    
3431            /**Retrieves a single serialised Object from the payload.
3432             * If the payload is empty (zero-length), we return null.
3433             * <p>
3434             * For robustness, treats a class mismatch problem like a data problem,
3435             * ie this will re-throw any ClassNotFoundException as an IOException.
3436             */
3437            public Object getSerializedObjectPayload()
3438                throws IOException
3439                {
3440                if(getActualPayloadLength() == 0) { return(null); }
3441    
3442                final InputStream is = getPayloadAsInputStream();
3443                final ObjectInputStream ois = new ObjectInputStream(is);
3444                try { return(ois.readObject()); }
3445                catch(final ClassNotFoundException e) { throw new IOException(e); }
3446                finally { ois.close(); } // Release resources, especially native in decompressor.
3447                }
3448    
3449            /**Computes the total number of bytes that would be written by writePacket(); strictly positive.
3450             */
3451            public int getFrameLength()
3452                {
3453                // The total length on the wire of the entire packet.
3454                // An excess 6 bytes allows for 1 opCode, 4 length, 1 trailer.
3455                return(FRAME_OVERHEAD_BYTES + getActualPayloadLength());
3456                }
3457    
3458            /**Send the packet down the given OutputStream with header and trailer.
3459             * This flushes the data after writing it.
3460             * <p>
3461             * This does not close() the stream.
3462             */
3463            public void writePacket(final OutputStream os)
3464                throws IOException
3465                { writePacket(os, false); }
3466    
3467            /**Send the packet down the given OutputStream with header and trailer.
3468             * This flushes the data after writing it.
3469             * <p>
3470             * If the payload is not too large,
3471             * then the entire packet will be buffered
3472             * and sent in one write(),
3473             * else it will be sent in LAN-packet-size chunks.
3474             * <p>
3475             * This does not close() the stream.
3476             *
3477             * @param reduceWrites if true then make an effort to reduce the number of writes done
3478             *     (ie try to write fewer larger blocks)
3479             *     since the output may be to a raw network stream or similar
3480             *     which may require more copying of data first
3481             */
3482            public void writePacket(final OutputStream os, final boolean reduceWrites)
3483                throws IOException
3484                {
3485    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[writePacket(): op="+opCode+" len="+getActualPayloadLength()+".]"); }
3486    
3487                final boolean isCompressed = (payload == null);
3488                final byte[] dataOnTheWire =
3489                    isCompressed ? payloadCompressed : payload;
3490                final int realPayloadLen = dataOnTheWire.length;
3491    
3492                // Optimisation for empty frame (no payload).
3493                if(0 == realPayloadLen)
3494                    {
3495                    final byte[] out = new byte[FRAME_OVERHEAD_BYTES];
3496                    out[0] = opCode.getCode();
3497                    // Length is all zeros.
3498                    out[FRAME_OVERHEAD_BYTES-1] = trailer;
3499                    os.write(out);
3500                    os.flush();
3501                    return;
3502                    }
3503    
3504                // The total length on the wire of the entire packet.
3505                final int totalPacketLen = FRAME_OVERHEAD_BYTES + realPayloadLen;
3506    
3507                // Buffer the output (with a carefully-crafted buffer size) if requested
3508                // so that we can write it all in one operating-system interaction
3509                // if small enough, else in efficient-ish disc/network-size chunks.
3510                // (Note that Tomcat 4.1.31 writes chunked data to the network
3511                // in blocks of 2kB on Solaris 8/9,
3512                // but also note that the write of a large payload will cause
3513                // the header data to be flushed through the buffer first,
3514                // so anything other than quite a small buffer will be a waste.)
3515                // But we don't want to do pointless extra copies.
3516                final DataOutputStream dos = new DataOutputStream((!reduceWrites) ? os :
3517                    new BufferedOutputStream(os,
3518                        (totalPacketLen <= CoreConsts.WAN_LARGE_FRAME_PAYLOAD_BYTES) ?
3519                            totalPacketLen : (FRAME_OVERHEAD_BYTES-1)));
3520    
3521                // Send the opcode.
3522                dos.writeByte(opCode.getCode());
3523                // Send the payload length (negative indicating compressed).
3524                dos.writeInt(isCompressed ? -realPayloadLen : realPayloadLen);
3525                // Send the payload.
3526                dos.write(dataOnTheWire, 0, realPayloadLen);
3527                // Write the trailer.
3528                dos.writeByte(trailer);
3529                // Flush data though any buffers and down the wire if possible.
3530                dos.flush();
3531                }
3532    
3533            /**Reads a packet from the input stream, blocking until done.
3534             * Preserves its compressed/uncompressed form,
3535             * possibly helping reduce memory footprint at the receiver.
3536             * <p>
3537             * Compressed data is signalled by a negative length on the wire,
3538             * where this is the negation of the compressed data size.
3539             * <p>
3540             * The ZLIB deflater with maximum compression and
3541             * no checksum is used for any (de)compression.
3542             * <p>
3543             * To prevent unpleasantness such a OutOfMemoryError from a corrupt length value,
3544             * we veto lengths greater than the maximum specified for this frame type.
3545             */
3546            public static RawPacket readPacket(final InputStream is)
3547                throws IOException
3548                {
3549                // While not wonderfully efficient,
3550                // to save a great deal of fiddly coding
3551                // the input stream is wrapped in a DataInputStream.
3552                final DataInputStream dis = new DataInputStream(is);
3553                // Start collecting the data...
3554                final byte opC = dis.readByte();
3555                final OpCode op = OpCode.lookupCode(opC);
3556                if(op == null) { throw new IOException("invalid opcode "+opC); }
3557                final int rawLen = dis.readInt();
3558                final boolean isCompressed = (rawLen < 0);
3559                final int len = ((isCompressed) ? -rawLen : rawLen);
3560    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): op="+opC+" rawLen="+rawLen+"...]"); }
3561                if(len > MAX_PAYLOAD_SIZE) { throw new IOException("corrupt length"); }
3562                final byte data[];
3563                if(0 == len)
3564                    { data = EMPTY_PAYLOAD; }
3565                else
3566                    {
3567                    data = new byte[len];
3568                    dis.readFully(data);
3569                    }
3570                final byte tr = dis.readByte();
3571                if(tr != TRAILER)
3572                    { throw new IOException("corrupt trailer"); }
3573                try {
3574                    // Decompress the data if necessary...
3575                    if(isCompressed)
3576                        {
3577    if(ExhibitDataTunnelSource._protocolDebug) { System.err.println("[readPacket(): compressed payload len="+data.length+" bytes.]"); }
3578                        return(new RawPacket(op, null, data));
3579                        }
3580                    return(new RawPacket(op, data, null));
3581                    }
3582                catch(final IllegalArgumentException e)
3583                    { throw new IOException("corrupt packet: " + e.getMessage()); }
3584                }
3585    
3586            /**Create a human-readable summary (not including the payload data). */
3587            @Override
3588            public String toString()
3589                {
3590                final StringBuilder sb = new StringBuilder(79);
3591                sb.append("RawPacket");
3592                sb.append(":opCode=").append(opCode);
3593                sb.append(":actualPayloadLength=").append(getActualPayloadLength());
3594                return(sb.toString());
3595                }
3596    
3597            /**Get the payload data as an InputStream; never null.
3598             * If the internal data is stored compressed however,
3599             * then this will return the uncompressed version.
3600             * <p>
3601             * Each stream returned is independent.
3602             * <p>
3603             * This stream should be explicitly closed when finished with
3604             * to release underlying memory and other resources ASAP.
3605             *
3606             * @return  the uncompressed payload data as a stream of length plLength
3607             */
3608            public InputStream getPayloadAsInputStream()
3609                {
3610                if(payload != null)
3611                    {
3612                    // Payload is uncompressed; wrap it as-is.
3613                    return(new ByteArrayInputStream(payload));
3614                    }
3615    
3616                // Payload is held compressed so decompress it
3617                // (on the fly to reduce peak memory usage).
3618                try { return(new DefInputStream(new ByteArrayInputStream(payloadCompressed))); }
3619                catch(final IOException e) { throw new IllegalStateException("unable to decompress"); }
3620                }
3621    
3622            /**Get a copy of the payload data; never null.
3623             * We return a copy to keep this object immutable.
3624             * <p>
3625             * If the internal data is stored compressed
3626             * then this will return the original uncompressed version.
3627             *
3628             * @return  a copy of the payload data, of length plLength
3629             */
3630            public byte[] getPayloadCopy()
3631                {
3632                if(payload != null)
3633                    {
3634                    // Payload is uncompressed; clone it as-is.
3635                    // Take copy to protect internal state.
3636                    return(payload.clone());
3637                    }
3638    
3639                // Payload is held compressed, so decompress it.
3640                try
3641                    {
3642                    final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3643                    // No need to copy (private) data returned from decompression.
3644                    return(uncompressedData);
3645                    }
3646                catch(final IOException e)
3647                    {
3648                    throw new IllegalStateException("corrupt compressed data");
3649                    }
3650                }
3651    
3652            /**Get direct access to the uncompressed payload; never null.
3653             * We avoid making a copy, for speed.
3654             * <em>The caller must not alter the byte array returned.</em>
3655             * <p>
3656             * If the internal data is stored compressed
3657             * then this will return an uncompressed copy.
3658             * <p>
3659             * This is package-visible and is only for trusted callers
3660             * that will not alter the content.
3661             *
3662             * @return  a the payload data, of length plLength; never null,
3663             */
3664            byte[] getPayload()
3665                {
3666                if(payload != null)
3667                    {
3668                    // Trust caller not to mess around...
3669                    return(payload);
3670                    }
3671    
3672                // Payload is held compressed, so decompress it.
3673                try
3674                    {
3675                    final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3676                    // No need to copy (private) data returned from decompression.
3677                    return(uncompressedData);
3678                    }
3679                catch(final IOException e)
3680                    {
3681                    throw new IllegalStateException("corrupt compressed data");
3682                    }
3683                }
3684    
3685            /**Copy the payload data into the supplied (trusted) buffer.
3686             * We only make this package-visible to keep this object immutable.
3687             * <p>
3688             * If the internal data is stored compressed
3689             * then this will return the original uncompressed version.
3690             */
3691            void getPayloadCopy(final ByteBuffer buf)
3692                {
3693                if(payload != null)
3694                    {
3695                    // Payload is uncompressed; clone it as-is.
3696                    // Copy uncompressed data directly.
3697                    buf.put(payload);
3698                    return;
3699                    }
3700    
3701                // Payload is held compressed, so decompress it.
3702                try
3703                    {
3704                    final byte uncompressedData[] = FileTools.decompressDeflatedData(payloadCompressed);
3705                    // No need to copy (private) data returned from decompression.
3706                    buf.put(uncompressedData);
3707                    return;
3708                    }
3709                catch(final IOException e)
3710                    {
3711                    throw new IllegalStateException("corrupt compressed data");
3712                    }
3713                }
3714    
3715            /**Return the actual payload length; non-negative.
3716             * The payload may be compressed
3717             * so this may be less than the uncompressed full payload size.
3718             * <p>
3719             * If the original payload is empty (zero length)
3720             * and getUncompressedPayloadLength() returns 0
3721             * then this returns 0 also.
3722             */
3723            public int getActualPayloadLength()
3724                { return((null != payload) ? payload.length : payloadCompressed.length); }
3725            }
3726    
3727        /**The immutable adjunct for a RawPacket that includes the HMAC and other anti-attack data.
3728         * Note that this class is NOT directly serialisable,
3729         * with the data fields sent in some other way,
3730         * eg as HTTP header fields "out-of-band" from the actual HTTP message.
3731         */
3732        public static final class PacketProtector
3733            {
3734            /**The timestamp for the RawPacket, input to each MAC; strictly positive. */
3735            public final long timestamp;
3736    
3737            /**The length of the entire frame/datastream being protected, input to each MAC; non-negative. */
3738            public final int length;
3739    
3740            /**The immutable in-order list of MAC authenticator segments for the stream and fields herein; never null nor empty nor containing nulls.
3741             * This sequence of HMAC values together makes up the message MAC.
3742             * It is designed not to be possible to re-arrange these segments,
3743             * as each depends in part on the value of its predecessor
3744             * as well as the data in its segment.
3745             * <p>
3746             * Note that the <em>last</em> MAC, which depends on all the data being protected,
3747             * can be used as a unique message MAC on its own.
3748             */
3749            public final List<ROByteArray> mac;
3750    
3751            /**Maximum size in (ASCII) characters of output of toCheckString(); strictly positive.
3752             * Chosen to allow inclusion of the output of toCheckString() in an HTTP header.
3753             */
3754            public static final int MAX_CHECK_STRING_CHARS = 900;
3755    
3756            /**Generate (HTTP-header) check-string; never null nor empty.
3757             * This is a pure-ASCII single-line control-code-free check String
3758             * that contains the input fields and MAC from this object
3759             * to serve as the "checksum" of a RawPacket.
3760             * <p>
3761             * This object is suitable (short enough, avoidance of meta-characters)
3762             * to be used directly in an HTTP header.
3763             * <p>
3764             * This value is suitable to be decoded by fromCheckString().
3765             * <p>
3766             * The format is a space-separated list of fields:
3767             * <ol>
3768             * <li>The Java-style UTC timestamp in milliseconds as an unsigned decimal.
3769             * <li>The length of the protected stream in bytes as an unsigned decimal.
3770             * <li>The MAC values for each segment, in order, encoded Base64.
3771             * </ol>
3772             */
3773            public String toCheckString()
3774                {
3775                // Do a worst-case check given the size of the (first) MAC as encoded.
3776                assert((""+Long.MAX_VALUE+' '+Integer.MAX_VALUE+' ').length() + (MAX_SEGMENTS*(1+TextUtils.encode8To6(mac.get(0).toByteArray()).length()))-1 < MAX_CHECK_STRING_CHARS) : "check string must always be short enough for HTTP header";
3777    
3778                final int initialCapacity = 32 + (45*mac.size());
3779                final StringBuilder sb = new StringBuilder(initialCapacity);
3780                sb.append(timestamp).append(' ');
3781                sb.append(length).append(' ');
3782                for(final ROByteArray r : mac)
3783                    { sb.append(TextUtils.encode8To6(r.toByteArray())).append(' '); }
3784                // Rip off trailing space...
3785                sb.setLength(sb.length()-1);
3786                assert(sb.length() <= MAX_CHECK_STRING_CHARS) : "check string must be short enough for HTTP header";
3787    //if(IsDebug.isDebug && (sb.length() > initialCapacity)) { System.err.println("WARNING: incorrectly sized toCheckString() buffer: initial|actual|mac.size() = "+initialCapacity+"|"+sb.length()+"|"+mac.size()+ ": "+sb); }
3788                return(sb.toString());
3789                }
3790    
3791            /**Field splitter regex pattern compiled once for efficiency; never null. */
3792            private static final Pattern fieldSplitPattern = Pattern.compile(" ");
3793    
3794            /**Parses a check-string as generated by toCheckString(); never null.
3795             * This is tolerant of some leading and trailing whitespace.
3796             *
3797             * @throws IllegalArgumentException  if the input is unparsable
3798             */
3799            public static PacketProtector fromCheckString(final String check)
3800                {
3801                if((check == null) || (check.length() > 2*MAX_CHECK_STRING_CHARS))
3802                    { throw new IllegalArgumentException("too big before trim"); }
3803                final String trimmed = check.trim();
3804                if(trimmed.length() > MAX_CHECK_STRING_CHARS)
3805                    { throw new IllegalArgumentException("too big"); }
3806                final String fields[] = fieldSplitPattern.split(check, 0);
3807                if(fields.length < 3)
3808                    { throw new IllegalArgumentException("not enough fields"); }
3809                final long timestamp = Long.parseLong(fields[0], 10);
3810                final int length = Integer.parseInt(fields[1], 10);
3811                if((length < 0) || (length > RawPacket.MAX_FRAME_SIZE))
3812                    { throw new IllegalArgumentException("out-of-range length value"); }
3813                final ArrayList<ROByteArray> mac = new ArrayList<ROByteArray>(fields.length-2);
3814                for(int i = 2; i < fields.length; ++i)
3815                    { mac.add(new ROByteArray(TextUtils.decode8To6(fields[i]))); }
3816                return(new PacketProtector(timestamp, length, mac));
3817                }
3818    
3819            /**Maximum number of segments protected stream may be broken into; strictly positive power of two.
3820             * This determines the maximum number of incremental portions
3821             * we can process while streaming a RawPacket.
3822             * <p>
3823             * This value is capped to limit the amount of MAC data that needs to be sent,
3824             * eg in an HTTP header of limited length.
3825             */
3826            public static final int MAX_SEGMENTS = 16;
3827    
3828            /**Minimum segment size (other than final segment); strictly positive power of two.
3829             * This is set to be somewhat larger than the largest typical frame
3830             * (ie the entire header, payload and trailer)
3831             * that we usually don't want to stream, eg bulk data transfer,
3832             * so that such frames/packets will get a single MAC for efficiency.
3833             * This is also set large enough to amortise the cost of each segment MAC.
3834             * <p>
3835             * This is small enough to represent a reasonable incremental amount of CPU
3836             * for streamed inputs.
3837             */
3838            public static final int MIN_SEGMENT_SIZE = (1<<16);
3839    
3840            /**Maximum segment size; strictly positive power of two.
3841             * This is determined from the maximum frame size
3842             * and the maximum number of segments.
3843             */
3844            public static final int MAX_SEGMENT_SIZE = RawPacket.MAX_FRAME_SIZE / MAX_SEGMENTS;
3845            /**Check some invariants. */
3846            static { assert(RawPacket.MAX_FRAME_SIZE == MAX_SEGMENT_SIZE * MAX_SEGMENTS); }
3847            static { assert(MAX_SEGMENT_SIZE > MIN_SEGMENT_SIZE); }
3848    
3849            /**Create an adjunct to protect a RawPacket, including a current timestamp.
3850             * This must be supplied with the (secret) Key shared between the servers.
3851             *
3852             * @param raw  the RawPacket to protect; never null
3853             * @param key  the (secret) key for the HMAC; never null
3854             */
3855            public PacketProtector(final RawPacket raw,
3856                                   final SecretKey key)
3857                throws InvalidKeyException
3858                { this(raw, System.currentTimeMillis(), key); }
3859    
3860            /**Create an adjunct to protect a RawPacket, including the given timestamp.
3861             * This must be supplied with the (secret) Key shared between the servers.
3862             *
3863             * @param raw  the RawPacket to protect; never null
3864             * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
3865             * @param key  the (secret) key for the HMAC; never null
3866             */
3867            public PacketProtector(final RawPacket raw,
3868                                   final long timestamp,
3869                                   final SecretKey key)
3870                throws InvalidKeyException
3871                { this(timestamp, raw.getFrameLength(), computeMAC(timestamp, raw, key)); }
3872    
3873            /**Create an adjunct to protect a RawPacket.
3874             *
3875             * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
3876             * @param mac  the HMAC for the RawPacket and other fields, not checked; never null
3877             */
3878            public PacketProtector(final long timestamp,
3879                                   final int length,
3880                                   final List<ROByteArray> mac)
3881                {
3882                this.timestamp = timestamp;
3883                this.length = length;
3884                // Take defensive immutable copy of MAC segment list.
3885                // A single entry is the common case (for typical/small packets).
3886                this.mac = (mac.size() == 1) ? Collections.singletonList(mac.get(0)) :
3887                    Collections.unmodifiableList(new ArrayList<ROByteArray>(mac));
3888    
3889                try { validateObject(); } // Validate this instance.
3890                catch(final InvalidObjectException e) { throw new IllegalArgumentException(e); }
3891                }
3892    
3893            /**Compute segment size; strictly positive power of two.
3894             * Given the full message/frame length
3895             * (strictly positive and <= RawPacket.MAX_FRAME_SIZE)
3896             * this computes the segment size between/at the MIN/MAX limits
3897             * to maximise the number of segments
3898             * and thus maximise the the amount of incremental MAC processing
3899             * and incremental processing of streamed frames that can be done,
3900             * minimising buffer working space and time/bandwidth before corruption is found.
3901             * <p>
3902             * All segments are of the same size, except the last one which may be shorter.
3903             */
3904            private static final int computeSegmentSize(final int frameLength)
3905                {
3906                assert(frameLength <= RawPacket.MAX_FRAME_SIZE);
3907                assert(frameLength > 0);
3908    
3909                // Common case (min segment size) for reasonable/small packets.
3910                if(frameLength <= MIN_SEGMENT_SIZE * MAX_SEGMENTS)
3911                    { return(MIN_SEGMENT_SIZE); }
3912    
3913                // Find minimum segment size that will cover the frame if not MIN_SEGMENT_SIZE.
3914                for(int segSize = MIN_SEGMENT_SIZE<<1; segSize <= MAX_SEGMENT_SIZE; segSize <<= 1)
3915                    {
3916                    if(segSize * MAX_SEGMENTS >= frameLength)
3917                        { return(segSize); }
3918                    }
3919    
3920                throw new Error("should not get here");
3921                }
3922    
3923            /**Compute the MAC given the message and other fields to be included; never null.
3924             * The (secret) Key must also be supplied
3925             * and be suitable for the HMAC algorithm used.
3926             * <p>
3927             * The MAC is computed on a series of segments of the input of the same length
3928             * (except for a possibly-shorter final segment)
3929             * producing an HMAC on each segment.
3930             * <p>
3931             * Each HMAC is computed over a binary message consisting of:
3932             * <ol>
3933             * <li>the 4-byte length of the entire protected stream/frame.
3934             * <li>the 8-byte (Java-style millisecond UTC) timestamp in network order,
3935             * <li>for all segments other than the first, the HMAC of the previous segment,
3936             * <li>the full on-the-wire form of the RawPacket (including header/trailer).
3937             * </ol>
3938             * <p>
3939             * The chaining should make it impossible to reorder the segments.
3940             * <p>
3941             * The segmentation is so that the data in each segment is known-safe
3942             * and can be safely consumed by incremental/streaming CPU-heavy operations
3943             * before subsequent segments have been received and decoded.
3944             * This segmentation also means that we can abort a damaged message
3945             * as soon as we check the damaged segment: we do not need to wait
3946             * to receive and store and check the whole message.
3947             *
3948             * @param raw  the RawPacket to protect; never null
3949             * @param timestamp  the timestamp for creation/send of the RawPacket; strictly positive
3950             * @param key  the (secret) key for the HMAC; never null
3951             *
3952             * @throws InvalidKeyException  if the Key supplied is inappropriate
3953             *
3954             * @throws IllegalArgumentException  if the timestamp is non-positive
3955             *     or any other argument is null
3956             * @throws IllegalStateException  if the HMAC algorithm is unavailable
3957             *
3958             * @return immutable in-order list of HMACs to protect each stream segment
3959             */
3960            public static List<ROByteArray> computeMAC(final long timestamp,
3961                                                       final RawPacket raw,
3962                                                       final SecretKey key)
3963                throws InvalidKeyException
3964                {
3965                if((timestamp <= 0) || (raw == null) || (key == null))
3966                    { throw new IllegalArgumentException(); }
3967    
3968                // Compute the segment size that we will be using.
3969                final int frameLength = raw.getFrameLength();
3970                final int segmentSize = computeSegmentSize(frameLength);
3971                assert(0 == (segmentSize & (segmentSize-1))); // Segment size should be a power of two.
3972    
3973                // Compute the number of segments that we will be using.
3974                final int nSegs = (frameLength <= MIN_SEGMENT_SIZE) ? 1 : // Note frameLength never zero.
3975                    (frameLength + segmentSize - 1) / segmentSize;
3976    
3977                // Compose our result MAC list...
3978                final List<ROByteArray> mac = new ArrayList<ROByteArray>(nSegs);
3979    
3980                // Incrementally generate the MAC block by block.
3981                // We use an output stream sink to avoid having to copy large frames.
3982                // This OutputStream is not thread-safe and does not need to be.
3983                // Working memory required is basically only that for the current segment's Mac.
3984                final OutputStream sink = new OutputStream(){
3985                    /**MAC for current segment; initialised for first block. */
3986                    Mac current = startMacForSegment();
3987    
3988                    /**Start MAC for next segment.
3989                     * The working MAC must be null before this is called.
3990                     */
3991                    private Mac startMacForSegment()
3992                        {
3993                        try
3994                            {
3995                            final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
3996                            m.init(key);
3997    
3998                            // First, write 4-byte length of entire message.
3999                            m.update(intSer(frameLength));
4000                            // Second, write 8-byte timestamp.
4001                            m.update(longSer(timestamp));
4002                            // If there is a previous MAC, stir it in.
4003                            if(null != prevMAC) { m.update(prevMAC); }
4004    
4005                            return(m);
4006                            }
4007                        catch(final Exception e)
4008                            {
4009                            e.printStackTrace();
4010                            throw new Error("internal error", e);
4011                            }
4012                        }
4013    
4014                    /**The previous MAC value, or null while gathering the first segment. */
4015                    private byte[] prevMAC;
4016    
4017                    /**Count of bytes seen so far; non-negative.
4018                     * Implicitly this also indicates how far into the current segment we are.
4019                     */
4020                    private int bytesSeenSoFar;
4021    
4022                    /**Process the next block to generate the next MAC.
4023                     * Must only be called when we've collected a full segment, or at EOF/close().
4024                     */
4025                    private void endMacForSegment()
4026                        {
4027                        try
4028                            {
4029                            // Compute and record the MAC.
4030                            final byte[] hmac = current.doFinal();
4031                            prevMAC = hmac;
4032                            mac.add(new ROByteArray(hmac));
4033                            }
4034                        catch(final Exception e)
4035                            {
4036                            e.printStackTrace();
4037                            throw new Error("internal error", e);
4038                            }
4039                        }
4040    
4041                    @Override
4042                    public void write(final byte[] b, int off, int len) throws IOException
4043                        {
4044                        // Loop to generate multiple segment MACs if needed.
4045                        while(len > 0)
4046                            {
4047                            // Compute bytes needed to fill the buffer: always positive.
4048                            final int offsetInSegBuf = bytesSeenSoFar & (segmentSize-1); // Faster version of bytesSeenSoFar % segmentSize;
4049                            final int needed = segmentSize - (offsetInSegBuf);
4050                            // How many will we copy in...
4051                            final int toCopy = Math.min(needed, len);
4052    
4053                            // Write the frame bytes to protect.
4054                            current.update(b, off, toCopy);
4055    
4056                            // Adjust all our indexes...
4057                            bytesSeenSoFar += toCopy;
4058                            off += toCopy;
4059                            len -= toCopy;
4060    
4061                            // If we copied exactly up to the end of the segBuf
4062                            // then complete the current MAC and start the next one...
4063                            if(toCopy == needed) { endMacForSegment(); current = startMacForSegment(); }
4064                            }
4065                        }
4066    
4067                    /**Horribly inefficient, but correct, and should never actually get called. */
4068                    @Override public void write(final int b) throws IOException
4069                        {
4070                        final byte buf[] = new byte[1];
4071                        buf[0] = (byte) b;
4072                        write(buf, 0, 1);
4073                        }
4074    
4075                    /**Handle the tail of the input. */
4076                    @Override public void close() throws IOException
4077                        {
4078                        assert(bytesSeenSoFar == frameLength); // Should have seen all the data.
4079                        endMacForSegment(); // Push out the current (final) MAC...
4080                        }
4081                    };
4082                // Compute the HMAC value(s).
4083                try { raw.writePacket(sink, false); sink.close(); }
4084                catch(final IOException e) { throw new Error("internal error", e); }
4085    //System.out.println("frameLength/nSegs/mac.size() = " + frameLength + '/' + nSegs + '/' + mac.size());
4086                assert(nSegs == mac.size()); // Did we create the correct number of MACs values?
4087    
4088                // Let caller wrap (private) List if needed.
4089                return(mac);
4090                }
4091    
4092            /**Protect an input stream with our MAC; aborts with IOException in case of corruption.
4093             * This acts as a transparent pass-through filter
4094             * that will read a MAC-protected-segment at a time from the underlying stream
4095             * and either pass it through having verified it,
4096             * or abort with an IOException if the segment is corrupt.
4097             * <p>
4098             * This closes its input stream and vetoes any further operations
4099             * once any error has been encountered.
4100             * <p>
4101             * We use Key rather than SecretKey,
4102             * since the latter depends on Java extensions that may not be available,
4103             * eg when run in JWS.
4104             * <p>
4105             * Returned input stream not to be multi-threaded.
4106             */
4107            public InputStream protectInputStream(final Key key, final InputStream is)
4108                {
4109                if(key == null) { throw new IllegalArgumentException(); }
4110                if(is == null) { throw new IllegalArgumentException(); }
4111    
4112                // Compute our segment size for buffering.
4113                final int segmentSize = computeSegmentSize(length);
4114    
4115                // Number of segments.
4116                final int nSegs = mac.size();
4117    
4118                // To avoid any confusion, we do not allow mark/reset.
4119                return(new FilterInputStream(is){
4120                    /**Set true upon any error; subsequently we will not allow any further read()s. */
4121                    private boolean hasErred;
4122    
4123                    /**Our read buffer for one segment's data (or whole frame if smaller); never null. */
4124                    private final byte[] segBuf = new byte[Math.min(length, segmentSize)];
4125    
4126                    /**Data remaining in segBuf; non-negative. */
4127                    private int bufLen;
4128    
4129                    /**Start of remaining data in segBuf; non-negative. */
4130                    private int bufPos;
4131    
4132                    /**Which MAC segment is next to be loaded/verified. */
4133                    private int nextSegNumber;
4134    
4135                    /**Whatever is in our buffer plus from upstream without blocking. */
4136                    @Override public int available() throws IOException
4137                        { return(in.available() + bufLen); }
4138    
4139                    @Override public void mark(final int readlimit)
4140                        { throw new UnsupportedOperationException("mark not supported"); }
4141    
4142                    /**Mark/reset is not allowed to avoid confusion/complexity. */
4143                    @Override public boolean markSupported() { return(false); }
4144    
4145                    /**Read a single byte: we try to make this moderately efficient. */
4146                    @Override public int read() throws IOException
4147                        {
4148                        if(hasErred) { throw new IOException("corrupt stream"); }
4149    
4150                        // Fast path to return data already in our buffer.
4151                        if(bufLen > 0)
4152                            {
4153                            --bufLen;
4154                            return(segBuf[bufPos++] & 0xff);
4155                            }
4156    
4157                        // Use full read() to do all the leg-work.
4158                        final byte[] buf1 = new byte[1];
4159                        final int n = read(buf1, 0, 1);
4160                        if(n < 1) { return(-1); /* EOF */ }
4161                        assert(n == 1);
4162                        return(buf1[0] & 0xff);
4163                        }
4164    
4165                    /**Bulk data read.
4166                     * This routine returns any data it has buffered (ie already verified)
4167                     * else it reads a segment of data from the underlying stream
4168                     * (possibly less than a full segment at EOF, but verifying the length)
4169                     * and validates it with the appropriate MAC.
4170                     * <p>
4171                     * If validation against the MAC fails
4172                     * then this and all subsequent calls to read()
4173                     * are vetoed with an IOException.
4174                     * <p>
4175                     * If validation succeeds then the requested data is returned.
4176                     * <p>
4177                     * We do not return data in one read across segment boundaries,
4178                     * as we are not required to and it would increase complexity.
4179                     */
4180                    @Override
4181                    public int read(final byte[] b, final int off, final int len) throws IOException
4182                        {
4183                        if((b == null) ||
4184                           (off < 0) || (off >= b.length) ||
4185                           (len < 0) || (off + len > b.length))
4186                            { throw new IllegalArgumentException(); }
4187    
4188                        if(hasErred) { throw new IOException("corrupt stream"); }
4189    
4190                        if(len == 0) { return(0); }
4191    
4192                        // If we have no verified data left in our buffer
4193                        // then unless at EOF we need to load another segment and verify it.
4194                        if(bufLen == 0)
4195                            {
4196                            // We are at EOF if we have read the last segment.
4197                            if(nextSegNumber >= nSegs) { return(-1); }
4198    
4199                            // Allow for the final segment to be smaller.
4200                            final int readSize = (nextSegNumber < nSegs-1) ? segmentSize :
4201                                (length - (segmentSize * (nSegs-1)));
4202                            bufPos = 0; // Next caller will read from offset zero.
4203                            bufLen = 0; // Buffer is currently empty...
4204                            while(bufLen != readSize)
4205                                {
4206                                try
4207                                    {
4208                                    final int n = in.read(segBuf, bufLen, readSize - bufLen);
4209                                    if(n < 1) { throw new IOException("premature EOF"); }
4210                                    bufLen += n;
4211                                    }
4212                                catch(final IOException e) { hasErred = true; throw e; }
4213                                }
4214    
4215                            // Verify the data.
4216                            try
4217                                {
4218                                final Mac m = Mac.getInstance(CoreConsts.HMAC_ALG);
4219                                m.init(key);
4220    
4221                                // First, write 4-byte length of entire message.
4222                                m.update(intSer(length));
4223                                // Second, write 8-byte timestamp.
4224                                m.update(longSer(timestamp));
4225                                // If there is a previous MAC, stir it in.
4226                                if(nextSegNumber > 0) { m.update(mac.get(nextSegNumber-1).toByteArray()); }
4227                                // Finally, use the bytes read from upstream.
4228                                m.update(segBuf, 0, bufLen);
4229    
4230                                // Compute and record the MAC.
4231                                final byte[] hmac = m.doFinal();
4232                                if(!Arrays.equals(mac.get(nextSegNumber++).toByteArray(), hmac))
4233                                    { throw new IOException("corrupt input data"); }
4234                                }
4235                            catch(final IOException e)
4236                                {
4237                                hasErred = true; // Prevent further read()s from this stream.
4238                                throw e;
4239                                }
4240                            catch(final Exception e)
4241                                {
4242                                hasErred = true;
4243                                e.printStackTrace();
4244                                throw new Error("internal error", e);
4245                                }
4246                            }
4247    
4248                        // The buffer must now have something in it.
4249                        assert(bufLen > 0);
4250                        // Return verified data from our buffer to the caller.
4251                        final int toCopy = Math.min(bufLen, len);
4252                        System.arraycopy(segBuf, bufPos, b, off, toCopy);
4253                        bufLen -= toCopy;
4254                        bufPos += toCopy;
4255                        return(toCopy);
4256                        }
4257    
4258                    /**Implemented as read(b, 0, b.length). */
4259                    @Override public int read(final byte[] b) throws IOException
4260                        { return(read(b, 0, b.length)); }
4261    
4262                    @Override public void reset() throws IOException
4263                        { throw new IOException("reset not allowed"); }
4264    
4265                    /**We have to read the input to skip it. */
4266                    @Override public long skip(long n) throws IOException
4267                        {
4268                        long skipped = 0;
4269                        final byte b[] = new byte[(int) Math.min(1024, n)];
4270                        while(n > 0)
4271                            {
4272                            final int r = read(b, 0, (int) Math.min(n, b.length));
4273                            if(r < 1) { break; /* EOF */ }
4274                            n -= r;
4275                            skipped += r;
4276                            }
4277                        return(skipped);
4278                        }
4279                    });
4280                }
4281    
4282            /**Checks only that the object content is valid.
4283             * This does NOT attempt to check that the MAC matches the message and fields;
4284             * that must be done explicitly elsewhere.
4285             * <p>
4286             * Partly this does not check the MAC because it does not have access to the key.
4287             */
4288            public void validateObject() throws InvalidObjectException
4289                {
4290                if(timestamp <= 0) { throw new InvalidObjectException("bad object: bad timestamp"); }
4291                if(length < 0) { throw new InvalidObjectException("bad object: bad length"); }
4292                if(mac == null) { throw new InvalidObjectException("bad object: null MAC"); }
4293                if(mac.size() == 0) { throw new InvalidObjectException("bad object: empty MAC"); }
4294                for(final ROByteArray m : mac)
4295                    {
4296                    if(!(m instanceof ROByteArray))  { throw new InvalidObjectException("bad object: null/bad segment MAC"); }
4297                    if(m.length() == 0) { throw new InvalidObjectException("bad object: empty segment MAC"); }
4298                    }
4299                }
4300    
4301            /**Equality depends on all the members being equal. */
4302            @Override
4303            public boolean equals(final Object obj)
4304                {
4305                if(this == obj) { return(true); }
4306                if(!(obj instanceof PacketProtector)) { return(false); }
4307                final PacketProtector other = (PacketProtector) obj;
4308                return((timestamp == other.timestamp) &&
4309                       (length == other.length) &&
4310                       mac.equals(other.mac));
4311                }
4312    
4313            /**We use the timestamp and length fields in the hash for the entire collection. */
4314            @Override public int hashCode() { return(length ^ (int) timestamp); }
4315            }
4316        }