imp: 使用多线程同步。

lianzhong-receive
wyb 3 months ago
parent 34654712d2
commit 7f41fe15ec

@ -11,16 +11,16 @@
<option name="name" value="rdc-snapshots" /> <option name="name" value="rdc-snapshots" />
<option name="url" value="https://packages.aliyun.com/maven/repository/2360197-snapshot-rKvQJZ/" /> <option name="url" value="https://packages.aliyun.com/maven/repository/2360197-snapshot-rKvQJZ/" />
</remote-repository> </remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository> <remote-repository>
<option name="id" value="rdc-releases" /> <option name="id" value="rdc-releases" />
<option name="name" value="rdc-releases" /> <option name="name" value="rdc-releases" />
<option name="url" value="https://packages.aliyun.com/maven/repository/2360197-release-kcpW7u/" /> <option name="url" value="https://packages.aliyun.com/maven/repository/2360197-release-kcpW7u/" />
</remote-repository> </remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository> <remote-repository>
<option name="id" value="central" /> <option name="id" value="central" />
<option name="name" value="Maven Central repository" /> <option name="name" value="Maven Central repository" />
@ -31,6 +31,16 @@
<option name="name" value="JBoss Community repository" /> <option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" /> <option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository> </remote-repository>
<remote-repository>
<option name="id" value="snapshots" />
<option name="name" value="snapshots" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="central" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
<remote-repository> <remote-repository>
<option name="id" value="snapshots" /> <option name="id" value="snapshots" />
<option name="name" value="snapshots" /> <option name="name" value="snapshots" />

@ -6,12 +6,16 @@ import com.docus.demo.facade.IWebService;
import com.docus.demo.mapper.mysql.BasicMapper; import com.docus.demo.mapper.mysql.BasicMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDate; import java.time.LocalDate;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -28,13 +32,23 @@ public class GzFirstLabReportSyncJob {
@Resource @Resource
private BasicMapper basicMapper; private BasicMapper basicMapper;
@Value("${labSyncDisDateStart:2018-01-01}")
private String labSyncDisDateStart;
private AtomicBoolean isAllRunning = new AtomicBoolean(false); private AtomicBoolean isAllRunning = new AtomicBoolean(false);
private AtomicBoolean isThreeRunning = new AtomicBoolean(false); private AtomicBoolean isThreeRunning = new AtomicBoolean(false);
private static final int coreThreads = Runtime.getRuntime().availableProcessors() - 1;
private static final ThreadPoolExecutor LAB_SYNC_EXECUTOR = new ThreadPoolExecutor(coreThreads,
coreThreads + 1,
30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy());
@Scheduled(cron = "0 0 3 1 * ?") @Scheduled(cron = "0 0 3 1 * ?")
public void syncAllLabReport() { public void syncAllLabReport() {
LocalDate today = LocalDate.now(); LocalDate today = LocalDate.now();
LocalDate syncDisDate = LocalDate.parse("2018-01-01"); LocalDate syncDisDate = LocalDate.parse(labSyncDisDateStart);
String startDateTime = "2018-01-01 00:00:00"; String startDateTime = "2018-01-01 00:00:00";
String endDateTime = today + " 23:59:59"; String endDateTime = today + " 23:59:59";
if (isAllRunning.compareAndSet(false, true)) { if (isAllRunning.compareAndSet(false, true)) {
@ -47,7 +61,7 @@ public class GzFirstLabReportSyncJob {
if (CollUtil.isNotEmpty(tbasicList)) { if (CollUtil.isNotEmpty(tbasicList)) {
for (Tbasic tbasic : tbasicList) { for (Tbasic tbasic : tbasicList) {
iWebService.syncInspection(tbasic, startDateTime, endDateTime); LAB_SYNC_EXECUTOR.execute(() -> iWebService.syncInspection(tbasic, startDateTime, endDateTime));
} }
} }
@ -83,7 +97,7 @@ public class GzFirstLabReportSyncJob {
break; break;
} }
for (Tbasic tbasic : tbasicList) { for (Tbasic tbasic : tbasicList) {
iWebService.syncInspection(tbasic, startDateTime, endDateTime); LAB_SYNC_EXECUTOR.execute(() -> iWebService.syncInspection(tbasic, startDateTime, endDateTime));
} }
offset = offset + limit; offset = offset + limit;
} }

@ -61,6 +61,6 @@ public interface BasicMapper {
int updateNotArchive(@Param("patientId")String patientId); int updateNotArchive(@Param("patientId")String patientId);
int addLabSync(@Param("patientId") String patientId,@Param("status") Integer status);
} }

@ -25,11 +25,13 @@ import java.io.IOException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Slf4j @Slf4j
@Service @Service
@ -100,71 +102,77 @@ public class WebServiceImpl implements IWebService {
@Override @Override
public void syncInspection(Tbasic tbasic, String startDate, String endDate) { public void syncInspection(Tbasic tbasic, String startDate, String endDate) {
log.info("正在同步" + tbasic.getInpatientNo() + " " + tbasic.getAdmissTimes()); Date admissDate = tbasic.getAdmissDate();
List<ScanAssort> oldScanAssortList = scanAssortMapper.getListByAssortId("A5A7AA6796D1715A2F1E35699C706C84", tbasic.getPatientId()); // 如果报告的开始时间大于入院时间,则开始时间取入院时间
log.info("旧数据" + oldScanAssortList.size()); if (admissDate != null) {
if (oldScanAssortList.size() > 0) { try {
log.info("跳过同步" + tbasic.getInpatientNo() + " " + tbasic.getAdmissTimes()); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return; Date startDateParse = sdf.parse(startDate);
if (startDateParse.getTime() > admissDate.getTime()) {
startDate = sdf.format(admissDate);
} }
} catch (Exception ignored) {
String pNo = tbasic.getInpatientNo(); }
}
String inpatientNo = tbasic.getInpatientNo();
String times = tbasic.getAdmissTimes().toString(); String times = tbasic.getAdmissTimes().toString();
// String StartDate = sdf.format(tbasic.getAdmissDate());
// String EndDate = sdf.format(tbasic.getDisDate());
String patientId = tbasic.getPatientId(); String patientId = tbasic.getPatientId();
log.info("同步检验报告,病案号:{},住院次数:{}", inpatientNo, times);
List<ScanAssort> oldScanAssortList = scanAssortMapper.getListByAssortId("A5A7AA6796D1715A2F1E35699C706C84", patientId);
int size = oldScanAssortList.size();
if (size > 0) {
log.info("同步检验报告跳过同步,病案号:{},住院次数:{} 已经存在 {} 份检验报告", inpatientNo, times, size);
return;
}
basicMapper.addLabSync(patientId,0);
//获取所有的报告列表 //获取所有的报告列表
PatientListResult patientListResult = getCommonResult(pNo, times, startDate, endDate); PatientListResult patientListResult = getCommonResult(inpatientNo, times, startDate, endDate);
// log.info("返回结果"+patientListResult);
if (patientListResult == null) { if (patientListResult == null) {
log.error(pNo + " " + times + "调用失败"); log.warn("同步检验报告,病案号:{},住院次数:{},开始时间:{},结束时间:{} 未查到报告!", inpatientNo, times, startDate, endDate);
return;
}
// 匹配患者的报告列表
List<PatientListResult.Result.ReportInfo> reportInfoList = patientListResult.getResult().getPidReportMain();
reportInfoList = reportInfoList.stream()
.filter(reportInfo -> reportInfo.getPidAddmissTimes().equals(times) && reportInfo.getPidInNo().equalsIgnoreCase(inpatientNo))
.collect(Collectors.toList());
if (reportInfoList.isEmpty()) {
log.warn("同步检验报告,病案号:{},住院次数:{},开始时间:{},结束时间:{} 未查到报告!", inpatientNo, times, startDate, endDate);
return; return;
} }
List<ScanAssort> scanAssortList = new ArrayList<>(); List<ScanAssort> scanAssortList = new ArrayList<>();
for (int sortIndex = 0; sortIndex < reportInfoList.size(); sortIndex++) {
for (int i = 0; i < patientListResult.getResult().getPidReportMain().size(); i++) {
//根据旧数据判断是否需要做同步 //根据旧数据判断是否需要做同步
PatientListResult.Result.ReportInfo item = patientListResult.getResult().getPidReportMain().get(i); PatientListResult.Result.ReportInfo item = reportInfoList.get(sortIndex);
// ScanAssort oldAssort = oldScanAssortList.stream()
// .filter(f -> f.getFileTitle().equals(item.getPidComName())).findAny().orElse(null);
String root = "F:\\jianyan" + File.separator String root = "F:\\jianyan" + File.separator
+ tbasic.getInpatientNo() + File.separator + inpatientNo + File.separator
+ tbasic.getAdmissTimes(); + times;
// log.info("开始转换"+root);
//时间段内如果查询到住院次数相同的数据 则数据入库
if (item.getPidAddmissTimes().equals(times) && item.getPidInNo().equalsIgnoreCase(pNo)) {
log.info("正在同步" + item.getPidComName() + i);
// log.info("进入条件");
log.info("同步检验报告,病案号:{},住院次数:{},同步:{} - {}", inpatientNo, times, item.getRepId(), item.getPidComName());
List<ScanAssort> addScanList = new ArrayList<>(); List<ScanAssort> addScanList = new ArrayList<>();
String base64Result; String base64Result;
try { try {
PatientReportResult patientReportResult = this.getReportResult(item.getRepId()); PatientReportResult patientReportResult = this.getReportResult(item.getRepId());
base64Result = patientReportResult.getResult().getPatient().getPatientReport(); base64Result = patientReportResult.getResult().getPatient().getPatientReport();
} catch (Exception ex) { } catch (Exception ex) {
log.error("检验报告id{} 获取报告出错了!", item.getRepId()); log.error("同步检验报告,病案号:{},住院次数:{},检验报告id{} 获取报告出错了!", inpatientNo, times, item.getRepId());
continue; continue;
} }
try { try {
log.info("开始转换"); log.info("同步检验报告,病案号:{},住院次数:{},开始转换{} - {}", inpatientNo, times, item.getRepId(), item.getPidComName());
addScanList = PDFFileUtils.base64StringToPDF(base64Result, root, item.getRepId(), item.getPidComName(), i); addScanList = PDFFileUtils.base64StringToPDF(base64Result, root, item.getRepId(), item.getPidComName(), sortIndex);
log.info("转换结束"); log.info("同步检验报告,病案号:{},住院次数:{},{} - {} 转换结束", inpatientNo, times, item.getRepId(), item.getPidComName());
} catch (IOException e) { } catch (IOException e) {
log.error("检验pdf转化出错" + e.getMessage(), e); String msg = "同步检验报告,病案号:" + inpatientNo + ",住院次数:" + times + "," + item.getRepId() + " - " + item.getPidComName() + " 转换出错了!";
log.error(msg, e);
} }
addScanList.forEach(scanAssort -> {
for (ScanAssort scanAssort : addScanList) {
ScanAssort oldAssort = oldScanAssortList.stream() ScanAssort oldAssort = oldScanAssortList.stream()
.filter(f -> f.getScanPage().equals(scanAssort.getScanPage())).findAny().orElse(null); .filter(f -> f.getScanPage().equals(scanAssort.getScanPage()))
.findAny().orElse(null);
if (oldAssort != null) { if (oldAssort != null) {
scanAssort.setId(oldAssort.getId()); scanAssort.setId(oldAssort.getId());
} else { } else {
@ -172,16 +180,15 @@ public class WebServiceImpl implements IWebService {
} }
scanAssort.setPatientId(patientId); scanAssort.setPatientId(patientId);
scanAssortList.add(scanAssort); scanAssortList.add(scanAssort);
log.info("结果" + scanAssort);
});
} }
} }
//数据写入到3.0 //数据写入到3.0
if (ObjectUtil.isNotEmpty(scanAssortList)) { if (ObjectUtil.isNotEmpty(scanAssortList)) {
log.info("开始写入" + scanAssortList.size()); log.info("同步检验报告,病案号:{},住院次数:{},同步 {} 份文件。", inpatientNo, times,scanAssortList.size() );
scanAssortMapper.insertOrUpdateBatch(scanAssortList); scanAssortMapper.insertOrUpdateBatch(scanAssortList);
log.info("写入结束" + scanAssortList.size()); basicMapper.addLabSync(patientId,1);
} }
} }

@ -21,7 +21,7 @@ public class PDFFileUtils {
* @param rootPath * @param rootPath
* @param repId * @param repId
*/ */
public static List<ScanAssort> base64StringToPDF(String base64Content, String rootPath, String repId,String fileTitle,int index) throws IOException { public static synchronized List<ScanAssort> base64StringToPDF(String base64Content, String rootPath, String repId,String fileTitle,int index) throws IOException {
BASE64Decoder decoder = new BASE64Decoder(); BASE64Decoder decoder = new BASE64Decoder();
List<ScanAssort> addScanList = new ArrayList<>(); List<ScanAssort> addScanList = new ArrayList<>();
PDDocument document =null ; PDDocument document =null ;
@ -33,19 +33,19 @@ public class PDFFileUtils {
PDFRenderer renderer = new PDFRenderer(document); PDFRenderer renderer = new PDFRenderer(document);
// Iterate over each page and save it as an image // Iterate over each page and save it as an image
for (int pageIndex = 0; pageIndex < document.getNumberOfPages(); pageIndex++) { for (int pageIndex = 0; pageIndex < document.getNumberOfPages(); pageIndex++) {
// Render the page as an image // Render the page as an image,Set DPI value as needed
BufferedImage image = renderer.renderImageWithDPI(pageIndex, 300); // Set DPI value as needed BufferedImage image = renderer.renderImageWithDPI(pageIndex, 300);
// Save the image to a file // Save the image to a file ,Output file path
String outputFilePath = rootPath +File.separator+ repId +pageIndex+ ".jpg"; // Output file path String outputFilePath = rootPath +File.separator+ repId +pageIndex+ ".jpg";
//判断父级目录是否存在 不存在需要创建 //判断父级目录是否存在 不存在需要创建
File file = new File(outputFilePath); File file = new File(outputFilePath);
String parentPath = file.getParent(); String parentPath = file.getParent();
File dir = new File(parentPath); File dir = new File(parentPath);
if (!dir.exists()) { if (!dir.exists()) {
dir.mkdirs(); dir.mkdirs();
} }
ImageIO.write(image, "jpg", new File(outputFilePath)); ImageIO.write(image, "jpg", new File(outputFilePath));
ScanAssort scanAssort = new ScanAssort(); ScanAssort scanAssort = new ScanAssort();
scanAssort.setFileTitle(fileTitle); scanAssort.setFileTitle(fileTitle);
scanAssort.setScanPage(repId +pageIndex+ ".jpg"); scanAssort.setScanPage(repId +pageIndex+ ".jpg");
@ -60,17 +60,12 @@ public class PDFFileUtils {
scanAssort.setSort((index+1)*1000+pageIndex); scanAssort.setSort((index+1)*1000+pageIndex);
scanAssort.setAssortId("A5A7AA6796D1715A2F1E35699C706C84"); scanAssort.setAssortId("A5A7AA6796D1715A2F1E35699C706C84");
scanAssort.setTaskId("-1"); scanAssort.setTaskId("-1");
addScanList.add(scanAssort); addScanList.add(scanAssort);
} }
// Close the PDF document
}finally { }finally {
if (document!=null){ if (document!=null){
document.close(); document.close();
} }
} }
return addScanList; return addScanList;

@ -100,3 +100,4 @@ spring:
#文件保存路径 #文件保存路径
savePath: F:\jiashi\lianzhong savePath: F:\jiashi\lianzhong
labSyncDisDateStart: 2018-01-01

@ -64,6 +64,12 @@
) )
</foreach> </foreach>
</insert> </insert>
<insert id="addLabSync">
insert into docus_medicalrecord.lab_sync_patient (patient_id,status)
values (#{patientId},#{status})
ON DUPLICATE KEY UPDATE status = VALUES(status)
</insert>
<update id="updateScanStatus"> <update id="updateScanStatus">
update docus_medicalrecord.t_basic update docus_medicalrecord.t_basic
set scan_source = 1, set scan_source = 1,
@ -231,6 +237,9 @@
WHERE dis_date between #{startDisDateTime} and #{endDisDateTime} WHERE dis_date between #{startDisDateTime} and #{endDisDateTime}
AND is_cancel = 0 AND is_cancel = 0
AND scan_source = '1' AND scan_source = '1'
AND NOT EXISTS(
SELECT patient_id FROM docus_medicalrecord.lab_sync_patient WHERE lab_sync_patient.patient_id=t_basic.patient_id
)
ORDER BY dis_date ASC ORDER BY dis_date ASC
</select> </select>

Loading…
Cancel
Save