public class DataExtractorService extends AbstractService implements IDataExtractorService, INodeCommunicationService.INodeCommunicationExecutor
IDataExtractorService| Modifier and Type | Class and Description |
|---|---|
protected static class |
DataExtractorService.ExtractMode |
log, parameterService, platform, sqlTemplate, sqlTemplateDirty, symmetricDialect, tablePrefix| Constructor and Description |
|---|
DataExtractorService(ISymmetricEngine engine) |
| Modifier and Type | Method and Description |
|---|---|
protected org.jumpmind.symmetric.io.stage.StagingFileLock |
acquireStagingFileLock(OutgoingBatch batch) |
protected org.jumpmind.symmetric.io.data.reader.ExtractDataReader |
buildExtractDataReader(Node sourceNode,
Node targetNode,
OutgoingBatch currentBatch,
ProcessInfo processInfo) |
protected org.jumpmind.symmetric.io.data.reader.ExtractDataReader |
buildExtractDataReader(Node sourceNode,
Node targetNode,
OutgoingBatch currentBatch,
ProcessInfo processInfo,
boolean containsBigLob) |
protected MultiBatchStagingWriter |
buildMultiBatchStagingWriter(ExtractRequest request,
java.util.List<ExtractRequest> childRequests,
Node sourceNode,
Node targetNode,
java.util.List<OutgoingBatch> batches,
ProcessInfo processInfo,
Channel channel,
boolean isRestarted) |
int |
cancelExtractRequests(long loadId) |
protected boolean |
canProcessExtractRequest(ExtractRequest request,
NodeCommunication.CommunicationType communicationType) |
protected boolean |
changeBatchStatus(AbstractBatch.Status status,
OutgoingBatch currentBatch,
DataExtractorService.ExtractMode mode) |
protected void |
checkSendDeferredConstraints(ExtractRequest request,
java.util.List<ExtractRequest> childRequests,
Node targetNode) |
protected void |
cleanupIgnoredBatch(Node sourceNode,
Node targetNode,
OutgoingBatch currentBatch,
org.jumpmind.symmetric.io.data.IDataWriter writer) |
protected org.jumpmind.symmetric.io.data.writer.TransformWriter |
createTransformDataWriter(Node identity,
Node targetNode,
org.jumpmind.symmetric.io.data.IDataWriter extractWriter) |
void |
execute(NodeCommunication nodeCommunication,
RemoteNodeStatus status)
This is a callback method used by the NodeCommunicationService that extracts an initial load
in the background.
|
java.util.List<OutgoingBatch> |
extract(ProcessInfo extractInfo,
Node targetNode,
IOutgoingTransport transport) |
protected java.util.List<OutgoingBatch> |
extract(ProcessInfo extractInfo,
Node targetNode,
java.util.List<OutgoingBatch> activeBatches,
org.jumpmind.symmetric.io.data.IDataWriter dataWriter,
java.io.BufferedWriter writer,
DataExtractorService.ExtractMode mode) |
java.util.List<OutgoingBatch> |
extract(ProcessInfo extractInfo,
Node targetNode,
java.lang.String queue,
IOutgoingTransport transport) |
protected org.jumpmind.symmetric.service.impl.DataExtractorService.FutureOutgoingBatch |
extractBatch(OutgoingBatch extractBatch,
org.jumpmind.symmetric.service.impl.DataExtractorService.FutureExtractStatus status,
ProcessInfo extractInfo,
Node targetNode,
org.jumpmind.symmetric.io.data.IDataWriter dataWriter,
DataExtractorService.ExtractMode mode,
java.util.List<OutgoingBatch> activeBatches) |
boolean |
extractBatchRange(java.io.Writer writer,
java.lang.String nodeId,
java.util.Date startBatchTime,
java.util.Date endBatchTime,
java.lang.String... channelIds) |
boolean |
extractBatchRange(java.io.Writer writer,
java.lang.String nodeId,
long startBatchId,
long endBatchId) |
void |
extractConfigurationOnly(Node node,
java.io.OutputStream out) |
void |
extractConfigurationStandalone(Node node,
java.io.OutputStream out) |
void |
extractConfigurationStandalone(Node targetNode,
java.io.Writer writer,
java.lang.String... tablesToExclude)
Extract the SymmetricDS configuration for the passed in
Node. |
boolean |
extractOnlyOutgoingBatch(java.lang.String nodeId,
long batchId,
java.io.Writer writer)
This method will extract an outgoing batch, but will not update the outgoing batch status
|
protected OutgoingBatch |
extractOutgoingBatch(ProcessInfo extractInfo,
Node targetNode,
org.jumpmind.symmetric.io.data.IDataWriter dataWriter,
OutgoingBatch currentBatch,
boolean useStagingDataWriter,
boolean updateBatchStatistics,
DataExtractorService.ExtractMode mode,
org.jumpmind.symmetric.io.data.IDataProcessorListener listener) |
java.util.List<OutgoingBatchWithPayload> |
extractToPayload(ProcessInfo processInfo,
Node targetNode,
org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType payloadType,
boolean useJdbcTimestampFormat,
boolean useUpsertStatements,
boolean useDelimiterIdentifiers) |
protected boolean |
filter(Node targetNode,
java.lang.String tableName) |
protected int |
findStatsIndex(java.lang.String bufferString,
java.lang.String prevBuffer) |
protected java.lang.String |
getBatchStats(OutgoingBatch batch) |
protected java.lang.String |
getBatchStatsColumns() |
java.util.List<ExtractRequest> |
getCompletedTablesForExtractByLoadId(long loadId) |
protected java.util.List<ExtractRequest> |
getExtractChildRequestsForNode(ExtractRequest parentRequest) |
protected java.util.Map<java.lang.Long,java.util.List<ExtractRequest>> |
getExtractChildRequestsForNode(NodeCommunication nodeCommunication,
java.util.List<ExtractRequest> parentRequests) |
protected ExtractRequest |
getExtractRequestForBatch(OutgoingBatch batch) |
java.util.List<org.jumpmind.symmetric.service.impl.DataExtractorService.NodeQueuePair> |
getExtractRequestNodes() |
protected java.util.List<ExtractRequest> |
getExtractRequestsForNode(NodeCommunication nodeCommunication) |
protected org.jumpmind.util.Statistics |
getExtractStats(org.jumpmind.symmetric.io.data.IDataWriter writer,
OutgoingBatch currentBatch) |
java.util.List<ExtractRequest> |
getPendingTablesForExtractByLoadId(long loadId) |
protected ProcessType |
getProcessType() |
protected java.lang.String |
getSemaphoreKey(OutgoingBatch batch,
boolean useStagingDataWriter) |
protected org.jumpmind.symmetric.io.stage.IStagedResource |
getStagedResource(OutgoingBatch currentBatch) |
protected boolean |
hasLobsThatNeedExtract(org.jumpmind.db.model.Table table,
org.jumpmind.symmetric.io.data.CsvData data) |
protected boolean |
isApplicable(NodeCommunication nodeCommunication) |
protected boolean |
isPreviouslyExtracted(OutgoingBatch currentBatch,
boolean acquireReference) |
protected boolean |
isRetry(OutgoingBatch currentBatch,
Node remoteNode) |
protected boolean |
isStagingFileLockRequired(OutgoingBatch batch) |
protected OutgoingBatches |
loadPendingBatches(ProcessInfo extractInfo,
Node targetNode,
java.lang.String queue,
IOutgoingTransport transport) |
protected org.jumpmind.db.model.Table |
lookupAndOrderColumnsAccordingToTriggerHistory(java.lang.String routerId,
TriggerHistory triggerHistory,
Node sourceNode,
Node targetNode,
boolean setTargetTableName,
boolean useDatabaseDefinition) |
protected void |
queue(java.lang.String nodeId,
java.lang.String queue,
RemoteNodeStatuses statuses) |
RemoteNodeStatuses |
queueWork(boolean force) |
protected void |
releaseLock(org.jumpmind.symmetric.service.impl.DataExtractorService.BatchLock lock,
OutgoingBatch batch,
boolean useStagingDataWriter) |
void |
releaseMissedExtractRequests() |
void |
removeBatchFromStaging(OutgoingBatch batch) |
protected java.lang.String |
replaceVariables(Node sourceNode,
Node targetNode,
java.lang.String str) |
protected OutgoingBatch |
requeryIfEnoughTimeHasPassed(long ts,
OutgoingBatch currentBatch)
If time has passed, then re-query the batch to double check that the
status has not changed
|
ExtractRequest |
requestExtractRequest(org.jumpmind.db.sql.ISqlTransaction transaction,
java.lang.String nodeId,
java.lang.String queue,
TriggerRouter triggerRouter,
long startBatchId,
long endBatchId,
long loadId,
java.lang.String table,
long rows,
long parentRequestId) |
void |
resetExtractRequest(OutgoingBatch batch) |
protected void |
restartExtractRequest(java.util.List<OutgoingBatch> batches,
ExtractRequest request,
java.util.List<ExtractRequest> childRequests) |
protected OutgoingBatch |
sendOutgoingBatch(ProcessInfo processInfo,
Node targetNode,
OutgoingBatch currentBatch,
boolean isRetry,
org.jumpmind.symmetric.io.data.IDataWriter dataWriter,
java.io.BufferedWriter writer,
DataExtractorService.ExtractMode mode) |
protected void |
transferFromStaging(DataExtractorService.ExtractMode mode,
org.jumpmind.symmetric.io.data.Batch.BatchType batchType,
OutgoingBatch batch,
boolean isRetry,
org.jumpmind.symmetric.io.stage.IStagedResource stagedResource,
java.io.BufferedWriter writer,
org.jumpmind.symmetric.io.data.DataContext context,
java.math.BigDecimal maxKBytesPerSec) |
protected void |
triggerReExtraction(OutgoingBatch currentBatch) |
void |
updateExtractRequestLoadTime(java.util.Date loadTime,
OutgoingBatch outgoingBatch) |
protected void |
updateExtractRequestStatus(org.jumpmind.db.sql.ISqlTransaction transaction,
long extractId,
ExtractRequest.ExtractStatus status,
long extractedRows,
long extractedMillis) |
void |
updateExtractRequestTransferred(OutgoingBatch batch,
long transferMillis) |
protected org.jumpmind.symmetric.io.data.IDataWriter |
wrapWithTransformWriter(Node sourceNode,
Node targetNode,
ProcessInfo processInfo,
org.jumpmind.symmetric.io.data.IDataWriter dataWriter,
boolean useStagingDataWriter) |
protected boolean |
writeBatchStats(java.io.BufferedWriter writer,
char[] buffer,
int bufferSize,
java.lang.String prevBuffer,
OutgoingBatch batch) |
protected void |
writeKeepAliveAck(java.io.BufferedWriter writer,
Node sourceNode,
boolean streamToFileEnabled) |
assertNotNull, buildBatchWhere, close, createSqlReplacementTokens, createSqlReplacementTokens, getJdbcTemplate, getParameterService, getSql, getSymmetricDialect, getTablePrefix, getTargetDialect, getTargetPlatform, getTargetPlatform, isCalledFromSymmetricAdminTool, isSet, isStreamClosedByClient, isSymmetricTable, logOnce, maxDate, readAcks, sendAck, setSqlMap, synchronize, toNodeIds, toNodeIdspublic DataExtractorService(ISymmetricEngine engine)
public void extractConfigurationStandalone(Node node, java.io.OutputStream out)
extractConfigurationStandalone in interface IDataExtractorServiceDataExtractorService#extractConfigurationStandalone(Node, Writer)public void extractConfigurationOnly(Node node, java.io.OutputStream out)
extractConfigurationOnly in interface IDataExtractorServiceprotected boolean filter(Node targetNode, java.lang.String tableName)
public void extractConfigurationStandalone(Node targetNode, java.io.Writer writer, java.lang.String... tablesToExclude)
Node.extractConfigurationStandalone in interface IDataExtractorServicepublic java.util.List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo, Node targetNode, org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType payloadType, boolean useJdbcTimestampFormat, boolean useUpsertStatements, boolean useDelimiterIdentifiers)
extractToPayload in interface IDataExtractorServicepublic java.util.List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, IOutgoingTransport transport)
extract in interface IDataExtractorServicepublic java.util.List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, java.lang.String queue, IOutgoingTransport transport)
extract in interface IDataExtractorServiceprotected OutgoingBatches loadPendingBatches(ProcessInfo extractInfo, Node targetNode, java.lang.String queue, IOutgoingTransport transport)
public boolean extractOnlyOutgoingBatch(java.lang.String nodeId,
long batchId,
java.io.Writer writer)
extractOnlyOutgoingBatch in interface IDataExtractorServiceprotected java.util.List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, java.util.List<OutgoingBatch> activeBatches, org.jumpmind.symmetric.io.data.IDataWriter dataWriter, java.io.BufferedWriter writer, DataExtractorService.ExtractMode mode)
protected org.jumpmind.symmetric.service.impl.DataExtractorService.FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, org.jumpmind.symmetric.service.impl.DataExtractorService.FutureExtractStatus status, ProcessInfo extractInfo, Node targetNode, org.jumpmind.symmetric.io.data.IDataWriter dataWriter, DataExtractorService.ExtractMode mode, java.util.List<OutgoingBatch> activeBatches) throws java.lang.Exception
java.lang.Exceptionprotected void writeKeepAliveAck(java.io.BufferedWriter writer,
Node sourceNode,
boolean streamToFileEnabled)
protected final boolean changeBatchStatus(AbstractBatch.Status status, OutgoingBatch currentBatch, DataExtractorService.ExtractMode mode)
protected final OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatch currentBatch)
protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targetNode, org.jumpmind.symmetric.io.data.IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, DataExtractorService.ExtractMode mode, org.jumpmind.symmetric.io.data.IDataProcessorListener listener)
protected java.lang.String getSemaphoreKey(OutgoingBatch batch, boolean useStagingDataWriter)
protected org.jumpmind.symmetric.io.stage.StagingFileLock acquireStagingFileLock(OutgoingBatch batch)
protected void releaseLock(org.jumpmind.symmetric.service.impl.DataExtractorService.BatchLock lock,
OutgoingBatch batch,
boolean useStagingDataWriter)
protected boolean isStagingFileLockRequired(OutgoingBatch batch)
protected void triggerReExtraction(OutgoingBatch currentBatch)
protected org.jumpmind.symmetric.io.data.reader.ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo)
protected org.jumpmind.symmetric.io.data.reader.ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo, boolean containsBigLob)
protected org.jumpmind.util.Statistics getExtractStats(org.jumpmind.symmetric.io.data.IDataWriter writer,
OutgoingBatch currentBatch)
protected org.jumpmind.symmetric.io.data.IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, org.jumpmind.symmetric.io.data.IDataWriter dataWriter, boolean useStagingDataWriter)
protected void cleanupIgnoredBatch(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, org.jumpmind.symmetric.io.data.IDataWriter writer)
protected org.jumpmind.symmetric.io.stage.IStagedResource getStagedResource(OutgoingBatch currentBatch)
protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch, boolean acquireReference)
protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode)
protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry, org.jumpmind.symmetric.io.data.IDataWriter dataWriter, java.io.BufferedWriter writer, DataExtractorService.ExtractMode mode)
protected void transferFromStaging(DataExtractorService.ExtractMode mode, org.jumpmind.symmetric.io.data.Batch.BatchType batchType, OutgoingBatch batch, boolean isRetry, org.jumpmind.symmetric.io.stage.IStagedResource stagedResource, java.io.BufferedWriter writer, org.jumpmind.symmetric.io.data.DataContext context, java.math.BigDecimal maxKBytesPerSec)
protected int findStatsIndex(java.lang.String bufferString,
java.lang.String prevBuffer)
public java.util.List<ExtractRequest> getPendingTablesForExtractByLoadId(long loadId)
getPendingTablesForExtractByLoadId in interface IDataExtractorServicepublic java.util.List<ExtractRequest> getCompletedTablesForExtractByLoadId(long loadId)
getCompletedTablesForExtractByLoadId in interface IDataExtractorServicepublic void updateExtractRequestLoadTime(java.util.Date loadTime,
OutgoingBatch outgoingBatch)
updateExtractRequestLoadTime in interface IDataExtractorServicepublic void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis)
updateExtractRequestTransferred in interface IDataExtractorServicepublic int cancelExtractRequests(long loadId)
cancelExtractRequests in interface IDataExtractorServiceprotected boolean writeBatchStats(java.io.BufferedWriter writer,
char[] buffer,
int bufferSize,
java.lang.String prevBuffer,
OutgoingBatch batch)
throws java.io.IOException
java.io.IOExceptionprotected java.lang.String getBatchStatsColumns()
protected java.lang.String getBatchStats(OutgoingBatch batch)
public boolean extractBatchRange(java.io.Writer writer,
java.lang.String nodeId,
long startBatchId,
long endBatchId)
extractBatchRange in interface IDataExtractorServicepublic boolean extractBatchRange(java.io.Writer writer,
java.lang.String nodeId,
java.util.Date startBatchTime,
java.util.Date endBatchTime,
java.lang.String... channelIds)
extractBatchRange in interface IDataExtractorServiceprotected org.jumpmind.symmetric.io.data.writer.TransformWriter createTransformDataWriter(Node identity, Node targetNode, org.jumpmind.symmetric.io.data.IDataWriter extractWriter)
protected org.jumpmind.db.model.Table lookupAndOrderColumnsAccordingToTriggerHistory(java.lang.String routerId,
TriggerHistory triggerHistory,
Node sourceNode,
Node targetNode,
boolean setTargetTableName,
boolean useDatabaseDefinition)
protected java.lang.String replaceVariables(Node sourceNode, Node targetNode, java.lang.String str)
public RemoteNodeStatuses queueWork(boolean force)
queueWork in interface IDataExtractorServiceprotected void queue(java.lang.String nodeId,
java.lang.String queue,
RemoteNodeStatuses statuses)
public java.util.List<org.jumpmind.symmetric.service.impl.DataExtractorService.NodeQueuePair> getExtractRequestNodes()
protected java.util.List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication)
protected ExtractRequest getExtractRequestForBatch(OutgoingBatch batch)
protected java.util.Map<java.lang.Long,java.util.List<ExtractRequest>> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, java.util.List<ExtractRequest> parentRequests)
protected java.util.List<ExtractRequest> getExtractChildRequestsForNode(ExtractRequest parentRequest)
public void resetExtractRequest(OutgoingBatch batch)
resetExtractRequest in interface IDataExtractorServicepublic ExtractRequest requestExtractRequest(org.jumpmind.db.sql.ISqlTransaction transaction, java.lang.String nodeId, java.lang.String queue, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, java.lang.String table, long rows, long parentRequestId)
requestExtractRequest in interface IDataExtractorServiceprotected void updateExtractRequestStatus(org.jumpmind.db.sql.ISqlTransaction transaction,
long extractId,
ExtractRequest.ExtractStatus status,
long extractedRows,
long extractedMillis)
protected boolean canProcessExtractRequest(ExtractRequest request, NodeCommunication.CommunicationType communicationType)
public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status)
execute in interface INodeCommunicationService.INodeCommunicationExecutorprotected void restartExtractRequest(java.util.List<OutgoingBatch> batches, ExtractRequest request, java.util.List<ExtractRequest> childRequests)
public void releaseMissedExtractRequests()
releaseMissedExtractRequests in interface IDataExtractorServiceprotected void checkSendDeferredConstraints(ExtractRequest request, java.util.List<ExtractRequest> childRequests, Node targetNode)
protected boolean isApplicable(NodeCommunication nodeCommunication)
protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, java.util.List<ExtractRequest> childRequests, Node sourceNode, Node targetNode, java.util.List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted)
protected ProcessType getProcessType()
protected boolean hasLobsThatNeedExtract(org.jumpmind.db.model.Table table,
org.jumpmind.symmetric.io.data.CsvData data)
public void removeBatchFromStaging(OutgoingBatch batch)
removeBatchFromStaging in interface IDataExtractorService