Class RouterService
java.lang.Object
org.jumpmind.symmetric.service.impl.AbstractService
org.jumpmind.symmetric.service.impl.RouterService
- All Implemented Interfaces:
INodeCommunicationService.INodeCommunicationExecutor
,IRouterService
,IService
public class RouterService
extends AbstractService
implements IRouterService, INodeCommunicationService.INodeCommunicationExecutor
- See Also:
-
Field Summary
FieldsModifier and TypeFieldDescriptionprotected long
protected long
protected ISymmetricEngine
protected IExtensionService
protected boolean
protected DataGapDetector
protected Map<String,
CounterStat> protected boolean
protected Map<Integer,
CounterStat> protected Map<Integer,
CounterStat> protected ExecutorService
protected long
protected boolean
Fields inherited from class org.jumpmind.symmetric.service.impl.AbstractService
log, parameterService, platform, sqlTemplate, sqlTemplateDirty, symmetricDialect, tablePrefix
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected Table
buildTableFromTriggerHistory
(TriggerHistory triggerHistory) protected void
completeBatches
(ChannelRouterContext context, List<OutgoingBatch> batches, Set<IDataRouter> usedRouters) protected void
protected boolean
doesColumnCountMatchValues
(DataMetaData dataMetaData, Data data) void
execute
(NodeCommunication nodeCommunication, RemoteNodeStatus status) findAvailableNodes
(TriggerRouter triggerRouter, ChannelRouterContext context) void
Get a list of available batch algorithms that can be used for the different channelsprotected IDataRouter
getDataRouter
(Router router, DataMetaData dataMetaData) protected List<TriggerRouter>
getTriggerRoutersForData
(Data data, ChannelRouterContext context) long
protected boolean
protected int
insertDataEvents
(ProcessInfo processInfo, ChannelRouterContext context, DataMetaData dataMetaData, Collection<String> nodeIds, TriggerRouter triggerRouter) protected boolean
protected boolean
onlyDefaultRoutersAssigned
(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) protected boolean
producesCommonBatches
(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) Deprecated.protected String
qualifyUsingDataGaps
(List<DataGap> dataGaps, int numberOfGapsToQualify, String sql) long
routeData
(boolean force) This method will route data to specific nodes.protected int
routeData
(ProcessInfo processInfo, Data data, ChannelRouterContext context) protected long
routeDataForChannel
(ProcessInfo processInfo, NodeChannel nodeChannel, Node sourceNode, NodeCommunication nodeCommunication, ChannelRouterContext context) protected long
We route data channel by channel for two reasons.protected long
selectDataAndRoute
(ProcessInfo processInfo, NodeCommunication nodeCommunication, ChannelRouterContext context) Pre-read data and fill up a queue so we can peek ahead to see if we have crossed a database transaction boundary.boolean
shouldDataBeRouted
(SimpleRouterContext context, DataMetaData dataMetaData, Node node, boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) For use in data load eventsprotected IDataToRouteReader
startReading
(ChannelRouterContext context) void
stop()
Methods inherited from class org.jumpmind.symmetric.service.impl.AbstractService
assertNotNull, buildBatchOrderBy, buildBatchParams, buildBatchWhere, buildBatchWhereFromFilter, close, createSqlReplacementTokens, getJdbcTemplate, getParameterService, getSql, getSymmetricDialect, getTablePrefix, getTargetDialect, getTargetPlatform, getTargetPlatform, isCalledFromSymmetricAdminTool, isSet, isStreamClosedByClient, isSymmetricTable, logOnce, maxDate, readAcks, sendAck, setSqlMap, synchronize, toNodeIds, toNodeIds
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.jumpmind.symmetric.service.IService
getSql, synchronize
-
Field Details
-
missingTriggerRouter
-
invalidRouterType
-
missingColumns
-
triggerRouterCacheTime
protected long triggerRouterCacheTime -
commonBatchesLastKnownState
-
commonBatchesCacheTime
protected long commonBatchesCacheTime -
defaultRouterOnlyLastKnownState
-
defaultRoutersCacheTime
protected long defaultRoutersCacheTime -
isAllDataReadByChannel
-
hasMaxDataRoutedByChannel
-
readThread
-
engine
-
extensionService
-
gapDetector
-
firstTimeCheck
protected boolean firstTimeCheck -
isUsingTargetExternalId
protected boolean isUsingTargetExternalId -
useChannelThreading
protected boolean useChannelThreading
-
-
Constructor Details
-
RouterService
-
-
Method Details
-
shouldDataBeRouted
public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData dataMetaData, Node node, boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) For use in data load events- Specified by:
shouldDataBeRouted
in interfaceIRouterService
-
stop
public void stop()- Specified by:
stop
in interfaceIRouterService
-
flushCache
public void flushCache()- Specified by:
flushCache
in interfaceIRouterService
-
routeData
public long routeData(boolean force) This method will route data to specific nodes.- Specified by:
routeData
in interfaceIRouterService
-
routeDataForEachChannel
protected long routeDataForEachChannel()We route data channel by channel for two reasons. One is that if/when we decide to multi-thread the routing it is a simple matter of inserting a thread pool here and waiting for all channels to be processed. The other reason is to reduce the number of connections we are required to have. -
getReadyChannels
-
qualifyUsingDataGaps
-
producesCommonBatches
@Deprecated protected boolean producesCommonBatches(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) Deprecated. -
onlyDefaultRoutersAssigned
protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) -
execute
- Specified by:
execute
in interfaceINodeCommunicationService.INodeCommunicationExecutor
-
routeDataForChannel
protected long routeDataForChannel(ProcessInfo processInfo, NodeChannel nodeChannel, Node sourceNode, NodeCommunication nodeCommunication, ChannelRouterContext context) -
completeBatchesAndCommit
-
completeBatches
protected void completeBatches(ChannelRouterContext context, List<OutgoingBatch> batches, Set<IDataRouter> usedRouters) -
findAvailableNodes
-
startReading
-
selectDataAndRoute
protected long selectDataAndRoute(ProcessInfo processInfo, NodeCommunication nodeCommunication, ChannelRouterContext context) throws InterruptedException Pre-read data and fill up a queue so we can peek ahead to see if we have crossed a database transaction boundary. Then route eachData
while continuing to keep the queue filled until the result set is entirely read.- Parameters:
conn
- The connection to use for selecting the data.context
- The current context of the routing process- Throws:
InterruptedException
-
routeData
-
insertDataEvents
protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext context, DataMetaData dataMetaData, Collection<String> nodeIds, TriggerRouter triggerRouter) -
getDataRouter
-
getTriggerRoutersForData
-
getUnroutedDataCount
public long getUnroutedDataCount()- Specified by:
getUnroutedDataCount
in interfaceIRouterService
-
getAvailableBatchAlgorithms
Description copied from interface:IRouterService
Get a list of available batch algorithms that can be used for the different channels- Specified by:
getAvailableBatchAlgorithms
in interfaceIRouterService
-
getRouters
- Specified by:
getRouters
in interfaceIRouterService
-
getDataGaps
- Specified by:
getDataGaps
in interfaceIRouterService
-
buildTableFromTriggerHistory
-
doesColumnCountMatchValues
-
hasMaxDataRouted
protected boolean hasMaxDataRouted() -
isAllDataRead
protected boolean isAllDataRead()
-