Class DataExtractorService
java.lang.Object
org.jumpmind.symmetric.service.impl.AbstractService
org.jumpmind.symmetric.service.impl.DataExtractorService
- All Implemented Interfaces:
IDataExtractorService
,INodeCommunicationService.INodeCommunicationExecutor
,IService
- Direct Known Subclasses:
FileSyncExtractorService
public class DataExtractorService
extends AbstractService
implements IDataExtractorService, INodeCommunicationService.INodeCommunicationExecutor
- See Also:
-
Nested Class Summary
Nested Classes -
Field Summary
Fields
Fields inherited from class org.jumpmind.symmetric.service.impl.AbstractService
log, parameterService, platform, sqlTemplate, sqlTemplateDirty, symmetricDialect, tablePrefix
-
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
protected ExtractDataReader
buildExtractDataReader
(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo) protected ExtractDataReader
buildExtractDataReader
(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo, boolean containsBigLob) protected MultiBatchStagingWriter
buildMultiBatchStagingWriter
(ExtractRequest request, List<ExtractRequest> childRequests, Node sourceNode, Node targetNode, List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) int
cancelExtractRequests
(long loadId) protected boolean
canProcessExtractRequest
(ExtractRequest request, NodeCommunication.CommunicationType communicationType) protected final boolean
changeBatchStatus
(AbstractBatch.Status status, OutgoingBatch currentBatch, DataExtractorService.ExtractMode mode) protected void
checkSendDeferredConstraints
(ExtractRequest request, List<ExtractRequest> childRequests, Node targetNode) protected void
cleanupIgnoredBatch
(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, IDataWriter writer) protected TransformWriter
createTransformDataWriter
(Node identity, Node targetNode, IDataWriter extractWriter) void
execute
(NodeCommunication nodeCommunication, RemoteNodeStatus status) This is a callback method used by the NodeCommunicationService that extracts an initial load in the background.
extract
(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) protected List<OutgoingBatch>
extract
(ProcessInfo extractInfo, Node targetNode, List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) extract
(ProcessInfo extractInfo, Node targetNode, IOutgoingTransport transport) protected org.jumpmind.symmetric.service.impl.DataExtractorService.FutureOutgoingBatch
extractBatch
(OutgoingBatch extractBatch, org.jumpmind.symmetric.service.impl.DataExtractorService.FutureExtractStatus status, ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, DataExtractorService.ExtractMode mode, List<OutgoingBatch> activeBatches) boolean
extractBatchRange
(Writer writer, String nodeId, long startBatchId, long endBatchId) boolean
extractBatchRange
(Writer writer, String nodeId, Date startBatchTime, Date endBatchTime, String... channelIds) void
extractConfigurationStandalone
(Node targetNode, Writer writer, String... tablesToExclude) Extract the SymmetricDS configuration for the passed-in Node.
boolean
extractOnlyOutgoingBatch
(String nodeId, long batchId, Writer writer) This method will extract an outgoing batch, but will not update the outgoing batch status.
protected OutgoingBatch
extractOutgoingBatch
(ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, DataExtractorService.ExtractMode mode, IDataProcessorListener listener) extractToPayload
(ProcessInfo processInfo, Node targetNode, StructureDataWriter.PayloadType payloadType, boolean useJdbcTimestampFormat, boolean useUpsertStatements, boolean useDelimiterIdentifiers) protected int
findStatsIndex
(String bufferString, String prevBuffer) protected String
getBatchStats
(OutgoingBatch batch) protected String
getCompletedTablesForExtractByLoadId
(long loadId) getCompletedTablesForExtractByLoadIdAndNodeId
(long loadId, String nodeId) protected List<ExtractRequest>
getExtractChildRequestsForNode
(ExtractRequest parentRequest) protected Map<Long,List<ExtractRequest>>
getExtractChildRequestsForNode
(NodeCommunication nodeCommunication, String queue, QueueThread queueThread) protected ExtractRequest
List<org.jumpmind.symmetric.service.impl.DataExtractorService.NodeQueuePair>
protected List<ExtractRequest>
getExtractRequestsForNode
(NodeCommunication nodeCommunication, String queue, QueueThread queueThread) protected List<ExtractRequest>
protected Statistics
getExtractStats
(IDataWriter writer, OutgoingBatch currentBatch) getPendingTablesForExtractByLoadId
(long loadId) getPendingTablesForExtractByLoadIdAndNodeId
(long loadId, String nodeId) protected ProcessType
protected String
getSemaphoreKey
(OutgoingBatch batch, boolean useStagingDataWriter) protected IStagedResource
getStagedResource
(OutgoingBatch currentBatch) protected boolean
isApplicable
(NodeCommunication nodeCommunication) protected boolean
isPreviouslyExtracted
(OutgoingBatch currentBatch, boolean acquireReference) protected boolean
isRetry
(OutgoingBatch currentBatch, Node remoteNode) protected boolean
protected OutgoingBatches
loadPendingBatches
(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) protected void
queue
(String nodeId, String queue, RemoteNodeStatuses statuses) queueWork
(boolean force) protected void
releaseLock
(org.jumpmind.symmetric.service.impl.DataExtractorService.BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) void
void
protected final OutgoingBatch
requeryIfEnoughTimeHasPassed
(long ts, OutgoingBatch currentBatch) If time has passed, then re-query the batch to double-check that the status has not changed.
requestExtractRequest
(ISqlTransaction transaction, String nodeId, String queue, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows, long parentRequestId) void
resetExtractRequest
(OutgoingBatch batch) protected void
restartExtractRequest
(List<OutgoingBatch> batches, ExtractRequest request, List<ExtractRequest> childRequests) protected OutgoingBatch
sendOutgoingBatch
(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) protected void
transferFromStaging
(DataExtractorService.ExtractMode mode, Batch.BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec, ProcessInfo processInfo) protected void
triggerReExtraction
(OutgoingBatch currentBatch) void
updateExtractRequestLoadTime
(ISqlTransaction transaction, Date loadTime, OutgoingBatch outgoingBatch) protected void
protected void
updateExtractRequestsForThreading
(Collection<ExtractRequest> extractRequests) protected void
updateExtractRequestStatus
(ISqlTransaction transaction, long extractId, ExtractRequest.ExtractStatus status, long extractedRows, long extractedMillis) void
updateExtractRequestStatuses
(ISqlTransaction transaction, long loadId, String sourceNodeId, String fromStatus, String toStatus) void
updateExtractRequestTransferred
(OutgoingBatch batch, long transferMillis) protected IDataWriter
wrapWithTransformWriter
(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter, boolean useStagingDataWriter) protected boolean
writeBatchStats
(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch) protected void
writeKeepAliveAck
(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled)
Methods inherited from class org.jumpmind.symmetric.service.impl.AbstractService
assertNotNull, buildBatchOrderBy, buildBatchParams, buildBatchWhere, buildBatchWhereFromFilter, close, createSqlReplacementTokens, getJdbcTemplate, getParameterService, getSql, getSymmetricDialect, getTablePrefix, getTargetDialect, getTargetPlatform, getTargetPlatform, isCalledFromSymmetricAdminTool, isSet, isStreamClosedByClient, isSymmetricTable, logOnce, maxDate, readAcks, sendAck, setSqlMap, synchronize, toNodeIds, toNodeIds
-
Field Details
-
engine
-
-
Constructor Details
-
DataExtractorService
-
-
Method Details
-
extractConfigurationStandalone
public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude)
Extract the SymmetricDS configuration for the passed-in Node.
- Specified by:
extractConfigurationStandalone
in interface IDataExtractorService
-
extractToPayload
public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo, Node targetNode, StructureDataWriter.PayloadType payloadType, boolean useJdbcTimestampFormat, boolean useUpsertStatements, boolean useDelimiterIdentifiers) - Specified by:
extractToPayload
in interface IDataExtractorService
-
extract
public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, IOutgoingTransport transport) - Specified by:
extract
in interface IDataExtractorService
- Returns:
- a list of batches that were extracted
-
extract
public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) - Specified by:
extract
in interface IDataExtractorService
-
loadPendingBatches
protected OutgoingBatches loadPendingBatches(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) -
extractOnlyOutgoingBatch
This method will extract an outgoing batch, but will not update the outgoing batch status.
- Specified by:
extractOnlyOutgoingBatch
in interface IDataExtractorService
-
extract
protected List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) -
extractBatch
protected org.jumpmind.symmetric.service.impl.DataExtractorService.FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, org.jumpmind.symmetric.service.impl.DataExtractorService.FutureExtractStatus status, ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, DataExtractorService.ExtractMode mode, List<OutgoingBatch> activeBatches) throws Exception - Throws:
Exception
-
writeKeepAliveAck
protected void writeKeepAliveAck(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled) -
changeBatchStatus
protected final boolean changeBatchStatus(AbstractBatch.Status status, OutgoingBatch currentBatch, DataExtractorService.ExtractMode mode) -
requeryIfEnoughTimeHasPassed
If time has passed, then re-query the batch to double-check that the status has not changed.
-
extractOutgoingBatch
protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, DataExtractorService.ExtractMode mode, IDataProcessorListener listener) -
getSemaphoreKey
-
acquireStagingFileLock
- Specified by:
acquireStagingFileLock
in interface IDataExtractorService
-
releaseLock
protected void releaseLock(org.jumpmind.symmetric.service.impl.DataExtractorService.BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) -
isStagingFileLockRequired
-
triggerReExtraction
-
buildExtractDataReader
protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo) -
buildExtractDataReader
protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo, boolean containsBigLob) -
getExtractStats
-
wrapWithTransformWriter
protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter, boolean useStagingDataWriter) -
cleanupIgnoredBatch
protected void cleanupIgnoredBatch(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, IDataWriter writer) -
getStagedResource
-
isPreviouslyExtracted
-
isRetry
-
sendOutgoingBatch
protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) -
transferFromStaging
protected void transferFromStaging(DataExtractorService.ExtractMode mode, Batch.BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec, ProcessInfo processInfo) -
findStatsIndex
-
getPendingTablesForExtractByLoadId
- Specified by:
getPendingTablesForExtractByLoadId
in interface IDataExtractorService
-
getCompletedTablesForExtractByLoadId
- Specified by:
getCompletedTablesForExtractByLoadId
in interface IDataExtractorService
-
getPendingTablesForExtractByLoadIdAndNodeId
- Specified by:
getPendingTablesForExtractByLoadIdAndNodeId
in interface IDataExtractorService
-
getCompletedTablesForExtractByLoadIdAndNodeId
public List<ExtractRequest> getCompletedTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId) - Specified by:
getCompletedTablesForExtractByLoadIdAndNodeId
in interface IDataExtractorService
-
updateExtractRequestLoadTime
public void updateExtractRequestLoadTime(ISqlTransaction transaction, Date loadTime, OutgoingBatch outgoingBatch) - Specified by:
updateExtractRequestLoadTime
in interface IDataExtractorService
-
updateExtractRequestTransferred
- Specified by:
updateExtractRequestTransferred
in interface IDataExtractorService
-
cancelExtractRequests
public int cancelExtractRequests(long loadId) - Specified by:
cancelExtractRequests
in interface IDataExtractorService
-
writeBatchStats
protected boolean writeBatchStats(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch) throws IOException - Throws:
IOException
-
getBatchStatsColumns
-
getBatchStats
-
extractBatchRange
- Specified by:
extractBatchRange
in interface IDataExtractorService
-
extractBatchRange
public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTime, Date endBatchTime, String... channelIds) - Specified by:
extractBatchRange
in interface IDataExtractorService
-
createTransformDataWriter
protected TransformWriter createTransformDataWriter(Node identity, Node targetNode, IDataWriter extractWriter) -
queueWork
- Specified by:
queueWork
in interface IDataExtractorService
-
queue
-
getExtractRequestNodes
public List<org.jumpmind.symmetric.service.impl.DataExtractorService.NodeQueuePair> getExtractRequestNodes() -
updateExtractRequestsForThreading
protected void updateExtractRequestsForThreading() -
getExtractRequestsForThreading
-
updateExtractRequestsForThreading
-
getExtractRequestsForNode
protected List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication, String queue, QueueThread queueThread) -
getExtractRequestForBatch
-
getExtractChildRequestsForNode
protected Map<Long,List<ExtractRequest>> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, String queue, QueueThread queueThread) -
getExtractChildRequestsForNode
-
resetExtractRequest
- Specified by:
resetExtractRequest
in interface IDataExtractorService
-
requestExtractRequest
public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows, long parentRequestId) - Specified by:
requestExtractRequest
in interface IDataExtractorService
-
updateExtractRequestStatus
protected void updateExtractRequestStatus(ISqlTransaction transaction, long extractId, ExtractRequest.ExtractStatus status, long extractedRows, long extractedMillis) -
canProcessExtractRequest
protected boolean canProcessExtractRequest(ExtractRequest request, NodeCommunication.CommunicationType communicationType) -
execute
This is a callback method used by the NodeCommunicationService that extracts an initial load in the background.
- Specified by:
execute
in interface INodeCommunicationService.INodeCommunicationExecutor
-
restartExtractRequest
protected void restartExtractRequest(List<OutgoingBatch> batches, ExtractRequest request, List<ExtractRequest> childRequests) -
releaseMissedExtractRequests
public void releaseMissedExtractRequests()
- Specified by:
releaseMissedExtractRequests
in interface IDataExtractorService
-
checkSendDeferredConstraints
protected void checkSendDeferredConstraints(ExtractRequest request, List<ExtractRequest> childRequests, Node targetNode) -
isApplicable
-
buildMultiBatchStagingWriter
protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List<ExtractRequest> childRequests, Node sourceNode, Node targetNode, List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) -
getProcessType
-
removeBatchFromStaging
- Specified by:
removeBatchFromStaging
in interface IDataExtractorService
-
updateExtractRequestStatuses
public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId, String fromStatus, String toStatus) - Specified by:
updateExtractRequestStatuses
in interface IDataExtractorService
-