Быстрый старт
Компонент «Пакетная обработка задач Platform V Batch» позволяет осуществлять асинхронный запуск группы вызовов по HTTP.
Схема работы с Компонентом выглядит следующим образом:
- Создается пустая очередь задач для исполнения задач. В очереди задается максимальное количество одновременно выполняемых задач.
- В созданную очередь добавляются задачи по вызову сервисов через HTTP.
- Для созданных задач отсматривается статус для каждой задачи и для всей очереди.
- Когда очередь более не нужна, очередь необходимо удалить.
Взаимодействие с сервисом осуществляется посредством JSON-RPC вызовов.
Адрес сервиса:
- http://client-platform-gatewaybatch/v2/queues - для работы с очередями;
- http://client-platform-gatewaybatch/v2/tasks - для работы с задачами.
Ниже описаны шаги для интеграции с Компонентом «Пакетная обработка задач Platform V Batch» из Spring Boot приложения:
-
Создайте общие классы для JSON-RPC:ErrorDataDto.java type=java
package com.sbt.demo.batchschedulerest.dto.batch.jrpc.base; import lombok.Data; @Data public class ErrorDataDto { private String exceptionTypeName; private String message; }
ErrorDto.java type=javapackage com.sbt.demo.batchschedulerest.dto.batch.jrpc.base; import lombok.Data; @Data public class ErrorDto { private Integer code; private String message; private ErrorDataDto data; }
JRpcBadResponseDto.java type=javapackage com.sbt.demo.batchschedulerest.dto.batch.jrpc.base; import lombok.Data; import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data public class JRpcBadResponseDto extends JRpcBaseResponseDto { private ErrorDto error; }
JRpcBaseResponseDto.java type=javapackage com.sbt.demo.batchschedulerest.dto.batch.jrpc.base; import lombok.Data; @Data public class JRpcBaseResponseDto { private String jsonrpc; private String id; }
JRpcOkResponseDto.java type=javapackage com.sbt.demo.batchschedulerest.dto.batch.jrpc.base; import lombok.Data; import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data public class JRpcOkResponseDto<T> extends JRpcBaseResponseDto { private T result; }
JRpcRequestDto.java type=javapackage com.sbt.demo.batchschedulerest.dto.batch.jrpc.base; import lombok.Data; @Data public class JRpcRequestDto<T> { private String jsonrpc; private String method; private T params; private String id; }
-
Создайте классы описания моделей взаимодействия с очередями:QueueBaseDto.java type=java
package com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue; import lombok.Data; @Data public class QueueBaseDto { private String name; private String description; private String maxRunningTasks; }
QueueRequestDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data @JsonInclude(JsonInclude.Include.NON_NULL) public class QueueRequestDto extends QueueBaseDto { private String fromName; private Integer limit; }
QueueResponseDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue; import lombok.Data; import lombok.EqualsAndHashCode; import java.util.Date; @EqualsAndHashCode(callSuper = true) @Data public class QueueResponseDto extends QueueBaseDto { private String state; private Date createTime; private Date updateTime; private RetryPolicyDto retryPolicy; }
RetryPolicyDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue; import lombok.Data; @Data public class RetryPolicyDto { private Integer maxAttempts; private String startRetryDuration; private Integer increasePercentage; private String maxRetryDuration; }
QueueQueryDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data @Builder @AllArgsConstructor @NoArgsConstructor public class QueueQueryDto { private String name; }
QueueSearchDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data @Builder @NoArgsConstructor @AllArgsConstructor public class QueueSearchDto { private String fromName; private int limit; }
-
Создайте классы описания моделей взаимодействия с задачами:AttemptDto.java type=java
package com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.ResponseStatusDto; import lombok.Data; import java.util.Date; @Data public class AttemptDto { private Date scheduleTime; private Date dispatchTime; private Date responseTime; private ResponseStatusDto responseStatus; }
RetryStateDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import lombok.Data; import java.util.Date; @Data public class RetryStateDto { private Integer numAttempts; private Date nextAttemptTime; private Integer intervalSeconds; }
TaskBaseDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import com.sbt.demo.batchtasksrest.dto.batch.jrpc._Targ_etDto; import lombok.Data; @Data public class TaskBaseDto { private String queue; private String description; private HttpTargetDto httpTarget; }
TaskRequestDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data @JsonInclude(JsonInclude.Include.NON_NULL) public class TaskRequestDto extends TaskBaseDto { private String name; private String fromName; private Integer limit; }
TaskResponseDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import lombok.EqualsAndHashCode; import java.util.Date; @EqualsAndHashCode(callSuper = true) @Data @JsonInclude(JsonInclude.Include.NON_NULL) public class TaskResponseDto extends TaskBaseDto { private String name; private String state; private Date createTime; private Date updateTime; private Date scheduleTime; private String firstAttempt; private AttemptDto lastAttempt; private AttemptDto dependsOn; private RetryStateDto retryState; }
HttpTargetDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.Map; @Data @NoArgsConstructor @AllArgsConstructor public class HttpTargetDto { private String url; private String method; private Map<String, String> headers; private String body; }
ResponseStatusDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task; import lombok.Data; @Data public class ResponseStatusDto { private int code; private String message; }
InteractionDto.java type=javapackage com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks; import com.fasterxml.jackson.annotation.JsonGetter; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcOkResponseListDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcRequestDto; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.List; @Data @AllArgsConstructor @NoArgsConstructor public class InteractionDto<T> { private String url; private JRpcRequestDto jsonRpcRequest; private JRpcOkResponseListDto<T> jsonRpcResponse; @JsonGetter("result") public List<T> getJsonRpcResponseResult() { return jsonRpcResponse.getResult(); } }
-
Создайте класс ошибки для возврата при неудачном вызове сервиса.BatchException.java type=java
package com.sbt.demo.batchtasksrest.exceptions; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcBadResponseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcRequestDto; public class BatchException extends RuntimeException { private final String url; private final JRpcRequestDto jsonRpcRequest; private final JRpcBadResponseDto badResponseDto; public BatchException( String message, String url, JRpcRequestDto jsonRpcRequest, JRpcBadResponseDto badResponseDto) { super(message); this.url = url; this.jsonRpcRequest = jsonRpcRequest; this.badResponseDto = badResponseDto; } public String getUrl() { return url; } public JRpcRequestDto getJsonRpcRequest() { return jsonRpcRequest; } public JRpcBadResponseDto getBadResponseDto() { return badResponseDto; } }
-
Добавьте класс для взаимодействия с сервисом пакетной обработки задач.TasksService.java type=java
package com.sbt.demo.batchtasksrest.services; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcBadResponseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcOkResponseListDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcOkResponseObjectDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.base.JRpcRequestDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue.*; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task.TaskBaseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task.TaskRequestDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task.TaskResponseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.InteractionDto; import com.sbt.demo.batchtasksrest.dto.demo.TaskInfo; import com.sbt.demo.batchtasksrest.exceptions.BatchException; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; import org.springframework.web.client.HttpStatusCodeException; import org.springframework.web.client.RestTemplate; import java.util.Collections; import java.util.UUID; @Service public class TasksService { private final static String JSON_RPC_VERSION = "2.0"; private final ObjectMapper mapper = new ObjectMapper(); private final RestTemplate restTemplate; private final String batchServerQueuesUrl; private final String batchServerTasksUrl; public TasksService( RestTemplate restTemplate, @Value("${batch.tasks.serverUrl}") String serverUrl ) { this.restTemplate = restTemplate; this.batchServerQueuesUrl = serverUrl + "/batch/v2/queues"; this.batchServerTasksUrl = serverUrl + "/batch/v2/tasks"; } public String generateStringWithParam(String param) { return "Input parameter: " + param; } public InteractionDto<QueueResponseDto> createQueue(QueueBaseDto queueBaseDto) { JRpcRequestDto<QueueBaseDto> jRpcRequestDto = buildQuery("create", queueBaseDto); return executeTasksServerForObject(batchServerQueuesUrl, jRpcRequestDto, QueueResponseDto.class, true); } public InteractionDto<QueueResponseDto> getQueue(String queueName) { JRpcRequestDto<QueueQueryDto> jRpcRequestDto = buildQuery("get", new QueueQueryDto(queueName)); return executeTasksServerForObject(batchServerQueuesUrl, jRpcRequestDto, QueueResponseDto.class, true); } public InteractionDto<QueueResponseDto> deleteQueue(String queueName) { JRpcRequestDto<QueueQueryDto> jRpcRequestDto = buildQuery("delete", new QueueQueryDto(queueName)); return executeTasksServerForObject(batchServerQueuesUrl, jRpcRequestDto, QueueResponseDto.class, true); } public InteractionDto<QueueResponseDto> listQueues(String fromName, int limit) { QueueSearchDto queueSearchDto = QueueSearchDto.builder() .fromName(fromName) .limit(limit).build(); JRpcRequestDto<QueueSearchDto> jRpcRequestDto = buildQuery("list", queueSearchDto); return executeTasksServerForObject(batchServerQueuesUrl, jRpcRequestDto, QueueResponseDto.class, false); } public InteractionDto<TaskResponseDto> createTask(String queueName, TaskInfo taskInfo) { TaskBaseDto taskBaseDto = TaskBaseDto .builder() .queue(queueName) .description(taskInfo.getDescription()) .httpTarget(taskInfo.getHttpTarget()) .build(); JRpcRequestDto<TaskBaseDto> jRpcRequestDto = buildQuery("create", taskBaseDto); return executeTasksServerForObject(batchServerTasksUrl, jRpcRequestDto, TaskResponseDto.class, true); } public InteractionDto<TaskResponseDto> getTask(String taskName) { TaskRequestDto taskRequestDto = new TaskRequestDto(); taskRequestDto.setName(taskName); JRpcRequestDto<TaskRequestDto> jRpcRequestDto = buildQuery("get", taskRequestDto); return executeTasksServerForObject(batchServerTasksUrl, jRpcRequestDto, TaskResponseDto.class, true); } public InteractionDto<TaskResponseDto> getLastTask(String queueName) { TaskRequestDto taskRequestDto = new TaskRequestDto(); taskRequestDto.setQueue(queueName); taskRequestDto.setFromName(""); taskRequestDto.setLimit(-1); JRpcRequestDto<TaskRequestDto> jRpcRequestDto = buildQuery("list", taskRequestDto); return executeTasksServerForObject(batchServerTasksUrl, jRpcRequestDto, TaskResponseDto.class, false); } public InteractionDto<TaskResponseDto> getQueueTasks(String queueName) { TaskRequestDto taskRequestDto = new TaskRequestDto(); taskRequestDto.setQueue(queueName); taskRequestDto.setFromName(""); taskRequestDto.setLimit(0); JRpcRequestDto<TaskRequestDto> jRpcRequestDto = buildQuery("list", taskRequestDto); return executeTasksServerForObject(batchServerTasksUrl, jRpcRequestDto, TaskResponseDto.class, false); } private <T> JRpcRequestDto<T> buildQuery(String method, T params) { String id = UUID.randomUUID().toString(); JRpcRequestDto<T> rpcRequestDto = new JRpcRequestDto<>(); rpcRequestDto.setJsonrpc(JSON_RPC_VERSION); rpcRequestDto.setMethod(method); rpcRequestDto.setParams(params); rpcRequestDto.setId(id); return rpcRequestDto; } private <T> InteractionDto<T> executeTasksServerForObject( String url, JRpcRequestDto jRpcRequestDto, Class<T> tClass, boolean shouldCastToList ) { JRpcOkResponseListDto<T> jRpcOkResponseListDto; HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.add("ott-subject", "_global"); HttpEntity<JRpcRequestDto> httpEntity = new HttpEntity<>(jRpcRequestDto, headers); try { String res = restTemplate.postForObject( url, httpEntity, String.class ); jRpcOkResponseListDto = readResponse(res, tClass, shouldCastToList); } catch (HttpStatusCodeException httpException) { JRpcBadResponseDto jRpcBadResponseDto = readBadResponse(httpException); throw new BatchException(httpException.getResponseBodyAsString(), url, jRpcRequestDto, jRpcBadResponseDto); } return new InteractionDto<T>(url, jRpcRequestDto, jRpcOkResponseListDto); } private <T> JRpcOkResponseListDto<T> readResponse(String response, Class<T> tClass, boolean shouldCastToList) { JRpcOkResponseListDto<T> result = null; try { JavaType type; if (shouldCastToList) { type = mapper.getTypeFactory().constructParametricType(JRpcOkResponseObjectDto.class, tClass); JRpcOkResponseObjectDto<T> jRpcOkResponseObjectDto = mapper.readValue(response, type); result = new JRpcOkResponseListDto<>(); result.setId(jRpcOkResponseObjectDto.getId()); result.setJsonrpc(jRpcOkResponseObjectDto.getJsonrpc()); result.setResult(Collections.singletonList(jRpcOkResponseObjectDto.getResult())); } else { type = mapper.getTypeFactory().constructParametricType(JRpcOkResponseListDto.class, tClass); result = mapper.readValue(response, type); } } catch (JsonProcessingException e) { e.printStackTrace(); } return result; } private JRpcBadResponseDto readBadResponse(HttpStatusCodeException ex) { JRpcBadResponseDto result = null; try { result = mapper.readValue( ex.getResponseBodyAsString(), new TypeReference<>() { } ); } catch (JsonProcessingException e) { e.printStackTrace(); } return result; } }
-
Добавьте переменную batch.schedule.serverUrl в application.yml/application.properties со значением URL для обращения к сервису.application.yml type=yml
batch: tasks: serverUrl: http://client-platform-gateway
-
Для демонстрации взаимодействия добавьте Controller. В нем добавьте метод для вызова из сервиса пакетной обработки generateStringWithParam, а также методы для демонстрации работы с сервисом.TasksController.java type=java
package com.sbt.demo.batchtasksrest.controllers; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue.QueueBaseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.queue.QueueResponseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.task.TaskResponseDto; import com.sbt.demo.batchtasksrest.dto.batch.jrpc.tasks.InteractionDto; import com.sbt.demo.batchtasksrest.dto.demo.TaskInfo; import com.sbt.demo.batchtasksrest.services.TasksService; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class TasksController { private final TasksService tasksService; public TasksController(TasksService tasksService) { this.tasksService = tasksService; } @ApiOperation( value = "Генерация ответа по параметру", notes = "Генерация ответа по входящему параметру, используется в качестве httpTarget при взаимодействии " + "с Tasks Server") @PostMapping("/example") public String generateStringWithParam( @ApiParam("Произвольное тело запроса") @RequestBody String param) { return tasksService.generateStringWithParam(param); } @ApiOperation( value = "Создание очереди", notes = "Пример создания очереди в Tasks Server для последующей генерации задач") @PostMapping("/queues") public InteractionDto<QueueResponseDto> createQueue( @ApiParam("Наименование очереди") @RequestBody QueueBaseDto queueBaseDto) { return tasksService.createQueue(queueBaseDto); } @ApiOperation( value = "Получение очереди по наименованию", notes = "Пример получения очереди из Tasks Server для последующей генерации задач") @GetMapping("/queues/{queueName}") public InteractionDto<QueueResponseDto> getQueue( @ApiParam("Наименование очереди") @PathVariable String queueName) { return tasksService.getQueue(queueName); } @ApiOperation( value = "Получение списка очередей", notes = "Пример получения списка очередей в Tasks Server") @GetMapping("/queues") public InteractionDto<QueueResponseDto> listQueues( @ApiParam("Подстрока для поиска") @RequestParam(required = false, defaultValue = "") String fromName, @ApiParam("Максимальное количество Очередей для получения") @RequestParam(required = false, defaultValue = "0") Integer limit) { return tasksService.listQueues(fromName, limit); } @ApiOperation( value = "Удаление очереди", notes = "Пример удаления очереди в Tasks Server") @DeleteMapping("/queues/{queueName}") public InteractionDto<QueueResponseDto> deleteQueue( @ApiParam("Наименование очереди") @PathVariable String queueName) { return tasksService.deleteQueue(queueName); } @ApiOperation( value = "Создание задачи", notes = "Пример создания задачи в Tasks Server") @PostMapping("/queues/{queueName}/tasks") public InteractionDto<TaskResponseDto> createTask( @ApiParam("Наименование очереди") @PathVariable String queueName, @ApiParam("Параметры задачи") @RequestBody TaskInfo taskInfo) { return tasksService.createTask(queueName, taskInfo); } @ApiOperation( value = "Получение задачи", notes = "Пример получения задачи из Tasks Server") @GetMapping("/tasks/{taskName}") public InteractionDto<TaskResponseDto> getTask( @ApiParam("Наименование задачи") @PathVariable String taskName) { return tasksService.getTask(taskName); } @ApiOperation( value = "Получение последней задачи", notes = "Пример получения последней задачи из Tasks Server") @GetMapping("/queues/{queueName}/tasks/last") public InteractionDto<TaskResponseDto> getLastTask( @ApiParam("Наименование очереди") @PathVariable String queueName) { return tasksService.getLastTask(queueName); } @ApiOperation( value = "Получение списка задач", notes = "Пример получения созданных задач из Tasks Server по наименованию очереди, в рамках которой они " + "были созданы") @GetMapping("/queues/{queueName}/tasks") public InteractionDto<TaskResponseDto> getQueueTasks( @ApiParam("Наименование очереди") @PathVariable String queueName) { return tasksService.getQueueTasks(queueName); } }
Создание очереди
Для создания Очереди необходимо выполнить следующее:
- На странице Очереди задач нажмите кнопку Создать очередь.
- В открывшемся модальном окне заполните поля:
- имя очереди;
- максимально допустимое количество одновременно запущенных задач.

Увеличить
- Нажмите кнопку Сохранить.
Создание задачи
Для создания задачи необходимо выполнить следующее:
-
В боковом меню перейдите на страницу Задачи.
-
На открывшейся странице Задачи нажмите кнопку Создать задачу.
-
В открывшемся модальном окне заполните поля:
- Очередь — из выпадающего списка выберите нужное значение;
- URL-адрес — введите
endpoint
запроса API; - Метод — из выпадающего списка выберите метод, применимый для данного запроса.

Увеличить
- Нажмите кнопку Сохранить.
Проверка статуса выполнения задачи
Для проверки статуса выполнения задачи необходимо выполнить следующее:
- Чтобы обновить список задач и их статус, нажмите кнопку Обновить.
- Проверьте статус выполнения задачи. Для этого нажмите на задачу, чтобы развернуть вложенную форму. На вкладке Информация проверьте описание последней попытки запуска и результата запуска вычислений.

Увеличить
В демопримерах реализованы стандартные сценарии, демонстрирующие работу отдельных сервисов Платформы ГосТех. Отправка тестовых запросов в демопримерах позволит вам ознакомиться с функциями того или иного сервиса. Доступные демопримеры и описания реализованной в них функциональности представлены ниже.
Пакетная обработка задач
Создание, настройка и удаление очередей, заполнение очередей задачами. Просмотр списков созданных очередей и задач в каждой очереди